Methods & Events
Client methods, events, and TypeScript types
Complete reference for CloudSignalClient methods and event callbacks.
Both import forms resolve to the same class:
import CloudSignal from '@cloudsignal/mqtt-client';
// or
import { CloudSignalClient } from '@cloudsignal/mqtt-client';
Connection Methods
connect()
Establish a connection using a username and password.
await client.connect(config: ConnectionConfig): Promise<void>
interface ConnectionConfig {
  host: string;
  username?: string;
  password?: string;
  clientId?: string;
  willTopic?: string;
  willMessage?: string;
  willQos?: 0 | 1 | 2;
  willRetain?: boolean;
}
Resolves when connected; rejects if authentication fails or the connection times out.
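The will* fields appear to correspond to MQTT's Last Will and Testament, where the broker publishes willMessage on willTopic if the client drops without a clean disconnect. The sketch below builds such a config for a presence topic. The ConnectionConfig interface is copied from above; the helper name buildPresenceConfig, the broker host, the credentials, and the presence/<clientId> topic layout are hypothetical and only illustrate the shape.

```typescript
// Copied from the ConnectionConfig reference above.
interface ConnectionConfig {
  host: string;
  username?: string;
  password?: string;
  clientId?: string;
  willTopic?: string;
  willMessage?: string;
  willQos?: 0 | 1 | 2;
  willRetain?: boolean;
}

// Hypothetical helper: builds a config whose last will marks the client offline.
// The "presence/<clientId>" topic layout is an assumption for illustration.
function buildPresenceConfig(
  host: string,
  clientId: string,
  credentials: { username: string; password: string }
): ConnectionConfig {
  return {
    host,
    clientId,
    username: credentials.username,
    password: credentials.password,
    willTopic: `presence/${clientId}`,
    // willMessage is typed as a string, so structured payloads are serialized here.
    willMessage: JSON.stringify({ status: 'offline' }),
    willQos: 1,
    willRetain: true,
  };
}

// Placeholder host and credentials; substitute real values.
const presenceConfig = buildPresenceConfig('wss://broker.example.com', 'device-42', {
  username: 'demo-user',
  password: 'demo-password',
});
// await client.connect(presenceConfig);
```

Retaining the will means late subscribers to the presence topic still see the last known offline status, which is usually what a presence indicator wants.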
connectWithToken()
Establish a connection using token authentication. Requires tokenServiceUrl in the constructor options.
await client.connectWithToken(config: TokenAuthConfig): Promise<TokenInfo>
interface TokenAuthConfig {
  host: string;
  organizationId: string;
  secretKey?: string; // native auth (with userEmail)
  userEmail?: string; // native auth (with secretKey)
  externalToken?: string; // external IdP exchange (Supabase, Firebase, Auth0, ...)
  provider?: 'supabase' | 'firebase' | 'auth0' | 'clerk' | 'oidc';
  clientId?: string;
  willTopic?: string;
  willMessage?: string;
  willQos?: 0 | 1 | 2;
}
Provide either externalToken (to exchange an IdP token) or both secretKey and userEmail (native CloudSignal auth).
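Because every credential field is optional in the type, an incomplete config compiles without complaint. A small pre-flight check can catch the mistake before any network call. The sketch below is an application-side helper, not part of the client; the names TokenAuthFields, AuthMode, and resolveAuthMode are hypothetical, and preferring externalToken when both forms are supplied is an assumption.

```typescript
// Only the fields relevant to choosing an auth mode, taken from TokenAuthConfig above.
interface TokenAuthFields {
  externalToken?: string;
  secretKey?: string;
  userEmail?: string;
}

type AuthMode = 'external' | 'native';

// Hypothetical pre-flight check; the client may perform its own validation.
// Assumption: if both forms are present, the external token wins.
function resolveAuthMode(config: TokenAuthFields): AuthMode {
  if (config.externalToken) return 'external';
  if (config.secretKey && config.userEmail) return 'native';
  throw new Error('Provide externalToken, or both secretKey and userEmail');
}
```

Calling resolveAuthMode(config) just before connectWithToken(config) turns a half-filled config into an immediate, descriptive error.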
disconnect()
Gracefully disconnect from the broker.
await client.disconnect(force?: boolean): Promise<void>
forceReconnect()
Tear down the current connection and reconnect using the last connection config.
client.forceReconnect(): void
destroy()
Disconnect and release all resources (token manager, request handler, callbacks, queues). The client cannot be reused after this.
client.destroy(): void
isConnected()
Check whether the client is currently connected.
client.isConnected(): boolean
getConnectionState()
Return the current connection state as a string: 'disconnected', 'connecting', 'connected', 'reconnecting', 'disconnecting', or 'error'.
client.getConnectionState(): string
Publish/Subscribe Methods
transmit()
Publish a message to a topic. If the client is offline and the offline queue is enabled, the message is queued and sent on reconnect.
client.transmit(
  topic: string,
  message: string | object,
  options?: PublishOptions
): Promise<void>
interface PublishOptions {
  qos?: 0 | 1 | 2;
  retain?: boolean;
}
Examples:
// Simple publish (object payloads are JSON-serialized)
client.transmit('sensors/temperature', { value: 22.5 });
// With options
client.transmit('alerts/critical', { message: 'Server down' }, {
  qos: 2,
  retain: true
});
// String payload
client.transmit('logs/app', 'Application started');
subscribe()
Subscribe to a topic. The options object carries only the QoS; incoming messages are delivered through onMessage(), not through a per-call handler argument.
await client.subscribe(
  topic: string,
  options?: SubscribeOptions
): Promise<SubscriptionGrant>
interface SubscribeOptions {
  qos?: 0 | 1 | 2;
}
Examples:
// Basic subscription
await client.subscribe('chat/messages');
// With QoS
await client.subscribe('orders/new', { qos: 2 });
// Wildcard subscription
await client.subscribe('sensors/+/temperature', { qos: 1 });
// Route messages with onMessage()
client.onMessage((topic, message) => {
  if (topic.startsWith('sensors/')) {
    const sensorId = topic.split('/')[1];
    const data = JSON.parse(message);
    console.log(`Sensor ${sensorId}: ${data.value}°C`);
  }
});
unsubscribe()
Unsubscribe from a topic.
await client.unsubscribe(topic: string): Promise<void>
getSubscriptions()
Return the list of currently subscribed topic filters.
client.getSubscriptions(): string[]
isSubscribed()
Check whether the client is subscribed to a topic filter.
client.isSubscribed(topic: string): boolean
Message Handling
Incoming messages are delivered to global callbacks registered with onMessage(). There is no per-subscription handler argument; route by topic inside the callback.
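Since every callback sees every message, routing by topic usually means re-implementing MQTT filter matching on the application side. The sketch below shows one way to do that, assuming standard MQTT wildcard semantics ('+' matches exactly one level, '#' matches the remaining levels). The names topicMatches and createRouter are hypothetical helpers, not part of the client, and the special handling of topics beginning with '$' is omitted.

```typescript
// Returns true when an MQTT topic filter matches a concrete topic.
// '+' matches exactly one level; '#' matches the parent level and everything below it.
function topicMatches(filter: string, topic: string): boolean {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');
  for (let i = 0; i < filterLevels.length; i++) {
    if (filterLevels[i] === '#') return true;
    if (i >= topicLevels.length) return false; // filter has more levels than the topic
    if (filterLevels[i] !== '+' && filterLevels[i] !== topicLevels[i]) return false;
  }
  return filterLevels.length === topicLevels.length;
}

type MessageHandler = (topic: string, message: string) => void;

// Builds a single callback that dispatches to every route whose filter matches.
function createRouter(routes: Array<{ filter: string; handler: MessageHandler }>): MessageHandler {
  return (topic, message) => {
    for (const route of routes) {
      if (topicMatches(route.filter, topic)) route.handler(topic, message);
    }
  };
}

// Example wiring: collect the topics of matching temperature readings.
const seenTopics: string[] = [];
const routeMessage = createRouter([
  {
    filter: 'sensors/+/temperature',
    handler: (topic) => {
      seenTopics.push(topic);
    },
  },
]);
// client.onMessage(routeMessage);
```

Keeping the route filters identical to the filters passed to subscribe() avoids handlers that silently never fire.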
onMessage()
Register a callback invoked for every incoming message. The argument order is (topic, message, packet), where message is the payload as a string and packet is the full MQTT packet (including MQTT 5 properties).
client.onMessage(
  callback: (topic: string, message: string, packet: any) => void
): void
Example:
client.onMessage((topic, message, packet) => {
  console.log(`${topic}: ${message}`);
  // packet.properties holds MQTT 5 user/correlation properties when present
});
You can register multiple callbacks; each is invoked for every message.
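Because message always arrives as a string, a callback that handles both JSON and plain-text topics needs to parse defensively; a bare JSON.parse throws on a payload such as 'Application started'. The sketch below is a hypothetical application-side helper (parsePayload is not a client method) that returns the parsed value when the payload is valid JSON and the raw string otherwise.

```typescript
// Returns the parsed JSON value, or the original string when it is not valid JSON.
function parsePayload(message: string): unknown {
  try {
    return JSON.parse(message);
  } catch (err) {
    return message;
  }
}

// Example use inside a message callback:
// client.onMessage((topic, message) => {
//   const payload = parsePayload(message);
// });
```

Note that plain-text payloads which happen to be valid JSON, such as '42' or 'true', come back as a number or boolean, so callers that need the raw text should skip the helper for those topics.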
offMessage()
Remove a previously registered message callback.
client.offMessage(callback: Function): void
Request/Response Pattern
For AI agent and RPC-style communication, the client can send a request and await a correlated response using MQTT 5 correlationData and responseTopic properties. Enable it in the constructor:
const client = new CloudSignal({ enableRequestResponse: true });
The client exposes only the requester side (request()). There is no built-in responder helper; the service that answers requests reads the inbound responseTopic / correlationData properties from the packet (delivered via onMessage()) and publishes its reply back with transmit().
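To make the correlation idea concrete, the sketch below shows the kind of bookkeeping a requester needs: each outgoing request registers a correlation ID with a timer, and each inbound reply settles the matching entry. This is an illustration of the general technique only, not the library's actual implementation; the class name PendingRequests and its methods are hypothetical.

```typescript
// Hypothetical requester-side bookkeeping for correlated replies.
class PendingRequests {
  private pending = new Map<
    string,
    { resolve: (payload: string) => void; timer: ReturnType<typeof setTimeout> }
  >();

  // Registers a correlation ID and returns a promise that settles on reply or timeout.
  register(correlationId: string, timeoutMs: number): Promise<string> {
    return new Promise<string>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId);
        reject(new Error(`Request ${correlationId} timed out after ${timeoutMs}ms`));
      }, timeoutMs);
      this.pending.set(correlationId, { resolve, timer });
    });
  }

  // Called for each inbound reply; returns false when the ID is unknown or already settled.
  settle(correlationId: string, payload: string): boolean {
    const entry = this.pending.get(correlationId);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(correlationId);
    entry.resolve(payload);
    return true;
  }

  // Number of requests still awaiting a reply.
  get size(): number {
    return this.pending.size;
  }
}
```

Dropping the entry on both reply and timeout means a late or duplicate reply is ignored rather than resolving a promise twice.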
request()
Send a request and wait for the correlated response. Requires enableRequestResponse: true.
const response = await client.request(
  topic: string,
  payload: object | string,
  options?: RequestOptions
): Promise<RequestResponse>
interface RequestOptions {
  timeout?: number; // ms, default: 30000
  qos?: 0 | 1 | 2; // default: 1
  userProperties?: Record<string, string>;
  messageType?: string;
}
interface RequestResponse {
  correlationId: string;
  topic: string;
  payload: any; // parsed JSON, or the raw string
  properties: any; // MQTT 5 properties from the response packet
  userProperties: Record<string, string>;
  receivedAt: number;
  latencyMs: number;
}
The promise rejects with a timeout error if no response arrives within timeout.
Example:
// Send request to an AI agent and await its reply
const response = await client.request('ai/assistant', {
  prompt: 'Summarize this document',
  documentId: 'doc-123'
}, { timeout: 60000 });
console.log(response.payload.summary);
console.log(`Round trip: ${response.latencyMs}ms`);
Responding to requests
There is no onRequest() method. To answer requests, subscribe to the request topic, read the MQTT 5 responseTopic and correlationData from the packet, and reply with transmit():
await client.subscribe('ai/assistant');
client.onMessage(async (topic, message, packet) => {
  if (topic !== 'ai/assistant') return;
  const props = packet.properties || {};
  const responseTopic = props.responseTopic;
  const correlationData = props.correlationData; // Buffer
  if (!responseTopic) return;
  const result = await processAIRequest(JSON.parse(message));
  client.transmit(responseTopic, result, {
    qos: 1,
    properties: { correlationData },
  });
});
getRequestStats()
Return request/response statistics, or null if the pattern is not enabled.
client.getRequestStats(): RequestStats | null
interface RequestStats {
  requestsSent: number;
  responsesReceived: number;
  timeouts: number;
  errors: number;
  pendingRequests: number;
}
Event Callbacks
There is no generic .on() / .off() / .once(). Lifecycle events are wired through setter methods (preferred) or by assigning the corresponding callback property directly.
Connection events:
// Fires whenever the connection state changes
client.setConnectionStatusCallback((isConnected, state) => {
  console.log('Connected:', isConnected, 'state:', state);
});
// Fires once the client comes online (good place to re-subscribe)
client.setOnlineCallback(() => {
  console.log('Connected to broker');
});
// Fires when the client goes offline
client.setOfflineCallback(() => {
  console.log('Disconnected');
});
// Fires on each reconnect attempt
client.setReconnectingCallback((attempt) => {
  console.log(`Reconnecting... attempt ${attempt}`);
});
// Fires when the broker rejects the credentials
client.setAuthErrorCallback((error) => {
  console.error('Auth failed:', error.message);
});
Each setter has an equivalent assignable property:
client.onConnectionStatusChange = (isConnected, state) => { /* ... */ };
client.onOnline = () => { /* ... */ };
client.onOffline = () => { /* ... */ };
client.onReconnecting = (attempt) => { /* ... */ };
client.onAuthError = (error) => { /* ... */ };
Token events (available when tokenServiceUrl is configured):
client.setTokenRefreshedCallback((newTokenInfo) => {
  console.log('Token refreshed');
});
client.setTokenExpiringCallback((remainingSeconds) => {
  console.log(`Token expiring in ${remainingSeconds}s`);
});
client.setTokenErrorCallback((error) => {
  console.error('Token error:', error);
});
The broker does not report counts of queued or sent messages, so the callbacks registered with setOfflineCallback and setOnlineCallback receive no message count; they signal only the offline and online transitions. Re-subscribe idempotently inside setOnlineCallback (or onConnectionStatusChange) rather than relying on a session-present flag.
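One way to re-subscribe idempotently is to keep the desired subscriptions in application state and replay the whole list on every online transition; in MQTT, subscribing again to a filter the session already holds replaces the existing subscription rather than duplicating it. The sketch below assumes only the documented subscribe(topic, options) signature. The names SubscriberLike, DesiredSubscription, and resubscribeAll are hypothetical, and the topic list is illustrative.

```typescript
// Structural type covering only the documented method this sketch calls.
interface SubscriberLike {
  subscribe(topic: string, options?: { qos?: 0 | 1 | 2 }): Promise<unknown>;
}

interface DesiredSubscription {
  topic: string;
  qos: 0 | 1 | 2;
}

// Replays every desired subscription; safe to call on each online transition.
function resubscribeAll(
  client: SubscriberLike,
  desired: DesiredSubscription[]
): Promise<unknown[]> {
  return Promise.all(desired.map((d) => client.subscribe(d.topic, { qos: d.qos })));
}

// Application-owned source of truth for what should be subscribed.
const desiredSubscriptions: DesiredSubscription[] = [
  { topic: 'chat/messages', qos: 0 },
  { topic: 'orders/new', qos: 2 },
];

// client.setOnlineCallback(() => {
//   resubscribeAll(client, desiredSubscriptions).catch(console.error);
// });
```

Keeping the list outside the client means the same call works after a first connect, a reconnect, or a forceReconnect().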
Utility Methods
getClientId()
Get the MQTT client ID, or null before the first connection.
client.getClientId(): string | null
getSubscriptions()
Get the list of currently subscribed topic filters.
client.getSubscriptions(): string[]
getConfig()
Get the resolved client configuration.
client.getConfig(): ClientOptions
getEnvironment()
Get detected environment information (platform, default transport, etc.).
client.getEnvironment(): EnvironmentInfo
getTokenInfo()
Get the current token information, or null if token auth is not in use.
client.getTokenInfo(): TokenInfo | null