Saturday, February 25, 2012

RabbitMQ + RubyでRPC

RabbitMQとRubyで、同期的なRPC (Remote Procedure Call)を実装する。

server

require 'amqp'
class Server
def initialize
@config = {
:host => 'localhost'
}
@acceptable_methods = [:ping, :echo]
end
def ping(payload)
"pong"
end
def echo(payload)
payload
end
def declare_api(name)
queue = @channel.queue("rpc.#{name}", :auto_delete => true)
queue.bind(@exchange, :routing_key => name.to_s).subscribe do |header, payload|
p [:received, payload]
exchange = @channel.direct "amq.direct"
reply_to = header.reply_to
msg = self.__send__(name, payload)
exchange.publish(msg, :routing_key => reply_to)
end
end
def run
AMQP.start(@config) do |connection|
@channel = AMQP::Channel.new connection
@exchange = @channel.direct 'ex.rpc'
@acceptable_methods.each { |m| declare_api m }
stopper = Proc.new { connection.close { EventMachine.stop } }
Signal.trap "INT", stopper
end
end
end
Server.new.run
view raw server.rb hosted with ❤ by GitHub

client

require 'amqp'
class Client
def initialize
@config = {
:host => 'localhost'
}
end
def request(api, msg, reply_to)
exchange = @channel.direct 'ex.rpc'
exchange.publish(msg, :routing_key => api.to_s, :reply_to => reply_to)
end
def listen_response(queue)
exchange = @channel.direct("amq.direct")
queue.bind(exchange, :routing_key => queue.name).subscribe do |header, payload|
@connection.close {
@response = payload
EventMachine.stop
}
end
end
def call(api, msg)
AMQP.start(@config) do |connection|
@connection = connection
@channel = AMQP::Channel.new connection
@channel.queue('', :auto_delete => true) do |queue|
request api, msg, queue.name
listen_response queue
end
end
@response
end
end
ping_res = Client.new.call(:ping, "ping")
p [:ping, ping_res]
echo_res = Client.new.call(:echo, "hello")
p [:echo, echo_res]
view raw client.rb hosted with ❤ by GitHub

サーバーにメッセージを送信するときは普通にサーバーがsubscribeしてるキューに届くように、適当なexchangeにpublishする。ここではex.rpc。

サーバーからのレスポンスの方は若干工夫が必要で、クライアントはリクエストのメッセージ送信前に、受信用の一時キューを作成しておいて、AMQPメッセージのreply_toヘッダーにキュー名を入れておく。サーバー側はリクエストを受け取ったら、処理結果をreply_toのキューに向けて送信する。このときはamq.directというデフォルトで用意されているexchangeを利用する。

サーバー側ではapiを増やすのが簡単なように、@acceptable_methodsにメソッド名を列挙しておくと、そのメソッドをクライアントから呼び出すことができるようにしてある。

このコードではクライアントはRPCの呼び出しの度にAMQPのコネクションを作成していて非効率なので、実際のアプリではAMQP.start (EventMachineのメインループ)はもっと外側にないとダメでしょう。

1 comment:

Violet Payne said...

Thaank you