Salta ai contenuti

Connettore AMQP

Il connettore AMQP abilita messaggistica publish/subscribe tramite RabbitMQ e qualsiasi broker AMQP 0.9.1, con exchange, code e routing key.

Tipi Connettore:

  • AmqpReader - Consuma messaggi da una coda o binding di exchange
  • AmqpWriter - Pubblica messaggi a un exchange con una routing key

Consuma messaggi da una coda esistente:

{
"type": "AmqpReader",
"config": {
"url": "amqp://guest:guest@localhost:5672/",
"queue": "sensor_events",
"autoAck": false,
"prefetchCount": 10,
"consumerTag": "meddle-reader"
}
}

Quando queue è vuoto e exchange è impostato, il reader dichiara una coda esclusiva temporanea e la lega all’exchange usando routingKey:

{
"type": "AmqpReader",
"config": {
"url": "amqp://user:password@rabbit.example.com:5672/",
"exchange": "factory.events",
"routingKey": "line1.*.temperature",
"autoAck": true
}
}
{
"type": "AmqpReader",
"config": {
"url": "amqps://user:password@rabbit.example.com:5671/",
"queue": "secure_events",
"tls": true,
"autoAck": false
}
}

Pubblica messaggi a un exchange:

{
"type": "AmqpWriter",
"config": {
"url": "amqp://guest:guest@localhost:5672/",
"exchange": "factory.events",
"routingKey": "line1.temperature",
"mandatory": false,
"contentType": "application/json"
}
}

Pubblica all’Exchange Default (Coda Direttamente)

Sezione intitolata “Pubblica all’Exchange Default (Coda Direttamente)”

Lascia exchange vuoto e usa il nome della coda come routing key:

{
"type": "AmqpWriter",
"config": {
"url": "amqp://guest:guest@localhost:5672/",
"exchange": "",
"routingKey": "sensor_events"
}
}
  • url: URI di connessione AMQP (amqp://user:pass@host:port/vhost o amqps://...)
  • exchange: Nome dell’exchange (stringa vuota punta all’exchange default)
  • queue: Nome della coda da cui consumare (solo Reader)
  • routingKey: Routing key o pattern di binding
  • tls: Usa TLS per la connessione
  • prefetchCount: (Reader) Massimo numero di messaggi non riconosciuti sul canale
  • autoAck: (Reader) Riconoscimento automatico dei messaggi alla consegna
  • consumerTag: (Reader) Identificatore consumer fornito dal client
  • mandatory: (Writer) Restituisce il messaggio al publisher se non può essere instradato
  • contentType: (Writer) Tipo MIME per i messaggi pubblicati (default: application/json)
  • autoAck: true - Il broker considera i messaggi consegnati non appena vengono inviati. Più veloce ma i messaggi possono andare persi al crash del consumer.
  • autoAck: false - Meddle riconosce esplicitamente ogni messaggio riuscito e fa nack su errori del payload. Usalo con prefetchCount per limitare i messaggi in volo.

Gli exchange topic supportano wildcard nelle chiavi di binding:

  • * corrisponde esattamente a una parola: line1.*.temperature
  • # corrisponde a zero o più parole: factory.#
  • MQTT - Alternativa pub/sub leggera
  • Kafka - Messaggistica basata su log ad alto throughput
  • NATS - Alternativa pub/sub cloud-native