Named Aggregation

Named aggregation allows you to obtain aggregates in an incremental manner for a specified set of time periods.

This not only allows you to calculate aggregations with varied time granularity, but also allows you to access them in an interactive manner for reports, dashboards, and for further processing. Its schema is defined via the aggregation definition.

Purpose

Named aggregation allows you to retrieve aggregate values for different time durations. That is, it allows you to obtain aggregates such as sum, count, avg, min, max, and distinctCount of stream attributes for durations such as sec, min, hour, etc.

This is of considerable importance in many analytics scenarios because aggregate values are often needed for several time periods. Furthermore, because aggregates can be stored in different persistence stores, they are not lost due to unexpected system failures.

Syntax

CREATE AGGREGATION <aggregator name> WITH (store.type='<store type>', store.replication.type='<global or local>', purge.enable='<true or false>', purge.interval='<purging interval>', purge.retention.period='<retention period>')
select <attribute name>, <aggregate function>(<attribute name>) as <attribute name>, ...
from <input stream>
group by <attribute name>
aggregate by <timestamp attribute> every <time periods> ;

The above syntax includes the following:

| Item | Description |
|---|---|
| store | This annotation is used to refer to the data store where the calculated aggregate results are stored. This annotation is optional. When no annotation is provided, the data is stored in the in-memory store. |
| purge | This annotation is used to configure purging in aggregation granularities. If this annotation is not provided, the default purging described below is applied. If you want to disable automatic data purging, you can use this annotation as follows: purge.enable='false'. You should disable data purging if the aggregation query is included in the stream worker for read-only purposes. |
| purge.retention.period | This annotation is used to specify the length of time the data needs to be retained when carrying out data purging. If this annotation is not provided, the default retention period is applied. |
| <aggregator name> | This specifies a unique name for the aggregation so that it can be referred to when accessing aggregate results. |
| <input stream> | The stream that feeds the aggregation. Note that this stream must already be defined. |
| group by <attribute name> | The group by clause is optional. If it is included in a stream worker, aggregate values are calculated for each group by attribute. If it is not used, all the events are aggregated together. |
| by <timestamp attribute> | This clause is optional. It defines the attribute that should be used as the timestamp. If this clause is not used, the event time is used by default. The timestamp can be given as either a string or a long value. If it is a long value, the Unix timestamp in milliseconds is expected (e.g., 1496289950000). If it is a string value, the supported formats are <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> (if the time is in GMT) and <yyyy>-<MM>-<dd> <HH>:<mm>:<ss> <Z> (if the time is not in GMT). Here, the ISO 8601 UTC offset must be provided for <Z> (e.g., +05:30, -11:00). |
| <time periods> | Time periods can be specified as a range where the minimum and the maximum value are separated by three dots, or as comma-separated values. For example, a range can be specified as sec...year, where aggregation is done per second, minute, hour, day, month, and year. Comma-separated values can be specified as min, hour. Skipping time durations (e.g., min, day where the hour duration is skipped) when specifying comma-separated values is supported only from v4.1.1 onwards. |
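To make the two ways of writing <time periods> concrete, here is a minimal Python sketch that expands a specification into the list of granularities it covers. It only illustrates the rule described above and is not how the stream engine is implemented. The function name expand_time_periods and the short names day, month, and year are assumptions made for this example.

```python
# Granularities in ascending order. The short names are assumed for this sketch.
GRANULARITIES = ["sec", "min", "hour", "day", "month", "year"]


def expand_time_periods(spec):
    """Expand 'sec...year' or 'min, hour' into an ordered list of granularities."""
    spec = spec.strip()
    if "..." in spec:
        # Range form: minimum and maximum separated by three dots.
        low, high = (part.strip() for part in spec.split("...", 1))
        start, end = GRANULARITIES.index(low), GRANULARITIES.index(high)
        if start > end:
            raise ValueError(f"range minimum {low!r} is coarser than maximum {high!r}")
        return GRANULARITIES[start:end + 1]
    # Comma-separated form: only the listed granularities are used.
    chosen = [part.strip() for part in spec.split(",")]
    for name in chosen:
        if name not in GRANULARITIES:
            raise ValueError(f"unknown granularity {name!r}")
    return sorted(set(chosen), key=GRANULARITIES.index)


print(expand_time_periods("sec...year"))
print(expand_time_periods("min, day"))
```

The range form yields every granularity between the two ends, while the comma-separated form yields only what is listed, which is why min, day skips the hour duration.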

Aggregation's granularity data holders are automatically purged every 15 minutes. When carrying out data purging, the retention period you have specified for each granularity in the named aggregation query is taken into account. The retention period defined for a granularity needs to be greater than or equal to its minimum retention period as specified in the table below. If no valid retention period is defined for a granularity, the default retention period (as specified in the table below) is applied.

| Granularity | Default retention | Minimum retention |
|---|---|---|
| second | 120 seconds | 120 seconds |
| minute | 24 hours | 120 minutes |
| hour | 30 days | 25 hours |
| day | 1 year | 32 days |
| month | All | 13 months |
| year | All | none |
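The rule for choosing a retention period can be sketched in a few lines of Python. This is an illustration of the rule stated above, not the engine's code. The names RETENTION_LIMITS and effective_retention are hypothetical, "1 year" is approximated as 365 days, and the month and year granularities are left out because their limits are not fixed-length durations.

```python
from datetime import timedelta

# (default retention, minimum retention) per granularity, taken from the table above.
# Assumption: "1 year" is treated as 365 days in this sketch.
RETENTION_LIMITS = {
    "second": (timedelta(seconds=120), timedelta(seconds=120)),
    "minute": (timedelta(hours=24), timedelta(minutes=120)),
    "hour": (timedelta(days=30), timedelta(hours=25)),
    "day": (timedelta(days=365), timedelta(days=32)),
}


def effective_retention(granularity, requested=None):
    """Return the retention period that would apply for one granularity."""
    default, minimum = RETENTION_LIMITS[granularity]
    if requested is None or requested < minimum:
        # Missing or below the minimum: not a valid setting, so the default applies.
        return default
    return requested


# One hour is below the 120-minute minimum, so the default is used instead.
print(effective_retention("minute", timedelta(hours=1)))
# Two days is at least 25 hours, so it is used as given.
print(effective_retention("hour", timedelta(days=2)))
```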
note

Aggregation is carried out at calendar start times for each granularity, using the GMT timezone.

note

The aggregation input stream should feed events to only one aggregation definition.
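Aggregating at calendar start times in GMT means that each event falls into the bucket that begins at the start of its second, minute, hour, day, month, or year in GMT. The following Python sketch shows one way to compute that bucket start from a Unix timestamp in milliseconds. The function name bucket_start and the short granularity names are assumptions for this example, and the engine may compute this differently.

```python
from datetime import datetime, timezone


def bucket_start(timestamp_ms, granularity):
    """Floor a Unix timestamp (milliseconds) to the calendar start of its bucket in GMT."""
    # Integer division drops the millisecond part before converting to a GMT datetime.
    t = datetime.fromtimestamp(timestamp_ms // 1000, tz=timezone.utc)
    # Fields to reset for each granularity; coarser granularities reset more fields.
    resets = {
        "sec": dict(microsecond=0),
        "min": dict(second=0, microsecond=0),
        "hour": dict(minute=0, second=0, microsecond=0),
        "day": dict(hour=0, minute=0, second=0, microsecond=0),
        "month": dict(day=1, hour=0, minute=0, second=0, microsecond=0),
        "year": dict(month=1, day=1, hour=0, minute=0, second=0, microsecond=0),
    }
    start = t.replace(**resets[granularity])
    return int(start.timestamp()) * 1000


# 1496289950000 is 2017-06-01 04:05:50 GMT.
for g in ("min", "hour", "day", "year"):
    print(g, bucket_start(1496289950000, g))
```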

Example

This stream worker defines an aggregation named TradeAggregation to calculate the average of the price attribute and the sum of the volume attribute for events arriving at the TradeStream stream. These aggregates are calculated for every time granularity in the sec...year range.

CREATE STREAM TradeStream (symbol string, price double, volume long, timestamp long);

CREATE AGGREGATION TradeAggregation WITH (store.type='database', store.replication.type='global', purge.enable='true', purge.interval='10 sec', purge.retentionPeriod.sec='120 sec', purge.retentionPeriod.min='24 hours', purge.retentionPeriod.hours='30 days', purge.retentionPeriod.days='1 year', purge.retentionPeriod.months='all', purge.retentionPeriod.years='all')
select symbol, avg(price) as avgPrice, sum(volume) as total
from TradeStream
group by symbol
aggregate by timestamp every sec ... year;

Distributed Aggregation

Distributed aggregation allows you to partially process aggregations in different shards. This allows a stream worker in one shard to be responsible for processing only a part of the aggregation.

Syntax

CREATE AGGREGATION <aggregator name> WITH (store.type='database', store.replication.type='global', PartitionById.enable='false')
select <attribute name>, <aggregate function>(<attribute name>) as <attribute name>, ...
from <input stream>
group by <attribute name>
aggregate by <timestamp attribute> every <time periods> ;

The following table describes the annotation used to enable distributed aggregation:

| Item | Description |
|---|---|
| @partitionById | If this property is given, distributed aggregation is enabled. It can be disabled by using the enable element: PartitionById.enable='false'. |

The following system properties are also available:

| System Property | Description | Possible Values | Optional | Default Value |
|---|---|---|---|---|
| shardId | The ID of the shard in which one of the distributed aggregations is running. This must be unique to a single shard. | Any string | No | <Empty_String> |
| partitionById | This allows the user to enable or disable distributed aggregation for all aggregations running in one stream processing manager. | true/false | Yes | false |
note

ShardIds should not be changed after the first configuration in order to keep data consistency.
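As a rough illustration of what processing only a part of an aggregation can mean, the Python sketch below combines per-shard partial results into one overall result. This is an assumption-laden example, not a description of the engine's internals: the function name merge_partials, the dictionary layout, and the sample numbers are all made up. It relies only on the fact that an average can be rebuilt from a sum and a count.

```python
def merge_partials(partials):
    """Combine per-shard (sum, count) pairs into a global sum, count, and average."""
    total = sum(p["sum"] for p in partials)
    count = sum(p["count"] for p in partials)
    # An average cannot be merged directly, so it is derived from the merged sum and count.
    return {"sum": total, "count": count, "avg": total / count if count else None}


# Hypothetical partial results from two shards for the same group and time bucket.
shard_a = {"sum": 100.0, "count": 2}
shard_b = {"sum": 50.0, "count": 1}
print(merge_partials([shard_a, shard_b]))
```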

Join (Aggregation)

This allows a stream to retrieve calculated aggregate values from the aggregation.

note

A join can also be performed with two streams, with a table and a stream, or with a stream against externally named windows.

Syntax

A join with an aggregation is similar to a join with a table, but with additional within and per clauses.

insert into <output stream>
select <attribute name>, <attribute name>, ...
from <input stream> join <aggregation>
on <join condition>
within <time range>
per <time granularity>;

In addition to the constructs of a table join, this includes the following. Note that the on condition is optional:

| Item | Description |
|---|---|
| within <time range> | This allows you to specify the time interval for which the aggregate values need to be retrieved. It can be specified by providing the start and end time separated by a comma, as string or long values, or by using a wildcard string specifying the data range. For details, refer to the examples. |
| per <time granularity> | This specifies the time granularity by which the aggregate values must be grouped and returned. For example, if you specify days, the retrieved aggregate values are grouped for each day within the selected time interval. |

The within and per clauses also accept attribute values from the stream. The timestamp of the aggregations can be accessed through the AGG_TIMESTAMP attribute.

Example

The following aggregation definition is used for the examples.

CREATE STREAM TradeStream (symbol string, price double, volume long, timestamp long);

CREATE AGGREGATION TradeAggregation WITH (store.type='database', store.replication.type='global')
select symbol, avg(price) as avgPrice, sum(volume) as total
from TradeStream
group by symbol
aggregate by timestamp every sec ... year;

This query retrieves daily aggregations within the time range "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30". Note that +05:30 can be omitted if the timezone is GMT.

CREATE SINK STREAM TradeSummaryStream (symbol string, total long, avgPrice double);

@info(name = 'RetrievingAggregates')
insert into TradeSummaryStream
select a.symbol, a.total, a.avgPrice
from TradeStream as b join TradeAggregation as a
on a.symbol == b.symbol
within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30"
per "days";
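To see what instants the two string forms of a timestamp refer to, the following Python sketch converts a value in either supported format, or a long value, to Unix milliseconds. It is only an illustration: the function name to_epoch_millis is hypothetical, and the engine's own parsing may differ in details such as error handling.

```python
from datetime import datetime, timezone


def to_epoch_millis(value):
    """Convert a long or a supported timestamp string to Unix milliseconds."""
    if isinstance(value, int):
        return value  # already a Unix timestamp in milliseconds
    try:
        # Form with an explicit UTC offset, e.g. "2014-02-15 00:00:00 +05:30".
        parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S %z")
    except ValueError:
        # Form without an offset is interpreted as GMT.
        parsed = datetime.strptime(value, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp()) * 1000


print(to_epoch_millis("2014-02-15 00:00:00 +05:30"))
print(to_epoch_millis("2014-02-15 00:00:00"))
```

The two printed values differ by five and a half hours, which is the effect of the +05:30 offset.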

This query retrieves hourly aggregations within the day 2014-02-15.

CREATE SINK STREAM TradeSummaryStream (symbol string, total long, avgPrice double);

@info(name = 'RetrievingHourlyAggregates')
insert into TradeSummaryStream
select a.symbol, a.total, a.avgPrice
from TradeStream as b join TradeAggregation as a
on a.symbol == b.symbol
within "2014-02-15 **:**:** +05:30"
per "hours";
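One way to read a wildcard string in the within clause is as shorthand for a start and end time. The Python sketch below expands such a pattern into a pair of Unix millisecond timestamps. It rests on assumptions that the text does not confirm: that wildcards always form a trailing run of fields, that the start is inclusive, and that the end is exclusive. The function name wildcard_range is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def wildcard_range(pattern):
    """Return (start_ms, end_ms) covered by a pattern like '2014-02-15 **:**:** +05:30'."""
    parts = pattern.split(" ")
    offset = parts[2] if len(parts) > 2 else "+00:00"  # no offset means GMT
    fields = parts[0].split("-") + parts[1].split(":")  # year, month, day, hour, min, sec
    concrete = [f for f in fields if "*" not in f]
    if not concrete or fields[:len(concrete)] != concrete:
        raise ValueError("expected concrete fields followed only by wildcards")

    # Wildcard fields start at their smallest value (month 1, day 1, 00:00:00).
    smallest = [1, 1, 1, 0, 0, 0]
    n = len(concrete)
    values = [int(f) for f in concrete] + smallest[n:]

    sign = 1 if offset[0] == "+" else -1
    hours, minutes = offset[1:].split(":")
    tz = timezone(sign * timedelta(hours=int(hours), minutes=int(minutes)))
    start = datetime(*values, tzinfo=tz)

    # The range ends one unit of the last concrete field after the start.
    if n == 1:
        end = start.replace(year=start.year + 1)
    elif n == 2 and start.month == 12:
        end = start.replace(year=start.year + 1, month=1)
    elif n == 2:
        end = start.replace(month=start.month + 1)
    else:
        steps = [timedelta(days=1), timedelta(hours=1), timedelta(minutes=1), timedelta(seconds=1)]
        end = start + steps[n - 3]
    return int(start.timestamp()) * 1000, int(end.timestamp()) * 1000


print(wildcard_range("2014-02-15 **:**:** +05:30"))
```

Under these assumptions, the pattern used in the query above covers exactly one day, starting at midnight in the +05:30 offset.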

This query retrieves all aggregations per perValue stream attribute within the time period between timestamps 1496200000000 and 1596434876000.

CREATE STREAM TradeStream (symbol string, price double, volume long, timestamp long, perValue string);
CREATE SINK STREAM TradeSummaryStream (symbol string, total long, avgPrice double);

@info(name = 'RetrievingPervalueAggregates')
insert into TradeSummaryStream
select a.symbol, a.total, a.avgPrice
from TradeStream as b join TradeAggregation as a
on a.symbol == b.symbol
within 1496200000000L, 1596434876000L
per b.perValue;

Supported join types

An aggregation join supports the following join operations.

  • Inner join (join)

    This is the default behaviour of a join operation. join is used as the keyword to join the stream with the aggregation. The output is generated only if there is a matching event in the stream and the aggregation.

  • Left outer join

    The left outer join operation allows you to join a stream on the left side with an aggregation on the right side based on a condition. It returns all the events of the left stream even if there are no matching events in the right aggregation, using null values for the attributes of the right aggregation.

  • Right outer join

    This is similar to a left outer join. right outer join is used as the keyword to join a stream on the right side with an aggregation on the left side based on a condition. It returns all the events of the right stream even if there are no matching events in the left aggregation.
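The difference between the inner and outer joins can be shown with a small in-memory Python sketch. It is not the engine's join implementation. The function name join_stream_with_aggregation, the outer flag, and the sample rows are made up for illustration, and the attribute names follow the TradeAggregation example.

```python
def join_stream_with_aggregation(events, aggregates, outer=False):
    """Join stream events to aggregate rows on symbol.

    With outer=False only matching pairs are emitted (inner join). With outer=True
    every stream event is emitted, and None stands in for the aggregate values
    when there is no match, as in the outer joins that keep all stream events.
    """
    rows = []
    for event in events:
        matches = [a for a in aggregates if a["symbol"] == event["symbol"]]
        if matches:
            for a in matches:
                rows.append((event["symbol"], a["total"], a["avgPrice"]))
        elif outer:
            rows.append((event["symbol"], None, None))
    return rows


# Made-up sample data: one symbol has aggregate values, the other does not.
events = [{"symbol": "ABC"}, {"symbol": "XYZ"}]
aggregates = [{"symbol": "ABC", "total": 300, "avgPrice": 52.5}]

print(join_stream_with_aggregation(events, aggregates))
print(join_stream_with_aggregation(events, aggregates, outer=True))
```

The inner join drops the XYZ event because the aggregation has no row for it, while the outer variant keeps it with empty aggregate values.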