Getting StartedUse Cases
IoT and edge computing
Connect sensors, gateways, and industrial systems to CloudSignal for IoT messaging and edge computing scenarios.
MQTT was designed for IoT. CloudSignal gives you the managed infrastructure to connect sensors, gateways, and industrial systems without running your own broker cluster.
Why CloudSignal for IoT?
IoT deployments face challenges that pub/sub messaging is built to handle:
| Challenge | CloudSignal solution |
|---|---|
| Intermittent connectivity | Persistent sessions queue messages |
| Limited device resources | Lightweight MQTT protocol |
| Thousands of devices | Scalable managed infrastructure |
| Security at scale | Per-device credentials + ACLs |
| Global distribution | Low-latency global endpoints |
Architecture patterns
Direct device connection
Simple sensors connect directly to CloudSignal.
Direct device connection
CloudSignal
Backend processing
Sensor 1
Sensor 2
Sensor N
Edge gateway pattern
Gateways aggregate and forward data from devices that speak non-MQTT protocols.
Sensors (Modbus, BLE, and so on)
Local protocols
Edge gateway (aggregate, filter, compress)
MQTT
CloudSignal
Cloud backends
Bidirectional communication
Send commands back to devices.
Cloud-to-device commands
Backend
CloudSignal
Device A
Device B
Device C
Each device publishes telemetry and subscribes to its own command topic.
Implementation examples
Sensor publishing telemetry
import paho.mqtt.client as mqtt
import ssl
import json
import time
# Device credentials (unique per device)
DEVICE_ID = 'sensor-001'
ORG_ID = 'org_k7xm4pqr2n5t'
PASSWORD = 'device-specific-password'
client = mqtt.Client(client_id=DEVICE_ID, clean_session=False)
client.username_pw_set(f'{DEVICE_ID}@{ORG_ID}', PASSWORD)
client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
# Last will: mark device offline if connection drops
client.will_set(
f'devices/{DEVICE_ID}/status',
'offline',
qos=1,
retain=True,
)
def on_connect(client, userdata, flags, reason_code, properties):
# Announce online
client.publish(f'devices/{DEVICE_ID}/status', 'online', qos=1, retain=True)
# Subscribe to commands
client.subscribe(f'commands/{DEVICE_ID}/#', qos=1)
def on_message(client, userdata, msg):
command = json.loads(msg.payload.decode())
print(f'Received command: {command}')
if command['type'] == 'reboot':
# Handle reboot command
pass
elif command['type'] == 'update_config':
# Handle config update
pass
client.on_connect = on_connect
client.on_message = on_message
client.connect('mqtt.cloudsignal.app', 8883)
client.loop_start()
# Main telemetry loop
while True:
telemetry = {
'temperature': read_temperature(),
'humidity': read_humidity(),
'battery': read_battery(),
'timestamp': time.time(),
}
client.publish(
f'telemetry/{DEVICE_ID}',
json.dumps(telemetry),
qos=0, # High frequency, loss acceptable
)
time.sleep(10)Edge gateway aggregation
import paho.mqtt.client as mqtt
import json
from collections import defaultdict
import threading
import time
class EdgeGateway:
def __init__(self, gateway_id, org_id, password):
self.gateway_id = gateway_id
self.buffer = defaultdict(list)
self.lock = threading.Lock()
self.client = mqtt.Client(client_id=gateway_id, clean_session=False)
self.client.username_pw_set(f'{gateway_id}@{org_id}', password)
self.client.tls_set()
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
def _on_connect(self, client, userdata, flags, reason_code, properties):
# Subscribe to commands for this gateway and its devices
self.client.subscribe(f'commands/{self.gateway_id}/#')
def _on_message(self, client, userdata, msg):
# Forward commands to local devices
self._forward_to_local_device(msg)
def ingest_local_data(self, device_id, data):
"""Called by local protocol handlers (Modbus, BLE, and similar)."""
with self.lock:
self.buffer[device_id].append({
**data,
'timestamp': time.time(),
})
def _flush_buffer(self):
"""Send aggregated data to cloud on a schedule."""
while True:
time.sleep(60) # Aggregate for 1 minute
with self.lock:
if not self.buffer:
continue
aggregated = {}
for device_id, readings in self.buffer.items():
aggregated[device_id] = {
'count': len(readings),
'avg_temp': sum(r['temperature'] for r in readings) / len(readings),
'max_temp': max(r['temperature'] for r in readings),
'min_temp': min(r['temperature'] for r in readings),
}
self.buffer.clear()
self.client.publish(
f'gateways/{self.gateway_id}/aggregated',
json.dumps(aggregated),
qos=1,
)
def start(self):
self.client.connect('mqtt.cloudsignal.app', 8883)
threading.Thread(target=self._flush_buffer, daemon=True).start()
self.client.loop_forever()Backend command dispatch
import json
import time
import uuid
# Send commands to devices
def send_device_command(device_id, command_type, params):
command = {
'type': command_type,
'params': params,
'timestamp': time.time(),
'id': str(uuid.uuid4()),
}
client.publish(
f'commands/{device_id}',
json.dumps(command),
qos=1, # Ensure delivery
)
return command['id']
# Example: update device configuration
send_device_command('sensor-001', 'update_config', {
'reporting_interval': 30,
'alert_threshold': 100,
})
# Example: request firmware update
send_device_command('sensor-001', 'firmware_update', {
'version': '2.1.0',
'url': 'https://firmware.example.com/v2.1.0.bin',
})Topic design for IoT
Hierarchical topics
# Telemetry (device -> cloud)
telemetry/{device_id}
telemetry/{device_id}/temperature
telemetry/{device_id}/humidity
# Status (device -> cloud)
devices/{device_id}/status
devices/{device_id}/battery
devices/{device_id}/connectivity
# Commands (cloud -> device)
commands/{device_id}
commands/{device_id}/config
commands/{device_id}/firmware
# Alerts (device -> cloud)
alerts/{device_id}/{severity}Wildcard subscriptions
# Backend: receive all telemetry
client.subscribe('telemetry/#')
# Backend: receive all critical alerts
client.subscribe('alerts/+/critical')
# Device: receive its commands
client.subscribe(f'commands/{device_id}/#')Best practices
Unique credentials per device
# Good: each device has unique credentials
DEVICE_ID = 'sensor-001'
PASSWORD = get_device_password(DEVICE_ID) # From secure storage
# Bad: shared credentials across devices
PASSWORD = 'shared-password' # Security riskAppropriate QoS levels
# High-frequency telemetry: QoS 0
client.publish('telemetry/sensor-001', data, qos=0)
# Important status updates: QoS 1
client.publish('devices/sensor-001/status', 'online', qos=1)
# Critical commands: QoS 1 or 2
client.publish('commands/sensor-001/reboot', cmd, qos=1)Persistent sessions for commands
# Device won't miss commands while offline
client = mqtt.Client(
client_id='sensor-001',
clean_session=False, # Persistent session
)With clean_session=False, commands sent while a device is offline are delivered when it reconnects.
Use last will for presence
client.will_set(
f'devices/{DEVICE_ID}/status',
'offline',
qos=1,
retain=True,
)Related
- Persistent sessions - Offline message handling
- Python quickstart - Device implementation