CloudSignal Docs
API Reference

Methods & Events

Client methods, events, and TypeScript types

Complete reference for CloudSignalClient methods and event callbacks.

Both import forms resolve to the same class:

import CloudSignal from '@cloudsignal/mqtt-client';
// or
import { CloudSignalClient } from '@cloudsignal/mqtt-client';

Connection Methods

connect()

Establish a connection using a username and password.

await client.connect(config: ConnectionConfig): Promise<void>

interface ConnectionConfig {
  host: string;
  username?: string;
  password?: string;
  clientId?: string;
  willTopic?: string;
  willMessage?: string;
  willQos?: 0 | 1 | 2;
  willRetain?: boolean;
}

Resolves when connected; rejects if authentication fails or the connection times out.

connectWithToken()

Establish a connection using token authentication. Requires tokenServiceUrl in the constructor options.

await client.connectWithToken(config: TokenAuthConfig): Promise<TokenInfo>

interface TokenAuthConfig {
  host: string;
  organizationId: string;
  secretKey?: string;        // native auth (with userEmail)
  userEmail?: string;        // native auth (with secretKey)
  externalToken?: string;    // external IdP exchange (Supabase, Firebase, Auth0, ...)
  provider?: 'supabase' | 'firebase' | 'auth0' | 'clerk' | 'oidc';
  clientId?: string;
  willTopic?: string;
  willMessage?: string;
  willQos?: 0 | 1 | 2;
}

Provide either externalToken (to exchange an IdP token) or both secretKey and userEmail (native CloudSignal auth).

disconnect()

Gracefully disconnect from the broker.

await client.disconnect(force?: boolean): Promise<void>

forceReconnect()

Tear down the current connection and reconnect using the last connection config.

client.forceReconnect(): void

destroy()

Disconnect and release all resources (token manager, request handler, callbacks, queues). The client cannot be reused after this.

client.destroy(): void

isConnected()

Check whether the client is currently connected.

client.isConnected(): boolean

getConnectionState()

Return the current connection state as a string: 'disconnected', 'connecting', 'connected', 'reconnecting', 'disconnecting', or 'error'.

client.getConnectionState(): string

Publish/Subscribe Methods

transmit()

Publish a message to a topic. If the client is offline and the offline queue is enabled, the message is queued and sent on reconnect.

client.transmit(
  topic: string,
  message: string | object,
  options?: PublishOptions
): Promise<void>

interface PublishOptions {
  qos?: 0 | 1 | 2;
  retain?: boolean;
}

Examples:

// Simple publish (object payloads are JSON-serialized)
client.transmit('sensors/temperature', { value: 22.5 });

// With options
client.transmit('alerts/critical', { message: 'Server down' }, {
  qos: 2,
  retain: true
});

// String payload
client.transmit('logs/app', 'Application started');

subscribe()

Subscribe to a topic. The options object carries the QoS only — incoming messages are delivered through onMessage(), not a per-call handler argument.

await client.subscribe(
  topic: string,
  options?: SubscribeOptions
): Promise<SubscriptionGrant>

interface SubscribeOptions {
  qos?: 0 | 1 | 2;
}

Examples:

// Basic subscription
await client.subscribe('chat/messages');

// With QoS
await client.subscribe('orders/new', { qos: 2 });

// Wildcard subscription
await client.subscribe('sensors/+/temperature', { qos: 1 });

// Route messages with onMessage()
client.onMessage((topic, message) => {
  if (topic.startsWith('sensors/')) {
    const sensorId = topic.split('/')[1];
    const data = JSON.parse(message);
    console.log(`Sensor ${sensorId}: ${data.value}°C`);
  }
});

unsubscribe()

Unsubscribe from a topic.

await client.unsubscribe(topic: string): Promise<void>

getSubscriptions()

Return the list of currently subscribed topic filters.

client.getSubscriptions(): string[]

isSubscribed()

Check whether the client is subscribed to a topic filter.

client.isSubscribed(topic: string): boolean

Message Handling

Incoming messages are delivered to global callbacks registered with onMessage(). There is no per-subscription handler argument; route by topic inside the callback.

onMessage()

Register a callback invoked for every incoming message. The argument order is (topic, message, packet), where message is the payload as a string and packet is the full MQTT packet (including MQTT 5 properties).

client.onMessage(
  callback: (topic: string, message: string, packet: any) => void
): void

Example:

client.onMessage((topic, message, packet) => {
  console.log(`${topic}: ${message}`);
  // packet.properties holds MQTT 5 user/correlation properties when present
});

You can register multiple callbacks; each is invoked for every message.

offMessage()

Remove a previously registered message callback.

client.offMessage(callback: Function): void

