AI Transport
Stream LLM tokens over MQTT for real-time AI chat without managing WebSocket servers.
MQTT provides an efficient transport layer for AI applications that stream tokens from large language models. Instead of building and scaling a WebSocket server, you publish prompts and stream responses through CloudSignal's broker.
Architecture
The client publishes a user prompt to a request topic. A server process subscribes to that topic, calls the LLM API, and streams each token as a separate MQTT publish to a response topic that the client is subscribed to.
Topic Design
| Topic | Direction | Purpose |
|---|---|---|
| `ai/{session_id}/request` | Client to server | User prompt |
| `ai/{session_id}/response` | Server to client | Streamed tokens |
| `ai/{session_id}/status` | Server to client | Processing state (thinking, generating, done) |
Each conversation uses a unique session_id so that multiple users can chat simultaneously without interference.
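A small helper can keep topic construction and parsing in one place. The sketch below is illustrative and not part of any CloudSignal SDK: the names `session_topics` and `parse_topic` are hypothetical, and limiting session IDs to letters, digits, hyphens, and underscores is an assumption. That limit keeps `/`, `+`, and `#`, which have special meaning in MQTT topics, out of the session level; UUIDs satisfy it.

```python
import re

# Assumption: session IDs use only these characters (UUIDs qualify).
_SESSION_ID = re.compile(r"[A-Za-z0-9_-]+")
_CHANNELS = ("request", "response", "status")


def session_topics(session_id: str) -> dict[str, str]:
    """Return the three topics for one conversation, keyed by channel name."""
    if not _SESSION_ID.fullmatch(session_id):
        raise ValueError(f"invalid session id: {session_id!r}")
    return {channel: f"ai/{session_id}/{channel}" for channel in _CHANNELS}


def parse_topic(topic: str) -> tuple[str, str]:
    """Split 'ai/{session_id}/{channel}' into (session_id, channel)."""
    parts = topic.split("/")
    if len(parts) != 3 or parts[0] != "ai" or parts[2] not in _CHANNELS:
        raise ValueError(f"unexpected topic: {topic!r}")
    return parts[1], parts[2]
```

Validating the session level on the server side also guards against a malformed topic being turned into a publish to an unintended topic.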
How It Works
- Client connects and subscribes to `ai/{session_id}/response` and `ai/{session_id}/status`
- User sends a message: the client publishes the prompt to `ai/{session_id}/request`
- Server receives the prompt and calls the LLM API with streaming enabled
- Each token from the LLM stream is published as a separate message to `ai/{session_id}/response`
- Client renders tokens as they arrive, producing a typewriter effect
- Server publishes a final `[done]` message to signal the stream is complete
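The sequence above can be traced without a network connection. The following is a minimal sketch, not CloudSignal's implementation: `InMemoryBroker` is a toy stand-in that delivers messages synchronously and only supports exact topic matches, and `fake_llm_stream` is a hypothetical placeholder for a streaming LLM API.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Callable, Iterator

Handler = Callable[[str, str], None]


class InMemoryBroker:
    """Toy broker: exact-match topics, synchronous delivery (assumption)."""

    def __init__(self) -> None:
        self._subscribers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, payload: str) -> None:
        for handler in list(self._subscribers[topic]):
            handler(topic, payload)


def fake_llm_stream(prompt: str) -> Iterator[str]:
    """Hypothetical stand-in for a streaming LLM call."""
    yield from ["Echo", ": ", prompt]


def run_session(broker: InMemoryBroker, session_id: str, prompt: str) -> tuple[str, list[str]]:
    base = f"ai/{session_id}"
    tokens: list[str] = []
    statuses: list[str] = []
    done: list[bool] = []

    # Client: subscribe to the response and status topics.
    def on_response(topic: str, payload: str) -> None:
        if payload == "[done]":
            done.append(True)       # end-of-stream marker
        else:
            tokens.append(payload)  # render or accumulate the token

    broker.subscribe(f"{base}/response", on_response)
    broker.subscribe(f"{base}/status", lambda _topic, payload: statuses.append(payload))

    # Server: on a request, stream each token as its own publish.
    def on_request(topic: str, payload: str) -> None:
        broker.publish(f"{base}/status", "thinking")
        broker.publish(f"{base}/status", "generating")
        for token in fake_llm_stream(payload):
            broker.publish(f"{base}/response", token)
        broker.publish(f"{base}/response", "[done]")
        broker.publish(f"{base}/status", "done")

    broker.subscribe(f"{base}/request", on_request)

    # Client: publish the prompt, which triggers the whole exchange.
    broker.publish(f"{base}/request", prompt)
    if not done:
        raise RuntimeError("stream did not complete")
    return "".join(tokens), statuses
```

Calling `run_session(InMemoryBroker(), "s1", "hello")` returns the accumulated text together with the status messages in the order they were published. A real broker delivers asynchronously, so a real client would wait for the `[done]` message rather than check a flag immediately.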
Benefits Over WebSockets
| WebSocket | MQTT (CloudSignal) |
|---|---|
| Must build and scale a WS server | Broker handles connections |
| Custom reconnection logic | Built-in reconnect with session restore |
| Firewall issues on non-standard ports | Works over TLS (8883) or WSS (18885) |
| Point-to-point only | Multiple subscribers can observe the same stream |
| No offline queuing | Persistent sessions queue messages during disconnection |
Client Implementation
import { connect } from '@cloudsignal/mqtt-client'

const sessionId = crypto.randomUUID()
const client = await connect({
  host: 'wss://connect.cloudsignal.app:18885/',
  username: 'chat-user',
  password: 'your-token'
})

// Subscribe to response stream
await client.subscribe(`ai/${sessionId}/response`)
await client.subscribe(`ai/${sessionId}/status`)

// Accumulate tokens into the response
let response = ''
client.on('message', (topic, payload) => {
  const data = payload.toString()
  if (topic.endsWith('/response')) {
    if (data === '[done]') {
      console.log('Complete response:', response)
    } else {
      response += data
      renderToken(data) // Update UI incrementally
    }
  }
  if (topic.endsWith('/status')) {
    updateStatusIndicator(data) // "thinking", "generating", "done"
  }
})

// Send a prompt
function sendMessage(prompt) {
  response = ''
  client.publish(`ai/${sessionId}/request`, JSON.stringify({
    prompt,
    model: 'claude-sonnet-4-20250514',
    max_tokens: 1024
  }))
}
Server Implementation
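import json
import threading

import anthropic
import paho.mqtt.client as mqtt

anthropic_client = anthropic.Anthropic()

def stream_reply(client, session_id, data):
    # Publish status
    client.publish(f"ai/{session_id}/status", "thinking")
    # Stream response from Claude
    with anthropic_client.messages.stream(
        model=data.get("model", "claude-sonnet-4-20250514"),
        max_tokens=data.get("max_tokens", 1024),
        messages=[{"role": "user", "content": data["prompt"]}],
    ) as stream:
        client.publish(f"ai/{session_id}/status", "generating")
        for text in stream.text_stream:
            client.publish(f"ai/{session_id}/response", text)
    # Signal completion
    client.publish(f"ai/{session_id}/response", "[done]")
    client.publish(f"ai/{session_id}/status", "done")

def on_connect(client, userdata, flags, reason_code, properties):
    # Subscribing here restores the subscription after a reconnect
    client.subscribe("ai/+/request")

def on_message(client, userdata, msg):
    session_id = msg.topic.split("/")[1]
    data = json.loads(msg.payload)
    # Run the blocking LLM call in a worker thread so the MQTT network loop stays responsive
    threading.Thread(target=stream_reply, args=(client, session_id, data), daemon=True).start()

# CallbackAPIVersion requires paho-mqtt 2.0 or later
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqtt_client.username_pw_set("ai-server", "your-server-token")
mqtt_client.tls_set()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect("mqtt.cloudsignal.app", 8883)
mqtt_client.loop_forever()  # Block so the process keeps serving requests

The server subscribes to ai/+/request using a single-level wildcard, so one server process handles requests from all sessions. To scale horizontally, run multiple server instances and have them subscribe through an MQTT shared subscription (a filter of the form $share/{group}/ai/+/request) so that the broker delivers each request to only one instance. With the plain filter, every instance would receive every request and each prompt would be answered more than once.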
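To make the wildcard and shared-subscription behavior concrete, here is a minimal sketch of MQTT topic-filter matching. It is an illustration rather than broker code: `topic_matches` and `shared_filter` are hypothetical helper names, the group name `ai-workers` is an example, and the matcher ignores the special handling of topics that begin with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Check a topic against a filter with '+' (one level) and a trailing '#'."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level and matches everything below.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


def shared_filter(group: str, topic_filter: str) -> str:
    """Build a shared-subscription filter: $share/{group}/{filter}."""
    if not group or any(ch in group for ch in "/+#"):
        raise ValueError(f"invalid share group: {group!r}")
    return f"$share/{group}/{topic_filter}"


# Each server instance would subscribe with the same group name.
WORKER_FILTER = shared_filter("ai-workers", "ai/+/request")
```

Here `ai/+/request` matches `ai/abc/request` but not `ai/abc/response` or `ai/a/b/request`, because `+` spans exactly one level. Instances that subscribe with the same `$share` group are treated by the broker as one logical subscriber.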
Next Steps
- AI Chat SDK reference - pre-built React components for AI chat interfaces
- QoS Levels - QoS 0 is recommended for token streaming to minimize latency
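- Offline & Retain - persistent sessions can buffer tokens during brief disconnections; MQTT only requires brokers to queue QoS 1 and QoS 2 messages, so tokens published at QoS 0 may be dropped while the client is offline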