Patterns and Trends Detection Examples
This page explains ways to detect patterns and trends in your data.
Simple Pattern
The pattern is a state machine implementation that detects event occurrences from events arrived via one or more event streams over time.
Simple Pattern Example
This stream worker shows a simple pattern that detects high-temperature event occurrence of a continuous event stream. The application sends an alert if the temperature of a room increases by 5 degrees within 10 min.
-- Defines `TemperatureStream` having information of room temperature such as `roomNo` and `temp`.
CREATE STREAM TemperatureStream(roomNo int, temp double);
-- Defines `HighTempAlertStream` which contains the alerts for high temperature.
CREATE SINK HighTempAlertStream WITH (type = 'log') (roomNo int, initialTemp double, finalTemp double);
-- Identify if the temperature of a room increases by 5 degrees within 10 min.
insert into HighTempAlertStream
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
from every( e1 = TemperatureStream ) ->
e2 = TemperatureStream[ e1.roomNo == roomNo
and (e1.temp + 5) <= temp ]
within 10 min;
Simple Pattern Input
Below events are sent to TemperatureStream
within 10 minutes:
, 35
, 37
, 40
Simple Pattern Output
After processing the above input events, the event arriving at HighTempAlertStream
, 35.0
, 40.0
Counting Pattern
Counting patterns allow to match multiple events that might have been received for the same matching condition. The number of events matched per condition can be limited with condition postfixes.
Refer the Stream Query Guide for more information.
Counting Pattern Example
This stream worker calculates the temperature difference between two regulator events. Here, when at least one TemperatureStream event occurs between two RegulatorStream events the pattern is valid and logs can be seen.
-- Defines `TemperatureStream` having information on room temperature such as `sensorID`, `roomNo` and `temp`.
CREATE STREAM TemperatureStream (sensorID long, roomNo int, temp double);
-- Defines `RegulatorStream` which contains the events from regulator with attributes `deviceID`, `roomNo`, `tempSet`, and `isOn`.
CREATE STREAM RegulatorStream (deviceID long, roomNo int, tempSet double, isOn bool);
-- Defines `TemperatureDiffStream` which contains the events related to temperature difference.
CREATE SINK TemperatureDiffStream WITH (type = 'log') (roomNo int, tempDiff double);
-- Calculates the temperature difference between two regulator events. Here, when at least one TemperatureStream event needs to arrive between two RegulatorStream events.
-- Finds the temperature difference between the first and last temperature event.
insert into TemperatureDiffStream
select e1.roomNo, e2[0].temp - e2[last].temp as tempDiff
from every( e1 = RegulatorStream)
-> e2 = TemperatureStream[e1.roomNo == roomNo] < 1: >
-> e3 = RegulatorStream[e1.roomNo == roomNo];
Counting Pattern Input
First, below event is sent to
]Below events are sent to
]Finally, below event is sent again to
Counting Pattern Output
After processing the above input events, the event arriving at TemperatureDiffStream
, 3.0
Logical Pattern
Logical patterns match events that arrive in temporal order and correlate them with logical relationships such as and
, or
, and not
Logical Pattern Example
This stream worker sends a stop
action on the regulator if a removed
action is triggered in the RoomKeyStream
-- Defines `RegulatorStateChangeStream` having information of regulator state change such as `deviceID`, `roomNo`, `tempSet`, and `action`.
CREATE STREAM RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
-- Defines `RoomKeyStream` which contains the events related to room key usage.
CREATE STREAM RoomKeyStream(deviceID long, roomNo int, action string);
-- Defines `RegulatorActionStream` which contains the events related to regulator state changes.
CREATE SINK RegulatorActionStream WITH (type='log') (roomNo int, action string);
-- Sends a stop action on RegulatorActionStream stream, if a removed action is triggered in RoomKeyStream before the regulator state changing to off which is notified in RegulatorStateChangeStream.
insert into RegulatorActionStream
select e1.roomNo,
-- Checks whether pattern triggered due to removal of room key.
ifThenElse( e2 is null, 'none', 'stop' ) as action
from every e1=RegulatorStateChangeStream[ action == 'on' ]
-> e2=RoomKeyStream
[ e1.roomNo == roomNo and action == 'removed' ]
or e3=RegulatorStateChangeStream
[ e1.roomNo == roomNo and action == 'off']
having action != 'none' ;
Logical Pattern Input
First, below event is sent to
]Then, send below events are sent to
Logical Pattern Output
After processing the above input events, the event arriving at RegulatorActionStream
, stop
Non-Occurrence Pattern
A non-occurrence patterns identifies the absence of events when detecting a pattern.
Non-Occurrence Pattern Example
This stream worker detects non-occurrence of events using the not keyword, and its effective non-occurrence checking period is bounded either by fulfillment of a condition associated by and or via an expiry time using <time period>
It sends a notification alert if the room temperature is not reduced to the expected level after the regulator is started.
-- Defines `RegulatorStateChangeStream` having information of regulator state change such as `deviceID`, `roomNo`, `tempSet`, and `action`.
CREATE STREAM RegulatorStateChangeStream(deviceID long, roomNo int, tempSet double, action string);
-- Defines `TemperatureStream` having information of room temperature such as `roomNo` and `temp`.
CREATE STREAM TemperatureStream (roomNo int, temp double);
-- Defines `RoomTemperatureAlertStream` which contains the temperature alerts.
CREATE SINK RoomTemperatureAlertStream WITH (type='log') (roomNo int);
-- Alerts if no temperature event having a temperature less than what is set in regulator arrives within 5 minutes after switching on the regulator.
insert into RoomTemperatureAlertStream
select e1.roomNo as roomNo
from e1=RegulatorStateChangeStream[action == 'on']
-> not TemperatureStream[e1.roomNo == roomNo and
temp <= e1.tempSet] for 30 sec;
Non-Occurrence Pattern Input
First, below event is sent to
Non-Occurrence Pattern Output
After processing the above input event, an alert event arrives at RoomTemperatureAlertStream
after the 30 seconds (from the first event):
Simple Sequence
Sequence is a state machine implementation that detects consecutive event occurrences from events arrived via one or more event streams over time. Here all matching events need to arrive consecutively, and there should not be any non-matching events in between the matching sequence of events.
Simple Sequence Example
This stream worker can be used to detect trends from a stock trades stream; in the above example, peak stock rate identified.
-- Defines `StockRateStream` having information on stock rate such as `symbol`, `price` and `volume`.
CREATE STREAM StockRateStream (symbol string, price float, volume int);
-- Defines `PeakStockRateStream` which contains the peak stock rate.
CREATE SINK PeakStockRateStream WITH (type='log') (symbol string, rateAtPeak float);
-- Partition the `StockRateStream` events by `symbol`
partition with (symbol of StockRateStream)
-- Identifies the peak stock price (top rate of the stock price trend)
insert into PeakStockRateStream
select e1.symbol, e2.price as rateAtPeak
from every e1=StockRateStream,
e2=StockRateStream[e1.price < price],
e3=StockRateStream[e2.price > price]
within 10 min;
Simple Sequence Input
Below events are sent to StockRateStream
within 10 minutes:
, 35
, 20
, 40
, 15
, 38
, 20
Simple Sequence Output
After processing the above input events, the event arriving at PeakStockRateStream
, 40
Sequence with Count
Sequence query does expect the matching events to occur immediately after each other, and it can successfully correlate the events who do not have other events in between. Here, sequence can count event occurrences.
Sequence with Count Example
This stream worker identifies temperature peaks by monitoring continuous increases in temp attribute and alerts upon the first drop.
-- Defines `TemperatureStream` having information on room temperatures such as `roomNo` and `temp`.
CREATE STREAM TemperatureStream(roomNo int, temp double);
-- Defines `PeakTemperatureStream` which contains events related to peak temperature trends.
CREATE SINK PeakTemperatureStream WITH (type='log') (roomNo int, initialTemp double, peakTemp double, firstDropTemp double);
-- Partition the `TemperatureStream` events by `roomNo`
partition with (roomNo of TemperatureStream)
@info(name = 'temperature-trend-analyzer')
insert into PeakTemperatureStream
-- Projects the lowest, highest and the first drop in the temperature trend
select e1.roomNo, e1.temp as initialTemp,
e2[last].temp as peakTemp, e3.temp as firstDropTemp
-- Identifies the trend of the temperature in a room
from every e1=TemperatureStream,
e2=TemperatureStream[ifThenElse(e2[last].temp is null,
e1.temp <= temp, e2[last].temp <= temp)]+,
e3=TemperatureStream[e2[last].temp > temp];
Sequence with Count Input
Below events are sent to
][`10`, `28`] [20
][`20`, `32`] [20
][`20`, `33`]
Sequence with Count Output
After processing the above input events, the event arriving at PeakTemperatureStream
, 29.0
, 35.0
, 33.0
Logical Sequence
The sequence can repetitively match event sequences
and use logical event ordering (using and
, or
, and not
Logical Sequence Example
This stream worker can be used identify a regulator activation event immediately followed by both temperature sensor and humidity sensor activation events in either order.
-- Defines `TempSensorStream` having information of temperature sensor device.
CREATE STREAM TempSensorStream(deviceID long, isActive bool);
-- Defines `HumidSensorStream` having information of humidity sensor device.
CREATE STREAM HumidSensorStream(deviceID long, isActive bool);
-- Defines `RegulatorStream` which contains the events from regulator with attributes `deviceID` and `isOn`.
CREATE STREAM RegulatorStream(deviceID long, isOn bool);
CREATE SINK StateNotificationStream WITH (type='log') (deviceID long, tempSensorActive bool, humidSensorActive bool);
-- Identifies a regulator activation event immediately followed by both temperature sensor and humidity sensor activation events in either order.
insert into StateNotificationStream
select e1.deviceID, e2.isActive as tempSensorActive,
e3.isActive as humidSensorActive
from every e1=RegulatorStream[isOn == true],
e2=TempSensorStream and e3=HumidSensorStream;
Logical Sequence Input
First, below event is sent to
]Then, below event is sent to
]Then, below event is sent to
Logical Sequence Output
After processing the above input events, the event arriving at StateNotificationStream
, false
, true