RubyのクライアントからRabbitMQに接続し、Publish / Subscribeの動作確認をする。ここではdirect exchangeを使う。
ライブラリインストール
AMQPクライアントライブラリはamqpを使う。よくメンテナンスされているし、ドキュメントもよく整備されているので決定版でしょう。
$ gem install amqp
consumer
まずはメッセージを購読するconsumerを作成する。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'amqp' | |
def run | |
config = { | |
:host => 'localhost' | |
} | |
AMQP.start(config) do |connection| | |
channel = AMQP::Channel.new(connection) | |
queue = channel.queue('', :auto_delete => true) | |
exchange = channel.direct 'ex.direct' | |
queue.bind(exchange, :routing_key => 'tasks').subscribe do |headers, payload| | |
puts payload | |
end | |
stopper = Proc.new { connection.close { EventMachine.stop } } | |
Signal.trap "INT", stopper | |
end | |
end | |
run |
- queue名:サーバー自動生成(空文字を渡すことで設定)
- exchange名:ex.direct
- exchangeからqueueに送信する際のルーティングキー:'tasks'
producer
次にメッセージをpublishするスクリプト。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'amqp' | |
def run | |
config = { | |
:host => 'localhost' | |
} | |
AMQP.start(config) do |connection, open_ok| | |
channel = AMQP::Channel.new(connection) | |
exchange = channel.direct 'ex.direct' | |
msg = 'Hello, world' | |
exchange.publish(msg, :routing_key => 'tasks') do | |
puts "sent: #{msg}" | |
connection.close { EventMachine.stop } | |
end | |
end | |
end | |
run |
実行
最初にconsumer.rbを実行し、別ウィンドウでproducer.rbを実行して、consumer.rbに'Hello, world'と表示されれば成功。その際、procuder.rbの方にも'sent: Hello, world'と表示される。publishメソッドでパブリッシュし、RabbitMQから送信完了の応答があるとブロック内が呼ばれる。
No comments:
Post a Comment