Showing posts with label rabbitmq. Show all posts
Showing posts with label rabbitmq. Show all posts

Saturday, February 25, 2012

RabbitMQ + RubyでRPC

RabbitMQとRubyで、同期的なRPC (Remote Procedure Call)を実装する。

server

client

サーバーにメッセージを送信するときは普通にサーバーがsubscribeしてるキューに届くように、適当なexchangeにpublishする。ここではex.rpc。

サーバーからのレスポンスの方は若干工夫が必要で、クライアントはリクエストのメッセージ送信前に、受信用の一時キューを作成しておいて、AMQPメッセージのreply_toヘッダーにキュー名を入れておく。サーバー側はリクエストを受け取ったら、処理結果をreply_toのキューに向けて送信する。このときはamq.directというデフォルトで用意されているexchangeを利用する。

サーバー側ではapiを増やすのが簡単なように、@acceptable_methodsにメソッド名を列挙しておくと、そのメソッドをクライアントから呼び出すことができるようにしてある。

このコードではクライアントはRPCの呼び出しの度にAMQPのコネクションを作成していて非効率なので、実際のアプリではAMQP.start (EventMachineのメインループ)はもっと外側にないとダメでしょう。

Tuesday, February 21, 2012

Rails3からRabbitMQに接続

Ruby AMQPはEventMachineに依存しているため、利用する際にはEventMachineのイベントループ内にいる必要がある。RailsからRabbitMQに接続する場合は、RailsのプロセスからRabbitMQにTCPのコネクションを張り、その中でchannelを作成することになる。

RabbitMQとのコネクションは、$RAILS_ROOT/config/initializers/amqp.rbというファイルを作成してその中で作成すると、起動時に実行されてコネクションを繋いでおくことができる。

接続設定

接続の設定ファイルはconfig/amqp.ymlなどとして保存しておけばいいんじゃないでしょうか。

development:
  host: localhost
  vhost: development

test:
  host: localhost
  vhost: test

production:
  host: localhost
  vhost: production

Thin

RailsのサーバーにThinを利用している場合、ThinがEventMachineベースなので特に気にせずconfig/initializers/amqp.rbには以下のように記述すれば良い。

def connect
  yml = YAML.load File.read(File.join(Rails.root, 'config/amqp.yml'))
  config = {}
  yml[Rails.env].each do |key, value|
    config[key.to_sym] = value
  end

  AMQP.connection = AMQP.connect(config)
end

EventMachine.next_tick do
  connect
end

controllerでパブリッシュ

例えばAmqpController/publishでパブリッシュする場合、以下のようになる。

class AmqpController < ApplicationController
  def publish
    connection = AMQP.connection
    channel = AMQP::Channel.new(connection)
    exchange = channel.direct 'ex.direct'
    msg = 'Hello, world'
    exchange.publish(msg, :routing_key => 'tasks')
    render :text => msg
  end
end

参考

Monday, February 20, 2012

ruby amqpで手動acknowledge

AMQPでは、キューからコンシューマーにメッセージが配信されても、コンシューマーがそのメッセージを正しく受け取ったことをブローカー(RabbitMQ)に通知(acknowledge)しないといけない。

subscribeメソッドを引数なしで使うと、自動的にacknowledgeされるが、:ack => trueを指定すると手動で設定することになる(最初、逆かと思った)。

# 自動 ack
queue.bind(exchange, :routing_key => 'tasks').subscribe do |headers, payload|
  # do some process
end
# 手動 ack
queue.bind(exchange, :routing_key => 'tasks').subscribe(:ack => true) do |headers, payload|
  # do some process
  headers.ack
end
メッセージ内容に応じてackするか決める場合は後者を使う。headers.ackメソッドでブローカーにacknowledgeしている。これをしないとメッセージはキューから削除されず、例えばこのコンシューマーがブローカーに再接続すると前回受け取ったメッセージが再送される。

RubyからRabbitMQに接続

RubyのクライアントからRabbitMQに接続し、Publish / Subscribeの動作確認をする。ここではdirect exchangeを使う。

ライブラリインストール

AMQPクライアントライブラリはamqpを使う。よくメンテナンスされているし、ドキュメントもよく整備されているので決定版でしょう。

$ gem install amqp

consumer

まずはメッセージを購読するconsumerを作成する。 ここでは以下のような設定でメッセージを待ち受けている。

  • queue名:サーバー自動生成(空文字を渡すことで設定)
  • exchange名:ex.direct
  • exchangeからqueueに送信する際のルーティングキー:'tasks'
最後のあたりでCTRL+Cで止められるようにしている。

producer

次にメッセージをpublishするスクリプト。

こちらは単純にexchangeを指定して送信するだけ。その際、consumerに合わせてルーティングキーを'tasks'にする。

実行

