Skip to main content

rabbitmq (Source)

The rabbitmq source receives the events from the rabbitmq broker via the AMQP protocol.

Syntax

CREATE SOURCE <NAME> WITH (type="rabbitmq", map.type="<STRING>", uri="<STRING>", heartbeat="<INT>", exchange.name="<STRING>", exchange.type="<STRING>", exchange.durable.enabled="<BOOL>", exchange.autodelete.enabled="<BOOL>", routing.key="<STRING>", headers="<STRING>", queue.name="<STRING>", queue.durable.enabled="<BOOL>", queue.exclusive.enabled="<BOOL>", queue.autodelete.enabled="<BOOL>", tls.enabled="<BOOL>", tls.truststore.path="<STRING>", tls.truststore.password="<STRING>", tls.truststore.type="<STRING>", tls.version="<STRING>", auto.ack="<BOOL>")

Query Parameters

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
uriThe URI that is used to connect to an AMQP server. If no URI is specified,an error is logged in the CLI.e.g., amqp://guest:guest, amqp://guest:guest@localhost:5672STRINGNoNo
heartbeatThe period of time (in seconds) after which the peer TCP connection should be considered unreachable (down) by RabbitMQ and client libraries.60INTYesNo
exchange.nameThe name of the exchange that decides what to do with a message it receives.If the exchange.name already exists in the RabbitMQ server, then the system uses that exchange.name instead of redeclaring.STRINGNoNo
exchange.typeThe type of the exchange name. The exchange types available are direct, fanout, topic and headers. For a detailed description of each type, see [RabbitMQ - AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html).directSTRINGYesNo
exchange.durable.enabledIf this is set to true, the exchange remains declared even if the broker restarts.falseBOOLYesNo
exchange.autodelete.enabledIf this is set to true, the exchange is automatically deleted when it is not used anymore.falseBOOLYesNo
routing.keyThe key based on which the exchange determines how to route the message to queues. The routing key is like an address for the message. The routing.key must be initialized when the value for the exchange.type parameter is direct or topic.emptySTRINGYesNo
headersThe headers of the message. The attributes used for routing are taken from the this paremeter. A message is considered matching if the value of the header equals the value specified upon binding.nullSTRINGYesNo
queue.nameA queue is a buffer that stores messages. If the queue name already exists in the RabbitMQ server, then the system usees that queue name instead of redeclaring it. If no value is specified for this parameter, the system uses the unique queue name that is automatically generated by the RabbitMQ server.system generated queue nameSTRINGYesNo
queue.durable.enabledIf this parameter is set to true, the queue remains declared even if the broker restartsfalseBOOLYesNo
queue.exclusive.enabledIf this parameter is set to true, the queue is exclusive for the current connection. If it is set to false, it is also consumable by other connections.falseBOOLYesNo
queue.autodelete.enabledIf this parameter is set to true, the queue is automatically deleted when it is not used anymore.falseBOOLYesNo
tls.enabledThis parameter specifies whether an encrypted communication channel should be established or not. When this parameter is set to true, the tls.truststore.path and tls.truststore.password parameters are initialized.falseBOOLYesNo
tls.truststore.pathThe file path to the location of the truststore of the client that receives the RabbitMQ events via the AMQP protocol. A custom client-truststore can be specified if required. If a custom truststore is not specified, then the system uses the default client-trustore in the ${carbon.home}/resources/security directory.\${carbon.home}/resources/security/client-truststore.jksSTRINGYesNo
tls.truststore.passwordThe password for the client-truststore. A custom password can be specified if required. If no custom password is specified, then the system uses gdncarbon as the default password.gdncarbonSTRINGYesNo
tls.truststore.typeThe type of the truststore.JKSSTRINGYesNo
tls.versionThe version of the tls/ssl.SSLSTRINGYesNo
auto.ackIf this parameter is set to false, the server should expect explicit messages acknowledgements once deliveredtrueBOOLYesNo

Example 1

@App:name('TestExecutionPlan')
CREATE STREAM FooStream (symbol string, price float, volume long);

@info(name = 'query1')
CREATE SOURCE BarStream WITH (type ='rabbitmq', uri = 'amqp://guest:guest@localhost:5672', exchange.name = 'direct', routing.key= 'direct', map.type='xml') (symbol string, price float, volume long);

insert into BarStream
from FooStream select symbol, price, volume ;

This query receives events from the direct exchange with the directexchange type, and the directTest routing key.