Node.js Server

Use CloudSignal MQTT in Node.js backends and microservices

Node.js Server

Integrate CloudSignal MQTT into your Node.js backend for server-to-server communication, IoT data processing, and microservices.

Installation

npm install @cloudsignal/mqtt-client

Quick Start

// server.js
const CloudSignal = require('@cloudsignal/mqtt-client').default;

// Options handed to the CloudSignal constructor for this service.
const clientOptions = {
  tokenServiceUrl: 'https://auth.cloudsignal.app',
  preset: 'server',
  debug: true,
};

// Single client instance shared by everything in this file.
const client = new CloudSignal(clientOptions);

/**
 * Connects to CloudSignal, installs the message handler, subscribes to the
 * topics this service consumes, and starts a 30-second heartbeat.
 *
 * The handler is installed before subscribing so that messages delivered
 * right after a subscription is accepted (for example retained messages)
 * are not dropped for lack of a listener.
 *
 * @returns {Promise<void>} Resolves once both subscriptions have been
 *   requested and the heartbeat timer is scheduled.
 */
async function main() {
  await client.connectWithToken({
    host: 'wss://connect.cloudsignal.app:18885/',
    organizationId: process.env.CLOUDSIGNAL_ORG_ID,
    secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
    userEmail: '[email protected]',
  });

  console.log('Connected to CloudSignal');

  // Handle incoming messages
  client.onMessage((topic, message) => {
    console.log(`[${topic}]`, message);

    // One malformed payload must not throw out of the handler.
    let payload;
    try {
      payload = JSON.parse(message);
    } catch (err) {
      console.error(`Ignoring malformed JSON on ${topic}:`, err.message);
      return;
    }

    processMessage(topic, payload);
  });

  // Subscribe to topics
  await client.subscribe('sensors/#');
  await client.subscribe('commands/server');

  // Publish periodic heartbeat
  setInterval(() => {
    client.transmit('status/server', {
      service: 'my-service',
      uptime: process.uptime(),
      timestamp: Date.now(),
    });
  }, 30000);
}

/**
 * Routes a decoded message to the matching handler based on its topic.
 * Topics under `sensors/` go to `saveSensorReading`; the exact topic
 * `commands/server` goes to `executeCommand`; anything else is ignored.
 *
 * @param {string} topic - Topic the message arrived on.
 * @param {*} data - Already-parsed message payload.
 */
function processMessage(topic, data) {
  const isSensorTopic = topic.startsWith('sensors/');

  if (isSensorTopic) {
    // Store sensor data in database
    saveSensorReading(topic, data);
    return;
  }

  if (topic === 'commands/server') {
    // Handle server commands
    executeCommand(data);
  }
}

// Start the service. A startup failure is logged and also reflected in the
// exit status, so a process supervisor can tell that the service failed.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Event Publisher Service

Create a dedicated service for publishing events:

// publisher.js
const CloudSignal = require('@cloudsignal/mqtt-client').default;

/**
 * Wraps a CloudSignal client that is used only for publishing events.
 *
 * `connected` mirrors the last connection status reported by the client.
 * `connecting` holds the in-flight connect promise so that overlapping
 * `connect()` calls share one connection attempt.
 */
class EventPublisher {
  constructor() {
    this.client = new CloudSignal({
      tokenServiceUrl: 'https://auth.cloudsignal.app',
      preset: 'server',
    });
    this.connected = false;
    this.connecting = null;
  }

  /**
   * Connects the underlying client. Does nothing when already connected and
   * joins the pending attempt when one is already running.
   *
   * @returns {Promise<void>} Resolves once the client has connected.
   * @throws Rejects with whatever `connectWithToken` rejects with; a later
   *   call starts a fresh attempt.
   */
  async connect() {
    if (this.connected) return;

    if (!this.connecting) {
      // Attach the status listener before connecting so status changes
      // reported while the connection is being set up are not lost.
      this.client.onConnectionStatusChange = (connected) => {
        this.connected = connected;
        console.log('Publisher connected:', connected);
      };

      this.connecting = this.client.connectWithToken({
        host: 'wss://connect.cloudsignal.app:18885/',
        organizationId: process.env.CLOUDSIGNAL_ORG_ID,
        secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
        userEmail: '[email protected]',
      });
    }

    try {
      await this.connecting;
      this.connected = true;
    } finally {
      // Clear the shared promise so a failed attempt can be retried.
      this.connecting = null;
    }
  }

