![]() |
VOOZH | about |
dotnet add package Messbus.PubSub --version 0.1.1
NuGet\Install-Package Messbus.PubSub -Version 0.1.1
<PackageReference Include="Messbus.PubSub" Version="0.1.1" />
<PackageVersion Include="Messbus.PubSub" Version="0.1.1" />Directory.Packages.props
<PackageReference Include="Messbus.PubSub" />Project file
paket add Messbus.PubSub --version 0.1.1
#r "nuget: Messbus.PubSub, 0.1.1"
#:package Messbus.PubSub@0.1.1
#addin nuget:?package=Messbus.PubSub&version=0.1.1Install as a Cake Addin
#tool nuget:?package=Messbus.PubSub&version=0.1.1Install as a Cake Tool
A lightweight, extensible message bus framework for .NET that provides a simple abstraction for publishing and consuming messages across different messaging platforms.
Messbus is a flexible messaging framework designed to simplify the implementation of publish-subscribe patterns in .NET applications. It provides a clean abstraction layer that allows you to switch between different messaging platforms without changing your application code.
Currently supported platforms:
IHostedService for seamless integration with .NET hostingInstall the core package:
dotnet add package Messbus
Install the Google Cloud Pub/Sub implementation:
dotnet add package Messbus.PubSub
using Messbus;
using Messbus.PubSub;
// Define your message
public class OrderCreatedEvent
{
public string OrderId { get; set; }
public decimal Amount { get; set; }
public DateTime CreatedAt { get; set; }
}
// Configure services
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddPubSub(config =>
{
config.ProjectId = "my-gcp-project";
config.Publishing = new PublishingConfiguration
{
Prefix = "prod",
Topics = new List<string> { "orders" }
};
});
var app = builder.Build();
// Publish a message
app.MapPost("/orders", async (IMessageBus messageBus) =>
{
var orderEvent = new OrderCreatedEvent
{
OrderId = Guid.NewGuid().ToString(),
Amount = 99.99m,
CreatedAt = DateTime.UtcNow
};
var messageId = await Messbus.Publish("orders", orderEvent);
return Results.Ok(new { MessageId = messageId });
});
app.Run();
using Messbus;
using Messbus.PubSub;
// Define your message handler
public class OrderCreatedEventConsumer : IMessageConsumer<OrderCreatedEvent>
{
private readonly ILogger<OrderCreatedEventConsumer> _logger;
public OrderCreatedEventConsumer(ILogger<OrderCreatedEventConsumer> logger)
{
_logger = logger;
}
public async Task Handler(MessageContext<OrderCreatedEvent> context, CancellationToken cancellationToken)
{
var order = context.Message;
_logger.LogInformation("Processing order {OrderId} with amount {Amount}",
order.OrderId, order.Amount);
// Your business logic here
await ProcessOrderAsync(order, cancellationToken);
}
private async Task ProcessOrderAsync(OrderCreatedEvent order, CancellationToken cancellationToken)
{
// Implementation
await Task.CompletedTask;
}
}
// Configure services
var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddPubSub(config =>
{
config.ProjectId = "my-gcp-project";
config.Subscription = new SubscriptionConfiguration
{
Sufix = "order-processor",
AckDeadlineSeconds = 120,
MaxDeliveryAttempts = 5
};
})
.AddConsumer<OrderCreatedEvent, OrderCreatedEventConsumer>("orders");
var host = builder.Build();
await host.RunAsync();
The main interface for publishing messages:
public interface IMessageBus
{
Task<string> Publish<T>(string topic, T message, CancellationToken cancellationToken = default)
where T : class;
Task<IEnumerable<string>> PublishBatch<T>(string topic, IEnumerable<T> messages, CancellationToken cancellationToken = default)
where T : class;
}
Interface for implementing message handlers:
public interface IMessageConsumer<TEvent>
{
Task Handler(MessageContext<TEvent> context, CancellationToken cancellationToken = default);
}
Provides context information about the message being processed:
public record class MessageContext<T>
{
public string Id { get; } // Unique message identifier
public int Attempt { get; } // Current delivery attempt number
public byte[] Data { get; } // Raw message data
public T Message { get; } // Deserialized message
}
In managed mode, the framework automatically creates and manages topics, subscriptions, and dead letter queues:
builder.Services.AddPubSub(config =>
{
config.ProjectId = "my-gcp-project";
config.JsonCredentials = "{...}"; // Optional: Service account JSON
config.Publishing = new PublishingConfiguration
{
Prefix = "prod",
Topics = new List<string> { "orders", "payments", "notifications" },
MessageRetentionDurationDays = 7
};
config.Subscription = new SubscriptionConfiguration
{
Sufix = "order-service",
AckDeadlineSeconds = 120,
MaxDeliveryAttempts = 5,
MinBackoffSeconds = 5,
MaxBackoffSeconds = 600,
MessageRetentionDurationDays = 7,
MaxOutstandingElementCount = 1000,
MaxOutstandingByteCount = 100_000_000
};
})
.AddConsumer<OrderCreatedEvent, OrderCreatedEventConsumer>("orders")
.AddConsumer<PaymentProcessedEvent, PaymentProcessedEventConsumer>("payments");
Use existing topics and subscriptions without automatic resource management:
builder.Services.AddPubSub(config =>
{
config.ProjectId = "my-gcp-project";
config.ResourceInitialization = ResourceInitialization.None;
})
.AddUnmanagedConsumer<OrderCreatedEvent, OrderCreatedEventConsumer>(
topic: "orders",
subscription: "existing-subscription-name");
Automatically handle messages that fail after maximum retry attempts:
builder.Services.AddPubSub(configuration)
.AddConsumer<OrderCreatedEvent, OrderCreatedEventConsumer, OrderCreatedEventDeadLetterConsumer>("orders");
The dead letter consumer will receive messages that exceed the MaxDeliveryAttempts threshold:
public class OrderCreatedEventDeadLetterConsumer : IMessageConsumer<OrderCreatedEvent>
{
private readonly ILogger<OrderCreatedEventDeadLetterConsumer> _logger;
public OrderCreatedEventDeadLetterConsumer(ILogger<OrderCreatedEventDeadLetterConsumer> logger)
{
_logger = logger;
}
public async Task Handler(MessageContext<OrderCreatedEvent> context, CancellationToken cancellationToken)
{
_logger.LogError("Message {MessageId} failed after {Attempts} attempts",
context.Id, context.Attempt);
// Handle failed message (e.g., store in database, send alert, etc.)
await StoreFailedMessageAsync(context.Message, cancellationToken);
}
}
Efficiently publish multiple messages at once:
var orders = new List<OrderCreatedEvent>
{
new() { OrderId = "1", Amount = 10.00m },
new() { OrderId = "2", Amount = 20.00m },
new() { OrderId = "3", Amount = 30.00m }
};
var messageIds = await Messbus.PublishBatch("orders", orders);
Use named instances to connect to multiple GCP projects:
// Configure multiple instances
builder.Services
.AddPubSub(config =>
{
config.Alias = "primary";
config.ProjectId = "primary-project";
// ... configuration
})
.AddPubSub(config =>
{
config.Alias = "secondary";
config.ProjectId = "secondary-project";
// ... configuration
});
// Resolve by alias
var primaryBus = serviceProvider.GetRequiredKeyedService<IMessageBus>("primary");
var secondaryBus = serviceProvider.GetRequiredKeyedService<IMessageBus>("secondary");
{
"Messbus": {
"PubSub": {
"ProjectId": "my-gcp-project",
"JsonCredentials": "{...}",
"UseEmulator": false,
"VerbosityMode": false,
"ResourceInitialization": "All",
"Publishing": {
"Prefix": "prod",
"MessageRetentionDurationDays": 7,
"Topics": ["orders", "payments", "notifications"]
},
"Subscription": {
"Sufix": "order-service",
"MessageRetentionDurationDays": 7,
"AckDeadlineSeconds": 120,
"MaxDeliveryAttempts": 5,
"MinBackoffSeconds": 5,
"MaxBackoffSeconds": 600,
"MaxOutstandingElementCount": 1000,
"MaxOutstandingByteCount": 100000000
}
}
}
}
Then configure using:
builder.Services.AddPubSub(builder.Configuration);
| Property | Type | Description | Default |
|---|---|---|---|
ProjectId |
string |
GCP Project ID | Required |
JsonCredentials |
string |
Service account JSON credentials | Uses Application Default Credentials |
UseEmulator |
bool |
Use Pub/Sub emulator for local development | false |
VerbosityMode |
bool |
Enable detailed logging | false |
ResourceInitialization |
enum |
Control automatic resource creation | All |
Alias |
string |
Named instance identifier | null |
| Option | Description | Required GCP Permission |
|---|---|---|
All |
Create topics and subscriptions automatically | Pub/Sub Admin |
TopicsOnly |
Only create topics | Pub/Sub Publisher |
SubscriptionsOnly |
Only create subscriptions | Pub/Sub Subscriber |
None |
Don't create any resources (unmanaged mode) | Depends on usage (Publisher/Subscriber) |
| Property | Type | Description | Default |
|---|---|---|---|
Prefix |
string |
Prefix for topic names | null |
Topics |
List<string> |
List of allowed topics | Required |
MessageRetentionDurationDays |
int |
Message retention period | 7 |
| Property | Type | Description | Default | Minimum |
|---|---|---|---|---|
Sufix |
string |
Suffix for subscription names | Required | - |
MessageRetentionDurationDays |
int |
Message retention period | 7 |
7 |
AckDeadlineSeconds |
int |
Acknowledgment deadline | 120 |
30 |
MaxDeliveryAttempts |
int |
Maximum retry attempts | 5 |
5 |
MinBackoffSeconds |
int |
Minimum retry backoff | 5 |
5 |
MaxBackoffSeconds |
int |
Maximum retry backoff | 600 |
600 |
MaxOutstandingElementCount |
long? |
Max unacknowledged messages | null |
- |
MaxOutstandingByteCount |
long? |
Max unacknowledged bytes | null |
- |
Implement your own serializer:
public class ProtobufMessageSerializer : IMessageSerializer
{
public byte[] Serialize<T>(T obj)
{
using var stream = new MemoryStream();
ProtoBuf.Serializer.Serialize(stream, obj);
return stream.ToArray();
}
public T Deserialize<T>(byte[] data)
{
using var stream = new MemoryStream(data);
return ProtoBuf.Serializer.Deserialize<T>(stream);
}
}
// Register custom serializer
builder.Services
.AddPubSub(configuration)
.AddSerializer<ProtobufMessageSerializer>();
Consumers are resolved from a scoped service provider, allowing you to use scoped dependencies:
public class OrderCreatedEventConsumer : IMessageConsumer<OrderCreatedEvent>
{
private readonly IOrderRepository _repository;
private readonly IEmailService _emailService;
public OrderCreatedEventConsumer(
IOrderRepository repository,
IEmailService emailService)
{
_repository = repository;
_emailService = emailService;
}
public async Task Handler(MessageContext<OrderCreatedEvent> context, CancellationToken cancellationToken)
{
await _repository.SaveOrderAsync(context.Message, cancellationToken);
await _emailService.SendConfirmationAsync(context.Message, cancellationToken);
}
}
Messages are automatically retried based on the MaxDeliveryAttempts configuration. You can access the current attempt number:
public async Task Handler(MessageContext<OrderCreatedEvent> context, CancellationToken cancellationToken)
{
_logger.LogInformation("Processing message {MessageId}, attempt {Attempt}",
context.Id, context.Attempt);
if (context.Attempt > 3)
{
// Apply different logic for later attempts
await ProcessWithFallbackAsync(context.Message, cancellationToken);
}
else
{
await ProcessNormallyAsync(context.Message, cancellationToken);
}
}
Use the Google Cloud Pub/Sub Emulator for local development:
gcloud beta emulators pubsub start --project=local-project
builder.Services.AddPubSub(config =>
{
config.ProjectId = "local-project";
config.UseEmulator = true;
// ... rest of configuration
});
Messbus (Core)
├── IMessageBus - Publishing interface
├── IMessageConsumer<T> - Consumer interface
├── IMessageSerializer - Serialization interface
├── Messbus - Abstract base class for publishers
├── MessageConsumer<T> - Abstract base class for consumers
└── MessageContext<T> - Message context wrapper
Messbus.PubSub (Implementation)
├── PubSubMessageBus - Google Cloud Pub/Sub publisher
├── PubSubConsumer<T> - Google Cloud Pub/Sub consumer
├── Configuration/ - Configuration classes
├── Serialization/ - JSON serializer implementation
└── Internal/ - Internal helpers and utilities
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
dotnet restore
dotnet build
dotnet test
This project is licensed under the MIT License - see below for details:
MIT License
Copyright (c) 2025 Messbus Contributors
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net6.0 net6.0 is compatible. net6.0-android net6.0-android was computed. net6.0-ios net6.0-ios was computed. net6.0-maccatalyst net6.0-maccatalyst was computed. net6.0-macos net6.0-macos was computed. net6.0-tvos net6.0-tvos was computed. net6.0-windows net6.0-windows was computed. net7.0 net7.0 was computed. net7.0-android net7.0-android was computed. net7.0-ios net7.0-ios was computed. net7.0-maccatalyst net7.0-maccatalyst was computed. net7.0-macos net7.0-macos was computed. net7.0-tvos net7.0-tvos was computed. net7.0-windows net7.0-windows was computed. net8.0 net8.0 is compatible. net8.0-android net8.0-android was computed. net8.0-browser net8.0-browser was computed. net8.0-ios net8.0-ios was computed. net8.0-maccatalyst net8.0-maccatalyst was computed. net8.0-macos net8.0-macos was computed. net8.0-tvos net8.0-tvos was computed. net8.0-windows net8.0-windows was computed. net9.0 net9.0 was computed. net9.0-android net9.0-android was computed. net9.0-browser net9.0-browser was computed. net9.0-ios net9.0-ios was computed. net9.0-maccatalyst net9.0-maccatalyst was computed. net9.0-macos net9.0-macos was computed. net9.0-tvos net9.0-tvos was computed. net9.0-windows net9.0-windows was computed. net10.0 net10.0 was computed. net10.0-android net10.0-android was computed. net10.0-browser net10.0-browser was computed. net10.0-ios net10.0-ios was computed. net10.0-maccatalyst net10.0-maccatalyst was computed. net10.0-macos net10.0-macos was computed. net10.0-tvos net10.0-tvos was computed. net10.0-windows net10.0-windows was computed. |
This package is not used by any NuGet packages.
This package is not used by any popular GitHub repositories.