Pub-Sub with Streams Example
This page describes how to create geo-replicated streams and set up queues and pub-sub messaging with local latencies across the globe.
Prerequisites
- A Macrometa account with sufficient permissions to create streams.
 - Appropriate SDK installed. For more information, refer to Install SDKs.
 
Pub-Sub with Streams Code
- Copy and paste the code block below into your favorite IDE.
 - Update constants with your values, such as the API key.
 - Run the code.
 - (Optional) Log in to the Macrometa console to view the streams.
 
- JavaScript
 - Python
 
const jsc8 = require("jsc8");
const readline = require("readline");
// Base URL that the client in this sample connects to.
const globalUrl = "https://play.paas.macrometa.io";
// Create an authenticated instance with an API key (recommended) or JSON web token (JWT).
// Replace "XXXX" with your own API key before running.
const client = new jsc8({
  url: globalUrl,
  apiKey: "XXXX",
  fabricName: "_system"
});
// Alternative: authenticate with a JWT instead of an API key.
// const client = new jsc8({ url: globalUrl, token: "XXXX", fabricName: "_system" });
// Alternative: authenticate a client instance with email and password.
// const client = new jsc8(globalUrl);
// await client.login("your@email.com", "password");

// Variables
const stream = "streamQuickstart";
// Set to true for a local stream, or false for a global stream.
const is_local = false;
// Prefix that matches the stream type; it is prepended to the stream name
// when the id of an already-existing stream is reported.
const prefix_text = is_local ? "c8locals." : "c8globals.";
/**
 * Looks up the `_system` fabric among the user's fabrics and logs its `dcList`.
 *
 * The fabric's `options.dcList` value is a comma-separated string; it is split
 * into an array before being logged and returned.
 *
 * @returns {Promise<string[]>} The entries of the `_system` fabric's `dcList`.
 * @throws {Error} If no fabric named `_system` is returned for this user.
 */
async function getDCList () {
  const fabrics = await client.listUserFabrics();
  // Array.prototype.find is synchronous, so its result is used directly.
  const systemFabric = fabrics.find((fabric) => fabric.name === "_system");
  if (systemFabric === undefined) {
    // Fail with a clear message instead of a TypeError on `undefined.options`.
    throw new Error("Fabric \"_system\" was not found in the list of user fabrics");
  }
  const dcList = systemFabric.options.dcList.split(",");
  console.log("dcList: ", dcList);
  return dcList;
}
/**
 * Makes sure the sample stream exists and logs its stream id.
 *
 * When the stream is already present, the id is built from the type prefix and
 * the stream name; otherwise the stream is created and the id reported by the
 * create call is logged.
 *
 * @returns {Promise<void>}
 */
async function createMyStream () {
  const alreadyExists = await client.hasStream(stream, is_local);
  if (!alreadyExists) {
    const created = await client.createStream(stream, is_local);
    console.log(`New Producer = ${created.result["stream-id"]}`);
    return;
  }
  console.log("Stream already exists");
  const existingId = prefix_text + stream;
  console.log(`Old Producer = ${existingId}`);
}
/**
 * Opens a producer on the sample stream and publishes ten messages once the
 * connection is open.
 *
 * Each message is a JSON document whose `payload` field carries the
 * base64-encoded message text.
 *
 * @returns {Promise<void>} Resolves once the producer has been created and its
 *   handlers are registered; publishing happens later, on the "open" event.
 */
async function sendData () {
  console.log("\n ------- Publish Messages  ------");
  // Pass is_local so the producer addresses the same stream type that was created.
  const producer = await client.createStreamProducer(stream, is_local);
  producer.on("open", () => {
    for (let i = 0; i < 10; i++) {
      const msg1 = `Persistent hello from (${i})`;
      const data = {
        payload: Buffer.from(msg1).toString("base64")
      };
      console.log(`Stream: ${msg1}`);
      producer.send(JSON.stringify(data));
    }
  });
  producer.onclose = function () {
    // Report the module-level stream name. `streamName` does not exist in this
    // scope, so referencing it here would throw when the socket closes.
    console.log(`Closed WebSocket:Producer connection for ${stream}`);
  };
}
/**
 * Opens a reader on the sample stream under the "test-subscription-1"
 * subscription, prints every received message, and acknowledges it.
 *
 * Incoming messages are JSON documents; `payload` is base64-encoded text and
 * `messageId` is echoed back to acknowledge the message.
 *
 * @returns {Promise<void>} Resolves once the reader has been created and its
 *   handlers are registered; messages are handled as they arrive.
 */
