VOOZH about

URL: https://dev.to/0012303/rabbitmq-has-a-free-api-heres-how-to-use-it-for-message-queue-automation-12nm

RabbitMQ Has a Free API: Here's How to Use It for Message Queue Automation - DEV Community


RabbitMQ's Management API gives you complete control over your message broker via HTTP. You can manage queues, exchanges, bindings, users, and monitor everything — all without touching the CLI.

Why Use the RabbitMQ Management API?

  • Monitor queue depths and consumer counts in real-time
  • Automate queue creation and exchange bindings
  • Build custom monitoring dashboards
  • Manage users, permissions, and virtual hosts programmatically

Getting Started

Enable the management plugin and access the API on port 15672:

# Turn on the management plugin, which serves the HTTP API
rabbitmq-plugins enable rabbitmq_management

# Cluster overview: broker version, total queued messages, consumer count
curl --silent --user guest:guest "http://localhost:15672/api/overview" \
  | jq '{rabbitmq_version, messages: .queue_totals.messages, consumers: .object_totals.consumers}'

# One summary object per queue
curl --silent --user guest:guest "http://localhost:15672/api/queues" \
  | jq '.[] | {name, messages, consumers, state}'

Python Client

import json
import re
from urllib.parse import quote

import pika
import requests

class RabbitMQManager:
    """Small client for the RabbitMQ Management HTTP API.

    Every method issues a single HTTP request against ``self.base_url``
    using HTTP basic auth. ``vhost`` arguments are inserted into the URL
    verbatim, so they must already be percent-encoded (the default vhost
    ``/`` is written ``%2F``). Queue and exchange names are percent-encoded
    by this class, so callers pass them as plain text.
    """

    def __init__(self, host='localhost', port=15672, user='guest',
                 password='guest', timeout=10):
        """Store connection settings; no request is made here.

        Args:
            host: Hostname of the management API.
            port: Port of the management API.
            user: Basic-auth user name.
            password: Basic-auth password.
            timeout: Seconds to wait on each HTTP request before giving up.
        """
        self.base_url = f"http://{host}:{port}/api"
        self.auth = (user, password)
        # Without a timeout, requests waits indefinitely on a dead broker.
        self.timeout = timeout

    def _url(self, resource, vhost, *names):
        """Build ``<base>/<resource>/<vhost>[/<name>...]``.

        ``vhost`` is used as given (callers pass it pre-encoded); every
        entry of ``names`` is percent-encoded so characters such as ``/``,
        ``#`` or spaces cannot change the shape of the path.
        """
        tail = "".join(f"/{quote(str(part), safe='')}" for part in names)
        return f"{self.base_url}/{resource}/{vhost}{tail}"

    def _request(self, method, url, **kwargs):
        """Send one authenticated request with the configured timeout."""
        return requests.request(
            method, url, auth=self.auth, timeout=self.timeout, **kwargs
        )

    def get_queues(self, vhost='%2F'):
        """Return the decoded JSON queue list for ``vhost``.

        Raises:
            requests.HTTPError: if the API answers with an error status,
                instead of handing the error payload back as queue data.
        """
        resp = self._request("GET", self._url("queues", vhost))
        resp.raise_for_status()
        return resp.json()

    def create_queue(self, name, vhost='%2F', durable=True):
        """Declare a non-auto-delete queue; return True on HTTP 201 or 204."""
        resp = self._request(
            "PUT",
            self._url("queues", vhost, name),
            json={"durable": durable, "auto_delete": False},
        )
        return resp.status_code in (201, 204)

    def create_exchange(self, name, exchange_type='direct', vhost='%2F'):
        """Declare a durable exchange; return True on HTTP 201 or 204."""
        resp = self._request(
            "PUT",
            self._url("exchanges", vhost, name),
            json={"type": exchange_type, "durable": True},
        )
        return resp.status_code in (201, 204)

    def create_binding(self, exchange, queue, routing_key='', vhost='%2F'):
        """Bind ``queue`` to ``exchange``; return True on HTTP 201."""
        resp = self._request(
            "POST",
            self._url("bindings", vhost, "e", exchange, "q", queue),
            json={"routing_key": routing_key},
        )
        return resp.status_code == 201

    def get_queue_messages(self, queue, count=5, vhost='%2F'):
        """Fetch up to ``count`` messages from ``queue`` as decoded JSON.

        The request uses ackmode ``ack_requeue_true``.

        Raises:
            requests.HTTPError: if the API answers with an error status.
        """
        resp = self._request(
            "POST",
            self._url("queues", vhost, queue, "get"),
            json={"count": count, "ackmode": "ack_requeue_true", "encoding": "auto"},
        )
        resp.raise_for_status()
        return resp.json()

# Usage
mgr = RabbitMQManager()

# Create infrastructure. Each call returns True only when the broker
# accepted the request, so keep the results instead of discarding them.
results = [
    mgr.create_exchange('orders', 'topic'),
    mgr.create_queue('order-processing'),
    mgr.create_queue('order-notifications'),
    mgr.create_binding('orders', 'order-processing', 'order.created'),
    mgr.create_binding('orders', 'order-notifications', 'order.*'),
]

# Only claim success when every request succeeded.
if all(results):
    print("Message infrastructure created!")
else:
    print("Message infrastructure setup failed: at least one request was rejected")

Queue Monitoring Dashboard

