Building an Autonomous AI Agent Colony with Python and FastAPI
Introduction
Imagine a digital ecosystem where specialized AI agents collaborate, communicate, and self-heal without human intervention. This isn't science fiction — it's a practical architecture you can build today using Python and FastAPI. In this post, I'll walk you through creating an autonomous AI agent colony where agents work together, monitor each other, and recover from failures automatically.
Core Architecture
Our agent colony consists of three main components:
- Agent Nodes - Specialized workers that perform specific tasks
- API Server - Central communication hub built with FastAPI
- Self-Healing Watcher - A monitoring system that ensures colony health
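The resilience comes from how these components cooperate without a human in the loop: agents register with the API server, send periodic heartbeats, and pull tasks that match their capabilities, while the watcher monitors agent health and replaces agents that fail.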
Setting Up the API Server
Let's start with the heart of our colony — the FastAPI server that manages agent registration, communication, and task distribution.
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Dict, List, Optional
import uvicorn
import asyncio
from datetime import datetime
import uuid
# Single FastAPI application; every route in this module is registered on it.
app = FastAPI(
    title="AI Agent Colony API",
)
# Data models
class AgentRegistration(BaseModel):
    # Request body of POST /register_agent; stored as-is in `agents`.
    # Documented with comments rather than a docstring so the model's
    # generated schema description is left unchanged.
    name: str  # display name rendered on the dashboard
    agent_type: str  # type label shown on the dashboard as "type"
    capabilities: List[str]  # task types this agent accepts; get_task matches task.task_type against it
    endpoint: str  # agent's address — not read by any route shown here
class AgentStatus(BaseModel):
    # Per-agent runtime state kept in `agent_statuses`; routes mutate it in place.
    agent_id: str  # id assigned by register_agent (str of a uuid4)
    status: str  # "idle" at registration, set to "active" by heartbeat
    last_heartbeat: datetime  # naive local time from datetime.now()
    current_task: Optional[str] = None  # id of the task most recently handed out by get_task
class TaskAssignment(BaseModel):
    # A unit of work held in `tasks` (presumably keyed by its task_id —
    # nothing visible here populates that dict; TODO confirm).
    task_id: str
    task_type: str  # matched against AgentRegistration.capabilities
    parameters: Dict  # task-specific arguments; no schema is enforced
    # Acts as a remaining-dispatch counter as well as a priority: get_task
    # decrements it on each hand-out and skips tasks that have reached 0.
    priority: int = 1
# Process-local registries: contents vanish on restart and are not shared
# between worker processes. Replace with a shared store (e.g. Redis) in
# production.
agents: Dict[str, AgentRegistration] = dict()  # agent_id -> registration payload
tasks: Dict[str, TaskAssignment] = dict()  # task_id -> task
agent_statuses: Dict[str, AgentStatus] = dict()  # agent_id -> runtime status
@app.post("/register_agent")
async def register_agent(agent: AgentRegistration):
    """Register a new agent and return the id it must use from now on.

    A fresh uuid4 string becomes the agent's id. The registration payload is
    stored in ``agents`` and an ``"idle"`` status record, stamped with the
    current time, is stored in ``agent_statuses``.
    """
    new_id = str(uuid.uuid4())
    initial_status = AgentStatus(
        agent_id=new_id,
        status="idle",
        last_heartbeat=datetime.now(),
    )
    agents[new_id] = agent
    agent_statuses[new_id] = initial_status
    return {"agent_id": new_id, "status": "registered"}
@app.post("/heartbeat/{agent_id}")
async def heartbeat(agent_id: str):
    """Record a liveness ping from a registered agent.

    Refreshes the agent's ``last_heartbeat`` to the current time and marks
    it ``"active"``.

    Raises:
        HTTPException: 404 if ``agent_id`` was never registered.
    """
    if agent_id in agents:
        record = agent_statuses[agent_id]
        record.last_heartbeat = datetime.now()
        record.status = "active"
        return {"status": "acknowledged"}
    raise HTTPException(status_code=404, detail="Agent not found")
@app.get("/get_task/{agent_id}")
async def get_task(agent_id: str):
    """Hand the requesting agent the highest-priority task it can perform.

    A task is eligible when its ``task_type`` is in the agent's
    ``capabilities`` and its ``priority`` is still positive. Among eligible
    tasks the largest ``priority`` wins. Ties go to the task that was
    inserted into ``tasks`` first.

    The chosen task's ``priority`` is decremented, so ``priority`` also acts
    as a remaining-dispatch counter: a task is handed out at most that many
    times. The task id is recorded as the agent's ``current_task``.

    Returns:
        The chosen ``TaskAssignment`` (with priority already decremented),
        or ``{"task_id": None, "message": "No tasks available"}``.

    Raises:
        HTTPException: 404 if ``agent_id`` was never registered.
    """
    if agent_id not in agents:
        raise HTTPException(status_code=404, detail="Agent not found")

    capabilities = set(agents[agent_id].capabilities)
    candidates = [
        (task_id, task)
        for task_id, task in tasks.items()
        if task.task_type in capabilities and task.priority > 0
    ]
    if not candidates:
        return {"task_id": None, "message": "No tasks available"}

    # max() returns the first maximal item, so equal priorities keep
    # insertion order (the previous first-match behaviour).
    task_id, task = max(candidates, key=lambda item: item[1].priority)
    task.priority -= 1  # spread repeated dispatches across tasks
    agent_statuses[agent_id].current_task = task_id
    return task
# Results reported through /submit_result, keyed by task_id (in-memory only).
task_results: Dict[str, Dict] = {}


@app.post("/submit_result")
async def submit_result(task_id: str, result: Dict):
    """Store an agent's result for ``task_id`` and retire the task.

    The result is kept in ``task_results``; a later submission for the same
    task overwrites it. The task's ``priority`` is set to 0, so get_task
    stops handing it out and the dashboard no longer counts it as active.
    The task itself stays in ``tasks``, so repeat submissions do not 404.

    Raises:
        HTTPException: 404 if ``task_id`` is not a known task.
    """
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")
    task_results[task_id] = result
    tasks[task_id].priority = 0
    return {"status": "completed", "task_id": task_id}
if __name__ == "__main__":
    # NOTE(review): uvicorn.run() blocks until the server shuts down. Any
    # route defined further down this module (e.g. /dashboard and /ws, if
    # they live in the same file) would only be registered after the server
    # exits. Keep this guard at the very end of the module.
    uvicorn.run(app, port=8000, host="0.0.0.0")
Building the Dashboard
A real-time dashboard helps visualize colony health. Here's a lightweight implementation using FastAPI's WebSocket support with a simple HTML frontend.
from fastapi import WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import json
# WebSocket connection manager
class ConnectionManager:
    """Track open dashboard WebSocket connections and fan messages out to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept ``websocket`` and include it in future broadcasts."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop broadcasting to ``websocket``; a no-op if it is already gone.

        ``broadcast`` may drop a dead socket before the endpoint's own
        disconnect handling runs, so a second call must not raise.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        """Send ``message`` to every connection, dropping any that fail to send."""
        # Iterate over a snapshot: disconnect() mutates the list, and removing
        # from a list while iterating it skips the element after the removed one.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Best-effort delivery: a failing client is dropped, not fatal.
                # Exception (not a bare except) lets task cancellation propagate.
                self.disconnect(connection)


manager = ConnectionManager()
@app.get("/dashboard")
async def get_dashboard():
    """Serve the single-page colony dashboard.

    The page opens a WebSocket to ``/ws`` on the same host and scheme it was
    loaded from. It re-renders the agent list and metrics on every message.
    Agent names and types originate from unauthenticated ``/register_agent``
    requests, so every agent field is HTML-escaped before it is inserted
    with ``innerHTML``; otherwise a crafted name would run script in the
    dashboard (stored XSS).
    """
    html_content = """
<!DOCTYPE html>
<html>
<head>
    <title>Agent Colony Dashboard</title>
    <style>
        body { font-family: Arial, sans-serif; background: #1a1a2e; color: #eee; }
        .container { max-width: 1200px; margin: 0 auto; padding: 20px; }
        .agent-card { background: #16213e; border-radius: 8px; padding: 15px; margin: 10px; }
        .status-active { color: #00ff88; }
        .status-idle { color: #ffd700; }
        .status-error { color: #ff4444; }
        #metrics { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; }
    </style>
</head>
<body>
    <div class="container">
        <h1>🤖 Agent Colony Dashboard</h1>
        <div id="metrics"></div>
        <h2>Active Agents</h2>
        <div id="agents"></div>
    </div>
    <script>
        // Escape untrusted text before interpolating it into innerHTML.
        function escapeHtml(value) {
            const text = (value === null || value === undefined) ? '' : String(value);
            return text.replace(/[&<>"']/g, ch => ({
                '&': '&amp;', '<': '&lt;', '>': '&gt;', '"': '&quot;', "'": '&#39;'
            })[ch]);
        }
        // Connect back to whichever host/port served this page (ws:// or wss://).
        const wsScheme = location.protocol === 'https:' ? 'wss' : 'ws';
        const ws = new WebSocket(`${wsScheme}://${location.host}/ws`);
        ws.onmessage = function(event) {
            const data = JSON.parse(event.data);
            document.getElementById('agents').innerHTML =
                data.agents.map(agent => `
                    <div class="agent-card">
                        <h3>${escapeHtml(agent.name)}</h3>
                        <p>Status: <span class="status-${escapeHtml(agent.status)}">${escapeHtml(agent.status)}</span></p>
                        <p>Type: ${escapeHtml(agent.type)}</p>
                        <p>Last Task: ${escapeHtml(agent.lastTask || 'None')}</p>
                    </div>
                `).join('');
            document.getElementById('metrics').innerHTML = `
                <div class="agent-card">
                    <h3>Total Agents</h3>
                    <p>${escapeHtml(data.metrics.totalAgents)}</p>
                </div>
                <div class="agent-card">
                    <h3>Active Tasks</h3>
                    <p>${escapeHtml(data.metrics.activeTasks)}</p>
                </div>
                <div class="agent-card">
                    <h3>System Health</h3>
                    <p>${escapeHtml(data.metrics.health)}</p>
                </div>
            `;
        };
    </script>
</body>
</html>
"""
    return HTMLResponse(content=html_content)
def _dashboard_snapshot() -> dict:
    """Build the JSON-serializable payload rendered by the dashboard page."""
    return {
        "agents": [
            {
                "name": registration.name,
                "status": agent_statuses[aid].status,
                "type": registration.agent_type,
                "lastTask": agent_statuses[aid].current_task,
            }
            for aid, registration in agents.items()
        ],
        "metrics": {
            "totalAgents": len(agents),
            # Tasks that get_task would still hand out.
            "activeTasks": sum(1 for t in tasks.values() if t.priority > 0),
            "health": "Good" if agents else "Critical",
        },
    }


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Push a dashboard snapshot to this client every 2 seconds until it leaves.

    Each connection runs its own loop, so the snapshot goes only to *this*
    socket. Broadcasting from every per-client loop would deliver N copies
    per tick to each of N clients. Also, ``manager.broadcast`` swallows send
    errors, so a departed client's loop would never end.
    """
    await manager.connect(websocket)
    try:
        while True:
            await websocket.send_text(json.dumps(_dashboard_snapshot()))
            await asyncio.sleep(2)
    except WebSocketDisconnect:
        pass  # normal client disconnect
    finally:
        # Release the connection however the loop ended; any other send
        # error still propagates to the server's error log after cleanup.
        manager.disconnect(websocket)
The Self-Healing Watcher
This is where the true autonomy lives. The watcher monitors agent health and automatically spawns replacements when needed.
import aiohttp
import asyncio
from datetime import datetime, timedelta
import subprocess
import sys
class SelfHealingWatcher:
def __init__(self, api_url: str, check_interval: int = 10):
self.api_url = api_url
self.check_interval = check_interval
self.failed_agents = set()
self.agent_templates = {
"data_processor": {
"script": "data_agent.py",
"capabilities": ["data_processing", "analysis"]
},
"api_connector": {
"script": "api_agent.py",
"capabilities": ["api_calls", "web_scraping"]
},
"reporter": {
"script": "report_agent.py",
"capabilities": ["reporting", "visualization"]
}
}
async def check_agent_health
(The original listing is truncated at this point; the rest of `SelfHealingWatcher`, starting with `check_agent_health`, is not included.)
