VOOZH about

URL: https://dev.to/biao_lin_14b493a4944b1361/building-an-autonomous-ai-agent-colony-with-python-and-fastapi-5500

⇱ Building an Autonomous AI Agent Colony with Python and FastAPI - DEV Community


Building an Autonomous AI Agent Colony with Python and FastAPI

Introduction

Imagine a digital ecosystem where specialized AI agents collaborate, communicate, and self-heal without human intervention. This isn't science fiction — it's a practical architecture you can build today using Python and FastAPI. In this post, I'll walk you through creating an autonomous AI agent colony where agents work together, monitor each other, and recover from failures automatically.

Core Architecture

Our agent colony consists of three main components:

  1. Agent Nodes - Specialized workers that perform specific tasks
  2. API Server - Central communication hub built with FastAPI
  3. Self-Healing Watcher - A monitoring system that ensures colony health

The magic happens when these components work together autonomously, creating a resilient system that can adapt to changing conditions.

Setting Up the API Server

Let's start with the heart of our colony — the FastAPI server that manages agent registration, communication, and task distribution.

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import Dict, List, Optional
import uvicorn
import asyncio
from datetime import datetime
import uuid

app = FastAPI(title="AI Agent Colony API")

# Data models
# Request body an agent submits to POST /register_agent.
class AgentRegistration(BaseModel):
 name: str  # display name; rendered on the dashboard agent card
 agent_type: str  # rendered as "Type" on the dashboard agent card
 capabilities: List[str]  # task_type values this agent accepts (matched in get_task)
 endpoint: str  # NOTE(review): presumably the agent's own URL; not read anywhere in the visible code

# Server-side liveness and assignment record kept for each registered agent.
class AgentStatus(BaseModel):
 agent_id: str  # UUID string issued by register_agent
 status: str  # "idle" right after registration, "active" once a heartbeat arrives
 last_heartbeat: datetime  # naive timestamp taken from datetime.now()
 current_task: Optional[str] = None  # key of the task most recently handed out by get_task

# A unit of work that get_task hands out to a capable agent.
class TaskAssignment(BaseModel):
 task_id: str
 task_type: str  # compared against AgentRegistration.capabilities in get_task
 parameters: Dict  # free-form arguments; not interpreted by the visible server code
 priority: int = 1  # get_task only serves tasks with priority > 0 and decrements it per hand-out

# In-memory storage (use Redis for production)
# These are plain module-level dicts, so their contents live only in this process.
# `agents` and `agent_statuses` are keyed by the agent_id issued in register_agent.
agents: Dict[str, AgentRegistration] = {}
# Keys are treated as task ids by get_task and submit_result.
# NOTE(review): nothing in the visible code adds entries here — confirm where tasks are created.
tasks: Dict[str, TaskAssignment] = {}
agent_statuses: Dict[str, AgentStatus] = {}

@app.post("/register_agent")
async def register_agent(agent: AgentRegistration):
    # Store the submitted registration under a freshly generated UUID4 string
    # and create its status record: "idle", heartbeat stamped with the current
    # time. The generated id is returned so the agent can use it in later calls.
    new_id = str(uuid.uuid4())
    agents[new_id] = agent

    initial_status = AgentStatus(
        agent_id=new_id,
        status="idle",
        last_heartbeat=datetime.now(),
    )
    agent_statuses[new_id] = initial_status

    return {"agent_id": new_id, "status": "registered"}

@app.post("/heartbeat/{agent_id}")
async def heartbeat(agent_id: str):
    # Liveness ping from an agent. Unknown ids get a 404; otherwise the
    # agent's status record is stamped with the current time and marked
    # "active".
    if agent_id not in agents:
        raise HTTPException(status_code=404, detail="Agent not found")

    record = agent_statuses[agent_id]
    record.last_heartbeat = datetime.now()
    record.status = "active"
    return {"status": "acknowledged"}