Request/Response Pattern

For AI agent and RPC-style communication, the client can send a request and await a correlated response using MQTT 5 correlationData and responseTopic properties. Enable it in the constructor:

const client = new CloudSignal({ enableRequestResponse: true });

The client exposes only the requester side (request()). There is no built-in responder helper — the service that answers requests reads the inbound responseTopic / correlationData properties from the packet (delivered via onMessage()) and publishes its reply back with transmit().

request()

Send a request and wait for the correlated response. Requires enableRequestResponse: true.

const response = await client.request(
  topic: string,
  payload: object | string,
  options?: RequestOptions
): Promise<RequestResponse>

interface RequestOptions {
  timeout?: number;        // ms, default: 30000
  qos?: 0 | 1 | 2;         // default: 1
  userProperties?: Record<string, string>;
  messageType?: string;
}

interface RequestResponse {
  correlationId: string;
  topic: string;
  payload: any;            // parsed JSON, or the raw string
  properties: any;         // MQTT 5 properties from the response packet
  userProperties: Record<string, string>;
  receivedAt: number;
  latencyMs: number;
}

The promise rejects with a timeout error if no response arrives within timeout.

Example:

// Send request to an AI agent and await its reply
const response = await client.request('ai/assistant', {
  prompt: 'Summarize this document',
  documentId: 'doc-123'
}, { timeout: 60000 });

console.log(response.payload.summary);
console.log(`Round trip: ${response.latencyMs}ms`);

Responding to requests

There is no onRequest() method. To answer requests, subscribe to the request topic, read the MQTT 5 responseTopic and correlationData from the packet, and reply with transmit():

await client.subscribe('ai/assistant');

client.onMessage(async (topic, message, packet) => {
  if (topic !== 'ai/assistant') return;

  const props = packet.properties || {};
  const responseTopic = props.responseTopic;
  const correlationData = props.correlationData; // Buffer
  if (!responseTopic) return;

  const result = await processAIRequest(JSON.parse(message));

  client.transmit(responseTopic, result, {
    qos: 1,
    properties: { correlationData },
  });
});

getRequestStats()

Return request/response statistics, or null if the pattern is not enabled.

client.getRequestStats(): RequestStats | null

interface RequestStats {
  requestsSent: number;
  responsesReceived: number;
  timeouts: number;
  errors: number;
  pendingRequests: number;
}

Event Callbacks

There is no generic .on() / .off() / .once(). Lifecycle events are wired through setter methods (preferred) or by assigning the corresponding callback property directly.

Connection events:

// Fires whenever the connection state changes
client.setConnectionStatusCallback((isConnected, state) => {
  console.log('Connected:', isConnected, 'state:', state);
});

// Fires once the client comes online (good place to re-subscribe)
client.setOnlineCallback(() => {
  console.log('Connected to broker');
});

// Fires when the client goes offline
client.setOfflineCallback(() => {
  console.log('Disconnected');
});

// Fires on each reconnect attempt
client.setReconnectingCallback((attempt) => {
  console.log(`Reconnecting... attempt ${attempt}`);
});

// Fires when the broker rejects the credentials
client.setAuthErrorCallback((error) => {
  console.error('Auth failed:', error.message);
});

Each setter has an equivalent assignable property:

client.onConnectionStatusChange = (isConnected, state) => { /* ... */ };
client.onOnline = () => { /* ... */ };
client.onOffline = () => { /* ... */ };
client.onReconnecting = (attempt) => { /* ... */ };
client.onAuthError = (error) => { /* ... */ };

Token events (available when tokenServiceUrl is configured):

client.setTokenRefreshedCallback((newTokenInfo) => {
  console.log('Token refreshed');
});

client.setTokenExpiringCallback((remainingSeconds) => {
  console.log(`Token expiring in ${remainingSeconds}s`);
});

client.setTokenErrorCallback((error) => {
  console.error('Token error:', error);
});

Because the broker delivers no "messages queued/sent" counts, the offline transition is signaled by setOfflineCallback / setOnlineCallback (no message count). Re-subscribe idempotently inside setOnlineCallback (or onConnectionStatusChange) rather than relying on a session-present flag.

Utility Methods

getClientId()

Get the MQTT client ID, or null before the first connection.

client.getClientId(): string | null

getSubscriptions()

Get the list of currently subscribed topic filters.

client.getSubscriptions(): string[]

getConfig()

Get the resolved client configuration.

client.getConfig(): ClientOptions

getEnvironment()

Get detected environment information (platform, default transport, etc.).

client.getEnvironment(): EnvironmentInfo

getTokenInfo()

Get the current token information, or null if token auth is not in use.

client.getTokenInfo(): TokenInfo | null

On this page