Skip to main content

grpc (Source)

This extension starts a grpc server during initialization time. The server listens to requests from grpc stubs. This source has a default mode of operation and custom user defined grpc service mode. By default this uses EventService. In the default mode this source will use EventService consume method. If we want to use our custom gRPC services, we have to pack auto-generated gRPC service classes and protobuf classes into a jar file and add it into the project classpath (or to the jars folder in the streamprocessor-tooling folder if we use it with streamprocessor-tooling). This method will receive requests and injects them into stream through a mapper.

Syntax

CREATE SOURCE <NAME> WITH (type="grpc", map.type="<STRING>", receiver.url="<STRING>", max.inbound.message.size="<INT>", max.inbound.metadata.size="<INT>", server.shutdown.waiting.time="<LONG>", truststore.file="<STRING>", truststore.password="<STRING>", truststore.algorithm="<STRING>", tls.store.type="<STRING>", keystore.file="<STRING>", keystore.password="<STRING>", keystore.algorithm="<STRING>", enable.ssl="<BOOL>", mutual.auth.enabled="<BOOL>", threadpool.size="<INT>", threadpool.buffer.size="<INT>")

Query Parameters

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
receiver.urlThe url which can be used by a client to access the grpc server in this extension. This url should consist the host hostPort, port, fully qualified service name, method name in the following format. grpc://0.0.0.0:9763/<serviceName>/<methodName> For example: grpc://0.0.0.0:9763/org.gdn.grpc.EventService/consumeSTRINGNoNo
max.inbound.message.sizeSets the maximum message size in bytes allowed to be received on the server.4194304INTYesNo
max.inbound.metadata.sizeSets the maximum size of metadata in bytes allowed to be received.8192INTYesNo
server.shutdown.waiting.timeThe time in seconds to wait for the server to shutdown, giving up if the timeout is reached.5LONGYesNo
truststore.filethe file path of truststore. If this is provided then server authentication is enabled-STRINGYesNo
truststore.passwordthe password of truststore. If this is provided then the integrity of the keystore is checked-STRINGYesNo
truststore.algorithmthe encryption algorithm to be used for server authentication-STRINGYesNo
tls.store.typeTLS store type-STRINGYesNo
keystore.filethe file path of keystore. If this is provided then client authentication is enabled-STRINGYesNo
keystore.passwordthe password of keystore-STRINGYesNo
keystore.algorithmthe encryption algorithm to be used for client authentication-STRINGYesNo
enable.sslto enable ssl. If set to true and keystore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo
mutual.auth.enabledto enable mutual authentication. If set to true and keystore.file or truststore.file is not given then it will be set to default carbon jks by defaultFALSEBOOLYesNo
threadpool.sizeSets the maximum size of threadpool dedicated to serve requests at the gRPC server100INTYesNo
threadpool.buffer.sizeSets the maximum size of threadpool buffer server100INTYesNo

System Parameters

NameDescriptionDefault ValuePossible Parameters
keyStoreFilePath of the key store file\${carbon.home}/resources/security/gdncarbon.jksvalid path for a key store file
keyStorePasswordThis is the password used with key store filegdncarbonvalid password for the key store file
keyStoreAlgorithmThe encryption algorithm to be used for client authenticationSunX509-
trustStoreFileThis is the trust store file with the path\${carbon.home}/resources/security/client-truststore.jks-
trustStorePasswordThis is the password used with trust store filegdncarbonvalid password for the trust store file
trustStoreAlgorithmthe encryption algorithm to be used for server authenticationSunX509-

Example 1

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.EventService/consume', map.type='json') (message String);

Here the port is given as 8888. So a grpc server will be started on port 8888 and the server will expose EventService. This is the default service packed with the source. In EventService the consume method is

Example 2

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.EventService/consume', map.type='json', map.attributes="name='trp:name', age='trp:age', message='message'") (message String, name String, age int);

Here we are getting headers sent with the request as transport properties and injecting them into the stream. With each request a header will be sent in MetaData in the following format: Name:John, Age:23

Example 3

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.MyService/send', map.type='protobuf') (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here the port is given as 8888. So a grpc server will be started on port 8888 and sever will keep listening to the send RPC method in the MyService service.

Example 4

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.MyService/send', map.type='protobuf', attributes="a = 'stringValue', b = 'intValue', c = 'longValue',d = 'booleanValue', e ='floatValue', f ='doubleValue'") (a string ,c long,b int, d bool,e float,f double);

Here the port is given as 8888. So a grpc server will be started on port 8888 and sever will keep listening to the send method in the MyService service. Since we provide mapping in the stream we can use any names for stream attributes, but we have to map those names with correct protobuf message attributes' names. If we want to send metadata, we should map the attributes.

Example 5

CREATE SOURCE BarStream WITH (type='grpc', receiver.url='grpc://localhost:8888/org.gdn.grpc.StreamService/clientStream', map.type='protobuf') (stringValue string, intValue int,longValue long,booleanValue bool,floatValue float,doubleValue double);

Here we receive a stream of requests to the grpc source. Whenever we want to use streaming with grpc source, we have to define the RPC method as client streaming method, when we define a stream method stream processor will identify it as a stream RPC method and ready to accept stream of request from the client.