
URL: https://dev.to/temitopeajao/building-a-fault-tolerant-job-queue-nodejs-producers-elixirotp-consumers-34h2

Building a Fault-Tolerant Job Queue: Node.js Producers, Elixir/OTP Consumers - DEV Community


The pitch for OTP is always the same: "let it crash," nine nines of uptime, Erlang running phone switches. That's all true and also completely useless when you're staring at a blank mix new project wondering how to actually structure the thing.

This tutorial skips the theory tour. We build something real: a distributed job processing system where a Node.js API enqueues work into Redis, and an Elixir/OTP application consumes it — with a supervision tree that keeps the whole thing running when individual workers die, when Redis blips, and when a job payload is malformed.

By the end you'll have:

  • A Node.js producer API with Redis streams (not just lists — we want consumer groups)
  • An Elixir Application with a proper OTP supervision tree
  • A QueueConsumer GenServer that polls Redis and dispatches work
  • A WorkerSupervisor (DynamicSupervisor) that spawns and monitors per-job workers
  • A JobWorker GenServer that processes a job, retries on failure, and dead-letters after max attempts
  • A Telemetry integration so you can see what's actually happening

No Oban, no Exq. We're building the layer below so you understand what those libraries are doing.


Architecture Overview

┌─────────────────────────────────────────────────────────┐
│ Node.js Producer API                                    │
│ POST /jobs → Redis XADD → Stream: "jobs:work"           │
└─────────────────────────────────────────────────────────┘
                            │ Redis Streams
                            ▼
┌─────────────────────────────────────────────────────────┐
│ Elixir OTP Application                                  │
│                                                         │
│ Application (supervisor)                                │
│ ├── RedisPool (Redix connections)                       │
│ ├── QueueConsumer (GenServer: polls + dispatches)       │
│ └── WorkerSupervisor (DynamicSupervisor)                │
│     ├── JobWorker<job_id_1> (GenServer)                 │
│     ├── JobWorker<job_id_2> (GenServer)                 │
│     └── JobWorker<job_id_n> (GenServer)                 │
└─────────────────────────────────────────────────────────┘

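Redis Streams give us persistent, consumer-group-aware queuing. A job isn't acknowledged until the worker finishes it. If a worker crashes mid-job, the entry stays in the consumer group's pending entries list (PEL). Redis does not hand it out again on its own, but any consumer can claim it once it has been idle long enough. We do this with XAUTOCLAIM.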
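To make the acknowledgement rules concrete, here is a toy in-memory model of one stream with one consumer group. It is a sketch for intuition only. The ToyStream class and its methods are hypothetical stand-ins that loosely mirror XADD, XREADGROUP with ">", XACK and XAUTOCLAIM. It ignores blocking reads, delivery counters and trimming.

```javascript
// Toy model of a Redis stream with a single consumer group (illustration only).
class ToyStream {
  constructor() {
    this.entries = [];       // ordered log of { id, fields }
    this.lastDelivered = -1; // index of the last entry handed out with '>'
    this.pel = new Map();    // pending entries: id -> { consumer, deliveredAt }
    this.seq = 0;
  }

  // Like XADD: append an entry and return its generated ID.
  add(fields) {
    const id = `${this.seq++}-0`;
    this.entries.push({ id, fields });
    return id;
  }

  // Like XREADGROUP ... '>': hand out never-delivered entries and record
  // each one in the PEL under the consumer that read it.
  readNew(consumer, count, now) {
    const start = this.lastDelivered + 1;
    const batch = this.entries.slice(start, start + count);
    this.lastDelivered += batch.length;
    for (const e of batch) this.pel.set(e.id, { consumer, deliveredAt: now });
    return batch;
  }

  // Like XACK: remove the entry from the PEL; 1 if it was pending, else 0.
  ack(id) {
    return this.pel.delete(id) ? 1 : 0;
  }

  // Like XAUTOCLAIM: transfer entries idle for at least minIdleMs to `consumer`.
  autoclaim(consumer, minIdleMs, now) {
    const claimed = [];
    for (const e of this.entries) {
      const pending = this.pel.get(e.id);
      if (pending && now - pending.deliveredAt >= minIdleMs) {
        this.pel.set(e.id, { consumer, deliveredAt: now });
        claimed.push(e);
      }
    }
    return claimed;
  }
}

// Usage: consumer c1 reads two jobs, finishes one, then "crashes".
const stream = new ToyStream();
stream.add({ type: 'email' });
stream.add({ type: 'report' });
const batch = stream.readNew('c1', 10, 0);
stream.ack(batch[0].id);                                  // email finished
const tooSoon = stream.autoclaim('c2', 30_000, 10_000);   // nothing idle for 30s yet
const reclaimed = stream.autoclaim('c2', 30_000, 30_000); // the report entry, now owned by c2
```

Nothing is redelivered until some consumer explicitly claims the idle entry. That is the job the reclaim loop in the Elixir consumer does.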


Part 1: The Node.js Producer

Project Setup

mkdir job-producer && cd job-producer
npm init -y
npm install express ioredis ulid zod

Redis Stream Producer

We use XADD to append jobs to a Redis stream. Unlike LPUSH/RPUSH, streams give us:

  • Persistent, ordered log of all jobs (not consumed on read)
  • Consumer groups (multiple consumers, each gets different jobs)
  • Built-in pending entry list (PEL) — unacknowledged jobs are trackable
// src/redis.js
const Redis = require('ioredis');

const redis = new Redis({
 host: process.env.REDIS_HOST || 'localhost',
 port: parseInt(process.env.REDIS_PORT || '6379', 10),
 maxRetriesPerRequest: 3,
 retryStrategy: (times) => Math.min(times * 50, 2000),
 lazyConnect: false,
});

redis.on('error', (err) => console.error('[redis] error:', err.message));
redis.on('connect', () => console.log('[redis] connected'));

module.exports = redis;
// src/jobs.js
const { ulid } = require('ulid');
const redis = require('./redis');
const { z } = require('zod');

const STREAM_KEY = 'jobs:work';
const MAX_LEN = 10_000; // approximate cap; trimming drops the oldest entries even if they were never processed

// Job schema — validate before enqueuing
const JobSchema = z.object({
 type: z.enum(['email', 'report', 'webhook', 'thumbnail']),
 payload: z.record(z.string(), z.unknown()), // two-argument form works in Zod 3 and Zod 4
 priority: z.number().int().min(1).max(10).default(5),
});

async function enqueueJob(rawInput) {
 const parsed = JobSchema.parse(rawInput); // throws ZodError if invalid

 const job = {
 id: ulid(), // sortable, unique job ID
 type: parsed.type,
 payload: JSON.stringify(parsed.payload),
 priority: String(parsed.priority),
 enqueued_at: new Date().toISOString(),
 attempts: '0',
 };

 // XADD jobs:work MAXLEN ~ 10000 * field value [field value ...]
 // '*' tells Redis to auto-generate the stream entry ID
 const entryId = await redis.xadd(
 STREAM_KEY,
 'MAXLEN', '~', String(MAX_LEN),
 '*', // auto-ID
 ...Object.entries(job).flat() // field-value pairs
 );

 console.log(`[jobs] enqueued ${job.type} job=${job.id} entry=${entryId}`);
 return { jobId: job.id, streamEntryId: entryId };
}

async function getJobStats() {
 const [length, groups] = await Promise.all([
 redis.xlen(STREAM_KEY),
 redis.xinfo('GROUPS', STREAM_KEY).catch(() => []),
 ]);

 return { stream: STREAM_KEY, length, consumerGroups: groups };
}

module.exports = { enqueueJob, getJobStats };
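The ...Object.entries(job).flat() spread turns the job object into the flat field/value argument list that XADD expects. Below is a standalone sketch of just that argument construction. buildXaddArgs is a hypothetical helper: it only builds the array and does not talk to Redis.

```javascript
// Build the argument list that enqueueJob passes to redis.xadd (sketch).
function buildXaddArgs(streamKey, maxLen, job) {
  return [
    streamKey,
    'MAXLEN', '~', String(maxLen), // '~' lets Redis trim approximately, cheaper than an exact cap
    '*',                           // let Redis generate the entry ID
    ...Object.entries(job).flat(), // { a: '1', b: '2' } -> ['a', '1', 'b', '2']
  ];
}

const args = buildXaddArgs('jobs:work', 10000, {
  id: '01HEXAMPLE',
  type: 'email',
  payload: JSON.stringify({ to: 'user@example.com' }),
  attempts: '0',
});
console.log(args);
```

Every value ends up as a flat string argument. The job object in enqueueJob already stringifies priority and attempts, and JSON-encodes the payload, before the call.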

The API

// src/index.js
const express = require('express');
const { ZodError } = require('zod');
const { enqueueJob, getJobStats } = require('./jobs');

const app = express();
app.use(express.json());

// POST /jobs: enqueue a new job
app.post('/jobs', async (req, res) => {
 try {
 const result = await enqueueJob(req.body);
 res.status(202).json({ status: 'accepted', ...result });
 } catch (err) {
 // Check the error class rather than relying on err.name
 if (err instanceof ZodError) {
 return res.status(422).json({ error: 'Invalid job', issues: err.issues });
 }
 console.error('[api] enqueue error:', err);
 res.status(500).json({ error: 'Internal error' });
 }
});

// GET /jobs/stats: queue depth and consumer group info
app.get('/jobs/stats', async (req, res) => {
 // Express 4 does not catch rejected promises from async handlers, so
 // without this try/catch a Redis error would leave the request hanging
 try {
 res.json(await getJobStats());
 } catch (err) {
 console.error('[api] stats error:', err);
 res.status(500).json({ error: 'Internal error' });
 }
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => console.log(`[api] listening on :${PORT}`));

Test it:

node src/index.js &

curl -s -X POST http://localhost:3000/jobs \
 -H 'Content-Type: application/json' \
 -d '{"type":"email","payload":{"to":"user@example.com","template":"welcome"}}' \
 | jq .

# => { "status": "accepted", "jobId": "01HXYZ...", "streamEntryId": "1699...-0" }

Part 2: The Elixir/OTP Consumer

Project Setup

mix new job_consumer --sup # --sup scaffolds an Application module
cd job_consumer

The --sup flag is important — it generates a JobConsumer.Application module with a supervision tree stub. We'll fill that in.

# mix.exs
defp deps do
 [
 {:redix, "~> 1.4"},
 {:poolboy, "~> 1.5"},
 {:jason, "~> 1.4"},
 {:telemetry, "~> 1.2"},
 {:telemetry_metrics, "~> 0.6"},
 ]
end
mix deps.get

Configuration

# config/config.exs
import Config

config :job_consumer,
 redis_host: System.get_env("REDIS_HOST", "localhost"),
 redis_port: String.to_integer(System.get_env("REDIS_PORT", "6379")),
 stream_key: System.get_env("STREAM_KEY", "jobs:work"),
 consumer_group: System.get_env("CONSUMER_GROUP", "elixir-workers"),
 consumer_name: System.get_env("CONSUMER_NAME", "consumer-#{:inet.gethostname() |> elem(1)}"),
 max_concurrency: String.to_integer(System.get_env("MAX_CONCURRENCY", "10")),
 poll_interval_ms: String.to_integer(System.get_env("POLL_INTERVAL_MS", "100")),
 max_attempts: String.to_integer(System.get_env("MAX_ATTEMPTS", "3"))

The Supervision Tree

This is the heart of the OTP design. Get this right and everything else is pluggable.

# lib/job_consumer/application.ex
defmodule JobConsumer.Application do
 use Application
 require Logger

 @impl true
 def start(_type, _args) do
 config = Application.get_all_env(:job_consumer)

 children = [
 # 1. Redis connection pool: must start before anything that uses Redis
 {JobConsumer.RedisPool, config},

 # 2. Telemetry: attach handlers before any job can emit events
 JobConsumer.Telemetry,

 # 3. Dynamic supervisor: spawns/monitors per-job worker processes.
 # Children start in list order, and this one must be running before
 # QueueConsumer, whose first :poll calls WorkerSupervisor.active_count/0.
 {JobConsumer.WorkerSupervisor, config},

 # 4. Queue consumer: polls Redis, dispatches to WorkerSupervisor.
 # Started last because it depends on everything above it.
 {JobConsumer.QueueConsumer, config},
 ]

 # :one_for_one — if one child crashes, only restart that child
 # This is correct here: a crashed QueueConsumer shouldn't kill the WorkerSupervisor
 # and vice versa. The RedisPool restart policy handles reconnection.
 opts = [
 strategy: :one_for_one,
 name: JobConsumer.Supervisor,
 max_restarts: 10,
 max_seconds: 60,
 ]

 Logger.info("[app] starting supervision tree")
 Supervisor.start_link(children, opts)
 end
end

A note on strategy choice: :one_for_one is right here because our children are loosely coupled — the QueueConsumer and WorkerSupervisor don't share state. If we had children where a crash in one makes the others' state invalid, we'd use :one_for_all (restart everyone) or :rest_for_one (restart the crashed child and all children started after it).
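The three strategies differ only in which children get restarted after a crash. Below is a sketch of that rule for a hypothetical three-child tree. childrenToRestart is illustrative only. It ignores restart intensity, and it skips the reverse-order shutdown a real supervisor performs before restarting.

```javascript
// Which children does a supervisor restart when `crashed` dies? (sketch)
// `children` is listed in start order.
function childrenToRestart(strategy, children, crashed) {
  const i = children.indexOf(crashed);
  if (i === -1) throw new Error(`unknown child: ${crashed}`);
  switch (strategy) {
    case 'one_for_one':
      return [crashed];         // only the crashed child
    case 'one_for_all':
      return [...children];     // every child
    case 'rest_for_one':
      return children.slice(i); // the crashed child and all children started after it
    default:
      throw new Error(`unknown strategy: ${strategy}`);
  }
}

const tree = ['RedisPool', 'WorkerSupervisor', 'QueueConsumer'];
console.log(childrenToRestart('rest_for_one', tree, 'WorkerSupervisor'));
// -> ['WorkerSupervisor', 'QueueConsumer']
```

With rest_for_one, a crash in a dependency also restarts everything that was started on top of it. That makes it the usual pick when later children rely on earlier ones.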


The Redis Connection Pool

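We use Poolboy to maintain a pool of Redix connections. A single Redix connection can pipeline concurrent requests, but a blocking XREADGROUP holds its connection until it returns. Pooling keeps that blocking read from stalling the XACKs and XADDs issued by workers.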

# lib/job_consumer/redis_pool.ex
defmodule JobConsumer.RedisPool do
 @pool_name :redix_pool

 def child_spec(config) do
 pool_opts = [
 name: {:local, @pool_name},
 worker_module: JobConsumer.RedisWorker,
 size: 10, # workers kept open permanently
 max_overflow: 5, # extra workers created under load, dismissed when returned
 ]

 redis_opts = [
 host: config[:redis_host],
 port: config[:redis_port],
 ]

 :poolboy.child_spec(@pool_name, pool_opts, redis_opts)
 end

 # Execute a Redis command on a pooled RedisWorker. The worker pid is a
 # GenServer wrapping a Redix connection (not a Redix connection itself),
 # so we talk to it with GenServer.call. The 10s call timeout leaves room
 # for a blocking XREADGROUP.
 def command(cmd) do
 :poolboy.transaction(@pool_name, fn worker ->
 GenServer.call(worker, {:command, cmd}, 10_000)
 end)
 end

 # Pipeline multiple commands in one round trip
 def pipeline(cmds) do
 :poolboy.transaction(@pool_name, fn worker ->
 GenServer.call(worker, {:pipeline, cmds}, 10_000)
 end)
 end
end
# lib/job_consumer/redis_worker.ex
defmodule JobConsumer.RedisWorker do
 use GenServer

 def start_link(redis_opts) do
 GenServer.start_link(__MODULE__, redis_opts)
 end

 @impl true
 def init(opts) do
 host = Keyword.get(opts, :host, "localhost")
 port = Keyword.get(opts, :port, 6379)

 case Redix.start_link(host: host, port: port) do
 {:ok, conn} -> {:ok, conn}
 {:error, reason} -> {:stop, reason}
 end
 end

 # Run the request on this worker's Redix connection and reply with the result
 @impl true
 def handle_call({:command, cmd}, _from, conn) do
 {:reply, Redix.command(conn, cmd), conn}
 end

 def handle_call({:pipeline, cmds}, _from, conn) do
 {:reply, Redix.pipeline(conn, cmds), conn}
 end
end
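Poolboy's size and max_overflow work together. The size workers stay open, and up to max_overflow extra workers are started under load. While the pool is in overflow, a worker that gets returned is dismissed instead of kept idle. Below is a toy model of that accounting. ToyPool is hypothetical, and real poolboy makes callers wait or time out instead of returning null or throwing.

```javascript
// Toy model of poolboy-style checkout accounting (illustration only).
class ToyPool {
  constructor(size, maxOverflow) {
    this.idle = Array.from({ length: size }, (_, i) => `w${i}`);
    this.maxOverflow = maxOverflow;
    this.overflow = 0; // extra workers currently alive beyond `size`
    this.nextId = size;
  }

  // Returns a worker, or null when size + maxOverflow workers are all busy.
  checkout() {
    if (this.idle.length > 0) return this.idle.pop();
    if (this.overflow < this.maxOverflow) {
      this.overflow += 1;
      return `w${this.nextId++}`;
    }
    return null;
  }

  // While in overflow, the returned worker is dismissed; otherwise it goes idle.
  checkin(worker) {
    if (this.overflow > 0) {
      this.overflow -= 1;
    } else {
      this.idle.push(worker);
    }
  }

  // Like :poolboy.transaction: check out, run fn, always check back in.
  transaction(fn) {
    const worker = this.checkout();
    if (worker === null) throw new Error('pool exhausted');
    try {
      return fn(worker);
    } finally {
      this.checkin(worker);
    }
  }
}

// Usage: size 2 + max_overflow 1 allows at most three concurrent checkouts.
const pool = new ToyPool(2, 1);
const held = [pool.checkout(), pool.checkout(), pool.checkout()];
const fourth = pool.checkout();        // null: permanent and overflow workers all busy
held.forEach((w) => pool.checkin(w));  // first return is dismissed, pool shrinks back to 2
```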

Ensuring the Consumer Group Exists

Redis consumer groups must be created before XREADGROUP can be called. We do this once at startup, in QueueConsumer's init, before the first poll:

# lib/job_consumer/stream.ex
defmodule JobConsumer.Stream do
 require Logger

 @doc """
 Ensure the consumer group exists on the stream.
 XGROUP CREATE with MKSTREAM creates the stream if it doesn't exist yet.
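 The start ID '0' delivers every entry already in the stream when the group
 is first created, so jobs enqueued before the consumer ever ran (like the
 test job from Part 1) are not skipped. '$' would start from new entries only.
 """
 def ensure_consumer_group!(stream_key, group_name) do
 case JobConsumer.RedisPool.command(
 ["XGROUP", "CREATE", stream_key, group_name, "0", "MKSTREAM"]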
 ) do
 {:ok, "OK"} ->
 Logger.info("[stream] created consumer group '#{group_name}' on '#{stream_key}'")

 {:error, %Redix.Error{message: "BUSYGROUP" <> _}} ->
 # Group already exists — this is fine, not an error
 :ok

 {:error, reason} ->
 raise "Failed to create consumer group: #{inspect(reason)}"
 end
 end

 @doc """
 Read up to `count` new messages from the stream via consumer group.
 '>' means 'give me messages not yet delivered to any consumer'.
 """
 def read_new(stream_key, group_name, consumer_name, count \\ 10) do
 JobConsumer.RedisPool.command([
 "XREADGROUP",
 "GROUP", group_name,
 consumer_name,
 "COUNT", Integer.to_string(count),
 "BLOCK", "1000", # wait up to 1000 ms for new entries; reply is nil on timeout (0 would block forever and stall the GenServer)
 "STREAMS", stream_key,
 ">" # deliver only undelivered messages
 ])
 end

 @doc """
 Re-claim messages that have been pending (delivered but not acknowledged)
 for longer than `min_idle_ms`. Used for crash recovery. XAUTOCLAIM needs
 Redis 6.2+; Redis 7 adds a third reply element listing deleted entry IDs.
 """
 def reclaim_stale(stream_key, group_name, consumer_name, min_idle_ms \\ 30_000) do
 JobConsumer.RedisPool.command([
 "XAUTOCLAIM",
 stream_key,
 group_name,
 consumer_name,
 Integer.to_string(min_idle_ms),
 "0-0", # start from beginning of PEL
 "COUNT", "100"
 ])
 end

 @doc """
 Acknowledge a message — removes it from the Pending Entry List.
 Call this only after successful processing.
 """
 def ack(stream_key, group_name, entry_id) do
 JobConsumer.RedisPool.command(["XACK", stream_key, group_name, entry_id])
 end
end
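Redis replies to XREADGROUP with nested arrays rather than maps, so the consumer has to unpack entries by position. Below is a JavaScript sketch of the same unpacking. It assumes the documented reply shape: a list of [stream, entries] pairs, where each entry is [id, [field, value, ...]], or null when a BLOCK times out. parseReadGroupReply is a hypothetical helper.

```javascript
// Turn an XREADGROUP reply into job objects (sketch).
// reply: [[streamKey, [[entryId, [field1, value1, field2, value2, ...]], ...]]]
//        or null when BLOCK expires with no new entries.
function parseReadGroupReply(reply) {
  if (reply === null) return [];
  const jobs = [];
  for (const [, entries] of reply) {
    for (const [entryId, fields] of entries) {
      const map = {};
      for (let i = 0; i < fields.length; i += 2) map[fields[i]] = fields[i + 1];
      jobs.push({
        streamEntryId: entryId,
        id: map.id,
        type: map.type,
        payload: JSON.parse(map.payload), // throws on a malformed payload
        priority: Number.parseInt(map.priority ?? '5', 10),
        attempts: Number.parseInt(map.attempts ?? '0', 10),
      });
    }
  }
  return jobs;
}

const reply = [['jobs:work', [
  ['1699-0', ['id', '01HX', 'type', 'email', 'payload', '{"to":"a@example.com"}', 'priority', '5', 'attempts', '0']],
]]];
const jobs = parseReadGroupReply(reply);
```

A malformed payload makes JSON.parse throw. That is exactly the poison-message case a consumer has to survive without crashing.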

The QueueConsumer GenServer

This is the poller. It wakes up, reads a batch of jobs from Redis, spawns a JobWorker for each via the WorkerSupervisor, and loops.

# lib/job_consumer/queue_consumer.ex
defmodule JobConsumer.QueueConsumer do
 use GenServer
 require Logger

 alias JobConsumer.{Stream, WorkerSupervisor}

 @reclaim_interval_ms 30_000 # check for stale pending entries every 30s

 # ── Public API ─────────────────────────────────────────────────────────────

 def start_link(config) do
 GenServer.start_link(__MODULE__, config, name: __MODULE__)
 end

 def status do
 GenServer.call(__MODULE__, :status)
 end

 # ── GenServer Callbacks ────────────────────────────────────────────────────

 @impl true
 def init(config) do
 stream_key = config[:stream_key]
 group_name = config[:consumer_group]
 consumer_name = config[:consumer_name]
 poll_interval = config[:poll_interval_ms]
 max_concurrent = config[:max_concurrency]

 # Ensure the consumer group exists before we start polling
 Stream.ensure_consumer_group!(stream_key, group_name)

 state = %{
 stream_key: stream_key,
 group_name: group_name,
 consumer_name: consumer_name,
 poll_interval: poll_interval,
 max_concurrent: max_concurrent,
 dispatched: 0,
 errors: 0,
 }

 # Schedule first poll immediately, then reclaim loop
 send(self(), :poll)
 Process.send_after(self(), :reclaim_stale, @reclaim_interval_ms)

 Logger.info("[consumer] started — group=#{group_name} consumer=#{consumer_name}")
 {:ok, state}
 end

 @impl true
 def handle_info(:poll, state) do
 # Backpressure: don't read more jobs than we can handle concurrently
 active_workers = WorkerSupervisor.active_count()

 new_state =
 if active_workers >= state.max_concurrent do
 Logger.debug("[consumer] at capacity (#{active_workers}/#{state.max_concurrent}), skipping poll")
 state
 else
 read_and_dispatch(state)
 end

 # Schedule next poll
 Process.send_after(self(), :poll, state.poll_interval)
 {:noreply, new_state}
 end

 @impl true
 def handle_info(:reclaim_stale, state) do
 new_state =
 case Stream.reclaim_stale(state.stream_key, state.group_name, state.consumer_name) do
 # Redis 7+ replies [next_cursor, entries, deleted_ids]; Redis 6.2 omits the third element
 {:ok, [_next_id, entries | _]} when entries != [] ->
 Logger.warning("[consumer] reclaimed #{length(entries)} stale entries")
 dispatch_entries(entries, state)

 {:ok, _} ->
 state

 {:error, reason} ->
 Logger.error("[consumer] reclaim failed: #{inspect(reason)}")
 %{state | errors: state.errors + 1}
 end

 Process.send_after(self(), :reclaim_stale, @reclaim_interval_ms)
 # Keep the dispatched/errors counters updated by this cycle
 {:noreply, new_state}
 end

 @impl true
 def handle_call(:status, _from, state) do
 {:reply, Map.take(state, [:dispatched, :errors, :max_concurrent]), state}
 end

 # ── Private ────────────────────────────────────────────────────────────────

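 defp read_and_dispatch(state) do
 # Read no more entries than there are free worker slots, so one batch
 # can't push the number of live workers past max_concurrent
 free_slots = max(state.max_concurrent - WorkerSupervisor.active_count(), 1)

 case Stream.read_new(
 state.stream_key,
 state.group_name,
 state.consumer_name,
 free_slots
 ) do
 {:ok, [[_stream_key, entries]]} ->
 dispatch_entries(entries, state)

 {:ok, nil} ->
 # BLOCK timed out with no new messages: normal
 state

 {:error, reason} ->
 Logger.error("[consumer] read error: #{inspect(reason)}")
 %{state | errors: state.errors + 1}
 end
 end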

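 defp dispatch_entries(entries, state) do
 # Redix returns each stream entry as a two-element list: [entry_id, fields]
 Enum.reduce(entries, state, fn [entry_id, fields], acc ->
 case parse_job(entry_id, fields) do
 {:ok, job} ->
 case WorkerSupervisor.start_worker(job) do
 {:ok, _pid} ->
 Logger.debug("[consumer] dispatched job=#{job.id} entry=#{entry_id}")
 %{acc | dispatched: acc.dispatched + 1}

 {:error, reason} ->
 Logger.error("[consumer] dispatch failed job=#{job.id}: #{inspect(reason)}")
 %{acc | errors: acc.errors + 1}
 end

 {:error, reason} ->
 # Poison message: a malformed entry must not crash the consumer or be
 # reclaimed forever. Park the raw fields in the dead-letter stream, then ack.
 Logger.error("[consumer] malformed entry=#{entry_id}: #{reason}")

 JobConsumer.RedisPool.command(
 ["XADD", "jobs:dead", "*", "entry_id", entry_id, "reason", reason, "raw_fields", inspect(fields)]
 )

 Stream.ack(acc.stream_key, acc.group_name, entry_id)
 %{acc | errors: acc.errors + 1}
 end
 end)
 end

 defp parse_job(entry_id, fields) do
 field_map =
 fields
 |> Enum.chunk_every(2)
 |> Map.new(fn [k, v] -> {k, v} end)

 {:ok,
 %{
 stream_entry_id: entry_id,
 id: field_map["id"],
 type: field_map["type"],
 payload: Jason.decode!(field_map["payload"]),
 priority: String.to_integer(field_map["priority"] || "5"),
 attempts: String.to_integer(field_map["attempts"] || "0"),
 enqueued_at: field_map["enqueued_at"]
 }}
 rescue
 # Bad JSON, non-numeric counters, or nil fields (entry deleted by trimming)
 e -> {:error, Exception.message(e)}
 end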
end

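The backpressure check (active_workers >= state.max_concurrent) is critical. Without it, the consumer would pull another batch on every poll no matter how many workers are still busy. Whenever jobs run slower than the poll rate, the number of live JobWorker processes (and of pending entries owned by this one consumer) would then grow with the backlog. With the check, concurrency stays capped at max_concurrency and Redis holds the overflow.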
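The capacity rule is easy to check as a pure function. The sketch below assumes two things: each poll reads at most as many entries as there are free worker slots, and every job takes a fixed number of poll intervals. entriesToRead and simulateBurst are hypothetical names.

```javascript
// How many entries should one poll read? 0 at capacity, so the poll is skipped.
function entriesToRead(activeWorkers, maxConcurrent) {
  return Math.max(maxConcurrent - activeWorkers, 0);
}

// Simulate a burst of `backlog` jobs that each take `pollsPerJob` poll intervals.
function simulateBurst(backlog, maxConcurrent, pollsPerJob) {
  let running = []; // remaining poll intervals for each live worker
  let peak = 0;
  let polls = 0;
  while (backlog > 0 || running.length > 0) {
    // Workers whose time is up exit before the next read
    running = running.map((r) => r - 1).filter((r) => r > 0);
    const n = Math.min(entriesToRead(running.length, maxConcurrent), backlog);
    backlog -= n; // the rest of the backlog stays in Redis
    for (let i = 0; i < n; i++) running.push(pollsPerJob);
    peak = Math.max(peak, running.length);
    polls += 1;
  }
  return { peak, polls };
}

console.log(simulateBurst(10_000, 10, 3)); // peak stays at 10 however large the burst
```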


The WorkerSupervisor

A DynamicSupervisor that spawns JobWorker processes on demand and supervises them independently.

# lib/job_consumer/worker_supervisor.ex
defmodule JobConsumer.WorkerSupervisor do
 use DynamicSupervisor
 require Logger

 def start_link(config) do
 DynamicSupervisor.start_link(__MODULE__, config, name: __MODULE__)
 end

 @impl true
 def init(_config) do
 # :one_for_one is the only strategy DynamicSupervisor supports.
 # max_restarts/max_seconds cap how many restarts may happen in a window.
 # JobWorker children are :temporary and never restarted, so worker crashes
 # don't count toward this limit; it only matters for restarted children.
 DynamicSupervisor.init(
 strategy: :one_for_one,
 max_restarts: 3,
 max_seconds: 5
 )
 end

 @doc "Spawn a supervised JobWorker for the given job map"
 def start_worker(job) do
 spec = {JobConsumer.JobWorker, job}
 DynamicSupervisor.start_child(__MODULE__, spec)
 end

 @doc "Count currently active (living) worker processes"
 def active_count do
 DynamicSupervisor.count_children(__MODULE__).active
 end

 @doc "List all active worker PIDs"
 def list_workers do
 DynamicSupervisor.which_children(__MODULE__)
 |> Enum.map(fn {_id, pid, _type, _modules} -> pid end)
 end
end

The JobWorker GenServer

This is where the actual work happens. Each job gets its own process — isolated heap, isolated failure domain, independent retry logic.

# lib/job_consumer/job_worker.ex
defmodule JobConsumer.JobWorker do
 use GenServer, restart: :temporary # don't auto-restart crashed workers
 require Logger

 alias JobConsumer.{Stream, DeadLetter}

 @base_retry_delay_ms 1_000
 @max_attempts Application.compile_env(:job_consumer, :max_attempts, 3)

 # ── Public API ─────────────────────────────────────────────────────────────

 def start_link(job) do
 GenServer.start_link(__MODULE__, job)
 end

 # ── GenServer Callbacks ────────────────────────────────────────────────────

 @impl true
 def init(job) do
 # Process the job immediately after init — don't block the supervisor
 send(self(), :process)
 {:ok, job}
 end

 @impl true
 def handle_info(:process, job) do
 start_time = System.monotonic_time()

 :telemetry.execute(
 [:job_consumer, :job, :start],
 %{system_time: System.system_time()},
 %{job_type: job.type, job_id: job.id}
 )

 result =
 try do
 {:ok, execute_job(job)}
 rescue
 e -> {:error, Exception.format(:error, e, __STACKTRACE__)}
 catch
 :exit, reason -> {:error, "exit: #{inspect(reason)}"}
 end

 duration = System.monotonic_time() - start_time

 case result do
 {:ok, _output} ->
 handle_success(job, duration)

 {:error, reason} ->
 handle_failure(job, reason, duration)
 end

 # Worker is done — stop normally. The supervisor does not restart :temporary workers.
 {:stop, :normal, job}
 end

 # ── Job Dispatch ───────────────────────────────────────────────────────────

 defp execute_job(%{type: "email"} = job) do
 # Simulate: in production, call your mailer here
 %{to: to, template: template} = atomize(job.payload)
 Logger.info("[worker] sending email to=#{to} template=#{template}")
 Process.sleep(100) # simulate I/O
 %{sent_to: to, template: template}
 end

 defp execute_job(%{type: "report"} = job) do
 %{report_id: id} = atomize(job.payload)
 Logger.info("[worker] generating report id=#{id}")
 Process.sleep(500)
 %{report_id: id, rows: :rand.uniform(10_000)}
 end

 defp execute_job(%{type: "webhook"} = job) do
 %{url: url} = atomize(job.payload)
 Logger.info("[worker] dispatching webhook to=#{url}")
 # In production: HTTP call here; raise on non-2xx for retry
 Process.sleep(200)
 %{url: url, status: 200}
 end

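 defp execute_job(%{type: type}) do
 # Also reached by "thumbnail", which the producer's schema accepts but no
 # clause above handles yet: such jobs are retried and then dead-lettered
 raise "Unknown job type: #{type}"
 end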

 # ── Success / Failure Handling ─────────────────────────────────────────────

 defp handle_success(job, duration_native) do
 duration_ms = System.convert_time_unit(duration_native, :native, :millisecond)

 # Acknowledge the message — removes it from Redis PEL
 case Stream.ack(config(:stream_key), config(:consumer_group), job.stream_entry_id) do
 {:ok, 1} ->
 Logger.info("[worker] ✓ job=#{job.id} type=#{job.type} duration=#{duration_ms}ms")

 {:ok, 0} ->
 Logger.warning("[worker] ack returned 0 for job=#{job.id} — already acked?")

 {:error, reason} ->
 Logger.error("[worker] ack failed job=#{job.id}: #{inspect(reason)}")
 end

 :telemetry.execute(
 [:job_consumer, :job, :success],
 %{duration: duration_native},
 %{job_type: job.type, job_id: job.id}
 )
 end

 defp handle_failure(job, reason, duration_native) do
 attempts = job.attempts + 1
 duration_ms = System.convert_time_unit(duration_native, :native, :millisecond)

 Logger.error("[worker] ✗ job=#{job.id} type=#{job.type} attempt=#{attempts} reason=#{inspect(reason)}")

 :telemetry.execute(
 [:job_consumer, :job, :failure],
 %{duration: duration_native},
 %{job_type: job.type, job_id: job.id, attempt: attempts, reason: reason}
 )

 if attempts >= @max_attempts do
 # Max attempts reached: move to the dead letter stream, and clear the PEL
 # entry only if that write succeeded (otherwise the reclaim loop retries it)
 Logger.error("[worker] dead-lettering job=#{job.id} after #{attempts} attempts")

 case DeadLetter.push(%{job | attempts: attempts}, reason) do
 {:ok, _entry_id} ->
 Stream.ack(config(:stream_key), config(:consumer_group), job.stream_entry_id)

 {:error, _reason} ->
 :ok
 end
 else
 # Exponential backoff retry: re-add the job with an incremented attempts count
 delay_ms = round(@base_retry_delay_ms * :math.pow(2, attempts))
 Logger.info("[worker] retrying job=#{job.id} in #{delay_ms}ms (attempt #{attempts}/#{@max_attempts})")

 # Re-enqueue first, then ack the original entry. If the node dies during the
 # delay, the original entry is still pending and XAUTOCLAIM picks it up,
 # rather than the job being acked and lost. Task.start is unsupervised; a
 # Task.Supervisor child would be sturdier.
 job_to_retry = %{job | attempts: attempts}
 entry_id = job.stream_entry_id

 Task.start(fn ->
 Process.sleep(delay_ms)

 with {:ok, _new_entry_id} <- re_enqueue(job_to_retry) do
 Stream.ack(config(:stream_key), config(:consumer_group), entry_id)
 end
 end)
 end
 end

 defp re_enqueue(job) do
 fields = [
 "id", job.id,
 "type", job.type,
 "payload", Jason.encode!(job.payload),
 "priority", Integer.to_string(job.priority),
 "enqueued_at", job.enqueued_at,
 "attempts", Integer.to_string(job.attempts),
 ]

 JobConsumer.RedisPool.command(
 ["XADD", config(:stream_key), "MAXLEN", "~", "10000", "*" | fields]
 )
 end

 defp atomize(map) do
 Map.new(map, fn {k, v} -> {String.to_existing_atom(k), v} end)
 end

 defp config(key), do: Application.fetch_env!(:job_consumer, key)
end

The restart: :temporary option on use GenServer is essential. It tells the WorkerSupervisor not to automatically restart a worker that exits — whether normally or abnormally. We want full control over retry logic inside the worker itself. Auto-restart would bypass our backoff and dead-letter logic.
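The restart values differ only in which exit reasons trigger a restart. Below is a sketch of the rule OTP documents for supervised children. shouldRestart is hypothetical. Exit reasons are modeled as strings, with 'shutdown:<term>' standing in for {:shutdown, term}.

```javascript
// Is this a clean exit? normal, shutdown and {:shutdown, term} count as clean.
function isCleanExit(reason) {
  return reason === 'normal' || reason === 'shutdown' || reason.startsWith('shutdown:');
}

// Does a supervisor restart a child with this restart value after `reason`? (sketch)
function shouldRestart(restart, reason) {
  switch (restart) {
    case 'permanent':
      return true;                  // always restarted
    case 'transient':
      return !isCleanExit(reason);  // restarted only after abnormal exits
    case 'temporary':
      return false;                 // never restarted (JobWorker)
    default:
      throw new Error(`unknown restart value: ${restart}`);
  }
}
```

Because JobWorker is :temporary, not even a :kill brings it back. What eventually gets the job run again is the unacknowledged entry in the PEL.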


Dead Letter Queue

Jobs that exhaust retries go here for inspection, not into the void:

# lib/job_consumer/dead_letter.ex
defmodule JobConsumer.DeadLetter do
 require Logger

 @stream_key "jobs:dead"

 def push(job, reason) do
 fields = [
 "original_id", job.id,
 "type", job.type,
 "payload", Jason.encode!(job.payload),
 "attempts", Integer.to_string(job.attempts),
 "failed_at", DateTime.utc_now() |> DateTime.to_iso8601(),
 "reason", inspect(reason),
 ]

 case JobConsumer.RedisPool.command(["XADD", @stream_key, "*" | fields]) do
 {:ok, entry_id} ->
 Logger.info("[dead_letter] stored job=#{job.id} at entry=#{entry_id}")
 {:ok, entry_id}

 {:error, reason} ->
 Logger.error("[dead_letter] failed to store job=#{job.id}: #{inspect(reason)}")
 {:error, reason}
 end
 end

 def list(count \\ 100) do
 case JobConsumer.RedisPool.command(["XRANGE", @stream_key, "-", "+", "COUNT", Integer.to_string(count)]) do
 {:ok, entries} -> {:ok, Enum.map(entries, &parse_entry/1)}
 error -> error
 end
 end

 # Redix returns each XRANGE entry as a list: [entry_id, [field, value, ...]]
 defp parse_entry([entry_id, fields]) do
 field_map = Enum.chunk_every(fields, 2)
 |> Enum.into(%{}, fn [k, v] -> {k, v} end)
 Map.put(field_map, "entry_id", entry_id)
 end
end
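JobWorker's failure branch plus DeadLetter.push amount to a small decision function. Below is a JavaScript sketch of the same schedule, using the defaults from the Elixir code: a 1,000 ms base delay, delay = base × 2^attempts, and 3 max attempts. onFailure is a hypothetical name.

```javascript
// What should happen after a job fails? Mirrors JobWorker's defaults (sketch).
function onFailure(previousAttempts, { baseDelayMs = 1000, maxAttempts = 3 } = {}) {
  const attempts = previousAttempts + 1;
  if (attempts >= maxAttempts) {
    return { action: 'dead_letter', attempts };
  }
  return { action: 'retry', attempts, delayMs: baseDelayMs * 2 ** attempts };
}

// Walk one job that fails every time through its lifetime.
const lifetime = [];
let attempts = 0;
for (;;) {
  const next = onFailure(attempts);
  lifetime.push(next);
  if (next.action === 'dead_letter') break;
  attempts = next.attempts;
}
console.log(lifetime);
```

With the defaults, a job that always fails runs three times, waiting 2 s and then 4 s between attempts, before it lands in jobs:dead.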

Telemetry

Wire up metrics so you actually know what's happening:

# lib/job_consumer/telemetry.ex
defmodule JobConsumer.Telemetry do
 use GenServer
 require Logger

 def start_link(_opts) do
 GenServer.start_link(__MODULE__, [], name: __MODULE__)
 end

 @impl true
 def init(_) do
 events = [
 [:job_consumer, :job, :start],
 [:job_consumer, :job, :success],
 [:job_consumer, :job, :failure],
 ]

 :telemetry.attach_many(
 "job-consumer-logger",
 events,
 &__MODULE__.handle_event/4,
 nil
 )

 {:ok, %{processed: 0, failed: 0}}
 end

 def handle_event([:job_consumer, :job, :start], _measurements, meta, _config) do
 Logger.debug("[telemetry] job started type=#{meta.job_type} id=#{meta.job_id}")
 end

 def handle_event([:job_consumer, :job, :success], measurements, meta, _config) do
 duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
 Logger.info("[telemetry] job success type=#{meta.job_type} id=#{meta.job_id} duration=#{duration_ms}ms")
 # In production: emit to StatsD, Prometheus, Datadog, etc.
 end

 def handle_event([:job_consumer, :job, :failure], _measurements, meta, _config) do
 Logger.warning("[telemetry] job failure type=#{meta.job_type} id=#{meta.job_id} attempt=#{meta.attempt}")
 end
end
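One detail worth knowing about :telemetry is that execute/3 calls every attached handler synchronously, in the process that emitted the event. A slow handler therefore slows down the JobWorker that emitted it; the Telemetry GenServer above only does the attaching. Below is a toy model of attach_many/execute. ToyTelemetry is hypothetical. The real library stores handlers in ETS, returns {:error, :already_exists} for a duplicate handler ID, and detaches a handler that raises.

```javascript
// Toy model of :telemetry's attach_many/execute semantics (illustration only).
class ToyTelemetry {
  constructor() {
    this.handlers = new Map(); // handlerId -> { events: Set of 'a.b.c' keys, fn }
  }

  attachMany(handlerId, events, fn) {
    if (this.handlers.has(handlerId)) return 'already_exists';
    this.handlers.set(handlerId, { events: new Set(events.map((e) => e.join('.'))), fn });
    return 'ok';
  }

  // Runs matching handlers synchronously in the caller; a throwing handler is detached.
  execute(event, measurements, metadata) {
    const key = event.join('.');
    for (const [id, h] of this.handlers) {
      if (!h.events.has(key)) continue;
      try {
        h.fn(event, measurements, metadata);
      } catch {
        this.handlers.delete(id);
      }
    }
  }
}

// Usage: only the attached event reaches the handler.
const telemetry = new ToyTelemetry();
const seen = [];
telemetry.attachMany('job-consumer-logger', [['job_consumer', 'job', 'success']],
  (_event, _measurements, meta) => seen.push(meta.job_id));
telemetry.execute(['job_consumer', 'job', 'success'], { duration: 5 }, { job_id: 'a' });
telemetry.execute(['job_consumer', 'job', 'start'], {}, { job_id: 'b' });
```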

Watching the Supervision Tree in Action

Start the Elixir application inside an IEx shell, so you can inspect and kill processes later:

iex -S mix

In another terminal, enqueue a batch of jobs:

for i in $(seq 1 20); do
 curl -s -X POST http://localhost:3000/jobs \
 -H 'Content-Type: application/json' \
 -d "{\"type\":\"email\",\"payload\":{\"to\":\"user${i}@example.com\",\"template\":\"welcome\"}}" \
 > /dev/null
done
echo "Enqueued 20 jobs"

Observe the Elixir logs — you'll see workers spawning, processing, and acknowledging:

[consumer] dispatched job=01HX... entry=1699...-0
[worker] sending email to=user1@example.com template=welcome
[worker] ✓ job=01HX... type=email duration=103ms
[consumer] dispatched job=01HY... entry=1699...-1
...

Now simulate a crash. In iex:

# Kill the QueueConsumer process directly
Process.whereis(JobConsumer.QueueConsumer) |> Process.exit(:kill)

# The Application supervisor restarts it automatically within milliseconds
# Watch the logs:
# [consumer] started — group=elixir-workers consumer=consumer-hostname

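The supervision tree just restarted the consumer. Killing QueueConsumer doesn't touch the JobWorkers, which live under WorkerSupervisor, so jobs already running finish and ack normally. Entries the consumer had read but not yet dispatched stay in the Redis PEL. The reclaim_stale loop picks them up once they have been idle for 30s; its first run comes 30s after the restart.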


The Failure Matrix

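  • JobWorker crashes mid-job (killed, or the node goes down): the job stays in the Redis PEL, unacked. Recovery: XAUTOCLAIM reclaims it after 30s.
  • JobWorker raises an exception: try/rescue catches it and the retry logic runs. Recovery: exponential backoff, then dead letter.
  • QueueConsumer crashes: the Application supervisor restarts it. Recovery: polling resumes; the PEL is intact in Redis.
  • WorkerSupervisor crashes: the Application supervisor restarts it. Recovery: all running workers are lost; the PEL covers their in-flight jobs.
  • Redis connection drops: Redix reconnects automatically, and meanwhile pool commands return errors. Recovery: the consumer logs the errors and retries on the next poll.
  • Job exceeds max_attempts: moved to the jobs:dead stream, PEL entry cleared. Recovery: manual inspection and replay.
  • Burst of jobs: the backpressure check caps concurrency. Recovery: the overflow waits in the Redis stream, up to the MAXLEN ~ 10,000 cap; beyond that, trimming deletes the oldest entries even if they were never processed.

Every row above has code behind it in what we built. The one limit to plan around is the MAXLEN cap on the stream.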


Where to Take It Next

  • Priorities: Add a separate stream per priority level (jobs:high, jobs:normal, jobs:low). Poll high-priority first; fall through to lower streams only when high is empty.
  • Observability: Replace the Telemetry logger with a Prometheus exporter. Track queue depth (Redis XLEN), processing rate, p99 duration per job type.
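  • Horizontal scaling: Run multiple Elixir nodes, each with a unique, stable consumer_name. Within a consumer group, Redis delivers each new entry to only one consumer, so no coordinator is needed. And because XAUTOCLAIM can claim any consumer's stale entries, surviving nodes pick up the pending jobs of a dead one.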
  • Rate limiting: Add a RateLimiter GenServer that tracks jobs-per-second per job type and blocks the QueueConsumer dispatch when limits are hit.
  • Job cancellation: XDEL the entry by its stream entry ID (the streamEntryId the API returns, not the ULID job ID) before it's claimed. Workers should check a cancellation flag at the start of execute_job.
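The priority bullet boils down to "read from the highest-priority stream that has work". Below is a sketch of that fall-through. The stream names come from the bullet. readByPriority and the readBatch callback are assumptions; the callback stands in for a non-blocking XREADGROUP on one stream.

```javascript
// Priority fall-through across per-priority streams (sketch).
const PRIORITY_STREAMS = ['jobs:high', 'jobs:normal', 'jobs:low']; // highest first

// `readBatch(stream)` stands in for a non-blocking XREADGROUP on one stream.
function readByPriority(readBatch, streams = PRIORITY_STREAMS) {
  for (const stream of streams) {
    const entries = readBatch(stream);
    if (entries.length > 0) return { stream, entries };
  }
  return { stream: null, entries: [] };
}

// Usage with in-memory queues standing in for Redis.
const queues = { 'jobs:high': [], 'jobs:normal': ['n1', 'n2'], 'jobs:low': ['l1'] };
const take = (stream) => queues[stream].splice(0, 10);
const first = readByPriority(take); // stream 'jobs:normal', entries ['n1', 'n2']
console.log(first);
```

Strict fall-through can starve jobs:low under sustained high-priority load. Weighting the polls is a common alternative.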

The OTP supervision tree you have now is the skeleton that all of this hangs on. Add a new capability → add a supervised child. Something breaks → the tree heals it. That's the promise, and it's not magic — it's just processes all the way down.