Node.js Server
Use CloudSignal MQTT in Node.js backends and microservices
Node.js Server
Integrate CloudSignal MQTT into your Node.js backend for server-to-server communication, IoT data processing, and microservices.
Installation
npm install @cloudsignal/mqtt-client
Quick Start
// server.js
const CloudSignal = require('@cloudsignal/mqtt-client').default;
const client = new CloudSignal({
tokenServiceUrl: 'https://auth.cloudsignal.app',
preset: 'server',
debug: true,
});
async function main() {
await client.connectWithToken({
host: 'wss://connect.cloudsignal.app:18885/',
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: '[email protected]',
});
console.log('Connected to CloudSignal');
// Subscribe to topics
await client.subscribe('sensors/#');
await client.subscribe('commands/server');
// Handle incoming messages
client.onMessage((topic, message) => {
console.log(`[${topic}]`, message);
processMessage(topic, JSON.parse(message));
});
// Publish periodic heartbeat
setInterval(() => {
client.transmit('status/server', {
service: 'my-service',
uptime: process.uptime(),
timestamp: Date.now(),
});
}, 30000);
}
function processMessage(topic, data) {
if (topic.startsWith('sensors/')) {
// Store sensor data in database
saveSensorReading(topic, data);
} else if (topic === 'commands/server') {
// Handle server commands
executeCommand(data);
}
}
main().catch(console.error);
Event Publisher Service
Create a dedicated service for publishing events:
// publisher.js
const CloudSignal = require('@cloudsignal/mqtt-client').default;
class EventPublisher {
constructor() {
this.client = new CloudSignal({
tokenServiceUrl: 'https://auth.cloudsignal.app',
preset: 'server',
});
this.connected = false;
}
async connect() {
if (this.connected) return;
await this.client.connectWithToken({
host: 'wss://connect.cloudsignal.app:18885/',
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: '[email protected]',
});
this.client.onConnectionStatusChange = (connected) => {
this.connected = connected;
console.log('Publisher connected:', connected);
};
this.connected = true;
}
publish(topic, data, options = {}) {
if (!this.connected) {
console.warn('Not connected, message queued');
}
this.client.transmit(topic, JSON.stringify(data), {
qos: options.qos ?? 1,
retain: options.retain ?? false,
});
}
async disconnect() {
this.client.destroy();
this.connected = false;
}
}
// Singleton instance
const publisher = new EventPublisher();
module.exports = publisher;
Usage:
const publisher = require('./publisher');
// In your Express/Fastify route
app.post('/api/order', async (req, res) => {
const order = await createOrder(req.body);
// Publish event for real-time updates
publisher.publish('orders/created', {
orderId: order.id,
userId: order.userId,
total: order.total,
timestamp: Date.now(),
});
res.json(order);
});
Express.js Integration
// app.js
const express = require('express');
const CloudSignal = require('@cloudsignal/mqtt-client').default;
const app = express();
let mqttClient = null;
// Initialize MQTT client
async function initMQTT() {
mqttClient = new CloudSignal({
tokenServiceUrl: 'https://auth.cloudsignal.app',
preset: 'server',
});
await mqttClient.connectWithToken({
host: 'wss://connect.cloudsignal.app:18885/',
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: '[email protected]',
});
console.log('MQTT connected');
}
// Middleware to ensure MQTT is connected
app.use((req, res, next) => {
req.mqtt = mqttClient;
next();
});
// REST endpoint that publishes MQTT message
app.post('/api/notify/:userId', express.json(), (req, res) => {
const { userId } = req.params;
const { title, body, type } = req.body;
req.mqtt.transmit(`notifications/user/${userId}`, {
title,
body,
type: type || 'info',
timestamp: Date.now(),
});
res.json({ success: true });
});
// Start server
initMQTT().then(() => {
app.listen(3000, () => console.log('Server running on port 3000'));
});
Worker Process
Process messages from a queue:
// worker.js
const CloudSignal = require('@cloudsignal/mqtt-client').default;
const client = new CloudSignal({
tokenServiceUrl: 'https://auth.cloudsignal.app',
preset: 'server',
});
async function startWorker() {
await client.connectWithToken({
host: 'wss://connect.cloudsignal.app:18885/',
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: '[email protected]',
});
// Subscribe to job queue
await client.subscribe('jobs/pending/#', 1);
client.onMessage(async (topic, message) => {
const job = JSON.parse(message);
console.log('Processing job:', job.id);
try {
const result = await processJob(job);
// Publish completion
client.transmit('jobs/completed', {
jobId: job.id,
result,
completedAt: Date.now(),
});
} catch (error) {
// Publish failure
client.transmit('jobs/failed', {
jobId: job.id,
error: error.message,
failedAt: Date.now(),
});
}
});
console.log('Worker started, waiting for jobs...');
}
startWorker().catch(console.error);
Environment Variables
# .env
CLOUDSIGNAL_ORG_ID=your-org-uuid
CLOUDSIGNAL_SECRET_KEY=cs_live_xxxxx
TypeScript Support
// server.ts
import CloudSignal from '@cloudsignal/mqtt-client';
interface SensorData {
sensorId: string;
value: number;
unit: string;
timestamp: number;
}
const client = new CloudSignal({
tokenServiceUrl: 'https://auth.cloudsignal.app',
preset: 'server',
});
async function main(): Promise<void> {
await client.connectWithToken({
host: 'wss://connect.cloudsignal.app:18885/',
organizationId: process.env.CLOUDSIGNAL_ORG_ID!,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY!,
userEmail: '[email protected]',
});
client.onMessage((topic: string, message: string) => {
const data: SensorData = JSON.parse(message);
console.log(`Sensor ${data.sensorId}: ${data.value} ${data.unit}`);
});
await client.subscribe('sensors/#');
}
main();
Graceful Shutdown
const CloudSignal = require('@cloudsignal/mqtt-client').default;
const client = new CloudSignal({
tokenServiceUrl: 'https://auth.cloudsignal.app',
preset: 'server',
});
// Handle shutdown signals
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
async function shutdown() {
console.log('Shutting down...');
// Publish offline status
client.transmit('status/server', {
status: 'offline',
timestamp: Date.now(),
}, { qos: 1 });
// Wait for message to be sent
await new Promise(resolve => setTimeout(resolve, 1000));
client.destroy();
process.exit(0);
}
async function main() {
await client.connectWithToken({
host: 'wss://connect.cloudsignal.app:18885/',
organizationId: process.env.CLOUDSIGNAL_ORG_ID,
secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
userEmail: '[email protected]',
});
// Publish online status
client.transmit('status/server', {
status: 'online',
timestamp: Date.now(),
}, { qos: 1, retain: true });
// Your server logic here...
}
main().catch(console.error);
Best Practices
- Use server preset β Optimized for backend: longer keepalive, clean sessions
- Handle reconnection β The SDK auto-reconnects, but track connection state
- Use QoS 1 for important messages β Ensures delivery
- Implement graceful shutdown β Publish offline status before exiting
- Use retain for status β Last known status available to new subscribers