dotnet add package EasyCore.EventBus.Pulsar --version 8.0.1
NuGet\Install-Package EasyCore.EventBus.Pulsar -Version 8.0.1
<PackageReference Include="EasyCore.EventBus.Pulsar" Version="8.0.1" />
Directory.Packages.props:
<PackageVersion Include="EasyCore.EventBus.Pulsar" Version="8.0.1" />
Project file:
<PackageReference Include="EasyCore.EventBus.Pulsar" />
paket add EasyCore.EventBus.Pulsar --version 8.0.1
#r "nuget: EasyCore.EventBus.Pulsar, 8.0.1"
#:package EasyCore.EventBus.Pulsar@8.0.1
Install as a Cake Addin:
#addin nuget:?package=EasyCore.EventBus.Pulsar&version=8.0.1
Install as a Cake Tool:
#tool nuget:?package=EasyCore.EventBus.Pulsar&version=8.0.1
EasyCore.EventBus is a lightweight event bus library designed specifically for .NET Core, helping developers easily implement Event-Driven Architecture (EDA). This library supports multiple message queues as event transmission media and provides a unified event publish-subscribe interface, making asynchronous communication between different components, modules, or services simpler.
Core Concepts
Event Bus
The Event Bus is a core component in Event-Driven Architecture. Based on the Publish-Subscribe (Pub/Sub) model, it decouples different parts of the system:
| Component | Role | Responsibility |
|---|---|---|
| Publisher | Event Producer | Pushes events to the EventBus |
| Subscriber | Event Consumer | Subscribes to and processes events |
| Event | Message Carrier | Represents changes or actions in the system |
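
The three roles in the table can be illustrated with a small in-memory sketch. This is not how EasyCore.EventBus is implemented; `ISketchEvent`, `OrderCreated`, `InMemoryBus`, and `PubSubDemo` are hypothetical names invented for this example, and the code uses only the .NET base class library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical marker interface for this sketch (not the library's IEvent).
public interface ISketchEvent { }

// Event: carries the data describing something that happened.
public sealed class OrderCreated : ISketchEvent
{
    public string OrderId { get; init; } = "";
}

// Minimal in-memory bus: maps each event type to its subscribed handlers.
public sealed class InMemoryBus
{
    private readonly Dictionary<Type, List<Func<ISketchEvent, Task>>> _handlers = new();

    // Subscriber side: register a handler for one event type.
    public void Subscribe<TEvent>(Func<TEvent, Task> handler) where TEvent : ISketchEvent
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<ISketchEvent, Task>>();
            _handlers[typeof(TEvent)] = list;
        }
        list.Add(e => handler((TEvent)e));
    }

    // Publisher side: deliver the event to every handler of its type, in order.
    public async Task PublishAsync<TEvent>(TEvent evt) where TEvent : ISketchEvent
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list)) return;
        foreach (var handler in list.ToArray())
        {
            await handler(evt);
        }
    }
}

public static class PubSubDemo
{
    // Subscribes one handler, publishes one event, and returns what the handler saw.
    public static async Task<List<string>> RunAsync()
    {
        var received = new List<string>();
        var bus = new InMemoryBus();
        bus.Subscribe<OrderCreated>(e =>
        {
            received.Add(e.OrderId);
            return Task.CompletedTask;
        });
        await bus.PublishAsync(new OrderCreated { OrderId = "A-1" });
        return received;
    }
}
```

The publisher only knows the bus and the event type, and the subscriber only knows the event type, which is the decoupling the Pub/Sub model is meant to provide.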
Supported Message Queues
EasyCore.EventBus provides support for multiple message queues:
| Package Name | Message Queue | Features |
|---|---|---|
| EasyCore.EventBus.Kafka | Apache Kafka | High throughput, distributed |
| EasyCore.EventBus.Pulsar | Apache Pulsar | Low latency, cloud-native |
| EasyCore.EventBus.RabbitMQ | RabbitMQ | High concurrency, AMQP protocol |
| EasyCore.EventBus.RedisStreams | Redis Streams | In-memory performance, simple to use |
[STAThread]
static void Main()
{
var host = CreateHostBuilder().Build();
ApplicationConfiguration.Initialize();
var mainForm = host.Services.GetRequiredService<Main>();
var backgroundService = host.Services.GetRequiredService<IHostedService>();
backgroundService.StartAsync(default).Wait();
Application.Run(mainForm);
}
public static IHostBuilder CreateHostBuilder() =>
Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<Main>();
// Register EventBus Service
services.AddAppEventBus(options =>
{
options.RabbitMQ(opt =>
{
opt.HostName = "192.168.157.142";
opt.UserName = "123";
opt.Password = "123";
opt.Port = 5672;
});
});
});
public class Program
{
public static void Main(string[] args)
{
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
// Register EventBus Service
builder.Services.AddAppEventBus(options =>
{
options.RabbitMQ(opt =>
{
opt.HostName = "192.168.157.142";
opt.UserName = "123";
opt.Password = "123";
opt.Port = 5672;
});
});
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseAuthorization();
app.MapControllers();
app.Run();
}
}
public class LocalEventMessage : IEvent
{
public string Message { get; set; }
public DateTime Timestamp { get; set; } = DateTime.Now;
}
public class MyLocalEventHandler : ILocalEventHandler<LocalEventMessage>
{
private readonly ILogger<MyLocalEventHandler> _logger;
public MyLocalEventHandler(ILogger<MyLocalEventHandler> logger)
{
_logger = logger;
}
public async Task HandleAsync(LocalEventMessage eventMessage)
{
// Handle event logic
_logger.LogInformation("Received event: {Message} at {Timestamp}", eventMessage.Message, eventMessage.Timestamp);
await Task.CompletedTask;
}
}
docker run -d --name rabbitmq \
-e RABBITMQ_DEFAULT_USER=123 \
-e RABBITMQ_DEFAULT_PASS=123 \
-p 15672:15672 -p 5672:5672 \
rabbitmq:3-management
public class DistributedEventMessage : IEvent
{
public string Message { get; set; }
public string Source { get; set; }
public Guid EventId { get; set; } = Guid.NewGuid();
}
public class MyDistributedEventHandler : IDistributedEventHandler<DistributedEventMessage>
{
private readonly ILogger<MyDistributedEventHandler> _logger;
public MyDistributedEventHandler(ILogger<MyDistributedEventHandler> logger)
{
_logger = logger;
}
public async Task HandleAsync(DistributedEventMessage eventMessage)
{
_logger.LogInformation($"Processing distributed event: {eventMessage.Message} from {eventMessage.Source}");
// Process business logic
await ProcessBusinessLogic(eventMessage);
}
private async Task ProcessBusinessLogic(DistributedEventMessage message)
{
// Business logic code
await Task.Delay(100);
}
}
Retry Mechanism
Sender Configuration
services.EasyCoreEventBus(options =>
{
options.RabbitMQ(opt =>
{
opt.HostName = "192.168.157.142";
opt.UserName = "123";
opt.Password = "123";
opt.Port = 5672;
});
// Retry Configuration
options.RetryCount = 3; // Retry count
options.RetryInterval = 5; // Retry interval (seconds)
});
services.EasyCoreEventBus(options =>
{
options.RabbitMQ(opt =>
{
opt.HostName = "192.168.157.142";
opt.UserName = "123";
opt.Password = "123";
opt.Port = 5672;
});
// Failure Callback Function
options.FailureCallback = (key, message) =>
{
MessageBox.Show($"Event handling failed: {message}",
"Error",
MessageBoxButtons.OK,
MessageBoxIcon.Error);
};
});
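
How a retry count, a retry interval, and a failure callback usually interact can be sketched with plain .NET code. This is a hedged illustration, not the library's implementation: `RetrySketch` and `HandleWithRetryAsync` are hypothetical names, and the sketch assumes that the retry count means additional attempts after the first one and that the callback fires only once all attempts have failed. The source does not confirm either detail for EasyCore.EventBus.

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Runs `handler` once, then retries up to `retryCount` more times,
    // waiting `retryInterval` between attempts.
    // Returns true on the first success. If every attempt throws,
    // invokes `onFailure(key, lastErrorMessage)` once and returns false.
    public static async Task<bool> HandleWithRetryAsync(
        Func<Task> handler,
        int retryCount,
        TimeSpan retryInterval,
        Action<string, string> onFailure,
        string key)
    {
        var lastError = "unknown error";
        for (var attempt = 0; attempt <= retryCount; attempt++)
        {
            try
            {
                await handler();
                return true;
            }
            catch (Exception ex)
            {
                lastError = ex.Message;
            }

            // Wait only if another attempt will follow.
            if (attempt < retryCount)
            {
                await Task.Delay(retryInterval);
            }
        }

        onFailure(key, lastError);
        return false;
    }
}
```

With `retryCount: 3` and `retryInterval: TimeSpan.FromSeconds(5)`, mirroring the values in the configuration above, a handler that always throws would run four times over roughly fifteen seconds before the callback is invoked.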
builder.Services.EasyCoreEventBus(options =>
{
options.Kafka("localhost:9092");
});
builder.Services.EasyCoreEventBus(options =>
{
options.Pulsar("pulsar://localhost:6650");
});
builder.Services.EasyCoreEventBus(options =>
{
options.RabbitMQ("localhost");
});
builder.Services.EasyCoreEventBus(options =>
{
options.RedisStreams(new List<string> { "localhost:6379" });
});
[Route("api/[controller]")]
[ApiController]
public class PublishController : ControllerBase
{
private readonly IDistributedEventBus _distributedEventBus;
public PublishController(IDistributedEventBus distributedEventBus)
{
_distributedEventBus = distributedEventBus;
}
[HttpPost]
public async Task<IActionResult> Publish([FromBody] string message)
{
var eventMessage = new WebEventMessage()
{
Message = message,
Timestamp = DateTime.UtcNow
};
await _distributedEventBus.PublishAsync(eventMessage);
return Ok(new { success = true, eventId = eventMessage.EventId });
}
}
public class MonitoringEventHandler : IDistributedEventHandler<WebEventMessage>
{
private readonly ILogger<MonitoringEventHandler> _logger;
private readonly IMetricsService _metrics;
public MonitoringEventHandler(ILogger<MonitoringEventHandler> logger, IMetricsService metrics)
{
_logger = logger;
_metrics = metrics;
}
public async Task HandleAsync(WebEventMessage eventMessage)
{
var stopwatch = Stopwatch.StartNew();
try
{
_logger.LogInformation($"Starting event processing: {eventMessage.Message}");
// Record metrics
_metrics.IncrementEventCount();
await ProcessEvent(eventMessage);
stopwatch.Stop();
_metrics.RecordProcessingTime(stopwatch.ElapsedMilliseconds);
_logger.LogInformation($"Event processed: {eventMessage.Message}");
}
catch (Exception ex)
{
_logger.LogError(ex, $"Event processing failed: {eventMessage.Message}");
_metrics.IncrementErrorCount();
throw;
}
}
}
| Feature | Benefit | Description |
|---|---|---|
| Multi-Queue Support | Flexibility | Supports Kafka, Pulsar, RabbitMQ, Redis Streams |
| High Performance | Low Latency | Optimized message serialization and transport |
| Reliability | Message Persistence | Supports retry on failure |
| Easy-to-Use | Simple API | Unified publish-subscribe interface |
| Scalable | Modular Architecture | Easy to extend with new message queue support |
EasyCore.EventBus provides a feature-rich and easy-to-use event bus solution for .NET Core applications. Whether it's decoupling modules within a monolithic application or enabling cross-service communication in a microservices architecture, it can be easily achieved with a unified API. Its robust retry mechanism and support for multiple message queues allow developers to focus on business logic rather than worrying about underlying communication details.
Start using EasyCore.EventBus to build more loosely coupled, scalable .NET Core applications!
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | Compatible: net8.0. Computed: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows, net9.0, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows. |