@app.get("/get_task/{agent_id}")
async def get_task(agent_id: str):
    # Hand the requesting agent the highest-priority task it is able to run.
    #
    # Responds 404 when the agent id is unknown. A task is eligible when its
    # task_type is one of the agent's capabilities and its priority is still
    # above zero. Among eligible tasks the one with the largest priority wins;
    # ties go to the task that was inserted first. When nothing is eligible a
    # {"task_id": None, ...} placeholder is returned instead of a task.
    agent = agents.get(agent_id)
    if agent is None:
        raise HTTPException(status_code=404, detail="Agent not found")

    candidates = [
        (task_key, task)
        for task_key, task in tasks.items()
        if task.priority > 0 and task.task_type in agent.capabilities
    ]
    if not candidates:
        return {"task_id": None, "message": "No tasks available"}

    # max() keeps the first maximum it sees, so equal priorities are served
    # in insertion order.
    task_key, chosen = max(candidates, key=lambda item: item[1].priority)

    chosen.priority -= 1  # Decrease priority to balance load
    agent_statuses[agent_id].current_task = task_key
    return chosen

@app.post("/submit_result")
async def submit_result(task_id: str, result: Dict):
    # Acknowledge a result reported for a known task; unknown task ids get a
    # 404. The `result` payload is accepted but not yet stored anywhere —
    # this handler is a placeholder for real result processing.
    if task_id in tasks:
        # Process and store result
        return {"status": "completed", "task_id": task_id}

    raise HTTPException(status_code=404, detail="Task not found")

# Start the API with uvicorn when this file is executed directly.
if __name__ == "__main__":
 # host "0.0.0.0" binds every network interface, not just loopback.
 uvicorn.run(app, host="0.0.0.0", port=8000)

Building the Dashboard

A real-time dashboard helps visualize colony health. Here's a lightweight implementation using FastAPI's WebSocket support with a simple HTML frontend.

from fastapi import WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import json

# WebSocket connection manager
class ConnectionManager:
    """Track open WebSocket connections and fan text messages out to them."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and start tracking *websocket*."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking *websocket*.

        Safe to call more than once: a socket that is no longer tracked
        (for example because ``broadcast`` already dropped it) is ignored.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed; nothing left to do.
            pass

    async def broadcast(self, message: str):
        """Send *message* to every tracked connection.

        Connections whose send fails are dropped from the tracked list; the
        remaining connections still receive the message.
        """
        # Iterate over a snapshot: disconnect() mutates the live list, and
        # removing during iteration would skip the next connection.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                # Best-effort delivery. `Exception` (not a bare except) so
                # task cancellation still propagates.
                self.disconnect(connection)

# Module-level ConnectionManager instance used by the /ws endpoint.
manager = ConnectionManager()

@app.get("/dashboard")
async def get_dashboard():
    # Serve the single-page colony dashboard.
    #
    # The page opens a WebSocket to /ws on the same host that served it and
    # re-renders the agent list and metrics on every message. Each message is
    # expected to be JSON with an "agents" list (name, status, type, lastTask)
    # and a "metrics" object (totalAgents, activeTasks, health).
    #
    # Agent names and types come from whoever called /register_agent, so every
    # value is HTML-escaped before it is written into innerHTML.
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Agent Colony Dashboard</title>
        <style>
            body { font-family: Arial, sans-serif; background: #1a1a2e; color: #eee; }
            .container { max-width: 1200px; margin: 0 auto; padding: 20px; }
            .agent-card { background: #16213e; border-radius: 8px; padding: 15px; margin: 10px; }
            .status-active { color: #00ff88; }
            .status-idle { color: #ffd700; }
            .status-error { color: #ff4444; }
            #metrics { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>🤖 Agent Colony Dashboard</h1>
            <div id="metrics"></div>
            <h2>Active Agents</h2>
            <div id="agents"></div>
        </div>
        <script>
            // Escape a value for safe interpolation into HTML text or attributes.
            function escapeHtml(value) {
                return String(value == null ? '' : value)
                    .replace(/&/g, '&amp;')
                    .replace(/</g, '&lt;')
                    .replace(/>/g, '&gt;')
                    .replace(/"/g, '&quot;')
                    .replace(/'/g, '&#39;');
            }

            // Connect back to whichever host/port/scheme served this page.
            const wsScheme = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
            const ws = new WebSocket(`${wsScheme}//${window.location.host}/ws`);
            ws.onmessage = function(event) {
                const data = JSON.parse(event.data);
                document.getElementById('agents').innerHTML =
                    data.agents.map(agent => `
                        <div class="agent-card">
                            <h3>${escapeHtml(agent.name)}</h3>
                            <p>Status: <span class="status-${escapeHtml(agent.status)}">${escapeHtml(agent.status)}</span></p>
                            <p>Type: ${escapeHtml(agent.type)}</p>
                            <p>Last Task: ${escapeHtml(agent.lastTask || 'None')}</p>
                        </div>
                    `).join('');

                document.getElementById('metrics').innerHTML = `
                    <div class="agent-card">
                        <h3>Total Agents</h3>
                        <p>${escapeHtml(data.metrics.totalAgents)}</p>
                    </div>
                    <div class="agent-card">
                        <h3>Active Tasks</h3>
                        <p>${escapeHtml(data.metrics.activeTasks)}</p>
                    </div>
                    <div class="agent-card">
                        <h3>System Health</h3>
                        <p>${escapeHtml(data.metrics.health)}</p>
                    </div>
                `;
            };
        </script>
    </body>
    </html>
    """
    return HTMLResponse(content=html_content)

