VOOZH about

URL: https://dev.to/ahmet_gedik778845/python-celery-task-queues-for-video-metadata-processing-2oli

Python Celery Task Queues for Video Metadata Processing - DEV Community


Why Background Processing for a Viral Video Vault

ViralVidVault tracks viral videos across 7 regions (six European countries plus the US). The main cron pipeline needs to be fast -- fetch, score, store. But secondary tasks like thumbnail quality checks, deep metadata enrichment, and stale link removal are too slow for the critical path. Celery lets us offload this work to background workers.

Celery Configuration

# celery_app.py
from celery import Celery

app = Celery(
    "viralvidvault",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    # Modules the worker imports at startup. Workers are launched with
    # `celery -A celery_app worker`, which imports only this module; without
    # `include`, tasks.py is never imported on the worker side. The
    # `tasks.*` names routed below would then arrive as unregistered tasks.
    include=["tasks"],
)

app.conf.update(
    # JSON only: every task argument/result must be JSON-serialisable.
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    # Acknowledge the broker message after the task has run, not before it starts.
    task_acks_late=True,
    worker_prefetch_multiplier=2,
    # Seconds. The soft limit raises SoftTimeLimitExceeded inside the task;
    # the hard limit terminates the worker child running it.
    task_soft_time_limit=90,
    task_time_limit=120,
    # One queue per workload so each gets its own worker pool (see the
    # worker commands: --queues=scoring / thumbnails / cleanup).
    task_routes={
        "tasks.score_virality_batch": {"queue": "scoring"},
        "tasks.validate_thumbnails": {"queue": "thumbnails"},
        "tasks.detect_dead_videos": {"queue": "cleanup"},
    },
)

Task 1: Batch Virality Re-Scoring

The cron pipeline computes a quick initial score; background workers then re-score each video more carefully, comparing its most recent stored metadata snapshots against a per-region baseline:

# tasks.py
import sqlite3
from celery_app import app

# Per-region reference points used to normalise virality scores:
#   velocity   -- compared against the views-per-hour figure computed by
#                 score_virality_batch
#   engagement -- compared against its (likes + 2*comments) / views * 100
# Regions missing from this table fall back to the "GB" entry.
REGIONAL_BASELINES = {
    region: {"velocity": velocity, "engagement": engagement}
    for region, velocity, engagement in (
        # region  velocity  engagement
        ("PL", 10000, 6.0),
        ("NL", 8000, 5.5),
        ("SE", 5000, 5.0),
        ("NO", 4000, 5.0),
        ("AT", 6000, 5.0),
        ("GB", 20000, 4.5),
        ("US", 50000, 5.0),
    )
}

def _score_snapshots(rows) -> dict:
    """Score one video from its snapshot rows, newest first (helper for score_virality_batch).

    Returns either a full score summary, or ``{"score": 0, "reason": ...}``
    when the rows cannot be scored. Callers can tell the two apart by the
    presence of the ``"reason"`` key.
    """
    if len(rows) < 2:
        return {"score": 0, "reason": "insufficient data"}

    latest, previous = rows[0], rows[1]
    if latest["ts"] is None or previous["ts"] is None:
        # SQLite's strftime() yields NULL for a fetched_at value it cannot parse.
        return {"score": 0, "reason": "missing timestamp"}

    region = latest["region"]
    baseline = REGIONAL_BASELINES.get(region, REGIONAL_BASELINES["GB"])

    time_delta = int(latest["ts"]) - int(previous["ts"])  # seconds
    if time_delta <= 0:
        return {"score": 0, "reason": "no time delta"}

    # Views gained per hour between the two snapshots.
    view_velocity = (latest["view_count"] - previous["view_count"]) / time_delta * 3600
    # Likes plus double-weighted comments, as a percentage of views.
    engagement = (latest["likes"] + latest["comments"] * 2) / max(latest["view_count"], 1) * 100

    # Clamp both components to [0, 100]. Nothing guarantees view_count never
    # decreases between snapshots, and an unclamped negative velocity would
    # write a negative virality_score.
    velocity_norm = min(100, max(0, view_velocity / baseline["velocity"] * 100))
    engagement_norm = min(100, max(0, engagement / baseline["engagement"] * 100))

    score = velocity_norm * 0.55 + engagement_norm * 0.45
    return {
        "score": round(score, 1),
        "velocity": round(view_velocity),
        "engagement": round(engagement, 2),
        "region": region,
        # Thresholds apply to the unrounded score.
        "label": "VIRAL" if score >= 85 else "TRENDING" if score >= 60 else "NORMAL",
    }


