Monday, February 20, 2012

RubyからRabbitMQに接続

RubyのクライアントからRabbitMQに接続し、Publish / Subscribeの動作確認をする。ここではdirect exchangeを使う。

ライブラリインストール

AMQPクライアントライブラリはamqpを使う。よくメンテナンスされているし、ドキュメントもよく整備されているので決定版でしょう。

$ gem install amqp

consumer

まずはメッセージを購読するconsumerを作成する。

require 'amqp'
def run
config = {
:host => 'localhost'
}
AMQP.start(config) do |connection|
channel = AMQP::Channel.new(connection)
queue = channel.queue('', :auto_delete => true)
exchange = channel.direct 'ex.direct'
queue.bind(exchange, :routing_key => 'tasks').subscribe do |headers, payload|
puts payload
end
stopper = Proc.new { connection.close { EventMachine.stop } }
Signal.trap "INT", stopper
end
end
run
view raw consumer.rb hosted with ❤ by GitHub
ここでは以下のような設定でメッセージを待ち受けている。
  • queue名:サーバー自動生成(空文字を渡すことで設定)
  • exchange名:ex.direct
  • exchangeからqueueに送信する際のルーティングキー:'tasks'
最後のあたりでCTRL+Cで止められるようにしている。

producer

次にメッセージをpublishするスクリプト。

require 'amqp'
def run
config = {
:host => 'localhost'
}
AMQP.start(config) do |connection, open_ok|
channel = AMQP::Channel.new(connection)
exchange = channel.direct 'ex.direct'
msg = 'Hello, world'
exchange.publish(msg, :routing_key => 'tasks') do
puts "sent: #{msg}"
connection.close { EventMachine.stop }
end
end
end
run
view raw producer.rb hosted with ❤ by GitHub
こちらは単純にexchangeを指定して送信するだけ。その際、consumerに合わせてルーティングキーを'tasks'にする。

実行

最初にconsumer.rbを実行し、別ウィンドウでproducer.rbを実行して、consumer.rbに'Hello, world'と表示されれば成功。その際、procuder.rbの方にも'sent: Hello, world'と表示される。publishメソッドでパブリッシュし、RabbitMQから送信完了の応答があるとブロック内が呼ばれる。

No comments: