Summarizing Data
Introduction
Summarizing data refers to obtaining aggregates in an incremental manner for a specified set of time periods.
Summarization by clock-time
Performing clock-time based based summarization involves calculating, storing, and then retrieving aggregations for a selected range of time granularities. This process is carried out in two parts:
- Calculating the aggregations for the selected time granularities and storing the results.
- Retrieving previously calculated aggregations for selected time granularities.
To understand data summarization further, consider the scenario where a business that sells multiple brands stores its sales data in a physical database for the purpose of retrieving them later to perform sales analysis. Each sales transaction is received with the following details:
`symbol`: The symbol that represents the brand of the items sold.
`price`: the price at which each item was sold.
`amount`: The number of items sold.
The Sales Analyst needs to retrieve the total number of items sold of each brand per month, per week, per day etc., and then retrieve these totals for specific time durations to prepare sales analysis reports.
It is not always required to maintain a physical database for incremental analysis, but it enables you to try out your aggregations with ease.
The following sections explain how to calculate and store time-based aggregations for this scenarios, and then retrieve them.
Calculate and store clock-time-based aggregate values
To calculate and store time-based aggregation values for the scenario explained above, follow the procedure below.
Start creating a new stream worker. You can name it
TradeApp
For instructions, see Creating a Stream Worker.@App:name("TradeApp");
@App:qlVersion("2")To capture the input events based on which the aggregations are calculated, define an input stream as follows.
CREATE STREAM TradeStream (symbol string, price double, quantity long, timestamp long);
infoIn addition to the `symbol`, `price`, and `quantity` attributes to capture the input details already mentioned, the above stream definition includes an attribute named timestamp to capture the time at which the sales transaction occurs. The aggregations are executed based on this time. This attribute's value could either be a long value (reflecting the Unix timestamp in milliseconds), or a string value adhering to one of the following formats.
* **`<YYYY>-<MM>-<dd> <HH>:<mm>:<ss> <Z>`**: This format can be used if the timezone needs to be specified explicitly. Here the ISO 8601 UTC offset must be provided for <Z> . e.g., +05:30 reflects the India Time Zone. If time is not in GMT, this value must be provided.)
* **`<yyyy>-<MM>-<dd> <HH>:<mm>:<ss>`**: This format can be used if the timezone is in GMT.Create an aggregation as follows. You can name it
TradeAggregation
.infoThe system uses the aggregation name you define here as part of the database table name. Table name is `<StreamWorker_Name>-<Aggregation_Name>_<Granularity>`. System will automatically create a collection `TradeApp-TradeAggregation_HOUR` as we will be calculating the aggregation hourly in the next step.
CREATE AGGREGATION TradeAggregation WITH(store.type='database', store.replication.type='global')
To calculate aggregations, include a query as follows:
To select attributes to be included in the output event, add a
select
clause as follows.select symbol, avg(price) as avgPrice, sum(quantity) as total
Here, the
avg()
fuction is applied to theprice
attribute to derive the average price. Thesum()
function is applied to thequantity
attribute to derive the total quantity.To get input events from the
TradeStream
stream that you previously defined, add afrom
clause as follows.from TradeStream
To group the output by the symbol, add a group by clause as follows.
group by symbol
The timestamp included in each input event allows you to calculate aggregates for the range of time granularities seconds-years. Therefore, to calculate aggregates for each time granularity within this range, add the
aggregate by
clause to this aggregate query as follows.aggregate by timestamp every hour;
The completed stream worker is as follows.
@App:name("TradeApp")
@App:qlVersion("2")
CREATE STREAM TradeStream (symbol string, price double, quantity long, timestamp long);
CREATE AGGREGATION TradeAggregation WITH(store.type='database', store.replication.type='global')
@info(name = 'CalculatingAggregates')
select symbol, avg(price) as avgPrice, sum(quantity) as total
from TradeStream
group by symbol
aggregate by timestamp every hour;
Retrieve the stored aggregate values
This section involves retrieving the aggregate values that you calculated and persisted in the Calculate and store clock-time-based aggregate values subsection.
To do this, let's add the definitions and queries required for retrieval to the TradeApp
stream worker that you have already created in the previous section.
Open the
TradeApp
stream worker in the editor by clicking the Edit icon on the stream workers tab.To retrieve aggregations, you need to make retrieval requests. To process these requests, let's define a stream as follows:
CREATE SINK STREAM TradeSummaryStream (symbol string, total long, avgPrice double);
To retrieve and process the events, add a new query as follows:
insert into TradeSummaryStream
select a.symbol, a.total, a.avgPrice
from TradeStream as b join TradeAggregation as a
on a.symbol == b.symbol
within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30"
per "days" ;The completed stream worker is as follows:
@App:name("TradeApp")
@App:qlVersion("2")
CREATE STREAM TradeStream (symbol string, price double, quantity long, timestamp long);
CREATE SINK STREAM TradeSummaryStream (symbol string, total long, avgPrice double);
CREATE AGGREGATION TradeAggregation
select symbol, avg(price) as avgPrice, sum(quantity) as total
from TradeStream
group by symbol
aggregate by timestamp every hour;
@info(name = 'RetrievingAggregates')
insert into TradeSummaryStream
select a.symbol, a.total, a.avgPrice
from TradeStream as b join TradeAggregation as a
on a.symbol == b.symbol
within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30"
per "days" ;
Summarization by Windowing Criteria
This section explains how to apply stream processing logic to process a subset of events received to a stream based on time or the number of events. This is achieved via stream windows
.
The window can apply to a batch of events or in a sliding manner. This is further explained in the following sections.
Performing a time-based summarization in a sliding manner
This subsection demonstrates how to summarize data for a short term based on time and well as how to do a summarization in a sliding manner.
To demonstrate this, consider a factory manager who wants to be able to check the production for the last hour at any given time. Every event represents a production run. For this purpose, a Stream worker can be created as follows:
Start creating a new stream worker. You can name it
PastHourProductionApp
For instructions, see Creating a Stream Worker.@App:name('PastHourProductionApp');
@App:qlVersion("2")To capture details about each production run, define an input stream:
CREATE STREAM ProductionStream (name string, amount long);
To publish the production for the last hour, define the output stream:
CREATE SINK PastHourProductionStream WITH (type='logger', prefix='Production totals over the past hour:') (name string, pastHourTotal long);
noteA sink annotation is connected to the output stream to log the output events. You can view the logged events by clicking on the **Log Viewer** on the stream worker editor tab. For more information about adding sinks to publish events, see the [Publishing Data](/cep/tutorials/publishing-data).
To define how the output is derived, add the
select
statement:select name, sum(amount) as pastHourTotal
Here, the total is derived by applying the
sum()
function to theamount
attribute of theProductionStream
input stream.To specify that the processing done as defined via the
select
statement applies to a time window, add thefrom
clause and include the time window as shown below. This must be added above theselect
clause.from ProductionStream window time(1 hour)
note`window time` indicates that the window added is a time window. The time considered is one hour. The window is a sliding window which considers the last hour at any given time.
(For example, when the stream processor calculates the total production during the time 13.00-14.00, next it calculates the total production during the time 13.01-14.01 after the 13.01 minute as elapsed.)
For details about other window types supported, see [Functions - Unique](/cep/query-guide/functions/unique/deduplicate).To group by the product name, add the
group by
clause as follows.group by name
To insert the results into the
PastHourProductionStream
output stream, add theinsert into
clause as follows.insert into PastHourProductionStream
The completed stream worker is as follows:
@App:name('PastHourProductionApp')
@App:qlVersion("2")
CREATE STREAM ProductionStream (name string, amount long);
CREATE SINK PastHourProductionStream WITH (type='logger', prefix='Production totals over the past hour:') (name string, pastHourTotal long);
insert into PastHourProductionStream
select name, sum(amount) as pastHourTotal
from ProductionStream window time(1 hour)
group by name;
Performing a length-based summarization to a batch of events
This subsection demonstrates how to summarize data for a specific number of events as well as how to do that summarization for batches of events.
To demonstrate this, assume that a factory manager wants to track the maximum production in every 10 production runs. IOn order to do so, let's create a Stream worker as follows:
Start creating a new stream worker. You can name it
ProductionApp
For instructions, see Creating a Stream Worker.@App:name('MaximumProductionApp')
@App:qlVersion("2")Define an input stream as follows to capture details about the production.
CREATE STREAM ProductionStream (name string, amount long);
To output the maximum production detected every 10 production runs, define an output stream as follows.
CREATE SINK DetectedMaximumProductionStream WITH (type='logger', prefix='Maximum production in last 10 runs') (name string, maximumValue long);
noteA sink annotation is connected to the output stream to log the output events. You can view the logged events by simply clicking on the `Log Viewer` button on the stream worker editor tab. For more information about adding sinks to publish events, see the [Publishing Data](/cep/tutorials/publishing-data).
To define the subset of events to be considered based on the number of events, add the
from
clause with alengthBatch
window as follows.from ProductionStream window lengthBatch(10)
window lengthBatch
indicates that the window added is a length window that considers events in batches when determining subsets. The number of events in each batch is10
. For details about other window types supported, see Functions - Unique.To derive the values for the
DetectedMaximumProductionStream
output stream, add theselect
statement as follows.select name, max(amount) as maximumValue
Here, the
max()
function is applied to theamount
attribute to derive the maximum value.To group by the product name, add the
group by
clause as follows.group by name
To insert the maximum production detected into the
DetectedMaximumProductionStream
output stream, add theinsert into
clause as follows.insert into DetectedMaximumProductionStream
The completed stream worker is as follows.
@App:name('MaximumProductionApp')
@App:qlVersion("2")
CREATE STREAM ProductionStream (name string, amount long);
CREATE SINK DetectedMaximumProductionStream WITH (type='logger', prefix='Maximum production in last 10 runs') (name string, maximumValue long);
insert into DetectedMaximumProductionStream
select name, max(amount) as maximumValue
from ProductionStream window lengthBatch(10)
group by name
;