Building a Multi-Agent System with CloudSignal: A Practical Guide
Learn how to connect AI agents using MQTT pub/sub messaging. We'll build a three-agent pipeline that handles research, summarization, and notifications.
In our previous post, we explored why multi-agent systems fail and how pub/sub messaging solves the communication problem. Now let’s build one.
We’ll create a three-agent pipeline:
- Research Agent: Takes a topic and gathers information
- Summary Agent: Receives research and creates a concise summary
- Notifier Agent: Sends the summary to Slack (or any webhook)
Each agent runs independently, communicates via MQTT topics, and handles offline scenarios gracefully.
Prerequisites
- Python 3.10+
- A CloudSignal account (sign up free)
- OpenAI API key (for the AI agents)
Step 1: Set Up CloudSignal
After signing up, grab your MQTT credentials from the CloudSignal dashboard:
- Host: mqtt.cloudsignal.app
- Port: 8883 (TLS)
- Username: Your CloudSignal username
- Password: Your CloudSignal password
Step 2: Install Dependencies
pip install paho-mqtt openai python-dotenv requests
Create a .env file:
CLOUDSIGNAL_HOST=mqtt.cloudsignal.app
CLOUDSIGNAL_PORT=8883
CLOUDSIGNAL_USERNAME=your-username
CLOUDSIGNAL_PASSWORD=your-password
OPENAI_API_KEY=sk-...
SLACK_WEBHOOK_URL=https://hooks.slack.com/...
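Every agent depends on these variables, so it can help to fail fast when one is missing. The following is a minimal sketch, not part of the original tutorial code: the `load_settings` helper and the `REQUIRED_VARS` tuple are hypothetical names, and the function takes the environment as a plain mapping so it can be checked without touching the real process environment.

```python
import os
from typing import Mapping

# Variable names taken from the .env file above
REQUIRED_VARS = (
    "CLOUDSIGNAL_HOST",
    "CLOUDSIGNAL_PORT",
    "CLOUDSIGNAL_USERNAME",
    "CLOUDSIGNAL_PASSWORD",
)


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Return validated MQTT settings, or raise if anything is missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "host": env["CLOUDSIGNAL_HOST"],
        "port": int(env["CLOUDSIGNAL_PORT"]),  # raises ValueError if not numeric
        "username": env["CLOUDSIGNAL_USERNAME"],
        "password": env["CLOUDSIGNAL_PASSWORD"],
    }
```

Calling `load_settings()` right after `load_dotenv()` would surface a typo in the .env file at startup instead of as a confusing connection error later.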
Step 3: Create the Base Agent Class
Let’s create a reusable base class that handles MQTT connection and message handling:
# base_agent.py
import json
import os
import ssl

import paho.mqtt.client as mqtt
from dotenv import load_dotenv
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

load_dotenv()


class BaseAgent:
    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        # paho-mqtt 2.x requires an explicit callback API version
        self.client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            client_id=f"agent-{agent_id}",
            protocol=mqtt.MQTTv5,
        )

        # Set up TLS (verifies the broker certificate against the system CAs)
        self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED)

        # Set credentials
        self.client.username_pw_set(
            os.getenv("CLOUDSIGNAL_USERNAME"),
            os.getenv("CLOUDSIGNAL_PASSWORD"),
        )

        # Set callbacks
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

        self.subscriptions = []

    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        print(f"[{self.agent_id}] Connected with result code {reason_code}")
        # Resubscribe on reconnect
        for topic in self.subscriptions:
            self.client.subscribe(topic, qos=1)
            print(f"[{self.agent_id}] Subscribed to {topic}")

    def _on_message(self, client, userdata, msg):
        try:
            payload = json.loads(msg.payload.decode())
            self.handle_message(msg.topic, payload)
        except Exception as e:
            # Log and keep running; one bad message should not stop the agent
            print(f"[{self.agent_id}] Error handling message: {e}")

    def _on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties=None):
        print(f"[{self.agent_id}] Disconnected. Will auto-reconnect.")

    def subscribe(self, topic: str):
        self.subscriptions.append(topic)
        if self.client.is_connected():
            self.client.subscribe(topic, qos=1)

    def publish(self, topic: str, payload: dict):
        self.client.publish(
            topic,
            json.dumps(payload),
            qos=1,  # At least once delivery
        )
        print(f"[{self.agent_id}] Published to {topic}")

    def handle_message(self, topic: str, payload: dict):
        """Override this in subclasses"""
        pass

    def start(self):
        # Ask the broker to keep this session, and queue QoS 1 messages for it,
        # while the agent is offline. One hour is an arbitrary choice; adjust it.
        props = Properties(PacketTypes.CONNECT)
        props.SessionExpiryInterval = 3600
        self.client.connect(
            os.getenv("CLOUDSIGNAL_HOST"),
            int(os.getenv("CLOUDSIGNAL_PORT", "8883")),
            keepalive=60,
            clean_start=False,
            properties=props,
        )
        self.client.loop_forever()
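QoS 1 means at-least-once delivery, so a handler can occasionally receive the same message twice, for example after a reconnect. One way to cope is to make handlers idempotent by remembering recently seen task IDs. Here is a minimal sketch, assuming every payload carries the `task_id` field used throughout this guide; the `SeenTasks` class is a hypothetical helper, not part of paho-mqtt or CloudSignal.

```python
from collections import OrderedDict


class SeenTasks:
    """Remember the most recent task IDs so duplicates can be skipped."""

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self._seen = OrderedDict()

    def is_duplicate(self, task_id: str) -> bool:
        """Return True if task_id was seen before; otherwise record it."""
        if task_id in self._seen:
            return True
        self._seen[task_id] = None
        # Evict the oldest entry once the cache is over capacity
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)
        return False
```

An agent could create one `SeenTasks` in its constructor and return early from `handle_message` when `is_duplicate(task_id)` is true. The cache lives in memory only, so it does not survive a restart; a durable store would be needed for stronger guarantees.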
Step 4: Build the Research Agent
The research agent listens for research requests and publishes findings:
# research_agent.py
from openai import OpenAI

from base_agent import BaseAgent


class ResearchAgent(BaseAgent):
    def __init__(self):
        super().__init__("research")
        self.openai = OpenAI()
        self.subscribe("tasks/research")

    def handle_message(self, topic: str, payload: dict):
        task_id = payload.get("task_id")
        query = payload.get("query")

        print(f"[research] Researching: {query}")

        # Use OpenAI to "research" (in production, you'd use web search)
        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a research assistant. Provide detailed, factual information."},
                {"role": "user", "content": f"Research the following topic and provide key findings: {query}"},
            ],
        )

        findings = response.choices[0].message.content

        # Publish results
        self.publish("results/research", {
            "task_id": task_id,
            "query": query,
            "findings": findings,
        })


if __name__ == "__main__":
    agent = ResearchAgent()
    agent.start()
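The handler above trusts that every payload has a `task_id` and a `query`. Any client allowed to publish to tasks/research can send something else, so a small validation step before calling the model may be worthwhile. A sketch under that assumption; `ResearchTask` and `parse_research_task` are hypothetical names introduced here for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ResearchTask:
    task_id: str
    query: str


def parse_research_task(payload) -> ResearchTask:
    """Validate a decoded tasks/research payload; raise ValueError if malformed."""
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    task_id = payload.get("task_id")
    query = payload.get("query")
    if not isinstance(task_id, str) or not task_id:
        raise ValueError("task_id must be a non-empty string")
    if not isinstance(query, str) or not query.strip():
        raise ValueError("query must be a non-empty string")
    return ResearchTask(task_id=task_id, query=query.strip())
```

Because `BaseAgent._on_message` catches exceptions and logs them, a `ValueError` raised here would skip the bad message without stopping the agent.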
Step 5: Build the Summary Agent
The summary agent receives research and creates concise summaries:
# summary_agent.py
from openai import OpenAI

from base_agent import BaseAgent


class SummaryAgent(BaseAgent):
    def __init__(self):
        super().__init__("summary")
        self.openai = OpenAI()
        self.subscribe("results/research")

    def handle_message(self, topic: str, payload: dict):
        task_id = payload.get("task_id")
        query = payload.get("query")
        findings = payload.get("findings")

        print(f"[summary] Summarizing research for: {query}")

        response = self.openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Create concise, actionable summaries. Use bullet points."},
                {"role": "user", "content": f"Summarize these research findings in 3-5 bullet points:\n\n{findings}"},
            ],
        )

        summary = response.choices[0].message.content

        # Publish summary
        self.publish("results/summary", {
            "task_id": task_id,
            "query": query,
            "summary": summary,
        })


if __name__ == "__main__":
    agent = SummaryAgent()
    agent.start()
Step 6: Build the Notifier Agent
The notifier sends completed summaries to Slack:
# notifier_agent.py
import os

import requests

from base_agent import BaseAgent


class NotifierAgent(BaseAgent):
    def __init__(self):
        super().__init__("notifier")
        self.webhook_url = os.getenv("SLACK_WEBHOOK_URL")
        self.subscribe("results/summary")

    def handle_message(self, topic: str, payload: dict):
        query = payload.get("query")
        summary = payload.get("summary")

        print(f"[notifier] Sending notification for: {query}")

        # Send to Slack; time out rather than block the MQTT loop indefinitely
        response = requests.post(
            self.webhook_url,
            json={"text": f"*Research Complete: {query}*\n\n{summary}"},
            timeout=10,
        )
        # Raise on HTTP errors so a failed delivery is not reported as sent
        response.raise_for_status()

        print("[notifier] Notification sent!")


if __name__ == "__main__":
    agent = NotifierAgent()
    agent.start()
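Model-generated summaries can run long, and the formatting is easier to check when it is separated from the HTTP call. The sketch below builds the webhook body as a plain function. The `build_slack_payload` name and the `max_chars` cap are assumptions made for illustration; the cap is an arbitrary value, not Slack's documented limit.

```python
def build_slack_payload(query: str, summary: str, max_chars: int = 3000) -> dict:
    """Build the JSON body for a Slack incoming webhook.

    max_chars is an arbitrary cap chosen for this sketch.
    """
    text = f"*Research Complete: {query}*\n\n{summary}"
    if len(text) > max_chars:
        # Leave room for the ellipsis so the result is exactly max_chars long
        text = text[: max_chars - 3] + "..."
    return {"text": text}
```

The notifier could then pass `build_slack_payload(query, summary)` as the `json` argument to `requests.post`.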
Step 7: Create the Trigger Script
Finally, a simple script to kick off research tasks:
# trigger.py
import os
import sys
import time
import uuid

from base_agent import BaseAgent


class TriggerAgent(BaseAgent):
    def __init__(self):
        super().__init__("trigger")

    def request_research(self, query: str):
        # Short random ID used to correlate messages across agents
        task_id = str(uuid.uuid4())[:8]
        self.publish("tasks/research", {
            "task_id": task_id,
            "query": query,
        })
        print(f"[trigger] Requested research: {query} (task: {task_id})")


if __name__ == "__main__":
    agent = TriggerAgent()

    # Connect briefly to publish
    agent.client.connect(
        os.getenv("CLOUDSIGNAL_HOST"),
        int(os.getenv("CLOUDSIGNAL_PORT")),
    )
    agent.client.loop_start()

    query = " ".join(sys.argv[1:]) or "Benefits of MQTT for AI agents"
    agent.request_research(query)

    time.sleep(1)  # Crude wait for the QoS 1 publish to complete
    agent.client.disconnect()
    agent.client.loop_stop()
Running the System
Open four terminal windows:
# Terminal 1 - Research Agent
python research_agent.py
# Terminal 2 - Summary Agent
python summary_agent.py
# Terminal 3 - Notifier Agent
python notifier_agent.py
# Terminal 4 - Trigger a task
python trigger.py "Latest developments in AI agent frameworks"
You’ll see:
- The trigger publishes to tasks/research
- Research agent picks it up, processes, publishes to results/research
- Summary agent picks that up, summarizes, publishes to results/summary
- Notifier agent sends the final summary to Slack
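To see this topic flow without a broker or API keys, the chain can be simulated in memory. The sketch below is a toy stand-in for MQTT that assumes exact-match topics only (no wildcards, QoS, or persistence). `FakeBroker`, `run_pipeline`, and the stub handlers are hypothetical, with string formatting standing in for the OpenAI calls.

```python
from collections import defaultdict


class FakeBroker:
    """Toy in-memory pub/sub: exact-match topics, synchronous delivery."""

    def __init__(self):
        self.handlers = defaultdict(list)
        self.log = []  # topics in the order they were published

    def subscribe(self, topic, handler):
        self.handlers[topic].append(handler)

    def publish(self, topic, payload):
        self.log.append(topic)
        for handler in list(self.handlers[topic]):
            handler(payload)


def run_pipeline(query: str):
    broker = FakeBroker()
    delivered = []  # stands in for the Slack webhook

    # Stub agents: each reads one topic and publishes to the next
    def research(msg):
        broker.publish("results/research", {**msg, "findings": f"findings on {msg['query']}"})

    def summarize(msg):
        broker.publish("results/summary", {**msg, "summary": f"summary of {msg['findings']}"})

    broker.subscribe("tasks/research", research)
    broker.subscribe("results/research", summarize)
    broker.subscribe("results/summary", delivered.append)

    broker.publish("tasks/research", {"task_id": "demo", "query": query})
    return broker.log, delivered
```

`run_pipeline("mqtt")` returns the three topics in publication order plus the final message, which mirrors the hand-offs listed above.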
What Makes This Robust
Offline Handling
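Stop the summary agent mid-task. As long as it connected with a persistent session (a stable client ID, a QoS 1 subscription, clean start disabled, and a non-zero session expiry), the research results queue in CloudSignal. Restart the summary agent and it receives the queued messages.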
Easy Scaling
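Need two research agents for faster processing? Run another instance with its own client ID and have both subscribe through an MQTT 5 shared subscription, for example $share/research-workers/tasks/research. The broker then delivers each task to only one instance in the group. With a plain subscription, every instance would receive every task.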
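Shared subscriptions in MQTT 5 use a topic filter of the form `$share/<group>/<topic>`, and each connected instance needs a distinct client ID, since a broker disconnects an existing client when a second one connects with the same ID. A minimal sketch of both pieces; `shared_filter` and `unique_client_id` are hypothetical helpers, and whether your CloudSignal plan enables shared subscriptions is something to confirm in its documentation.

```python
import uuid


def shared_filter(group: str, topic: str) -> str:
    """Build an MQTT 5 shared-subscription filter: $share/<group>/<topic>."""
    # The MQTT 5 spec forbids '/', '+' and '#' in the share name
    if not group or any(ch in group for ch in "/+#"):
        raise ValueError("group must be non-empty and contain no '/', '+' or '#'")
    return f"$share/{group}/{topic}"


def unique_client_id(agent_id: str) -> str:
    """Give each running instance its own client ID."""
    return f"agent-{agent_id}-{uuid.uuid4().hex[:8]}"
```

A research agent would subscribe to `shared_filter("research-workers", "tasks/research")` instead of the bare topic. A random client ID changes on every start, so it cannot resume a persistent session; if offline queueing matters too, a stable per-instance ID (for example, one read from an environment variable) is the safer choice.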
Loose Coupling
Add a new agent that also listens to results/summary? No changes needed to existing agents. Just subscribe to the topic.
Observable
Every message flows through CloudSignal. You can monitor, debug, and replay messages from the dashboard.
Next Steps
This is a basic example. In production, you’d add:
- Error handling topics: Publish failures to errors/{agent-id} for monitoring
- Task status tracking: Use retained messages on status/{task-id}
- Request-response patterns: For synchronous-style communication when needed
- Authentication per agent: Each agent gets its own credentials with topic-level ACLs
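The first two items can be sketched as small helpers that build a topic and a JSON payload. This is one possible shape, not a CloudSignal convention: the `error_message` and `status_message` names and the payload fields are assumptions made for illustration.

```python
import json
import time


def error_message(agent_id: str, task_id: str, exc: Exception) -> tuple[str, str]:
    """Return (topic, JSON payload) describing a failure in one agent."""
    topic = f"errors/{agent_id}"
    payload = {
        "task_id": task_id,
        "error": type(exc).__name__,
        "detail": str(exc),
        "timestamp": time.time(),
    }
    return topic, json.dumps(payload)


def status_message(task_id: str, stage: str) -> tuple[str, str]:
    """Return (topic, JSON payload) for a task's current stage."""
    return f"status/{task_id}", json.dumps({"task_id": task_id, "stage": stage})
```

With paho-mqtt, a status update would be published with `retain=True` (for example, `client.publish(topic, payload, qos=1, retain=True)`) so that a late subscriber to status/{task-id} immediately receives the latest stage.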
Conclusion
Building multi-agent systems doesn’t have to be complex. With pub/sub messaging:
- Agents are decoupled and independently deployable
- Messages are delivered at least once (QoS 1)
- Offline scenarios are handled automatically
- Scaling is straightforward
The infrastructure disappears. You focus on what your agents actually do.
Ready to build? Sign up for CloudSignal and get 50,000 messages/month free. No credit card required.