Create Stream Workers

Introduction

Stream workers are declarative specifications that define the logic for processing events sent to the stream processor. A stream worker definition contains the following configurations:

| Configuration | Description |
|---|---|
| Stream | A logical series of events ordered in time, with a uniquely identifiable name and a set of defined attributes with specific data types that define its schema. |
| Source | Consumes data from external sources (such as `TCP`, `Kafka`, `HTTP`, etc.) in the form of events, converts each event (which can be in `XML`, `JSON`, `binary`, etc. format) to a stream event, and passes it to a stream for processing. |
| Sink | Takes events arriving at a stream, maps them to a predefined data format (such as `XML`, `JSON`, `binary`, etc.), and publishes them to external endpoints (such as `E-mail`, `TCP`, `Kafka`, `HTTP`, etc.). |
| Table | A structured representation of data stored with a defined schema. Stored data can be backed by in-memory storage or an external data store such as RDBMS or MongoDB. Tables can be accessed and manipulated at runtime. |
| Executional element | A stateless query, stateful query, or partition, as described below. |

An executional element can be one of the following:

  • Stateless query: A query that considers only the currently incoming event when generating output, e.g., filters.
  • Stateful query: A query that considers both currently incoming and past events when generating output, e.g., windows, sequences, and patterns. (A sketch contrasting stateless and stateful queries follows this list.)
  • Partition: A collection of stream definitions and queries separated from each other within a stream worker, for the purpose of processing events in parallel and in isolation.
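
For example, the stateless/stateful distinction shows up directly in a query. The sketch below is illustrative only: the stream names are hypothetical, and it assumes the filter (`[condition]`) and `WINDOW SLIDING_TIME` constructs of the Siddhi-derived stream worker query language.

   -- Stateless: forwards only events whose quantity exceeds 100.
   INSERT INTO LargeOrderStream
   SELECT *
   FROM OrderStream[quantity > 100];

   -- Stateful: average quantity per product over a sliding one-minute window.
   INSERT INTO AvgOrderStream
   SELECT product_id, avg(quantity) AS avgQuantity
   FROM OrderStream WINDOW SLIDING_TIME(1 min)
   GROUP BY product_id;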

Macrometa provides built-in sources, sinks, and stores, which are explained in the later sections of this document.

Creating a Stream Worker

To create a stream worker, follow these steps:

  1. Open the GUI. Click the Stream Workers tab.

  2. Click New Stream Worker to define a new stream worker.

  3. Type a Name for the stream worker. For example, SweetProductionAnalysis.

  4. Type a Description.

  5. Add the following sample stream worker, which reads documents from the `SweetProductionData` collection and republishes them as events on the `ProductionAlertStream` stream:

    CREATE SOURCE SweetProductionStream WITH (type = 'database', collection='SweetProductionData', collection.type='DOC', replication.type='GLOBAL',  map.type='json') (name string, amount double);

    CREATE SINK ProductionAlertStream WITH (type= 'stream', stream='ProductionAlertStream', map.type='json') (name string, amount double);

    INSERT INTO ProductionAlertStream
    SELECT *
    FROM SweetProductionStream;
  6. Click Save to save the stream worker.

  7. Select all the regions to deploy your application in.

  8. Click Save.
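
The query in the sample forwards every event from `SweetProductionStream` to `ProductionAlertStream` unchanged. As a minimal variation, the query can project named attributes instead of `*`, using the same schema defined in the sample:

   INSERT INTO ProductionAlertStream
   SELECT name, amount
   FROM SweetProductionStream;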

Source

For this example, a source can be of type stream or database; in both cases, the incoming data is used for processing. A source of type database is best if you need to store incoming data. Otherwise, use a source of type stream.

Create Streams

Syntax:

   CREATE SOURCE SourceName WITH (type="stream", stream.list="STRING", replication.type="STRING", map.type='type') (strings);

Example:

   CREATE SOURCE OrderStream WITH (type="stream", stream.list="OrderStream", replication.type="GLOBAL", map.type='json') (product_id string, quantity int);

Stream workers use the stream with the default query parameters explained in the table below.

Query Parameters:

| Name | Description | Default Value | Possible Data Types | Optional |
|---|---|---|---|---|
| stream.list | The list of streams to which the source must listen, provided as comma-separated values, e.g. `stream_one,stream_two`. | | STRING | No |
| replication.type | The replication type of the streams. Possible values are `local` and `global`. | local | STRING | Yes |
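
For example, a sketch of a source that listens to two locally replicated streams at once; the stream names `stream_one` and `stream_two` are placeholders:

   CREATE SOURCE MultiOrderStream WITH (type="stream", stream.list="stream_one,stream_two", replication.type="local", map.type='json') (product_id string, quantity int);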

Create collections

You can create collections with your stream worker and store incoming data in them for further processing. The syntax is shown below.

Syntax:

   CREATE SOURCE SourceName WITH (type="database", collection="STRING", replication.type="STRING", collection.type="STRING", map.type='type') (strings);

Example:

   CREATE SOURCE SweetProductionStream WITH (type = 'database', collection='SweetProductionData', collection.type='DOC', replication.type='GLOBAL',  map.type='json') (name string, amount double);

Query parameters:

| Name | Description | Default Value | Possible Data Types | Optional |
|---|---|---|---|---|
| collection | The name of the collection to which the source must listen. | | STRING | No |
| replication.type | The replication type of the collection. Local collections are not currently allowed; the type must be `global`. | local | STRING | No |
| collection.type | The type of data the collection contains. Possible values are `doc` and `edge`. | doc | STRING | Yes |
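
As a sketch, a source backed by an edge collection would set `collection.type` to `edge`; the collection name and the `_from`/`_to` attribute pair below are assumptions for illustration:

   CREATE SOURCE FriendshipStream WITH (type='database', collection='FriendshipData', collection.type='EDGE', replication.type='GLOBAL', map.type='json') (_from string, _to string);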

Sink

Sinks publish events to an external destination after they are processed. A sink consumes events from a stream and lets you define a schema for the output format.

Create streams

Syntax:

   CREATE SINK SinkName WITH (type="stream", stream="STRING", replication.type="STRING", map.type='type') (strings);

Example:

   CREATE SINK ProductionAlertStream WITH (type= 'stream', stream='ProductionAlertStream', map.type='json') (name string, amount double);

Query Parameters:

| Name | Description | Default Value | Possible Data Types | Optional |
|---|---|---|---|---|
| stream | The stream to which the sink publishes events. | | STRING | No |
| replication.type | The replication type of the stream. Possible values are `local` and `global`. | local | STRING | Yes |
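
For example, a sketch of a sink that publishes to a locally replicated stream; the stream name is a placeholder:

   CREATE SINK LocalAlertStream WITH (type='stream', stream='LocalAlertStream', replication.type='local', map.type='json') (name string, amount double);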

Table

A table is similar to a collection: a structured representation of data with a defined schema.

Syntax:

   CREATE TABLE GLOBAL TableName(property type);

Example:

   CREATE TABLE GLOBAL SweetProductionCollection(name string, amount double);

Or, equivalently, using STORE:

   CREATE STORE SweetProductionCollection WITH (type="database", collection="SweetProductionCollection", replication.type="global", collection.type="DOC", map.type='json') (name string, amount double);

The stream worker uses the Macrometa collection with the default query parameters explained in the table below.

| Name | Description | Default Value | Possible Data Types | Optional |
|---|---|---|---|---|
| collection | The name of the collection to which events must be written. | | STRING | No |
| replication.type | The replication type of the collection. The type must be `global`; local collections are not currently allowed. | local | STRING | No |
| collection.type | The type of data the collection contains. Possible values are `doc` and `edge`. | doc | STRING | Yes |
| from | If `collection.type` is `edge`, the field to be considered the source node of the edge. | _from | STRING | Yes |
| to | If `collection.type` is `edge`, the field to be considered the destination node of the edge. | _to | STRING | Yes |
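
For example, a sketch of a store backed by an edge collection, where `from` and `to` name the attributes that hold the edge endpoints; the attribute names `source` and `destination` are assumptions for illustration:

   CREATE STORE FriendshipCollection WITH (type="database", collection="FriendshipCollection", replication.type="global", collection.type="EDGE", map.type='json', from="source", to="destination") (source string, destination string);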

Tutorials

The following tutorials cover various user scenarios using Macrometa stream processing.