CloudSignal Docs
Getting Started / Use Cases

IoT and edge computing

Connect sensors, gateways, and industrial systems to CloudSignal for IoT messaging and edge computing scenarios.

MQTT was designed for IoT. CloudSignal gives you the managed infrastructure to connect sensors, gateways, and industrial systems without running your own broker cluster.

Why CloudSignal for IoT?

IoT deployments face challenges that pub/sub messaging is built to handle:

| Challenge | CloudSignal solution |
| --- | --- |
| Intermittent connectivity | Persistent sessions queue messages |
| Limited device resources | Lightweight MQTT protocol |
| Thousands of devices | Scalable managed infrastructure |
| Security at scale | Per-device credentials + ACLs |
| Global distribution | Low-latency global endpoints |

Architecture patterns

Direct device connection

Simple sensors connect directly to CloudSignal.

Diagram (direct device connection):
Sensor 1, Sensor 2, ..., Sensor N -> CloudSignal -> Backend processing

Edge gateway pattern

Gateways aggregate and forward data from devices that speak non-MQTT protocols.

Diagram (edge gateway pattern):
Sensors (Modbus, BLE, and so on) -> [local protocols] -> Edge gateway (aggregate, filter, compress) -> [MQTT] -> CloudSignal -> Cloud backends

Bidirectional communication

Send commands back to devices.

Diagram (cloud-to-device commands):
Backend -> CloudSignal -> Device A, Device B, Device C

Each device publishes telemetry and subscribes to its own command topic.

Implementation examples

Sensor publishing telemetry

import paho.mqtt.client as mqtt
import ssl
import json
import time

# Device credentials (unique per device)
DEVICE_ID = 'sensor-001'
ORG_ID = 'org_k7xm4pqr2n5t'
PASSWORD = 'device-specific-password'

# on_connect below uses the paho-mqtt 2.x signature
# (client, userdata, flags, reason_code, properties), so the client has to be
# created with callback API VERSION2; with the legacy API the callback would be
# invoked with a different argument list and fail.
# clean_session=False asks the broker to keep the session across reconnects.
client = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id=DEVICE_ID,
    clean_session=False,
)
client.username_pw_set(f'{DEVICE_ID}@{ORG_ID}', PASSWORD)
client.tls_set(cert_reqs=ssl.CERT_REQUIRED)  # verify the broker certificate

# Last will: mark device offline if connection drops.
# Must be registered before connect(); retained so that late subscribers
# still see the most recent status.
client.will_set(
    f'devices/{DEVICE_ID}/status',
    'offline',
    qos=1,
    retain=True,
)

def on_connect(client, userdata, flags, reason_code, properties):
    """Publish the retained 'online' status and subscribe to this device's commands."""
    status_topic = f'devices/{DEVICE_ID}/status'
    command_filter = f'commands/{DEVICE_ID}/#'

    # Retained, so it replaces the 'offline' value left by the last will
    client.publish(status_topic, 'online', qos=1, retain=True)

    # Subscribing from the connect callback renews the subscription on every connect
    client.subscribe(command_filter, qos=1)

def on_message(client, userdata, msg):
    """Decode a JSON command from ``msg.payload`` and dispatch on its 'type'.

    Payloads that are not valid UTF-8 JSON objects are reported and dropped
    instead of raising: an exception escaping this callback would otherwise
    propagate into the MQTT network loop.
    """
    try:
        command = json.loads(msg.payload.decode())
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        print(f'Ignoring malformed command on {msg.topic}: {exc}')
        return

    if not isinstance(command, dict):
        print(f'Ignoring non-object command on {msg.topic}: {command!r}')
        return

    print(f'Received command: {command}')

    # .get() so a command without a 'type' key is reported, not a KeyError
    command_type = command.get('type')
    if command_type == 'reboot':
        # Handle reboot command
        pass
    elif command_type == 'update_config':
        # Handle config update
        pass
    else:
        print(f'Unsupported command type: {command_type!r}')

# Register both handlers before connecting, then run network I/O in the background
client.on_message = on_message
client.on_connect = on_connect
client.connect('mqtt.cloudsignal.app', 8883)
client.loop_start()

# Main telemetry loop
telemetry_topic = f'telemetry/{DEVICE_ID}'
while True:
    # One sample per iteration, serialized as a JSON object
    payload = json.dumps({
        'temperature': read_temperature(),
        'humidity': read_humidity(),
        'battery': read_battery(),
        'timestamp': time.time(),
    })

    # High frequency, loss acceptable -> QoS 0
    client.publish(telemetry_topic, payload, qos=0)

    time.sleep(10)

Edge gateway aggregation

import paho.mqtt.client as mqtt
import json
from collections import defaultdict
import threading
import time

class EdgeGateway:
    """Buffer readings from local devices and publish per-device aggregates.

    Local protocol handlers feed readings in through ``ingest_local_data``.
    A background thread periodically drains the buffer, summarizes it per
    device, and publishes the summary to ``gateways/{gateway_id}/aggregated``.

    Args:
        gateway_id: MQTT client ID; also used in the gateway's topic names.
        org_id: Organization ID, combined with ``gateway_id`` as the username.
        password: Gateway-specific password.
        flush_interval: Seconds between aggregate publishes (default 60).
    """

    def __init__(self, gateway_id, org_id, password, flush_interval=60):
        self.gateway_id = gateway_id
        self.flush_interval = flush_interval
        self.buffer = defaultdict(list)  # device_id -> list of reading dicts
        self.lock = threading.Lock()  # guards self.buffer across threads

        # The callbacks use the paho-mqtt 2.x signatures, so the client must be
        # created with callback API VERSION2.
        self.client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            client_id=gateway_id,
            clean_session=False,
        )
        self.client.username_pw_set(f'{gateway_id}@{org_id}', password)
        self.client.tls_set()
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message

    def _on_connect(self, client, userdata, flags, reason_code, properties):
        # Subscribe to commands for this gateway and its devices.
        # QoS 1 so the persistent session can hold commands while the gateway
        # is offline; a QoS 0 subscription is not guaranteed to be queued.
        self.client.subscribe(f'commands/{self.gateway_id}/#', qos=1)

    def _on_message(self, client, userdata, msg):
        # Forward commands to local devices
        self._forward_to_local_device(msg)

    def _forward_to_local_device(self, msg):
        """Deliver a cloud command to a local device.

        Placeholder: override or replace with the local protocol write
        (Modbus, BLE, and similar). The default implementation does nothing.
        """
        # Handle local delivery
        pass

    def ingest_local_data(self, device_id, data):
        """Called by local protocol handlers (Modbus, BLE, and similar)."""
        with self.lock:
            self.buffer[device_id].append({
                **data,
                'timestamp': time.time(),
            })

    def _drain_buffer(self):
        """Atomically take every buffered reading and leave the buffer empty."""
        with self.lock:
            pending = dict(self.buffer)
            self.buffer.clear()
        return pending

    @staticmethod
    def _aggregate(readings_by_device):
        """Summarize readings per device.

        Every device gets a ``count``. Temperature statistics are added only
        when at least one reading carries a ``'temperature'`` key, so a device
        that reports other quantities cannot crash the flush thread.
        """
        aggregated = {}
        for device_id, readings in readings_by_device.items():
            summary = {'count': len(readings)}
            temps = [r['temperature'] for r in readings if 'temperature' in r]
            if temps:
                summary['avg_temp'] = sum(temps) / len(temps)
                summary['max_temp'] = max(temps)
                summary['min_temp'] = min(temps)
            aggregated[device_id] = summary
        return aggregated

    def _flush_buffer(self):
        """Send aggregated data to cloud on a schedule."""
        while True:
            time.sleep(self.flush_interval)  # Aggregation window

            pending = self._drain_buffer()
            if not pending:
                continue

            # Aggregation and publishing happen outside the lock so that
            # ingest_local_data is never blocked on them.
            self.client.publish(
                f'gateways/{self.gateway_id}/aggregated',
                json.dumps(self._aggregate(pending)),
                qos=1,
            )

    def start(self):
        """Connect, start the background flush thread, and block in the network loop."""
        self.client.connect('mqtt.cloudsignal.app', 8883)
        threading.Thread(target=self._flush_buffer, daemon=True).start()
        self.client.loop_forever()

Backend command dispatch

import json
import time
import uuid

# Send commands to devices
def send_device_command(device_id, command_type, params):
    """Publish a JSON command to ``commands/{device_id}`` and return its ID.

    Uses the module-level ``client``, which is not defined in this snippet.

    Args:
        device_id: Target device; selects the command topic.
        command_type: Value sent as the command's 'type' field.
        params: JSON-serializable parameters for the command.

    Returns:
        The generated UUID string that identifies this command.
    """
    issued_at = time.time()
    command_id = str(uuid.uuid4())
    payload = json.dumps({
        'type': command_type,
        'params': params,
        'timestamp': issued_at,
        'id': command_id,
    })

    # QoS 1: ensure delivery
    client.publish(f'commands/{device_id}', payload, qos=1)

    return command_id

# Example: update device configuration
new_config = {
    'reporting_interval': 30,
    'alert_threshold': 100,
}
send_device_command('sensor-001', 'update_config', new_config)

# Example: request firmware update
# NOTE: the sensor example's on_message only branches on 'reboot' and
# 'update_config'; the device needs a 'firmware_update' branch to act on this.
firmware_request = {
    'version': '2.1.0',
    'url': 'https://firmware.example.com/v2.1.0.bin',
}
send_device_command('sensor-001', 'firmware_update', firmware_request)

Topic design for IoT

Hierarchical topics

# Telemetry (device -> cloud)
telemetry/{device_id}
telemetry/{device_id}/temperature
telemetry/{device_id}/humidity

# Status (device -> cloud)
devices/{device_id}/status
devices/{device_id}/battery
devices/{device_id}/connectivity

# Commands (cloud -> device)
commands/{device_id}
commands/{device_id}/config
commands/{device_id}/firmware

# Alerts (device -> cloud)
alerts/{device_id}/{severity}

Wildcard subscriptions

# Backend: receive all telemetry
# ('#' is the multi-level wildcard: it matches every topic below telemetry/)
client.subscribe('telemetry/#')

# Backend: receive all critical alerts
# ('+' is the single-level wildcard: it stands in for any one device_id)
client.subscribe('alerts/+/critical')

# Device: receive its commands
# (a trailing '/#' also matches the parent level, i.e. commands/{device_id} itself)
client.subscribe(f'commands/{device_id}/#')

Best practices

Unique credentials per device

# Good: each device has unique credentials
DEVICE_ID = 'sensor-001'
PASSWORD = get_device_password(DEVICE_ID)  # From secure storage

# Bad: shared credentials across devices
# (anti-pattern shown for contrast only; do not combine with the lines above)
PASSWORD = 'shared-password'  # Security risk

Appropriate QoS levels

# High-frequency telemetry: QoS 0
# (at most once: no acknowledgement, a lost sample is simply skipped)
client.publish('telemetry/sensor-001', data, qos=0)

# Important status updates: QoS 1
# (at least once: retried until acknowledged, so duplicates are possible)
client.publish('devices/sensor-001/status', 'online', qos=1)

# Critical commands: QoS 1 or 2
# (QoS 2 is exactly once; with QoS 1 the handler should tolerate duplicates)
client.publish('commands/sensor-001/reboot', cmd, qos=1)

Persistent sessions for commands

# Device won't miss commands while offline
client = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,  # paho-mqtt 2.x callback API, as used by the callbacks on this page
    client_id='sensor-001',  # must stay the same across reconnects to resume the session
    clean_session=False,  # Persistent session
)

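With clean_session=False, commands sent while a device is offline are delivered when it reconnects, provided the device reconnects with the same client ID and both the subscription and the published command use QoS 1 or higher. QoS 0 messages are not guaranteed to be queued for an offline session.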

Use last will for presence

# Register the will before calling connect(): it is sent to the broker as part
# of the connection request and published by the broker if the client
# disconnects unexpectedly.
client.will_set(
    f'devices/{DEVICE_ID}/status',
    'offline',
    qos=1,
    retain=True,  # retained so later subscribers still see the last known status
)

On this page