CloudSignal Docs
Getting StartedQuickstarts

Node.js quickstart

Connect to CloudSignal from Node.js using @cloudsignal/mqtt-client. Build backend services, message processors, and integrations.

Connect a Node.js service to CloudSignal using @cloudsignal/mqtt-client. This quickstart covers basic connection, pub/sub, persistent sessions, and an Express integration pattern.

Install the SDK

npm install @cloudsignal/mqtt-client

Basic connection

import CloudSignal from '@cloudsignal/mqtt-client';

const client = new CloudSignal({
  preset: 'server',
  clientId: `nodejs-${Date.now()}`,
});

client.onMessage((topic, message) => {
  console.log(`${topic}:`, message);
});

await client.connectWithToken({
  organizationId: process.env.CLOUDSIGNAL_ORG_ID,
  secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
  userEmail: 'service@example.com',
});

await client.subscribe('events/#');
console.log('Connected to CloudSignal');

TypeScript version

import CloudSignal from '@cloudsignal/mqtt-client';

const client = new CloudSignal({
  preset: 'server',
  clientId: `nodejs-${Date.now()}`,
  reconnectPeriod: 5000,
});

client.onMessage((topic: string, message: unknown) => {
  console.log(`${topic}:`, message);
});

await client.connectWithToken({
  organizationId: process.env.CLOUDSIGNAL_ORG_ID!,
  secretKey: process.env.CLOUDSIGNAL_SECRET_KEY!,
  userEmail: 'service@example.com',
});

await client.subscribe('events/#');

The server preset sets cleanSession: false and other server-friendly defaults; override with cleanSession: true if you want a fresh session on every restart.

Publish and subscribe

import CloudSignal from '@cloudsignal/mqtt-client';

const client = new CloudSignal({ preset: 'server' });

client.onMessage((topic, payload) => {
  if (topic.startsWith('commands/service-1/')) {
    handleCommand(topic, payload);
  }
});

await client.connectWithToken({
  organizationId: process.env.CLOUDSIGNAL_ORG_ID,
  secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
  userEmail: 'service@example.com',
});

await client.subscribe('commands/service-1/#');

client.transmit('status/service-1', {
  status: 'online',
  timestamp: Date.now(),
}, { retain: true });

function handleCommand(topic, payload) {
  console.log(`Received command: ${topic}`, payload);
  const responseTopic = topic.replace('commands/', 'responses/');
  client.transmit(responseTopic, { received: true, processedAt: Date.now() });
}

Express.js integration

Add MQTT to an Express server:

import express from 'express';
import CloudSignal from '@cloudsignal/mqtt-client';

const app = express();
app.use(express.json());

const mqttClient = new CloudSignal({ preset: 'server' });

await mqttClient.connectWithToken({
  organizationId: process.env.CLOUDSIGNAL_ORG_ID!,
  secretKey: process.env.CLOUDSIGNAL_SECRET_KEY!,
  userEmail: 'express-app@example.com',
});
console.log('MQTT connected');

// REST endpoint to publish
app.post('/api/publish', (req, res) => {
  const { topic, message } = req.body;
  try {
    mqttClient.transmit(topic, message, { qos: 1 });
    res.json({ success: true });
  } catch (err) {
    res.status(500).json({ error: 'Publish failed' });
  }
});

// REST endpoint to forward MQTT messages via SSE
app.get('/api/subscribe/:topic', async (req, res) => {
  const { topic } = req.params;

  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  await mqttClient.subscribe(topic);

  // onMessage is additive: every call adds a handler to the list.
  // Remove this request's handler when the SSE stream closes.
  const handler = (msgTopic: string, payload: unknown) => {
    if (msgTopic.startsWith(topic.replace('#', ''))) {
      res.write(`data: ${JSON.stringify({ topic: msgTopic, message: payload })}\n\n`);
    }
  };

  mqttClient.onMessage(handler);

  req.on('close', () => {
    mqttClient.offMessage(handler);
    mqttClient.unsubscribe(topic);
  });
});

app.listen(3000, () => {
  console.log('Server running on port 3000');
});

Persistent sessions

Keep subscriptions across reconnects:

import CloudSignal from '@cloudsignal/mqtt-client';

const client = new CloudSignal({
  preset: 'server',
  clientId: 'service-001',  // Must be stable across restarts
  cleanSession: false,      // Persistent session (server preset default)
  reconnectPeriod: 5000,
});

client.onConnectionStatusChange = (isConnected) => {
  if (isConnected) {
    console.log('Connected. The SDK resubscribes automatically.');
  }
};

await client.connectWithToken({
  organizationId: process.env.CLOUDSIGNAL_ORG_ID,
  secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
  userEmail: 'service-001@example.com',
});

await client.subscribe('agents/agent-001/inbox', { qos: 1 });

With cleanSession: false and a stable clientId, the broker stores your session. The SDK tracks your subscriptions and resubscribes automatically after reconnect. Subscribe with QoS 1 or higher to receive queued messages.

Error handling

The SDK exposes connection-lifecycle and auth-error hooks as assignment properties. Use them for production logging and graceful shutdown.

import CloudSignal from '@cloudsignal/mqtt-client';

const client = new CloudSignal({
  preset: 'server',
  clientId: `service-${process.pid}`,
  reconnectPeriod: 5000,
  connectTimeout: 30000,
});

client.onConnectionStatusChange = (isConnected, state) => {
  console.log(`connection ${state}`);
};

client.onReconnecting = (attempt) => {
  console.log(`reconnecting (attempt ${attempt})`);
};

client.onOffline = () => console.log('client offline');
client.onOnline = () => console.log('client back online');

client.onAuthError = (err) => {
  console.error('auth error:', err.message);
  process.exit(1);
};

await client.connectWithToken({
  organizationId: process.env.CLOUDSIGNAL_ORG_ID,
  secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
  userEmail: 'service@example.com',
});

// Graceful shutdown
process.on('SIGTERM', () => {
  console.log('shutting down');
  client.destroy();
  process.exit(0);
});

Environment configuration

Centralize MQTT options in one module:

// config/mqtt.js
import 'dotenv/config';

export const mqttConfig = {
  organizationId: process.env.CLOUDSIGNAL_ORG_ID,
  secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
  userEmail: process.env.CLOUDSIGNAL_USER_EMAIL || 'service@example.com',
  clientOptions: {
    preset: 'server',
    clientId: `${process.env.SERVICE_NAME || 'app'}-${process.pid}`,
    cleanSession: process.env.MQTT_CLEAN_SESSION === 'true',
    reconnectPeriod: parseInt(process.env.MQTT_RECONNECT_PERIOD || '5000', 10),
  },
};

.env file:

CLOUDSIGNAL_ORG_ID=org_your_short_id
CLOUDSIGNAL_SECRET_KEY=sk_your_secret_key
CLOUDSIGNAL_USER_EMAIL=service@example.com
SERVICE_NAME=my-service
MQTT_CLEAN_SESSION=false
MQTT_RECONNECT_PERIOD=5000

On this page