def queue_health_report(mgr):
    """Print a fixed-width health table for the queues from ``mgr.get_queues()``.

    Rows are ordered by message count, largest first. The status column is
    ``CRITICAL`` above 10000 messages, ``WARNING`` above 1000,
    ``NO CONSUMER`` when messages are waiting with zero consumers, and
    ``OK`` otherwise.

    Args:
        mgr: Object whose ``get_queues()`` returns an iterable of dicts
            with a ``name`` key and optional ``messages``, ``consumers``
            and ``message_stats`` keys.
    """
    queues = mgr.get_queues()

    # Status is 13 wide so the 11-character 'NO CONSUMER' keeps a gap.
    header = f"{'Queue':30s}{'Messages':>10s}{'Rate':>10s}{'Consumers':>10s}{'Status':>13s}"
    print(header)
    # Derive the rule from the header so the two can never drift apart.
    print('-' * len(header))

    for q in sorted(queues, key=lambda x: x.get('messages', 0), reverse=True):
        name = q['name']
        msgs = q.get('messages', 0)
        rate = q.get('message_stats', {}).get('publish_details', {}).get('rate', 0)
        consumers = q.get('consumers', 0)

        status = 'OK'
        if msgs > 10000:
            status = 'CRITICAL'
        elif msgs > 1000:
            status = 'WARNING'
        elif consumers == 0 and msgs > 0:
            status = 'NO CONSUMER'

        # Format the rate with its unit first, then pad the result, so the
        # '/s' suffix sits inside the 10-wide column instead of widening it.
        rate_text = f"{rate:.1f}/s"
        print(f"{name:30s}{msgs:>10d}{rate_text:>10s}{consumers:>10d}{status:>13s}")

# Print the health table for the queues returned by mgr.get_queues().
queue_health_report(mgr)

Dead Letter Queue Handler

def setup_dead_letter_queue(mgr, source_queue, dlx_exchange='dlx', dlq_name=None,
                            vhost='%2F'):
    """Route dead-lettered messages from ``source_queue`` into a DLQ.

    Creates the dead-letter exchange, the dead-letter queue and the binding
    between them, then declares ``source_queue`` with the dead-letter
    arguments. If the broker rejects that declaration with HTTP 400 (the
    queue already exists with different arguments), the same settings are
    applied through a policy scoped to exactly that queue name instead.

    Args:
        mgr: Manager providing ``base_url``, ``auth``, ``create_exchange``,
            ``create_queue`` and ``create_binding``.
        source_queue: Queue whose rejected/expired messages are re-routed.
        dlx_exchange: Name of the dead-letter exchange.
        dlq_name: Dead-letter queue name; defaults to ``<source_queue>.dlq``.
        vhost: Percent-encoded virtual host, inserted into URLs verbatim.

    Returns:
        True when every step was accepted by the broker, False otherwise.
    """
    dlq_name = dlq_name or f"{source_queue}.dlq"
    # Fall back to 10 seconds for managers that do not expose a timeout.
    timeout = getattr(mgr, 'timeout', 10)

    prepared = all([
        mgr.create_exchange(dlx_exchange, 'direct', vhost=vhost),
        mgr.create_queue(dlq_name, vhost=vhost),
        mgr.create_binding(dlx_exchange, dlq_name, source_queue, vhost=vhost),
    ])

    dead_letter_args = {
        "x-dead-letter-exchange": dlx_exchange,
        "x-dead-letter-routing-key": source_queue,
    }

    # Declare the source queue with DLX arguments (creates it if missing).
    resp = requests.put(
        f"{mgr.base_url}/queues/{vhost}/{quote(source_queue, safe='')}",
        json={"durable": True, "arguments": dead_letter_args},
        auth=mgr.auth,
        timeout=timeout,
    )

    if resp.status_code == 400:
        # Arguments of an existing queue cannot be changed by re-declaring
        # it; a policy applies the same dead-letter settings at runtime.
        resp = requests.put(
            f"{mgr.base_url}/policies/{vhost}/{quote(source_queue + '-dlx', safe='')}",
            json={
                "pattern": f"^{re.escape(source_queue)}$",
                "definition": {
                    "dead-letter-exchange": dlx_exchange,
                    "dead-letter-routing-key": source_queue,
                },
                "apply-to": "queues",
                "priority": 0,
            },
            auth=mgr.auth,
            timeout=timeout,
        )

    configured = prepared and resp.status_code in (201, 204)
    if configured:
        print(f"DLQ setup: {source_queue} -> {dlx_exchange} -> {dlq_name}")
    else:
        print(f"DLQ setup failed for {source_queue} (last HTTP status {resp.status_code})")
    return configured

# Dead-letter 'order-processing' using the defaults: exchange 'dlx' and
# queue 'order-processing.dlq'.
setup_dead_letter_queue(mgr, 'order-processing')

Publishing Messages via API

# Publish one JSON message to the 'orders' exchange through the Management API
curl --silent --user guest:guest --request POST \
 --header "Content-Type: application/json" \
 --data '{
 "properties": {"content_type": "application/json"},
 "routing_key": "order.created",
 "payload": "{\"order_id\": 123, \"total\": 49.99}",
 "payload_encoding": "string"
 }' \
 "http://localhost:15672/api/exchanges/%2F/orders/publish"

Real-World Use Case

An e-commerce platform processed 50K orders daily through RabbitMQ. Using the Management API, they built an automated scaling system: when queue depth exceeded 5K messages, it spun up additional consumer instances. When depth dropped below 100, it scaled down. This saved 60% on compute costs while maintaining sub-second processing times.

What You Can Build

  • Queue monitoring dashboard with alerting on depth/rate
  • Auto-scaling system based on queue metrics
  • Dead letter queue processor for failed message recovery
  • Multi-tenant messaging with automated vhost creation
  • Event replay tool re-publishing messages from archives

Need custom messaging solutions? I build event-driven systems and automation tools.

Email me: spinov001@gmail.com
Check out my developer tools: https://apify.com/spinov001