
http (Source)

The HTTP source receives POST requests over HTTP and HTTPS in formats such as text and JSON. It also supports basic authentication, so that events are accepted only from authorized users or systems. Request headers and properties can be accessed through transport properties in the format trp:<header>.
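As a rough client-side illustration of the paragraph above, the following Python sketch builds a POST request that carries one JSON event and, optionally, a basic authentication header. The endpoint path, the credentials, and the helper name `build_event_request` are hypothetical and chosen only for this example. The flat JSON body is modeled on the sample body in Example 2 on this page, and the actual shape depends on the configured mapper.

```python
import base64
import json
import urllib.request


def build_event_request(url, event, username=None, password=None):
    """Build (but do not send) a POST request carrying one JSON event."""
    body = json.dumps(event).encode("utf-8")
    request = urllib.request.Request(url, data=body, method="POST")
    request.add_header("Content-Type", "application/json")
    if username is not None and password is not None:
        # Basic scheme: the Base64 encoding of "username:password".
        credentials = f"{username}:{password}".encode("utf-8")
        token = base64.b64encode(credentials).decode("ascii")
        request.add_header("Authorization", f"Basic {token}")
    return request


# Hypothetical endpoint and credentials, for illustration only.
request = build_event_request(
    "http://localhost:9763/example-app/example-stream",
    {"msg": "hello"},
    username="admin",
    password="secret",
)
print(request.get_method(), request.full_url)
print(request.get_header("Authorization"))
# To actually send the event: urllib.request.urlopen(request)
```

The request is only constructed here, so the sketch runs without a listening source. Sending it requires a reachable receiver URL and, when basic authentication is enabled, valid credentials.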

Syntax

CREATE SOURCE <NAME> WITH (type="http", map.type="<STRING>", receiver.url="<STRING>", basic.auth.enabled="<STRING>", worker.count="<INT>", socket.idle.timeout="<INT>", ssl.verify.client="<STRING>", ssl.protocol="<STRING>", tls.store.type="<STRING>", ssl.configurations="<STRING>", request.size.validation.configurations="<STRING>", header.validation.configurations="<STRING>", server.bootstrap.configurations="<STRING>", trace.log.enabled="<BOOL>")

Query Parameters

| Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
|---|---|---|---|---|---|
| receiver.url | The URL on which events are received. To enable SSL, use the https protocol in the URL. | `http://0.0.0.0:9763//` | STRING | Yes | No |
| basic.auth.enabled | Works only in VM, Docker, and Kubernetes deployments. When enabled, each request is authenticated using the Authorization:'Basic encodeBase64(username:Password)' header. | false | STRING | Yes | No |
| worker.count | The number of active worker threads that serve incoming events. The default value of 1 ensures that events are processed in the order they arrive. Increasing this value improves performance at the expense of event ordering. | 1 | INT | Yes | No |
| socket.idle.timeout | Idle timeout for the HTTP connection, in milliseconds. | 120000 | INT | Yes | No |
| ssl.verify.client | The type of client certificate verification. Supported values are require and optional. | - | STRING | Yes | No |
| ssl.protocol | SSL/TLS protocol. | TLS | STRING | Yes | No |
| tls.store.type | TLS store type. | JKS | STRING | Yes | No |
| ssl.configurations | SSL/TLS configurations in the format "'<key>:<value>','<key>:<value>'". Some supported parameters:<br>- SSL/TLS protocols: 'sslEnabledProtocols:TLSv1.1,TLSv1.2'<br>- List of ciphers: 'ciphers:TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256'<br>- Enable session creation: 'client.enable.session.creation:true'<br>- Supported server names: 'server.suported.server.names:server'<br>- Add HTTP SNIMatcher: 'server.supported.snimatchers:SNIMatcher' | - | STRING | Yes | No |
| request.size.validation.configurations | Configurations to validate the HTTP request size. Expected format: "'<key>:<value>','<key>:<value>'". Some supported configurations:<br>- Enable request size validation: 'request.size.validation:true'<br>If request size is validated:<br>- Maximum request size: 'request.size.validation.maximum.value:2048'<br>- Response status code when request size validation fails: 'request.size.validation.reject.status.code:401'<br>- Response message when request size validation fails: 'request.size.validation.reject.message:Message is bigger than the valid size'<br>- Response Content-Type when request size validation fails: 'request.size.validation.reject.message.content.type:plain/text' | - | STRING | Yes | No |
| header.validation.configurations | Configurations to validate HTTP headers. Expected format: "'<key>:<value>','<key>:<value>'". Some supported configurations:<br>- Enable header size validation: 'header.size.validation:true'<br>If header size is validated:<br>- Maximum length of the initial line: 'header.validation.maximum.request.line:4096'<br>- Maximum length of all headers: 'header.validation.maximum.size:8192'<br>- Maximum length of the content or each chunk: 'header.validation.maximum.chunk.size:8192'<br>- Response status code when header validation fails: 'header.validation.reject.status.code:401'<br>- Response message when header validation fails: 'header.validation.reject.message:Message header is bigger than the valid size'<br>- Response Content-Type when header validation fails: 'header.validation.reject.message.content.type:plain/text' | - | STRING | Yes | No |
| server.bootstrap.configurations | Server bootstrap configurations in the format "'<key>:<value>','<key>:<value>'". Some supported configurations:<br>- Server connect timeout in milliseconds: 'server.bootstrap.connect.timeout:15000'<br>- Server socket timeout in seconds: 'server.bootstrap.socket.timeout:15'<br>- Enable TCP no delay: 'server.bootstrap.nodelay:true'<br>- Enable server keep alive: 'server.bootstrap.keepalive:true'<br>- Send buffer size: 'server.bootstrap.sendbuffersize:1048576'<br>- Receive buffer size: 'server.bootstrap.recievebuffersize:1048576'<br>- Number of connections queued: 'server.bootstrap.socket.backlog:100' | - | STRING | Yes | No |
| trace.log.enabled | Enable trace logging for traffic monitoring. | false | BOOL | Yes | No |
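Several parameters above take a value in the format "'<key>:<value>','<key>:<value>'". The following Python sketch shows one way to assemble such a string from a dictionary. The helper name `format_config_string` is hypothetical, and the keys are taken from the request.size.validation.configurations examples in the table.

```python
def format_config_string(options):
    """Join key/value pairs into the "'<key>:<value>','<key>:<value>'" form."""
    parts = []
    for key, value in options.items():
        # Booleans are written in lowercase, as in the examples in the table.
        if isinstance(value, bool):
            value = "true" if value else "false"
        parts.append(f"'{key}:{value}'")
    return ",".join(parts)


request_size_config = format_config_string({
    "request.size.validation": True,
    "request.size.validation.maximum.value": 2048,
    "request.size.validation.reject.status.code": 401,
})
print(request_size_config)
```

The resulting string is what would be placed between the double quotes of the corresponding parameter in the source definition. The sketch does not escape quotes inside values, so it assumes values contain no single quotes.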

System Parameters

| Name | Description | Default Value | Possible Parameters |
|---|---|---|---|
| serverBootstrapBossGroupSize | Number of boss threads that accept incoming connections. | Number of available processors | Any positive integer |
| serverBootstrapWorkerGroupSize | Number of worker threads that accept connections from the boss threads and perform non-blocking reads and writes on one or more channels. | (Number of available processors) * 2 | Any positive integer |
| serverBootstrapClientGroupSize | Number of client threads that perform non-blocking reads and writes on one or more channels. | (Number of available processors) * 2 | Any positive integer |
| defaultHost | The default host of the transport. | 0.0.0.0 | Any valid host |
| defaultScheme | The default protocol. | http | http, https |
| defaultHttpPort | The default HTTP port when the default scheme is http. | 8280 | Any valid port |
| defaultHttpsPort | The default HTTPS port when the default scheme is https. | 8243 | Any valid port |
| keyStoreLocation | The default keystore file path. | `${carbon.home}/resources/security/gdncarbon.jks` | Path to `.jks` file |
| keyStorePassword | The default keystore password. | gdncarbon | Keystore password as string |
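The default thread group sizes above are derived from the number of available processors. As a small worked illustration, the following Python sketch computes them for a given processor count. The function name `default_group_sizes` is hypothetical, and `os.cpu_count()` is only an approximation of what the server runtime reports, which may differ (for example, inside containers).

```python
import os


def default_group_sizes(available_processors):
    """Return the default thread group sizes described in the table above."""
    return {
        "serverBootstrapBossGroupSize": available_processors,
        "serverBootstrapWorkerGroupSize": available_processors * 2,
        "serverBootstrapClientGroupSize": available_processors * 2,
    }


# os.cpu_count() can return None, so fall back to 1 in that case.
processors = os.cpu_count() or 1
for name, size in default_group_sizes(processors).items():
    print(f"{name}: {size}")
```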

Example 1

@App:name("http-source")
@App:qlVersion("2")

CREATE SOURCE SampleHTTPSource WITH (type = 'http', map.type='json') (msg string);

CREATE SINK STREAM SampleHTTPInputStream (msg string);

INSERT INTO SampleHTTPInputStream
SELECT msg
FROM SampleHTTPSource;

This stream worker receives JSON messages through the HTTP source SampleHTTPSource, maps each one to the msg attribute, and sends it to SampleHTTPInputStream for processing.

Example 2

@App:name('Sample-HTTP-Source')
@App:description("This application shows how to receive POST requests via the Stream Workers API.")
@App:qlVersion('2')

/**
Testing the Stream Application:
1. Open Stream `SampleHTTPOutputStream` in Console to monitor the output.

2. Go to Stream Workers API and try the `Publish message via HTTP-Source stream.` endpoint. Run it with the
application name set to `Sample-HTTP-Source`, the stream name set to `SampleHTTPSource`, and the following body:
{"carId":"c1","longitude":18.4334, "latitude":30.2123}

3. This application reads the carId, longitude, and latitude from `SampleHTTPSource` and sends them to the
sink stream `SampleHTTPOutputStream`.
**/

-- Defines `SampleHTTPSource` stream to process events having `carId`, `longitude`, and `latitude`.
CREATE SOURCE SampleHTTPSource WITH (type = 'http', map.type='json') (carId string, longitude double, latitude double);

-- Defines `SampleHTTPOutputStream` to emit the events received from `SampleHTTPSource`.
CREATE SINK STREAM SampleHTTPOutputStream (carId string, longitude double, latitude double);

-- Forwards each event received by the HTTP source to the sink stream.
@info(name = 'ConsumeProcessedData')
INSERT INTO SampleHTTPOutputStream
SELECT carId, longitude, latitude
FROM SampleHTTPSource;
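Before publishing a test event to Example 2, it can help to check that the JSON body matches the attributes declared on `SampleHTTPSource`. The following Python sketch is a hypothetical client-side check and is not part of the stream worker. The helper name `check_event` and the rule that a double accepts any JSON number are assumptions made for illustration.

```python
import json

# Attribute names and accepted Python types, mirroring
# (carId string, longitude double, latitude double) in Example 2.
EXPECTED_FIELDS = {
    "carId": (str,),
    "longitude": (int, float),
    "latitude": (int, float),
}


def check_event(raw_body):
    """Return a list of problems found in a JSON event body (empty if none)."""
    event = json.loads(raw_body)
    problems = []
    for name, allowed_types in EXPECTED_FIELDS.items():
        if name not in event:
            problems.append(f"missing field: {name}")
            continue
        value = event[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, allowed_types):
            problems.append(f"{name} has unexpected type {type(value).__name__}")
    return problems


body = '{"carId":"c1","longitude":18.4334, "latitude":30.2123}'
print(check_event(body))  # prints []
```

An empty list means the body carries all three attributes with compatible types. How the source itself handles a mismatched event is not described on this page, so this check is only a convenience for testing.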