
Building a Multi-Agent System with CloudSignal: A Practical Guide

Learn how to connect AI agents using MQTT pub/sub messaging. We'll build a three-agent pipeline that handles research, summarization, and notifications.

January 7, 2026
By CloudSignal Team

In our previous post, we explored why multi-agent systems fail and how pub/sub messaging solves the communication problem. Now let's build one.

We'll create a three-agent pipeline:

  1. Research Agent: Takes a topic and gathers information
  2. Summary Agent: Receives research and creates a concise summary
  3. Notifier Agent: Sends the summary to Slack (or any webhook)

Each agent runs independently, communicates via MQTT topics, and handles offline scenarios gracefully.

Prerequisites

  • Python 3.10+
  • A CloudSignal account (sign up free)
  • OpenAI API key (for the AI agents)

Step 1: Set Up CloudSignal

After signing up, grab your MQTT credentials from the CloudSignal dashboard:

  • Host: mqtt.cloudsignal.app
  • Port: 8883 (TLS)
  • Username: Your CloudSignal username
  • Password: Your CloudSignal password

Step 2: Install Dependencies

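pip install "paho-mqtt<2" openai python-dotenv requests

paho-mqtt 2.x changed the client constructor and callback API, and the code in this guide follows the 1.x style, hence the version pin. The requests package is used by the notifier agent in Step 6.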

Create a .env file:

CLOUDSIGNAL_HOST=mqtt.cloudsignal.app
CLOUDSIGNAL_PORT=8883
CLOUDSIGNAL_USERNAME=your-username
CLOUDSIGNAL_PASSWORD=your-password
OPENAI_API_KEY=sk-...
SLACK_WEBHOOK_URL=https://hooks.slack.com/...
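
A missing variable in this file tends to surface later as a confusing connection or authentication error. As a minimal sketch, assuming the variable names listed above, a small check like the following could run right after load_dotenv(). The helper name missing_settings is hypothetical and not part of any library.

```python
import os

# Variable names taken from the .env file above.
REQUIRED_VARS = (
    "CLOUDSIGNAL_HOST",
    "CLOUDSIGNAL_PORT",
    "CLOUDSIGNAL_USERNAME",
    "CLOUDSIGNAL_PASSWORD",
    "OPENAI_API_KEY",
    "SLACK_WEBHOOK_URL",
)


def missing_settings(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

An agent could call missing_settings() at startup and exit with a clear message if the returned list is not empty.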

Step 3: Create the Base Agent Class

Let's create a reusable base class that manages the MQTT connection and dispatches incoming messages:

# base_agent.py
import os
import json
import ssl
import paho.mqtt.client as mqtt
from dotenv import load_dotenv

load_dotenv()

class BaseAgent:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.client = mqtt.Client(
            client_id=f"agent-{agent_id}",
            protocol=mqtt.MQTTv5
        )
        
        # Set up TLS
        self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
        
        # Set credentials
        self.client.username_pw_set(
            os.getenv("CLOUDSIGNAL_USERNAME"),
            os.getenv("CLOUDSIGNAL_PASSWORD")
        )
        
        # Set callbacks
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect
        
        self.subscriptions = []
    
    def _on_connect(self, client, userdata, flags, rc, properties=None):
        print(f"[{self.agent_id}] Connected with result code {rc}")
        # Resubscribe on reconnect
        for topic in self.subscriptions:
            self.client.subscribe(topic, qos=1)
            print(f"[{self.agent_id}] Subscribed to {topic}")
    
    def _on_message(self, client, userdata, msg):
        try:
            payload = json.loads(msg.payload.decode())
            self.handle_message(msg.topic, payload)
        except Exception as e:
            print(f"[{self.agent_id}] Error handling message: {e}")
    
    def _on_disconnect(self, client, userdata, rc, properties=None):
        print(f"[{self.agent_id}] Disconnected. Will auto-reconnect.")
    
    def subscribe(self, topic: str):
        self.subscriptions.append(topic)
        if self.client.is_connected():
            self.client.subscribe(topic, qos=1)
    
    def publish(self, topic: str, payload: dict):
        self.client.publish(
            topic,
            json.dumps(payload),
            qos=1  # At least once delivery
        )
        print(f"[{self.agent_id}] Published to {topic}")
    
    def handle_message(self, topic: str, payload: dict):
        """Override this in subclasses"""
        pass
    
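    def start(self):
        # Imported here so this method is self-contained; moving these to the
        # top of the file works just as well.
        from paho.mqtt.packettypes import PacketTypes
        from paho.mqtt.properties import Properties

        # Ask the broker to keep this client's session (subscriptions and
        # queued QoS 1 messages) after a disconnect. One hour is an example value.
        props = Properties(PacketTypes.CONNECT)
        props.SessionExpiryInterval = 3600

        self.client.connect(
            os.getenv("CLOUDSIGNAL_HOST"),
            int(os.getenv("CLOUDSIGNAL_PORT", "8883")),
            keepalive=60,
            clean_start=False,  # resume the existing session if there is one
            properties=props
        )
        self.client.loop_forever()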
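
The base class hands any decoded JSON to handle_message, so a subclass may receive a payload with fields missing. The following is a minimal sketch of a validation helper a subclass could call first. The name require_fields is hypothetical, and raising ValueError is an assumption that fits the try/except in _on_message, which logs the error and moves on.

```python
def require_fields(payload, *names):
    """Return the requested fields, or raise ValueError naming what is missing."""
    if not isinstance(payload, dict):
        raise ValueError(f"expected a JSON object, got {type(payload).__name__}")
    # Treat absent, null, and empty-string values as missing.
    missing = [n for n in names if payload.get(n) in (None, "")]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    return tuple(payload[n] for n in names)
```

In an agent, this might be used as task_id, query = require_fields(payload, "task_id", "query") at the top of handle_message.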

Step 4: Build the Research Agent

The research agent listens for research requests and publishes findings:

# research_agent.py
from openai import OpenAI
from base_agent import BaseAgent

class ResearchAgent(BaseAgent):
    def __init__(self):
        super().__init__("research")
        self.openai = OpenAI()
        self.subscribe("tasks/research")
    
    def handle_message(self, topic: str, payload: dict):
        task_id = payload.get("task_id")
        query = payload.get("query")
        
        print(f"[research] Researching: {query}")
        
        # Use OpenAI to "research" (in production, you'd use web search)
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a research assistant. Provide detailed, factual information."},
                {"role": "user", "content": f"Research the following topic and provide key findings: {query}"}
            ]
        )
        
        findings = response.choices[0].message.content
        
        # Publish results
        self.publish("results/research", {
            "task_id": task_id,
            "query": query,
            "findings": findings
        })

if __name__ == "__main__":
    agent = ResearchAgent()
    agent.start()

Step 5: Build the Summary Agent

The summary agent receives research and creates concise summaries:

# summary_agent.py
from openai import OpenAI
from base_agent import BaseAgent

class SummaryAgent(BaseAgent):
    def __init__(self):
        super().__init__("summary")
        self.openai = OpenAI()
        self.subscribe("results/research")
    
    def handle_message(self, topic: str, payload: dict):
        task_id = payload.get("task_id")
        query = payload.get("query")
        findings = payload.get("findings")
        
        print(f"[summary] Summarizing research for: {query}")
        
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Create concise, actionable summaries. Use bullet points."},
                {"role": "user", "content": f"Summarize these research findings in 3-5 bullet points:\n\n{findings}"}
            ]
        )
        
        summary = response.choices[0].message.content
        
        # Publish summary
        self.publish("results/summary", {
            "task_id": task_id,
            "query": query,
            "summary": summary
        })

if __name__ == "__main__":
    agent = SummaryAgent()
    agent.start()

Step 6: Build the Notifier Agent

The notifier sends completed summaries to Slack:

# notifier_agent.py
import os
import requests
from base_agent import BaseAgent

class NotifierAgent(BaseAgent):
    def __init__(self):
        super().__init__("notifier")
        self.webhook_url = os.getenv("SLACK_WEBHOOK_URL")
        self.subscribe("results/summary")
    
    def handle_message(self, topic: str, payload: dict):
        query = payload.get("query")
        summary = payload.get("summary")
        
        print(f"[notifier] Sending notification for: {query}")
        
        # Send to Slack; fail loudly on timeouts and non-2xx responses
        response = requests.post(
            self.webhook_url,
            json={"text": f"*Research Complete: {query}*\n\n{summary}"},
            timeout=10
        )
        response.raise_for_status()
        
        print("[notifier] Notification sent!")

if __name__ == "__main__":
    agent = NotifierAgent()
    agent.start()

Step 7: Create the Trigger Script

Finally, a simple script to kick off research tasks:

# trigger.py
import os
import sys
import time
import uuid
from base_agent import BaseAgent

class TriggerAgent(BaseAgent):
    def __init__(self):
        super().__init__("trigger")
    
    def request_research(self, query: str):
        task_id = str(uuid.uuid4())[:8]
        self.publish("tasks/research", {
            "task_id": task_id,
            "query": query
        })
        print(f"[trigger] Requested research: {query} (task: {task_id})")

if __name__ == "__main__":
    agent = TriggerAgent()
    
    # Connect briefly to publish
    agent.client.connect(
        os.getenv("CLOUDSIGNAL_HOST"),
        int(os.getenv("CLOUDSIGNAL_PORT"))
    )
    agent.client.loop_start()
    
    query = " ".join(sys.argv[1:]) or "Benefits of MQTT for AI agents"
    agent.request_research(query)
    
    time.sleep(1)  # Give the network loop time to connect and send the message
    agent.client.disconnect()
    agent.client.loop_stop()

Running the System

Open four terminal windows:

# Terminal 1 - Research Agent
python research_agent.py

# Terminal 2 - Summary Agent  
python summary_agent.py

# Terminal 3 - Notifier Agent
python notifier_agent.py

# Terminal 4 - Trigger a task
python trigger.py "Latest developments in AI agent frameworks"

You'll see:

  1. The trigger publishes to tasks/research
  2. Research agent picks it up, processes, publishes to results/research
  3. Summary agent picks that up, summarizes, publishes to results/summary
  4. Notifier agent sends the final summary to Slack
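
The flow above can also be written down as data, which is handy when checking that every topic has a consumer. This is an illustrative sketch only: the PIPELINE table and the trace function are hypothetical names, and the topics are the ones used in the agents above.

```python
# Topic wiring from the steps above: agent -> (subscribes to, publishes to).
PIPELINE = {
    "research": ("tasks/research", "results/research"),
    "summary": ("results/research", "results/summary"),
    "notifier": ("results/summary", None),  # terminal stage: posts to Slack
}


def trace(start_topic):
    """Follow a message from start_topic and list the agents that handle it."""
    path, topic = [], start_topic
    while topic is not None:
        handlers = [name for name, (sub, _) in PIPELINE.items() if sub == topic]
        # Stop when nobody listens, or when we would loop back to a seen agent.
        if not handlers or handlers[0] in path:
            break
        path.append(handlers[0])
        topic = PIPELINE[handlers[0]][1]
    return path
```

Calling trace("tasks/research") walks the same four steps listed above, minus the trigger itself.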

What Makes This Robust

Offline Handling

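Stop the summary agent mid-task. The research results queue in CloudSignal, provided the agent uses a persistent session: a fixed client ID, clean_start=False, and a non-zero session expiry interval. Restart the summary agent and it picks up right where it left off. Because delivery is QoS 1 (at least once), a message may occasionally arrive twice, so handlers should tolerate duplicates.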
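
With QoS 1, a message queued while an agent was offline can be redelivered after a reconnect. One way to keep handlers safe is to skip task IDs that were already processed. The following is a minimal in-memory sketch; the SeenTasks class is a hypothetical helper, and a production system would more likely keep this state in a shared store.

```python
from collections import OrderedDict


class SeenTasks:
    """Remember the most recent task IDs so redelivered messages can be skipped."""

    def __init__(self, max_size=1000):
        self.max_size = max_size
        self._seen = OrderedDict()

    def first_time(self, task_id):
        """Return True the first time task_id is seen, False on repeats."""
        if task_id in self._seen:
            self._seen.move_to_end(task_id)
            return False
        self._seen[task_id] = True
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # evict the oldest entry
        return True
```

An agent could create one SeenTasks instance in __init__ and return early from handle_message when first_time(task_id) is False.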

Easy Scaling

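Need two research agents for faster processing? Run another instance with its own client ID and have every instance subscribe through an MQTT 5 shared subscription, for example $share/research/tasks/research. The broker then delivers each task to only one member of the group. With a plain subscription, every instance would receive every task.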
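
As a sketch of what scaling out involves on the client side, the helpers below build a shared-subscription filter and a per-instance client ID. Both function names are hypothetical, the random suffix is just one way to make IDs unique, and the group name is whatever you choose.

```python
import uuid


def shared_topic(group, topic):
    """Build an MQTT 5 shared-subscription filter: $share/<group>/<topic>."""
    # The MQTT 5 spec forbids '/', '+' and '#' in the share name.
    if not group or any(ch in group for ch in "/+#"):
        raise ValueError("group name must be non-empty and contain no '/', '+' or '#'")
    return f"$share/{group}/{topic}"


def instance_client_id(agent_id):
    """Give each running instance its own client ID (random suffix is an assumption)."""
    return f"agent-{agent_id}-{uuid.uuid4().hex[:8]}"
```

Two instances that connect with the same client ID will keep disconnecting each other, which is why the ID needs to differ per instance. If you also rely on persistent sessions, prefer stable per-instance IDs over random ones, since the broker ties a session to the client ID.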

Loose Coupling

Add a new agent that also listens to results/summary? No changes needed to existing agents. Just subscribe to the topic.

Observable

Every message flows through CloudSignal. You can monitor, debug, and replay messages from the dashboard.

Next Steps

This is a basic example. In production, you'd add:

  • Error handling topics: Publish failures to errors/{agent-id} for monitoring
  • Task status tracking: Use retained messages on status/{task-id}
  • Request-response patterns: For synchronous-style communication when needed
  • Authentication per agent: Each agent gets its own credentials with topic-level ACLs
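
The first two items can be sketched as small helpers that build the topic and payload for each kind of message. The function names and payload fields here are assumptions for illustration; only the topic patterns errors/{agent-id} and status/{task-id} come from the list above.

```python
import json
import time


def error_message(agent_id, task_id, exc):
    """Return (topic, payload) for a failure report on errors/{agent-id}."""
    payload = {
        "task_id": task_id,
        "agent": agent_id,
        "error": f"{type(exc).__name__}: {exc}",
        "timestamp": time.time(),
    }
    return f"errors/{agent_id}", json.dumps(payload)


def status_message(task_id, state):
    """Return (topic, payload, retain) for a retained status update."""
    payload = json.dumps({"task_id": task_id, "state": state})
    return f"status/{task_id}", payload, True
```

With paho-mqtt, the results could be passed to client.publish(topic, payload, qos=1, retain=retain). A retained status message means a late subscriber to status/{task-id} immediately receives the latest state.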

Conclusion

Building multi-agent systems doesn't have to be complex. With pub/sub messaging:

  • Agents are decoupled and independently deployable
  • Messages are delivered at least once with QoS 1
  • Offline agents catch up from the broker's queue when they reconnect with a persistent session
  • Scaling is straightforward

The infrastructure disappears. You focus on what your agents actually do.


Ready to build? Sign up for CloudSignal and get 50,000 messages/month free. No credit card required.
