![]() |
VOOZH | about |
dotnet add package Rebus.AzureServiceBus.RebusPerQueueTopic --version 6.3.2
NuGet\Install-Package Rebus.AzureServiceBus.RebusPerQueueTopic -Version 6.3.2
<PackageReference Include="Rebus.AzureServiceBus.RebusPerQueueTopic" Version="6.3.2" />
<PackageVersion Include="Rebus.AzureServiceBus.RebusPerQueueTopic" Version="6.3.2" />Directory.Packages.props
<PackageReference Include="Rebus.AzureServiceBus.RebusPerQueueTopic" />Project file
paket add Rebus.AzureServiceBus.RebusPerQueueTopic --version 6.3.2
#r "nuget: Rebus.AzureServiceBus.RebusPerQueueTopic, 6.3.2"
#:package Rebus.AzureServiceBus.RebusPerQueueTopic@6.3.2
#addin nuget:?package=Rebus.AzureServiceBus.RebusPerQueueTopic&version=6.3.2Install as a Cake Addin
#tool nuget:?package=Rebus.AzureServiceBus.RebusPerQueueTopic&version=6.3.2Install as a Cake Tool
Forked from the original Rebus.AzureServiceBus repository. Rebus.AzureServiceBus.
Install Rebus.AzureServiceBus.RebusPerQueueTopic package, which is the repository. We are dependent on:
You should not upgrade to newer versions since we created this package in a hurry, compatibility issues were not fixed. Once done, we will update the documentation and the package.
In startup.cs enable the middleware:
public static void Configure(IApplicationBuilder app)
{
RebusPerQueueTopic.Configure(app);
}
Create a class containing all topic/queue subscription definitons named "AzureServiceBusConfigurator" ie.
public static class AzureServiceBusConfigurator
{
public static void Register(IServiceCollection services,
IConfiguration configuration,
IWebHostEnvironment webHostEnvironment)
{
var serviceBusConnectionString = configuration.GetValue<string>("ServiceBus:ConnectionString");
var r = new RebusPerQueueTopic(services, webHostEnvironment, serviceBusConnectionString, true,
new RebusAzureServiceBusSettings
{
Environment = webHostEnvironment,
RetryDelays = new[]
{
TimeSpan.FromMinutes(1),
TimeSpan.FromMinutes(5),
}
});
and call it in your startup.cs while building services. Retry Delays will be used in case a message handling is failed, it will be redelivered to the queue/topic subscription and will be retried. Once maximum retries achieved, it would be dead-lettered. Dont forget that maximum execution time for a message in Azure ServiceBus is limited with 5 minutes at most. You can delay a message to tomorrow maybe, but execution time (peek-locking) cannot exceed 5 minutes.
In AzureServiceBusConfigurator.cs file, here are some examples:
Definining a classic queue subscription:
r.Queue<QueueTestMessage2, QueueTestMessage2Handler>("QueueTestMessage2");
//or one way: term one-way meaning the app itself will not consume the message, will only send it
r.QueueOneWay<QueueTestOneWayMessage>("QueueOneWayMessage");
//or to a masstransit consumer
r.QueueOneWayToMasstransit<MyMasstransitEvent>("MyMasstransitEvent");
Example Queue message:
public class QueueTestMessage2
{
public string Source { get; set; }
public string Date { get; set; }
...
}
Example message handler:
public class QueueTestMessage2Handler : IHandleMessages<QueueTestMessage2>
{
public Task Handle(QueueTestMessage2 message)
{
return Task.CompletedTask;
}
}
// last bool parameter of true will result in multiple node execution
//Topictest/Sub1-{{Environment.MachineName}} will be used
r.Topic<TopicTestMessage, TopicTestMessageHandler>("TopicTest", "Sub1", true);
//One way topic message sending
r.TopicOneWay<TopicTestOneWayMessage>("TopicTest");
//To masstransit message
r.TopicOneWayToMasstransit<MasstransitTopicTestMessage>("TopicMasstransitTest");
In "AzureServiceBusConfigurator.cs" lastly, you can optionally enable health checking:
HealthChecksConfigurer.AddBusHealthChecks(services, webHostEnvironment, serviceBusConnectionString,
new BusHealthCheckOptions
{
ExpireConditionDelay = (message,
messageType) => TimeSpan.FromMinutes(1),
WarmupIgnoreDelayForHealthChecking = TimeSpan.FromSeconds(15)
});
ExpireConditionDelay: Default 1 hour. If any Rebus message (only consumed by this library, ignoring one-way client) creation date is older than 1 hour, the health check will fail. You can adjust it according to your application, longer/shorter durations might fit your needs.
WarmupIgnoreDelayForHealthChecking: In initial warmup phase of the application like first 1-2 minutes, you might ignore health check errors. Think about a scenario, your queue is full of hundreds message and it will take 30 mins to be consumed. In this scenario, you should ignore health checking due to the warmup phase. Default value is 1 hour.
The application will result as healthy by default for the first 1 hour of start. Can be adjusted according to your needs.
We switched from Masstransit to Rebus and here were the requirements for us:
We have strict internal Information Security policies even on Azure Cloud. While we were still on Masstransit, randomly message consumers stopped day by day and Masstransit will not resubscribe to the queue.
We tried to find the root cause since there were no missing network packages (from Network department), no Firewall issues, not an even debug-level log on production systems relevant with the issue.
However, we started starting our workdays seeing that a random queue/topic subscriber stopped working each day. After spending 2 weeks, our team decided to change the messaging infrastructure to another library: Rebus.
The original implementation of Rebus.AzureServiceBus package for Azure Service Bus Queues was like this:
Configure.With(...)
.Transport(t => t.UseAzureServiceBus(connectionString, "queue")
.AutomaticallyRenewPeekLock())
.(...)
and without Rebus.ServiceProvider.Named package, you cannot start multiple bus instances.
In original implementation, you specify a ASB connection string and a queue name which Rebus will use. However, you can send many T message types to only one queue in this implementation.
Coming to Topic usages, it is very far away what we expected:
As you can see in the documentation (https://github.com/rebus-org/Rebus/wiki/Azure-Service-Bus-transport), the implementation does the following internally:
Considering the limitation of Azure Service Bus itself; we cannot start a queue and a topic if they have the same name, it will result in a not allowed exception if you try to create a topic named "Mytopic" but if "MyTopic" named queue exists, it will fail
Using these conditions, we ensured that the original package will not fit our requirements, but we were on an alarm state and have to change from Masstransit to Rebus as soon as possible, so we re-implemented it.
1- Minimal usage for topic/queues even in one-way/two-way communication between Rebus/Masstransit libraries
2- Skipped implementation of Sagas, this library is working statelessly.
3- Each topic/queue subscription is living on its own Rebus instance
4- Health checking of each topic/queue subscription
5- Full control on Queue/Topic/Topic Subscription naming
6- For development environment you have an option of: you can prefix all queue/topic names starting with the machine name; this enables me to seperate my queues and application from another guy working on the same project since my queues are created like "oguzhan/queue1" whereas the other guy's "otherguy/queue1"
7- For topic subscribing, you have the option of whether you will need multiple node subscription or not. If enabled, for topic "Topic1" and subscription name of "sub", it will create "Topic1/sub-node1" whereas "node1" is the docker container name.
8- In masstransit you have the limitation of each message is transported with its full namespace. If you move its namespace or change class name, MT will not consume the message. This was an annoying issue for all of us in the company. We are free from this issue thanks to Rebus.
👁 alternate text is missing from this package README image
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net5.0 net5.0 is compatible. net5.0-windows net5.0-windows was computed. net6.0 net6.0 is compatible. net6.0-android net6.0-android was computed. net6.0-ios net6.0-ios was computed. net6.0-maccatalyst net6.0-maccatalyst was computed. net6.0-macos net6.0-macos was computed. net6.0-tvos net6.0-tvos was computed. net6.0-windows net6.0-windows was computed. net7.0 net7.0 was computed. net7.0-android net7.0-android was computed. net7.0-ios net7.0-ios was computed. net7.0-maccatalyst net7.0-maccatalyst was computed. net7.0-macos net7.0-macos was computed. net7.0-tvos net7.0-tvos was computed. net7.0-windows net7.0-windows was computed. net8.0 net8.0 was computed. net8.0-android net8.0-android was computed. net8.0-browser net8.0-browser was computed. net8.0-ios net8.0-ios was computed. net8.0-maccatalyst net8.0-maccatalyst was computed. net8.0-macos net8.0-macos was computed. net8.0-tvos net8.0-tvos was computed. net8.0-windows net8.0-windows was computed. net9.0 net9.0 was computed. net9.0-android net9.0-android was computed. net9.0-browser net9.0-browser was computed. net9.0-ios net9.0-ios was computed. net9.0-maccatalyst net9.0-maccatalyst was computed. net9.0-macos net9.0-macos was computed. net9.0-tvos net9.0-tvos was computed. net9.0-windows net9.0-windows was computed. net10.0 net10.0 was computed. net10.0-android net10.0-android was computed. net10.0-browser net10.0-browser was computed. net10.0-ios net10.0-ios was computed. net10.0-maccatalyst net10.0-maccatalyst was computed. net10.0-macos net10.0-macos was computed. net10.0-tvos net10.0-tvos was computed. net10.0-windows net10.0-windows was computed. |
This package is not used by any NuGet packages.
This package is not used by any popular GitHub repositories.