コンテンツにスキップ

AMQPコネクタ

AMQPコネクタは、RabbitMQおよびAMQP 0.9.1ブローカーを通じて、エクスチェンジ、キュー、ルーティングキーを用いたパブリッシュ/サブスクライブメッセージングを可能にします。

コネクタタイプ:

  • AmqpReader - キューまたはエクスチェンジバインディングからメッセージを消費
  • AmqpWriter - ルーティングキーを用いてエクスチェンジにメッセージをパブリッシュ

既存のキューからメッセージを消費:

{
"type": "AmqpReader",
"config": {
"url": "amqp://guest:guest@localhost:5672/",
"queue": "sensor_events",
"autoAck": false,
"prefetchCount": 10,
"consumerTag": "meddle-reader"
}
}

queueが空でexchangeが設定されている場合、リーダーは一時的な排他キューを宣言し、routingKeyを使用してエクスチェンジにバインドします:

{
"type": "AmqpReader",
"config": {
"url": "amqp://user:password@rabbit.example.com:5672/",
"exchange": "factory.events",
"routingKey": "line1.*.temperature",
"autoAck": true
}
}
{
"type": "AmqpReader",
"config": {
"url": "amqps://user:password@rabbit.example.com:5671/",
"queue": "secure_events",
"tls": true,
"autoAck": false
}
}

エクスチェンジにメッセージをパブリッシュ:

{
"type": "AmqpWriter",
"config": {
"url": "amqp://guest:guest@localhost:5672/",
"exchange": "factory.events",
"routingKey": "line1.temperature",
"mandatory": false,
"contentType": "application/json"
}
}

デフォルトエクスチェンジへのパブリッシュ(直接キューへ)

Section titled “デフォルトエクスチェンジへのパブリッシュ(直接キューへ)”

exchangeを空のままにし、ルーティングキーとしてキュー名を使用します:

{
"type": "AmqpWriter",
"config": {
"url": "amqp://guest:guest@localhost:5672/",
"exchange": "",
"routingKey": "sensor_events"
}
}
  • url: AMQP接続URI(amqp://user:pass@host:port/vhostまたはamqps://...
  • exchange: エクスチェンジ名(空文字列はデフォルトエクスチェンジを対象)
  • queue: 消費元のキュー名(Readerのみ)
  • routingKey: ルーティングキーまたはバインディングパターン
  • tls: 接続にTLSを使用
  • prefetchCount: (Reader)チャネル上の未確認メッセージの最大数
  • autoAck: (Reader)配信時にメッセージを自動確認応答
  • consumerTag: (Reader)クライアント提供のコンシューマ識別子
  • mandatory: (Writer)ルーティングできない場合にパブリッシャーにメッセージを返す
  • contentType: (Writer)パブリッシュメッセージのMIMEタイプ(デフォルト:application/json
  • autoAck: true - メッセージが送信された時点で配信完了と見なされます。高速ですが、コンシューマがクラッシュするとメッセージが失われる可能性があります。
  • autoAck: false - Meddleが成功した各メッセージを明示的にackし、ペイロードエラー時にはnackします。prefetchCountと組み合わせて使用してインフライトメッセージ数を制限します。

トピックエクスチェンジはバインディングキーでワイルドカードをサポートします:

  • *は正確に1つの単語にマッチ:line1.*.temperature
  • #は0個以上の単語にマッチ:factory.#
  • MQTT - 軽量なパブ/サブの代替
  • Kafka - 高スループットなログベースメッセージング
  • NATS - クラウドネイティブなパブ/サブの代替