
AI Transport

Stream LLM tokens over MQTT for real-time AI chat without managing WebSocket servers.

MQTT is a lightweight publish/subscribe protocol that works well as the transport layer for AI applications that stream tokens from large language models (LLMs). Instead of building and scaling your own WebSocket server, you publish prompts and stream responses through CloudSignal's broker.

Architecture

The client publishes a user prompt to a request topic. A server process subscribes to that topic, calls the LLM API, and streams each token as a separate MQTT publish to a response topic that the client is subscribed to.

Figure: AI Token Streaming (Browser, CloudSignal, Your Server). The browser publishes the prompt and CloudSignal delivers it to your server. Your server then publishes token 1, token 2, and finally [done], and CloudSignal delivers each one to the browser as it arrives.

Topic Design

| Topic | Direction | Purpose |
|---|---|---|
| ai/{session_id}/request | Client to server | User prompt |
| ai/{session_id}/response | Server to client | Streamed tokens |
| ai/{session_id}/status | Server to client | Processing state (thinking, generating, done) |

Each conversation uses a unique session_id so that multiple users can chat simultaneously without interference.
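The topic scheme can be captured in a few small helpers. This is a minimal sketch. `new_session_id`, `session_topics`, and `parse_topic` are hypothetical names and are not part of any CloudSignal SDK. Session IDs that contain `/`, `+`, or `#` are rejected so that each ID stays a single topic level and cannot act as a wildcard.

```python
import uuid

# Hypothetical helpers for the ai/{session_id}/{kind} topic layout on this page.
TOPIC_KINDS = ("request", "response", "status")


def new_session_id() -> str:
    """Random, hard-to-guess session ID (the client example uses crypto.randomUUID())."""
    return str(uuid.uuid4())


def session_topics(session_id: str) -> dict:
    """Return the request, response, and status topics for one conversation."""
    # '/' would add a topic level; '+' and '#' are wildcards and invalid in topic names.
    if not session_id or any(ch in session_id for ch in "/+#"):
        raise ValueError(f"invalid session_id: {session_id!r}")
    return {kind: f"ai/{session_id}/{kind}" for kind in TOPIC_KINDS}


def parse_topic(topic: str) -> tuple:
    """Split 'ai/{session_id}/{kind}' into (session_id, kind)."""
    parts = topic.split("/")
    if len(parts) != 3 or parts[0] != "ai" or not parts[1] or parts[2] not in TOPIC_KINDS:
        raise ValueError(f"unexpected topic: {topic!r}")
    return parts[1], parts[2]


topics = session_topics("abc123")
print(topics["response"])               # ai/abc123/response
print(parse_topic("ai/abc123/status"))  # ('abc123', 'status')
```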

How It Works

  1. Client connects and subscribes to ai/{session_id}/response and ai/{session_id}/status
  2. User sends a message: the client publishes the prompt to ai/{session_id}/request
  3. Server receives the prompt and calls the LLM API with streaming enabled
  4. Each token from the LLM stream is published as a separate message to ai/{session_id}/response
  5. Client renders tokens as they arrive, producing a typewriter effect
  6. Server publishes a final [done] message to ai/{session_id}/response to signal that the stream is complete, then sets ai/{session_id}/status to done
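You can simulate the whole round trip in memory, without a broker or an LLM. In this sketch, `InMemoryBroker` and `fake_llm_stream` are hypothetical stand-ins. The broker only matches the single-level `+` wildcard and delivers messages synchronously, and the "LLM" echoes the prompt word by word. Real MQTT delivery is asynchronous, so treat this as a model of message order, not timing.

```python
import json
from typing import Callable, Dict, List, Tuple

Handler = Callable[[str, str], None]


def topic_matches(topic_filter: str, topic: str) -> bool:
    """MQTT-style matching for filters that use only the '+' wildcard."""
    f_parts, t_parts = topic_filter.split("/"), topic.split("/")
    if len(f_parts) != len(t_parts):
        return False
    return all(f == "+" or f == t for f, t in zip(f_parts, t_parts))


class InMemoryBroker:
    """Hypothetical stand-in for the broker: synchronous, in-process delivery."""

    def __init__(self) -> None:
        self.subs: List[Tuple[str, Handler]] = []

    def subscribe(self, topic_filter: str, handler: Handler) -> None:
        self.subs.append((topic_filter, handler))

    def publish(self, topic: str, payload: str) -> None:
        for topic_filter, handler in list(self.subs):
            if topic_matches(topic_filter, topic):
                handler(topic, payload)


def fake_llm_stream(prompt: str):
    """Hypothetical LLM stand-in: 'streams' an echo of the prompt word by word."""
    words = f"Echo: {prompt}".split(" ")
    for i, word in enumerate(words):
        yield word if i == 0 else " " + word


def run_server(broker: InMemoryBroker) -> None:
    """Steps 3, 4, and 6: answer every ai/+/request with a token stream."""
    def on_request(topic: str, payload: str) -> None:
        session_id = topic.split("/")[1]
        prompt = json.loads(payload)["prompt"]
        broker.publish(f"ai/{session_id}/status", "thinking")
        broker.publish(f"ai/{session_id}/status", "generating")
        for token in fake_llm_stream(prompt):
            broker.publish(f"ai/{session_id}/response", token)
        broker.publish(f"ai/{session_id}/response", "[done]")
        broker.publish(f"ai/{session_id}/status", "done")

    broker.subscribe("ai/+/request", on_request)


def chat(broker: InMemoryBroker, session_id: str, prompt: str) -> Dict:
    """Steps 1, 2, and 5: subscribe first, publish the prompt, collect tokens."""
    result: Dict = {"tokens": [], "statuses": [], "done": False}

    def on_response(topic: str, payload: str) -> None:
        if payload == "[done]":
            result["done"] = True
        else:
            result["tokens"].append(payload)  # a real UI would render here

    def on_status(topic: str, payload: str) -> None:
        result["statuses"].append(payload)

    broker.subscribe(f"ai/{session_id}/response", on_response)
    broker.subscribe(f"ai/{session_id}/status", on_status)
    broker.publish(f"ai/{session_id}/request", json.dumps({"prompt": prompt}))
    # Delivery is synchronous here, so the stream is complete once publish() returns.
    result["text"] = "".join(result["tokens"])
    return result


broker = InMemoryBroker()
run_server(broker)
first = chat(broker, "s1", "hello world")
print(first["text"])      # Echo: hello world
print(first["statuses"])  # ['thinking', 'generating', 'done']
second = chat(broker, "s2", "bye")
print(second["text"])     # Echo: bye
```

Each client subscribes only to its own ai/{session_id}/... topics, so the second conversation leaves the first one's tokens untouched.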

Benefits Over WebSockets

| WebSocket | MQTT (CloudSignal) |
|---|---|
| Must build and scale a WS server | Broker handles connections |
| Custom reconnection logic | Built-in reconnect with session restore |
| Firewall issues on non-standard ports | Works over MQTT over TLS (8883) or WebSocket Secure (18885) |
| Point-to-point only | Multiple subscribers can observe the same stream |
| No offline queuing | Persistent sessions queue QoS 1 and 2 messages during disconnection |
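Offline queuing depends on two settings. The client needs a persistent session (clean session off in MQTT 3.1.1, or a non-zero session expiry in MQTT 5), and messages must be published at QoS 1 or 2. The toy model below is not CloudSignal's implementation, and `ToyBroker` and `Session` are hypothetical names. It shows the effect: messages published while a persistent subscriber is offline are delivered in order when it reconnects, QoS 0 messages are dropped (the MQTT spec makes storing them optional), and a clean session starts empty. Note that the server example on this page publishes with paho's default QoS 0, so its tokens would not be queued unless published with qos=1.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Session:
    client_id: str
    persistent: bool
    subscriptions: Set[str] = field(default_factory=set)
    online: bool = True
    inbox: List[str] = field(default_factory=list)    # delivered to the client
    pending: List[str] = field(default_factory=list)  # queued while offline


class ToyBroker:
    """Toy model of MQTT session state; exact-match topics only."""

    def __init__(self) -> None:
        self.sessions: Dict[str, Session] = {}

    def connect(self, client_id: str, persistent: bool) -> Session:
        session = self.sessions.get(client_id)
        if session is None or not persistent:
            # Clean start: discard any stored state and begin an empty session.
            session = Session(client_id, persistent)
            self.sessions[client_id] = session
        session.online = True
        # Resume: deliver everything queued during the disconnection, in order.
        session.inbox.extend(session.pending)
        session.pending.clear()
        return session

    def disconnect(self, client_id: str) -> None:
        session = self.sessions[client_id]
        session.online = False
        if not session.persistent:
            del self.sessions[client_id]  # clean sessions end with the connection

    def publish(self, topic: str, payload: str, qos: int = 1) -> None:
        for session in self.sessions.values():
            if topic not in session.subscriptions:
                continue
            if session.online:
                session.inbox.append(payload)
            elif qos > 0:
                session.pending.append(payload)  # QoS 0 is dropped in this sketch


broker = ToyBroker()
tab = broker.connect("browser-1", persistent=True)
tab.subscriptions.add("ai/s1/response")
broker.publish("ai/s1/response", "Hel")
broker.disconnect("browser-1")                 # e.g. a network drop mid-stream
broker.publish("ai/s1/response", "lo", qos=1)  # queued for the offline session
broker.publish("ai/s1/response", "!", qos=0)   # dropped
tab = broker.connect("browser-1", persistent=True)
print(tab.inbox)                               # ['Hel', 'lo']
```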

Client Implementation

import { connect } from '@cloudsignal/mqtt-client'

const sessionId = crypto.randomUUID()

const client = await connect({
  host: 'wss://connect.cloudsignal.app:18885/',
  username: 'chat-user',
  password: 'your-token'
})

// Subscribe to response stream
await client.subscribe(`ai/${sessionId}/response`)
await client.subscribe(`ai/${sessionId}/status`)

// Accumulate tokens into the response
let response = ''

client.on('message', (topic, payload) => {
  const data = payload.toString()

  if (topic.endsWith('/response')) {
    if (data === '[done]') {
      console.log('Complete response:', response)
    } else {
      response += data
      renderToken(data) // Update the UI incrementally (renderToken is your own UI function)
    }
  }

  if (topic.endsWith('/status')) {
    updateStatusIndicator(data) // Your own UI function; receives "thinking", "generating", or "done"
  }
})

// Send a prompt
function sendMessage(prompt) {
  response = ''
  client.publish(`ai/${sessionId}/request`, JSON.stringify({
    prompt,
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024
  }))
}
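sendMessage publishes a small JSON object, and the server reads it and applies defaults for any missing fields. The sketch below models that wire format in Python, which can be handy for tests or non-browser clients. `ChatRequest`, `encode_request`, and `decode_request` are hypothetical names. The defaults mirror the server example on this page, and the non-empty-prompt check is an added assumption.

```python
import json
from dataclasses import dataclass

# Defaults copied from the server example on this page.
DEFAULT_MODEL = "claude-sonnet-4-20250514"
DEFAULT_MAX_TOKENS = 1024


@dataclass(frozen=True)
class ChatRequest:
    prompt: str
    model: str = DEFAULT_MODEL
    max_tokens: int = DEFAULT_MAX_TOKENS


def encode_request(req: ChatRequest) -> bytes:
    """Serialize like sendMessage(): a JSON object with prompt, model, max_tokens."""
    body = {"prompt": req.prompt, "model": req.model, "max_tokens": req.max_tokens}
    return json.dumps(body).encode("utf-8")


def decode_request(payload: bytes) -> ChatRequest:
    """Parse an MQTT payload, filling in the server's defaults for missing fields."""
    data = json.loads(payload)
    if not isinstance(data, dict):
        raise ValueError("request payload must be a JSON object")
    prompt = data.get("prompt")
    # Assumption: an empty or missing prompt is rejected rather than sent to the LLM.
    if not isinstance(prompt, str) or not prompt:
        raise ValueError("request needs a non-empty 'prompt' string")
    return ChatRequest(
        prompt=prompt,
        model=data.get("model", DEFAULT_MODEL),
        max_tokens=int(data.get("max_tokens", DEFAULT_MAX_TOKENS)),
    )


print(decode_request(b'{"prompt": "Hi"}'))
# ChatRequest(prompt='Hi', model='claude-sonnet-4-20250514', max_tokens=1024)
```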

Server Implementation

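import json
from concurrent.futures import ThreadPoolExecutor

import anthropic
import paho.mqtt.client as mqtt

anthropic_client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

# Handle requests off the MQTT network thread. Publishing from inside on_message
# would queue every token until the callback returns (no streaming), and a long
# generation would also stall keepalive traffic.
executor = ThreadPoolExecutor(max_workers=8)

def handle_request(client, session_id, data):
    client.publish(f"ai/{session_id}/status", "thinking")
    try:
        # Stream response from Claude. model and max_tokens come from the client,
        # so consider allow-listing models and capping max_tokens.
        client.publish(f"ai/{session_id}/status", "generating")
        with anthropic_client.messages.stream(
            model=data.get("model", "claude-sonnet-4-20250514"),
            max_tokens=data.get("max_tokens", 1024),
            messages=[{"role": "user", "content": data["prompt"]}]
        ) as stream:
            for text in stream.text_stream:
                client.publish(f"ai/{session_id}/response", text)
    finally:
        # Signal completion even if the LLM call fails, so the client never waits forever
        client.publish(f"ai/{session_id}/response", "[done]")
        client.publish(f"ai/{session_id}/status", "done")

def on_message(client, userdata, msg):
    session_id = msg.topic.split("/")[1]  # topic is ai/{session_id}/request
    try:
        data = json.loads(msg.payload)
    except ValueError:
        return  # ignore malformed requests instead of crashing the network loop
    executor.submit(handle_request, client, session_id, data)

def on_connect(client, userdata, flags, reason_code, properties):
    # Subscribing here restores the subscription after every reconnect
    client.subscribe("ai/+/request")

mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)  # paho-mqtt 2.x
mqtt_client.username_pw_set("ai-server", "your-server-token")
mqtt_client.tls_set()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect("mqtt.cloudsignal.app", 8883)

# Block here running the network loop (loop_start() alone would let the script exit)
mqtt_client.loop_forever()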

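The server subscribes to ai/+/request using the single-level wildcard +, so one server process handles requests from all sessions. To scale horizontally, run multiple server instances that share an MQTT shared subscription (an MQTT 5 feature), such as $share/ai-workers/ai/+/request, where ai-workers is just an example group name. The broker then delivers each request to only one instance in the group. With a plain ai/+/request subscription, every instance would receive, and answer, every prompt.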
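A toy dispatcher can model the difference between a plain subscription and a shared one. `$share/{group}/{filter}` is the MQTT 5 shared-subscription syntax. `ai-workers`, `parse_shared`, and `Dispatcher` are hypothetical names. The round-robin choice is also an assumption, because the MQTT spec leaves the distribution strategy to the broker.

```python
from typing import Dict, List, Optional, Tuple


def parse_shared(subscription: str) -> Tuple[Optional[str], str]:
    """Split '$share/{group}/{filter}' into (group, filter); plain filters give (None, filter)."""
    if not subscription.startswith("$share/"):
        return None, subscription
    parts = subscription.split("/", 2)
    if len(parts) != 3 or not parts[1] or not parts[2]:
        raise ValueError(f"malformed shared subscription: {subscription!r}")
    return parts[1], parts[2]


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Matching for filters that use only the '+' wildcard."""
    f_parts, t_parts = topic_filter.split("/"), topic.split("/")
    return len(f_parts) == len(t_parts) and all(
        f == "+" or f == t for f, t in zip(f_parts, t_parts)
    )


class Dispatcher:
    """Toy broker fan-out: every plain subscriber gets a copy, while each shared
    group gets exactly one copy, handed to its members in round-robin order."""

    def __init__(self) -> None:
        self.plain: List[Tuple[str, str]] = []               # (filter, worker)
        self.groups: Dict[Tuple[str, str], List[str]] = {}  # (group, filter) -> workers
        self.turn: Dict[Tuple[str, str], int] = {}

    def subscribe(self, worker: str, subscription: str) -> None:
        group, topic_filter = parse_shared(subscription)
        if group is None:
            self.plain.append((topic_filter, worker))
        else:
            self.groups.setdefault((group, topic_filter), []).append(worker)
            self.turn.setdefault((group, topic_filter), 0)

    def route(self, topic: str) -> List[str]:
        """Return the workers that receive one message published to topic."""
        receivers = [w for f, w in self.plain if topic_matches(f, topic)]
        for key, members in self.groups.items():
            if topic_matches(key[1], topic):
                receivers.append(members[self.turn[key] % len(members)])
                self.turn[key] += 1
        return receivers


plain, shared = Dispatcher(), Dispatcher()
for worker in ("worker-1", "worker-2"):
    plain.subscribe(worker, "ai/+/request")
    shared.subscribe(worker, "$share/ai-workers/ai/+/request")

print(plain.route("ai/s1/request"))   # ['worker-1', 'worker-2'] (duplicate answers)
print(shared.route("ai/s1/request"))  # ['worker-1']
print(shared.route("ai/s2/request"))  # ['worker-2']
```

With the plain filter, both workers would stream an answer into the same response topic. With the shared filter, each prompt is handled exactly once.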