@app.task(bind=True, max_retries=2)
def score_virality_batch(self, video_ids: list[str], db_path: str) -> dict:
    """Re-score a batch of videos from their two most recent snapshots.

    For each id, reads the latest two rows of ``video_snapshots``, derives an
    hourly view velocity and an engagement rate, and normalises both against
    the video's regional baseline. The blended score is written to
    ``videos.virality_score``.

    Args:
        video_ids: Video ids to re-score.
        db_path: Path to the SQLite database file.

    Returns:
        Mapping of video id to a score summary (``score``, ``velocity``,
        ``engagement``, ``region``, ``label``), or to
        ``{"score": 0, "reason": ...}`` when the video could not be scored
        (its stored score is then left untouched).

    Retries (up to ``max_retries``) on ``sqlite3.OperationalError``, such as a
    locked database. Updates are committed only once at the end, so a failed
    attempt leaves the database unchanged and a retry re-scores the batch.
    """
    results = {}
    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        for vid in video_ids:
            # Only the two newest snapshots are used, so fetch no more than that.
            rows = conn.execute(
                """SELECT view_count, likes, comments, region,
                          strftime('%s', fetched_at) as ts
                   FROM video_snapshots
                   WHERE video_id = ?
                   ORDER BY fetched_at DESC LIMIT 2""",
                (vid,),
            ).fetchall()

            result = _score_snapshots(rows)
            if "reason" not in result:
                conn.execute(
                    "UPDATE videos SET virality_score = ? WHERE video_id = ?",
                    (result["score"], vid),
                )
            results[vid] = result

        conn.commit()
    except sqlite3.OperationalError as exc:
        # Nothing has been committed yet; close() in `finally` discards the
        # pending updates, and the retried run starts the batch over.
        raise self.retry(exc=exc)
    finally:
        # Always release the connection, including on errors and retries.
        conn.close()
    return results

Task 2: Thumbnail Quality Validation

import requests
from PIL import Image
from io import BytesIO

@app.task(bind=True, max_retries=3, default_retry_delay=30)
def validate_thumbnails(self, videos: list[dict]) -> dict:
    """Check that each video's thumbnail is reachable and not a tiny placeholder.

    Args:
        videos: Dicts carrying at least ``"video_id"`` and ``"thumbnail_url"``.

    Returns:
        ``{"valid", "broken", "placeholder", "details"}``:
        - the three counters sum to ``len(videos)``;
        - ``details`` lists every thumbnail that was not valid.
        Images smaller than 200x150 count as placeholders. Per-video failures
        are recorded in the report rather than raised, so one bad URL never
        aborts the batch.

    Raises:
        celery.exceptions.SoftTimeLimitExceeded: Propagated rather than
            recorded as a broken thumbnail, so the task actually stops at the
            soft time limit instead of running on until the hard limit.
    """
    from celery.exceptions import SoftTimeLimitExceeded

    report = {"valid": 0, "broken": 0, "placeholder": 0, "details": []}

    for v in videos:
        try:
            # Using the response as a context manager releases the pooled
            # connection on every path. This matters with stream=True, where
            # the early non-200 exit below would otherwise leave the body
            # unread and the connection unreleased.
            with requests.get(v["thumbnail_url"], timeout=8, stream=True) as resp:
                if resp.status_code != 200:
                    report["broken"] += 1
                    report["details"].append({"id": v["video_id"], "status": "broken"})
                    continue
                payload = resp.content

            with Image.open(BytesIO(payload)) as img:
                w, h = img.size

            if w < 200 or h < 150:
                report["placeholder"] += 1
                report["details"].append({"id": v["video_id"], "status": "placeholder", "size": f"{w}x{h}"})
            else:
                report["valid"] += 1

        except SoftTimeLimitExceeded:
            # Subclass of Exception: must not be swallowed by the handler below.
            raise
        except Exception as e:
            # Best-effort per item: network, HTTP and image-decoding errors
            # are reported, not raised.
            report["broken"] += 1
            report["details"].append({"id": v["video_id"], "status": "error", "msg": str(e)})

    return report

