Skip to main content

Basics

In this we cover basics of streams, queries, and how queries can be chained to one another. Similarly basics on sources and sink that are used to consume and publish events. Also about c8db tables aka collections that can be used to store and query events. Finally an introduction to the concept of Stream Application.

Streams and Queries

This section provides introduction to streams and queries, and how multiple queries can be chained to one another.

There are multiple type of queries such as window query, join query, pattern query, etc. Below example explains how pass-through and selection queries work. For more info refer the Stream Query Guide.

Example

Following is an example annotated with descriptive comments.

-- Defines `InputTemperatureStream` stream to pass events having `sensorId` and `temperature` attributes of types `string` and `double`.
CREATE STREAM InputTemperatureStream (sensorId string, temperature double);

-- Optional `@info` annotation to name the query.
@info(name = 'Pass-through')

-- Query to consume events from `InputTemperatureStream`, produce new events by selecting all the attributes from the incoming events, and outputs them to `TemperatureStream`.
insert into TemperatureAndSensorStream
select *
from InputTemperatureStream;

@info(name = 'Simple-selection')


-- Selects only the `temperature` attribute from events, and outputs to `TemperatureOnlyStream`.
-- Consumes events from `TemperatureAndSensorStream`. The schema of the stream is inferred from the previous query, hence no need to be defined.
insert into TemperatureOnlyStream
select temperature
from TemperatureAndSensorStream;

Events at each stream

When an event with values ['aq-14', 35.4] is sent to InputTemperatureStream stream it will get converted and travel through the streams as below.

  • InputTemperatureStream : ['aq-14', 35.4]
  • TemperatureAndSensorStream : ['aq-14', 35.4]
  • TemperatureOnlyStream : [35.4]

Source & Sink

Sources and Sink are used to consume and publish events to external systems.

There are multiple source and sink types, but this example only explains c8db source, c8stream sink, and kafka sink. For more info refer the Stream Query guide.

Example

This example creates a C8DB source from which a stream consumes JSON messages:

C8DB source to consume `JSON` messages from.
CREATE SOURCE TemperatureStream WITH (type='database', collection='TemparatureStream', collection.type="doc", replication.type="global", map.type='json') (sensorId string, temperature double);

This example creates a sink to log events that arrive from a stream called TemperatureOnlyStream with the temperature attribute of type double:

CREATE SINK TemperatureOnlyStream WITH (type='stream', stream="TemperatureOnlyStream", replication.type="local", map.type='json') (temperature double);

@info(name = 'Simple-selection')
insert into TemperatureOnlyStream
select temperature
from TemperatureStream;

Input

When a JSON message is written to the collection TemparatureStream, it will automatically get mapped to an event in the TemperatureStream stream.

{
"sensorId":"aq-14",
"temperature":35.4
}

To process custom input messages, please refer the examples related to Input Data Mapping.

Output

After processing, the event arriving at TemperatureOnlyStream will be emitted via c8stream and kafka sinks.

The message is published to TemperatureOnlyStream as

{"temperature":"35.4"}

The kafka sink maps the event to a custom JSON message as below and publishes it to the temperature topic.

{"temp":"35.4"}

To output messages using other message formats, pleases refer the examples related to Output Data Mapping.

Table & Store

Provides introduction to tables and database backed stores that can be used to store events.

Example

-- Defines `TemperatureStream` stream having `sensorId` and `temperature` attributes of types `string` and `double`.
CREATE STREAM TemperatureStream (sensorId string, temperature double);

-- Defines `TemperatureLogTable` having `sensorId`, `roomNo`, and `temperature` attributes of types `string`, `string`, and `double`.
CREATE TABLE GLOBAL TemperatureLogTable (sensorId string, roomNo string, temperature double);

-- Defines `SensorIdInfoTable` table.
CREATE TABLE GLOBAL SensorIdInfoTable (sensorId string, roomNo string);

@info(name = 'Join-query')
-- Selects `sensorId`, `roomNo`, and `temperature` attributes from stream and table, and adds events to `TemperatureLogTable`.
insert into TemperatureLogTable
select t.sensorId as sensorId, s.roomNo as roomNo, t.temperature as temperature
from TemperatureStream as t join SensorIdInfoTable as s
on t.sensorId == s.sensorId;

Event at table and store

When SensorIdInfoTable table contains a recode ['aq-14', '789'], and when an event with values ['aq-14', 35.4] is sent to TemperatureStream stream.

The event will get converted and added to the TemperatureLogTable table as below.

['aq-14', '789', 35.4]

Retrieving values from tables and stores

The stored values can be retrieved by join tables and stores with the streams as in the Join-query depicted in the example, or using on-demand queries.

The data in TemperatureDetailsTable can be retrieved via on-demand queries as below, using the On Demand Query REST API.

select *
from TemperatureDetailsTable

Stream Application

Provides introduction to the concept of Stream Application.

Stream App provides an isolated execution environment for processing the execution logic. It can be deployed and processed independently of other Stream Apps in the system. Stream Apps can use inMemory, c8db and c8stream sources and sinks to communicate between each other.

Example

This example creates a C8DB source to consume events from stream applications:

CREATE STREAM TemperatureStream (sensorId string, temperature double);

This example creates a sink to publish events from stream applications:

CREATE SINK TemperatureOnlyStream WITH (type='inMemory', topic='Temperature') (temperature double);

@info(name = 'Simple-selection')
insert into TemperatureOnlyStream
select temperature
from TemperatureStream;

Input

When an event ['aq-14', 35.4] is pushed via the SensorDetail topic of the inMemory transport from another Stream App, the event will be consumed and mapped to the TemperatureStream stream.

Output

After processing, the event [35.4] arriving at TemperatureOnlyStream will be emitted via Temperature topic of the inMemory transport to other subscribed Stream Apps.