async function receiveData () {
  console.log("\n ------- Receive Messages  ------");
  // Pass is_local so the reader addresses the same stream type that was created.
  const consumer = await client.createStreamReader(
    stream,
    "test-subscription-1",
    is_local
  );
  consumer.on("message", (msg) => {
    const { payload, messageId } = JSON.parse(msg);
    // Decode as UTF-8, matching how the producer encodes the text; "ascii"
    // would garble any non-ASCII characters.
    console.log(Buffer.from(payload, "base64").toString("utf8"));
    // Send message acknowledgement
    consumer.send(JSON.stringify({ messageId }));
  });
  consumer.onclose = function () {
    console.log(`Closed WebSocket:Consumer connection for ${stream}`);
  };
}
/**
 * Asks on stdin whether to publish or consume, then starts the chosen action.
 *
 * "w" or "1" starts sendData(), "r" or "0" starts receiveData(); anything else
 * prints an error. The readline interface is closed in every case so that an
 * invalid answer really lets the program stop instead of leaving stdin open.
 *
 * @returns {Promise<void>} Resolves as soon as the question has been asked; the
 *   answer is handled later in the callback.
 */
async function selectAction () {
  const input = readline.createInterface({
    input: process.stdin,
    output: process.stdout
  });
  // Surface failures of the started action instead of leaving the promise floating.
  const reportFailure = (err) => {
    console.error(err);
    process.exitCode = 1;
  };
  input.question(
    "Type 'w' or '1' to write data. Type 'r' or '0' to read data: ",
    (userInput) => {
      // Release stdin before dispatching so no branch can skip the close.
      input.close();
      if (userInput === "w" || userInput === "1") {
        sendData().catch(reportFailure);
      } else if (userInput === "r" || userInput === "0") {
        receiveData().catch(reportFailure);
      } else {
        console.log("Invalid user input. Stopping program.");
      }
    }
  );
}
// Entry point: list the fabric's data centers, make sure the stream exists,
// then ask the user whether to publish or consume.
(async function () {
  try {
    await getDCList();
    await createMyStream();
    await selectAction();
  } catch (err) {
    // Report setup failures explicitly rather than as an unhandled rejection.
    console.error(err);
    process.exitCode = 1;
  }
})();
""" This file is a demo to send data to/from a stream """
from operator import concat
import base64
import json
import warnings
from c8 import C8Client
import six
# Suppress all Python warnings for the rest of the run.
warnings.filterwarnings("ignore")

# Connection settings.
URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "my API key" # Change this to your API key

# Set to True for a local stream, or False for a global stream.
is_local = False
demo_stream = 'streamQuickstart'
client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)

# Prefix that matches the stream type; it is prepended to the stream name
# when the id of an already-existing stream is reported.
prefix_text = "c8locals." if is_local else "c8globals."
def createStream():
    """Make sure the demo stream exists and print its stream id.

    If the stream is already present, the id is built from the type prefix and
    the stream name. Otherwise the stream is created and the ``stream-id``
    reported by the create call is printed.
    """
    if not client.has_stream(demo_stream, local=is_local):
        created = client.create_stream(demo_stream, local=is_local)
        print("New producer =", created["stream-id"])
        return
    print("Stream already exists")
    existing_id = prefix_text + demo_stream
    print("Old producer =", existing_id)
# Producer side: publish whatever the user types until they enter 0.
def sendData():
    """Publish lines typed by the user to the demo stream.

    Prompts repeatedly on stdin and hands each answer to the producer. Entering
    ``0`` ends the loop without sending it.
    """
    producer = client.create_stream_producer(demo_stream, local=is_local)
    # NOTE(review): the text is passed to send() exactly as typed - confirm the
    # producer applies whatever envelope/encoding the stream expects.
    prompt = "Enter your message to publish: "
    # iter(callable, sentinel) keeps prompting until the answer equals '0'.
    for message in iter(lambda: input(prompt), '0'):
        producer.send(message)
# Create the subscriber and receive data
def receiveData():
    """Receive, print, and acknowledge ten messages from the demo stream.

    Subscribes under the name ``test-subscription-1``. Each received frame is a
    JSON document; its ``payload`` field is base64-encoded and its ``messageId``
    is sent back to acknowledge the message.
    """
    subscriber = client.subscribe(stream=demo_stream, local=is_local,
        subscription_name="test-subscription-1")
    for i in range(10):
        print("In ",i)
        m1 = json.loads(subscriber.recv())  # Listen on stream for any receiving messages
        # b64decode returns bytes; decode to text so the output shows the message
        # itself rather than a b'...' repr. Undecodable bytes are replaced, not fatal.
        msg1 = base64.b64decode(m1["payload"]).decode("utf-8", errors="replace")
        print(f"Received message '{msg1}' id='{m1['messageId']}'") # Print the received message
        subscriber.send(json.dumps({'messageId': m1['messageId']})) # Acknowledge the received message
# Make sure the stream exists before asking what to do with it.
createStream()
# Ask whether to publish or consume, then run the matching demo.
user_input = input("Type 'w' or '1' to write data. Type 'r' or '0' to read data: ")
if user_input in ("w", "1"):
    sendData()
elif user_input in ("r", "0"):
    receiveData()
else:
    print ("Invalid user input. Stopping program")