URL: https://tech-insider.org/celery-python-tutorial-task-queue-redis-2026/

How to Build a Task Queue with Celery Python in 13 Steps [2026]


April 10, 2026

Celery is the most widely used distributed task queue for Python, powering background jobs at Instagram, Mozilla, and Robinhood. With over 25,800 GitHub stars and support for Redis and RabbitMQ as message brokers, Celery handles everything from sending emails to processing machine learning pipelines. This tutorial walks you through building a complete task processing system with Celery and Redis in 13 steps, from installation to production monitoring. By the end, you will have a working project that processes tasks asynchronously, schedules periodic jobs with Celery Beat, and monitors workers with Flower.

Whether you are offloading slow API calls from a Django view, running nightly data aggregation, or distributing ML inference across workers, Celery Python gives you the framework to do it reliably. The latest stable release, Celery 5.6.3 (released in early 2026), supports Python 3.9 through 3.13 and PyPy 3.10+, includes Pydantic model support for task arguments, and pins the redis Python client (not the Redis server) to version 5.2.1 or lower. This guide uses Redis as the broker because it doubles as both message transport and result backend, simplifying your infrastructure.

Prerequisites and Environment Setup

Before you start writing Celery tasks, you need a working Python environment and a running Redis server. Celery Python requires a message broker to shuttle tasks between your application and the workers that execute them. Redis is the easiest broker to set up and also serves as the result backend where task return values are stored. Here is what you need installed before proceeding.

You need Python 3.9 or higher. Celery 5.6.x dropped support for Python 3.8, which reached end-of-life in October 2024. If you are still running 3.8, you must use Celery 5.5.x or upgrade Python. You also need a Redis server, version 6.0 or later (7.x is recommended), running locally or accessible over your network. On macOS, install Redis with brew install redis. On Ubuntu or Debian, use sudo apt install redis-server. On Windows, use WSL2 or Docker. Verify your Redis server is running by executing redis-cli ping – it should return PONG.

You should also have pip version 23 or higher and a virtual environment tool like venv or virtualenv. This tutorial uses venv because it ships with Python. A basic understanding of Python decorators, functions, and the command line is assumed. No prior experience with message brokers or distributed systems is required.

| Requirement | Minimum Version | Recommended Version | Notes |
| --- | --- | --- | --- |
| Python | 3.9 | 3.12 or 3.13 | Celery 5.6.x requires 3.9+ |
| Redis | 6.0 | 7.2 | Used as broker and result backend |
| Celery | 5.6.0 | 5.6.3 | Latest stable as of early 2026 |
| pip | 23.0 | 24.x | Needed for dependency resolution |
| OS | Linux, macOS | Ubuntu 22.04+ | Windows supported via WSL2 |
| RAM | 512 MB | 2 GB+ | Per worker process |

Step 1: Create the Project Structure and Virtual Environment

A clean project structure prevents import headaches later. Celery discovers tasks by scanning Python modules, so the layout of your files matters. Start by creating a project directory with a dedicated package for your application. This structure scales from a single-file prototype to a multi-module production system without reorganization.

mkdir celery-demo && cd celery-demo
python3 -m venv venv
source venv/bin/activate
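pip install "celery[redis]==5.6.3"  # quotes keep shells like zsh from expanding the brackets
pip install flower==2.0.1
pip install requests  # HTTP client used by the fetch_url task in Step 3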

# Create project structure
mkdir -p myapp
touch myapp/__init__.py
touch myapp/celery_app.py
touch myapp/tasks.py
touch myapp/config.py

The celery[redis] extra installs the redis Python client alongside Celery. The flower package provides a web-based monitoring dashboard. Your directory should now look like this:

celery-demo/
├── venv/
├── myapp/
│ ├── __init__.py
│ ├── celery_app.py
│ ├── tasks.py
│ └── config.py
└── requirements.txt

Generate a requirements.txt with pip freeze > requirements.txt so other developers can replicate your environment. The virtual environment isolates your Celery installation from system Python packages, avoiding version conflicts with other projects on the same machine.

Step 2: Configure the Celery Application Instance

The Celery application instance is the central object that ties your configuration, tasks, and broker together. You create it once and import it everywhere else. Placing it in a dedicated celery_app.py module makes it easy to reference from both your task definitions and your framework integration (Django, Flask, or FastAPI).

Open myapp/celery_app.py and add the following configuration. This connects Celery to your local Redis instance on the default port 6379, using database 0 for the broker and database 1 for results. Separating the broker and result backend into different Redis databases prevents key collisions.

# myapp/celery_app.py
from celery import Celery

app = Celery(
 "myapp",
 broker="redis://localhost:6379/0",
 backend="redis://localhost:6379/1",
 include=["myapp.tasks"],
)

# Serialization
app.conf.task_serializer = "json"
app.conf.result_serializer = "json"
app.conf.accept_content = ["json"]

# Time zone
app.conf.timezone = "UTC"
app.conf.enable_utc = True

# Task execution settings
app.conf.task_acks_late = True
app.conf.worker_prefetch_multiplier = 1
app.conf.task_reject_on_worker_lost = True

# Result expiration (24 hours)
app.conf.result_expires = 86400

if __name__ == "__main__":
 app.start()

The task_acks_late = True setting tells Celery to acknowledge a task only after the worker finishes executing it, not when it receives the message. Combined with task_reject_on_worker_lost = True, this ensures tasks are redelivered if a worker crashes mid-execution, which also means tasks should be idempotent because they may run more than once. The worker_prefetch_multiplier = 1 setting limits each worker process to reserving one task at a time, so a slow task does not hold up messages that were prefetched behind it. These three settings are critical for production reliability.

The include parameter tells Celery which modules contain task definitions. Without it, the worker would start but have zero registered tasks. Every time you add a new tasks module, append it to this list.
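
The hard-coded URLs above work for local development, but the same module is later run inside containers where Redis lives on another host. A minimal sketch of building both URLs from the environment follows; REDIS_HOST and REDIS_PORT are hypothetical variable names chosen for this example, not settings that Celery reads on its own.

```python
import os


def redis_url(db: int, env=os.environ) -> str:
    """Build a Redis URL for the given database number.

    REDIS_HOST and REDIS_PORT are hypothetical names used only in this
    sketch; the defaults match the local setup used in this tutorial.
    """
    host = env.get("REDIS_HOST", "localhost")
    port = env.get("REDIS_PORT", "6379")
    return f"redis://{host}:{port}/{db}"


BROKER_URL = redis_url(0)   # broker on database 0
BACKEND_URL = redis_url(1)  # result backend on database 1
```

You could then pass BROKER_URL and BACKEND_URL as the broker and backend arguments of Celery(...) instead of the literal strings.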

Step 3: Define Your First Celery Tasks

Tasks are regular Python functions decorated with @app.task. When you call a task asynchronously, Celery serializes the function name and arguments, sends them to Redis, and a worker picks them up for execution. The function itself runs in a separate process, which means it cannot share memory with your web application – all data must be passed through the task arguments.

# myapp/tasks.py
import time
import requests
from myapp.celery_app import app


@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to_address, subject, body):
    """Simulate sending an email with retry logic."""
    try:
        # Simulate external API call
        time.sleep(2)
        print(f"Email sent to {to_address}: {subject}")
        return {"status": "sent", "to": to_address}
    except Exception as exc:
        raise self.retry(exc=exc)


@app.task(bind=True, max_retries=5, default_retry_delay=30)
def fetch_url(self, url):
    """Fetch a URL and return the status code and content length."""
    try:
        response = requests.get(url, timeout=10)
        return {
            "url": url,
            "status_code": response.status_code,
            "content_length": len(response.content),
        }
    except requests.RequestException as exc:
        raise self.retry(exc=exc)


@app.task
def add_numbers(x, y):
    """Simple task for testing."""
    return x + y


@app.task(bind=True, time_limit=300, soft_time_limit=240)
def process_data(self, dataset_id):
    """Long-running data processing task with time limits."""
    print(f"Processing dataset {dataset_id}...")
    time.sleep(5)  # Simulate processing
    result = {"dataset_id": dataset_id, "rows_processed": 10000}
    print(f"Finished processing dataset {dataset_id}")
    return result

The bind=True parameter gives the task access to self, which is the task instance. You need this for retry logic because self.retry() re-queues the task with the same arguments. The max_retries and default_retry_delay parameters control how many times and how often a failed task is retried. The time_limit and soft_time_limit on process_data kill the task if it exceeds 300 seconds (hard limit) or raise a SoftTimeLimitExceeded exception at 240 seconds (soft limit), giving you a chance to clean up.

Notice that task arguments must be JSON-serializable. You cannot pass Django model instances, file handles, or database connections as arguments. Pass the primary key instead and query the database inside the task. This is one of the most common mistakes developers make when starting with Celery Python.
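
To make the serialization rule concrete, here is a small standard-library sketch that checks whether a value would survive JSON encoding before you hand it to a task. The Order class is a hypothetical stand-in for an ORM model instance and is not part of the tutorial project.

```python
import json


def is_json_serializable(value) -> bool:
    """Return True if value can be encoded with json.dumps."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


class Order:
    """Hypothetical stand-in for an ORM model instance."""

    def __init__(self, pk):
        self.pk = pk


order = Order(pk=42)
print(is_json_serializable(order))     # False: pass order.pk instead
print(is_json_serializable(order.pk))  # True
```

A check like this can be useful in unit tests for code that enqueues tasks, since the failure otherwise only shows up when the message is serialized.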

Step 4: Start the Worker and Execute Tasks

With your tasks defined, you can now start a Celery worker process that listens for incoming messages on Redis. The worker imports your task modules, connects to the broker, and waits for jobs. Open a terminal, activate your virtual environment, and start the worker.

# Start the Celery worker (run from the celery-demo directory)
celery -A myapp.celery_app worker --loglevel=info

# You should see output like:
# -------------- celery@hostname v5.6.3 (Emerald Rush)
# --- ***** -----
# -- ******* ---- Linux-6.x-x86_64
# - *** --- * ---
# - ** ---------- [config]
# - ** ---------- .> app: myapp:0x7f...
# - ** ---------- .> transport: redis://localhost:6379/0
# - ** ---------- .> results: redis://localhost:6379/1
# - *** --- * --- .> concurrency: 8 (prefork)
# -- ******* ---- .> task events: OFF
# --- ***** -----
# -------------- [queues]
# .> celery exchange=celery(direct) key=celery
# [tasks]
# . myapp.tasks.add_numbers
# . myapp.tasks.fetch_url
# . myapp.tasks.process_data
# . myapp.tasks.send_email

The worker banner confirms your connection to Redis, lists the registered tasks, and shows the concurrency level (number of child processes). By default, Celery uses the prefork pool with as many processes as your machine has CPU cores. Now open a second terminal, activate the virtual environment, and send tasks.

# In a Python shell or script
from myapp.tasks import add_numbers, send_email, fetch_url

# Send a task asynchronously
result = add_numbers.delay(4, 6)
print(f"Task ID: {result.id}")
print(f"Task state: {result.state}") # PENDING

# Wait for the result (blocks until done)
print(f"Result: {result.get(timeout=10)}") # 10

# Send email task
email_result = send_email.delay(
    "user@example.com",
    "Welcome!",
    "Thanks for signing up.",
)
print(f"Email task ID: {email_result.id}")

# Using apply_async for more control.
# No queue= argument is passed here: the worker started above only consumes
# the default "celery" queue. Custom queues are configured in Step 5.
fetch_result = fetch_url.apply_async(
    args=["https://httpbin.org/get"],
    countdown=5,  # Delay execution by 5 seconds
)
print(f"Fetch result: {fetch_result.get(timeout=30)}")

The .delay() method is a shortcut for .apply_async(). Use .delay() for simple calls and .apply_async() when you need countdown delays, custom queues, ETAs, or expiration times. The result.get(timeout=10) call blocks until the worker returns a value or the timeout is reached. In production, avoid blocking on results inside web request handlers – store the task ID and let the client poll for status.
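
The polling idea mentioned above can be sketched with a small helper that repeatedly asks whether a task is done instead of blocking on it. This is a generic standard-library sketch; poll_until_ready is a hypothetical name, and the is_ready argument is assumed to be any zero-argument callable such as the ready method of the result object from the previous example.

```python
import time


def poll_until_ready(is_ready, interval=0.5, timeout=10.0, sleep=time.sleep):
    """Call is_ready() every `interval` seconds until it returns True.

    Returns True if the task became ready, False if `timeout` elapsed first.
    """
    deadline = time.monotonic() + timeout
    while True:
        if is_ready():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval)
```

With the result from add_numbers.delay(4, 6), a call such as poll_until_ready(result.ready, timeout=10) would return once the worker has stored the outcome. In a web application the loop usually lives in the client, which calls a status endpoint rather than holding a request open.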

Step 5: Configure Task Routing and Multiple Queues

As your application grows, you will want to separate tasks by priority and resource requirements. A CPU-intensive data processing job should not share a queue with a quick email notification. Task routing lets you direct specific tasks to specific queues, where dedicated workers consume them. This is how production Celery deployments at companies like Instagram manage millions of tasks per day without bottlenecks.

Add the routing configuration to your celery_app.py file. This setup creates three queues: default for general tasks, email for notification tasks, and heavy for CPU-intensive processing. Each queue can have its own pool of workers with different concurrency settings.

# Add to myapp/celery_app.py

from kombu import Queue

app.conf.task_queues = (
 Queue("default", routing_key="default"),
 Queue("email", routing_key="email"),
 Queue("heavy", routing_key="heavy"),
)

app.conf.task_default_queue = "default"
app.conf.task_default_routing_key = "default"

app.conf.task_routes = {
 "myapp.tasks.send_email": {"queue": "email"},
 "myapp.tasks.process_data": {"queue": "heavy"},
 "myapp.tasks.fetch_url": {"queue": "default"},
 "myapp.tasks.add_numbers": {"queue": "default"},
}
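
To illustrate what the routing table expresses, the sketch below resolves a task name to a queue: exact names first, then glob patterns, then the default queue. It only illustrates the lookup idea and is not Celery's router implementation; resolve_queue and the "myapp.reports.*" pattern are hypothetical names added for the example.

```python
from fnmatch import fnmatch

TASK_ROUTES = {
    "myapp.tasks.send_email": {"queue": "email"},
    "myapp.tasks.process_data": {"queue": "heavy"},
    "myapp.reports.*": {"queue": "heavy"},  # hypothetical glob pattern
}
DEFAULT_QUEUE = "default"


def resolve_queue(task_name, routes=TASK_ROUTES, default=DEFAULT_QUEUE):
    """Return the queue for task_name: exact match, then glob, then default."""
    if task_name in routes:
        return routes[task_name]["queue"]
    for pattern, route in routes.items():
        if fnmatch(task_name, pattern):
            return route["queue"]
    return default


print(resolve_queue("myapp.tasks.send_email"))   # email
print(resolve_queue("myapp.reports.monthly"))    # heavy
print(resolve_queue("myapp.tasks.add_numbers"))  # default
```

Tasks without an entry fall through to the default queue, which is why the explicit entries for fetch_url and add_numbers in the configuration above are optional.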

Now start separate workers for each queue. This gives you fine-grained control over resource allocation. You might run four email workers with low memory but only two heavy workers with high memory on a beefier machine.

# Terminal 1: Default queue worker
celery -A myapp.celery_app worker -Q default --loglevel=info -c 4

# Terminal 2: Email queue worker
celery -A myapp.celery_app worker -Q email --loglevel=info -c 2

# Terminal 3: Heavy processing worker
celery -A myapp.celery_app worker -Q heavy --loglevel=info -c 1 --max-memory-per-child=500000

The --max-memory-per-child=500000 flag replaces a worker child process once its resident memory exceeds the limit. The value is given in kilobytes, so 500000 is roughly 500 MB, and a task that is already running is allowed to finish before the process is replaced. This prevents memory leaks in long-running workers from consuming all available RAM. The -c flag sets the concurrency (number of child processes) for each worker. For CPU-bound tasks, set concurrency equal to the number of available cores. For I/O-bound tasks like sending emails or fetching URLs, you can safely set it higher.
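
Because the memory limit is easy to get wrong by a factor of a thousand, a short arithmetic sketch may help. It assumes the flag's unit is kibibytes (1024 bytes), which is how the Celery worker documentation describes it; the helper names are hypothetical.

```python
def mib_to_kib(mib: int) -> int:
    """Convert mebibytes to the kibibyte value passed to the flag."""
    return mib * 1024


def kib_to_mib(kib: int) -> float:
    """Convert a flag value in kibibytes back to mebibytes."""
    return kib / 1024


print(mib_to_kib(500))               # 512000
print(round(kib_to_mib(500000), 1))  # 488.3
```

Under that assumption, 500000 corresponds to about 488 MiB, which is close enough to "500 MB" for sizing purposes.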

Step 6: Schedule Periodic Tasks with Celery Beat

Celery Beat is a scheduler that sends tasks at regular intervals – like cron for your Celery workers. You configure the schedule in your app configuration, and Celery Beat sends the tasks to the broker at the specified times. A separate Celery Beat process runs alongside your workers and is responsible only for scheduling, not executing.

Add the following schedule configuration to celery_app.py. This sets up three periodic tasks: a health check every 30 seconds, a data cleanup every hour, and a daily report at midnight UTC.

# Add to myapp/celery_app.py
from celery.schedules import crontab

app.conf.beat_schedule = {
 "health-check-every-30-seconds": {
 "task": "myapp.tasks.health_check",
 "schedule": 30.0,
 },
 "cleanup-every-hour": {
 "task": "myapp.tasks.cleanup_expired_data",
 "schedule": crontab(minute=0), # Top of every hour
 },
 "daily-report-midnight": {
 "task": "myapp.tasks.generate_daily_report",
 "schedule": crontab(hour=0, minute=0), # Midnight UTC
 "args": ("summary",),
 },
}

Add the corresponding tasks to myapp/tasks.py:

# Add to myapp/tasks.py

@app.task
def health_check():
 """Periodic health check."""
 return {"status": "healthy", "timestamp": time.time()}


@app.task
def cleanup_expired_data():
 """Remove expired records from the database."""
 print("Running data cleanup...")
 time.sleep(1)
 return {"cleaned_records": 42}


@app.task
def generate_daily_report(report_type):
 """Generate a daily summary report."""
 print(f"Generating {report_type} report...")
 time.sleep(3)
 return {"report_type": report_type, "generated": True}

Start Celery Beat in a separate terminal. It reads your beat_schedule configuration and sends tasks at the defined intervals. Never run more than one Beat process at a time – doing so will cause duplicate task submissions.

# Start Celery Beat scheduler
celery -A myapp.celery_app beat --loglevel=info

# Output:
# celery beat v5.6.3 (Emerald Rush) is starting.
# __ - ... __ - _
# LocalTime -> 2026-04-10 12:00:00
# Configuration ->
# . broker -> redis://localhost:6379/0
# . loader -> celery.loaders.app.AppLoader
# . scheduler -> celery.beat.PersistentScheduler
# . db -> celerybeat-schedule
# . logfile -> [stderr]@%INFO
# . maxinterval -> 5.00 minutes (300s)

For production, consider django-celery-beat if you use Django. It stores schedules in the database instead of a file, letting you modify schedules through the Django admin without restarting Beat. The crontab scheduler supports the same syntax as Unix cron: minute, hour, day of week, day of month, and month of year.
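
To show what the two crontab expressions in the schedule mean in terms of wall-clock time, here is a standard-library sketch that computes the next matching run. It only mirrors the semantics of crontab(minute=0) and crontab(hour=0, minute=0) for illustration; it is not how Celery Beat evaluates schedules, and the function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def next_top_of_hour(now: datetime) -> datetime:
    """Next time with minute == 0, like crontab(minute=0)."""
    return now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


def next_midnight(now: datetime) -> datetime:
    """Next time with hour == 0 and minute == 0, like crontab(hour=0, minute=0)."""
    return now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)


now = datetime(2026, 4, 10, 12, 30, tzinfo=timezone.utc)
print(next_top_of_hour(now))  # 2026-04-10 13:00:00+00:00
print(next_midnight(now))     # 2026-04-11 00:00:00+00:00
```

Omitted crontab fields default to "every", which is why crontab(minute=0) fires hourly rather than once a day.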

Step 7: Monitor Workers with Flower

Flower is a real-time web-based monitoring tool for Celery. It shows active workers, task progress, success and failure rates, and queue depths. In production, Flower is essential for diagnosing bottlenecks and identifying failing tasks before they pile up. You installed it in Step 1, so starting it takes one command.

# Start Flower monitoring dashboard
celery -A myapp.celery_app flower --port=5555

# Open http://localhost:5555 in your browser
# You will see:
# - Dashboard: active/processed/failed/succeeded task counts
# - Workers: list of connected workers with status
# - Tasks: searchable list of all tasks with state and result
# - Broker: Redis queue depths and message rates
# - Monitor: real-time graphs of task execution

Flower exposes a REST API at http://localhost:5555/api/ that you can integrate with alerting systems. For example, GET /api/workers returns JSON with worker status, and GET /api/tasks lists recent tasks with their states. You can use these endpoints to set up Prometheus metrics scraping or PagerDuty alerts when failure rates spike.

In production, protect Flower behind authentication. Use the --basic_auth=user:password flag or put it behind a reverse proxy with OAuth. Never expose an unprotected Flower instance to the internet – it reveals your task names, arguments, and infrastructure details.

Step 8: Handle Errors, Retries, and Dead Letter Queues

Error handling is where most Celery tutorials fall short and most production deployments fail. A task can fail for many reasons: network timeouts, database connection drops, third-party API rate limits, or out-of-memory errors. Your error handling strategy determines whether failed tasks are silently lost, retried indefinitely, or handled gracefully.

Celery provides three error handling mechanisms: automatic retries with self.retry(), exponential backoff with the retry_backoff option, and task error callbacks. Here is a production-grade task with all three:

# myapp/tasks.py - Production error handling
from celery import Task


class AlertingTask(Task):
    """Base task class: on_failure runs once a task has finally failed."""

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(f"Task {task_id} permanently failed: {exc}")
        # Send alert to Slack, PagerDuty, etc.


@app.task(
    bind=True,
    base=AlertingTask,
    autoretry_for=(requests.RequestException, ConnectionError),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
    max_retries=5,
)
def reliable_api_call(self, endpoint, payload):
    """API call with exponential backoff and jitter."""
    response = requests.post(endpoint, json=payload, timeout=10)
    response.raise_for_status()
    return response.json()


@app.task(bind=True, base=AlertingTask, max_retries=3)
def task_with_callback(self, data):
    """Task with explicit error handling and callbacks."""
    try:
        # process_complex_data is a placeholder for your own function
        result = process_complex_data(data)
        return result
    except ValueError as exc:
        # Permanent failure: do not retry
        print(f"Permanent failure for data: {data}")
        return {"error": str(exc), "permanent": True}
    except ConnectionError as exc:
        # Transient failure: retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)

The autoretry_for parameter automatically retries the task when any of the listed exceptions are raised, without needing explicit self.retry() calls. Setting retry_backoff=True enables exponential backoff (1s, 2s, 4s, 8s, etc.), and retry_jitter=True treats each computed delay as a maximum and picks a random delay between zero and that value, which prevents thundering herd problems when many tasks retry simultaneously. The retry_backoff_max=600 setting caps the delay at 10 minutes.
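
The delay arithmetic described above can be sketched in a few lines. This is an illustration of exponential backoff with a cap and optional full jitter under the parameters used in this step; it is written for this article and is not copied from Celery's source, and backoff_delay is a hypothetical name.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=False, rng=random):
    """Delay in seconds before retry number `retries` (0-based)."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a whole number of seconds in [0, delay].
        delay = rng.randrange(delay + 1)
    return delay


print([backoff_delay(n) for n in range(5)])  # [1, 2, 4, 8, 16]
print(backoff_delay(12))                     # 600 (capped by maximum)
```

With jitter enabled the result for a given retry count varies between runs, which is the point: tasks that failed at the same moment do not all retry at the same moment.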

For tasks that fail permanently after exhausting all retries, implement an on_failure handler or use Celery signals to route the failed task to a dead letter queue for manual inspection. This prevents data loss and gives you a record of what went wrong.
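
One way to keep a record of permanently failed tasks is to serialize the failure details and append them to a store you control. The sketch below only builds the record; the field names and the dead_letter_record helper are hypothetical choices for this example, and where the record is stored is left open.

```python
import json
import time


def dead_letter_record(task_id, task_name, exc, args, kwargs):
    """Serialize the details of a failed task for later inspection."""
    return json.dumps({
        "task_id": task_id,
        "task": task_name,
        "error": f"{type(exc).__name__}: {exc}",
        "args": list(args),
        "kwargs": dict(kwargs),
        "failed_at": time.time(),
    })


record = dead_letter_record(
    "abc-123", "myapp.tasks.fetch_url", TimeoutError("timed out"),
    ["https://example.com"], {},
)
print(json.loads(record)["error"])  # TimeoutError: timed out
```

Inside an on_failure handler you could build such a record and push it to a Redis list (for example with the redis client's rpush method and a key name of your choosing), assuming the task arguments are JSON-serializable as recommended earlier.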

Step 9: Integrate Celery with Django

Django is the most common framework paired with Celery Python. The integration requires a specific file layout so that Django’s settings.py and Celery’s configuration stay in sync. The official pattern uses django.conf:settings as the configuration source, so you define Celery settings in settings.py with a CELERY_ prefix.

Create a celery.py file in your Django project root (next to settings.py). This file initializes the Celery app and auto-discovers tasks from all installed apps. Then import the app in your project’s __init__.py so it loads when Django starts.

# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()


# myproject/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)


# myproject/settings.py — add these Celery settings
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "UTC"
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1


# myapp/tasks.py (inside any Django app)
from celery import shared_task

@shared_task
def send_welcome_email(user_id):
 from django.contrib.auth.models import User
 user = User.objects.get(pk=user_id)
 user.email_user("Welcome!", "Thanks for signing up.")
 return {"sent_to": user.email}

Use @shared_task instead of @app.task in Django apps. The shared_task decorator does not require a direct reference to the Celery app instance, making your tasks reusable across projects. The autodiscover_tasks() call scans every installed app for a tasks.py module and registers the tasks it finds.

A critical Django-specific pattern: always pass database primary keys as task arguments, never model instances. Model instances cannot be serialized to JSON and would require pickle serialization, which is a security risk. The Celery 5.6 documentation recommends using delay_on_commit instead of delay in Django views to avoid sending tasks before the database transaction commits. If a task runs before the transaction commits, it may query stale data or fail with a “DoesNotExist” error.

Step 10: Integrate Celery with FastAPI

FastAPI’s async-first design pairs well with Celery for offloading blocking operations. While FastAPI handles HTTP requests asynchronously, Celery processes heavy background work in separate worker processes. The integration is straightforward: define your Celery app, create tasks, and call them from FastAPI endpoints.

# app/worker.py
from celery import Celery

celery_app = Celery(
 "worker",
 broker="redis://localhost:6379/0",
 backend="redis://localhost:6379/1",
)
celery_app.conf.task_serializer = "json"


@celery_app.task(name="process_upload")
def process_upload(file_path: str, user_id: int):
 import time
 time.sleep(10) # Simulate processing
 return {"file": file_path, "user_id": user_id, "status": "processed"}


# app/main.py
from fastapi import FastAPI
from app.worker import celery_app, process_upload

api = FastAPI()


@api.post("/upload")
async def create_upload(file_path: str, user_id: int):
 task = process_upload.delay(file_path, user_id)
 return {"task_id": task.id, "status": "queued"}


@api.get("/status/{task_id}")
async def get_status(task_id: str):
 result = celery_app.AsyncResult(task_id)
 return {
 "task_id": task_id,
 "status": result.state,
 "result": result.result if result.ready() else None,
 }

The /upload endpoint queues the task and returns the task ID immediately. The client then polls /status/{task_id} to check progress. This pattern keeps the endpoint fast, because it only enqueues a message while the heavy processing happens in the background. For real-time updates, replace polling with WebSocket connections or server-sent events.

One important caveat: do not use result.get() inside an async def FastAPI endpoint. The .get() call blocks the event loop, freezing all concurrent requests. Always use the polling pattern or run .get() in a thread pool with asyncio.to_thread().
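
The effect of asyncio.to_thread() can be demonstrated without FastAPI or Celery. In this sketch, blocking_get is a hypothetical stand-in for a blocking call such as result.get(), and the heartbeat coroutine shows that the event loop keeps running while the blocking call executes in a worker thread.

```python
import asyncio
import time


def blocking_get():
    """Stand-in for a blocking call such as result.get(); sleeps 0.2 s."""
    time.sleep(0.2)
    return {"status": "processed"}


async def heartbeat(stop: asyncio.Event, ticks: list):
    """Record a tick every 10 ms to show the event loop keeps running."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    stop, ticks = asyncio.Event(), []
    beat = asyncio.create_task(heartbeat(stop, ticks))
    result = await asyncio.to_thread(blocking_get)  # runs in a worker thread
    stop.set()
    await beat
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result)  # {'status': 'processed'}
print(f"heartbeat ticks while waiting: {tick_count}")
```

If blocking_get() were called directly inside main(), the heartbeat would record no ticks during the 0.2 seconds, which is the stall that concurrent requests would experience.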

Step 11: Optimize Worker Performance and Concurrency

Celery supports several concurrency pools. This step covers four of them: prefork (multiprocessing), eventlet (greenlets), gevent (greenlets), and solo (single-threaded); a threads pool is also available. Choosing the right pool depends on whether your tasks are CPU-bound or I/O-bound. This decision has a direct impact on throughput and resource utilization.

| Pool Type | Best For | Concurrency | Memory Usage | GIL Impact |
| --- | --- | --- | --- | --- |
| prefork | CPU-bound tasks | Per-process (1 per core) | High (separate processes) | None (bypasses GIL) |
| eventlet | I/O-bound tasks | Hundreds of greenlets | Low (shared memory) | Subject to GIL |
| gevent | I/O-bound tasks | Hundreds of greenlets | Low (shared memory) | Subject to GIL |
| solo | Debugging/testing | 1 (sequential) | Minimal | Subject to GIL |

For most applications, start with the default prefork pool. Set the concurrency equal to the number of CPU cores for CPU-bound tasks. For I/O-bound tasks like sending HTTP requests or emails, switch to gevent or eventlet and increase concurrency to 100 or more. Here are optimized worker commands for each scenario:

# CPU-bound: prefork pool, 1 process per core
celery -A myapp.celery_app worker \
    --pool=prefork \
    --concurrency=4 \
    --max-memory-per-child=200000 \
    --max-tasks-per-child=1000 \
    --loglevel=info

# I/O-bound: gevent pool, high concurrency
pip install gevent
celery -A myapp.celery_app worker \
    --pool=gevent \
    --concurrency=100 \
    --loglevel=info

# Mixed workload: run two workers on different queues
# CPU worker on 'heavy' queue
celery -A myapp.celery_app worker \
    -Q heavy \
    --pool=prefork \
    --concurrency=2 \
    --loglevel=info

# I/O worker on 'email,default' queues
celery -A myapp.celery_app worker \
    -Q email,default \
    --pool=gevent \
    --concurrency=50 \
    --loglevel=info

The --max-tasks-per-child=1000 flag recycles worker processes after 1,000 tasks, preventing memory leaks from accumulating. Combined with --max-memory-per-child, this provides a double safety net against runaway memory consumption. Monitor your workers with Flower and adjust these values based on actual memory profiles.

For high-throughput systems, consider running workers on separate machines. Celery’s distributed architecture means workers only need network access to the Redis broker. You can scale horizontally by adding more worker machines without changing any code.

Step 12: Write Tests for Celery Tasks

Testing Celery tasks requires special consideration because tasks normally run asynchronously in separate processes. Celery provides an eager mode that executes tasks synchronously in the same process, making them behave like regular function calls during tests. This eliminates the need for a running broker or worker.

# tests/test_tasks.py
import pytest
from unittest.mock import patch
from myapp.celery_app import app
from myapp.tasks import add_numbers, send_email, fetch_url


@pytest.fixture(autouse=True)
def celery_eager_mode():
 """Run all tasks synchronously during tests."""
 app.conf.task_always_eager = True
 app.conf.task_eager_propagates = True
 yield
 app.conf.task_always_eager = False
 app.conf.task_eager_propagates = False


def test_add_numbers():
 result = add_numbers.delay(3, 7)
 assert result.get() == 10


def test_add_numbers_negative():
 result = add_numbers.delay(-5, 3)
 assert result.get() == -2


def test_send_email_returns_status():
    result = send_email.delay("user@example.com", "Test", "Body")
    data = result.get()
    assert data["status"] == "sent"
    assert data["to"] == "user@example.com"


@patch("myapp.tasks.requests.get")
def test_fetch_url_success(mock_get):
 mock_get.return_value.status_code = 200
 mock_get.return_value.content = b"Hello World"
 result = fetch_url.delay("https://example.com")
 data = result.get()
 assert data["status_code"] == 200
 assert data["content_length"] == 11


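@patch("myapp.tasks.requests.get")
def test_fetch_url_propagates_unhandled_error(mock_get):
    # The built-in ConnectionError is not a requests.RequestException, so
    # fetch_url neither catches nor retries it; eager mode re-raises it here.
    mock_get.side_effect = ConnectionError("Connection refused")
    with pytest.raises(ConnectionError):
        fetch_url.delay("https://example.com")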

The task_always_eager = True setting is the key. When enabled, calling task.delay() executes the task immediately in the current process instead of sending it to Redis. The task_eager_propagates = True setting ensures that exceptions raised inside tasks propagate to the caller, so your tests can catch them with pytest.raises.

Run the tests with pytest tests/ -v. You do not need Redis running for eager mode tests. For integration tests that verify the full broker roundtrip, use pytest-celery which provides fixtures for starting real workers and brokers in Docker containers.

Step 13: Deploy to Production with Systemd and Docker

In production, Celery workers and Beat must run as managed services that restart on failure and start on boot. The two most common approaches are systemd service files (for bare-metal or VM deployments) and Docker Compose (for containerized environments). Here are both methods.

For systemd, create a service file that starts the Celery worker with the correct user, working directory, and environment. This is the standard approach on Ubuntu, Debian, and RHEL servers.

# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker Service
After=network.target redis.service

[Service]
Type=forking
User=celery
Group=celery
WorkingDirectory=/opt/myapp
# systemd expands % specifiers itself, so Celery's %n and %I are escaped as %%n and %%I
ExecStart=/opt/myapp/venv/bin/celery -A myapp.celery_app multi start worker1 \
    --pidfile=/var/run/celery/%%n.pid \
    --logfile=/var/log/celery/%%n%%I.log \
    --loglevel=INFO \
    --concurrency=4 \
    --max-memory-per-child=200000
ExecStop=/opt/myapp/venv/bin/celery multi stopwait worker1 \
    --pidfile=/var/run/celery/%%n.pid
ExecReload=/opt/myapp/venv/bin/celery multi restart worker1 \
    --pidfile=/var/run/celery/%%n.pid
Restart=always

[Install]
WantedBy=multi-user.target

For Docker Compose, define separate services for Redis, the Celery worker, and Celery Beat. This approach is portable and reproducible across development, staging, and production environments.

# docker-compose.yml
version: "3.9"

services:
  redis:
    image: redis:7.2-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  worker:
    build: .
    command: celery -A myapp.celery_app worker --loglevel=info --concurrency=4
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    volumes:
      - .:/app
    restart: always

  beat:
    build: .
    command: celery -A myapp.celery_app beat --loglevel=info
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    volumes:
      - .:/app
    restart: always

  flower:
    build: .
    command: celery -A myapp.celery_app flower --port=5555
    depends_on:
      - redis
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

volumes:
  redis_data:

Start the entire stack with docker compose up -d. Scale workers horizontally with docker compose up -d --scale worker=3 to run three worker containers. Each container gets its own concurrency pool, so three workers with concurrency 4 gives you 12 parallel task execution slots.

5 Common Pitfalls When Using Celery Python

After deploying Celery in dozens of production systems, these are the mistakes that catch developers most often. Each one is easy to make and hard to diagnose without knowing what to look for.

Pitfall 1: Passing non-serializable objects as task arguments. Django model instances, file handles, and database cursors cannot be JSON-serialized. The task will fail with a serialization error or, worse, silently produce incorrect results if you switch to pickle. Always pass primitive types (strings, integers, lists, dicts) and reconstruct complex objects inside the task.

Pitfall 2: Calling result.get() inside a web request handler. This blocks the web server process until the task completes, eliminating any benefit of asynchronous execution. In Django, it blocks the WSGI worker. In FastAPI, it blocks the event loop. Return the task ID to the client and let them poll a status endpoint instead.

Pitfall 3: Running multiple Celery Beat instances. Unlike workers, Beat does not coordinate across instances. Running two Beat processes sends every scheduled task twice. Use a singleton pattern: run exactly one Beat process per deployment, backed by a PID file or container orchestration (Kubernetes Deployment with replicas=1).

Pitfall 4: Ignoring task_acks_late in production. With the default task_acks_late = False, Celery acknowledges a task just before the worker starts executing it. If the worker crashes mid-task, the broker already considers the message delivered and the task is lost. Set task_acks_late = True and task_reject_on_worker_lost = True so unfinished tasks are redelivered to another worker, and make those tasks idempotent, because redelivery means they can run more than once.

Pitfall 5: Not setting time limits on tasks. A task without a time limit can run forever, consuming a worker slot and potentially accumulating memory. Always set soft_time_limit and time_limit on tasks that call external services or process user data. The soft limit raises an exception you can catch; the hard limit kills the process.
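
Pitfalls 4 and 5 both come down to a handful of settings. The following is a minimal sketch of a configuration module, assuming it is saved as celeryconfig.py (a hypothetical file name) and loaded with app.config_from_object("celeryconfig"). The 30 and 60 second limits are placeholder values, not recommendations.

```python
# celeryconfig.py (hypothetical file name)

# Pitfall 4: acknowledge a message only after the task has finished,
# and requeue it if the worker process running it dies unexpectedly.
task_acks_late = True
task_reject_on_worker_lost = True

# Pitfall 5: global defaults for every task (placeholder values, in seconds).
# The soft limit raises SoftTimeLimitExceeded inside the task;
# the hard limit terminates the worker process running it.
task_soft_time_limit = 30
task_time_limit = 60
```

Keeping the soft limit below the hard limit leaves the task a window to clean up before it is killed. Individual tasks can still override both values in their decorator.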

Redis vs RabbitMQ: Choosing the Right Broker

Celery supports multiple message brokers, but Redis and RabbitMQ account for the vast majority of production deployments. Your choice affects reliability, performance, and operational complexity. Here is a detailed comparison based on production experience.

| Feature | Redis | RabbitMQ |
| --- | --- | --- |
| Setup complexity | Simple (single binary) | Moderate (Erlang runtime) |
| Message durability | Optional (RDB/AOF) | Built-in (disk persistence) |
| Task priorities | Limited (separate queues) | Native (0-9 levels) |
| Result backend | Yes (built-in) | RPC backend (rpc://) only; a separate store is usually used |
| Throughput | Higher for simple queues | Higher for complex routing |
| Memory usage | Lower baseline | Higher due to Erlang VM |
| Monitoring | redis-cli, RedisInsight | Management UI (port 15672) |
| Cluster support | Redis Cluster / Sentinel | Built-in clustering |
| Use case | Small-medium workloads | Enterprise, complex routing |

Choose Redis if you want simplicity and already use Redis for caching. Redis doubles as both broker and result backend, reducing the number of services you manage. Choose RabbitMQ if you need native task priorities, complex routing patterns, or guaranteed message delivery across data center failures. RabbitMQ’s built-in management UI at port 15672 provides detailed queue metrics without installing additional tools.

A common production pattern is using RabbitMQ as the broker for reliability and Redis as the result backend for speed. This combines the strengths of both systems. Configure it in Celery by setting the broker URL to amqp://guest:guest@localhost:5672// and the result backend to redis://localhost:6379/1.
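
As a rough sketch, the split setup described above can be expressed as a configuration module. The file name celeryconfig.py is an assumption, and the guest:guest credentials are RabbitMQ's local defaults taken from the URL above, so replace them in any real deployment.

```python
# celeryconfig.py (hypothetical file name; load with app.config_from_object("celeryconfig"))

# RabbitMQ carries the task messages.
broker_url = "amqp://guest:guest@localhost:5672//"

# Redis database 1 stores task states and return values.
result_backend = "redis://localhost:6379/1"
```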

Troubleshooting Celery Python: 10 Common Issues

Debugging Celery can be frustrating because errors happen across multiple processes. Here are the ten most common issues with their symptoms and solutions.

Issue 1: “Received unregistered task” error. The worker does not recognize your task name. This happens when you start the worker with the wrong -A argument or forget to add the tasks module to the include list. Verify with celery -A myapp.celery_app inspect registered to see all registered tasks. If your task is missing, check the include parameter in the Celery app config.

Issue 2: Tasks stay in PENDING state forever. The task was sent to Redis but no worker is consuming the queue. Check that your worker is connected to the correct Redis database and listening on the correct queue. Run celery -A myapp.celery_app inspect active_queues to see which queues each worker is consuming. Also verify Redis connectivity with redis-cli ping.

Issue 3: Worker connects but immediately disconnects. This is usually a Redis authentication or version mismatch. Celery 5.6.x pins the Redis client library to version 5.2.1 or lower. If you have a newer redis Python package installed, downgrade with pip install redis==5.2.1. Check the worker logs for connection error details.

Issue 4: Memory leak in long-running workers. Worker child processes slowly accumulate memory from task execution. Set --max-memory-per-child=200000 (200 MB in kilobytes) and --max-tasks-per-child=1000 to automatically recycle workers. Monitor with Flower’s worker detail page to identify which tasks consume the most memory.
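
If you prefer to keep these limits in configuration rather than on the command line, the same two values can be set as worker settings. This is a sketch that assumes a celeryconfig.py module (hypothetical file name) loaded with app.config_from_object("celeryconfig").

```python
# celeryconfig.py (hypothetical file name)

# Replace a child process once its resident memory exceeds this many
# kilobytes (roughly 200 MB); it is recycled after its current task finishes.
worker_max_memory_per_child = 200_000

# Replace a child process after it has executed this many tasks.
worker_max_tasks_per_child = 1000
```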

Issue 5: Tasks execute twice. Duplicate execution usually means you are running multiple Beat instances or the broker redelivered a task because the worker did not acknowledge it in time. Check for duplicate Beat processes with ps aux | grep celery. If using task_acks_late = True, ensure your visibility timeout in Redis (default 1 hour) exceeds your longest task duration.
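
One way to keep the visibility timeout above your longest task is to derive it from that duration. The sketch below is illustrative only: LONGEST_TASK_SECONDS, the 1.5 safety margin, and the helper name are assumptions, while broker_transport_options with a visibility_timeout key is the Celery setting used by the Redis transport.

```python
# Hypothetical figure: the longest any task is expected to run, in seconds.
LONGEST_TASK_SECONDS = 2 * 60 * 60  # 2 hours

# Default visibility timeout of the Redis transport (1 hour).
DEFAULT_VISIBILITY_TIMEOUT = 60 * 60


def visibility_timeout_for(longest_task_seconds, margin=1.5):
    """Pick a timeout comfortably above the longest task, never below the default."""
    return max(DEFAULT_VISIBILITY_TIMEOUT, int(longest_task_seconds * margin))


# Goes into your Celery configuration, e.g. app.conf.broker_transport_options.
broker_transport_options = {
    "visibility_timeout": visibility_timeout_for(LONGEST_TASK_SECONDS),
}

print(broker_transport_options)  # {'visibility_timeout': 10800}
```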

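Issue 6: “ConnectionResetError” or “Connection to broker lost.” These errors point to network interruptions between the worker and Redis. Celery reconnects after a lost connection when broker_connection_retry = True (the default), and app.conf.broker_connection_retry_on_startup = True applies the same retry behavior to the initial connection at worker startup. Set broker_connection_max_retries = 10 to cap the number of attempts before the worker gives up. For production, use Redis Sentinel or a managed Redis service with high availability.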

Issue 7: Celery Beat schedule does not update after code changes. Beat caches the schedule in a local file (celerybeat-schedule). Delete this file and restart Beat to pick up configuration changes. Alternatively, use the --schedule=/tmp/celerybeat-schedule flag to keep the file in a temporary directory, or use django-celery-beat for database-backed schedules.

Issue 8: “SoftTimeLimitExceeded” exception. Your task exceeded its soft_time_limit. Catch this exception inside the task to perform cleanup before the hard limit kills the process. Increase the soft limit if the task legitimately needs more time, or optimize the task to run faster.

Issue 9: ImportError when starting workers. The worker process cannot import your task module. Ensure you start the worker from the correct directory (the project root) and that your virtual environment is activated. The -A argument must match the Python module path to your Celery app. For Django projects, verify that DJANGO_SETTINGS_MODULE is set.

Issue 10: Tasks silently disappearing. Tasks are sent to Redis but never execute and never appear in Flower. This usually means the task is routed to a queue that no worker is consuming. Check your task_routes configuration and ensure a worker is started with -Q queue_name for every queue you route to. Run redis-cli llen celery to check the default queue depth.
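
A quick way to catch this class of problem is to compare the queues your routes point at with the queues your workers actually consume. The sketch below is a standalone check, not a Celery API: the task names, queue names, and the unconsumed_queues helper are all hypothetical, and only the task_routes dictionary shape and the default queue name celery come from Celery itself.

```python
# Hypothetical routing table; task and queue names are examples only.
task_routes = {
    "myapp.tasks.send_email": {"queue": "email"},
    "myapp.tasks.generate_daily_report": {"queue": "reports"},
}

DEFAULT_QUEUE = "celery"  # Celery's default queue name


def unconsumed_queues(routes, consumed):
    """Return routed queues (plus the default) that no worker is listening on."""
    needed = {DEFAULT_QUEUE} | {opts["queue"] for opts in routes.values()}
    return sorted(needed - set(consumed))


# Queues passed to workers with -Q, e.g. `worker -Q celery,email`.
consumed = ["celery", "email"]

print(unconsumed_queues(task_routes, consumed))  # ['reports']
```

Any queue the check reports needs a worker started with a matching -Q argument, otherwise tasks routed to it will sit in Redis indefinitely.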

Advanced Tips for Production Celery Deployments

These optimizations separate hobby projects from production-grade Celery deployments. Each tip addresses a real scaling challenge encountered by teams running Celery at scale.

Use task chains and groups for complex workflows. Celery’s canvas primitives let you compose tasks into workflows. A chain() runs tasks sequentially, passing each result to the next. A group() runs tasks in parallel. A chord() runs a group followed by a callback when all tasks complete. These are essential for data pipelines and batch processing.

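from celery import chain, group, chord

# Assumed import path; adjust to wherever these tasks live in your project.
from myapp.tasks import fetch_url, process_data, send_email, generate_daily_report

# Sequential pipeline: fetch → process → notify
pipeline = chain(
    fetch_url.s("https://api.example.com/data"),
    process_data.s(),  # receives the return value of fetch_url
    # .si() makes the signature immutable, so the previous result is not
    # prepended to send_email's arguments. The address is a placeholder.
    send_email.si("user@example.com", "Pipeline complete", "See results."),
)
pipeline.delay()

# Parallel execution: process 5 datasets simultaneously
parallel_jobs = group(
    process_data.s(dataset_id) for dataset_id in range(1, 6)
)
parallel_jobs.delay()

# Chord: parallel tasks + callback when all finish (requires a result backend)
url_list = ["https://api.example.com/a", "https://api.example.com/b"]  # example URLs
callback = generate_daily_report.si("batch_complete")
batch = chord(
    [fetch_url.s(url) for url in url_list],
    callback,
)
batch.delay()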

Enable task compression for large payloads. If your task arguments or results are large (over 1 KB), enable gzip compression to reduce broker bandwidth. Add app.conf.task_compression = "gzip" to your configuration to compress task messages, and app.conf.result_compression = "gzip" if your results are large too. This is especially valuable when tasks transfer data between services over slow network links.
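
To get a feel for what compression buys you, the sketch below compresses a made-up JSON payload with the standard library's gzip module. It is a stand-in for illustration and not Celery's internal code path. The payload contents are hypothetical, and real savings depend on how repetitive your data is.

```python
import gzip
import json

# Hypothetical task payload: 500 similar records, serialized as JSON.
payload = json.dumps(
    [{"id": i, "status": "processed", "source": "nightly-import"} for i in range(500)]
).encode("utf-8")

compressed = gzip.compress(payload)

print(f"raw: {len(payload)} bytes, gzip: {len(compressed)} bytes")

# Compression is lossless: the worker sees exactly the original bytes.
assert gzip.decompress(compressed) == payload
```

Repetitive structures such as lists of similar records compress well, while small or already compressed payloads gain little and only cost CPU time.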

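Implement rate limiting for external API calls. Use the rate_limit parameter to throttle task execution. Setting @app.task(rate_limit="10/m") limits a task to 10 executions per minute on each worker instance. It is not a global limit, so three workers can together run the task up to 30 times per minute. If you need a hard global cap, divide the limit by the number of workers or route the task to a dedicated queue consumed by a single worker. This prevents your Celery workers from overwhelming third-party APIs and triggering rate limit bans.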

Use Prometheus and Grafana for monitoring. While Flower provides a quick overview, production systems need time-series metrics. The celery-exporter package exposes Celery metrics in Prometheus format. Track queue depth, task latency, failure rate, and worker uptime. Set up Grafana dashboards and alerts for anomalies.

Implement idempotent tasks. Because tasks can be retried or executed more than once due to broker redelivery, design every task to be idempotent – running it twice with the same arguments should produce the same result. Use database unique constraints, Redis locks, or task deduplication to prevent double-processing. This is the single most important principle for reliable distributed systems.
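
The core of an idempotent task is a deduplication check keyed on the task's arguments. The sketch below shows the idea with an in-memory dictionary standing in for a database table with a unique constraint. The charge_order function, its arguments, and the key format are hypothetical, and a real implementation would need an atomic check in shared storage, since this version is neither shared across workers nor thread-safe.

```python
# Stand-in for a database table with a unique constraint on the key.
processed = {}


def charge_order(order_id, amount_cents):
    """Charge an order at most once, however many times the task is delivered."""
    key = f"charge:{order_id}"
    if key in processed:
        # Already handled: return the stored outcome instead of charging again.
        return processed[key]

    receipt = {"order_id": order_id, "amount_cents": amount_cents, "status": "charged"}
    processed[key] = receipt
    return receipt


first = charge_order(1001, 2500)
second = charge_order(1001, 2500)  # simulated redelivery of the same task

print(first == second, len(processed))  # True 1
```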

Celery Python Cheat Sheet: Essential Commands

Keep this reference handy when working with Celery in development and production. These commands cover the most common operations you will need day-to-day.

| Command | Description |
| --- | --- |
| celery -A myapp worker --loglevel=info | Start a worker with info-level logging |
| celery -A myapp beat --loglevel=info | Start the periodic task scheduler |
| celery -A myapp flower --port=5555 | Start the Flower monitoring dashboard |
| celery -A myapp inspect active | Show currently executing tasks |
| celery -A myapp inspect reserved | Show tasks waiting in worker memory |
| celery -A myapp inspect registered | List all registered task names |
| celery -A myapp inspect stats | Show worker statistics and uptime |
| celery -A myapp control shutdown | Gracefully stop all workers |
| celery -A myapp purge | Delete all pending tasks from queues |
| celery -A myapp call myapp.tasks.add --args='[2,3]' | Execute a task from the command line |

Frequently Asked Questions

What is Celery in Python?
Celery is an open-source distributed task queue that lets you run Python functions asynchronously in background worker processes. It uses a message broker like Redis or RabbitMQ to transport tasks from your application to workers. Celery is the most popular Python library for background job processing, with over 25,800 GitHub stars and production use at companies like Instagram and Mozilla.

What Python versions does Celery 5.6 support?
Celery 5.6.x supports Python 3.9 through 3.13 and PyPy 3.10+. Support for Python 3.8 was dropped because Python 3.8 reached end-of-life in October 2024. If you need Python 3.8 support, use Celery 5.5.x.

Should I use Redis or RabbitMQ as my Celery broker?
Use Redis if you want simplicity and already have Redis in your stack. It works as both broker and result backend, reducing infrastructure complexity. Use RabbitMQ if you need native task priorities, guaranteed message delivery, or complex routing patterns. For most small to medium applications, Redis is the better starting point.

How do I run Celery with Django?
Create a celery.py file in your Django project package (the directory that contains settings.py) that initializes the Celery app with config_from_object("django.conf:settings", namespace="CELERY"). Call autodiscover_tasks() to auto-register tasks from all installed apps. Import the app in that package's __init__.py so it loads with Django. Use @shared_task instead of @app.task in your Django apps.

How do I run Celery with FastAPI?
Define your Celery app in a separate module and import task functions into your FastAPI routes. Call task.delay() from endpoints to queue work asynchronously. Never call result.get() inside async endpoints – it blocks the event loop. Instead, return the task ID and provide a separate status-check endpoint.

Why are my Celery tasks stuck in PENDING state?
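Tasks stay PENDING when no worker is consuming the queue they were sent to. Verify that your worker is running, connected to the correct Redis instance, and listening on the correct queue. Run celery -A myapp.celery_app inspect active_queues to check. Also ensure the task is registered by running celery -A myapp.celery_app inspect registered. Celery also reports PENDING for any task ID it has no record of, so confirm that a result backend is configured and that you are querying the right task ID.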

How do I test Celery tasks without a broker?
Set app.conf.task_always_eager = True in your test configuration. This makes Celery execute tasks synchronously in the current process, behaving like regular function calls. Combined with task_eager_propagates = True, exceptions propagate normally for assertion testing.

What are Celery alternatives for Python?
Dramatiq is a simpler, more opinionated alternative with built-in middleware and better error messages. Huey is a lightweight option for small projects. RQ (Redis Queue) is a minimal Redis-based task queue. arq uses asyncio for async-native task processing. For most production systems, Celery remains the standard due to its maturity, extensive documentation, and broad ecosystem support.

Sofia Lindström

Editor-in-Chief

Sofia Lindström is the Editor-in-Chief at Tech Insider, where she leads editorial strategy and oversees coverage across AI, cybersecurity, and enterprise technology. With over a decade in Swedish tech journalism, she previously served as technology editor at Dagens Industri and covered the Nordic startup ecosystem for Breakit. Sofia holds an MSc in Media Technology from KTH Royal Institute of Technology and is a frequent speaker at Web Summit and Slush. She is passionate about making complex technology accessible to business leaders.
