![]() |
VOOZH | about |
dotnet add package Excalibur.Dispatch.Transport.Kafka --version 3.0.0-alpha.208
NuGet\Install-Package Excalibur.Dispatch.Transport.Kafka -Version 3.0.0-alpha.208
<PackageReference Include="Excalibur.Dispatch.Transport.Kafka" Version="3.0.0-alpha.208" />
Directory.Packages.props:
<PackageVersion Include="Excalibur.Dispatch.Transport.Kafka" Version="3.0.0-alpha.208" />
Project file:
<PackageReference Include="Excalibur.Dispatch.Transport.Kafka" />
paket add Excalibur.Dispatch.Transport.Kafka --version 3.0.0-alpha.208
#r "nuget: Excalibur.Dispatch.Transport.Kafka, 3.0.0-alpha.208"
#:package Excalibur.Dispatch.Transport.Kafka@3.0.0-alpha.208
Install as a Cake Addin:
#addin nuget:?package=Excalibur.Dispatch.Transport.Kafka&version=3.0.0-alpha.208&prerelease
Install as a Cake Tool:
#tool nuget:?package=Excalibur.Dispatch.Transport.Kafka&version=3.0.0-alpha.208&prerelease
Apache Kafka transport implementation for the Excalibur framework, providing high-throughput, distributed event streaming with exactly-once semantics and CloudEvents support.
This package is included in the following metapackages:
| Metapackage | Tier | What It Adds |
|---|---|---|
| `Excalibur.Dispatch.Kafka` | Starter | + Resilience (Polly) + Observability |
Tip: If you are getting started, install `Excalibur.Dispatch.Kafka` instead of this package directly. It includes production-ready defaults.
This package provides Apache Kafka integration for Excalibur.Dispatch, enabling:
dotnet add package Excalibur.Dispatch.Transport.Kafka
services.Configure<KafkaOptions>(options =>
{
options.BootstrapServers = "localhost:9092";
options.Topic = "my-events";
options.ConsumerGroup = "my-consumer-group";
options.GroupProtocol = GroupProtocol.Consumer;
});
services.Configure<KafkaOptions>(options =>
{
options.BootstrapServers = "broker1:9092,broker2:9092,broker3:9092";
options.Topic = "my-events";
options.ConsumerGroup = "my-consumer-group";
});
Configure via environment variables for containerized deployments:
KAFKA__BOOTSTRAPSERVERS=broker1:9092,broker2:9092
KAFKA__TOPIC=my-events
KAFKA__CONSUMERGROUP=my-consumer-group
services.Configure<KafkaOptions>(configuration.GetSection("Kafka"));
services.Configure<KafkaOptions>(options =>
{
options.BootstrapServers = "kafka.example.com:9093";
options.AdditionalConfig["security.protocol"] = "SASL_SSL";
options.AdditionalConfig["sasl.mechanism"] = "PLAIN";
options.AdditionalConfig["sasl.username"] = "your-api-key";
options.AdditionalConfig["sasl.password"] = "your-api-secret";
});
services.Configure<KafkaOptions>(options =>
{
options.BootstrapServers = "kafka.example.com:9093";
options.AdditionalConfig["security.protocol"] = "SASL_SSL";
options.AdditionalConfig["sasl.mechanism"] = "SCRAM-SHA-512";
options.AdditionalConfig["sasl.username"] = "your-username";
options.AdditionalConfig["sasl.password"] = "your-password";
});
services.Configure<KafkaOptions>(options =>
{
options.BootstrapServers = "kafka.example.com:9093";
options.AdditionalConfig["security.protocol"] = "SSL";
options.AdditionalConfig["ssl.ca.location"] = "/path/to/ca.crt";
options.AdditionalConfig["ssl.certificate.location"] = "/path/to/client.crt";
options.AdditionalConfig["ssl.key.location"] = "/path/to/client.key";
options.AdditionalConfig["ssl.key.password"] = "key-password";
});
// AWS MSK with IAM authentication.
// NOTE(review): "AWS_MSK_IAM" and "sasl.jaas.config" are settings of the Java Kafka client.
// librdkafka-based clients (Confluent.Kafka) document only GSSAPI, PLAIN, SCRAM-SHA-256,
// SCRAM-SHA-512 and OAUTHBEARER as SASL mechanisms and have no "sasl.jaas.config" property,
// so this snippet presumably fails at client construction — TODO confirm against a real cluster.
// The usual .NET approach is "sasl.mechanism" = "OAUTHBEARER" plus an OAuth token-refresh
// handler backed by the AWS MSK IAM signer; whether KafkaOptions exposes such a hook is not
// visible here — verify before relying on this example.
services.Configure<KafkaOptions>(options =>
{
options.BootstrapServers = "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098";
options.AdditionalConfig["security.protocol"] = "SASL_SSL";
options.AdditionalConfig["sasl.mechanism"] = "AWS_MSK_IAM";
options.AdditionalConfig["sasl.jaas.config"] =
"software.amazon.msk.auth.iam.IAMLoginModule required;";
});
services.Configure<KafkaOptions>(options =>
{
// Connection
options.BootstrapServers = "localhost:9092";
options.Topic = "my-events";
options.ConsumerGroup = "my-consumer-group";
// Offset management
options.EnableAutoCommit = false; // Manual commits (recommended)
options.AutoCommitIntervalMs = 5000; // If auto-commit is enabled
options.AutoOffsetReset = "latest"; // "earliest", "latest", or "none"
// Batching
options.MaxBatchSize = 100; // Messages per batch
options.MaxBatchWaitMs = 1000; // Max wait for batch (ms)
// Performance
options.QueuedMinMessages = 1000; // Prefetch per partition
options.MaxConcurrentCommits = 10; // Concurrent offset commits
// Session management
options.SessionTimeoutMs = 30000; // Consumer session timeout
options.MaxPollIntervalMs = 300000; // Max time between polls
// Partition handling
options.EnablePartitionEof = false; // EOF detection
// Security
options.EnableEncryption = false; // Message-level encryption
});
services.Configure<KafkaMessageBusOptions>(options =>
{
options.BootstrapServers = "localhost:9092";
options.ProducerClientId = "my-producer";
options.ConsumerGroupId = "my-consumer-group";
// CloudEvents
options.EnableCloudEvents = true;
// Compression
options.CompressionType = KafkaCompressionType.Snappy;
// Acknowledgment
options.AckLevel = KafkaAckLevel.All; // All, Leader, or None
// Partitioning
options.PartitioningStrategy = KafkaPartitioningStrategy.RoundRobin;
});
services.Configure<KafkaCloudEventOptions>(options =>
{
// Topic settings
options.DefaultTopic = "cloud-events";
options.DefaultPartitionCount = 3;
options.DefaultReplicationFactor = 3;
options.AutoCreateTopics = false;
// Partitioning strategy
options.PartitioningStrategy = KafkaPartitioningStrategy.CorrelationId;
// Exactly-once semantics
options.EnableIdempotentProducer = true;
options.EnableTransactions = false;
options.TransactionalId = null;
// Acknowledgment
options.AcknowledgmentLevel = KafkaAckLevel.All;
// Message size
options.MaxMessageSizeBytes = 1048576; // 1 MB
// Compression
options.EnableCompression = true;
options.CompressionType = KafkaCompressionType.Snappy;
options.CompressionThreshold = 1024; // Compress messages > 1 KB
// Consumer settings
options.ConsumerGroupId = "cloudevents-consumer";
options.OffsetReset = KafkaOffsetReset.Latest;
// Retry settings
options.RetrySettings = new KafkaRetrySettings
{
MaxRetries = 3,
RetryDelay = TimeSpan.FromMilliseconds(100),
MaxRetryDelay = TimeSpan.FromSeconds(30),
UseExponentialBackoff = true,
BackoffMultiplier = 2.0,
UseJitter = true
};
});
| Strategy | Use Case |
|---|---|
| `CorrelationId` | Order preservation per correlation |
| `TenantId` | Multi-tenant isolation |
| `UserId` | User-scoped ordering |
| `Source` | Source-based routing |
| `Type` | Event type-based routing |
| `EventId` | Unique event distribution |
| `RoundRobin` | Even distribution (default) |
| `Custom` | Custom partition key from extensions |
| Type | Characteristics |
|---|---|
| `None` | No compression (fastest, largest) |
| `Gzip` | Good compression, slower |
| `Snappy` | Balanced speed/compression (recommended) |
| `Lz4` | Fastest compression |
| `Zstd` | Best compression ratio |
services.Configure<KafkaCloudEventOptions>(options =>
{
options.RetrySettings = new KafkaRetrySettings
{
MaxRetries = 3, // Retry attempts
RetryDelay = TimeSpan.FromMilliseconds(100), // Initial delay
MaxRetryDelay = TimeSpan.FromSeconds(30), // Maximum delay
UseExponentialBackoff = true, // Exponential backoff
BackoffMultiplier = 2.0, // Backoff factor
UseJitter = true // Add randomization
};
});
services.AddHealthChecks()
.AddCheck<KafkaHealthCheck>("kafka", tags: new[] { "ready", "messaging" });
services.Configure<KafkaHealthCheckOptions>(options =>
{
options.Timeout = TimeSpan.FromSeconds(10);
options.BootstrapServers = "localhost:9092";
});
/// <summary>
/// Health check that reports Kafka reachability by delegating to an
/// <see cref="ITransportHealthChecker"/> and mapping its status onto a
/// <see cref="HealthCheckResult"/>.
/// </summary>
public class KafkaHealthCheck : IHealthCheck
{
    private readonly ITransportHealthChecker _healthChecker;

    /// <summary>
    /// Creates the health check.
    /// </summary>
    /// <param name="healthChecker">
    /// Transport health checker used to probe the cluster. Presumably registered in the
    /// container by the Kafka transport package — verify the registration exists.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="healthChecker"/> is null.</exception>
    public KafkaHealthCheck(ITransportHealthChecker healthChecker)
    {
        // Without this constructor the readonly field is never assigned, and every check
        // would fail with a NullReferenceException reported as "Kafka unreachable".
        ArgumentNullException.ThrowIfNull(healthChecker);
        _healthChecker = healthChecker;
    }

    /// <summary>
    /// Runs the quick transport health probe and translates the outcome.
    /// </summary>
    /// <param name="context">Health check context supplied by the health check service (unused).</param>
    /// <param name="cancellationToken">Token used to cancel the probe.</param>
    /// <returns>
    /// Healthy, Degraded or Unhealthy according to the transport status; Unhealthy with the
    /// captured exception when the probe itself throws.
    /// </returns>
    /// <exception cref="OperationCanceledException">
    /// The caller cancelled <paramref name="cancellationToken"/> while the probe was running.
    /// </exception>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        try
        {
            var result = await _healthChecker.CheckQuickHealthAsync(cancellationToken);

            return result.Status switch
            {
                TransportHealthStatus.Healthy => HealthCheckResult.Healthy("Kafka cluster reachable"),
                TransportHealthStatus.Degraded => HealthCheckResult.Degraded(result.Description),
                _ => HealthCheckResult.Unhealthy(result.Description)
            };
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Caller-requested cancellation is not a Kafka failure; let it propagate
            // instead of reporting the cluster as unreachable.
            throw;
        }
        catch (Exception ex)
        {
            // A health check should report failure rather than throw, so any probe error
            // (including internal timeouts) is surfaced as Unhealthy with the exception attached.
            return HealthCheckResult.Unhealthy("Kafka unreachable", ex);
        }
    }
}
// High-throughput configuration
services.Configure<KafkaCloudEventOptions>(options =>
{
options.DefaultPartitionCount = 12; // More partitions for parallelism
options.DefaultReplicationFactor = 3; // High availability
options.PartitioningStrategy = KafkaPartitioningStrategy.RoundRobin;
});
services.Configure<KafkaOptions>(options =>
{
options.MaxBatchSize = 500; // Larger batches
options.MaxBatchWaitMs = 50; // Shorter wait time
options.QueuedMinMessages = 5000; // More prefetch
});
services.Configure<KafkaMessageBusOptions>(options =>
{
options.AckLevel = KafkaAckLevel.Leader; // Faster acks (less durable)
options.CompressionType = KafkaCompressionType.Lz4; // Fast compression
});
services.Configure<KafkaCloudEventOptions>(options =>
{
options.EnableCompression = true;
options.CompressionType = KafkaCompressionType.Lz4;
options.EnableIdempotentProducer = false; // Disable for max throughput
});
services.Configure<KafkaOptions>(options =>
{
options.MaxBatchSize = 1; // Process immediately
options.MaxBatchWaitMs = 0; // No batching delay
options.SessionTimeoutMs = 10000; // Faster failure detection
options.MaxPollIntervalMs = 60000; // Shorter poll interval
});
services.Configure<KafkaCloudEventOptions>(options =>
{
options.EnableIdempotentProducer = true; // Idempotent writes
options.EnableTransactions = true; // Transactional processing
options.TransactionalId = "my-service-txn"; // Unique transaction ID
options.AcknowledgmentLevel = KafkaAckLevel.All;
});
services.Configure<KafkaOptions>(options =>
{
options.EnableAutoCommit = false; // Manual offset commits
});
Key Kafka metrics to monitor:
| Metric | Description | Alert Threshold |
|---|---|---|
| Consumer Lag | Messages behind latest offset | > 10,000 |
| Under-Replicated Partitions | Partitions with insufficient replicas | > 0 |
| Request Rate | Requests per second | Baseline deviation |
| Network Throughput | Bytes in/out per second | Approaching network limit |
| Disk Usage | Broker disk utilization | > 80% |
| ISR Shrink Rate | In-sync replica changes | > 0 (investigate) |
Confluent.Kafka.KafkaException: Local: Broker transport failure
Solutions:
- Verify broker connectivity: `kafka-broker-api-versions --bootstrap-server localhost:9092`

Confluent.Kafka.KafkaException: SASL authentication failed
Solutions:
Confluent.Kafka.KafkaException: Group coordinator not available
Solutions:
- Increase `SessionTimeoutMs` for slow-processing consumers
- Increase `MaxPollIntervalMs` if processing takes too long

Confluent.Kafka.KafkaException: Message size too large
Solutions:
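- Increase `message.max.bytes` on broker
- Increase `max.request.size` on producer (for librdkafka-based clients such as Confluent.Kafka, the producer-side setting is `message.max.bytes`)

Confluent.Kafka.KafkaException: Offset out of range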
Solutions:
- Review the `AutoOffsetReset` policy

Enable detailed logging for troubleshooting:
{
"Logging": {
"LogLevel": {
"Excalibur.Dispatch.Transport.Kafka": "Debug",
"Confluent.Kafka": "Warning"
}
}
}
Use Kafka CLI tools:
# List topics
kafka-topics --bootstrap-server localhost:9092 --list
# Describe consumer group
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-group
# View topic messages
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning
Enable librdkafka debug:
options.AdditionalConfig["debug"] = "broker,topic,msg";
Check consumer lag:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-group
Monitor with JMX or use Kafka UI tools like AKHQ, Conduktor, or Confluent Control Center
Use Docker for local development:
# docker-compose.yml
# Single-broker Kafka for local development only: no authentication, replication factor 1.
services:
  zookeeper:
    # The broker below is configured with KAFKA_ZOOKEEPER_CONNECT, so the ZooKeeper
    # service it points at must be part of the same compose project.
    image: confluentinc/cp-zookeeper:7.6.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # Pinned instead of `latest`: an unpinned tag is not reproducible, and newer
    # Confluent Platform images are KRaft-only and no longer support ZooKeeper mode
    # (verify against the Confluent release notes before bumping the version).
    image: confluentinc/cp-kafka:7.6.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertised to clients running on the host machine.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      # Internal topics default to replication factor 3, which a single broker cannot satisfy.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # Needed for transactional producers against a single broker.
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
// Consumer/Core Options
services.Configure<KafkaOptions>(options =>
{
// Connection
options.BootstrapServers = "localhost:9092";
options.Topic = "my-events";
options.ConsumerGroup = "my-consumer-group";
// Offset management
options.EnableAutoCommit = false;
options.AutoCommitIntervalMs = 5000;
options.AutoOffsetReset = "latest";
// Batching
options.MaxBatchSize = 100;
options.MaxBatchWaitMs = 1000;
// Performance
options.QueuedMinMessages = 1000;
options.MaxConcurrentCommits = 10;
// Session management
options.SessionTimeoutMs = 30000;
options.MaxPollIntervalMs = 300000;
// Features
options.EnablePartitionEof = false;
options.EnableEncryption = false;
// Additional librdkafka config
options.AdditionalConfig["socket.keepalive.enable"] = "true";
});
// Message Bus Options
services.Configure<KafkaMessageBusOptions>(options =>
{
options.BootstrapServers = "localhost:9092";
options.ProducerClientId = "my-producer";
options.ConsumerGroupId = "my-consumer-group";
options.EnableCloudEvents = true;
options.CompressionType = KafkaCompressionType.Snappy;
options.AckLevel = KafkaAckLevel.All;
options.PartitioningStrategy = KafkaPartitioningStrategy.RoundRobin;
});
// CloudEvents Options
services.Configure<KafkaCloudEventOptions>(options =>
{
// Topics
options.DefaultTopic = "cloud-events";
options.DefaultPartitionCount = 3;
options.DefaultReplicationFactor = 1;
options.AutoCreateTopics = false;
// Partitioning
options.PartitioningStrategy = KafkaPartitioningStrategy.CorrelationId;
// Exactly-once
options.EnableIdempotentProducer = true;
options.EnableTransactions = false;
options.TransactionalId = null;
// Message settings
options.AcknowledgmentLevel = KafkaAckLevel.All;
options.MaxMessageSizeBytes = 1048576;
// Compression
options.EnableCompression = true;
options.CompressionType = KafkaCompressionType.Snappy;
options.CompressionThreshold = 1024;
// Consumer
options.ConsumerGroupId = "cloudevents-consumer";
options.OffsetReset = KafkaOffsetReset.Latest;
// Retry
options.RetrySettings = new KafkaRetrySettings
{
MaxRetries = 3,
RetryDelay = TimeSpan.FromMilliseconds(100),
MaxRetryDelay = TimeSpan.FromSeconds(30),
UseExponentialBackoff = true,
BackoffMultiplier = 2.0,
UseJitter = true
};
});
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos and net10.0-windows were computed. |
Showing the top 1 NuGet packages that depend on Excalibur.Dispatch.Transport.Kafka:
| Package | Downloads |
|---|---|
| **Excalibur.Dispatch.Kafka** — Experience metapackage bundling Excalibur.Dispatch with Kafka transport. Provides a single `AddDispatchKafka()` call for the common Kafka streaming scenario. | |
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 3.0.0-alpha.208 | 48 | 6/11/2026 |
| 3.0.0-alpha.207 | 49 | 6/11/2026 |
| 3.0.0-alpha.205 | 48 | 6/10/2026 |
| 3.0.0-alpha.204 | 50 | 6/8/2026 |
| 3.0.0-alpha.203 | 53 | 6/8/2026 |
| 3.0.0-alpha.202 | 52 | 6/8/2026 |
| 3.0.0-alpha.201 | 65 | 6/8/2026 |
| 3.0.0-alpha.199 | 51 | 6/8/2026 |
| 3.0.0-alpha.198 | 63 | 5/28/2026 |
| 3.0.0-alpha.197 | 67 | 5/28/2026 |
| 3.0.0-alpha.194 | 57 | 5/20/2026 |
| 3.0.0-alpha.193 | 58 | 5/13/2026 |
| 3.0.0-alpha.192 | 62 | 5/13/2026 |
| 3.0.0-alpha.191 | 53 | 5/13/2026 |
| 3.0.0-alpha.189 | 51 | 5/12/2026 |
| 3.0.0-alpha.187 | 62 | 5/8/2026 |
| 3.0.0-alpha.185 | 59 | 5/7/2026 |
| 3.0.0-alpha.183 | 63 | 5/7/2026 |
| 3.0.0-alpha.182 | 58 | 5/6/2026 |
| 3.0.0-alpha.181 | 49 | 5/6/2026 |