URL: https://dev.to/young_gao/building-a-production-ready-message-queue-consumer-in-go-1j28

Building a Production-Ready Message Queue Consumer in Go (2026 Guide) - DEV Community


In the previous article, we explored distributed tracing with OpenTelemetry. Today we tackle one of the most critical — and most frequently botched — pieces of backend infrastructure: the message queue consumer.

Every production system eventually outgrows synchronous request-response. Order processing, email delivery, image resizing, webhook fanout — these all belong in a queue. But the consumer side is where things get ugly. Messages arrive out of order. Workers crash mid-processing. Duplicates sneak through. Your "simple consumer" becomes a tangle of retries, panics, and lost messages.

This article builds a production-grade NATS JetStream consumer in Go, piece by piece, covering the patterns that keep it alive at 3 AM.

Why Message Queues Matter

Synchronous architectures hit a wall when:

  • Latency budgets are tight. Your API shouldn't block for 30 seconds while a PDF renders.
  • Failure domains bleed. A downstream service outage shouldn't take your entire API down.
  • Load is bursty. Black Friday traffic shouldn't require Black Friday compute 365 days a year.

Message queues decouple producers from consumers. The producer writes a message and moves on. The consumer processes it at its own pace, retries on failure, and scales independently. This is table stakes for any system that handles real traffic.

We'll use NATS JetStream as our concrete example. It's operationally simpler than RabbitMQ or Kafka, supports persistent streams with at-least-once delivery, and has an excellent Go client. The patterns themselves (idempotency, retries with backoff, dead-lettering, bounded concurrency) carry over to other brokers.

The Architecture

Here's what we're building:

Producer → NATS JetStream Stream → Consumer Group
 ├── Worker Pool (N goroutines)
 ├── Retry with Exponential Backoff
 ├── Dead Letter Queue
 ├── Idempotency Check
 └── Metrics + Tracing

Let's define the core interfaces first:

package consumer

import (
	"context"
	"time"
)

// Message represents a queue message with metadata.
type Message struct {
	ID        string
	Subject   string
	Data      []byte
	Headers   map[string]string
	Timestamp time.Time
	Attempt   int
}

// Handler processes a single message. Return an error to trigger retry.
type Handler func(ctx context.Context, msg Message) error

// IdempotencyStore tracks processed message IDs.
type IdempotencyStore interface {
	Exists(ctx context.Context, messageID string) (bool, error)
	Mark(ctx context.Context, messageID string, ttl time.Duration) error
}

Designing Idempotent Consumers

At-least-once delivery means your handler will receive duplicates. Network blips, consumer restarts, and acks that miss their deadline all cause redelivery. Your handler must be idempotent: processing the same message twice must leave the system in the same state as processing it once.

Two strategies work in practice:

1. Deduplication at the consumer level — check a store before processing:

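import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// RedisIdempotencyStore keeps one Redis key per processed message ID.
type RedisIdempotencyStore struct {
	client *redis.Client
}

func (s *RedisIdempotencyStore) Exists(ctx context.Context, id string) (bool, error) {
	// EXISTS returns the number of matching keys (0 or 1 here)
	val, err := s.client.Exists(ctx, "idem:"+id).Result()
	if err != nil {
		return false, fmt.Errorf("idempotency check failed: %w", err)
	}
	return val > 0, nil
}

func (s *RedisIdempotencyStore) Mark(ctx context.Context, id string, ttl time.Duration) error {
	// SET with an expiry so keys age out after the redelivery window
	return s.client.Set(ctx, "idem:"+id, "1", ttl).Err()
}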

2. Idempotent operations — design your writes so repeats are safe. Use INSERT ... ON CONFLICT DO NOTHING, conditional updates with version checks, or deterministic IDs derived from message content.
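One way to get a deterministic ID is to hash the message content. Here's a minimal stdlib sketch. It assumes the subject plus payload uniquely identifies the business operation, and `deterministicID` is a hypothetical helper, not part of the consumer. The same order payload always maps to the same key, so a primary-key insert with `ON CONFLICT DO NOTHING` turns a redelivery into a no-op. If two legitimate messages can carry identical bytes, use a producer-assigned ID instead.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// deterministicID derives a stable, hex-encoded SHA-256 ID from subject and payload.
// Identical content always yields the same ID, so repeats hit the same DB key.
func deterministicID(subject string, payload []byte) string {
	h := sha256.New()
	h.Write([]byte(subject))
	h.Write([]byte{0}) // separator so ("ab","c") and ("a","bc") hash differently
	h.Write(payload)
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	a := deterministicID("orders.process", []byte(`{"id":"order-123"}`))
	b := deterministicID("orders.process", []byte(`{"id":"order-123"}`))
	c := deterministicID("orders.process", []byte(`{"id":"order-124"}`))
	fmt.Println(a == b, a == c) // true false
	fmt.Println(len(a))         // 64
}
```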

The first approach is simpler to bolt on. The second is more robust. Production systems usually combine both: a deduplication layer as a fast path, with idempotent database operations as the safety net.

The TTL on the idempotency key matters. Set it too short, and late redeliveries slip through. Set it too long, and your store grows unbounded. Match it to your stream's max redelivery window — typically 24–72 hours.

The Consumer Core

Now let's build the consumer. The Config struct captures every tunable:

type Config struct {
	// NATS connection
	NATSUrl    string
	StreamName string
	Subject    string
	Durable    string // durable consumer name; instances sharing it split the work

	// Concurrency
	WorkerCount int

	// Retry
	MaxRetries     int // total delivery attempts, including the first, before the DLQ
	InitialBackoff time.Duration
	MaxBackoff     time.Duration
	BackoffFactor  float64

	// Idempotency
	IdempotencyTTL time.Duration

	// Observability
	ServiceName string
}

func DefaultConfig() Config {
	return Config{
		WorkerCount:    10,
		MaxRetries:     5,
		InitialBackoff: 1 * time.Second,
		MaxBackoff:     60 * time.Second,
		BackoffFactor:  2.0,
		IdempotencyTTL: 24 * time.Hour,
		ServiceName:    "queue-consumer",
	}
}

The consumer struct ties everything together:

// consumer.go: imports shared by the Consumer snippets in this article
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/trace"
)

type Consumer struct {
	cfg        Config
	js         nats.JetStreamContext
	handler    Handler
	idemStore  IdempotencyStore
	metrics    *Metrics
	tracer     trace.Tracer
	logger     *slog.Logger
	sub        *nats.Subscription
	wg         sync.WaitGroup
	msgCh      chan *nats.Msg
	shutdownCh chan struct{}
}

func New(nc *nats.Conn, cfg Config, handler Handler, idemStore IdempotencyStore) (*Consumer, error) {
	js, err := nc.JetStream()
	if err != nil {
		return nil, fmt.Errorf("jetstream init: %w", err)
	}

	tp := otel.GetTracerProvider()
	meter := otel.GetMeterProvider().Meter(cfg.ServiceName)

	return &Consumer{
		cfg:        cfg,
		js:         js,
		handler:    handler,
		idemStore:  idemStore,
		metrics:    NewMetrics(meter),
		tracer:     tp.Tracer(cfg.ServiceName),
		logger:     slog.Default(),
		msgCh:      make(chan *nats.Msg, cfg.WorkerCount*2),
		shutdownCh: make(chan struct{}),
	}, nil
}

Retry with Exponential Backoff

The retry logic lives in the message processing path. NATS JetStream supports server-side NakWithDelay, which tells the server to redeliver after a specified duration. Unlike a client-side sleep-and-retry loop, the pending redelivery is tracked by the server, so it survives consumer restarts and doesn't tie up a worker goroutine while it waits:

func (c *Consumer) processMessage(natsMsg *nats.Msg) {
	ctx, span := c.tracer.Start(context.Background(), "process_message",
		trace.WithAttributes(
			attribute.String("subject", natsMsg.Subject),
		),
	)
	defer span.End()

	// Saturation signal: messages currently inside processMessage
	c.metrics.inflightMessages.Add(ctx, 1)
	defer c.metrics.inflightMessages.Add(ctx, -1)

	meta, err := natsMsg.Metadata()
	if err != nil {
		c.logger.Error("failed to read metadata", "error", err)
		natsMsg.Nak()
		return
	}

	// NumDelivered includes this delivery, so the first attempt is 1
	attempt := int(meta.NumDelivered)
	msgID := natsMsg.Header.Get("Nats-Msg-Id")
	if msgID == "" {
		// Fall back to the stream sequence, which is stable across redeliveries
		msgID = fmt.Sprintf("%s-%d", meta.Stream, meta.Sequence.Stream)
	}

	span.SetAttributes(
		attribute.String("message_id", msgID),
		attribute.Int("attempt", attempt),
	)

	// Idempotency check
	if exists, err := c.idemStore.Exists(ctx, msgID); err != nil {
		c.logger.Error("idempotency check failed", "error", err, "msg_id", msgID)
		span.RecordError(err)
		// Retry: the store might be temporarily down
		c.nakWithBackoff(natsMsg, attempt)
		return
	} else if exists {
		c.logger.Debug("duplicate message, skipping", "msg_id", msgID)
		c.metrics.duplicatesSkipped.Add(ctx, 1)
		natsMsg.Ack()
		return
	}

	msg := Message{
		ID:        msgID,
		Subject:   natsMsg.Subject,
		Data:      natsMsg.Data,
		Headers:   flattenHeaders(natsMsg.Header),
		Timestamp: meta.Timestamp,
		Attempt:   attempt,
	}

	// Process
	start := time.Now()
	err = c.handler(ctx, msg)
	duration := time.Since(start)

	c.metrics.processingDuration.Record(ctx, duration.Seconds(),
		metric.WithAttributes(attribute.Bool("success", err == nil)),
	)

	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		c.metrics.processingErrors.Add(ctx, 1)

		if attempt >= c.cfg.MaxRetries {
			c.logger.Error("max retries exceeded, sending to DLQ",
				"msg_id", msgID, "error", err, "attempts", attempt,
			)
			c.sendToDLQ(ctx, natsMsg, err)
			natsMsg.Ack() // Ack the original to stop redelivery
			return
		}

		c.logger.Warn("processing failed, scheduling retry",
			"msg_id", msgID, "error", err, "attempt", attempt,
		)
		c.nakWithBackoff(natsMsg, attempt)
		return
	}

	// Success: mark as processed, then ack
	if err := c.idemStore.Mark(ctx, msgID, c.cfg.IdempotencyTTL); err != nil {
		c.logger.Error("failed to mark idempotency", "error", err, "msg_id", msgID)
		// Don't fail the message: it was processed successfully.
		// Worst case, a duplicate is processed again, which the idempotent handler tolerates.
	}

	c.metrics.messagesProcessed.Add(ctx, 1)
	natsMsg.Ack()
}

// nakWithBackoff asks the server to redeliver after
// InitialBackoff * BackoffFactor^(attempt-1), capped at MaxBackoff.
func (c *Consumer) nakWithBackoff(msg *nats.Msg, attempt int) {
	delay := c.cfg.InitialBackoff
	for i := 1; i < attempt; i++ {
		delay = time.Duration(float64(delay) * c.cfg.BackoffFactor)
		if delay > c.cfg.MaxBackoff {
			delay = c.cfg.MaxBackoff
			break
		}
	}
	msg.NakWithDelay(delay)
}

// flattenHeaders keeps the first value of each header key.
func flattenHeaders(h nats.Header) map[string]string {
	out := make(map[string]string, len(h))
	for k, v := range h {
		if len(v) > 0 {
			out[k] = v[0]
		}
	}
	return out
}

Key decisions here:

  • Server-side retry via NakWithDelay — the consumer doesn't hold the message during backoff. If this consumer dies, another picks it up.
  • Ack after DLQ publish — we ack the original message to stop the redelivery loop. The DLQ is the new source of truth.
  • Idempotency mark happens after success — if the mark fails, the message might be reprocessed. That's fine because operations are idempotent.

Dead Letter Queue

Messages that fail all retries go to a separate stream. Your ops team reviews them, fixes the bug, and replays:

func (c *Consumer) sendToDLQ(ctx context.Context, original *nats.Msg, processErr error) {
	_, span := c.tracer.Start(ctx, "send_to_dlq")
	defer span.End()

	headers := nats.Header{}
	// Preserve original headers
	for k, v := range original.Header {
		headers[k] = v
	}
	headers.Set("X-DLQ-Error", processErr.Error())
	headers.Set("X-DLQ-Timestamp", time.Now().UTC().Format(time.RFC3339))
	headers.Set("X-Original-Subject", original.Subject)

	dlqMsg := &nats.Msg{
		Subject: fmt.Sprintf("dlq.%s", c.cfg.Subject),
		Data:    original.Data,
		Header:  headers,
	}

	if _, err := c.js.PublishMsg(dlqMsg); err != nil {
		c.logger.Error("failed to publish to DLQ", "error", err)
		span.RecordError(err)
		c.metrics.dlqFailures.Add(ctx, 1)
		// This is bad: the caller acks the original after this returns, so the
		// message is gone from the main stream. Log the full body for manual recovery.
		c.logger.Error("LOST MESSAGE: manual recovery required",
			"subject", original.Subject,
			"data", string(original.Data),
		)
		return // don't count a failed publish as a DLQ send
	}
	c.metrics.dlqMessages.Add(ctx, 1)
}

Create the DLQ stream during setup:

func EnsureStreams(js nats.JetStreamContext, streamName, subject string) error {
	// Main stream: WorkQueuePolicy removes each message once it is acked
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      streamName,
		Subjects:  []string{subject},
		Retention: nats.WorkQueuePolicy,
		MaxAge:    72 * time.Hour,
	})
	if err != nil {
		return fmt.Errorf("create main stream: %w", err)
	}

	// DLQ stream: LimitsPolicy keeps messages until MaxAge, so they can be inspected and replayed
	_, err = js.AddStream(&nats.StreamConfig{
		Name:      streamName + "_dlq",
		Subjects:  []string{"dlq." + subject},
		Retention: nats.LimitsPolicy,
		MaxAge:    30 * 24 * time.Hour, // Keep DLQ messages for 30 days
	})
	if err != nil {
		return fmt.Errorf("create DLQ stream: %w", err)
	}
	return nil
}

Concurrency Control with Worker Pools

A single goroutine consuming messages wastes most of its time waiting on I/O. A worker pool lets you process N messages concurrently while keeping backpressure under control:

func (c *Consumer) Start(ctx context.Context) error {
	sub, err := c.js.PullSubscribe(
		c.cfg.Subject,
		c.cfg.Durable,
		nats.ManualAck(),
		nats.AckWait(30*time.Second),
		// Server-side cap, one above MaxRetries, so processMessage's DLQ branch
		// (attempt >= MaxRetries) runs before the server stops redelivering
		nats.MaxDeliver(c.cfg.MaxRetries+1),
	)
	if err != nil {
		return fmt.Errorf("subscribe: %w", err)
	}
	c.sub = sub

	// Start workers
	for i := 0; i < c.cfg.WorkerCount; i++ {
		c.wg.Add(1)
		go c.worker(i)
	}

	// Fetch loop: pulls messages from NATS and fans out to workers
	c.wg.Add(1)
	go c.fetchLoop(ctx)

	c.logger.Info("consumer started",
		"workers", c.cfg.WorkerCount,
		"subject", c.cfg.Subject,
		"durable", c.cfg.Durable,
	)
	return nil
}

func (c *Consumer) fetchLoop(ctx context.Context) {
	defer c.wg.Done()
	defer close(c.msgCh) // lets the workers' range loops end once the buffer drains

	for {
		select {
		case <-ctx.Done():
			return
		case <-c.shutdownCh:
			return
		default:
		}

		msgs, err := c.sub.Fetch(c.cfg.WorkerCount, nats.MaxWait(5*time.Second))
		if err != nil {
			if errors.Is(err, nats.ErrTimeout) {
				continue // No messages available, poll again
			}
			c.logger.Error("fetch error", "error", err)
			time.Sleep(time.Second) // Back off on unexpected errors
			continue
		}

		for _, msg := range msgs {
			select {
			case c.msgCh <- msg: // blocks while the buffer is full (backpressure)
			case <-ctx.Done():
				return
			case <-c.shutdownCh:
				return
			}
		}
	}
}

func (c *Consumer) worker(id int) {
	defer c.wg.Done()

	c.logger.Debug("worker started", "worker_id", id)
	for msg := range c.msgCh {
		c.processMessage(msg)
	}
	c.logger.Debug("worker stopped", "worker_id", id)
}

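The Fetch batch size matches the worker count, and msgCh holds at most twice that many messages. Once the buffer is full, the fetch loop blocks on the send and stops pulling, so the number of fetched-but-unacked messages stays bounded. Keep that bound small: AckWait starts counting when a message is delivered to the client, so a message that waits too long in the buffer can be redelivered before a worker even picks it up. The MaxWait on Fetch prevents tight-looping when the stream is empty.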

Graceful Shutdown Integration

Following the patterns from article #14, we integrate with OS signals to drain in-flight work before exiting:

func (c *Consumer) Shutdown(ctx context.Context) error {
	c.logger.Info("shutting down consumer...")

	// Signal the fetch loop to stop
	close(c.shutdownCh)

	// Drain the subscription: no new messages will be delivered
	if c.sub != nil {
		if err := c.sub.Drain(); err != nil {
			c.logger.Error("subscription drain failed", "error", err)
		}
	}

	// Wait for in-flight messages to complete (or context deadline)
	done := make(chan struct{})
	go func() {
		c.wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		c.logger.Info("consumer shutdown complete")
		return nil
	case <-ctx.Done():
		c.logger.Warn("shutdown timed out, some messages may be redelivered")
		return ctx.Err()
	}
}

Wire it into main:

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	cfg := consumer.DefaultConfig()
	cfg.NATSUrl = nats.DefaultURL
	cfg.StreamName = "ORDERS"
	cfg.Subject = "orders.process"
	cfg.Durable = "order-processor"

	// The streams must exist before Start subscribes to them
	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}
	if err := consumer.EnsureStreams(js, cfg.StreamName, cfg.Subject); err != nil {
		log.Fatal(err)
	}

	store := &RedisIdempotencyStore{client: redis.NewClient(&redis.Options{Addr: "localhost:6379"})}

	c, err := consumer.New(nc, cfg, handleOrder, store)
	if err != nil {
		log.Fatal(err)
	}

	if err := c.Start(ctx); err != nil {
		log.Fatal(err)
	}

	<-ctx.Done()

	// ctx is already cancelled here, so shutdown gets its own deadline
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := c.Shutdown(shutdownCtx); err != nil {
		log.Printf("shutdown: %v", err)
	}
}

// Order is a minimal example payload.
type Order struct {
	ID     string  `json:"id"`
	Amount float64 `json:"amount"`
}

func handleOrder(ctx context.Context, msg consumer.Message) error {
	var order Order
	if err := json.Unmarshal(msg.Data, &order); err != nil {
		return fmt.Errorf("unmarshal order: %w", err) // Will retry, but won't help: consider a non-retryable error type
	}
	// Process the order...
	return nil
}

A subtle point: the handleOrder comment hints at an important production refinement. Some errors are non-retryable — bad JSON, invalid business data, schema mismatches. Retrying won't help. A production consumer should distinguish these:

type PermanentError struct {
	Err error
}

func (e *PermanentError) Error() string { return e.Err.Error() }
func (e *PermanentError) Unwrap() error { return e.Err }

// In handleOrder, wrap failures that retrying can't fix:
//	return &consumer.PermanentError{Err: fmt.Errorf("unmarshal order: %w", err)}

// In processMessage, inside `if err != nil`, before the MaxRetries check:
var permErr *PermanentError
if errors.As(err, &permErr) {
	c.logger.Error("permanent error, sending to DLQ immediately",
		"msg_id", msgID, "error", err,
	)
	c.sendToDLQ(ctx, natsMsg, err)
	natsMsg.Ack()
	return
}

Observability: Metrics and Tracing

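The metrics struct covers the four golden signals for a queue consumer: traffic (messages processed), errors (processing errors and DLQ activity), latency (processing duration), and saturation (in-flight messages):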

type Metrics struct {
	messagesProcessed  metric.Int64Counter
	processingErrors   metric.Int64Counter
	processingDuration metric.Float64Histogram
	duplicatesSkipped  metric.Int64Counter
	dlqMessages        metric.Int64Counter
	dlqFailures        metric.Int64Counter
	inflightMessages   metric.Int64UpDownCounter
}

func NewMetrics(meter metric.Meter) *Metrics {
	m := &Metrics{}
	m.messagesProcessed, _ = meter.Int64Counter("consumer.messages.processed",
		metric.WithDescription("Total messages successfully processed"))
	m.processingErrors, _ = meter.Int64Counter("consumer.messages.errors",
		metric.WithDescription("Total processing errors"))
	m.processingDuration, _ = meter.Float64Histogram("consumer.messages.duration_seconds",
		metric.WithDescription("Message processing duration"),
		metric.WithExplicitBucketBoundaries(0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30))
	m.duplicatesSkipped, _ = meter.Int64Counter("consumer.messages.duplicates",
		metric.WithDescription("Duplicate messages skipped"))
	m.dlqMessages, _ = meter.Int64Counter("consumer.dlq.sent",
		metric.WithDescription("Messages sent to DLQ"))
	m.dlqFailures, _ = meter.Int64Counter("consumer.dlq.failures",
		metric.WithDescription("Failed DLQ publishes (potential message loss)"))
	m.inflightMessages, _ = meter.Int64UpDownCounter("consumer.messages.inflight",
		metric.WithDescription("Currently processing messages"))
	return m
}

The dlq.failures counter deserves an alert. If this fires, you're losing messages. Set a PagerDuty threshold at > 0.

For tracing, the spans we set in processMessage create a trace per message. To connect producer and consumer traces, propagate the trace context through NATS headers:

// Producer side
func publishWithTrace(ctx context.Context, js nats.JetStreamContext, subject string, data []byte) error {
	msg := &nats.Msg{
		Subject: subject,
		Data:    data,
		Header:  nats.Header{},
	}
	// nats.Header and propagation.HeaderCarrier share the underlying type
	// map[string][]string, so a plain type conversion works
	otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(msg.Header))
	_, err := js.PublishMsg(msg)
	return err
}

// Consumer side: at the top of processMessage, start from the producer's
// context instead of a bare context.Background():
ctx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(natsMsg.Header))
ctx, span := c.tracer.Start(ctx, "process_message", ...)

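One prerequisite: the global propagator is a no-op until you register one, so call otel.SetTextMapPropagator(propagation.TraceContext{}) at startup in both the producer and the consumer. With that in place, your traces show the full journey: API request → publish → consume → process → downstream calls. This is indispensable for debugging latency in async pipelines.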

Testing Strategies

Queue consumers are notoriously hard to test. Here's a layered approach.

Unit test the handler in isolation — no queue, no infrastructure:

func TestHandleOrder_Success(t *testing.T) {
	msg := consumer.Message{
		ID:   "test-1",
		Data: []byte(`{"id":"order-123","amount":99.99}`),
	}

	err := handleOrder(context.Background(), msg)
	assert.NoError(t, err)
	// Assert side effects: database writes, API calls, etc.
}

func TestHandleOrder_InvalidJSON(t *testing.T) {
	msg := consumer.Message{
		ID:   "test-2",
		Data: []byte(`not json`),
	}

	err := handleOrder(context.Background(), msg)
	assert.Error(t, err)

	var permErr *consumer.PermanentError
	assert.True(t, errors.As(err, &permErr), "bad JSON should be a permanent error")
}

Integration test with an embedded NATS server — tests the full consumer lifecycle:

func TestConsumer_ProcessAndAck(t *testing.T) {
	// Start an embedded NATS server with JetStream on a random port
	srv, err := server.NewServer(&server.Options{
		Port:      -1,
		JetStream: true,
		StoreDir:  t.TempDir(),
	})
	if err != nil {
		t.Fatal(err)
	}
	go srv.Start()
	defer srv.Shutdown()
	if !srv.ReadyForConnections(5 * time.Second) {
		t.Fatal("NATS server not ready")
	}

	nc, err := nats.Connect(srv.ClientURL())
	if err != nil {
		t.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		t.Fatal(err)
	}
	if err := consumer.EnsureStreams(js, "TEST", "test.subject"); err != nil {
		t.Fatal(err)
	}

	processed := make(chan string, 1)
	handler := func(ctx context.Context, msg consumer.Message) error {
		processed <- msg.ID
		return nil
	}

	store := &InMemoryIdempotencyStore{seen: map[string]bool{}}
	cfg := consumer.DefaultConfig()
	cfg.StreamName = "TEST"
	cfg.Subject = "test.subject"
	cfg.Durable = "test-consumer"
	cfg.WorkerCount = 1

	c, err := consumer.New(nc, cfg, handler, store)
	if err != nil {
		t.Fatal(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if err := c.Start(ctx); err != nil {
		t.Fatal(err)
	}

	// Publish a message; nats.MsgId sets the Nats-Msg-Id header the consumer uses as its ID
	if _, err := js.Publish("test.subject", []byte(`{"key":"value"}`), nats.MsgId("msg-001")); err != nil {
		t.Fatal(err)
	}

	select {
	case id := <-processed:
		assert.Equal(t, "msg-001", id)
	case <-time.After(5 * time.Second):
		t.Fatal("message not processed within timeout")
	}
}

func TestConsumer_DuplicateSkipped(t *testing.T) {
	t.Skip("sketch: same setup as TestConsumer_ProcessAndAck, not implemented here")
	// Same setup as above, but pre-mark the message ID
	store := &InMemoryIdempotencyStore{seen: map[string]bool{"msg-001": true}}
	_ = store // pass to consumer.New, publish msg-001, assert it gets acked but handler is NOT called
}

func TestConsumer_RetryThenDLQ(t *testing.T) {
	t.Skip("sketch: not implemented here")
	// Handler returns error every time.
	// Assert message appears in DLQ stream after MaxRetries.
}

Chaos test — kill the consumer mid-processing, restart, and verify no messages are lost:

func TestConsumer_CrashRecovery(t *testing.T) {
	t.Skip("sketch: not implemented here")
	// 1. Publish 100 messages
	// 2. Start consumer, let it process ~50
	// 3. Cancel context (simulates crash)
	// 4. Start a new consumer with the same durable name
	// 5. Wait for all 100 to be processed (some may be processed twice)
	// 6. Assert: handler was called >= 100 times, all message IDs covered
}
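The final assertion is the part of the crash-recovery test most worth getting right. At-least-once means every ID must appear, and duplicates are expected rather than failures. Here's a stdlib sketch of that check, assuming the test handler appends each `msg.ID` to a mutex-guarded slice and that published IDs are unique. `checkAtLeastOnce` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sort"
)

// checkAtLeastOnce returns the published IDs that were never handled, plus the
// number of extra (duplicate) deliveries. Duplicates are allowed under
// at-least-once delivery; missing IDs are the real failure.
func checkAtLeastOnce(published, handled []string) (missing []string, duplicates int) {
	seen := make(map[string]int, len(handled))
	for _, id := range handled {
		seen[id]++
	}
	for _, id := range published {
		if n := seen[id]; n == 0 {
			missing = append(missing, id)
		} else {
			duplicates += n - 1
		}
	}
	sort.Strings(missing)
	return missing, duplicates
}

func main() {
	published := []string{"msg-1", "msg-2", "msg-3"}
	handled := []string{"msg-1", "msg-2", "msg-2", "msg-3"} // msg-2 redelivered after the "crash"
	missing, dups := checkAtLeastOnce(published, handled)
	fmt.Println(len(missing), dups) // 0 1
}
```

In the test itself, you would fail on `len(missing) > 0` and merely log `dups`.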

The in-memory idempotency store for tests:

type InMemoryIdempotencyStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

func (s *InMemoryIdempotencyStore) Exists(_ context.Context, id string) (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.seen[id], nil
}

func (s *InMemoryIdempotencyStore) Mark(_ context.Context, id string, _ time.Duration) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seen[id] = true
	return nil
}

Production Checklist

Before shipping your consumer:

  • [ ] Idempotency: handler is safe to run twice on the same input
  • [ ] Permanent vs transient errors: bad data goes straight to DLQ, not through the retry loop
  • [ ] Backoff bounds: exponential growth spaces retries out and MaxBackoff caps the wait; jitter keeps failed messages from retrying in lockstep
  • [ ] DLQ monitoring: alert on consumer.dlq.sent > 0 and page on consumer.dlq.failures > 0
  • [ ] Graceful shutdown: drain subscription, wait for in-flight, then exit
  • [ ] Health check: expose /healthz that checks NATS and idempotency store connectivity
  • [ ] Consumer lag: monitor the consumer's NumPending and NumAckPending counts (from js.ConsumerInfo or the nats CLI)
  • [ ] Resource limits: bound worker count, channel buffer, and message size
  • [ ] Trace propagation: connect producer and consumer spans for end-to-end visibility

Wrapping Up

A message queue consumer is deceptively simple to prototype and deceptively hard to get right in production. The patterns in this article — idempotent processing, server-side retry with backoff, dead letter queues, bounded worker pools, graceful shutdown, and full observability — form a foundation that handles the failures real systems encounter.

The complete consumer is around 300 lines of focused Go code. No frameworks, no magic. Just clear concurrency patterns and deliberate error handling. That's the kind of code you want running at 3 AM.

Next in the series, we'll look at building a production-ready rate limiter using Redis and sliding window counters. Stay tuned.


If this article helped you, consider buying me a coffee on Ko-fi! Follow me for more production backend patterns.