最初にconsumer.rbを実行し、別ウィンドウでproducer.rbを実行して、consumer.rbに'Hello, world'と表示されれば成功。その際、procuder.rbの方にも'sent: Hello, world'と表示される。publishメソッドでパブリッシュし、RabbitMQから送信完了の応答があるとブロック内が呼ばれる。

Sunday, February 19, 2012

MacでのRabbiqMQ インストール

MacにRabbitMQとWebの管理コンソールをインストールする。

  • MacOS X 10.6.8
  • Erlang 5.9
  • RabbitMQ v2.7.1

RabbitMQインストール

RabbitMQはMacPortsでインストール。

$ sudo port install rabbitmq-server
起動はrabbitmq-serverコマンド。
$ sudo rabbitmq-server
Activating RabbitMQ plugins ...

+---+   +---+
|   |   |   |
|   |   |   |
|   |   |   |
|   +---+   +-------+
|                   |
| RabbitMQ  +---+   |
|           |   |   |
|   v2.7.1  +---+   |
|                   |
+-------------------+
AMQP 0-9-1 / 0-9 / 0-8
Copyright (C) 2007-2011 VMware, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/

起動できたらCTRL-Cでメニューっぽいものが表示されるのでAキー(abort)で一旦終了。本来、rabbitmqctl stopとするのが正常な止め方のようだが、僕の環境でsudo rabbitmqctl stopとするとログ(/opt/local/var/log/rabbitmq/rabbit@hostname.log)に以下のようなエラーメッセージが出た。
=ERROR REPORT==== 19-Feb-2012::23:03:53 ===
** Connection attempt from disallowed node 'rabbitmqctl67610@xxx' **
これはErlang cookieというのが正しく設定されていないのが原因のようだが、ひとまずここでは放っておく。

ホスト名に注意

Erlang関連はホスト名に注意が必要。会社の環境にインストールしたとき、DHCPを使っていた影響で、macのホスト名がIPアドレスになっていた。この状態だとRabbitMQを起動しようとしても、以下のようなログが出力されうまく起動できない。(以下の149はDHCPで振られたIPの最後の8ビットの値)

ERROR: epmd error for host "149": timeout (timed out establishing tcp connection)
正しいホスト名に設定すれば起動できる。デフォルトのbashならターミナルのプロンプトの一番左側におそらくホスト名が表示されていると思われるので、参考に。

RabbitMQ Web管理コンソールプラグイン インストール

RabbitMQにはWeb画面でRabbitMQを管理できるプラグインがデフォルトでバンドルされているので、公式ページを参考にインストールする。 最初に必要なディレクトリがないので作成してから、コマンドを一発叩くだけ。

$ sudo mkdir /opt/local/etc/rabbitmq/
$ sudo rabbitmq-plugins enable rabbitmq_management
# 再度RabbitMQ起動
$ sudo rabbitmq-server
Activating RabbitMQ plugins ...
6 plugins activated:
* amqp_client-0.0.0
* mochiweb-1.3-rmq0.0.0-git
* rabbitmq_management-0.0.0
* rabbitmq_management_agent-0.0.0
* rabbitmq_mochiweb-0.0.0
* webmachine-1.7.0-rmq0.0.0-hg
# ... 略
そうしたら以下にアクセス。デフォルトのユーザー/パスワードはguest/guest。
RabbitMQ WebConsole
これでWeb画面からexchangeやらqueueやらをWebコンソールから作ったり削除したりできる。

参考URL

Thursday, February 16, 2012

RabbitMQ / ActiveMQ, and so on.

今開発しているシステムに、MQを導入しようと考えている。今の所RabbitMQが第一候補だが、比較しているページなどを調べてみた。

  1. stackoverflow : ActiveMQ or RabbitMQ or ZeroMQ or
  2. Python messaging: ActiveMQ and RabbitMQ
  3. Second Life Wiki : Message Queue Evaluation Notes
  4. RabbitMQ vs Apache ActiveMQ vs Apache qpid

Pythonを使ってActiveMQとRabbitMQを比較している上記2のサイトが特に参考になる。要約すると:

  • どちらも実用に耐えられる製品である
  • 全体的に言うとActiveMQの方がRabbitMQよりも性能が良い
  • RabbitMQはpulish(書き込み)が非同期なので高速だが、subscribe(読み出し)がそれに比較して遅い。書き込みが非常に多いケースだとメッセージがキューにたまっていくので、ベンチマークをすると一定の時間でクラッシュする。
  • 同時に作成可能なキュー数は、ActiveMQだと64,000くらい、RabbitMQだと32,000くらい

ActiveMQも良さそうである。ただやはりActiveMQはJMSでJavaから使うのが普通っぽいし、ドキュメントもそういった使い方のものが多そうである。うちのシステムは今のところ大体がRuby。もちろんRuby用のクライアントライブラリも用意されているが、どうしようか。