Executing Scripts
Scripts let you write custom functions in other programming languages and execute them within stream workers. Custom functions written as scripts are declared through function definitions and can be called in queries like any built-in function.
Writing custom functions in a language such as JavaScript can eliminate the need to write extensions for functionality that stream workers and their extensions do not provide.
Syntax
The syntax for defining a script function is as follows:
CREATE FUNCTION <function name>[<language name>] return <return type> {
<function logic>
};
The defined function can then be called in queries in the same way as a built-in function:
<function name>( (<function parameter>(, <function parameter>)*)? )
Here, the <function parameter> values are passed into the <function logic> of the definition as an Object[] named data.
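The JavaScript sketch below models that contract outside a stream worker: the body text is compiled so that `data` is the only name in scope, and every call argument lands in that array in call order. The helper `defineScriptFunction` and the sample function `joinWithDash` are hypothetical; the real runtime's invocation mechanism is not described here, and `new Function` is only used to imitate it.

```javascript
// Hypothetical model of how a script function body sees its arguments.
// Assumption: the runtime binds the call arguments, in order, to a
// variable named `data`; this sketch imitates that with `new Function`.
function defineScriptFunction(bodySource) {
  // Compile the body text so that `data` is the only parameter in scope.
  const logic = new Function("data", bodySource);
  // Collect all call arguments into one array, like the Object[] `data`.
  return (...args) => logic(args);
}

// Body text, written as it would sit between the braces of a definition.
const joinWithDash = defineScriptFunction("return data[0] + '-' + data[1];");

console.log(joinWithDash("room7", "dev42")); // room7-dev42
```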
Functions defined via function definitions take precedence over built-in functions and over functions provided by extensions.
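As an illustration of that precedence rule, the sketch below resolves a function name by checking user-defined functions before built-in or extension functions. The registries, the resolver, and the function name `label` are all hypothetical; they only show the lookup order the rule implies, not how the stream worker engine is implemented.

```javascript
// Hypothetical resolver illustrating the documented precedence rule.
// The registries and the function name `label` are made up for this sketch.
const builtInOrExtensionFunctions = new Map([
  ["label", (...args) => args.join("")],
]);
const definedFunctions = new Map(); // functions from function definitions

function resolveFunction(name) {
  // Functions from function definitions are checked first, so they win.
  if (definedFunctions.has(name)) return definedFunctions.get(name);
  if (builtInOrExtensionFunctions.has(name)) return builtInOrExtensionFunctions.get(name);
  throw new Error(`Unknown function: ${name}`);
}

console.log(resolveFunction("label")("a", "b")); // ab

// Defining a function with the same name shadows the built-in or extension one.
definedFunctions.set("label", (...args) => args.join("|"));
console.log(resolveFunction("label")("a", "b")); // a|b
```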
The following parameters are used to configure the function definition:
Parameter | Description |
---|---|
`<function name>` | The name of the function created. (It is recommended to define a function name in `camelCase`.) |
`<language name>` | Name of the programming language used to define the script, such as `javascript`, `r`, or `scala` |
`<return type>` | The return type of the function. This can be `int`, `long`, `float`, `double`, `string`, `bool`, or `object`. The function implementer is responsible for returning output that matches the declared return type. |
`<function logic>` | The execution logic, written in the language specified by `<language name>`. It consumes the `<function parameter>` values through the `data` variable and returns output of the type specified by `<return type>`. |
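Because matching the return type is the implementer's job, it helps to remember how JavaScript's `+` operator behaves: it adds when both operands are numbers and concatenates when either operand is a string. The sketch below contrasts an implicit join with an explicit `String()` conversion; both function names are hypothetical and the bodies are written as plain functions that take `data` as a parameter.

```javascript
// Sketch of why function logic should convert its result explicitly.
// JavaScript's + adds two numbers but concatenates if either is a string.
function implicitJoin(data) {
  return data[0] + data[1]; // number + number -> number, not a string
}

function explicitJoin(data) {
  // Convert each argument first so the result is always a string,
  // matching a declared `return string`.
  return String(data[0]) + String(data[1]);
}

console.log(implicitJoin([101, 5001])); // 5102
console.log(explicitJoin([101, 5001])); // 1015001
```

In the concatFn example below, the middle argument `'-'` is a string, so `+` concatenates; explicit conversion matters when every argument could be numeric.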
Transform data using Custom Functions
To write a custom function and call it in a query, follow these steps:
Click the Stream Workers tab.
Click New to start defining a new stream worker.
Type TemperatureProcessing as the Name, or choose any other name for the stream worker.
Type a Description.
In the TemperatureProcessing stream worker, define a source stream as follows:
CREATE STREAM TempStream (deviceID long, roomNo int, temp double);
Add a sink stream to receive the results of the script function:
CREATE SINK DeviceTempStream WITH (type= 'stream', stream='DeviceTempStream', map.type='json') (id string, temp double);
In this example, define a function that concatenates the room number and the device ID as follows:
CREATE FUNCTION concatFn[javascript] return string {
// data holds the call arguments in order: roomNo, '-', deviceID
var str1 = data[0];
var str2 = data[1];
var str3 = data[2];
var response = str1 + str2 + str3;
return response;
};
Add a query that applies the function to the relevant attributes of the input stream:
insert into DeviceTempStream
select concatFn(roomNo,'-',deviceID) as id, temp
from TempStream;
Save the stream worker. The completed stream worker is as follows:
@App:name("TemperatureProcessing")
@App:description("Concatenate the room number and device ID of each temperature reading")
@App:qlVersion("2")
CREATE STREAM TempStream (deviceID long, roomNo int, temp double);
CREATE SINK DeviceTempStream WITH (type= 'stream', stream='DeviceTempStream', map.type='json') (id string, temp double);
CREATE FUNCTION concatFn[javascript] return string {
var str1 = data[0];
var str2 = data[1];
var str3 = data[2];
var response = str1 + str2 + str3;
return response;
};
insert into DeviceTempStream
select concatFn(roomNo,'-',deviceID) as id, temp
from TempStream;
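To see what the query emits, the JavaScript sketch below replays the concatFn body over two made-up TempStream events outside the stream worker. It wraps the body in an ordinary function that takes `data` as a parameter; that wrapper, the event values, and the `deviceTempStream` array are illustrative only and are not output from a real stream worker.

```javascript
// Local simulation of the TemperatureProcessing query, for illustration only.
// The concatFn body mirrors the definition above; events are made-up samples.
function concatFn(data) {
  var str1 = data[0];
  var str2 = data[1];
  var str3 = data[2];
  var response = str1 + str2 + str3;
  return response;
}

// Hypothetical TempStream events: (deviceID long, roomNo int, temp double)
const tempStream = [
  { deviceID: 5001, roomNo: 101, temp: 22.5 },
  { deviceID: 5002, roomNo: 102, temp: 24.0 },
];

// Equivalent of: select concatFn(roomNo,'-',deviceID) as id, temp from TempStream
const deviceTempStream = tempStream.map((event) => ({
  id: concatFn([event.roomNo, "-", event.deviceID]),
  temp: event.temp,
}));

console.log(deviceTempStream.map((e) => e.id).join(", ")); // 101-5001, 102-5002
```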
Transform complex JSON data using Custom Functions
Parsing complex JSON data is a good use case for custom functions. Suppose nested JSON data is received over an input stream. Defining a message schema for that data while defining the stream, as explained in Consuming Data - Introduction, can be cumbersome and error-prone.
The following example shows how to parse complex data with a custom JavaScript function.
To write the custom function and call it in a query, follow these steps:
Open the GUI and click the Stream Workers tab.
Click New to start defining a new stream worker.
Enter ProcessEmployeeData as the Name, or choose any other name for the stream worker.
Enter a Description.
Define a source that reads from the input collection:
CREATE SOURCE CompanyXInputStream WITH (type = 'database', collection = "CompanyXInputStream", collection.type="doc" , replication.type="global", map.type='json') (seqNo string, name string, address string);
Define an output stream:
CREATE SINK CompanyXProfessionalInfo WITH (type = 'stream', stream = "CompanyXProfessionalInfo", replication.type="local", map.type='json') (name string, workAddress string);
Suppose CompanyXInputStream receives employee data in the following format:
{
"seqNo": "1200001",
"name": "Raleigh McGilvra",
"address": {
"permanent": {
"street": "236 Pratt Avenue",
"city": "Orchards",
"state": "Washington",
"country": "USA",
"zip": "98662"
},
"work": {
"street": "1746 Rosebud Avenue",
"city": "Little Rock",
"state": "Arkansas",
"country": "USA",
"zip": "72212"
}
}
}
Suppose you want to convert address.work into a single, well-formatted string. Define a JavaScript function to process the address field:
CREATE FUNCTION getWorkAddress[javascript] return string {
// data[0] holds the address attribute as a JSON string
var work_address = JSON.parse(data[0]).work;
// Concatenate all the address fields as a single string
var formatted_address = work_address.street + ", " + work_address.city + ", " + work_address.state + ", " + work_address.country + ", " + work_address.zip;
return formatted_address;
};
Write a query that transforms the data using the getWorkAddress function:
-- Data Processing
@info(name='Query')
insert into CompanyXProfessionalInfo
select name, getWorkAddress(address) as workAddress
from CompanyXInputStream;
Save the stream worker. The completed stream worker is as follows:
@App:name("ProcessEmployeeData")
@App:qlVersion("2")
CREATE SOURCE CompanyXInputStream WITH (type = 'database', collection = "CompanyXInputStream", collection.type="doc" , replication.type="global", map.type='json') (seqNo string, name string, address string);
CREATE SINK CompanyXProfessionalInfo WITH (type = 'stream', stream = "CompanyXProfessionalInfo", replication.type="local", map.type='json') (name string, workAddress string);
CREATE FUNCTION getWorkAddress[javascript] return string {
var work_address = JSON.parse(data[0]).work
var formatted_address = work_address.street + ", " + work_address.city + ", " + work_address.state + ", " + work_address.country + ", " + work_address.zip;
return formatted_address
};
-- Data Processing
@info(name='Query')
insert into CompanyXProfessionalInfo
select name, getWorkAddress(address) as workAddress
from CompanyXInputStream;
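The getWorkAddress logic can be checked locally against the sample employee record with the JavaScript sketch below. It assumes, as the function body implies, that the address attribute reaches the function as a JSON string; the plain-function wrapper and the `address` variable are illustrative and not part of the stream worker.

```javascript
// Local check of the getWorkAddress logic against the sample employee record.
// Assumption: the address attribute arrives as a JSON string in data[0].
function getWorkAddress(data) {
  var work_address = JSON.parse(data[0]).work;
  var formatted_address = work_address.street + ", " + work_address.city + ", " + work_address.state + ", " + work_address.country + ", " + work_address.zip;
  return formatted_address;
}

// The address object from the sample record, serialized as a string.
const address = JSON.stringify({
  permanent: { street: "236 Pratt Avenue", city: "Orchards", state: "Washington", country: "USA", zip: "98662" },
  work: { street: "1746 Rosebud Avenue", city: "Little Rock", state: "Arkansas", country: "USA", zip: "72212" },
});

console.log(getWorkAddress([address]));
// 1746 Rosebud Avenue, Little Rock, Arkansas, USA, 72212
```

If a record has no address.work object, work_address is undefined and reading its street property throws a TypeError, so logic handling untrusted input may want a guard before formatting.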