![]() |
VOOZH | about |
dotnet add package SimplyWorks.Bus.RabbitMqExtensions --version 8.1.12
NuGet\Install-Package SimplyWorks.Bus.RabbitMqExtensions -Version 8.1.12
<PackageReference Include="SimplyWorks.Bus.RabbitMqExtensions" Version="8.1.12" />
<PackageVersion Include="SimplyWorks.Bus.RabbitMqExtensions" Version="8.1.12" />Directory.Packages.props
<PackageReference Include="SimplyWorks.Bus.RabbitMqExtensions" />Project file
paket add SimplyWorks.Bus.RabbitMqExtensions --version 8.1.12
#r "nuget: SimplyWorks.Bus.RabbitMqExtensions, 8.1.12"
#:package SimplyWorks.Bus.RabbitMqExtensions@8.1.12
#addin nuget:?package=SimplyWorks.Bus.RabbitMqExtensions&version=8.1.12Install as a Cake Addin
#tool nuget:?package=SimplyWorks.Bus.RabbitMqExtensions&version=8.1.12Install as a Cake Tool
π Build and Publish NuGet Package
π NuGet
π NuGet
π NuGet
π License: MIT
A lightweight .NET 8 message bus library built on top of RabbitMQ, designed for event-driven microservice architectures in ASP.NET Core.
The library ships as three complementary NuGet packages:
| Package | Purpose |
|---|---|
SimplyWorks.Bus |
Core runtime β publishing, consuming, retries, dead-letter, tracing |
SimplyWorks.Bus.RabbitMqExtensions |
Public contracts, consumer interfaces, dashboard data models |
SimplyWorks.Bus.RabbitMqViewer |
Built-in HTMX operations dashboard (dark + light mode, zero JS framework) |
IPublishIDelayedPublish; uses the RabbitMQ Delayed Message Exchange plugin when available, TTL buckets otherwiseIBroadcast / IListen<T>IConsume<T> with strongly-typed message deserializationIConsumeExtendedIConsumerReaderIErrorQueueReaderActivitySource tracing and Meter metricsSimplyWorks.Bus.RabbitMqViewerIBusDashboardDataService in SimplyWorks.Bus.RabbitMqExtensions β build your own UI without the viewer package# Core β always required
dotnet add package SimplyWorks.Bus
# Public contracts + monitoring interfaces (no RabbitMQ.Client dependency)
dotnet add package SimplyWorks.Bus.RabbitMqExtensions
# Optional: built-in operations dashboard
dotnet add package SimplyWorks.Bus.RabbitMqViewer
Add RabbitMQ connection string to your appsettings.json:
{
"ConnectionStrings": {
"RabbitMQ": "amqp://guest:guest@localhost:5672/"
}
}
In your Startup.cs or Program.cs:
services.AddBus(config =>
{
config.ApplicationName = "MyApp";
// Optional JWT configuration
config.Token.Key = Configuration["Token:Key"];
config.Token.Issuer = Configuration["Token:Issuer"];
config.Token.Audience = Configuration["Token:Audience"];
});
services.AddBusPublish(); // registers IPublish, IBroadcast
services.AddBusConsume(); // registers IHostedService consumer + scans calling assembly
public class OrderController : ControllerBase
{
private readonly IPublish _publish;
private readonly IBroadcast _broadcast;
public OrderController(IPublish publish, IBroadcast broadcast)
{
_publish = publish;
_broadcast = broadcast;
}
[HttpPost]
public async Task<IActionResult> Create(CreateOrderRequest req)
{
// Routed to consumers subscribed to OrderCreated
await _publish.Publish(new OrderCreated { OrderId = Guid.NewGuid() });
// Fan-out to all connected application instances
await _broadcast.Broadcast(new OrderNotification { Message = "New order" });
return Ok();
}
}
String-based publish (useful for dynamic routing):
await _publish.Publish("OrderCreated", jsonPayload);
await _publish.Publish("OrderCreated", payloadBytes);
public class OrderCreatedConsumer : IConsume<OrderCreated>
{
public async Task Process(OrderCreated message)
{
// Auto-acked on success, rejected/retried on exception
}
}
public class GenericConsumer : IConsume
{
public Task<IEnumerable<string>> GetMessageTypeNames()
=> Task.FromResult<IEnumerable<string>>(new[] { "OrderCreated", "OrderCancelled" });
public async Task Process(string typeName, string message) { ... }
}
public class OrderCreatedConsumer : IConsume<OrderCreated>
{
public async Task Process(OrderCreated message) { ... }
// Called on every failure (including retries) β optional
public async Task OnFail(Exception ex) { ... }
}
public class SecureConsumer : IConsume<SecureMessage>
{
private readonly RequestContext _ctx;
public SecureConsumer(RequestContext ctx) => _ctx = ctx;
public async Task Process(SecureMessage msg)
{
var user = _ctx.User;
var correlationId = _ctx.CorrelationId;
var remaining = _ctx.GetValue("RemainingRetries");
}
}
services.AddBusConsume(); // scan calling assembly
services.AddBusConsume(typeof(OrderCreatedConsumer).Assembly); // specific assembly
IDelayedPublish delivers a message to one specific consumer queue after a delay. Unlike IPublish β which routes by message type and fans out to every consumer bound to that type β delayed publishing routes by the target consumer's class name, so only that one queue receives the message.
IDelayedPublish is registered automatically when you call AddBusPublish().
public class OrderController : ControllerBase
{
private readonly IDelayedPublish _delayed;
public OrderController(IDelayedPublish delayed) => _delayed = delayed;
[HttpPost("schedule")]
public async Task<IActionResult> Schedule()
{
// Deliver to OrderReminderConsumer.Process(OrderReminder) in 30 minutes
await _delayed.PublishDelayed(
new OrderReminder { OrderId = "123" },
consumerName: nameof(OrderReminderConsumer),
delay: TimeSpan.FromMinutes(30));
return Accepted();
}
}
The consumerName is the exact class name of the target consumer. Combined with the message type name, it forms the naked queue name {ConsumerClass}.{MessageType} that uniquely identifies the consumer queue.
// Raw overload β useful when consumer and message types are not referenced in the calling assembly
await _delayed.PublishDelayed(
messageTypeName: "OrderReminder",
body: jsonString,
nakedQueueName: "orderreminderconsumer.orderreminder",
delay: TimeSpan.FromMinutes(30));
Passing TimeSpan.Zero (or negative) skips the delay entirely and delivers immediately to the targeted consumer.
IPublish with a delay?IPublish.Publish routes by message type name on the process exchange. Every consumer queue bound to that message type receives a copy β that is intentional fan-out. IDelayedPublish routes by the consumer's naked queue name on a separate direct exchange, so the message reaches exactly one queue regardless of how many other consumers handle the same message type.
The library detects at startup whether the broker has the RabbitMQ Delayed Message Exchange plugin enabled, and picks the appropriate strategy automatically. You write the same calling code either way.
When the plugin is available (BusOptions.DelayedPluginAvailable == true), the library declares an x-delayed-message exchange (v3.{env}.delay.x) and publishes with an x-delay header set to the requested milliseconds. The plugin holds the message broker-side until the delay elapses, then routes it directly to the target consumer queue.
Publisher βββΊ v3.{env}.delay.x (x-delayed-message, direct)
β holds for exactly N ms
βΌ
v3.{env}.{app}.{ConsumerClass}.{MessageType} β target consumer queue
This is the preferred path β delays are precise to the millisecond.
When the plugin is not installed, the library uses RabbitMQ's native TTL + dead-letter mechanism. Requested delays are rounded up to the nearest pre-configured bucket (default ladder: 1 s, 5 s, 15 s, 30 s, 60 s, 5 min, 15 min, 30 min, 1 h). A pair of exchanges and a queue are created on demand for each bucket duration used.
Publisher
β publish(routingKey = "orderreminderconsumer.orderreminder")
βΌ
v3.{env}.delay.in.1800s β fanout entry exchange
β (delivers to TTL queue regardless of routing key,
β but preserves the original routing key on the message)
βΌ
v3.{env}.delay.1800s β TTL queue (x-message-ttl = 1800000 ms)
β
β TTL expires β dead-letter with original routing key intact
βΌ
v3.{env}.delay β direct router exchange
β routes by "orderreminderconsumer.orderreminder"
βΌ
v3.{env}.{app}.orderreminderconsumer.orderreminder β target consumer queue
Why the fanout entry exchange?
Publishing directly into the TTL queue (via the default exchange) would replace the routing key with the queue name, losing the consumer target. The fanout entry exchange delivers to the TTL queue without touching the routing key, so it survives intact all the way to the dead-letter route on the direct router.
Bucket queues are declared lazily on first use. Each bucket is created at most once per process lifetime. The bucket ladder is configurable:
services.AddBus(config =>
{
// Custom bucket ladder β delays round up to nearest value (seconds)
config.DelayBucketsSeconds = new[] { 5, 30, 60, 300, 3600 };
});
A delay larger than all configured buckets creates a one-off bucket for that exact duration.
| Exchange | Type | Purpose |
|---|---|---|
v3.{env}.delay |
Direct | Final router β consumer queues bind here by naked queue name |
v3.{env}.delay.x |
x-delayed-message | Plugin path β holds messages until delay elapses |
v3.{env}.delay.in.{N}s |
Fanout | TTL fallback β entry point for each bucket duration |
Each consumer queue gets an extra binding to v3.{env}.delay (and v3.{env}.delay.x when the plugin is present) during ConsumersService startup. No other topology changes are needed.
Once a delayed message is delivered to the consumer queue, the normal retry and dead-letter machinery takes over. If the consumer throws, the message is rejected to its .retry queue, retried up to DefaultRetryCount times, and moved to .bad on exhaustion β exactly the same as a non-delayed message.
Broadcasts are fan-out messages delivered to every running instance simultaneously.
// Send
await _broadcast.Broadcast(new PricingUpdated { Version = 42 });
// Trigger live consumer refresh across all instances
await _broadcast.RefreshConsumers();
// Receive
public class PricingUpdatedListener : IListen<PricingUpdated>
{
public async Task Process(PricingUpdated message) { ... }
public async Task OnFail(Exception ex) { ... } // optional
}
services.AddBusListen(); // scan calling assembly
services.AddBusListen(typeof(PricingUpdatedListener).Assembly); // specific assembly
Fine-tune individual queues via BusOptions.AddQueueOption. The key is "{ConsumerClass}.{MessageType}" (case-insensitive).
services.AddBus(config =>
{
config.AddQueueOption(
"OrderCreatedConsumer.OrderCreated",
prefetch: 10,
retryCount: 3,
retryAfterSeconds: 30);
// Enable priority queue (0β10 range)
config.AddQueueOption(
"PaymentConsumer.PaymentProcessed",
priority: 10);
});
IConsumeExtended)For consumers that need per-instance prefetch or priority without touching global config:
// Typed consumer with runtime options
public class PriorityOrderConsumer : IConsumeExtended<OrderCreated>
{
public async Task Process(OrderCreated message) { ... }
public Task<ConsumerOptions> GetConsumerOptions() =>
Task.FromResult(new ConsumerOptions { Prefetch = 8, Priority = 5 });
}
// Multi-message consumer with per-type options
public class MultiConsumer : IConsumeExtended
{
public Task<IEnumerable<string>> GetMessageTypeNames() =>
Task.FromResult<IEnumerable<string>>(new[] { "TypeA", "TypeB" });
public Task<IDictionary<string, ConsumerOptions>> GetMessageTypeNamesWithOptions() =>
Task.FromResult<IDictionary<string, ConsumerOptions>>(
new Dictionary<string, ConsumerOptions>
{
["TypeA"] = new ConsumerOptions { Prefetch = 4 },
["TypeB"] = new ConsumerOptions { Prefetch = 16, Priority = 3 }
});
public async Task Process(string typeName, string message) { ... }
}
IConsumeExtended consumers are hot-reloadable β call IBroadcast.RefreshConsumers() to apply updated options across all running instances without restart.
IConsumerReaderRegistered automatically by AddBus(). Returns live queue statistics from the RabbitMQ Management API with configurable caching.
// All consumers
var all = await _reader.GetAllConsumersCount();
// Typed consumer + message type
var order = await _reader.GetConsumerCount<OrderConsumer, OrderCreated>();
// Multi-message consumer by message name
var generic = await _reader.GetConsumerCount<GenericConsumer>("OrderCreated");
// All queues for a consumer class
var byClass = await _reader.GetConsumerCount<OrderConsumer>();
ConsumerCount fields:
| Field | Description |
|---|---|
Name |
Consumer class name |
MessageName |
Message type name |
TotalNodes |
Active consumer instances |
ProcessingCount |
Messages in-flight (unacknowledged) |
QueueCount |
Messages ready in main queue |
RetryCount |
Messages in retry queue |
FailedCount |
Messages in dead-letter queue |
Priority |
Consumer priority level |
Prefetch |
QoS prefetch count |
IncomingRate |
Publish rate (msg/s) |
ProcessingRate |
Deliver rate to consumers (msg/s) |
AckRate |
Acknowledge rate (msg/s) |
services.AddBus(config =>
{
config.MonitoringCacheSeconds = 10; // cache duration 3β60 s, default 5
config.ManagementUrl = "http://localhost:15672"; // defaults from AMQP URI
config.ManagementUsername = "guest";
config.ManagementPassword = "guest";
config.VirtualHost = "/";
});
IErrorQueueReaderPeek at messages in retry or dead-letter queues without removing them.
// Typed consumer
var failed = await _errorReader.Peek<OrderConsumer, OrderCreated>(ErrorQueueType.Bad, count: 20);
// Multi-message consumer
var retrying = await _errorReader.Peek<GenericConsumer>("OrderCreated", ErrorQueueType.Retry);
// Raw queue name
var raw = await _errorReader.PeekByQueueName("v3.production.orderconsumer.ordercreated.bad");
ErrorMessage properties:
| Property | Description |
|---|---|
RawBody |
Original JSON payload |
Exchange |
Exchange where message was published |
RoutingKey |
Routing key used at publish |
Properties |
All AMQP properties |
Headers |
Extracted AMQP headers |
CorrelationId |
Correlation ID if set |
ExceptionHistory |
All recorded exceptions oldest-first |
LastException |
Most recent exception string |
SW.Bus emits strongly-typed lifecycle events that flow through a lock-free buffered pipeline. The goal is observability and diagnostics, not logging.
| Event | Trigger |
|---|---|
PublishStarted |
Message about to be published |
PublishCompleted |
Publish succeeded (includes duration ms, payload bytes) |
PublishFailed |
Publish threw an exception |
MessageProcessingStarted |
Consumer received and started processing |
MessageProcessingCompleted |
Processed successfully (includes duration ms) |
MessageProcessingFailed |
Consumer threw an exception (type, message, stack trace) |
MessageRetryScheduled |
Message rejected back to retry queue |
MessageMovedToDeadLetter |
Message exhausted retries |
ConsumerConnected |
Consumer channel attached to a queue |
ConsumerDisconnected |
Consumer channel shut down |
QueueBackpressureDetected |
Queue depth exceeded QueueBackpressureThreshold |
Every event carries: TimestampUtc, SchemaVersion, EventName, MachineName, Environment, ApplicationName, Exchange, QueueName, ConsumerName, MessageType, MessageId, CorrelationId, CausationId, TraceId, SpanId, DeliveryTag.
Consumer / Publisher hot path
β (non-blocking TryWrite)
βΌ
BoundedChannel<IOperationalEvent> β configurable capacity, drop-oldest on full
β
βΌ (BackgroundService, configurable flush interval)
OperationalEventDispatcher
βββ InMemoryOperationalEventStore (ring buffer β always present)
βββ [your IOperationalEventBatchSink registrations]
using SW.Bus.RabbitMqExtensions;
public class ElasticsearchSink : IOperationalEventBatchSink
{
public async Task PublishBatch(IReadOnlyList<IOperationalEvent> events,
CancellationToken cancellationToken = default)
{
await _esClient.BulkAsync(events, cancellationToken);
}
}
// Sinks stack additively β InMemoryStore is always active alongside yours
services.AddSingleton<IOperationalEventBatchSink, ElasticsearchSink>();
services.AddOpenTelemetry()
.WithTracing(b => b
.AddSource("SimplyWorks.Bus") // ActivitySource name
.AddOtlpExporter());
"SimplyWorks.Bus")services.AddOpenTelemetry()
.WithMetrics(b => b
.AddMeter("SimplyWorks.Bus")
.AddPrometheusExporter());
| Instrument | Type | Description |
|---|---|---|
sw_bus_publish_started_total |
Counter | Publish attempts |
sw_bus_publish_completed_total |
Counter | Successful publishes |
sw_bus_publish_failed_total |
Counter | Failed publishes |
sw_bus_processing_started_total |
Counter | Messages picked up |
sw_bus_processing_completed_total |
Counter | Messages processed successfully |
sw_bus_processing_failed_total |
Counter | Messages that threw exceptions |
sw_bus_retry_scheduled_total |
Counter | Messages sent to retry queue |
sw_bus_dead_letter_total |
Counter | Messages moved to dead-letter |
sw_bus_operational_event_dropped_total |
Counter | Events dropped (buffer full) |
sw_bus_processing_latency_ms |
Histogram | Consumer processing time |
sw_bus_publish_latency_ms |
Histogram | Publish time |
services.AddBus(config =>
{
config.OperationalEventsEnabled = true; // default
config.OperationalEventsBufferCapacity = 8192; // channel buffer before drop
config.OperationalEventsBatchSize = 256; // events per flush batch
config.OperationalEventsFlushIntervalMs = 1000; // ms between flushes
config.OperationalEventsDropOldest = true; // drop strategy when full
config.OperationalEventsSchemaVersion = "1.0"; // stamped on every event
config.OperationalEventsStoreCapacity = 10000; // in-memory ring buffer size
});
IBusDashboardDataServiceAll dashboard data is exposed through IBusDashboardDataService (defined in SimplyWorks.Bus.RabbitMqExtensions, implemented and registered by SimplyWorks.Bus). Inject it directly to build your own custom dashboard, REST API, or health probe β no dependency on SimplyWorks.Bus.RabbitMqViewer needed. See Building a Custom Dashboard for a complete guide.
public class OpsController : ControllerBase
{
private readonly IBusDashboardDataService _dash;
public OpsController(IBusDashboardDataService dash) => _dash = dash;
[HttpGet("ops/summary")]
public async Task<IActionResult> Summary()
=> Ok(await _dash.GetSummaryAsync());
[HttpGet("ops/consumers")]
public async Task<IActionResult> Consumers()
=> Ok(await _dash.GetConsumerHealthAsync());
[HttpGet("ops/queues")]
public async Task<IActionResult> Queues()
=> Ok(await _dash.GetQueueDetailsAsync());
[HttpGet("ops/retries")]
public async Task<IActionResult> Retries()
=> Ok(await _dash.GetRetryAnalysisAsync());
[HttpGet("ops/dead-letters")]
public async Task<IActionResult> DeadLetters()
=> Ok(await _dash.GetDeadLetterSummaryAsync());
[HttpGet("ops/alerts")]
public async Task<IActionResult> Alerts()
=> Ok(await _dash.GetAlertsAsync());
[HttpGet("ops/events")]
public IActionResult Events(
[FromQuery] string? consumer, [FromQuery] string? messageType,
[FromQuery] string? correlationId, [FromQuery] string? traceId,
[FromQuery] string? eventName, [FromQuery] int limit = 200)
=> Ok(_dash.GetRecentEvents(new OperationalEventFilter(
ConsumerName: consumer, MessageType: messageType,
CorrelationId: correlationId, TraceId: traceId,
EventName: eventName, Limit: limit)));
}
View models:
| Record | Key fields |
|---|---|
DashboardSummary |
TotalConsumers, UnhealthyConsumers, DisconnectedConsumers, TotalQueueDepth, TotalRetryBacklog, TotalDeadLetterBacklog, TotalIncomingRate, TotalAckRate, ActiveAlerts, LastUpdatedUtc |
ConsumerHealthView |
All ConsumerCount fields + QueueName, IsBackpressured, HealthStatus (AlertSeverity) |
QueueDetailView |
Main/retry/dead-letter queue names and depths, consumer count, rates |
RetryAnalysisView |
Per-consumer retry backlog ordered by size with severity |
DeadLetterSummaryView |
Per-consumer DL count, last exception type/message, last failure timestamp |
DashboardAlert |
Severity (Info/Warning/Critical), Title, Detail, QueueName, ConsumerName, TimestampUtc |
OperationalEventFilter fields (all optional, string fields are case-insensitive substring matches):
ApplicationName, ConsumerName, MessageType, CorrelationId, TraceId, QueueName, EventName (exact), From, To, Limit (default 200)
IAlertEvaluatorDefault thresholds are configured on BusOptions:
services.AddBus(config =>
{
config.AlertRetryWarningThreshold = 10; // Warning when retry backlog β₯ N
config.AlertRetryCriticalThreshold = 100; // Critical when retry backlog β₯ N
config.AlertDeadLetterCriticalThreshold = 100; // Critical when DL count β₯ N
config.QueueBackpressureThreshold = 5000; // backpressure warning threshold
});
To apply domain-specific or SLA-based thresholds, replace the default evaluator:
public class MyAlertEvaluator : IAlertEvaluator
{
public IReadOnlyList<DashboardAlert> Evaluate(ConsumerHealthView[] consumers)
{
var alerts = new List<DashboardAlert>();
foreach (var c in consumers)
{
if (c.Name == "PaymentConsumer" && c.FailedCount > 0)
alerts.Add(new DashboardAlert(
AlertSeverity.Critical, "Payment Dead Letter",
$"{c.FailedCount} failed payments require immediate attention.",
c.QueueName, c.Name, DateTime.UtcNow));
}
return alerts;
}
}
// Register before AddBus() β TryAddSingleton means yours wins
services.AddSingleton<IAlertEvaluator, MyAlertEvaluator>();
services.AddBus(...);
If you do not want to use SimplyWorks.Bus.RabbitMqViewer β e.g. you want React, Blazor, an existing admin framework, or a JSON API for a mobile app β everything you need is in SimplyWorks.Bus.RabbitMqExtensions. You never need to install the viewer package.
dotnet add package SimplyWorks.Bus # core runtime (always required)
dotnet add package SimplyWorks.Bus.RabbitMqExtensions # contracts + data service
# SimplyWorks.Bus.RabbitMqViewer is NOT needed
All interfaces below are registered automatically by services.AddBus(...). Just inject what you need.
| Interface | Description |
|---|---|
IBusDashboardDataService |
Aggregate read layer β summary, consumer health, queues, retries, dead letters, alerts, events |
IConsumerReader |
Raw per-consumer queue statistics from the RabbitMQ Management API (with configurable caching) |
IErrorQueueReader |
Peek at messages in retry and dead-letter queues without removing them |
IOperationalEventStore |
Query the in-memory event ring buffer with OperationalEventFilter |
IAlertEvaluator |
Evaluate a ConsumerHealthView[] snapshot and return DashboardAlert objects |
IOperationalEventBatchSink |
Implement to stream event batches to external systems (Elasticsearch, ClickHouse, etc.) |
// Program.cs
builder.Services.AddBus(config => { config.ApplicationName = "MyApp"; /* ... */ });
builder.Services.AddBusPublish();
builder.Services.AddBusConsume();
var app = builder.Build();
app.MapGet("/ops/summary", async (IBusDashboardDataService d) => await d.GetSummaryAsync());
app.MapGet("/ops/consumers", async (IBusDashboardDataService d) => await d.GetConsumerHealthAsync());
app.MapGet("/ops/queues", async (IBusDashboardDataService d) => await d.GetQueueDetailsAsync());
app.MapGet("/ops/retries", async (IBusDashboardDataService d) => await d.GetRetryAnalysisAsync());
app.MapGet("/ops/dead-letters", async (IBusDashboardDataService d) => await d.GetDeadLetterSummaryAsync());
app.MapGet("/ops/alerts", async (IBusDashboardDataService d) => await d.GetAlertsAsync());
app.MapGet("/ops/events", (IBusDashboardDataService d,
string? consumer, string? messageType,
string? eventName, int limit = 100) =>
d.GetRecentEvents(new OperationalEventFilter(
ConsumerName: consumer, MessageType: messageType,
EventName: eventName, Limit: limit)));
app.Run();
public class MyOpsService
{
private readonly IOperationalEventStore _store;
public MyOpsService(IOperationalEventStore store) => _store = store;
public IReadOnlyList<IOperationalEvent> GetRecentFailures(int limit = 50) =>
_store.GetRecent(new OperationalEventFilter(
EventName: "MessageProcessingFailed",
Limit: limit));
public long TotalEventsSinceStartup => _store.TotalReceived;
}
All events are strongly typed C# records inheriting from OperationalEventBase. Use pattern matching to extract type-specific fields:
foreach (var evt in _store.GetRecent())
{
switch (evt)
{
case MessageProcessingFailed f:
Console.WriteLine($"[FAIL] {f.ConsumerName} β {f.ExceptionType}: {f.ExceptionMessage}");
break;
case MessageProcessingCompleted c:
Console.WriteLine($"[OK] {c.ConsumerName} in {c.ProcessingDurationMs:F1} ms");
break;
case MessageRetryScheduled r:
Console.WriteLine($"[RETRY] {r.ConsumerName} attempt {r.RetryCount}, {r.RemainingRetryCount} remaining");
break;
case MessageMovedToDeadLetter dl:
Console.WriteLine($"[DLQ] {dl.ConsumerName} β {dl.DeadLetterRoutingKey}");
break;
case QueueBackpressureDetected bp:
Console.WriteLine($"[PRESSURE] {bp.QueueName} depth={bp.QueueDepth} threshold={bp.Threshold}");
break;
case ConsumerConnected cc:
Console.WriteLine($"[CONNECT] {cc.ConsumerName} tag={cc.ConsumerTag}");
break;
case ConsumerDisconnected cd:
Console.WriteLine($"[DISCONN] {cd.ConsumerName} reason={cd.Reason}");
break;
case PublishFailed pf:
Console.WriteLine($"[PUB FAIL] {pf.MessageType} β {pf.ExceptionType}");
break;
}
}
Register IOperationalEventBatchSink to stream events alongside the built-in ring buffer:
public class ElasticsearchSink : IOperationalEventBatchSink
{
public async Task PublishBatch(IReadOnlyList<IOperationalEvent> events,
CancellationToken cancellationToken = default)
=> await _esClient.BulkAsync(events, cancellationToken);
}
services.AddSingleton<IOperationalEventBatchSink, ElasticsearchSink>();
services.AddBus(...); // in-memory store and your sink both active
To replace query access with your own persistent store:
public class ClickHouseEventStore : IOperationalEventStore, IOperationalEventBatchSink
{
public long TotalReceived => /* query row count */;
public IReadOnlyList<IOperationalEvent> GetRecent(OperationalEventFilter? filter = null)
=> /* translate filter β SQL β query */;
public async Task PublishBatch(IReadOnlyList<IOperationalEvent> events,
CancellationToken ct = default)
=> await _clickHouse.BulkInsertAsync(events, ct);
}
// Register BEFORE AddBus() β TryAddSingleton means yours wins
services.AddSingleton<ClickHouseEventStore>();
services.AddSingleton<IOperationalEventStore>(sp => sp.GetRequiredService<ClickHouseEventStore>());
services.AddSingleton<IOperationalEventBatchSink>(sp => sp.GetRequiredService<ClickHouseEventStore>());
services.AddBus(...);
ConsumerHealthView.HealthStatusbuilder.Services.AddHealthChecks()
.AddAsyncCheck("bus-consumers", async (IBusDashboardDataService dash, ct) =>
{
var consumers = await dash.GetConsumerHealthAsync(ct);
var disconnected = consumers.Where(c => c.TotalNodes == 0).ToList();
var critical = consumers.Where(c => c.HealthStatus == AlertSeverity.Critical).ToList();
if (disconnected.Any())
return HealthCheckResult.Unhealthy(
$"{disconnected.Count} consumer(s) disconnected: " +
string.Join(", ", disconnected.Select(c => c.Name)));
if (critical.Any())
return HealthCheckResult.Degraded($"{critical.Count} consumer(s) in critical state.");
return HealthCheckResult.Healthy($"{consumers.Length} consumer(s) all healthy.");
});
SimplyWorks.Bus.RabbitMqViewer adds a server-rendered operations dashboard built with Pico.css + HTMX. No JavaScript framework required. All tables auto-refresh via HTMX polling with CSS animations.
dotnet add package SimplyWorks.Bus.RabbitMqViewer
| Route | Content |
|---|---|
/bus-viewer |
Overview β summary cards, consumer health table, active alerts, live event feed |
/bus-viewer/consumers |
Consumer Health β full grid with status badges, rates, prefetch, priority |
/bus-viewer/queues |
Queue Details β main + retry + dead-letter depths and rates per queue |
/bus-viewer/retries |
Retry Analysis β consumers with retry backlogs ordered by severity |
/bus-viewer/dead-letters |
Dead-Letter Inspection β counts, last exception, last failure timestamp |
/bus-viewer/events |
Live Events β filterable operational event stream with inline exception details |
/bus-viewer/login |
Login page β form-based credential entry (no browser credential caching) |
/bus-viewer/logout |
Logout β clears session cookie and redirects to login |
localStorage; theme restored before first paint to prevent flash<tbody> row slides up and fades in with a 60 ms stagger on data arrival// Program.cs / Startup.ConfigureServices β call after AddBus()
builder.Services.AddBusViewer(o => o.UseBasicAuth());
// Middleware pipeline
app.UseStaticFiles(); // serves /_content/SimplyWorks.Bus.RabbitMqViewer/bus-viewer.css
app.UseRouting();
app.UseAuthentication(); // required for RequirePolicy() mode
app.UseAuthorization();
app.UseBusViewer();
app.MapRazorPages(); // discovers BusViewer area automatically
Note:
UseBusViewer()can be placed at any position in the pipeline. WhenUseBasicAuth()is active, anIStartupFilterautomatically inserts the viewer's authentication gate at the very beginning of the pipeline β beforeUseAuthorizationβ so 403 errors from the host application's authorization policies can never block the viewer.
UseBasicAuth())Credentials are validated against IConfiguration at request time (supports secret rotation without restart). On success a short-lived session cookie is issued:
.BusViewerAuth, path-scoped to /bus-viewer, HttpOnly, SameSite=Strict/bus-viewer/logout clears the cookie immediatelybuilder.Services.AddBusViewer(o => o.UseBasicAuth(
usernameConfigKey: "BusViewer:Username", // default key
passwordConfigKey: "BusViewer:Password")); // default key
{
"BusViewer": { "Username": "ops", "Password": "super-secret" }
}
Environment variable equivalents: BusViewer__Username, BusViewer__Password
Delegates entirely to the host's authorization pipeline. Any scheme works (JWT, cookie, OIDC, Windows Auth):
builder.Services.AddAuthorization(o =>
o.AddPolicy("OpsOnly", p => p.RequireRole("ops")));
builder.Services.AddBusViewer(o => o.RequirePolicy("OpsOnly"));
Throws InvalidOperationException at startup when IHostEnvironment.IsProduction() is true unless explicitly suppressed:
builder.Services.AddBusViewer(o =>
{
o.AllowAnonymous();
o.AllowAnonymousInProduction = true; // only if behind proxy/network policy
});
| Option | Default | Description |
|---|---|---|
Title |
"SW.Bus Operations" |
Header title displayed in the sidebar |
AuthMode |
None |
Set via RequirePolicy(), UseBasicAuth(), or AllowAnonymous() |
PolicyName |
null |
Policy name used when AuthMode = Policy |
UsernameConfigKey |
"BusViewer:Username" |
Config key for the login username |
PasswordConfigKey |
"BusViewer:Password" |
Config key for the login password |
AllowAnonymousInProduction |
false |
Suppress production guard for anonymous mode |
BusOptions Referenceservices.AddBus(config =>
{
// ββ Identity ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
config.ApplicationName = "OrderService";
// ββ Connection ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
config.HeartBeatTimeOut = 60; // heartbeat seconds (0 = off)
config.ManagementUrl = "http://localhost:15672"; // defaults from AMQP URI
config.ManagementUsername = "guest";
config.ManagementPassword = "guest";
config.VirtualHost = "/";
// ββ Queue defaults ββββββββββββββββββββββββββββββββββββββββββββββββββββ
config.DefaultQueuePrefetch = 4; // QoS prefetch per consumer
config.DefaultRetryCount = 5; // retries before dead-letter
config.DefaultRetryAfter = 60; // seconds between retries
config.DefaultMaxPriority = 0; // 0 = priority queues disabled
// ββ Per-queue overrides βββββββββββββββββββββββββββββββββββββββββββββββ
config.AddQueueOption("OrderConsumer.OrderCreated",
prefetch: 10, retryCount: 3, retryAfterSeconds: 30, priority: 5);
// ββ Broadcast listeners βββββββββββββββββββββββββββββββββββββββββββββββ
config.ListenRetryCount = 5;
config.ListenRetryAfter = 60;
// ββ Monitoring ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
config.MonitoringCacheSeconds = 5; // cache management API responses (3β60)
// ββ JWT context propagation βββββββββββββββββββββββββββββββββββββββββββ
config.Token.Key = "your-secret-key";
config.Token.Issuer = "your-issuer";
config.Token.Audience = "your-audience";
// ββ Operational events pipeline βββββββββββββββββββββββββββββββββββββββ
config.OperationalEventsEnabled = true;
config.OperationalEventsBufferCapacity = 8192; // channel buffer before drop
config.OperationalEventsBatchSize = 256; // events per flush batch
config.OperationalEventsFlushIntervalMs = 1000; // ms between flushes
config.OperationalEventsDropOldest = true; // drop strategy when buffer full
config.OperationalEventsSchemaVersion = "1.0"; // stamped on every event
config.OperationalEventsStoreCapacity = 10000; // in-memory ring buffer size
// ββ Alert thresholds βββββββββββββββββββββββββββββββββββββββββββββββββ
config.QueueBackpressureThreshold = 5000; // queue depth β backpressure event
config.AlertRetryWarningThreshold = 10; // retry count β Warning alert
config.AlertRetryCriticalThreshold = 100; // retry count β Critical alert
config.AlertDeadLetterCriticalThreshold = 100; // DL count β Critical alert
});
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β RabbitMQ Broker β
β Process Exchange (direct) βββΊ Consumer Queue β
β β on failure β
β βΌ β
β Dead-Letter Exchange βββΊ Retry Queue βββΊ (TTL) βββΊ Consumer Q β
β ββββΊ Bad Queue (exhausted retries) β
β β
β Node Exchange (direct) βββΊ Per-Instance Queue (broadcasts) β
β β
β Delay Exchange (direct) βββββββββββββββββββββββΊ Consumer Queue β
β βββ [plugin path] Delay Plugin Exchange (x-delayed-message) β
β βββ [TTL fallback] Delay Entry Exchange (fanout) β
β ββββΊ Delay Bucket Queue (TTL) β
β β on expiry β
β ββββΊ Delay Exchange ββββββΊ β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β² β
β publish β consume
βΌ βΌ
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SimplyWorks.Bus Runtime β
β BasicPublisher βββ ActivitySource βββ OperationalEventPublisherβ
β ConsumerRunner βββ ActivitySource βββ OperationalEventPublisherβ
β ConsumersService ββββββββββββββββββββ OperationalEventPublisherβ
β β
β OperationalEventDispatcher (BackgroundService) β
β βββ InMemoryOperationalEventStore (ring buffer) β
β βββ [custom IOperationalEventBatchSink sinks] β
β β
β BusDashboardDataService ββ IConsumerReader (+ cache) β
β ββ IOperationalEventStore β
β ββ IAlertEvaluator β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β
βΌ (optional)
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SimplyWorks.Bus.RabbitMqViewer β
β /bus-viewer Overview dashboard β
β /bus-viewer/consumers Consumer health grid β
β /bus-viewer/queues Queue depths & rates β
β /bus-viewer/retries Retry analysis β
β /bus-viewer/dead-letters Dead-letter inspection β
β /bus-viewer/events Live operational event stream β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Queue naming convention:
{env}.{app}.{ConsumerClass}.{MessageType}
{env}.{app}.{ConsumerClass}.{MessageType}.retry
{env}.{app}.{ConsumerClass}.{MessageType}.bad
Example with ApplicationName = "OrderService" in Development:
v3.development.orderservice.ordercreatedconsumer.ordercreated
v3.development.orderservice.ordercreatedconsumer.ordercreated.retry
v3.development.orderservice.ordercreatedconsumer.ordercreated.bad
// Replace IPublish with a no-op β no RabbitMQ connection required
services.AddBusPublishMock();
// IBusDashboardDataService, IConsumerReader, IErrorQueueReader are standard
// interfaces β mock them with any mocking library
The SW.Bus.IntegrationTests project spins up real RabbitMQ containers via Testcontainers and tests the full delayed-publishing path against both broker variants. Docker must be running.
dotnet test SW.Bus.IntegrationTests/SW.Bus.IntegrationTests.csproj
PluginAvailableTests β starts a container from heidiks/rabbitmq-delayed-message-exchange:3.13.0-management (the plugin is pre-enabled). Asserts that:
BusOptions.DelayedPluginAvailable is true (plugin was detected at startup).x-delayed-message exchange.PluginUnavailableTests β starts a container from stock rabbitmq:3.13-management (no plugin). Asserts that:
BusOptions.DelayedPluginAvailable is false.Both test classes share the same scenario logic in DelayedDeliveryScenario.RunAsync, which boots a full generic host (AddBus + AddBusConsume + AddBusPublish), waits for consumer topology to bind, publishes a delayed message, checks it has not arrived early, then polls until it arrives or a 25-second deadline passes.
| File | Role |
|---|---|
BusHarness |
Boots a real IHost against a container's connection string; waits for ConsumersService to declare topology |
DelayedConsumer |
IConsume<DelayDto> β target consumer that records receive timestamps in MessageSink |
MessageSink |
Singleton ConcurrentDictionary of message id β received-at timestamp |
DelayedDeliveryScenario |
Shared assertions used by both test classes |
PluginAvailableTests |
Container with the delayed-message plugin |
PluginUnavailableTests |
Stock container, exercises TTL buckets |
| Package | Version | Used for |
|---|---|---|
RabbitMQ.Client |
6.8.1 | AMQP connection and channel management |
EasyNetQ.Management.Client |
3.0.1 | RabbitMQ Management API calls |
Scrutor |
4.2.2 | Assembly scanning for consumers |
SimplyWorks.HttpExtensions |
8.1.1 | JWT and request context propagation |
SimplyWorks.PrimitiveTypes |
8.1.3 | RequestContext, IConsume<T>, etc. |
MIT β see
SW.Bus.SampleWeb project in this repository| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 net8.0 is compatible. net8.0-android net8.0-android was computed. net8.0-browser net8.0-browser was computed. net8.0-ios net8.0-ios was computed. net8.0-maccatalyst net8.0-maccatalyst was computed. net8.0-macos net8.0-macos was computed. net8.0-tvos net8.0-tvos was computed. net8.0-windows net8.0-windows was computed. net9.0 net9.0 was computed. net9.0-android net9.0-android was computed. net9.0-browser net9.0-browser was computed. net9.0-ios net9.0-ios was computed. net9.0-maccatalyst net9.0-maccatalyst was computed. net9.0-macos net9.0-macos was computed. net9.0-tvos net9.0-tvos was computed. net9.0-windows net9.0-windows was computed. net10.0 net10.0 was computed. net10.0-android net10.0-android was computed. net10.0-browser net10.0-browser was computed. net10.0-ios net10.0-ios was computed. net10.0-maccatalyst net10.0-maccatalyst was computed. net10.0-macos net10.0-macos was computed. net10.0-tvos net10.0-tvos was computed. net10.0-windows net10.0-windows was computed. |
Showing the top 1 NuGet packages that depend on SimplyWorks.Bus.RabbitMqExtensions:
| Package | Downloads |
|---|---|
|
SimplyWorks.Bus
A lightweight .NET 8 message bus library built on RabbitMQ for ASP.NET Core applications. Provides simple publish-subscribe patterns, typed consumers, broadcasting, automatic retries, and JWT integration. Visit GitHub for full documentation and examples. |
This package is not used by any popular GitHub repositories.
See https://github.com/simplify9/SW-Bus/releases for release notes and changelog.