RabbitMQとRubyで、同期的なRPC (Remote Procedure Call)を実装する。
server
require 'amqp' | |
class Server | |
def initialize | |
@config = { | |
:host => 'localhost' | |
} | |
@acceptable_methods = [:ping, :echo] | |
end | |
def ping(payload) | |
"pong" | |
end | |
def echo(payload) | |
payload | |
end | |
def declare_api(name) | |
queue = @channel.queue("rpc.#{name}", :auto_delete => true) | |
queue.bind(@exchange, :routing_key => name.to_s).subscribe do |header, payload| | |
p [:received, payload] | |
exchange = @channel.direct "amq.direct" | |
reply_to = header.reply_to | |
msg = self.__send__(name, payload) | |
exchange.publish(msg, :routing_key => reply_to) | |
end | |
end | |
def run | |
AMQP.start(@config) do |connection| | |
@channel = AMQP::Channel.new connection | |
@exchange = @channel.direct 'ex.rpc' | |
@acceptable_methods.each { |m| declare_api m } | |
stopper = Proc.new { connection.close { EventMachine.stop } } | |
Signal.trap "INT", stopper | |
end | |
end | |
end | |
Server.new.run |
client
require 'amqp' | |
class Client | |
def initialize | |
@config = { | |
:host => 'localhost' | |
} | |
end | |
def request(api, msg, reply_to) | |
exchange = @channel.direct 'ex.rpc' | |
exchange.publish(msg, :routing_key => api.to_s, :reply_to => reply_to) | |
end | |
def listen_response(queue) | |
exchange = @channel.direct("amq.direct") | |
queue.bind(exchange, :routing_key => queue.name).subscribe do |header, payload| | |
@connection.close { | |
@response = payload | |
EventMachine.stop | |
} | |
end | |
end | |
def call(api, msg) | |
AMQP.start(@config) do |connection| | |
@connection = connection | |
@channel = AMQP::Channel.new connection | |
@channel.queue('', :auto_delete => true) do |queue| | |
request api, msg, queue.name | |
listen_response queue | |
end | |
end | |
@response | |
end | |
end | |
ping_res = Client.new.call(:ping, "ping") | |
p [:ping, ping_res] | |
echo_res = Client.new.call(:echo, "hello") | |
p [:echo, echo_res] |
サーバーにメッセージを送信するときは普通にサーバーがsubscribeしてるキューに届くように、適当なexchangeにpublishする。ここではex.rpc。
サーバーからのレスポンスの方は若干工夫が必要で、クライアントはリクエストのメッセージ送信前に、受信用の一時キューを作成しておいて、AMQPメッセージのreply_toヘッダーにキュー名を入れておく。サーバー側はリクエストを受け取ったら、処理結果をreply_toのキューに向けて送信する。このときはamq.directというデフォルトで用意されているexchangeを利用する。
サーバー側ではapiを増やすのが簡単なように、@acceptable_methodsにメソッド名を列挙しておくと、そのメソッドをクライアントから呼び出すことができるようにしてある。
このコードではクライアントはRPCの呼び出しの度にAMQPのコネクションを作成していて非効率なので、実際のアプリではAMQP.start (EventMachineのメインループ)はもっと外側にないとダメでしょう。