# Create Stream Workers

## Introduction

Stream workers are declarative specs that define the logic for processing events sent to the stream processor. A stream worker definition contains the following configurations:
Configuration | Description |
---|---|
Stream | A logical series of events ordered in time, with a uniquely identifiable name and a set of defined attributes with specific data types that define its schema. |
Source | Consumes data from external sources (such as `TCP`, `Kafka`, `HTTP`, etc.) in the form of events, converts each event (which can be in `XML`, `JSON`, `binary`, etc. format) to a stream event, and passes it to a stream for processing. |
Sink | Takes events arriving at a stream, maps them to a predefined data format (such as `XML`, `JSON`, `binary`, etc.), and publishes them to external endpoints (such as `E-mail`, `TCP`, `Kafka`, `HTTP`, etc.). |
Table | A structured representation of data stored with a defined schema. Stored data can be backed by in-memory storage or by external data stores such as RDBMS, MongoDB, etc. Tables can be accessed and manipulated at runtime. |
Executional Element | An executional element can be one of the following: |
Macrometa provides built-in sources, sinks, and stores, explained in later sections of this document.
## Creating a Stream Worker

To create a stream worker, follow these steps:

1. Open the GUI and click the Stream Workers tab.
2. Click New Stream Worker to define a new stream worker.
3. Enter a Name for the stream worker. For example, `SweetProductionAnalysis`.
4. Enter a Description.
5. Add the following sample stream worker:

   ```sql
   CREATE SOURCE SweetProductionStream WITH (type='database', collection='SweetProductionData', collection.type='DOC', replication.type='GLOBAL', map.type='json') (name string, amount double);

   CREATE SINK ProductionAlertStream WITH (type='stream', stream='ProductionAlertStream', map.type='json') (name string, amount double);

   INSERT INTO ProductionAlertStream
   SELECT *
   FROM SweetProductionStream;
   ```

6. Click Save to save the stream worker.
7. Select all the regions in which to deploy your application.
8. Click Save.
## Source

A source can be of type `stream` or `database`; it supplies the incoming data for processing. Use a source of type `database` if you need to store incoming data; otherwise, use a source of type `stream`.
### Create Streams

Syntax:

```sql
CREATE SOURCE SourceName WITH (type="stream", stream.list="STRING", replication.type="STRING", map.type='type') (strings);
```

Example:

```sql
CREATE SOURCE OrderStream WITH (type="stream", stream.list="OrderStream", replication.type="GLOBAL", map.type='json') (product_id string, quantity int);
```
The stream worker uses the stream with the default query parameters described in the table below.
Query Parameters:
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
stream.list | The list of streams to which the source must listen, provided as comma-separated values, e.g. `stream_one,stream_two`. | | STRING | No |
replication.type | The replication type of the streams. Possible values are `local` and `global`. | local | STRING | Yes |
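For instance, a single source can listen to multiple streams through `stream.list`. The sketch below is illustrative only: the stream names `OrderStream` and `RefundStream` and the schema are assumptions, and the optional `replication.type` is set explicitly to its default:

```sql
-- Hypothetical source listening to two streams, with local replication.
CREATE SOURCE CombinedOrderStream WITH (type="stream", stream.list="OrderStream,RefundStream", replication.type="local", map.type='json') (product_id string, quantity int);
```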
### Create collections

You can create collections with your stream worker and store incoming data in them for further processing. The syntax is shown below.

Syntax:

```sql
CREATE SOURCE SourceName WITH (type="database", collection="STRING", replication.type="STRING", collection.type="STRING", map.type='type') (strings);
```

Example:

```sql
CREATE SOURCE SweetProductionStream WITH (type='database', collection='SweetProductionData', collection.type='DOC', replication.type='GLOBAL', map.type='json') (name string, amount double);
```
Query parameters:
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
collection | The name of the collection to which the source must listen. | | STRING | No |
replication.type | The replication type of the collection. Local collections are currently not allowed; the type must be `global`. | local | STRING | No |
collection.type | The type of data the collection contains. Possible values are `doc` and `edge`. | doc | STRING | Yes |
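As an illustrative sketch, a source can also listen to an edge collection by setting `collection.type` to `edge`. The collection name, the `_from`/`_to` endpoint attributes, and the schema below are assumptions, not part of the example above:

```sql
-- Hypothetical source over an edge collection; _from and _to hold the edge endpoints.
CREATE SOURCE FriendshipStream WITH (type='database', collection='FriendshipEdges', collection.type='EDGE', replication.type='GLOBAL', map.type='json') (_from string, _to string, since string);
```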
## Sink

Sinks publish processed events to external endpoints. A sink consumes events from a stream and allows you to define a schema for the output format.
### Create streams

Syntax:

```sql
CREATE SINK SinkName WITH (type="stream", stream="STRING", replication.type="STRING", map.type='type') (strings);
```

Example:

```sql
CREATE SINK ProductionAlertStream WITH (type='stream', stream='ProductionAlertStream', map.type='json') (name string, amount double);
```
Query Parameters:
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
stream | The stream to which the sink publishes events. | | STRING | No |
replication.type | The replication type of the stream. Possible values are `local` and `global`. | local | STRING | Yes |
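For instance, a sink can set the optional `replication.type` explicitly. In this sketch, the stream name `LocalAlertStream` and its schema are assumptions:

```sql
-- Hypothetical sink publishing to a locally replicated stream.
CREATE SINK LocalAlertStream WITH (type='stream', stream='LocalAlertStream', replication.type='local', map.type='json') (name string, amount double);
```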
## Table

A table, similar to a collection, is a structured representation of data with a defined schema.

Syntax:

```sql
CREATE TABLE GLOBAL TableName(property type);
```

Example:

```sql
CREATE TABLE GLOBAL SweetProductionCollection(name string, amount double);
```

Or the equivalent using STORE:

```sql
CREATE STORE SweetProductionCollection WITH (type="database", collection="SweetProductionCollection", replication.type="local", collection.type="DOC", map.type='json') (name string, amount double);
```
The stream worker uses Macrometa collections with the default query parameters described in the table below.
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
collection | The name of the collection to which events must be written. | | STRING | No |
replication.type | The replication type of the collection. Note: The type must be `global`; local collections are currently not allowed. | local | STRING | No |
collection.type | The type of data the collection contains. Possible values are `doc` and `edge`. | doc | STRING | Yes |
from | If `collection.type` is `edge`, this parameter indicates which field is treated as the source node of the edge. | _from | STRING | Yes |
to | If `collection.type` is `edge`, this parameter indicates which field is treated as the destination node of the edge. | _to | STRING | Yes |
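As a sketch of the `from` and `to` parameters, the store below writes edges whose endpoints come from custom attribute names. The store name and the attribute names (`sourcePerson`, `targetPerson`, `since`) are assumptions for illustration:

```sql
-- Hypothetical edge store mapping custom attributes to the edge endpoints.
CREATE STORE KnowsEdges WITH (type="database", collection="KnowsEdges", replication.type="global", collection.type="EDGE", map.type='json', from="sourcePerson", to="targetPerson") (sourcePerson string, targetPerson string, since string);
```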
## Tutorials

The following tutorials cover various user scenarios using Macrometa stream processing.