Task 3: Dead Video Detection

Videos get deleted or go private. Detect them early:

@app.task(rate_limit="20/m")
def detect_dead_videos(video_ids: list[str]) -> dict:
    """Check which videos are still publicly accessible via YouTube's oEmbed endpoint.

    Classification of each video:
    - a 200 response marks it alive;
    - any other 4xx response except 429 marks it dead (the original
      non-200 treatment for client errors);
    - rate limiting (429), other non-4xx statuses and network failures
      mark it unknown, so a transient outage cannot flag healthy videos
      as dead.

    Note that ``rate_limit`` throttles task executions, not the individual
    HTTP requests issued inside one task.

    Args:
        video_ids: Video ids to check.

    Returns:
        ``alive``/``dead`` counts and ``dead_ids`` as before, plus
        ``unknown``/``unknown_ids`` for videos whose status could not be
        determined this run.
    """
    alive, dead, unknown = [], [], []

    for vid in video_ids:
        try:
            # GET is the request method defined by the oEmbed spec. Passing
            # the nested watch URL via params= lets requests URL-encode it.
            resp = requests.get(
                "https://www.youtube.com/oembed",
                params={"url": f"https://youtube.com/watch?v={vid}"},
                timeout=8,
            )
        except requests.RequestException:
            unknown.append(vid)
            continue

        status = resp.status_code
        if status == 200:
            alive.append(vid)
        elif 400 <= status < 500 and status != 429:
            # NOTE(review): 401/403 may also cover videos that are public but
            # not embeddable -- confirm before deleting on this signal alone.
            dead.append(vid)
        else:
            unknown.append(vid)

    return {
        "alive": len(alive),
        "dead": len(dead),
        "dead_ids": dead,
        "unknown": len(unknown),
        "unknown_ids": unknown,
    }

Dispatching from the Pipeline

from celery import group
from tasks import score_virality_batch, validate_thumbnails, detect_dead_videos

def _chunked(items, size: int):
    """Yield consecutive slices of *items*, each holding at most *size* elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def background_processing(
    new_videos: list[dict],
    stale_ids: list[str],
    db_path: str,
    batch_size: int = 50,
):
    """Fan post-ingest work out to the Celery queues.

    Every task type is dispatched in chunks of at most *batch_size* items.
    The thumbnail and dead-link tasks make one HTTP request (8 s timeout)
    per item, and the app config enforces soft/hard time limits. Sending the
    whole list as one task could therefore run past those limits.

    Args:
        new_videos: Newly fetched videos. Each dict needs ``video_id``, and
            ``thumbnail_url`` for thumbnail validation.
        stale_ids: Video ids to check for dead links.
        db_path: SQLite database path handed to the scoring tasks.
        batch_size: Maximum number of items per dispatched task.

    Raises:
        ValueError: If *batch_size* is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    # Re-score in batches.
    for batch in _chunked(new_videos, batch_size):
        score_virality_batch.apply_async(
            args=[[v["video_id"] for v in batch], db_path],
            queue="scoring",
        )

    # Validate thumbnails (no task at all when there are no new videos).
    for batch in _chunked(new_videos, batch_size):
        validate_thumbnails.apply_async(
            args=[batch],
            queue="thumbnails",
        )

    # Check stale videos for dead links.
    for batch in _chunked(stale_ids, batch_size):
        detect_dead_videos.apply_async(
            args=[batch],
            queue="cleanup",
        )

Running Workers

# scoring queue: SQLite reads/writes plus arithmetic -- 4 worker processes
celery --app celery_app worker -Q scoring -c 4 -l info

# thumbnails queue: dominated by HTTP fetches, so run more processes
celery --app celery_app worker -Q thumbnails -c 8

# cleanup queue: detect_dead_videos carries rate_limit="20/m"
# (NOTE(review): Celery applies task rate limits per worker instance,
# not cluster-wide -- keep a single cleanup worker if that matters)
celery --app celery_app worker -Q cleanup -c 2

Celery transformed how ViralVidVault processes metadata. The cron pipeline went from 15 minutes to 3 minutes, with background workers handling the heavy lifting asynchronously.


This article is part of the Building ViralVidVault series. Check out ViralVidVault to see these techniques in action.