dotnet add package JohBloch.ConfluentKafka.Clients.Consumer --version 2.3.3
A modern, feature-rich .NET client library for Apache Kafka with Schema Registry support, Dead Letter Queue functionality, and multiple serialization formats.
✨ Multiple Schema Types
🔐 Security
📬 Dead Letter Queue (DLQ)
⚡ Performance
🛠️ Developer Experience
Choose either the convenience package (everything included) or pick only the components you need.
dotnet add package JohBloch.ConfluentKafka.Clients
dotnet add package JohBloch.ConfluentKafka.Clients.Core
dotnet add package JohBloch.ConfluentKafka.Clients.Consumer
dotnet add package JohBloch.ConfluentKafka.Clients.Producer
- global.json. GitHub Actions uses that file to select the SDK.
- SNYK_TOKEN.
This repo includes runnable console examples under :
(meta package: consumer + producer)
(producer-only)
(consumer-only)
(consumer-only, MSAL token provider + Redis schema cache)
Each example has a committed local.settings.sample.json template.
For local development, copy it to local.settings.json and put secrets there (this file is ignored by git).
PowerShell (example): Copy-Item .\examples\JohBloch.ConfluentKafka.Clients.Example.Clients.InternalSecurity\local.settings.sample.json .\examples\JohBloch.ConfluentKafka.Clients.Example.Clients.InternalSecurity\local.settings.json
Notes about configuration in the console example:
- Values object.
- __ as a separator for nested options.
- Kafka, SchemaRegistry, Consumer, and Producer from separate root sections (see the JSON below).
If you're wiring this library into your own app and binding KafkaClientOptions directly from configuration, see Minimal app configuration (recommended) below.
Multi-topic + multi-producer is configured via:
- Consumer__Topics
- Producer__Producers__*
- SchemaRegistry__Cache__Provider: Redis
- SchemaRegistry__Cache__Redis__ConnectionString: e.g. localhost:6379
- SchemaRegistry__Cache__Redis__KeyPrefix and SchemaRegistry__Cache__Redis__DefaultTtlSeconds
Start Redis locally:
docker run --rm -p 6379:6379 redis:7-alpine
Example local.settings.json (for the repo console example app):
{
"IsEncrypted": false,
"Values": {
"Kafka__BootstrapServers": "localhost:9092",
"SchemaRegistry__Url": "http://localhost:8081",
"SchemaRegistry__Cache__Provider": "Redis",
"SchemaRegistry__Cache__Redis__ConnectionString": "localhost:6379",
"SchemaRegistry__Cache__Redis__KeyPrefix": "schema-registry-cache:",
"SchemaRegistry__Cache__Redis__DefaultTtlSeconds": "3600",
"Consumer__GroupId": "example-consumer-group",
"Consumer__Topics": "topic-a,topic-b",
"Consumer__AutoOffsetReset": "earliest",
"Producer__Config__acks": "all",
"Producer__Config__enable.idempotence": "true",
"Producer__Producers__orders__Topic": "topic-a",
"Producer__Producers__orders__AutoDlqOnDeliveryFailure": "true",
"Producer__Producers__orders__DeadLetterQueueTopicPattern": "dlq-{topic}",
"Producer__Producers__audit__Topic": "topic-b",
"Producer__Producers__audit__AutoDlqOnDeliveryFailure": "true",
"Producer__Producers__audit__DeadLetterQueueTopicPattern": "dlq-{topic}"
}
}
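The double-underscore keys above follow the usual .NET convention in which __ in a flat key stands for the : section delimiter. The following is a minimal sketch of that mapping using plain string operations only. The helper names ToConfigPath and SplitTopics are hypothetical, and the comma-splitting of Consumer__Topics is an assumption inferred from the sample value, not confirmed library behavior.

```csharp
using System;

// Hypothetical helper: turn a flat key such as "Kafka__OAuth__ClientId"
// into the hierarchical form "Kafka:OAuth:ClientId".
static string ToConfigPath(string flatKey) => flatKey.Replace("__", ":");

// Hypothetical helper (assumption): split a comma-separated topic list,
// dropping empty entries and trimming surrounding whitespace.
static string[] SplitTopics(string value) =>
    value.Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);

Console.WriteLine(ToConfigPath("SchemaRegistry__Cache__Redis__ConnectionString"));
// prints "SchemaRegistry:Cache:Redis:ConnectionString"

Console.WriteLine(string.Join(" | ", SplitTopics("topic-a,topic-b")));
// prints "topic-a | topic-b"
```

Keys that contain dots, such as Producer__Config__enable.idempotence, pass through unchanged apart from the separator, so the Kafka property name enable.idempotence survives as the last path segment.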
For most real applications, prefer binding KafkaClientOptions from a single Kafka root section:
- Kafka:* binds to KafkaClientOptions
- Kafka:SchemaRegistry:* binds to SchemaRegistryOptions when you want Schema Registry OAuth credentials that differ from Kafka OAuth.
Minimal example (JSON-form keys shown; environment variables use __):
{
"Kafka": {
"BootstrapServers": "localhost:9092",
"GroupId": "my-consumer-group",
"SchemaRegistryUrl": "http://localhost:8081",
"Consumer": {
"Topic": "orders",
"AutoOffsetReset": "Earliest"
},
"Producers": {
"default": {
"Topic": "orders"
}
}
}
}
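To see which environment variable names correspond to a nested JSON configuration like the one above, the JSON can be flattened by joining property names with __. This is a minimal sketch using System.Text.Json. The Flatten helper is hypothetical (not part of the library), and it handles nested objects only, not arrays.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper: walk nested JSON objects and yield
// (environment-variable name, value) pairs joined with "__".
static IEnumerable<(string Name, string Value)> Flatten(JsonElement element, string prefix)
{
    foreach (var property in element.EnumerateObject())
    {
        var name = prefix.Length == 0 ? property.Name : $"{prefix}__{property.Name}";
        if (property.Value.ValueKind == JsonValueKind.Object)
        {
            foreach (var child in Flatten(property.Value, name))
                yield return child;
        }
        else
        {
            yield return (name, property.Value.ToString());
        }
    }
}

const string json = @"{ ""Kafka"": { ""BootstrapServers"": ""localhost:9092"", ""Consumer"": { ""Topic"": ""orders"" } } }";

using var document = JsonDocument.Parse(json);
foreach (var (name, value) in Flatten(document.RootElement, ""))
    Console.WriteLine($"{name}={value}");
// prints:
// Kafka__BootstrapServers=localhost:9092
// Kafka__Consumer__Topic=orders
```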
This example shows how to keep all Kafka setup isolated in your consuming app (not in the NuGet package code), and wire everything up from Program.cs.
local.settings.json (examples)
There are two common ways to configure Schema Registry:
- Kafka using KafkaClientOptions.
- SchemaRegistryOptions under Kafka__SchemaRegistry__*.
Option A is a great default when you want a single options object (KafkaClientOptions).
If Kafka and Schema Registry share the same OAuth settings, configure Kafka OAuth via Kafka:OAuth:* and configure Schema Registry OAuth via either Kafka:SchemaRegistry:* or Kafka:SchemaRegistryOauth*.
Kafka__SchemaRegistryUrl
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"Kafka__BootstrapServers": "YOUR_BOOTSTRAP_SERVERS",
"Kafka__GroupId": "my-function-consumer",
"Kafka__OAuth__TokenEndpointUrl": "https://YOUR_IDP/oauth/token",
"Kafka__OAuth__ClientId": "YOUR_CLIENT_ID",
"Kafka__OAuth__ClientSecret": "YOUR_CLIENT_SECRET",
"Kafka__OAuth__Scope": "YOUR_SCOPE",
"Kafka__OAuth__LogicalCluster": "lkc-...",
"Kafka__OAuth__IdentityPoolId": "pool-...",
"Kafka__SchemaRegistryOauthTokenEndpoint": "https://YOUR_IDP/oauth/token",
"Kafka__SchemaRegistryOauthClientId": "YOUR_CLIENT_ID",
"Kafka__SchemaRegistryOauthClientSecret": "YOUR_CLIENT_SECRET",
"Kafka__SchemaRegistryOauthScope": "YOUR_SCOPE",
"Kafka__SchemaRegistryOauthLogicalCluster": "lsrc-...",
"Kafka__SchemaRegistryOauthIdentityPoolId": "pool-...",
"Kafka__Consumer__Topic": "orders",
"Kafka__Consumer__EnableAutoCommit": "false",
"Kafka__Consumer__AutoOffsetReset": "Earliest",
"Kafka__SchemaRegistryUrl": "https://YOUR_SCHEMA_REGISTRY"
}
}
Kafka__SchemaRegistry__*)
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
"Kafka__BootstrapServers": "YOUR_BOOTSTRAP_SERVERS",
"Kafka__GroupId": "my-function-consumer",
"Kafka__OAuth__TokenEndpointUrl": "https://YOUR_IDP/oauth/token",
"Kafka__OAuth__ClientId": "YOUR_CLIENT_ID",
"Kafka__OAuth__ClientSecret": "YOUR_CLIENT_SECRET",
"Kafka__OAuth__Scope": "YOUR_SCOPE",
"Kafka__OAuth__LogicalCluster": "lkc-...",
"Kafka__OAuth__IdentityPoolId": "pool-...",
"Kafka__Consumer__Topic": "orders",
"Kafka__Consumer__EnableAutoCommit": "false",
"Kafka__Consumer__AutoOffsetReset": "Earliest",
"Kafka__SchemaRegistry__Url": "https://YOUR_SCHEMA_REGISTRY",
"Kafka__SchemaRegistry__TokenEndpointUrl": "https://YOUR_IDP/oauth/token",
"Kafka__SchemaRegistry__ClientId": "YOUR_SR_CLIENT_ID",
"Kafka__SchemaRegistry__ClientSecret": "YOUR_SR_CLIENT_SECRET",
"Kafka__SchemaRegistry__Scope": "YOUR_SR_SCOPE",
"Kafka__SchemaRegistry__LogicalCluster": "YOUR_SR_LOGICAL_CLUSTER",
"Kafka__SchemaRegistry__IdentityPoolId": "YOUR_SR_IDENTITY_POOL_ID"
}
}
You can configure Schema Registry OAuth either via SchemaRegistryOptions (bound separately) or via KafkaClientOptions (SchemaRegistryOauth*).
Precedence (highest → lowest):
- Kafka:SchemaRegistry:* (explicit schema-registry settings via SchemaRegistryOptions)
- Kafka:SchemaRegistryOauth* (defaults for SchemaRegistryOptions when you bind KafkaClientOptions)
Schema Registry URL can be provided either as:
- Kafka:SchemaRegistry:Url (when you bind SchemaRegistryOptions), or
- Kafka:SchemaRegistryUrl (when you bind KafkaClientOptions)
Exact configuration keys (JSON form shown; equivalent environment variable names use __):
{
"Kafka": {
"SchemaRegistry": {
"Url": "https://YOUR_SCHEMA_REGISTRY",
"TokenEndpointUrl": "https://YOUR_IDP/oauth/token",
"ClientId": "YOUR_SR_CLIENT_ID",
"ClientSecret": "YOUR_SR_CLIENT_SECRET",
"Scope": "YOUR_SR_SCOPE",
"LogicalCluster": "lsrc-...",
"IdentityPoolId": "YOUR_SR_IDENTITY_POOL_ID"
}
}
}
KafkaClientOptions fallback (Schema Registry OAuth via SchemaRegistryOauth*):
{
"Kafka": {
"SchemaRegistryOauthTokenEndpoint": "https://YOUR_IDP/oauth/token",
"SchemaRegistryOauthClientId": "YOUR_SR_CLIENT_ID",
"SchemaRegistryOauthClientSecret": "YOUR_SR_CLIENT_SECRET",
"SchemaRegistryOauthScope": "YOUR_SR_SCOPE",
"SchemaRegistryOauthLogicalCluster": "lsrc-...",
"SchemaRegistryOauthIdentityPoolId": "YOUR_SR_IDENTITY_POOL_ID"
}
}
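The precedence between the two sources (explicit Kafka:SchemaRegistry:* first, Kafka:SchemaRegistryOauth* as the fallback) amounts to a per-field fallback. The following is a minimal sketch of that idea with plain dictionaries standing in for the two configuration sources. The Resolve helper is hypothetical, treating blank values as unset is an assumption, and the library's actual resolution code may differ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: the explicit value wins; a missing or blank value
// (assumption: blank counts as "not set") falls back to the default source.
static string? Resolve(string? explicitValue, string? fallbackValue) =>
    string.IsNullOrWhiteSpace(explicitValue) ? fallbackValue : explicitValue;

// Stand-in for Kafka:SchemaRegistry:* with ClientSecret left unset.
var explicitSource = new Dictionary<string, string?>
{
    ["ClientId"] = "YOUR_SR_CLIENT_ID",
    ["ClientSecret"] = null,
};

// Stand-in for Kafka:SchemaRegistryOauth* defaults.
var fallbackSource = new Dictionary<string, string?>
{
    ["ClientId"] = "YOUR_CLIENT_ID",
    ["ClientSecret"] = "YOUR_CLIENT_SECRET",
};

foreach (var key in new[] { "ClientId", "ClientSecret" })
{
    explicitSource.TryGetValue(key, out var explicitValue);
    Console.WriteLine($"{key} = {Resolve(explicitValue, fallbackSource[key])}");
}
// prints:
// ClientId = YOUR_SR_CLIENT_ID
// ClientSecret = YOUR_CLIENT_SECRET
```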
How the library uses these values:
- Kafka:SchemaRegistry:* values; when any of those are not set the library will fall back to the corresponding Kafka:SchemaRegistryOauth* value (when you bind KafkaClientOptions).
- PostConfigure<SchemaRegistryOptions> (see Program.cs) to override either source.
Custom token provider (e.g., MSAL):
- ISecurityTokenProvider by registering your own implementation in DI before calling AddKafkaClients / AddKafkaProducerClient / AddKafkaConsumerClient.
- OAuthSecurityTokenProvider using TryAddSingleton, so your custom provider will automatically win.
- SchemaRegistryExtClient is wired up when a security provider is available; with a custom provider, refresh does not require SchemaRegistryOptions.TokenEndpointUrl to be set.
Environment variables equivalent (examples):
- Kafka__SchemaRegistry__ClientId → SchemaRegistryOptions.ClientId
- Kafka__SchemaRegistry__ClientSecret → SchemaRegistryOptions.ClientSecret
- Kafka__SchemaRegistry__TokenEndpointUrl → SchemaRegistryOptions.TokenEndpointUrl
- Kafka__OAuth__ClientId → KafkaClientOptions.OAuth.ClientId (Kafka brokers)
- Kafka__SchemaRegistryOauthClientId → KafkaClientOptions.SchemaRegistryOauthClientId (Schema Registry)
Program.cs (Azure Functions isolated)
using JohBloch.ConfluentKafka.Clients;
using JohBloch.ConfluentKafka.Clients.Models;
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
var builder = FunctionsApplication.CreateBuilder(args);
builder.ConfigureFunctionsWebApplication();
// Bind all options from configuration and register the library services.
builder.Services.AddKafkaClients(options => builder.Configuration.GetSection("Kafka").Bind(options));
// Option A (simplest): configure Schema Registry URL via KafkaClientOptions (Kafka__SchemaRegistryUrl).
// No additional binding is needed.
// Option B (most explicit): if you use Kafka__SchemaRegistry__* keys, bind SchemaRegistryOptions too.
// This is useful when Schema Registry has different OAuth credentials than Kafka.
// builder.Services.PostConfigure<SchemaRegistryOptions>(
// sr => builder.Configuration.GetSection("Kafka:SchemaRegistry").Bind(sr));
var app = builder.Build();
app.Run();
KafkaTimer function (poll every 5 minutes)
using System.Text.Json;
using JohBloch.ConfluentKafka.Clients.Interfaces;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
public sealed class KafkaTimer
{
private readonly IKafkaConsumerClient _consumer;
private readonly ILogger<KafkaTimer> _logger;
public KafkaTimer(IKafkaConsumerClient consumer, ILogger<KafkaTimer> logger)
{
_consumer = consumer;
_logger = logger;
}
[Function(nameof(KafkaTimer))]
public async Task Run([TimerTrigger("0 */5 * * * *")] TimerInfo timer, CancellationToken ct)
{
_logger.LogInformation("KafkaTimer fired at {UtcNow}", DateTimeOffset.UtcNow);
const int maxMessages = 100;
const int timeoutMs = 4000;
// Use JsonElement for a generic "just log it" approach.
// If you have a POCO, replace JsonElement with your type.
var batch = await _consumer.ConsumeBatchAsync<JsonElement>(maxMessages, timeoutMs, ct);
if (batch.Count == 0)
{
_logger.LogInformation("No messages received.");
return;
}
foreach (var record in batch)
{
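// "?? default" keeps value a non-nullable JsonElement (ValueKind is Undefined when Message is null).
var value = record.Message?.Value ?? default;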
_logger.LogInformation(
"Received. Topic={Topic} Partition={Partition} Offset={Offset} Key={Key} Value={Value}",
record.Topic,
record.Partition.Value,
record.Offset.Value,
record.Message?.Key,
value.ValueKind == JsonValueKind.Undefined ? "<undefined>" : value.GetRawText());
// Simple demo: commit each message.
_consumer.Commit(record);
}
_logger.LogInformation("Processed and committed {Count} messages.", batch.Count);
}
}
using JohBloch.ConfluentKafka.Clients.Interfaces;
using JohBloch.ConfluentKafka.Clients.Models;
public sealed class OrderPublisher
{
private readonly IKafkaProducerClient _producer;
public OrderPublisher(IKafkaProducerClient producer)
{
_producer = producer;
}
public Task PublishAsync(Order order, CancellationToken ct)
{
// The producerKey must exist in Kafka:Producers config
return _producer.SendMessageWithSchemaAsync(
message: order,
key: order.OrderId,
producerKey: "default",
schemaType: SchemaType.Json,
ct: ct);
}
}
using JohBloch.ConfluentKafka.Clients.Interfaces;
public sealed class OrderWorker
{
private readonly IKafkaConsumerClient _consumer;
public OrderWorker(IKafkaConsumerClient consumer)
{
_consumer = consumer;
}
public async Task PollOnceAsync(CancellationToken ct)
{
// Optional if you already set Kafka:Consumer:Topic
_consumer.Subscribe(new[] { "orders" });
var record = await _consumer.ConsumeAsync<Order>(ct);
if (record is null) return;
await ProcessOrderAsync(record.Message.Value);
_consumer.Commit(record);
}
}
try
{
await ProcessMessageAsync(message);
}
catch (Exception ex)
{
// Processing failed: forward the original message to the DLQ
await producer.SendToDeadLetterQueueAsync(
originalMessage: consumeResult,
exception: ex,
retryCount: 3);
}
public class KafkaProducerOptions
{
public string BootstrapServers { get; set; }
public string Topic { get; set; }
public string ApplicationId { get; set; }
public int BatchSizeKB { get; set; } = 32;
public int LingerMS { get; set; } = 100;
public int QueueBufferMaxMessages { get; set; } = 50000;
public string CompressionType { get; set; } = "none";
public int CompressionLevel { get; set; } = 0;
public string DeadLetterQueueTopicPattern { get; set; } = "dlq-{topic}";
public bool IncludeStackTraceInDlq { get; set; } = false;
public bool AutoDlqOnDeliveryFailure { get; set; } = false;
}
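The default DeadLetterQueueTopicPattern of dlq-{topic} suggests that the DLQ topic name is derived from the source topic. The following is a minimal sketch of that substitution, assuming {topic} is the only placeholder and is replaced verbatim with the source topic name. The helper ResolveDlqTopic is hypothetical and not part of the library.

```csharp
using System;

// Hypothetical helper (assumption): expand the "{topic}" placeholder
// in a DLQ pattern with the name of the source topic.
static string ResolveDlqTopic(string pattern, string sourceTopic) =>
    pattern.Replace("{topic}", sourceTopic, StringComparison.Ordinal);

Console.WriteLine(ResolveDlqTopic("dlq-{topic}", "topic-a"));
// prints "dlq-topic-a"
```

Under this assumption, the orders and audit producers in the sample settings would dead-letter to dlq-topic-a and dlq-topic-b respectively.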
public class KafkaConsumerOptions
{
public string BootstrapServers { get; set; }
public string GroupId { get; set; }
public string Topic { get; set; }
public int SessionTimeoutMs { get; set; } = 45000;
public int HeartbeatIntervalMs { get; set; } = 3000;
public string AutoOffsetReset { get; set; } = "earliest";
public bool EnableAutoCommit { get; set; } = true;
public SchemaType DefaultSchemaType { get; set; } = SchemaType.Avro;
public bool AutoDetectSchemaType { get; set; } = true;
public Dictionary<string, SchemaType> TopicSchemaTypes { get; set; } = new();
}
global.json)
This repo includes a minimal local stack for running the example app:
- localhost:9092 (PLAINTEXT)
- http://localhost:8081
Start the stack:
docker compose up -d
Run the example:
dotnet run --project examples/JohBloch.ConfluentKafka.Clients.Example.Clients.InternalSecurity/JohBloch.ConfluentKafka.Clients.Example.Clients.InternalSecurity.csproj
Stop the stack:
docker compose down -v
Dependencies are split per NuGet package to keep consumers lightweight.
JohBloch.ConfluentKafka.Clients (convenience package)
- JohBloch.ConfluentKafka.Clients.Core, .Consumer, and .Producer (brings their dependencies transitively).
JohBloch.ConfluentKafka.Clients.Core
- 2.13.0
- 2.13.0
- 1.1.0
- 9.0.0, Logging/Options 10.0.2
- 10.0.1
JohBloch.ConfluentKafka.Clients.Consumer
- 10.12.0
- 3.2.56
JohBloch.ConfluentKafka.Clients.Producer
- 10.12.0
- 3.2.56
git clone https://github.com/JohBloch/JohBloch.ConfluentKafka.Clients.git
cd JohBloch.ConfluentKafka.Clients
dotnet build
dotnet test
All tests should pass in under a minute on a typical dev machine.
├── src/
│ ├── JohBloch.ConfluentKafka.Clients/ # Convenience package (Core + Consumer + Producer)
│ ├── JohBloch.ConfluentKafka.Clients.Core/ # Options, interfaces, models, shared helpers, security
│ ├── JohBloch.ConfluentKafka.Clients.Consumer/ # Consumer client + deserialization implementations
│ └── JohBloch.ConfluentKafka.Clients.Producer/ # Producer client + serialization implementations
├── tests/ # Unit tests
├── docs/ # Documentation
└── JohBloch.ConfluentKafka.Clients.sln
We welcome contributions! Please see our for details.
- git checkout -b feature/amazing-feature
- git commit -m 'Add amazing feature'
- git push origin feature/amazing-feature
This project is licensed under the MIT License - see the file for details.
Made with ❤️ by JohBloch
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net8.0 and net10.0 are compatible. net9.0 was computed, as were the android, browser, ios, maccatalyst, macos, tvos, and windows variants of net8.0, net9.0, and net10.0. |