Node.js quickstart
Connect to CloudSignal from Node.js using @cloudsignal/mqtt-client. Build backend services, message processors, and integrations.
Connect a Node.js service to CloudSignal using @cloudsignal/mqtt-client. This quickstart covers basic connection, pub/sub, persistent sessions, and an Express integration pattern.
Install the SDK
npm install @cloudsignal/mqtt-clientBasic connection
import CloudSignal from '@cloudsignal/mqtt-client';
const client = new CloudSignal({
preset: 'server',
clientId: `nodejs-${Date.now()}`,
});
client.onMessage((topic, message) => {
console.log(`${topic}:`, message);
});
await client.connectWithToken({
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: 'service@example.com',
});
await client.subscribe('events/#');
console.log('Connected to CloudSignal');TypeScript version
import CloudSignal from '@cloudsignal/mqtt-client';
const client = new CloudSignal({
preset: 'server',
clientId: `nodejs-${Date.now()}`,
reconnectPeriod: 5000,
});
client.onMessage((topic: string, message: unknown) => {
console.log(`${topic}:`, message);
});
await client.connectWithToken({
organizationId: process.env.CLOUDSIGNAL_ORG_ID!,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY!,
userEmail: 'service@example.com',
});
await client.subscribe('events/#');The server preset sets cleanSession: false and other server-friendly defaults; override with cleanSession: true if you want a fresh session on every restart.
Publish and subscribe
import CloudSignal from '@cloudsignal/mqtt-client';
const client = new CloudSignal({ preset: 'server' });
client.onMessage((topic, payload) => {
if (topic.startsWith('commands/service-1/')) {
handleCommand(topic, payload);
}
});
await client.connectWithToken({
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: 'service@example.com',
});
await client.subscribe('commands/service-1/#');
client.transmit('status/service-1', {
status: 'online',
timestamp: Date.now(),
}, { retain: true });
function handleCommand(topic, payload) {
console.log(`Received command: ${topic}`, payload);
const responseTopic = topic.replace('commands/', 'responses/');
client.transmit(responseTopic, { received: true, processedAt: Date.now() });
}Express.js integration
Add MQTT to an Express server:
import express from 'express';
import CloudSignal from '@cloudsignal/mqtt-client';
const app = express();
app.use(express.json());
const mqttClient = new CloudSignal({ preset: 'server' });
await mqttClient.connectWithToken({
organizationId: process.env.CLOUDSIGNAL_ORG_ID!,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY!,
userEmail: 'express-app@example.com',
});
console.log('MQTT connected');
// REST endpoint to publish
app.post('/api/publish', (req, res) => {
const { topic, message } = req.body;
try {
mqttClient.transmit(topic, message, { qos: 1 });
res.json({ success: true });
} catch (err) {
res.status(500).json({ error: 'Publish failed' });
}
});
// REST endpoint to forward MQTT messages via SSE
app.get('/api/subscribe/:topic', async (req, res) => {
const { topic } = req.params;
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
await mqttClient.subscribe(topic);
// onMessage is additive: every call adds a handler to the list.
// Remove this request's handler when the SSE stream closes.
const handler = (msgTopic: string, payload: unknown) => {
if (msgTopic.startsWith(topic.replace('#', ''))) {
res.write(`data: ${JSON.stringify({ topic: msgTopic, message: payload })}\n\n`);
}
};
mqttClient.onMessage(handler);
req.on('close', () => {
mqttClient.offMessage(handler);
mqttClient.unsubscribe(topic);
});
});
app.listen(3000, () => {
console.log('Server running on port 3000');
});Persistent sessions
Keep subscriptions across reconnects:
import CloudSignal from '@cloudsignal/mqtt-client';
const client = new CloudSignal({
preset: 'server',
clientId: 'service-001', // Must be stable across restarts
cleanSession: false, // Persistent session (server preset default)
reconnectPeriod: 5000,
});
client.onConnectionStatusChange = (isConnected) => {
if (isConnected) {
console.log('Connected. The SDK resubscribes automatically.');
}
};
await client.connectWithToken({
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: 'service-001@example.com',
});
await client.subscribe('agents/agent-001/inbox', { qos: 1 });With cleanSession: false and a stable clientId, the broker stores your session. The SDK tracks your subscriptions and resubscribes automatically after reconnect. Subscribe with QoS 1 or higher to receive queued messages.
Error handling
The SDK exposes connection-lifecycle and auth-error hooks as assignment properties. Use them for production logging and graceful shutdown.
import CloudSignal from '@cloudsignal/mqtt-client';
const client = new CloudSignal({
preset: 'server',
clientId: `service-${process.pid}`,
reconnectPeriod: 5000,
connectTimeout: 30000,
});
client.onConnectionStatusChange = (isConnected, state) => {
console.log(`connection ${state}`);
};
client.onReconnecting = (attempt) => {
console.log(`reconnecting (attempt ${attempt})`);
};
client.onOffline = () => console.log('client offline');
client.onOnline = () => console.log('client back online');
client.onAuthError = (err) => {
console.error('auth error:', err.message);
process.exit(1);
};
await client.connectWithToken({
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: 'service@example.com',
});
// Graceful shutdown
process.on('SIGTERM', () => {
console.log('shutting down');
client.destroy();
process.exit(0);
});Environment configuration
Centralize MQTT options in one module:
// config/mqtt.js
import 'dotenv/config';
export const mqttConfig = {
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: process.env.CLOUDSIGNAL_USER_EMAIL || 'service@example.com',
clientOptions: {
preset: 'server',
clientId: `${process.env.SERVICE_NAME || 'app'}-${process.pid}`,
cleanSession: process.env.MQTT_CLEAN_SESSION === 'true',
reconnectPeriod: parseInt(process.env.MQTT_RECONNECT_PERIOD || '5000', 10),
},
};.env file:
CLOUDSIGNAL_ORG_ID=org_your_short_id
CLOUDSIGNAL_SECRET_KEY=sk_your_secret_key
CLOUDSIGNAL_USER_EMAIL=service@example.com
SERVICE_NAME=my-service
MQTT_CLEAN_SESSION=false
MQTT_RECONNECT_PERIOD=5000Related
- Python quickstart - Python backend guide
- Next.js quickstart - Browser quickstart with server-side tokens