Building a Multi-Agent System with CloudSignal: A Practical Guide
Learn how to connect AI agents using MQTT pub/sub messaging. We'll build a three-agent pipeline that handles research, summarization, and notifications.
In our previous post, we explored why multi-agent systems fail and how pub/sub messaging solves the communication problem. Now let's build one.
We'll create a three-agent pipeline:
- Research Agent — Takes a topic and gathers information
- Summary Agent — Receives research and creates a concise summary
- Notifier Agent — Sends the summary to Slack (or any webhook)
Each agent runs independently, communicates via MQTT topics, and handles offline scenarios gracefully.
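Concretely, the pipeline runs on three topics. The names below match the code in the steps that follow; the dict is just a reference sketch, not a file you need to create:
# Topic layout (names match the agents built below)
PIPELINE_TOPICS = {
    "tasks/research":   "trigger script -> Research Agent",
    "results/research": "Research Agent -> Summary Agent",
    "results/summary":  "Summary Agent  -> Notifier Agent (then Slack)",
}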
Prerequisites
- Python 3.10+
- A CloudSignal account (sign up free)
- OpenAI API key (for the AI agents)
Step 1: Set Up CloudSignal
After signing up, grab your MQTT credentials from the CloudSignal dashboard:
- Host: mqtt.cloudsignal.app
- Port: 8883 (TLS)
- Username: Your CloudSignal username
- Password: Your CloudSignal password
Step 2: Install Dependencies
The notifier agent uses requests, and paho-mqtt is pinned below 2.0 to match the MQTT v5 callback signatures used in this post (they changed in paho-mqtt 2.0):
pip install "paho-mqtt<2.0" openai python-dotenv requests
Create a .env file:
CLOUDSIGNAL_HOST=mqtt.cloudsignal.app
CLOUDSIGNAL_PORT=8883
CLOUDSIGNAL_USERNAME=your-username
CLOUDSIGNAL_PASSWORD=your-password
OPENAI_API_KEY=sk-...
SLACK_WEBHOOK_URL=https://hooks.slack.com/...
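With the credentials in place, you can optionally verify them before writing any agents. The helper below (a hypothetical check_connection.py, not part of the pipeline) just connects over TLS and prints the result code:
# check_connection.py
import os
import ssl
import time

import paho.mqtt.client as mqtt
from dotenv import load_dotenv

load_dotenv()

client = mqtt.Client(client_id="connection-check", protocol=mqtt.MQTTv5)
client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
client.username_pw_set(
    os.getenv("CLOUDSIGNAL_USERNAME"),
    os.getenv("CLOUDSIGNAL_PASSWORD")
)
client.on_connect = lambda c, u, flags, rc, props=None: print(f"Connected: {rc}")

client.connect(os.getenv("CLOUDSIGNAL_HOST"), int(os.getenv("CLOUDSIGNAL_PORT")), keepalive=60)
client.loop_start()
time.sleep(2)  # give the CONNACK a moment to arrive
client.disconnect()
client.loop_stop()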
Step 3: Create the Base Agent Class
Let's create a reusable base class that handles the MQTT connection, subscriptions, and message dispatch:
# base_agent.py
import os
import json
import ssl
import paho.mqtt.client as mqtt
from dotenv import load_dotenv
load_dotenv()
class BaseAgent:
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.client = mqtt.Client(
client_id=f"agent-{agent_id}",
protocol=mqtt.MQTTv5
)
# Set up TLS
self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
# Set credentials
self.client.username_pw_set(
os.getenv("CLOUDSIGNAL_USERNAME"),
os.getenv("CLOUDSIGNAL_PASSWORD")
)
# Set callbacks
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
self.subscriptions = []
def _on_connect(self, client, userdata, flags, rc, properties=None):
print(f"[{self.agent_id}] Connected with result code {rc}")
# Resubscribe on reconnect
for topic in self.subscriptions:
self.client.subscribe(topic, qos=1)
print(f"[{self.agent_id}] Subscribed to {topic}")
def _on_message(self, client, userdata, msg):
try:
payload = json.loads(msg.payload.decode())
self.handle_message(msg.topic, payload)
except Exception as e:
print(f"[{self.agent_id}] Error handling message: {e}")
def _on_disconnect(self, client, userdata, rc, properties=None):
print(f"[{self.agent_id}] Disconnected. Will auto-reconnect.")
def subscribe(self, topic: str):
self.subscriptions.append(topic)
if self.client.is_connected():
self.client.subscribe(topic, qos=1)
def publish(self, topic: str, payload: dict):
self.client.publish(
topic,
json.dumps(payload),
qos=1 # At least once delivery
)
print(f"[{self.agent_id}] Published to {topic}")
def handle_message(self, topic: str, payload: dict):
"""Override this in subclasses"""
pass
def start(self):
self.client.connect(
os.getenv("CLOUDSIGNAL_HOST"),
int(os.getenv("CLOUDSIGNAL_PORT")),
keepalive=60
)
self.client.loop_forever()
Step 4: Build the Research Agent
The research agent listens for research requests and publishes findings:
# research_agent.py
from openai import OpenAI
from base_agent import BaseAgent
class ResearchAgent(BaseAgent):
def __init__(self):
super().__init__("research")
self.openai = OpenAI()
self.subscribe("tasks/research")
def handle_message(self, topic: str, payload: dict):
task_id = payload.get("task_id")
query = payload.get("query")
print(f"[research] Researching: {query}")
# Use OpenAI to "research" (in production, you'd use web search)
response = self.openai.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "You are a research assistant. Provide detailed, factual information."},
{"role": "user", "content": f"Research the following topic and provide key findings: {query}"}
]
)
findings = response.choices[0].message.content
# Publish results
self.publish("results/research", {
"task_id": task_id,
"query": query,
"findings": findings
})
if __name__ == "__main__":
agent = ResearchAgent()
agent.start()
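To exercise the research logic without a broker round-trip, you can call handle_message directly and stub out publish. The snippet below is a hypothetical local check, not part of the pipeline (it still makes a real OpenAI call, so OPENAI_API_KEY must be set):
# test_research_agent.py
from research_agent import ResearchAgent

agent = ResearchAgent()

# Stub out publish() so nothing is sent to CloudSignal
agent.publish = lambda topic, payload: print(f"Would publish to {topic}:\n{payload['findings'][:300]}")

agent.handle_message("tasks/research", {
    "task_id": "local-test",
    "query": "MQTT QoS levels"
})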
Step 5: Build the Summary Agent
The summary agent receives research and creates concise summaries:
# summary_agent.py
from openai import OpenAI
from base_agent import BaseAgent
class SummaryAgent(BaseAgent):
def __init__(self):
super().__init__("summary")
self.openai = OpenAI()
self.subscribe("results/research")
def handle_message(self, topic: str, payload: dict):
task_id = payload.get("task_id")
query = payload.get("query")
findings = payload.get("findings")
print(f"[summary] Summarizing research for: {query}")
response = self.openai.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Create concise, actionable summaries. Use bullet points."},
{"role": "user", "content": f"Summarize these research findings in 3-5 bullet points:\n\n{findings}"}
]
)
summary = response.choices[0].message.content
# Publish summary
self.publish("results/summary", {
"task_id": task_id,
"query": query,
"summary": summary
})
if __name__ == "__main__":
agent = SummaryAgent()
agent.start()
Step 6: Build the Notifier Agent
The notifier sends completed summaries to Slack:
# notifier_agent.py
import os
import requests
from base_agent import BaseAgent
class NotifierAgent(BaseAgent):
def __init__(self):
super().__init__("notifier")
self.webhook_url = os.getenv("SLACK_WEBHOOK_URL")
self.subscribe("results/summary")
def handle_message(self, topic: str, payload: dict):
query = payload.get("query")
summary = payload.get("summary")
print(f"[notifier] Sending notification for: {query}")
        # Send to Slack (any webhook that accepts a JSON "text" field works)
        response = requests.post(
            self.webhook_url,
            json={"text": f"*Research Complete: {query}*\n\n{summary}"},
            timeout=10
        )
        response.raise_for_status()
        print("[notifier] Notification sent!")
if __name__ == "__main__":
agent = NotifierAgent()
agent.start()
Step 7: Create the Trigger Script
Finally, a simple script to kick off research tasks:
# trigger.py
import os
import sys
import time
import uuid

from base_agent import BaseAgent
class TriggerAgent(BaseAgent):
def __init__(self):
super().__init__("trigger")
def request_research(self, query: str):
task_id = str(uuid.uuid4())[:8]
self.publish("tasks/research", {
"task_id": task_id,
"query": query
})
print(f"[trigger] Requested research: {query} (task: {task_id})")
if __name__ == "__main__":
agent = TriggerAgent()
# Connect briefly to publish
agent.client.connect(
os.getenv("CLOUDSIGNAL_HOST"),
int(os.getenv("CLOUDSIGNAL_PORT"))
)
agent.client.loop_start()
query = " ".join(sys.argv[1:]) or "Benefits of MQTT for AI agents"
agent.request_research(query)
time.sleep(1) # Wait for publish
agent.client.disconnect()
Running the System
Open four terminal windows:
# Terminal 1 - Research Agent
python research_agent.py
# Terminal 2 - Summary Agent
python summary_agent.py
# Terminal 3 - Notifier Agent
python notifier_agent.py
# Terminal 4 - Trigger a task
python trigger.py "Latest developments in AI agent frameworks"
You'll see:
- The trigger publishes to tasks/research
- Research agent picks it up, processes, and publishes to results/research
- Summary agent picks that up, summarizes, and publishes to results/summary
- Notifier agent sends the final summary to Slack
What Makes This Robust
Offline Handling
Stop the summary agent mid-task. Because the agents publish with QoS 1, the research results queue in CloudSignal until the subscriber returns. Restart the summary agent and it picks up right where it left off. The one requirement is a persistent session for the subscriber, sketched below.
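For the broker to queue messages for an offline subscriber, that subscriber needs a persistent session. With MQTT 5 this means connecting with clean_start=False and a non-zero Session Expiry Interval. Here's a sketch of how BaseAgent.start() could request that, assuming CloudSignal honours the requested session expiry:
# base_agent.py (excerpt) -- start() with a persistent MQTT 5 session,
# so QoS 1 messages queue for this agent while it is offline
import os

from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

class BaseAgent:
    # ... __init__ and callbacks unchanged ...

    def start(self):
        props = Properties(PacketTypes.CONNECT)
        props.SessionExpiryInterval = 3600  # broker keeps the session for an hour

        self.client.connect(
            os.getenv("CLOUDSIGNAL_HOST"),
            int(os.getenv("CLOUDSIGNAL_PORT")),
            keepalive=60,
            clean_start=False,   # resume the previous session if one exists
            properties=props
        )
        self.client.loop_forever()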
Easy Scaling
Need two research agents for faster throughput? Just run another instance and use an MQTT 5 shared subscription; the broker then load-balances tasks across the group.
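For example, assuming CloudSignal supports MQTT 5 shared subscriptions, each research instance joins the same share group and the broker delivers every task to exactly one member:
# research_agent.py -- in __init__, join a share group instead of a plain subscription
self.subscribe("$share/research-workers/tasks/research")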
Loose Coupling
Add a new agent that also listens to results/summary? No changes needed to existing agents. Just subscribe to the topic.
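For example, a hypothetical ArchiveAgent can tap the same summary stream without touching anything else:
# archive_agent.py -- an extra consumer; existing agents are unchanged
from base_agent import BaseAgent

class ArchiveAgent(BaseAgent):
    def __init__(self):
        super().__init__("archive")
        self.subscribe("results/summary")

    def handle_message(self, topic: str, payload: dict):
        # Append every summary to a local log file
        with open("summaries.log", "a") as f:
            f.write(f"# {payload.get('query')}\n{payload.get('summary')}\n\n")

if __name__ == "__main__":
    ArchiveAgent().start()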
Observable
Every message flows through CloudSignal. You can monitor, debug, and replay messages from the dashboard.
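If you also want a terminal view, a wildcard subscription gives you the same tap. The monitor below is a hypothetical helper that prints everything the pipeline publishes:
# monitor.py -- debugging tap: print every pipeline message
import json

from base_agent import BaseAgent

class MonitorAgent(BaseAgent):
    def __init__(self):
        super().__init__("monitor")
        self.subscribe("tasks/#")    # all task requests
        self.subscribe("results/#")  # all intermediate and final results

    def handle_message(self, topic: str, payload: dict):
        print(f"[monitor] {topic}: {json.dumps(payload)[:200]}")

if __name__ == "__main__":
    MonitorAgent().start()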
Next Steps
This is a basic example. In production, you'd add:
- Error handling topics — Publish failures to errors/{agent-id} for monitoring
- Task status tracking — Use retained messages on status/{task-id} (see the sketch after this list)
- Request-response patterns — For synchronous-style communication when needed
- Authentication per agent — Each agent gets its own credentials with topic-level ACLs
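As a taste of the status-tracking idea, publishing a retained message on status/{task-id} means late subscribers immediately get the last known state. A minimal sketch, assuming each agent reports its own progress from handle_message:
# Inside an agent's handle_message() -- publish progress as a retained message
# (requires `import json` in the agent module)
self.client.publish(
    f"status/{task_id}",
    json.dumps({"agent": self.agent_id, "state": "done"}),
    qos=1,
    retain=True  # the broker keeps the latest status for new subscribers
)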
Conclusion
Building multi-agent systems doesn't have to be complex. With pub/sub messaging:
- Agents are decoupled and independently deployable
- Messages are delivered at least once, thanks to QoS 1
- Offline scenarios are handled automatically
- Scaling is straightforward
The infrastructure disappears. You focus on what your agents actually do.
Ready to build? Sign up for CloudSignal and get 50,000 messages/month free. No credit card required.