
Building a Multi-Agent System with CloudSignal: A Practical Guide

Learn how to connect AI agents using MQTT pub/sub messaging. We'll build a three-agent pipeline that handles research, summarization, and notifications.

January 7, 2026
6 min read
By CloudSignal Team

In our previous post, we explored why multi-agent systems fail and how pub/sub messaging solves the communication problem. Now let's build one.

We'll create a three-agent pipeline:

  1. Research Agent — Takes a topic and gathers information
  2. Summary Agent — Receives research and creates a concise summary
  3. Notifier Agent — Sends the summary to Slack (or any webhook)

Each agent runs independently, communicates via MQTT topics, and handles offline scenarios gracefully.

Prerequisites

  • Python 3.10+
  • A CloudSignal account (sign up free)
  • OpenAI API key (for the AI agents)

Step 1: Set Up CloudSignal

After signing up, grab your MQTT credentials from the CloudSignal dashboard:

  • Host: mqtt.cloudsignal.app
  • Port: 8883 (TLS)
  • Username: Your CloudSignal username
  • Password: Your CloudSignal password

Step 2: Install Dependencies

pip install paho-mqtt openai python-dotenv requests

Create a .env file:

CLOUDSIGNAL_HOST=mqtt.cloudsignal.app
CLOUDSIGNAL_PORT=8883
CLOUDSIGNAL_USERNAME=your-username
CLOUDSIGNAL_PASSWORD=your-password
OPENAI_API_KEY=sk-...
SLACK_WEBHOOK_URL=https://hooks.slack.com/...
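
A missing variable in .env otherwise only shows up as a confusing connection error later. As a minimal sketch, the values above could be validated once at startup. Settings and load_settings are hypothetical names used only for illustration, and the fallback to port 8883 is an assumption based on the TLS port listed in Step 1.

```python
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    host: str
    port: int
    username: str
    password: str


def load_settings(env: Mapping[str, str]) -> Settings:
    """Build Settings from a mapping such as os.environ, failing fast on gaps."""
    required = ["CLOUDSIGNAL_HOST", "CLOUDSIGNAL_USERNAME", "CLOUDSIGNAL_PASSWORD"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return Settings(
        host=env["CLOUDSIGNAL_HOST"],
        port=int(env.get("CLOUDSIGNAL_PORT", "8883")),  # assumed default: the TLS port
        username=env["CLOUDSIGNAL_USERNAME"],
        password=env["CLOUDSIGNAL_PASSWORD"],
    )
```

After calling load_dotenv(), an agent could call load_settings(os.environ) and pass the result to its connection code.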

Step 3: Create the Base Agent Class

Let's create a reusable base class that handles MQTT connection and message handling:

# base_agent.py
import os
import json
import ssl
import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
from dotenv import load_dotenv

load_dotenv()

class BaseAgent:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        # paho-mqtt 2.x expects an explicit callback API version
        self.client = mqtt.Client(
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
            client_id=f"agent-{agent_id}",
            protocol=mqtt.MQTTv5
        )
        
        # Set up TLS
        self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
        
        # Set credentials
        self.client.username_pw_set(
            os.getenv("CLOUDSIGNAL_USERNAME"),
            os.getenv("CLOUDSIGNAL_PASSWORD")
        )
        
        # Set callbacks
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect
        
        self.subscriptions = []
    
    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        print(f"[{self.agent_id}] Connected with result code {reason_code}")
        if reason_code.is_failure:
            return  # Broker refused the connection; nothing to subscribe to
        # Resubscribe on reconnect
        for topic in self.subscriptions:
            self.client.subscribe(topic, qos=1)
            print(f"[{self.agent_id}] Subscribed to {topic}")
    
    def _on_message(self, client, userdata, msg):
        try:
            payload = json.loads(msg.payload.decode())
            self.handle_message(msg.topic, payload)
        except Exception as e:
            print(f"[{self.agent_id}] Error handling message: {e}")
    
    def _on_disconnect(self, client, userdata, flags, reason_code, properties=None):
        print(f"[{self.agent_id}] Disconnected. Will auto-reconnect.")
    
    def subscribe(self, topic: str):
        self.subscriptions.append(topic)
        if self.client.is_connected():
            self.client.subscribe(topic, qos=1)
    
    def publish(self, topic: str, payload: dict):
        self.client.publish(
            topic,
            json.dumps(payload),
            qos=1  # At least once delivery
        )
        print(f"[{self.agent_id}] Published to {topic}")
    
    def handle_message(self, topic: str, payload: dict):
        """Override this in subclasses"""
        pass
    
    def start(self):
        # Ask the broker to keep this agent's session (subscriptions and
        # queued QoS 1 messages) after a disconnect. 3600 seconds is an
        # example value, not a CloudSignal recommendation.
        properties = Properties(PacketTypes.CONNECT)
        properties.SessionExpiryInterval = 3600
        self.client.connect(
            os.getenv("CLOUDSIGNAL_HOST"),
            int(os.getenv("CLOUDSIGNAL_PORT", "8883")),
            keepalive=60,
            clean_start=False,  # Resume the previous session if one exists
            properties=properties
        )
        self.client.loop_forever()

Step 4: Build the Research Agent

The research agent listens for research requests and publishes findings:

# research_agent.py
from openai import OpenAI
from base_agent import BaseAgent

class ResearchAgent(BaseAgent):
    def __init__(self):
        super().__init__("research")
        self.openai = OpenAI()
        self.subscribe("tasks/research")
    
    def handle_message(self, topic: str, payload: dict):
        task_id = payload.get("task_id")
        query = payload.get("query")
        
        print(f"[research] Researching: {query}")
        
        # Use OpenAI to "research" (in production, you'd use web search)
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a research assistant. Provide detailed, factual information."},
                {"role": "user", "content": f"Research the following topic and provide key findings: {query}"}
            ]
        )
        
        findings = response.choices[0].message.content
        
        # Publish results
        self.publish("results/research", {
            "task_id": task_id,
            "query": query,
            "findings": findings
        })

if __name__ == "__main__":
    agent = ResearchAgent()
    agent.start()

Step 5: Build the Summary Agent

The summary agent receives research and creates concise summaries:

# summary_agent.py
from openai import OpenAI
from base_agent import BaseAgent

class SummaryAgent(BaseAgent):
    def __init__(self):
        super().__init__("summary")
        self.openai = OpenAI()
        self.subscribe("results/research")
    
    def handle_message(self, topic: str, payload: dict):
        task_id = payload.get("task_id")
        query = payload.get("query")
        findings = payload.get("findings")
        
        print(f"[summary] Summarizing research for: {query}")
        
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Create concise, actionable summaries. Use bullet points."},
                {"role": "user", "content": f"Summarize these research findings in 3-5 bullet points:\n\n{findings}"}
            ]
        )
        
        summary = response.choices[0].message.content
        
        # Publish summary
        self.publish("results/summary", {
            "task_id": task_id,
            "query": query,
            "summary": summary
        })

if __name__ == "__main__":
    agent = SummaryAgent()
    agent.start()

Step 6: Build the Notifier Agent

The notifier sends completed summaries to Slack:

# notifier_agent.py
import os
import requests
from base_agent import BaseAgent

class NotifierAgent(BaseAgent):
    def __init__(self):
        super().__init__("notifier")
        self.webhook_url = os.getenv("SLACK_WEBHOOK_URL")
        self.subscribe("results/summary")
    
    def handle_message(self, topic: str, payload: dict):
        query = payload.get("query")
        summary = payload.get("summary")
        
        print(f"[notifier] Sending notification for: {query}")
        
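        # Send to Slack; raise on timeouts or non-2xx responses so the
        # failure is logged by the base class instead of passing silently
        response = requests.post(self.webhook_url, json={
            "text": f"*Research Complete: {query}*\n\n{summary}"
        }, timeout=10)
        response.raise_for_status()
        
        print("[notifier] Notification sent!")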

if __name__ == "__main__":
    agent = NotifierAgent()
    agent.start()
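
The notifier makes a single webhook attempt, so a brief network failure loses the notification. One way to soften that is a small retry helper with exponential backoff. This is a sketch only: retry is a hypothetical helper, and the attempt count and delays are arbitrary example values. The sleep function is injectable so the helper can be exercised without waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry(fn: Callable[[], T], attempts: int = 3, base_delay: float = 0.5,
          sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn until it succeeds, doubling the delay after each failure."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # Out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))
    raise ValueError("attempts must be at least 1")
```

In the notifier, the webhook call could be wrapped as retry(lambda: requests.post(url, json=body, timeout=10).raise_for_status()). Keep in mind that the handler runs on the MQTT network thread, so long retry loops delay other incoming messages.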

Step 7: Create the Trigger Script

Finally, a simple script to kick off research tasks:

# trigger.py
import os
import sys
import time
import uuid
from base_agent import BaseAgent

class TriggerAgent(BaseAgent):
    def __init__(self):
        super().__init__("trigger")
    
    def request_research(self, query: str):
        task_id = str(uuid.uuid4())[:8]
        self.publish("tasks/research", {
            "task_id": task_id,
            "query": query
        })
        print(f"[trigger] Requested research: {query} (task: {task_id})")

if __name__ == "__main__":
    agent = TriggerAgent()
    
    # Connect briefly to publish
    agent.client.connect(
        os.getenv("CLOUDSIGNAL_HOST"),
        int(os.getenv("CLOUDSIGNAL_PORT", "8883"))
    )
    agent.client.loop_start()
    
    # Wait up to 10 seconds for the connection to complete
    deadline = time.time() + 10
    while not agent.client.is_connected() and time.time() < deadline:
        time.sleep(0.1)
    
    query = " ".join(sys.argv[1:]) or "Benefits of MQTT for AI agents"
    agent.request_research(query)
    
    time.sleep(1)  # Give the QoS 1 publish time to complete
    agent.client.disconnect()
    agent.client.loop_stop()

Running the System

Open four terminal windows:

# Terminal 1 - Research Agent
python research_agent.py

# Terminal 2 - Summary Agent  
python summary_agent.py

# Terminal 3 - Notifier Agent
python notifier_agent.py

# Terminal 4 - Trigger a task
python trigger.py "Latest developments in AI agent frameworks"

You'll see:

  1. The trigger publishes to tasks/research
  2. Research agent picks it up, processes, publishes to results/research
  3. Summary agent picks that up, summarizes, publishes to results/summary
  4. Notifier agent sends the final summary to Slack
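
To follow this flow without a broker or API keys, here is a minimal in-memory sketch of the same topic chain. InMemoryBroker and wire_pipeline are hypothetical names, not part of CloudSignal or paho-mqtt, and the LLM and Slack calls are replaced by stubs. Unlike a real MQTT broker, it matches topics exactly and delivers synchronously.

```python
from collections import defaultdict
from typing import Callable

Handler = Callable[[str, dict], None]


class InMemoryBroker:
    """Toy stand-in for an MQTT broker: exact-match topics, synchronous delivery."""

    def __init__(self) -> None:
        self.subscribers: dict[str, list[Handler]] = defaultdict(list)
        self.log: list[str] = []  # Topics in the order they were published

    def subscribe(self, topic: str, handler: Handler) -> None:
        self.subscribers[topic].append(handler)

    def publish(self, topic: str, payload: dict) -> None:
        self.log.append(topic)
        for handler in list(self.subscribers[topic]):
            handler(topic, payload)


def wire_pipeline(broker: InMemoryBroker, delivered: list) -> None:
    """Connect three stub agents using the same topics as the tutorial."""

    def research(topic: str, payload: dict) -> None:
        findings = f"findings about {payload['query']}"  # Stub for the LLM call
        broker.publish("results/research", {**payload, "findings": findings})

    def summarize(topic: str, payload: dict) -> None:
        summary = payload["findings"].upper()  # Stub for the LLM call
        broker.publish("results/summary", {
            "task_id": payload["task_id"],
            "query": payload["query"],
            "summary": summary,
        })

    def notify(topic: str, payload: dict) -> None:
        delivered.append(payload)  # Stub for the Slack webhook

    broker.subscribe("tasks/research", research)
    broker.subscribe("results/research", summarize)
    broker.subscribe("results/summary", notify)


if __name__ == "__main__":
    demo_broker = InMemoryBroker()
    demo_delivered: list = []
    wire_pipeline(demo_broker, demo_delivered)
    demo_broker.publish("tasks/research", {"task_id": "demo", "query": "mqtt"})
    print(" -> ".join(demo_broker.log))
```

Each stub only knows the topic it reads from and the topic it writes to, which is the same property the real agents rely on.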

What Makes This Robust

Offline Handling

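Stop the summary agent, then trigger a task. Because the agent subscribes at QoS 1, the broker can queue the research results while it is offline, provided the agent connected with a persistent session (in MQTT 5, clean start disabled and a non-zero session expiry interval). Restart the summary agent and it receives the queued results.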
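
QoS 1 means at-least-once delivery, so a message can arrive twice, for example when a connection drops before the acknowledgement reaches the broker. A handler can guard against that by remembering recent task IDs. The sketch below is one possible approach; SeenTasks is a hypothetical helper, and because it lives in memory it only covers duplicates within a single process lifetime. Surviving restarts would need a persistent store.

```python
from collections import OrderedDict


class SeenTasks:
    """Remember the most recent task IDs so redelivered messages can be skipped."""

    def __init__(self, max_size: int = 1000) -> None:
        self.max_size = max_size
        self._seen = OrderedDict()

    def first_time(self, task_id: str) -> bool:
        """Return True the first time a task ID is seen, False on repeats."""
        if task_id in self._seen:
            return False
        self._seen[task_id] = None
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # Evict the oldest entry
        return True
```

An agent could create one SeenTasks instance in its constructor and return early from handle_message when first_time(task_id) is False.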

Easy Scaling

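Need two research agents for faster processing? Run another instance with its own client ID and have every instance subscribe through an MQTT 5 shared subscription such as $share/researchers/tasks/research. The broker then delivers each task to only one member of the group. With a plain subscription, every instance would receive every task, and two instances sharing one client ID would keep disconnecting each other.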
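
As a rough sketch, the two pieces needed for scaling out can be built with small helpers. shared_topic and worker_client_id are hypothetical names, "researchers" is an example group name, and broker support for shared subscriptions is assumed here.

```python
def shared_topic(group: str, topic: str) -> str:
    """Return an MQTT 5 shared-subscription filter: $share/<group>/<topic>."""
    if not group or any(ch in group for ch in "/+#"):
        raise ValueError("group must be non-empty and contain no '/', '+' or '#'")
    return f"$share/{group}/{topic}"


def worker_client_id(agent_id: str, instance: str) -> str:
    """Give each instance a distinct, stable client ID."""
    return f"agent-{agent_id}-{instance}"


if __name__ == "__main__":
    print(shared_topic("researchers", "tasks/research"))
    print(worker_client_id("research", "2"))
```

A stable instance suffix (for example from an environment variable) is used instead of a random one so that each worker keeps the same client ID, and with it the same broker session, across restarts.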

Loose Coupling

Add a new agent that also listens to results/summary? No changes needed to existing agents. Just subscribe to the topic.
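
For instance, an archiving agent could record every summary without the other agents knowing it exists. The sketch below shows only the handler logic so it runs without a broker; ArchiveHandler and the JSON Lines file are hypothetical choices. In the tutorial's structure it would be a BaseAgent subclass that calls self.subscribe("results/summary") in its constructor.

```python
import json
from pathlib import Path


class ArchiveHandler:
    """Append each received payload to a JSON Lines file."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)

    def handle_message(self, topic: str, payload: dict) -> None:
        record = {"topic": topic, **payload}
        with self.path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
```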

Observable

Every message flows through CloudSignal. You can monitor, debug, and replay messages from the dashboard.

Next Steps

This is a basic example. In production, you'd add:

  • Error handling topics — Publish failures to errors/{agent-id} for monitoring
  • Task status tracking — Use retained messages on status/{task-id}
  • Request-response patterns — For synchronous-style communication when needed
  • Authentication per agent — Each agent gets its own credentials with topic-level ACLs

Conclusion

Building multi-agent systems doesn't have to be complex. With pub/sub messaging:

  • Agents are decoupled and independently deployable
  • Messages published at QoS 1 are delivered at least once, so handlers should tolerate an occasional duplicate
  • Offline scenarios are handled automatically
  • Scaling is straightforward

The infrastructure disappears. You focus on what your agents actually do.


Ready to build? Sign up for CloudSignal and get 50,000 messages/month free. No credit card required.
