URL: https://dev.to/uaslimcreate/fastapi-background-tasks-vs-celery-for-ai-feature-processing-when-to-queue-and-when-to-345l

FastAPI Background Tasks vs. Celery for AI Feature Processing: When to Queue and When to Fire-and-Forget

I've shipped AI feature processing in CitizenApp three different ways: BackgroundTasks, Celery + Redis, and naked async tasks. Each burned me in production. This post maps the exact failure modes that determine which pattern wins for your workload.

The TL;DR: BackgroundTasks is perfect for sub-5-second tasks in low-concurrency environments. Celery is overkill until you hit 50+ concurrent AI inferences or need horizontal scaling. Bare async tasks are the dangerous middle ground: they feel right until your process crashes and loses work.
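To make those rules of thumb concrete, here is a minimal sketch that encodes them as a function. The `Workload` fields and the `choose_pattern` name are hypothetical, and the thresholds (5 seconds, 50 concurrent inferences) are simply the ones quoted in this post, not universal constants.

```python
from dataclasses import dataclass


@dataclass
class Workload:
    expected_seconds: float          # typical duration of one task
    peak_concurrency: int            # simultaneous tasks at peak
    must_survive_restart: bool       # losing the task on deploy/crash is unacceptable
    needs_horizontal_scaling: bool = False


def choose_pattern(w: Workload) -> str:
    """Return 'background_tasks' or 'celery' using this post's rules of thumb."""
    if w.must_survive_restart or w.needs_horizontal_scaling or w.peak_concurrency >= 50:
        return "celery"
    if w.expected_seconds < 5:
        return "background_tasks"
    # Longer than ~5 s: this is where bare asyncio tasks look tempting,
    # so prefer a real queue instead.
    return "celery"
```

Note that bare `asyncio.create_task` is never returned: under these assumptions, anything too heavy for BackgroundTasks goes straight to a queue.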

Why This Matters for Multi-Tenant AI

When you're processing Claude API calls across 200 tenants, a single failure mode cascades differently depending on your queue strategy:

  • BackgroundTasks dies with the process → tenant's AI feature blocks or silently fails
  • Async tasks die if you don't handle graceful shutdown → you lose inference requests mid-processing
  • Celery with Redis persists work → AI features eventually complete, even after deploys

I learned this when pushing CitizenApp to production. We had 12 concurrent AI document analyses running when I deployed a code change. BackgroundTasks lost 3 of them. The tenants got a generic error and no way to retry.

The Patterns

Pattern 1: FastAPI BackgroundTasks (Fire-and-Forget Lite)

This is what I use for synchronous side effects: logging, webhook retries, non-critical notifications.

import logging
from datetime import datetime

from fastapi import BackgroundTasks, Depends, FastAPI
from sqlalchemy.orm import Session

# FeatureUsage (ORM model) and get_db (session dependency) live elsewhere in the app.

app = FastAPI()
logger = logging.getLogger(__name__)

def log_ai_feature_usage(tenant_id: str, feature_name: str, db: Session):
    """Non-blocking usage logging: fires after the response returns."""
    # Note: depending on how get_db and your FastAPI version handle teardown,
    # a request-scoped session may already be closed by the time this runs.
    try:
        usage = FeatureUsage(
            tenant_id=tenant_id,
            feature_name=feature_name,
            timestamp=datetime.utcnow(),
        )
        db.add(usage)
        db.commit()
    except Exception as e:
        # This fails silently in BackgroundTasks. Genuinely doesn't matter here.
        db.rollback()
        logger.warning(f"Failed to log usage: {e}")

@app.post("/analyze")
async def analyze_document(
    tenant_id: str,
    document: str,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    # Return immediately while logging happens in the background
    background_tasks.add_task(log_ai_feature_usage, tenant_id, "document_analysis", db)

    return {"status": "queued", "document_id": "..."}

Why I use this: Synchronous, built-in, zero dependencies. No Redis. No Celery workers. Perfect for:

  • Analytics logging
  • Webhook retries (with exponential backoff in the task)
  • Cleanup jobs under 5 seconds
  • Non-critical email sends
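For the webhook-retry case, the backoff loop lives inside the task itself. The sketch below is one way to write it, assuming a hypothetical `send` callable that returns True on success. The `sleep` parameter exists only so the delays can be stubbed out in tests. With the defaults, the total wait is 1.5 seconds, which stays inside the sub-5-second budget.

```python
import time
from typing import Callable


def deliver_with_backoff(
    send: Callable[[], bool],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call send() until it returns True. Return the number of attempts used.

    Raises RuntimeError if every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        if send():
            return attempt
        if attempt < max_attempts:
            # Delay doubles each time: base, 2*base, 4*base, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError(f"webhook not delivered after {max_attempts} attempts")
```

It would be scheduled like any other task, e.g. `background_tasks.add_task(deliver_with_backoff, send_fn)`, where `send_fn` is your own delivery function. Nothing is persisted, so a process restart in the middle of the loop still drops the webhook.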

Where it breaks:

  • Task dies if the worker process crashes → no retry mechanism
  • No visibility into failures (unless you instrument logging)
  • Slows deployment: on graceful shutdown the server waits for in-flight BackgroundTasks, and any task still running when the process is finally killed is lost
  • No tenant isolation by default: tasks run in a shared worker pool

The lack of visibility bit me. A tenant's usage logging task hit a database constraint violation, and FastAPI's default behavior logged it to stdout but didn't surface it to the tenant. They lost audit data and blamed our system.
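One way to close that visibility gap is to wrap each background task so failures are recorded per tenant rather than only written to a log stream. This is a sketch, not what CitizenApp runs: `tracked` and `failures_by_tenant` are hypothetical names, and the in-memory dict stands in for whatever table or metrics system you would actually use. It assumes the task's first argument is the tenant ID.

```python
import functools
import logging
from collections import defaultdict

logger = logging.getLogger("background_tasks")

# In-memory stand-in for a per-tenant failure table; a real app would persist this.
failures_by_tenant = defaultdict(list)


def tracked(func):
    """Wrap a sync background task so exceptions are recorded per tenant."""
    @functools.wraps(func)
    def wrapper(tenant_id, *args, **kwargs):
        try:
            return func(tenant_id, *args, **kwargs)
        except Exception as exc:
            failures_by_tenant[tenant_id].append(f"{func.__name__}: {exc}")
            logger.exception(
                "Background task %s failed for tenant %s", func.__name__, tenant_id
            )
            return None
    return wrapper
```

Scheduling stays the same: `background_tasks.add_task(tracked(log_ai_feature_usage), tenant_id, "document_analysis", db)`. The failure record can then drive an alert or a tenant-facing status page.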

Pattern 2: Async Tasks (The Trap)

Here's the seductive pattern that burned me hardest:

import asyncio
import logging
from datetime import datetime

from fastapi import Depends
from sqlalchemy.orm import Session

# anthropic_client is assumed to be an async Anthropic client;
# app, get_db and AIInference are defined elsewhere in the app.

logger = logging.getLogger(__name__)

async def process_ai_inference(tenant_id: str, input_text: str, db: Session):
    """Process Claude API call asynchronously."""
    try:
        # Call Claude
        response = await anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": input_text}],
        )

        # Store result
        inference = AIInference(
            tenant_id=tenant_id,
            input=input_text,
            output=response.content[0].text,
            status="completed",
            completed_at=datetime.utcnow(),
        )
        db.add(inference)
        db.commit()
    except Exception as e:
        logger.error(f"Inference failed for tenant {tenant_id}: {e}")
        # Try to mark as failed, but if DB is down, this silently fails
        inference = AIInference(
            tenant_id=tenant_id,
            status="failed",
            error=str(e),
        )
        db.add(inference)
        db.commit()

@app.post("/infer")
async def infer(tenant_id: str, input_text: str, db: Session = Depends(get_db)):
    # Fire task without waiting. No reference is kept, so the event loop only
    # holds a weak reference and the task can vanish mid-execution.
    asyncio.create_task(process_ai_inference(tenant_id, input_text, db))
    return {"status": "processing"}

Why this feels right: One-liner to spawn work, no external dependencies, uses Python's native async/await.

Why it's a trap:

  • Tasks are lost on process crash: there is no persistence
  • Graceful shutdown is tricky (you have to track every task yourself and await them, e.g. with asyncio.gather(), in a shutdown handler)
  • The only concurrency limit is whatever your event loop can absorb; nothing is explicit
  • Tenant isolation requires manual task tracking
  • Zero visibility into which tenants have stuck tasks
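If you stay on bare async tasks anyway, the concurrency limit and tenant isolation have to be built by hand. A minimal sketch with one `asyncio.Semaphore` per tenant follows. `TenantLimiter` is a hypothetical helper, not part of FastAPI or asyncio, and it does nothing about persistence.

```python
import asyncio


class TenantLimiter:
    """Cap the number of coroutines running at once for each tenant."""

    def __init__(self, max_per_tenant: int):
        self._max = max_per_tenant
        self._slots = {}  # tenant_id -> asyncio.Semaphore

    async def run(self, tenant_id, coro_fn, *args):
        # Create the semaphore lazily, inside the running event loop.
        if tenant_id not in self._slots:
            self._slots[tenant_id] = asyncio.Semaphore(self._max)
        async with self._slots[tenant_id]:
            return await coro_fn(*args)
```

A call such as `await limiter.run(tenant_id, process_ai_inference, tenant_id, input_text, db)` then waits for a free slot instead of piling unbounded work onto the loop. One noisy tenant can no longer starve the others, but a crash still loses everything in flight.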

This is what we used in CitizenApp v1. We'd deploy, and during the 30-second shutdown, any in-flight Claude API calls would get killed mid-response. We'd log them as failed, but the tenant's UI would time out waiting for a response that had already been discarded.
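The shutdown problem can be softened, though not solved, by tracking every spawned task and draining them before the process exits. The sketch below shows the idea. `TaskRegistry` is a hypothetical helper, and `drain()` would be called from your app's shutdown hook with a timeout shorter than the deploy's kill deadline.

```python
import asyncio


class TaskRegistry:
    """Hold strong references to fire-and-forget tasks and drain them on shutdown."""

    def __init__(self):
        self._tasks = set()

    def spawn(self, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks.add(task)                        # keep a reference so it is not garbage-collected
        task.add_done_callback(self._tasks.discard)  # drop it once finished
        return task

    async def drain(self, timeout: float) -> int:
        """Wait up to `timeout` seconds for in-flight tasks, cancel the rest.

        Returns the number of tasks that had to be cancelled.
        """
        if not self._tasks:
            return 0
        _done, pending = await asyncio.wait(set(self._tasks), timeout=timeout)
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancellations finish so nothing is left half-torn-down.
            await asyncio.gather(*pending, return_exceptions=True)
        return len(pending)
```

Anything still pending when the timeout expires is cancelled and lost, and a hard crash skips the drain entirely. That remaining gap is what pushes the work to a persistent queue.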

Pattern 3: Celery + Redis (Industrial Strength)

This is what I use now for anything that:

  • Takes >5 seconds
  • Must survive a deploy
  • Needs per-tenant concurrency limits
  • Touches expensive APIs (Claude, Stripe)

# tasks.py
import logging
from datetime import datetime

from celery import Celery, Task
from celery.exceptions import Retry
from kombu import Queue

# anthropic_client (sync client), get_db, AIInference and TenantAIConfig
# come from the app's own modules.

logger = logging.getLogger(__name__)

app = Celery("citizenapp")
app.conf.update(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    accept_content=["json"],
    result_expires=3600,
)

# Per-workload queue routing: AI inference gets its own queue
app.conf.task_queues = (
    Queue("default", routing_key="default"),
    Queue("ai_inference", routing_key="ai.inference"),
)

class CallbackTask(Task):
    """Track task state in DB for multi-tenant visibility."""
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        tenant_id = kwargs.get("tenant_id")
        logger.error(f"Task {task_id} failed for tenant {tenant_id}: {exc}")
        # Update tenant's failure count, trigger alert

@app.task(base=CallbackTask, bind=True, max_retries=3)
def process_ai_inference(
    self,
    tenant_id: str,
    inference_id: str,
    input_text: str,
):
    """
    Process Claude inference with automatic retries.
    Lives in Redis until completion.
    """
    db = next(get_db())
    try:
        inference = db.query(AIInference).filter_by(id=inference_id).first()

        if not inference:
            return {"error": "Inference not found"}

        # Check tenant rate limits (per-tenant concurrency)
        tenant_config = db.query(TenantAIConfig).filter_by(
            tenant_id=tenant_id
        ).first()

        if tenant_config and tenant_config.concurrent_inferences >= tenant_config.max_concurrent:
            # Retry after 10 seconds
            raise self.retry(countdown=10)

        inference.status = "processing"
        db.commit()

        response = anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=[{"role": "user", "content": input_text}],
        )

        inference.output = response.content[0].text
        inference.status = "completed"
        inference.completed_at = datetime.utcnow()
        db.commit()

        return {"inference_id": inference_id, "status": "completed"}

    except Retry:
        # Let the rate-limit retry above propagate; otherwise the generic
        # handler below would catch it and schedule a second retry.
        raise
    except Exception as exc:
        db.rollback()
        # Exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
    finally:
        db.close()

# fastapi_app.py
from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session

from tasks import process_ai_inference

# get_db and AIInference come from the app's own modules.

app = FastAPI()

@app.post("/infer")
async def infer(
    tenant_id: str,
    input_text: str,
    db: Session = Depends(get_db),
):
    inference = AIInference(
        tenant_id=tenant_id,
        input=input_text,
        status="queued",
    )
    db.add(inference)
    db.commit()

    # Queue task, returns immediately
    task = process_ai_inference.apply_async(
        kwargs={
            "tenant_id": tenant_id,
            "inference_id": inference.id,
            "input_text": input_text,
        },
        queue="ai_inference",
        priority=5,  # Hook for per-tenant prioritization (constant here)
    )

    inference.celery_task_id = task.id
    db.commit()

    return {"inference_id": inference.id, "task_id": task.id}

Why I use Celery:

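  • Queued tasks persist in Redis, so they survive deploys and web-process crashes (a task a worker has already started is only redelivered after a worker crash if you enable late acknowledgement with acks_late, and surviving a Redis restart depends on Redis persistence being enabled)
  • Built-in retry logic with exponential backoff
  • Per-queue concurrency control (we have a separate ai_inference queue with 10 workers)
  • Failed tasks can be parked for inspection (dead-lettering is native with a RabbitMQ broker; with Redis you route them yourself, e.g. from on_failure)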
  • Monitoring integration (Flower, DataDog)
  • Per-tenant rate limiting (shown above)
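The countdown expression used in the task's retry call, `2 ** self.request.retries * 60`, is easy to sanity-check in isolation. The helper below reproduces that schedule and adds two optional extras that are assumptions of this sketch, not part of the original task: a cap on the delay and random jitter, so that many tenants failing at once do not all retry in the same second.

```python
import random


def retry_countdown(retries, base=60, cap=600, jitter=0.0, rng=random.random):
    """Seconds to wait before retry number `retries` (0-based).

    Doubles from `base`, never exceeds `cap`, then adds up to `jitter` * delay.
    """
    delay = min(cap, base * 2 ** retries)
    return delay + delay * jitter * rng()


# With max_retries=3 the task sees retries = 0, 1, 2
schedule = [retry_countdown(r) for r in range(3)]
```

Inside the task this would replace the inline expression, e.g. `countdown=retry_countdown(self.request.retries, jitter=0.2)`.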

Cost: Running Redis + Celery workers adds ~$20