URL: https://www.nuget.org/packages/KafkaGenericProcessor.Core/

KafkaGenericProcessor.Core 0.0.2

dotnet add package KafkaGenericProcessor.Core --version 0.0.2

NuGet\Install-Package KafkaGenericProcessor.Core -Version 0.0.2
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.

<PackageReference Include="KafkaGenericProcessor.Core" Version="0.0.2" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.

For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
Directory.Packages.props:
<PackageVersion Include="KafkaGenericProcessor.Core" Version="0.0.2" />
Project file:
<PackageReference Include="KafkaGenericProcessor.Core" />

paket add KafkaGenericProcessor.Core --version 0.0.2
The NuGet Team does not provide support for this client. Please contact its maintainers for support.

#r "nuget: KafkaGenericProcessor.Core, 0.0.2"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.

#:package KafkaGenericProcessor.Core@0.0.2
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.

Install as a Cake Addin:
#addin nuget:?package=KafkaGenericProcessor.Core&version=0.0.2
Install as a Cake Tool:
#tool nuget:?package=KafkaGenericProcessor.Core&version=0.0.2
The NuGet Team does not provide support for this client. Please contact its maintainers for support.

KafkaGenericProcessor.Core

NOTE: This library is 100% vibe-coded. The requirement was to wrap the KafkaFlow library and provide simplified consume, consume-and-produce, and produce capabilities through a simple fluent pattern. This library is NOT production ready; do NOT use it in production. Contributions are welcome, but please vibe code them 🤟

A robust, type-safe framework for building Kafka message processing applications in .NET. Simplify your Kafka consumer and producer implementation with a comprehensive set of features designed for enterprise applications.

Features

  • Type-Safe Processing: Handle Kafka messages with strongly-typed processors and serializers
  • Middleware Pipeline: Process messages through configurable middleware chains
  • Validation: Built-in support for message validation before processing
  • Error Handling: Comprehensive exception hierarchy with automatic handling
  • Retry Policies: Exponential backoff retry mechanism with configurable parameters
  • Health Checks: Built-in health monitoring for Kafka connectivity
  • Structured Logging: Consistent, correlation-based logging throughout the processing pipeline
  • Performance Metrics: Track processing times and throughput
  • Correlation IDs: Trace messages through the entire processing pipeline
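
The retry feature above names exponential backoff with configurable parameters but does not show the package's own retry API. As a minimal sketch of the general technique only, the following assumes a hypothetical helper named RetrySketch with hypothetical parameters maxAttempts, baseDelay, and maxDelay; none of these names come from KafkaGenericProcessor.Core.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; NOT part of KafkaGenericProcessor.Core.
public static class RetrySketch
{
    // Delay before the retry that follows failed attempt number `attempt` (1-based):
    // baseDelay * 2^(attempt - 1), capped at maxDelay.
    public static TimeSpan ComputeDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        double factor = Math.Pow(2, attempt - 1);
        double ms = Math.Min(baseDelay.TotalMilliseconds * factor, maxDelay.TotalMilliseconds);
        return TimeSpan.FromMilliseconds(ms);
    }

    // Runs `operation`, retrying on failure until it succeeds or maxAttempts is reached.
    // Cancellation is never retried; the last failure is rethrown to the caller.
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxAttempts,
        TimeSpan baseDelay,
        TimeSpan maxDelay,
        CancellationToken cancellationToken = default)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation(cancellationToken);
            }
            catch (Exception ex) when (attempt < maxAttempts && ex is not OperationCanceledException)
            {
                // Wait longer after each failed attempt before trying again.
                await Task.Delay(ComputeDelay(attempt, baseDelay, maxDelay), cancellationToken);
            }
        }
    }
}
```

With a base delay of 100 ms, the waits after the first three failed attempts are 100 ms, 200 ms, and 400 ms. The cap keeps later waits from growing without bound.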

Quick Start

Installation

dotnet add package KafkaGenericProcessor.Core

Basic Configuration

Add the following to your appsettings.json:

{
  "Kafka": {
    "Configurations": {
      "order-processor": {
        "Brokers": ["kafka-broker:9092"],
        "ConsumerTopic": "incoming-orders",
        "ProducerTopic": "processed-orders",
        "GroupId": "order-processing-group",
        "WorkersCount": 10,
        "BufferSize": 100,
        "CreateTopicsIfNotExists": true
      }
    }
  }
}
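
The JSON above nests each processor's settings under Kafka, then Configurations, then a name; here that name is order-processor, the same string the registration code uses as its service key. How the package binds this section internally is not shown in the source. The sketch below only walks the same nesting with System.Text.Json, which can help when sanity-checking a settings file in a test. ConfigSketch and ReadProcessorSettings are hypothetical names, not part of the package.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper for illustration; NOT part of KafkaGenericProcessor.Core.
public static class ConfigSketch
{
    // Reads a few values from Kafka -> Configurations -> <key>.
    // Throws KeyNotFoundException if any level of the path is missing.
    public static (string ConsumerTopic, string ProducerTopic, int WorkersCount)
        ReadProcessorSettings(string json, string key)
    {
        using JsonDocument doc = JsonDocument.Parse(json);

        JsonElement section = doc.RootElement
            .GetProperty("Kafka")
            .GetProperty("Configurations")
            .GetProperty(key);

        return (
            section.GetProperty("ConsumerTopic").GetString() ?? string.Empty,
            section.GetProperty("ProducerTopic").GetString() ?? string.Empty,
            section.GetProperty("WorkersCount").GetInt32());
    }
}
```

Calling ConfigSketch.ReadProcessorSettings(json, "order-processor") on the file above would return the two topic names and the worker count for that processor.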

Register Services

// In Program.cs or Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    // Add required services
    services.AddLogging();

    // Register your message processors and validators
    services.AddKeyedTransient<IMessageProcessor<OrderMessage, ProcessedOrderMessage>, OrderProcessor>("order-processor");
    services.AddKeyedTransient<IMessageValidator<OrderMessage>, OrderValidator>("order-processor");

    // Configure Kafka Generic Processor
    services
        .AddKafkaGenericProcessors(Configuration)
        .AddConsumerProducerProcessor<OrderMessage, ProcessedOrderMessage>("order-processor")
        .Build();
}

Create Message Types

public class OrderMessage
{
    public string OrderId { get; set; }
    public string CustomerName { get; set; }
    public decimal Amount { get; set; }
}

public class ProcessedOrderMessage
{
    public string OrderId { get; set; }
    public string Status { get; set; }
    public DateTimeOffset ProcessedAt { get; set; }
}

Implement Processor

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

public class OrderProcessor : IMessageProcessor<OrderMessage, ProcessedOrderMessage>
{
    private readonly ILogger<OrderProcessor> _logger;

    public OrderProcessor(ILogger<OrderProcessor> logger)
    {
        _logger = logger;
    }

    public async Task<ProcessedOrderMessage> ProcessAsync(
        OrderMessage input,
        string correlationId,
        CancellationToken cancellationToken = default)
    {
        _logger.LogInformation(
            "Processing order {OrderId} for customer {CustomerName}. CorrelationId: {CorrelationId}",
            input.OrderId, input.CustomerName, correlationId);

        // Process the order (your business logic here)
        await Task.Delay(100, cancellationToken);

        // Build the outgoing message for the producer topic
        return new ProcessedOrderMessage
        {
            OrderId = input.OrderId,
            Status = "Processed",
            ProcessedAt = DateTimeOffset.UtcNow
        };
    }
}

Implement Validator

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class OrderValidator : IMessageValidator<OrderMessage>
{
    // A message is valid when no validation errors are found
    public async Task<bool> ValidateAsync(
        OrderMessage message,
        string correlationId,
        CancellationToken cancellationToken = default)
    {
        var errors = await GetValidationErrorsAsync(message, correlationId, cancellationToken);
        return !errors.Any();
    }

    // Collects every failed rule instead of stopping at the first one
    public Task<IReadOnlyList<ValidationError>> GetValidationErrorsAsync(
        OrderMessage message,
        string correlationId,
        CancellationToken cancellationToken = default)
    {
        var errors = new List<ValidationError>();

        if (string.IsNullOrEmpty(message.OrderId))
        {
            errors.Add(new ValidationError("OrderId", "Order ID is required"));
        }

        if (string.IsNullOrEmpty(message.CustomerName))
        {
            errors.Add(new ValidationError("CustomerName", "Customer name is required"));
        }

        if (message.Amount <= 0)
        {
            errors.Add(new ValidationError("Amount", "Amount must be greater than zero"));
        }

        return Task.FromResult<IReadOnlyList<ValidationError>>(errors);
    }
}

Contributing

Contributions are welcome!

License

This project is licensed under the MIT License.

Compatible and additional computed target framework versions

Product: .NET
  Compatible: net9.0
  Computed: net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version  Downloads  Last Updated
0.0.2    355        5/15/2025
0.0.1    316        5/14/2025