Skip to main content

nats (Source)

NATS Source allows users to subscribe to a NATS broker and receive messages. It has the ability to receive all the message types supported by NATS.

Syntax

CREATE SOURCE <NAME> WITH (type="nats", map.type="<STRING>", destination="<STRING>", bootstrap.servers="<STRING>", client.id="<STRING>", cluster.id="<STRING>", queue.group.name="<STRING>", durable.name="<STRING>", subscription.sequence="<STRING>")

Query Parameters

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
destinationSubject name which NATS Source should subscribe to.STRINGNoNo
bootstrap.serversThe NATS based url of the NATS server.nats://localhost:4222STRINGYesNo
client.idThe identifier of the client subscribing/connecting to the NATS broker.NoneSTRINGYesNo
cluster.idThe identifier of the NATS server/cluster.test-clusterSTRINGYesNo
queue.group.nameThis can be used when there is a requirement to share the load of a NATS subject. Clients belongs to the same queue group share the subscription load.NoneSTRINGYesNo
durable.nameThis can be used to subscribe to a subject from the last acknowledged message when a client or connection failure happens. The client can be uniquely identified using the tuple (client.id, durable.name).NoneSTRINGYesNo
subscription.sequenceThis can be used to subscribe to a subject from a given number of message sequence. All the messages from the given point of sequence number will be passed to the client. If not provided then the either the persisted value or 0 will be used.NoneSTRINGYesNo

Example 1

CREATE SOURCE inputStream WITH (type='nats', map.type='text', destination='SP_NATS_INPUT_TEST', bootstrap.servers='nats://localhost:4222',client.id='nats_client',server.id='test-cluster',queue.group.name = 'group_nats',durable.name = 'nats-durable',subscription.sequence = '100') (name string, age int, country string);

This example shows how to subscribe to a NATS subject with all supporting configurations.With the following configuration the source identified as nats-client will subscribes to a subject named as SP_NATS_INPUT_TEST which resides in a nats instance with a cluster id of test-cluster, running in localhost and listening to the port 4222 for client connection. This subscription will receive all the messages from 100th in the subject.

Example 2

CREATE SOURCE inputStream WITH (type='nats', map.type='text', destination='SP_NATS_INPUT_TEST') (name string, age int, country string);

This example shows how to subscribe to a NATS subject with mandatory configurations.With the following configuration the source identified with an auto generated client id will subscribes to a subject named as SP_NATS_INTPUT_TEST which resides in a nats instance with a cluster id of test-cluster, running in localhost and listening to the port 4222 for client connection. This will receive all available messages in the subject

Example 3

CREATE SOURCE inputStream WITH (type='nats', map.type='json', map.attributes="name='$.name', age='$.age', country='$.country', sequenceNum='trp:sequenceNumber'", destination='SIDDHI_NATS_SOURCE_TEST_DEST', client.id='nats_client', bootstrap.servers='nats://localhost:4222', cluster.id='test-cluster') (name string, age int, country string, sequenceNum string);

This example shows how to pass NATS Streaming sequence number to the event.