  /**
   * Serializes `data` as JSON and hands it to the client for `topic`.
   * When not connected a warning is logged and the message is still passed
   * to the client (NOTE(review): the warning text implies the SDK queues
   * it; confirm against the SDK documentation).
   *
   * @param {string} topic - Topic to publish on.
   * @param {*} data - JSON-serializable payload.
   * @param {{qos?: number, retain?: boolean}} [options] - Publish options;
   *   `qos` defaults to 1 and `retain` to false.
   */
  publish(topic, data, options = {}) {
    if (!this.connected) {
      console.warn('Not connected, message queued');
    }
    this.client.transmit(topic, JSON.stringify(data), {
      qos: options.qos ?? 1,
      retain: options.retain ?? false,
    });
  }

  /**
   * Destroys the underlying client and marks the publisher disconnected.
   *
   * @returns {Promise<void>}
   */
  async disconnect() {
    this.client.destroy();
    this.connected = false;
    this.connecting = null;
  }
}

// Export one shared EventPublisher; every module that requires this file
// receives the same instance.
module.exports = new EventPublisher();

Usage (call `publisher.connect()` once during application startup, before the first `publish`):

const publisher = require('./publisher');

// In your Express route (for Fastify, adapt the error forwarding to its API)
app.post('/api/order', async (req, res, next) => {
  try {
    const order = await createOrder(req.body);

    // Publish event for real-time updates
    publisher.publish('orders/created', {
      orderId: order.id,
      userId: order.userId,
      total: order.total,
      timestamp: Date.now(),
    });

    res.json(order);
  } catch (err) {
    // Express 4 does not observe the promise returned by an async handler,
    // so forward failures to the error-handling middleware explicitly.
    next(err);
  }
});

Express.js Integration

// app.js
const express = require('express');
const CloudSignal = require('@cloudsignal/mqtt-client').default;

// Express application that the middleware and routes below attach to.
const app = express();
// Shared CloudSignal client; stays null until initMQTT() assigns it.
let mqttClient = null;

/**
 * Creates the CloudSignal client, stores it in the module-level
 * `mqttClient`, and waits for the token-based connection to complete.
 *
 * @returns {Promise<void>} Resolves after the connection succeeds and the
 *   confirmation line has been logged.
 */
async function initMQTT() {
  const clientOptions = {
    tokenServiceUrl: 'https://auth.cloudsignal.app',
    preset: 'server',
  };
  mqttClient = new CloudSignal(clientOptions);

  const credentials = {
    host: 'wss://connect.cloudsignal.app:18885/',
    organizationId: process.env.CLOUDSIGNAL_ORG_ID,
    secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
    userEmail: '[email protected]',
  };
  await mqttClient.connectWithToken(credentials);

  console.log('MQTT connected');
}

// Middleware that exposes the shared MQTT client as req.mqtt. Requests that
// arrive before the client has been created get a 503 instead of failing
// later with a TypeError on a null client.
app.use((req, res, next) => {
  if (!mqttClient) {
    res.status(503).json({ success: false, error: 'MQTT client not ready' });
    return;
  }

  req.mqtt = mqttClient;
  next();
});

