AI-era applications
Coordinate AI agents, stream LLM tokens, and run autonomous systems with CloudSignal's pub/sub messaging.
Modern AI systems usually involve multiple agents, streaming outputs, and coordination across services. CloudSignal gives those components a shared message bus so you can fan out tokens, dispatch tool calls, and broadcast results without building your own infrastructure.
Why MQTT for AI applications?
Traditional AI architectures lean on HTTP for everything. As systems add more agents, streaming outputs, and real-time coordination, HTTP's request/response model starts to strain:
| HTTP | MQTT (CloudSignal) |
|---|---|
| Request/response only | Pub/sub + request/response |
| Polling for updates | Real-time push |
| Point-to-point | Broadcast to many |
| Connection per request | Persistent connections |
| Per-request headers add overhead | Compact binary header (2-byte minimum) |
MQTT fits when you need:
| Need | What pub/sub gives you |
|---|---|
| Multiple consumers for the same data | A single publish reaches every subscriber |
| Real-time streaming of results | Tokens arrive as they're produced |
| Loose coupling between components | Add or remove agents without changing existing publishers |
| Reliable delivery with retries | QoS 1 or 2 handles transient failures |
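Both tables reduce to what a broker does on each publish: it compares the topic against every subscription's filter and delivers a copy to each match. The toy, in-process sketch below illustrates that fan-out using MQTT's `+` (one level) and `#` (remaining levels) wildcards. `topic_matches` and `InMemoryBus` are hypothetical names and are not part of CloudSignal or paho-mqtt. The sketch also ignores QoS, retained messages, sessions, and the special handling of `$`-prefixed topics that a real broker applies.

```python
from typing import Callable

# Illustrative sketch: topic_matches and InMemoryBus are hypothetical, not an SDK API
# A handler receives (topic, payload), like an MQTT on_message callback
Handler = Callable[[str, bytes], None]


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches an MQTT filter using `+` and `#` wildcards."""
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')
    for i, level in enumerate(filter_levels):
        if level == '#':
            # Matches the remaining levels, including none ('results/#' matches 'results')
            return True
        if i >= len(topic_levels):
            return False
        if level != '+' and level != topic_levels[i]:
            return False
    # Without '#', the filter and the topic must have the same depth
    return len(filter_levels) == len(topic_levels)


class InMemoryBus:
    """Toy broker for illustration: fan-out only, no QoS, retain, or sessions."""

    def __init__(self) -> None:
        self._subscriptions: list[tuple[str, Handler]] = []

    def subscribe(self, topic_filter: str, handler: Handler) -> None:
        self._subscriptions.append((topic_filter, handler))

    def publish(self, topic: str, payload: bytes) -> int:
        """Deliver to every matching subscription and return the delivery count."""
        delivered = 0
        for topic_filter, handler in self._subscriptions:
            if topic_matches(topic_filter, topic):
                handler(topic, payload)
                delivered += 1
        return delivered


# One publish, two independent consumers
bus = InMemoryBus()
seen = []
bus.subscribe('results/#', lambda t, p: seen.append(('orchestrator', t)))
bus.subscribe('results/+', lambda t, p: seen.append(('logger', t)))
bus.publish('results/research', b'{"findings": []}')
```

The publisher never learns who is listening. That is what lets you attach a logger or dashboard later without touching the code that publishes.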
Use case: multi-agent coordination
AI agents working together need to share state and coordinate actions.
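Keeping the routing decision separate from the MQTT client makes an orchestrator easier to test. The minimal sketch below assumes the `results/{agent}` and `tasks/{agent}` topic layout and the `research` to `analysis` step used in the implementation below. `PIPELINE` and `next_step` are hypothetical names.

```python
import json

# Illustrative sketch: PIPELINE and next_step are hypothetical names
# Routing table: finished agent -> (next agent, function that builds its task)
PIPELINE = {
    'research': ('analysis', lambda result: {'type': 'analyze', 'data': result['findings']}),
}


def next_step(topic: str, payload: bytes):
    """Map a result on results/{agent} to the next (topic, payload), or None at the end."""
    agent = topic.split('/')[1]
    step = PIPELINE.get(agent)
    if step is None:
        return None  # No downstream stage for this agent
    next_agent, build_task = step
    result = json.loads(payload.decode())
    return f'tasks/{next_agent}', json.dumps(build_task(result))


print(next_step('results/research', b'{"findings": ["finding-1"]}'))
```

With this split, the orchestrator's message handler only has to publish whatever `next_step` returns. Adding a stage then means adding a `PIPELINE` entry rather than another `if` branch.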
Implementation
```python
import json

import paho.mqtt.client as mqtt


class AIOrchestrator:
    def __init__(self):
        # paho-mqtt 2.x requires the callback API version as the first argument
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id='orchestrator')
        self.client.username_pw_set('orchestrator@org_k7xm4pqr2n5t', 'password')
        self.client.tls_set()  # Port 8883 is MQTT over TLS
        self.client.on_connect = self.handle_connect
        self.client.on_message = self.handle_result

    def handle_connect(self, client, userdata, flags, reason_code, properties):
        # Subscribe on every (re)connect; QoS 1 so results aren't downgraded to QoS 0
        client.subscribe('results/#', qos=1)

    def dispatch_task(self, agent, task):
        """Send a task to a specific agent."""
        self.client.publish(f'tasks/{agent}', json.dumps(task), qos=1)

    def handle_result(self, client, userdata, msg):
        """Process results from any agent (topic: results/{agent})."""
        result = json.loads(msg.payload.decode())
        agent = msg.topic.split('/')[1]
        print(f'Result from {agent}: {result}')

        # Trigger the next step in the pipeline
        if agent == 'research':
            self.dispatch_task('analysis', {
                'type': 'analyze',
                'data': result['findings'],
            })

    def start(self):
        self.client.connect('mqtt.cloudsignal.app', 8883)
        self.client.loop_forever()
```
Use case: LLM streaming output
Stream LLM responses token-by-token to clients.
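The wire protocol in the implementation below is small. The server sends one `token` message per chunk, then a final `done` message, all on `llm/stream/{userId}`. The broker-free Python sketch below shows both ends of that protocol using two hypothetical helpers, `stream_messages` and `StreamAssembler`. The protocol relies on tokens arriving in publish order. The MQTT specification asks brokers to preserve that order by default for messages one client publishes to one topic.

```python
import json
import time
from typing import Iterable, Iterator

# Illustrative sketch: stream_messages and StreamAssembler are hypothetical helpers


def stream_messages(user_id: str, tokens: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (topic, payload) pairs: one 'token' message per non-empty chunk, then 'done'."""
    topic = f'llm/stream/{user_id}'
    for token in tokens:
        if token:  # Skip empty deltas, as the server example does
            yield topic, json.dumps({
                'type': 'token',
                'content': token,
                'timestamp': int(time.time() * 1000),  # Milliseconds, like Date.now()
            })
    yield topic, json.dumps({'type': 'done', 'timestamp': int(time.time() * 1000)})


class StreamAssembler:
    """Client side: rebuild the response text from messages on llm/stream/{user_id}."""

    def __init__(self) -> None:
        self.text = ''
        self.done = False

    def on_message(self, payload: str) -> None:
        msg = json.loads(payload)
        if self.done:
            return  # Ignore anything that arrives after completion
        if msg['type'] == 'token':
            self.text += msg['content']
        elif msg['type'] == 'done':
            self.done = True


# Simulate a stream end to end without a broker
assembler = StreamAssembler()
for _topic, payload in stream_messages('user-42', ['Hel', '', 'lo']):
    assembler.on_message(payload)
```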
Implementation
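```typescript
// Server: stream LLM output
import CloudSignal from '@cloudsignal/mqtt-client';
import OpenAI from 'openai';

const openai = new OpenAI(); // Reads OPENAI_API_KEY from the environment
// Assumes `mqttClient` is an already-connected CloudSignal client (see the Node.js quickstart)

async function streamLLMResponse(prompt: string, userId: string) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: prompt }],
    stream: true,
  });

  const topic = `llm/stream/${userId}`;

  for await (const chunk of stream) {
    const token = chunk.choices[0]?.delta?.content || '';
    if (token) {
      mqttClient.transmit(topic, {
        type: 'token',
        content: token,
        timestamp: Date.now(),
      });
    }
  }

  // Signal completion
  mqttClient.transmit(topic, { type: 'done', timestamp: Date.now() });
}
```

```tsx
// Client: receive streaming response (React)
import { useEffect, useState } from 'react';

// Assumes `client` is an already-connected CloudSignal client
function StreamedResponse({ userId }: { userId: string }) {
  const [response, setResponse] = useState('');

  useEffect(() => {
    client.subscribe(`llm/stream/${userId}`);
    client.onMessage((_topic, msg: any) => {
      if (msg.type === 'token') {
        setResponse((prev) => prev + msg.content);
      } else if (msg.type === 'done') {
        console.log('Stream complete');
      }
    });
    // Return a cleanup function that unsubscribes (see the SDK reference for the method)
  }, [userId]);

  return <p>{response}</p>;
}
```
Use case: AI tool execution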
AI agents call tools that run as separate services, and each result has to find its way back to the agent that asked for it.
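An agent may have several tool calls in flight at once, so it needs to match each message on `tools/result/{agent_id}` to the request that produced it. The minimal agent-side sketch below assumes the request fields (`agent_id`, `request_id`, `params`) that the tool server below reads. `PendingToolCalls` is a hypothetical helper, and using `uuid4` for `request_id` is an assumption.

```python
import json
import uuid
from typing import Any, Optional

# Illustrative sketch: PendingToolCalls is a hypothetical helper, not an SDK class


class PendingToolCalls:
    """Agent-side bookkeeping that matches tool results to outstanding requests."""

    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id
        self.pending: dict[str, dict] = {}  # request_id -> original request

    def new_request(self, tool: str, params: dict) -> tuple[str, str]:
        """Build the (topic, payload) to publish on tools/exec/{tool}."""
        request_id = str(uuid.uuid4())  # Assumption: any unique string works
        request = {'agent_id': self.agent_id, 'request_id': request_id, 'params': params}
        self.pending[request_id] = request
        return f'tools/exec/{tool}', json.dumps(request)

    def resolve(self, payload: str) -> Optional[tuple[dict, Any]]:
        """Return (request, result) for an outstanding request, else None."""
        msg = json.loads(payload)
        request = self.pending.pop(msg['request_id'], None)
        if request is None:
            return None  # Unknown id, or a duplicate delivery of a handled result
        return request, msg['result']


calls = PendingToolCalls('agent-7')
topic, payload = calls.new_request('search', {'query': 'mqtt qos'})
```

Popping the entry on the first match also makes duplicates harmless. QoS 1 is at-least-once delivery, so the same result can occasionally arrive twice.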
Implementation
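```python
# Tool server
import json

import paho.mqtt.client as mqtt


class ToolServer:
    def __init__(self, tool_name):
        self.tool_name = tool_name
        # paho-mqtt 2.x requires the callback API version as the first argument
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=f'tool-{tool_name}')
        # Set credentials with username_pw_set(), as in the orchestrator example
        self.client.tls_set()  # Port 8883 is MQTT over TLS
        self.client.on_connect = self.handle_connect
        self.client.on_message = self.handle_request

    def handle_connect(self, client, userdata, flags, reason_code, properties):
        # Subscribe at QoS 1 so requests published at QoS 1 aren't downgraded to QoS 0
        client.subscribe(f'tools/exec/{self.tool_name}', qos=1)

    def handle_request(self, client, userdata, msg):
        request = json.loads(msg.payload.decode())
        agent_id = request['agent_id']

        # Execute the tool
        result = self.execute(request['params'])

        # Send the result back to the requesting agent
        self.client.publish(
            f'tools/result/{agent_id}',
            json.dumps({
                'tool': self.tool_name,
                'result': result,
                'request_id': request['request_id'],
            }),
            qos=1,
        )

    def execute(self, params):
        # Tool-specific logic: override in a subclass and return a JSON-serializable value
        raise NotImplementedError

    def start(self):
        self.client.connect('mqtt.cloudsignal.app', 8883)
        self.client.loop_forever()
```
Use case: autonomous system monitoring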
Monitor autonomous systems in real time. Each system publishes telemetry and decisions under its own `autonomous/{system_id}/` prefix.
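On the consuming side, a dashboard would subscribe to `autonomous/+/telemetry` and `autonomous/+/decisions` and keep the latest state per system. The sketch below is minimal and broker-free. `FleetMonitor` and its CPU alert threshold are hypothetical, and the payload fields match the publisher code below.

```python
import json

# Illustrative sketch: FleetMonitor and the CPU threshold are hypothetical


class FleetMonitor:
    """Track the latest telemetry and decision for each system_id."""

    def __init__(self, cpu_alert: float = 90.0) -> None:
        self.cpu_alert = cpu_alert  # Hypothetical threshold, same units as 'cpu'
        self.telemetry: dict[str, dict] = {}
        self.decisions: dict[str, dict] = {}

    def on_message(self, topic: str, payload: bytes) -> list[str]:
        """Handle a message from autonomous/{system_id}/{kind}; return any alerts."""
        parts = topic.split('/')
        if len(parts) != 3 or parts[0] != 'autonomous':
            return []  # Not a monitoring topic
        _, system_id, kind = parts
        data = json.loads(payload.decode())
        alerts = []
        if kind == 'telemetry':
            self.telemetry[system_id] = data
            if data['cpu'] >= self.cpu_alert:
                alerts.append(f'{system_id}: cpu at {data["cpu"]}')
        elif kind == 'decisions':
            self.decisions[system_id] = data
        return alerts


monitor = FleetMonitor()
alerts = monitor.on_message(
    'autonomous/rover-1/telemetry',
    json.dumps({'cpu': 95.0, 'memory': 40.0, 'active_tasks': 3}).encode(),
)
```

Decisions are published with `retain=True`. As a result, a monitor that subscribes late still receives each system's most recent decision immediately, though it does not receive the earlier ones.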
```python
# Autonomous system publishes telemetry
import json
import time

# Assumes `client` is a connected paho-mqtt client, set up as in the examples above


def publish_telemetry(system_id, metrics):
    client.publish(
        f'autonomous/{system_id}/telemetry',
        json.dumps({
            'cpu': metrics['cpu'],
            'memory': metrics['memory'],
            'active_tasks': metrics['tasks'],
            'decisions_per_min': metrics['decisions'],
            'timestamp': time.time(),
        }),
        qos=0,  # High-frequency, loss acceptable
    )


# Publish decisions for audit (a full audit log needs a subscriber that stores every message)
def publish_decision(system_id, decision):
    client.publish(
        f'autonomous/{system_id}/decisions',
        json.dumps({
            'action': decision['action'],
            'reasoning': decision['reasoning'],
            'confidence': decision['confidence'],
            'timestamp': time.time(),
        }),
        qos=1,  # Important, ensure delivery
        retain=True,  # Broker keeps only the latest decision for new subscribers, not a history
    )
```
Benefits for AI applications
| Benefit | What it enables |
|---|---|
| Efficient distribution | One publish reaches every dashboard, logger, and downstream agent |
| Decoupled architecture | Add or remove agents without changes to publishers |
| Reliable delivery | QoS and persistent sessions handle transient failures |
| Low latency | Sub-millisecond routing for token streams and coordination |
Start with the Node.js quickstart for backend agent implementation or the Python quickstart for ML/AI workloads.
Related
- IoT & edge computing - Device connectivity patterns
- Real-time applications - User-facing real-time features