Skip to main content

Using REST APIs

Modern applications need to be highly responsive, always online, and able to access data instantly across the globe. At the same time, they need to be deployed in data centers close to their users. Macrometa Global Data Network (GDN) is a real-time materialized view engine that provides instant data to applications and APIs through a simple interface.

Prerequisites:

A Macrometa GDN tenant account and credentials.

API Browser

Your main tool for using REST APIs is the API reference in the GDN web browser interface. Use the built-in API reference to run various calls and view their input and output.

GDN API Browser

Stream Processing

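Macrometa Stream Processing allows you to integrate streaming data and take appropriate actions. The following Python example uses the REST API to create and activate a stream application, publish messages to its input stream, query the resulting table, and then delete the stream application.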

import requests
import json
import base64
import six
import time
from websocket import create_connection

# Constants
# Federation host and the HTTPS base URL derived from it.
URL = "api-play.paas.macrometa.io"
HTTP_URL = "https://" + URL

# Account credentials and target fabric.
EMAIL = "nemo@nautilus.com"
PASSWORD = "xxxxxx"
FABRIC = "_system"
AUTH_TOKEN = "bearer "  # prefix only; the JWT from the login response is appended later
TENANT_NAME = "nemo_nautilus.com"

# Name of the input stream and of the stream application defined below.
STREAM_NAME = "tutorialAppInputStream"
STREAM_APP_NAME = "stream_app_tutorial"

# Stream application definition: a JavaScript concat function, one input
# stream, one global output table, and the query that connects them.
STREAM_APP = """
@App:name('stream_app_tutorial')
@App:qlVersion("2")
CREATE FUNCTION concatFn[javascript] return string {
var str1 = data[0];
var str2 = data[1];
var str3 = data[2];
var response = str1 + str2 + str3;
return response;
};
-- Stream
CREATE STREAM tutorialAppInputStream (deviceID string, roomNo int, temperature double);
-- Table
CREATE TABLE GLOBAL tutorialAppOutputTable (id string, temperature double);
@info(name='Query')
INSERT INTO tutorialAppOutputTable
SELECT concatFn(roomNo,'-',deviceID) as id, temperature
FROM tutorialAppInputStream;
"""

# Sample records published to the input stream, one message per record.
INPUT_DATA = [
    {"deviceID": "AD11", "roomNo": 200, "temperature": 18},
    {"deviceID": "AD11", "roomNo": 201, "temperature": 47},
]

# Query used to read back what the stream application wrote to its table.
SELECT_QUERY = "FOR doc IN tutorialAppOutputTable return doc"

# Create a HTTPS Session
# Log in with the account email/password; the response carries the JWT that
# authorizes every subsequent call.
auth_response = requests.post(
    f"{HTTP_URL}/_open/auth",
    data=json.dumps({'email': EMAIL, 'password': PASSWORD}),
    headers={'content-type': 'application/json'},
)

# Anything other than HTTP 200 means no token was issued, so stop here.
if auth_response.status_code != 200:
    raise Exception(f"Error while getting auth token. Code:{auth_response.status_code}, Reason:{auth_response.reason}")

auth_body = json.loads(auth_response.text)
AUTH_TOKEN += auth_body["jwt"]
TENANT = auth_body["tenant"]

# Reusable session that sends the JSON content type and the bearer token
# with every request.
session = requests.session()
session.headers.update({
    "content-type": 'application/json',
    "authorization": AUTH_TOKEN,
})

# Create a Stream Application
# POST the stream application definition; "regions" is sent as an empty list.
create_body = json.dumps({"definition": STREAM_APP, "regions": []})
create_resp = session.post(
    f"{HTTP_URL}/_fabric/_system/_api/streamapps",
    data=create_body,
)
created = json.loads(create_resp.text)
print("FED URL:", HTTP_URL)
print("\nStream App Created: ", created)

# Activate Stream Application
# PATCH the "active" endpoint with active=true and print the parsed reply.
activate_url = f"{HTTP_URL}/_fabric/_system/_api/streamapps/{STREAM_APP_NAME}/active?active=true"
activation = json.loads(session.patch(activate_url).text)
print("\nStream App Activated: ", activation)

# Wait for all inputs and outputs to initialize
time.sleep(5)

# Publish Messages to the input stream
# Open a WebSocket producer on the stream application's input stream.
stream_type = "c8local"
producerurl = f"wss://{URL}/_ws/ws/v2/producer/persistent/{TENANT_NAME}/{stream_type}.{FABRIC}/{stream_type}s.{STREAM_NAME}"
ws = create_connection(
    producerurl,
    header={"content-type": 'application/json', 'authorization': AUTH_TOKEN},
)
try:
    # Send every sample record as its own message and wait for its ack
    # before sending the next one.
    for record in INPUT_DATA:
        # The record is JSON-encoded, then base64-encoded, and wrapped
        # under the "payload" key.
        payload = {
            "payload": base64.b64encode(
                six.b(json.dumps(record))
            ).decode("utf-8")
        }
        ws.send(json.dumps(payload))

        response = json.loads(ws.recv())
        # .get() so an ack without a 'result' key is reported as a failure
        # (with the full reply) instead of raising KeyError.
        if response.get('result') == 'ok':
            print('Message published successfully')
        else:
            print('Failed to publish message:', response)
finally:
    # Always release the socket, even if send/recv or JSON parsing fails.
    ws.close()

# Verify results from the collection
# Run SELECT_QUERY through the cursor API to read back the documents in the
# output table.
cursor_request = {
    "id": "tutorialStreamAppQuery",
    "query": SELECT_QUERY,
    "bindVars": {},
}
cursor_resp = session.post(
    f"{HTTP_URL}/_fabric/_system/_api/cursor",
    data=json.dumps(cursor_request),
)
query_result = json.loads(cursor_resp.text)
print("\nStream App Results: ", query_result)

# Delete Stream Application
# Clean up: DELETE the stream application by name and print the parsed reply.
delete_url = f"{HTTP_URL}/_fabric/_system/_api/streamapps/{STREAM_APP_NAME}"
deletion = json.loads(session.delete(delete_url).text)
print("\nStream App Deleted: ", deletion)