inMemory (Source)

The in-memory source subscribes to a topic and consumes events published on that topic by in-memory sinks. This provides a way to connect multiple Stream Apps deployed under the same Stream App Manager (JVM). The publisher and the subscriber must share the same event schema (stream definition) for data transfer to succeed.

Syntax

CREATE SOURCE <NAME> WITH (type="inMemory", map.type="<STRING>", topic="<STRING>")

Query Parameters

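| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
|------|-------------|---------------|---------------------|----------|---------|
| topic | Subscribes to the events sent on the given topic. | | STRING | No | No |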

Example 1

CREATE SOURCE StocksStream WITH (type='inMemory', topic='Stocks', map.type='passThrough') (symbol string, price float, volume long);

Here, StocksStream uses the inMemory source to consume events published on the Stocks topic by inMemory sinks deployed in the same JVM.
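
For events to arrive, some stream app in the same JVM has to publish to the Stocks topic. The following is a minimal sketch of the publishing side. It assumes the inMemory sink accepts the same topic and map.type parameters as the source, since the sink syntax is not shown in this section. The name StocksSink is hypothetical.

```sql
-- Hypothetical publisher: assumes an inMemory sink that takes the same
-- topic and map.type parameters as the inMemory source.
-- The schema matches StocksStream, as required for successful transfer.
CREATE SINK StocksSink WITH (type='inMemory', topic='Stocks', map.type='passThrough') (symbol string, price float, volume long);
```

Because both definitions name the topic Stocks and declare the same attributes in the same order, events written to StocksSink would be delivered to StocksStream.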

jms (Source)

JMS Source allows users to subscribe to a JMS broker and receive JMS messages. It has the ability to receive Map messages and Text messages.

Syntax

CREATE SOURCE <NAME> WITH (type="jms", map.type="<STRING>", destination="<STRING>", connection.factory.jndi.name="<STRING>", factory.initial="<STRING>", provider.url="<STRING>", connection.factory.type="<STRING>", worker.count="<INT>", connection.username="<STRING>", connection.password="<STRING>", retry.interval="<INT>", retry.count="<INT>", use.receiver="<BOOL>", subscription.durable="<BOOL>", connection.factory.nature="<STRING>")

Query Parameters

| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
|------|-------------|---------------|---------------------|----------|---------|
| destination | Name of the queue or topic that the JMS source should subscribe to. | | STRING | No | No |
| connection.factory.jndi.name | JNDI name of the JMS connection factory. This value is used in the JNDI lookup that finds the JMS connection factory. | QueueConnectionFactory | STRING | Yes | No |
| factory.initial | Naming factory initial value. | | STRING | No | No |
| provider.url | Java naming provider URL. This property specifies configuration information for the service provider to use. The value should contain a URL string (e.g. "ldap://somehost:389"). | | STRING | No | No |
| connection.factory.type | Type of the connection factory. This can be either queue or topic. | queue | STRING | Yes | No |
| worker.count | Number of worker threads listening on the given queue or topic. | 1 | INT | Yes | No |
| connection.username | Username for the broker. | None | STRING | Yes | No |
| connection.password | Password for the broker. | None | STRING | Yes | No |
| retry.interval | Interval, in milliseconds, between retry attempts after a connection failure. | 10000 | INT | Yes | No |
| retry.count | Maximum number of retries attempted after a connection failure with the broker. | 5 | INT | Yes | No |
| use.receiver | Implementation to use when consuming JMS messages. By default, the transport uses MessageListener; setting this property to true makes it use MessageReceiver instead. | false | BOOL | Yes | No |
| subscription.durable | Enables durable subscription. | false | BOOL | Yes | No |
| connection.factory.nature | Connection factory nature for the broker. | default | STRING | Yes | No |
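
As a sketch of how the optional parameters combine with the required ones, the definition below adds credentials, extra worker threads, and a custom retry policy to a queue subscription. The broker URL, destination, and credentials are placeholder values, not settings taken from a real deployment. Quoting the numeric values follows the syntax template above and is otherwise an assumption.

```sql
-- Sketch only: URL, destination, and credentials are placeholders.
-- worker.count: two threads listen on the queue instead of the default 1.
-- retry.interval / retry.count: retry every 5000 ms, at most 10 times,
-- instead of the defaults (10000 ms, 5 retries).
CREATE SOURCE inputStream WITH (type='jms', map.type='json', factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='tcp://localhost:61616', destination='DAS_JMS_TEST', worker.count='2', connection.username='admin', connection.password='admin', retry.interval='5000', retry.count='10') (name string, age int, country string);
```

Parameters left out here, such as connection.factory.type and use.receiver, keep the default values listed in the table.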

Example 1

CREATE SOURCE inputStream WITH (type='jms', map.type='json', factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='tcp://localhost:61616', destination='DAS_JMS_TEST', connection.factory.type='topic', connection.factory.jndi.name='TopicConnectionFactory') (name string, age int, country string);

This example shows how to connect to an ActiveMQ topic and receive messages.

Example 2

CREATE SOURCE inputStream WITH (type='jms', map.type='json', factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='tcp://localhost:61616', destination='DAS_JMS_TEST') (name string, age int, country string);

This example shows how to connect to an ActiveMQ queue and receive messages. Note that properties such as connection.factory.type and connection.factory.jndi.name are not provided, so their default values (queue and QueueConnectionFactory) apply.
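
To make the implicit settings visible, the sketch below spells out the two defaults listed in the parameter table. Assuming the documented defaults are applied as stated, it should behave the same as Example 2.

```sql
-- Same subscription as Example 2, with the documented defaults written out:
-- connection.factory.type defaults to 'queue' and
-- connection.factory.jndi.name defaults to 'QueueConnectionFactory'.
CREATE SOURCE inputStream WITH (type='jms', map.type='json', factory.initial='org.apache.activemq.jndi.ActiveMQInitialContextFactory', provider.url='tcp://localhost:61616', destination='DAS_JMS_TEST', connection.factory.type='queue', connection.factory.jndi.name='QueueConnectionFactory') (name string, age int, country string);
```

Writing the defaults explicitly can make it easier to see what changes when switching to a topic, as in Example 1.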