Skip to main content

grpc-service (Source)

This extension implements a grpc server for receiving and responding to requests. During initialization time a grpc server is started on the user specified port exposing the required service as given in the url. This source also has a default mode and a user defined grpc service mode. By default this uses EventService. In the default mode this will use the EventService process method. If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the streamprocessor-tooling folder if we use it with streamprocessor-tooling). This accepts grpc message class Event as defined in the EventService proto. This uses GrpcServiceResponse sink to send reponses back in the same Event message format.

Syntax

CREATE SOURCE <NAME> WITH (type="grpc-service", map.type="<STRING>", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", service.timeout="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", threadpool.size="<INT>", threadpool.buffer.size="<INT>")

Query Parameters

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
receiver.urlThe url which can be used by a client to access the grpc server in this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName> For example: grpc://0.0.0.0:9763/org.gdn.grpc.EventService/consumeSTRINGNoNo
max.inbound.message.sizeSets the maximum message size in bytes allowed to be received on the server.4194304INTYesNo
max.inbound.metadata.sizeSets the maximum size of metadata in bytes allowed to be received.8192INTYesNo
service.timeoutThe period of time in milliseconds to wait for stream processor to respond to a request received. After this time period of receiving a request it will be closed with an error message.10000INTYesNo
server.shutdown.waiting.timeThe time in seconds to wait for the server to shutdown, giving up if the timeout is reached.5LONGYesNo
truststore.filethe file path of truststore. If this is provided then server authentication is enabled-STRINGYesNo
truststore.passwordthe password of truststore. If this is provided then the integrity of the keystore is checked-STRINGYesNo
truststore.algorithmthe encryption algorithm to be used for server authentication-STRINGYesNo
tls.store.typeTLS store type-STRINGYesNo
keystore.filethe file path of keystore. If this is provided then client authentication is enabled-STRINGYesNo
keystore.passwordthe password of keystore-STRINGYesNo
keystore.algorithmthe encryption algorithm to be used for client authentication-STRINGYesNo
enable.sslto enable ssl. If set to true and keystore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo
mutual.auth.enabledto enable mutual authentication. If set to true and truststore.file or keystore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo
threadpool.sizeSets the maximum size of threadpool dedicated to serve requests at the gRPC server100INTYesNo
threadpool.buffer.sizeSets the maximum size of threadpool buffer server100INTYesNo

System Parameters

NameDescriptionDefault ValuePossible Parameters
keyStoreFileThis is the key store file with the path\${carbon.home}/resources/security/gdncarbon.jksvalid path for a key store file
keyStorePasswordThis is the password used with key store filegdncarbonvalid password for the key store file
keyStoreAlgorithmThe encryption algorithm to be used for client authenticationSunX509-
trustStoreFileThis is the trust store file with the path\${carbon.home}/resources/security/client-truststore.jks-
trustStorePasswordThis is the password used with trust store filegdncarbonvalid password for the trust store file
trustStoreAlgorithmthe encryption algorithm to be used for server authenticationSunX509-

Example 1

CREATE SOURCE FooStream WITH (type='grpc-service', receiver.url='grpc://localhost:8888/org.gdn.grpc.EventService/process', source.id='1', map.type='json', map.attributes="messageId='trp:messageId', message='message'") (messageId String, message String);

Here a grpc server will be started at port 8888. The process method of EventService will be exposed for clients. source.id is set as 1. So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response.

Example 2

CREATE SINK BarStream WITH (type='grpc-service-response', source.id='1', map.type='json') (messageId String, message String);

CREATE SOURCE FooStream WITH (type='grpc-service', receiver.url='grpc://134.23.43.35:8080/org.gdn.grpc.EventService/process', source.id='1', map.type='json', map.attributes="messageId='trp:messageId', message='message'") (messageId String, message String);

insert into BarStream
select *
from FooStream;

The grpc requests are received through the grpc-service sink. Each received event is sent back through grpc-service-source. This is just a passthrough through Stream App as we are selecting everything from FooStream and inserting into BarStream.

Example 3

CREATE SOURCE BarStream WITH (type='grpc-service', source.id='1', receiver.url='grpc://locanhost:8888/org.gdn.grpc.EventService/consume', map.type='json', map.attributes="name='trp:name', age='trp:age', message='message'") (message String, name String, age int);

Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: Name:John, Age:23

Example 4

CREATE SINK BarStream WITH (type='grpc-service-response', source.id='1', message.id='{{messageId}}', map.type='protobuf', map.payload="stringValue='a',intValue='b',longValue='c',booleanValue='d',floatValue = 'e', doubleValue ='f'") (a string,messageId string, b int,c long,d bool,e float,f double);

CREATE SOURCE FooStream WITH (type='grpc-service', receiver.url='grpc://134.23.43.35:8888/org.gdn.grpc.test.MyService/process', source.id='1', map.type='protobuf', map.attributes="messageId='trp:message.id', a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e = 'floatValue', f ='doubleValue'") (a string,messageId string, b int,c long,d bool,e float,f double);

insert into BarStream
select *
from FooStream;

Here a grpc server will be started at port 8888. The process method of the MyService will be exposed to the clients. source.id is set as 1. So a grpc-service-response sink with source.id = 1 will send responses back for requests received to this source. Note that it is required to specify the transport property messageId since we need to correlate the request message with the response and also we should map stream attributes with correct protobuf message attributes even they define using the same name as protobuf message attributes.