def _dashboard_snapshot():
    """Build the JSON-serialisable payload the dashboard page renders."""
    return {
        "agents": [
            {
                "name": agents[aid].name,
                "status": agent_statuses[aid].status,
                "type": agents[aid].agent_type,
                "lastTask": agent_statuses[aid].current_task,
            }
            for aid in agents
        ],
        "metrics": {
            "totalAgents": len(agents),
            "activeTasks": sum(1 for t in tasks.values() if t.priority > 0),
            "health": "Good" if agents else "Critical",
        },
    }


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Stream a dashboard snapshot to one client every 2 seconds.

    Each handler sends only to its own socket. Broadcasting from every
    handler would deliver one copy per connected client on each tick.
    The loop ends as soon as a send fails, and the socket is always
    unregistered on the way out.
    """
    await manager.connect(websocket)
    try:
        while True:
            payload = json.dumps(_dashboard_snapshot())
            try:
                await websocket.send_text(payload)
            except Exception:
                # The exception raised for a closed socket depends on the
                # server backend (WebSocketDisconnect, RuntimeError, ...);
                # any send failure means this client is gone.
                break
            await asyncio.sleep(2)
    finally:
        # Guarded so cleanup is safe even if something else already
        # removed this socket from the manager.
        if websocket in manager.active_connections:
            manager.disconnect(websocket)

The Self-Healing Watcher

This is where the true autonomy lives. The watcher monitors agent health and automatically spawns replacements when needed.


python
import aiohttp
import asyncio
from datetime import datetime, timedelta
import subprocess
import sys

class SelfHealingWatcher:
 def __init__(self, api_url: str, check_interval: int = 10):
 self.api_url = api_url
 self.check_interval = check_interval
 self.failed_agents = set()
 self.agent_templates = {
 "data_processor": {
 "script": "data_agent.py",
 "capabilities": ["data_processing", "analysis"]
 },
 "api_connector": {
 "script": "api_agent.py",
 "capabilities": ["api_calls", "web_scraping"]
 },
 "reporter": {
 "script": "report_agent.py",
 "capabilities": ["reporting", "visualization"]
 }
 }

 async def check_agent_health