Kafka messaging integration package for the Momentum platform, providing event-driven architecture capabilities with CloudEvents support and automatic topic management.
This package extends the Momentum platform with Apache Kafka messaging capabilities, enabling reliable event-driven communication between microservices. It builds on top of Momentum.ServiceDefaults to provide seamless integration with the platform's observability, health checks, and configuration systems.
Add the package to your project using the .NET CLI:
dotnet add package Momentum.Extensions.Messaging.Kafka
Or using the Package Manager Console:
Install-Package Momentum.Extensions.Messaging.Kafka
This package includes the following key dependencies:
| Package | Purpose |
|---|---|
| Aspire.Confluent.Kafka | .NET Aspire Kafka integration with service discovery |
| CloudNative.CloudEvents.Kafka | CloudEvents specification implementation for Kafka |
| WolverineFx.Kafka | Message bus framework with Kafka transport |
| WolverineFx | Message bus framework with pattern matching |
Add Kafka messaging to your Momentum service:
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// Add service defaults first
builder.AddServiceDefaults();
// Add Kafka messaging
builder.AddKafkaMessagingExtensions();
var app = builder.Build();
app.MapDefaultEndpoints();
app.Run();
The package now leverages .NET Aspire's Kafka configuration. Add the Kafka connection to your configuration:
// appsettings.json - Complete Aspire integration
{
  "ConnectionStrings": {
    "Messaging": "localhost:9092"
  },
  "Aspire": {
    "Confluent": {
      "Kafka": {
        "Messaging": {
          "BootstrapServers": "localhost:9092",
          "Producer": {
            "Config": {
              "Acks": "All",
              "EnableIdempotence": true,
              "CompressionType": "Snappy",
              "BatchSize": 16384
            }
          },
          "Consumer": {
            "Config": {
              "AutoOffsetReset": "Latest",
              "EnableAutoCommit": true,
              "AutoCommitIntervalMs": 1000
            }
          },
          "Security": {
            "Protocol": "Plaintext"
          }
        }
      }
    }
  },
  "Wolverine": {
    "AutoProvision": true
  }
}
Create events that will be published across services:
// Events should be in a namespace ending with "IntegrationEvents"
namespace MyService.Contracts.IntegrationEvents;

[EventTopic("customer", Domain = "sales")]
public record CustomerCreated(
    Guid CustomerId,
    string Name,
    string Email,
    DateTime CreatedAt) : IDistributedEvent
{
    public string GetPartitionKey() => CustomerId.ToString();
}
Publish events using Wolverine's message bus:
public class CustomerService(IMessageBus messageBus)
{
    public async Task CreateCustomerAsync(CreateCustomerRequest request)
    {
        // Business logic here...
        // Placeholder ID so the sample compiles; a real service would use the ID of the persisted customer
        var customerId = Guid.NewGuid();

        var integrationEvent = new CustomerCreated(
            customerId,
            request.Name,
            request.Email,
            DateTime.UtcNow);

        // This will be automatically routed to the appropriate Kafka topic
        await messageBus.PublishAsync(integrationEvent);
    }
}
Create handlers for integration events:
// This handler will automatically subscribe to the CustomerCreated topic
public class CustomerCreatedHandler
{
    // Nothing is awaited here, so the method returns a completed task instead of being marked async
    public Task Handle(CustomerCreated customerCreated, CancellationToken cancellationToken)
    {
        // Process the integration event
        Console.WriteLine($"Customer {customerCreated.Name} was created with ID {customerCreated.CustomerId}");
        return Task.CompletedTask;
    }
}
This package fully integrates with .NET Aspire's Kafka configuration system:
// Support for multiple Kafka clusters
builder.AddKafkaMessagingExtensions("primary");
builder.AddKafkaMessagingExtensions("secondary",
    configureProducerSettings: settings =>
    {
        // Producer-specific configuration
    },
    configureConsumerSettings: settings =>
    {
        // Consumer-specific configuration
    });

// Or use advanced Aspire-Wolverine bridge integration
builder.AddKafkaMessagingWithAspire("primary", kafka =>
{
    kafka.AutoProvision();
    // Additional Wolverine-specific configuration
});
For maximum integration leveraging all Aspire capabilities:
// Program.cs - Full Aspire-Wolverine bridge
var builder = WebApplication.CreateBuilder(args);
builder.AddServiceDefaults();

// Use the advanced Aspire-Wolverine bridge
builder.AddKafkaMessagingWithAspire("Messaging", kafka =>
{
    kafka.AutoProvision();

    // Wolverine-specific configuration with Aspire integration
    kafka.ConfigureProducers(config =>
    {
        config.Acks = Acks.All;
        config.EnableIdempotence = true;
    });
    kafka.ConfigureConsumers(config =>
    {
        config.GroupId = "my-service-group";
        config.AutoOffsetReset = AutoOffsetReset.Latest;
    });
});

var app = builder.Build();
app.Run();
Bridge Benefits:
The package supports multiple configuration sources in order of precedence:
1. Aspire:Confluent:Kafka:Producer/Consumer:Config
2. ConnectionStrings:messaging
3. Wolverine:AutoProvision

Topics are automatically named using the following pattern:
{environment}.{domain}.{scope}.{topic}.{version}
For example:
- dev.sales.public.customers.v1
- prod.sales.public.customers.v1
- prod.sales.internal.customer-updates.v1

Control topic configuration with attributes:
[EventTopic(
    "order-payment",
    Domain = "ecommerce",
    Internal = false,                // Creates public topic
    ShouldPluralizeTopicName = true, // "payments" instead of "payment"
    Version = "v2")]
public record PaymentProcessed(Guid OrderId, decimal Amount);
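
To make the naming pattern concrete, the following is a minimal sketch of how the segments could be assembled. It is not the package's implementation: `BuildTopicName` and `ToEnvironmentPrefix` are hypothetical helpers, the `public`/`internal` scope values are inferred from the example topic names, the fallback for unlisted environments is an assumption, and pluralization is left to the caller.

```csharp
using System;

// Hypothetical helper: shortens the hosting environment name.
// The three mappings come from this document (Development, Production, Test);
// the lowercase fallback for other names is an assumption.
string ToEnvironmentPrefix(string environmentName) => environmentName switch
{
    "Development" => "dev",
    "Production" => "prod",
    "Test" => "test",
    _ => environmentName.ToLowerInvariant()
};

// Hypothetical helper: joins the segments as {environment}.{domain}.{scope}.{topic}.{version}.
// The topic is expected to be pluralized already if that is desired.
string BuildTopicName(string environmentName, string domain, bool isInternal, string topic, string version = "v1")
{
    var scope = isInternal ? "internal" : "public";
    return string.Join('.', ToEnvironmentPrefix(environmentName), domain, scope, topic, version);
}

Console.WriteLine(BuildTopicName("Development", "sales", false, "customers"));
// prints "dev.sales.public.customers.v1"
```

Under these assumptions, an internal topic in production would be built with `BuildTopicName("Production", "sales", true, "customer-updates")`.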
// Partition key supplied by implementing GetPartitionKey()
public record OrderCreated(Guid OrderId, Guid CustomerId) : IDistributedEvent
{
    // Messages with the same customer ID will go to the same partition
    public string GetPartitionKey() => CustomerId.ToString();
}

// Partition key supplied by marking a property with [PartitionKey]
public record ProductUpdated(
    [PartitionKey] Guid ProductId,
    string Name,
    decimal Price);
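
The partition-key examples above rely on equal keys always mapping to the same partition. The sketch below illustrates that property only. `PickPartition` is a hypothetical function, and FNV-1a is a stand-in hash chosen for brevity; the Kafka client applies its own partitioner, so the actual partition numbers will differ.

```csharp
using System;
using System.Text;

// Hypothetical illustration: hash the key deterministically, then reduce it
// modulo the partition count. FNV-1a (32-bit) is a stand-in, not Kafka's partitioner.
int PickPartition(string partitionKey, int partitionCount)
{
    uint hash = 2166136261;
    foreach (var b in Encoding.UTF8.GetBytes(partitionKey))
    {
        hash ^= b;
        hash *= 16777619; // wraps on overflow in the default unchecked context
    }
    return (int)(hash % (uint)partitionCount);
}

var customerId = Guid.NewGuid().ToString();

// Two events for the same customer resolve to the same partition
Console.WriteLine(PickPartition(customerId, 12) == PickPartition(customerId, 12));
// prints "True"
```

Because the result depends only on the key and the partition count, events for one customer keep their relative order, while changing the partition count would remap keys.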
The package automatically adapts topic names based on the environment:
// Environment mapping
"Development" → "dev"
"Production" → "prod"
"Test" → "test"
Kafka health checks are automatically registered and available at the /health endpoint:
{
  "status": "Healthy",
  "checks": {
    "kafka": {
      "status": "Healthy",
      "description": "Kafka connectivity check",
      "tags": ["messaging", "kafka"]
    }
  }
}
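
As an illustration of how a client might interpret this payload, here is a minimal sketch that parses the sample response with System.Text.Json. `IsKafkaHealthy` is a hypothetical helper, and the JSON shape is assumed to match the sample shown above; it does not call a running service.

```csharp
using System;
using System.Text.Json;

// Sample payload copied from the health response shown above
var healthJson = """
{
  "status": "Healthy",
  "checks": {
    "kafka": {
      "status": "Healthy",
      "description": "Kafka connectivity check",
      "tags": ["messaging", "kafka"]
    }
  }
}
""";

Console.WriteLine(IsKafkaHealthy(healthJson));
// prints "True"

// Hypothetical helper: true only when checks.kafka.status is "Healthy".
// A missing "checks" or "kafka" entry is treated as unhealthy.
bool IsKafkaHealthy(string json)
{
    using var doc = JsonDocument.Parse(json);
    return doc.RootElement.TryGetProperty("checks", out var checks)
        && checks.TryGetProperty("kafka", out var kafka)
        && kafka.TryGetProperty("status", out var status)
        && status.GetString() == "Healthy";
}
```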
All messages are automatically wrapped in CloudEvents format.
Example CloudEvent structure:
{
  "specversion": "1.0",
  "type": "CustomerCreated",
  "source": "urn:momentum:sales-api",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "time": "2024-01-15T10:30:00Z",
  "datacontenttype": "application/json",
  "data": {
    "customerId": "123e4567-e89b-12d3-a456-426614174000",
    "name": "John Doe",
    "email": "john.doe@example.com"
  }
}
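
For readers who want to see how such an envelope relates to the event payload, the sketch below assembles the same structure by hand with System.Text.Json. This is only an illustration of the envelope shape: the package performs the wrapping itself, and the dictionary-based construction here is an assumption, not how the package or the CloudEvents SDK builds it.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text.Json;

// Event payload; property names are written in camelCase to match the sample above
var data = new
{
    customerId = Guid.NewGuid().ToString(),
    name = "John Doe",
    email = "john.doe@example.com"
};

// CloudEvents 1.0 context attributes wrapped around the payload (hand-built for illustration)
var envelope = new Dictionary<string, object>
{
    ["specversion"] = "1.0",
    ["type"] = "CustomerCreated",
    ["source"] = "urn:momentum:sales-api",
    ["id"] = Guid.NewGuid().ToString(),
    ["time"] = DateTimeOffset.UtcNow.ToString("yyyy-MM-dd'T'HH:mm:ss'Z'", CultureInfo.InvariantCulture),
    ["datacontenttype"] = "application/json",
    ["data"] = data
};

var json = JsonSerializer.Serialize(envelope, new JsonSerializerOptions { WriteIndented = true });
Console.WriteLine(json);
```

The `id` and `time` values change on every run, so the printed output differs from the sample only in those fields and the generated customer ID.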
The package provides error handling through WolverineFx and includes built-in observability.
Connection Failures
InvalidOperationException: Kafka connection string 'messaging' not found in configuration
Ensure the messaging connection string is configured (note lowercase).

Topic Creation Issues
Topic does not exist and auto-creation is disabled
Set "Wolverine:AutoProvision": true in configuration.

Serialization Errors
CloudEvent serialization failed
Enable debug logging for detailed troubleshooting:
{
  "Logging": {
    "LogLevel": {
      "Momentum.Extensions.Messaging.Kafka": "Debug",
      "Wolverine.Kafka": "Debug",
      "Aspire.Confluent.Kafka": "Debug"
    }
  }
}
This project is licensed under the MIT License - see the LICENSE file for details.
For more information about the Momentum platform and contribution guidelines, please visit the main repository.
| Product | Compatible and computed target framework versions |
|---|---|
| .NET | net10.0 is compatible. net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, and net10.0-windows were computed. |
This package is not used by any NuGet packages.
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.0.12 | 73 | 6/11/2026 |
| 0.0.12-preview.9 | 46 | 6/11/2026 |
| 0.0.12-preview.8 | 46 | 6/11/2026 |
| 0.0.12-preview.7 | 43 | 6/11/2026 |
| 0.0.12-preview.6 | 43 | 6/11/2026 |
| 0.0.12-preview.5 | 62 | 5/27/2026 |
| 0.0.12-preview.4 | 54 | 5/21/2026 |
| 0.0.12-preview.3 | 51 | 5/20/2026 |
| 0.0.12-preview.2 | 54 | 5/14/2026 |
| 0.0.12-preview.1 | 53 | 5/12/2026 |
| 0.0.11 | 174 | 5/10/2026 |
| 0.0.11-preview.2 | 55 | 5/10/2026 |
| 0.0.11-preview.1 | 58 | 5/1/2026 |
| 0.0.10 | 124 | 5/1/2026 |
| 0.0.10-preview.8 | 56 | 5/1/2026 |
| 0.0.10-preview.7 | 53 | 5/1/2026 |
| 0.0.10-preview.6 | 60 | 4/28/2026 |
| 0.0.10-preview.5 | 66 | 4/24/2026 |
| 0.0.10-preview.4 | 62 | 4/24/2026 |
| 0.0.10-preview.3 | 66 | 4/22/2026 |