http-call-response (Source)
The http-call-response source receives the responses for the calls made
by its corresponding http-call sink, and maps them from formats such as
text
, XML
and JSON
. To handle messages with different HTTP status
codes having different formats, multiple http-call-response sources are
allowed to associate with a single http-call sink. It allows accessing
the attributes of the event that initiated the call, and the response
headers and properties via transport properties in the format
trp:<attribute name>
and trp:<header/property>
respectively.
Syntax
CREATE SOURCE <NAME> WITH (type="http-call-response", map.type="<STRING>", sink.id="<STRING>", http.status.code="<STRING>", allow.streaming.responses="<BOOL>")
Query Parameters
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
sink.id | Identifier to correlate the http-call-response source with its corresponding http-call sink that published the messages. | STRING | No | No | |
http.status.code | The matching http responses status code regex, that is used to filter the the messages which will be processed by the source.Eg: http.status.code = '200' , http.status.code = '4\\d+' | 200 | STRING | Yes | No |
allow.streaming.responses | Enable consuming responses on a streaming manner. | false | BOOL | Yes | No |
Example 1
CREATE SINK EmployeeRequestStream WITH (type='http-call', method='POST', publisher.url='http://localhost:8005/registry/employee', sink.id='employee-info', map.type='json') (name string, id int);
CREATE SOURCE EmployeeResponseStream WITH (type='http-call-response', sink.id='employee-info', http.status.code='2\\d+', map.type='json', map.attributes="name='trp:name', id='trp:id', location='$.town', age='$.age'") (name string, id int, location string, age int);
CREATE SOURCE EmployeeErrorStream WITH (type='http-call-response', sink.id='employee-info', http.status.code='4\\d+', map.type='text', map.regex.A='((.|\n)*)', map.attributes="error='A[1]'") (error string);
When events arrive in EmployeeRequestStream
, http-call sink makes
calls to endpoint on url http://localhost:8005/registry/employee
with
POST
method and Content-Type application/json
.
If the arriving event has attributes name
:John
and id
:1423
it will send a message with
default JSON mapping as follows:
{
"event": {
"name": "John",
"id": 1423
}
}
When the endpoint responds with status code in the range of 200 the
message will be received by the http-call-response source associated
with the EmployeeResponseStream
stream, because it is correlated with
the sink by the same sink.id
employee-info
and as that expects
messages with http.status.code
in regex format 2\\d+
. If the
response message is in the format:
{
"town": "NY",
"age": 24
}
the source maps the location
and age
attributes by executing JSON
path on the message and maps the name
and id
attributes by
extracting them from the request event via as transport properties.
If the response status code is in the range of 400 then the message will be
received by the http-call-response source associated with the
EmployeeErrorStream
stream, because it is correlated with the sink by
the same sink.id
employee-info
and it expects messages with
http.status.code
in regex format 4\\d+
, and maps the error response
to the error
attribute of the event.