// REST endpoint that publishes MQTT message
app.post('/api/notify/:userId', express.json(), (req, res) => {
  const { userId } = req.params;
  // Tolerate a missing body so destructuring cannot throw.
  const { title, body, type } = req.body ?? {};

  // userId is caller-supplied and becomes one level of the topic. Reject the
  // MQTT level separator, the wildcards and NUL so a request cannot redirect
  // the publish to another topic or produce an invalid topic name.
  if (/[\/+#\u0000]/.test(userId)) {
    res.status(400).json({ success: false, error: 'Invalid userId' });
    return;
  }

  req.mqtt.transmit(`notifications/user/${userId}`, {
    title,
    body,
    type: type || 'info',
    timestamp: Date.now(),
  });

  res.json({ success: true });
});

// Start server: listen only after MQTT is connected. If the connection
// fails, log the reason and mark the process as failed instead of leaving
// an unhandled promise rejection.
initMQTT()
  .then(() => {
    app.listen(3000, () => console.log('Server running on port 3000'));
  })
  .catch((err) => {
    console.error('Failed to start server:', err);
    process.exitCode = 1;
  });

Worker Process

Process messages from a queue:

// worker.js
const CloudSignal = require('@cloudsignal/mqtt-client').default;

// Options handed to the CloudSignal constructor for the worker.
const clientOptions = {
  tokenServiceUrl: 'https://auth.cloudsignal.app',
  preset: 'server',
};

// Client instance used by startWorker() and its message handler.
const client = new CloudSignal(clientOptions);

/**
 * Connects the worker, installs the job handler, and subscribes to the
 * pending-jobs topic tree.
 *
 * For each message the handler decodes the job, runs `processJob`, and
 * publishes the outcome to `jobs/completed` or `jobs/failed`. Payloads that
 * are not a JSON object are logged and skipped so a single bad message
 * cannot produce an unhandled rejection in the worker.
 *
 * @returns {Promise<void>} Resolves once the subscription has been requested.
 */
async function startWorker() {
  await client.connectWithToken({
    host: 'wss://connect.cloudsignal.app:18885/',
    organizationId: process.env.CLOUDSIGNAL_ORG_ID,
    secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
    userEmail: '[email protected]',
  });

  // Install the handler before subscribing so jobs delivered immediately
  // after the subscription is accepted are not missed.
  client.onMessage(async (topic, message) => {
    let job;
    try {
      job = JSON.parse(message);
    } catch (err) {
      console.error(`Skipping malformed job payload on ${topic}:`, err.message);
      return;
    }

    // JSON such as `null` or `42` parses fine but is not a job description.
    if (job === null || typeof job !== 'object') {
      console.error(`Skipping non-object job payload on ${topic}`);
      return;
    }

    console.log('Processing job:', job.id);

    try {
      const result = await processJob(job);

      // Publish completion
      client.transmit('jobs/completed', {
        jobId: job.id,
        result,
        completedAt: Date.now(),
      });
    } catch (error) {
      // Publish failure
      client.transmit('jobs/failed', {
        jobId: job.id,
        error: error.message,
        failedAt: Date.now(),
      });
    }
  });

  // Subscribe to job queue
  await client.subscribe('jobs/pending/#', 1);

  console.log('Worker started, waiting for jobs...');
}

// Start the worker. A startup failure is logged and also reflected in the
// exit status, so a process supervisor can tell that the worker failed.
startWorker().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Environment Variables

# .env
CLOUDSIGNAL_ORG_ID=your-org-uuid
CLOUDSIGNAL_SECRET_KEY=cs_live_xxxxx

TypeScript Support

// server.ts
import CloudSignal from '@cloudsignal/mqtt-client';

/**
 * Shape the message handler below assigns to each decoded sensor message.
 * The payload is not validated at runtime; this type only describes what
 * the handler expects.
 */
interface SensorData {
  /** Identifier printed as the sensor label in the log line. */
  sensorId: string;
  /** Reading printed in the log line. */
  value: number;
  /** Unit printed after the value in the log line. */
  unit: string;
  /** Not read by this snippet; presumably epoch milliseconds — confirm with the producer. */
  timestamp: number;
}

// Options handed to the CloudSignal constructor.
const clientOptions = {
  tokenServiceUrl: 'https://auth.cloudsignal.app',
  preset: 'server',
};

// Client instance used by main() below.
const client = new CloudSignal(clientOptions);

/**
 * Connects to CloudSignal, installs a handler that logs each sensor
 * reading, and subscribes to every topic under `sensors/`.
 *
 * Payloads that are not valid JSON are logged and skipped so one bad
 * message cannot throw out of the handler.
 *
 * @returns Resolves once the subscription has been requested.
 */
async function main(): Promise<void> {
  await client.connectWithToken({
    host: 'wss://connect.cloudsignal.app:18885/',
    organizationId: process.env.CLOUDSIGNAL_ORG_ID!,
    secretKey: process.env.CLOUDSIGNAL_SECRET_KEY!,
    userEmail: '[email protected]',
  });

  client.onMessage((topic: string, message: string) => {
    // JSON.parse returns `any`; the SensorData type is an unchecked
    // assumption about what publishers send.
    let data: SensorData;
    try {
      data = JSON.parse(message);
    } catch (err) {
      console.error(`Ignoring malformed JSON on ${topic}:`, err);
      return;
    }

    console.log(`Sensor ${data.sensorId}: ${data.value} ${data.unit}`);
  });

  await client.subscribe('sensors/#');
}

// Start the service. Without a rejection handler a failed connection would
// surface only as an unhandled promise rejection; log it and mark the
// process as failed instead.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Graceful Shutdown

const CloudSignal = require('@cloudsignal/mqtt-client').default;

// Options handed to the CloudSignal constructor.
const clientOptions = {
  tokenServiceUrl: 'https://auth.cloudsignal.app',
  preset: 'server',
};

// Client instance shared by main() and shutdown().
const client = new CloudSignal(clientOptions);

// Set once shutdown has started so a repeated signal does not run it twice.
let shuttingDown = false;

// Handle shutdown signals
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

/**
 * Publishes an offline status, waits one second for it to go out, then
 * destroys the client and exits with status 0.
 *
 * The offline status is retained because main() publishes the online status
 * with `retain: true`; without it new subscribers would keep receiving the
 * stale retained "online" message after the server has stopped. A failure
 * while publishing is logged and does not prevent cleanup or exit.
 *
 * @returns {Promise<void>}
 */
async function shutdown() {
  if (shuttingDown) return;
  shuttingDown = true;

  console.log('Shutting down...');

  try {
    // Publish offline status
    client.transmit('status/server', {
      status: 'offline',
      timestamp: Date.now(),
    }, { qos: 1, retain: true });

    // Wait for message to be sent
    await new Promise((resolve) => setTimeout(resolve, 1000));
  } catch (err) {
    // Best effort only: e.g. the signal arrived before the client connected.
    console.error('Failed to publish offline status:', err);
  }

  try {
    client.destroy();
  } finally {
    process.exit(0);
  }
}

/**
 * Connects to CloudSignal and announces this server as online with a
 * retained QoS 1 status message.
 *
 * @returns {Promise<void>} Resolves after the online status has been handed
 *   to the client.
 */
async function main() {
  const credentials = {
    host: 'wss://connect.cloudsignal.app:18885/',
    organizationId: process.env.CLOUDSIGNAL_ORG_ID,
    secretKey: process.env.CLOUDSIGNAL_SECRET_KEY,
    userEmail: '[email protected]',
  };
  await client.connectWithToken(credentials);

  // Publish online status
  const onlineStatus = { status: 'online', timestamp: Date.now() };
  const publishOptions = { qos: 1, retain: true };
  client.transmit('status/server', onlineStatus, publishOptions);

  // Your server logic here...
}

// Start the service. A startup failure is logged and also reflected in the
// exit status, so a process supervisor can tell that the service failed.
main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

Best Practices

  1. Use server preset — Optimized for backend: longer keepalive, clean sessions
  2. Handle reconnection — The SDK auto-reconnects, but track connection state
  3. Use QoS 1 for important messages — Ensures delivery
  4. Implement graceful shutdown — Publish offline status before exiting
  5. Use retain for status — Last known status available to new subscribers