CloudSignal Docs

AI-era applications

Coordinate AI agents, stream LLM tokens, and run autonomous systems with CloudSignal's pub/sub messaging.

Modern AI systems usually involve multiple agents, streaming outputs, and coordination across services. CloudSignal gives those components a shared message bus so you can fan out tokens, dispatch tool calls, and broadcast results without building your own infrastructure.

Why MQTT for AI applications?

Traditional AI architectures lean on HTTP for everything. As systems grow to include more agents, streaming outputs, and real-time coordination, HTTP starts to push back:

| HTTP | MQTT (CloudSignal) |
| --- | --- |
| Request/response only | Pub/sub + request/response |
| Polling for updates | Real-time push |
| Point-to-point | Broadcast to many |
| Connection per request | Persistent connections |
| High overhead | Minimal overhead |

MQTT fits when you need:

| Need | What pub/sub gives you |
| --- | --- |
| Multiple consumers for the same data | A single publish reaches every subscriber |
| Real-time streaming of results | Tokens arrive as they're produced |
| Loose coupling between components | Add or remove agents without code changes |
| Reliable delivery with retries | QoS 1 or 2 handles transient failures |

Use case: multi-agent coordination

AI agents working together need to share state and coordinate actions.

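Orchestrator-driven agents (message sequence):

1. Orchestrator -> CloudSignal: PUBLISH tasks/research
2. CloudSignal -> Agent A (research): Deliver task
3. Agent A (research) -> CloudSignal: PUBLISH results/agent-a
4. CloudSignal -> Orchestrator: Deliver result
5. Orchestrator -> CloudSignal: PUBLISH tasks/analysis
6. CloudSignal -> Agent B (analysis): Deliver task
7. Agent B (analysis) -> CloudSignal: PUBLISH results/agent-b
8. CloudSignal -> Orchestrator: Deliver result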

Implementation

import paho.mqtt.client as mqtt
import json

class AIOrchestrator:
    def __init__(self):
        self.client = mqtt.Client(client_id='orchestrator')
        self.client.username_pw_set('orchestrator@org_k7xm4pqr2n5t', 'password')
        self.client.on_message = self.handle_result

    def dispatch_task(self, agent, task):
        """Send a task to a specific agent."""
        self.client.publish(
            f'tasks/{agent}',
            json.dumps(task),
            qos=1,
        )

    def handle_result(self, client, userdata, msg):
        """Process results from any agent."""
        result = json.loads(msg.payload.decode())
        agent = msg.topic.split('/')[1]
        print(f'Result from {agent}: {result}')

        # Trigger next step in the pipeline
        if agent == 'research':
            self.dispatch_task('analysis', {
                'type': 'analyze',
                'data': result['findings'],
            })

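    def start(self):
        # Port 8883 is MQTT over TLS, so enable TLS (system CA store) before connecting
        self.client.tls_set()
        self.client.connect('mqtt.cloudsignal.app', 8883)
        self.client.subscribe('results/#')
        self.client.loop_forever()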
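
The orchestrator above receives every agent's result through the single filter `results/#`. As a rough illustration of how MQTT wildcard filters select topics, here is a simplified matcher. The function name `topic_matches` is hypothetical, and the sketch ignores the special handling of topics that begin with `$`. In real code, paho-mqtt's own `topic_matches_sub` helper is the safer choice.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name.

    Simplified sketch: '+' matches exactly one level, '#' matches the
    remaining levels (and must be the last level of the filter).
    """
    filter_levels = topic_filter.split('/')
    topic_levels = topic.split('/')

    for i, level in enumerate(filter_levels):
        if level == '#':
            # Multi-level wildcard is only valid as the final filter level
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # Filter has more levels than the topic
            return False
        if level != '+' and level != topic_levels[i]:
            return False

    # Without '#', filter and topic must have the same depth
    return len(filter_levels) == len(topic_levels)
```

With this reading, `results/#` matches `results/research` but not `tasks/research`, while a filter such as `tasks/+` would match `tasks/analysis` and nothing deeper.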

Use case: LLM streaming output

Stream LLM responses token-by-token to clients.

Token stream fan-out: LLM server -> CloudSignal -> Client 1, Client 2, Logger

Implementation

// Server: stream LLM output
// Assumes `openai` and `mqttClient` are created and connected elsewhere
import CloudSignal from '@cloudsignal/mqtt-client';

async function streamLLMResponse(prompt: string, userId: string) {
  const stream = await openai.chat.completions.create({
    model: 'gpt-4',
    messages: [{ role: 'user', content: prompt }],
    stream: true,
  });

  const topic = `llm/stream/${userId}`;

  for await (const chunk of stream) {
    const token = chunk.choices[0]?.delta?.content || '';
    if (token) {
      mqttClient.transmit(topic, {
        type: 'token',
        content: token,
        timestamp: Date.now(),
      });
    }
  }

  // Signal completion
  mqttClient.transmit(topic, { type: 'done', timestamp: Date.now() });
}

// Client: receive streaming response
const [response, setResponse] = useState('');

useEffect(() => {
  client.subscribe(`llm/stream/${userId}`);

  client.onMessage((_topic, msg: any) => {
    if (msg.type === 'token') {
      setResponse((prev) => prev + msg.content);
    } else if (msg.type === 'done') {
      console.log('Stream complete');
    }
  });
}, [userId]); // Re-run when the user (and therefore the topic) changes
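
A non-browser subscriber, such as the Logger in the fan-out above, usually wants the full response rather than individual tokens. The following Python sketch accumulates tokens per topic until the `done` message arrives. It assumes each message reaches the subscriber as a JSON-encoded payload with the `type` and `content` fields used above, which the TypeScript example does not confirm. `StreamAssembler` is a hypothetical name.

```python
import json


class StreamAssembler:
    """Collects 'token' messages per topic and joins them on 'done'."""

    def __init__(self):
        self._buffers = {}   # topic -> list of token strings received so far
        self.completed = {}  # topic -> fully assembled response text

    def handle(self, topic: str, payload: bytes) -> None:
        """Feed one raw message; call this from the MQTT on_message callback."""
        msg = json.loads(payload.decode())
        if msg['type'] == 'token':
            self._buffers.setdefault(topic, []).append(msg['content'])
        elif msg['type'] == 'done':
            # Join whatever arrived for this topic and clear its buffer
            self.completed[topic] = ''.join(self._buffers.pop(topic, []))
```

Keying the buffers by topic keeps concurrent streams for different users separate, since each user has their own `llm/stream/{userId}` topic.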

Use case: AI tool execution

AI agents need to execute tools and report results:

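Tool execution (message sequence):

1. AI agent -> CloudSignal: PUBLISH tools/exec/search
2. CloudSignal -> Tool server: Deliver request
3. Tool server -> CloudSignal: PUBLISH tools/result/agent-123
4. CloudSignal -> AI agent: Deliver result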

Implementation

# Tool server
import paho.mqtt.client as mqtt
import json

class ToolServer:
    def __init__(self, tool_name):
        self.tool_name = tool_name
        self.client = mqtt.Client(client_id=f'tool-{tool_name}')
        self.client.on_message = self.handle_request

    def handle_request(self, client, userdata, msg):
        request = json.loads(msg.payload.decode())
        agent_id = request['agent_id']

        # Execute the tool
        result = self.execute(request['params'])

        # Send the result back to the requesting agent
        self.client.publish(
            f'tools/result/{agent_id}',
            json.dumps({
                'tool': self.tool_name,
                'result': result,
                'request_id': request['request_id'],
            }),
            qos=1,
        )

    def execute(self, params):
        # Tool-specific logic
        pass

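    def start(self):
        # Port 8883 is MQTT over TLS, so enable TLS (system CA store) before connecting
        self.client.tls_set()
        self.client.connect('mqtt.cloudsignal.app', 8883)
        self.client.subscribe(f'tools/exec/{self.tool_name}')
        self.client.loop_forever()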
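
The tool server relies on the agent sending `agent_id`, `request_id`, and `params`, and on the agent matching each reply to its request. One way the agent side might look is sketched below. `ToolCaller` and its injected `publish` callable are hypothetical. In practice `publish` would wrap `client.publish`, and `on_result` would be called from the `on_message` handler for `tools/result/{agent_id}`.

```python
import json
import threading
import uuid
from concurrent.futures import Future


class ToolCaller:
    """Agent-side helper that correlates tool results by request_id."""

    def __init__(self, agent_id, publish):
        self.agent_id = agent_id
        self._publish = publish  # callable(topic, payload), assumed to wrap client.publish
        self._pending = {}       # request_id -> Future awaiting the result
        self._lock = threading.Lock()

    def request(self, tool, params) -> Future:
        """Publish a tool request and return a Future for its result."""
        request_id = str(uuid.uuid4())
        future = Future()
        with self._lock:
            self._pending[request_id] = future
        self._publish(f'tools/exec/{tool}', json.dumps({
            'agent_id': self.agent_id,
            'request_id': request_id,
            'params': params,
        }))
        return future

    def on_result(self, payload) -> bool:
        """Resolve the matching Future; return False for unknown or repeated replies."""
        msg = json.loads(payload)
        with self._lock:
            future = self._pending.pop(msg['request_id'], None)
        if future is None:
            # QoS 1 is at-least-once, so a duplicate reply is possible
            return False
        future.set_result(msg['result'])
        return True
```

A caller can then block with a timeout, for example `caller.request('search', {'q': 'mqtt'}).result(timeout=10)`, so a tool server that never answers does not hang the agent.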

Use case: autonomous system monitoring

Monitor autonomous systems in real time:

# Autonomous system publishes telemetry
# (assumes `client` is an already-connected paho-mqtt client)
import json
import time

def publish_telemetry(system_id, metrics):
    client.publish(
        f'autonomous/{system_id}/telemetry',
        json.dumps({
            'cpu': metrics['cpu'],
            'memory': metrics['memory'],
            'active_tasks': metrics['tasks'],
            'decisions_per_min': metrics['decisions'],
            'timestamp': time.time(),
        }),
        qos=0,  # High-frequency, loss acceptable
    )

# Publish decisions for audit
def publish_decision(system_id, decision):
    client.publish(
        f'autonomous/{system_id}/decisions',
        json.dumps({
            'action': decision['action'],
            'reasoning': decision['reasoning'],
            'confidence': decision['confidence'],
            'timestamp': time.time(),
        }),
        qos=1,        # Important, ensure delivery
        retain=True,  # Keep latest decision
    )
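
The snippets above cover only the publishing side. A monitoring subscriber could track the most recent telemetry per system and flag systems that have gone quiet, which matters because QoS 0 telemetry can be dropped silently. The sketch below assumes a subscription such as `autonomous/+/telemetry`. `FleetMonitor` and the 30-second threshold are illustrative choices, not part of CloudSignal.

```python
import json
import time


class FleetMonitor:
    """Keeps the latest telemetry per system and reports stale ones."""

    def __init__(self, stale_after=30.0):
        self.stale_after = stale_after  # seconds without telemetry before a system counts as stale
        self.latest = {}                # system_id -> last telemetry dict

    def handle(self, topic: str, payload: bytes) -> None:
        """Feed one raw message; non-telemetry topics are ignored."""
        parts = topic.split('/')
        if len(parts) != 3 or parts[0] != 'autonomous' or parts[2] != 'telemetry':
            return
        self.latest[parts[1]] = json.loads(payload.decode())

    def stale_systems(self, now=None):
        """Return sorted IDs of systems whose last telemetry is older than stale_after."""
        now = time.time() if now is None else now
        return sorted(
            system_id
            for system_id, telemetry in self.latest.items()
            if now - telemetry['timestamp'] > self.stale_after
        )
```

Staleness is judged from the `timestamp` field the publisher sets, so the publisher and monitor clocks need to be roughly in sync for the threshold to be meaningful.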

Benefits for AI applications

| Benefit | What it enables |
| --- | --- |
| Efficient distribution | One publish reaches every dashboard, logger, and downstream agent |
| Decoupled architecture | Add or remove agents without changes to publishers |
| Reliable delivery | QoS and persistent sessions handle transient failures |
| Low latency | Sub-millisecond routing for token streams and coordination |

Start with the Node.js quickstart for backend agent implementation or the Python quickstart for ML/AI workloads.
