dotnet add package MASES.KNet --version 3.2.3
NuGet\Install-Package MASES.KNet -Version 3.2.3
<PackageReference Include="MASES.KNet" Version="3.2.3" />
Directory.Packages.props: <PackageVersion Include="MASES.KNet" Version="3.2.3" />
Project file: <PackageReference Include="MASES.KNet" />
paket add MASES.KNet --version 3.2.3
#r "nuget: MASES.KNet, 3.2.3"
#:package MASES.KNet@3.2.3
Install as a Cake Addin: #addin nuget:?package=MASES.KNet&version=3.2.3
Install as a Cake Tool: #tool nuget:?package=MASES.KNet&version=3.2.3
To use KNet, the developer writes .NET code using the same classes available in the official Apache Kafka™ package. If a class or method is not available yet, it is possible to use the approach described in the KNet documentation.
KNet uses the official Apache Kafka™ Java client packages directly. All examples in this page use standard Producer, Consumer, and Admin Client APIs, which communicate with the broker exclusively through the Kafka wire protocol.
This means the code shown here works with any broker that implements the Kafka wire protocol — not only Apache Kafka™ itself. Examples of compatible brokers: Redpanda, Amazon MSK, Confluent Platform / Cloud, Aiven for Apache Kafka™, IBM Event Streams, WarpStream, AutoMQ, and others.
See the KNet documentation for the full compatibility matrix covering all KNet feature areas.
KNet accepts many command-line switches to customize its behavior. The full list is available in the KNet documentation.
One of the most important command-line switches is JVMPath, which belongs to the JCOBridge switches: it sets the location of the JVM™ library when JCOBridge is not able to identify a suitable JRE/JDK installation. A developer who uses KNet within their own product can override the JVMPath property with a snippet like the following one:
class MyKNetCore : KNetCore
{
public override string JVMPath
{
get
{
string pathToJVM = "Set here the path to JVM library or use your own search method";
return pathToJVM;
}
}
}
pathToJVM shall be escaped, for example:
string pathToJVM = "C:\\Program Files\\Eclipse Adoptium\\jdk-11.0.18.10-hotspot\\bin\\server\\jvm.dll";
or, using a verbatim string:
string pathToJVM = @"C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\bin\server\jvm.dll";
JCOBridge tries to identify a suitable JRE/JDK installation on the system using the standard JRE/JDK mechanisms: the JAVA_HOME environment variable or, if available, the Windows registry.
However it is possible, on Windows operating systems, that the library raises an InvalidOperationException: Missing Java Key in registry: Couldn't find Java installed on the machine.
This means that neither JAVA_HOME nor Windows registry contains information about a default installed JRE/JDK: some vendors may not set them up.
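To see which environment variables are actually visible, and at which level, a small diagnostic such as the following may help. This is only a minimal sketch that relies on the .NET base class library: the JvmEnvProbe class and its Describe helper are hypothetical names introduced here and are not part of KNet or JCOBridge. It inspects JAVA_HOME and the JCOBRIDGE_JVMPath variable discussed further down, and it does not read the Windows registry.

```csharp
using System;

static class JvmEnvProbe
{
    // Hypothetical helper: reports one variable as seen at one scope.
    public static string Describe(string name, EnvironmentVariableTarget target)
    {
        var value = Environment.GetEnvironmentVariable(name, target);
        var shown = string.IsNullOrEmpty(value) ? "<not set>" : value;
        return name + " [" + target + "] = " + shown;
    }

    static void Main()
    {
        string[] names = { "JCOBRIDGE_JVMPath", "JAVA_HOME" };
        var targets = new[]
        {
            EnvironmentVariableTarget.Process, // what the running process sees
            EnvironmentVariableTarget.User,    // per-user setting (Windows)
            EnvironmentVariableTarget.Machine  // system-level setting (Windows)
        };

        foreach (var name in names)
        {
            foreach (var target in targets)
            {
                Console.WriteLine(Describe(name, target));
            }
        }
    }
}
```

A variable that appears under User but not under Process or Machine would match the situation where the setting exists but is not visible to the process that raised the exception. On non-Windows systems the User and Machine scopes are expected to report no value.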
If the developer/user encounters this condition, they can take the following steps:
1. Run set | findstr JAVA_HOME in a command prompt and verify the result;
2. The JAVA_HOME environment variable may not be set at system level, but at a different level like user level, which is not visible from the KNet process that raised the exception;
3. Set JAVA_HOME at system level, e.g. JAVA_HOME=C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\;
4. Set JCOBRIDGE_JVMPath at system level, e.g. JCOBRIDGE_JVMPath=C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\.

- One of the JCOBRIDGE_JVMPath or JAVA_HOME environment variables, or the Windows registry (on Windows OSes), shall be available.
- The JCOBRIDGE_JVMPath environment variable takes precedence over JAVA_HOME and the Windows registry: you can set JCOBRIDGE_JVMPath to C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\bin\server\jvm.dll and avoid overriding JVMPath in your code.
- JVMPath takes precedence over the JCOBRIDGE_JVMPath/JAVA_HOME environment variables and the Windows registry.

KNet uses an embedded JVM™ through JNet/JCOBridge; however, JVM™ initialization is incompatible with CET because the code used to identify the CPU tries to modify the return address, and CET considers this a violation: see this comment.
From .NET 9 preview 6, CET is enabled by default on supported hardware when the final stage produces an executable artifact, i.e. the csproj file contains <OutputType>Exe</OutputType>.
If the application fails at startup with the error 0xc0000409 (subcode 0x30), it was compiled with CET enabled and it is failing during JVM™ initialization.
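Depending on the tool used to launch the application, this error may be displayed as a signed decimal number rather than in hexadecimal. The sketch below shows the conversion and a simple check. It assumes the value is surfaced as the process exit code, which is not stated above, and the CetExitCode class and IsFailFast method are hypothetical names used only for illustration. The subcode 0x30 is not part of the exit code and is not checked here.

```csharp
using System;

static class CetExitCode
{
    // Error value quoted in the text above.
    public const uint FailFastStatus = 0xC0000409;

    // Assumption: the value arrives as a 32-bit signed process exit code.
    public static bool IsFailFast(int exitCode)
    {
        return unchecked((uint)exitCode) == FailFastStatus;
    }

    static void Main()
    {
        int asSigned = unchecked((int)FailFastStatus);
        Console.WriteLine(asSigned);             // -1073740791
        Console.WriteLine(IsFailFast(asSigned)); // True
        Console.WriteLine(IsFailFast(0));        // False
    }
}
```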
To solve the issue there are four possible solutions:
Disable CET compatibility in the project file:
<PropertyGroup Condition="'$(TargetFramework)' == 'net9.0'">
<CETCompat>false</CETCompat>
</PropertyGroup>
Use the dotnet app host, as reported in https://github.com/masesgroup/JCOBridgePublic/issues/7#issuecomment-2550031946, with a syntax like:
dotnet MyApplication.dll
instead of the classic:
MyApplication.exe
Disable CET for the executable through the registry:
reg add "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Image File Execution Options\MyApplication.exe" /v MitigationOptions /t REG_BINARY /d "0000000000000000000000000000002000" /f
then run:
MyApplication.exe
Use the following to re-enable CET:
reg delete "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Image File Execution Options\MyApplication.exe" /v MitigationOptions /f
Below the reader can find two different versions of producer examples.
A basic producer can be like the following one:
using MASES.KNet;
using Org.Apache.Kafka.Clients.Producer;
using Java.Util;
using System;
using System.Threading;
namespace MASES.KNetTemplate.KNetProducer
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
static string serverToUse = theServer;
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
KNetCore.CreateGlobalInstance();
var appArgs = KNetCore.FilteredArgs;
if (appArgs.Length != 0)
{
serverToUse = appArgs[0]; // use the filtered arguments, which no longer contain KNet switches
}
/**** Direct mode ******
Properties props = new Properties();
props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
props.Put(ProducerConfig.ACKS_CONFIG, "all");
props.Put(ProducerConfig.RETRIES_CONFIG, 0);
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
******/
Properties props = ProducerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithAcks(ProducerConfig.Acks.All)
.WithRetries(0)
.WithLingerMs(1)
.WithKeySerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.WithValueSerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.ToProperties();
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
using (KafkaProducer producer = new KafkaProducer(props))
{
int i = 0;
while (!resetEvent.WaitOne(0))
{
var record = new ProducerRecord<string, string>(topicToUse, i.ToString(), i.ToString());
var result = producer.Send(record);
Console.WriteLine($"Producing: {record} with result: {result.Get()}");
producer.Flush();
i++;
}
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
e.Cancel = true; // keep the process alive so the loop can exit and the producer is disposed
resetEvent.Set();
}
}
}
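The example above can be found in the templates package. Its behavior is:
- during initialization it prepares the producer properties;
- it creates a producer using those properties;
- in a loop, it creates a ProducerRecord, sends it, and prints the record together with the result of the send.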
A producer with Callback can be like the following one. In this example the reader may notice a slight difference from the corresponding Java™ code. See the KNet documentation for details on how callbacks from the JVM™ are managed.
using MASES.KNet;
using Org.Apache.Kafka.Clients.Producer;
using Java.Util;
using System;
using System.Threading;
namespace MASES.KNetTemplate.KNetProducer
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
static string serverToUse = theServer;
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
KNetCore.CreateGlobalInstance();
var appArgs = KNetCore.FilteredArgs;
if (appArgs.Length != 0)
{
serverToUse = appArgs[0]; // use the filtered arguments, which no longer contain KNet switches
}
/**** Direct mode ******
Properties props = new Properties();
props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
props.Put(ProducerConfig.ACKS_CONFIG, "all");
props.Put(ProducerConfig.RETRIES_CONFIG, 0);
props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
******/
Properties props = ProducerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithAcks(ProducerConfig.Acks.All)
.WithRetries(0)
.WithLingerMs(1)
.WithKeySerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.WithValueSerializerClass("org.apache.kafka.common.serialization.StringSerializer")
.ToProperties();
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
using (KafkaProducer producer = new KafkaProducer(props))
{
int i = 0;
using (var callback = new Callback((o1, o2) =>
{
if (o2 != null) Console.WriteLine(o2.ToString());
else Console.WriteLine($"Produced on topic {o1.Topic} at offset {o1.Offset}");
}))
{
while (!resetEvent.WaitOne(0))
{
var record = new ProducerRecord<string, string>(topicToUse, i.ToString(), i.ToString());
var result = producer.Send(record, callback);
Console.WriteLine($"Producing: {record} with result: {result.Get()}");
producer.Flush();
i++;
}
}
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
e.Cancel = true; // keep the process alive so the loop can exit and the producer is disposed
resetEvent.Set();
}
}
}
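The example above can be found in the templates package. Its behavior is:
- during initialization it prepares the producer properties;
- it creates a producer using those properties;
- it allocates a single Callback, which prints either the reported exception or the topic and offset of the produced record;
- in a loop, it creates a ProducerRecord, sends it together with the callback, and prints the record together with the result of the send.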
A basic consumer can be like the following one:
using MASES.KNet;
using Org.Apache.Kafka.Clients.Consumer;
using Java.Util;
using System;
using System.Threading; // needed for ManualResetEvent
namespace MASES.KNetTemplate.KNetConsumer
{
class Program
{
const string theServer = "localhost:9092";
const string theTopic = "myTopic";
static string serverToUse = theServer;
static string topicToUse = theTopic;
static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);
static void Main(string[] args)
{
KNetCore.CreateGlobalInstance();
var appArgs = KNetCore.FilteredArgs;
if (appArgs.Length != 0)
{
serverToUse = appArgs[0]; // use the filtered arguments, which no longer contain KNet switches
}
/**** Direct mode ******
Properties props = new Properties();
props.Put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
props.Put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.Put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.Put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.Put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.Put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
*******/
Properties props = ConsumerConfigBuilder.Create()
.WithBootstrapServers(serverToUse)
.WithGroupId("test")
.WithEnableAutoCommit(true)
.WithAutoCommitIntervalMs(1000)
.WithKeyDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer")
.WithValueDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer")
.ToProperties();
Console.CancelKeyPress += Console_CancelKeyPress;
Console.WriteLine("Press Ctrl-C to exit");
using (var consumer = new KafkaConsumer<string, string>(props))
{
var topics = Collections.Singleton(topicToUse);
consumer.Subscribe(topics);
while (!resetEvent.WaitOne(0))
{
var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds);
foreach (var item in records)
{
Console.WriteLine($"Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}");
}
}
topics?.Dispose(); // needed to avoid Java.Lang.NullPointerException in some conditions where .NET GC retires topics too early
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
e.Cancel = true; // keep the process alive so the loop can exit and the consumer is disposed
resetEvent.Set();
}
}
}
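The example above can be found in the templates package. Its behavior is:
- during initialization it prepares the consumer properties;
- it creates a consumer using those properties and subscribes it to the topic;
- in a loop, it polls for records and prints the offset, key, and value of each record received.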
| Product | Compatible versions | Additional computed target framework versions |
|---|---|---|
| .NET | net8.0, net9.0, net10.0 | net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows |
| .NET Framework | net462 | net463, net47, net471, net472, net48, net481 |
Showing the top 5 NuGet packages that depend on MASES.KNet:
| Package | Downloads |
|---|---|
| MASES.EntityFrameworkCore.KNet.Serialization: EntityFrameworkCore KNet - Serialization support for EntityFrameworkCore provider for Apache Kafka | |
| MASES.KNet.Serialization.Avro: Avro Serializer/Deserializer of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (KRaft). | |
| MASES.KNet.Serialization.Json: Json Serializer/Deserializer of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (KRaft). | |
| MASES.KNet.Serialization.MessagePack: MessagePack Serializer/Deserializer of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (KRaft). | |
| MASES.KNet.Serialization.Protobuf: Protobuf Serializer/Deserializer of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (KRaft). | |
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 3.2.3 | 237 | 6/16/2026 |
| 3.2.3-rc991 | 5,654 | 6/10/2026 |
| 3.2.3-rc99 | 190 | 6/9/2026 |
| 3.2.3-rc98 | 181 | 6/8/2026 |
| 3.2.3-rc97 | 180 | 6/5/2026 |
| 3.2.3-rc96 | 206 | 6/4/2026 |
| 3.2.3-rc95 | 221 | 5/29/2026 |
| 3.2.3-rc94 | 318 | 5/26/2026 |
| 3.2.3-rc93 | 295 | 5/22/2026 |
| 3.2.3-rc92 | 222 | 5/20/2026 |
| 3.2.3-rc91 | 226 | 5/16/2026 |
| 3.2.3-rc9 | 228 | 5/12/2026 |
| 3.2.3-rc8 | 346 | 5/8/2026 |
| 3.2.3-rc7 | 183 | 5/8/2026 |
| 3.2.3-rc6 | 189 | 5/6/2026 |
| 3.2.3-rc5 | 186 | 5/4/2026 |
| 3.2.3-rc4 | 187 | 5/2/2026 |
| 3.2.3-rc3 | 234 | 4/30/2026 |
| 2.9.12 | 200 | 6/16/2026 |
| 2.9.12-rc1 | 264 | 6/10/2026 |