dotnet add package SerialPortRx.Reactive --version 5.0.5
NuGet\Install-Package SerialPortRx.Reactive -Version 5.0.5
<PackageReference Include="SerialPortRx.Reactive" Version="5.0.5" />
Directory.Packages.props: <PackageVersion Include="SerialPortRx.Reactive" Version="5.0.5" />
Project file: <PackageReference Include="SerialPortRx.Reactive" />
paket add SerialPortRx.Reactive --version 5.0.5
#r "nuget: SerialPortRx.Reactive, 5.0.5"
#:package SerialPortRx.Reactive@5.0.5
Install as a Cake Addin: #addin nuget:?package=SerialPortRx.Reactive&version=5.0.5
Install as a Cake Tool: #tool nuget:?package=SerialPortRx.Reactive&version=5.0.5
A Reactive Serial, TCP, and UDP I/O library that exposes incoming data as IObservable streams and accepts writes via simple methods. Ideal for event-driven, message-framed, and polling scenarios.
dotnet add package SerialPortRx
Use the default SerialPortRx package for new code. Version 5.0.x is a breaking release that replaces direct System.Reactive usage with ReactiveUI.Primitives, including Primitives signals, async observables, sequencers, and disposable helpers.
Existing Rx consumers should install the compatibility package:
dotnet add package SerialPortRx.Reactive
SerialPortRx.Reactive shares the same source as SerialPortRx but references the .Reactive variants of the ReactiveUI.Primitives packages, so existing System.Reactive Unit, IScheduler, and Rx operator conventions remain available.
The package includes the SerialPortRx source generator as an analyzer. No separate generator package is required.
- The SerialPortRx package no longer depends on System.Reactive; it is based on ReactiveUI.Primitives.
- Unit, scheduler, subject, and disposable implementation details are now Primitives-based in the default package.
- Use SerialPortRx.Reactive when an application or library must keep System.Reactive-facing APIs and Rx scheduler/unit conventions.
- The solution file is src/SerialPortRx.slnx.
using System;
using CP.IO.Ports;
using ReactiveUI.Primitives;
var port = new SerialPortRx("COM3", 115200) { ReadTimeout = -1, WriteTimeout = -1 };
// Observe line/state/errors
using var openSubscription = port.IsOpenObservable.Subscribe(isOpen => Console.WriteLine($"Open: {isOpen}"));
using var errorSubscription = port.ErrorReceived.Subscribe(ex => Console.WriteLine($"Error: {ex.Message}"));
// Raw character stream
using var dataSubscription = port.DataReceived.Subscribe(ch => Console.Write(ch));
await port.Open();
port.WriteLine("AT");
// Close when done
port.Close();
// Emits the list of available port names whenever it changes
SerialPortRx.PortNames(pollInterval: 500)
.Subscribe(names => Console.WriteLine(string.Join(", ", names)));
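The "emit only when the list changes" behaviour can be pictured as a poll loop that compares each snapshot with the previous one. The sketch below is not the library's implementation: `PortListChangeDetector` and `TryUpdate` are hypothetical names, and the snapshots are supplied by the caller rather than read from hardware.

```csharp
using System;
using System.Linq;

// Hypothetical helper: remembers the last snapshot of port names and
// reports whether a new snapshot differs from it.
public sealed class PortListChangeDetector
{
    private string[] _last = Array.Empty<string>();
    private bool _hasValue;

    // Returns true (and stores the snapshot) when the names differ from the
    // stored snapshot, or when this is the first snapshot seen.
    // The comparison is order-sensitive and ignores case.
    public bool TryUpdate(string[] snapshot)
    {
        if (_hasValue && _last.SequenceEqual(snapshot, StringComparer.OrdinalIgnoreCase))
        {
            return false;
        }

        _last = (string[])snapshot.Clone();
        _hasValue = true;
        return true;
    }
}
```

A poll loop would call `TryUpdate` once per interval and notify subscribers only when it returns true.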
To auto-connect when a specific COM port appears:
var target = "COM3";
var portDisposables = new List<IDisposable>();
using var portNamesSubscription = SerialPortRx.PortNames()
.Subscribe(names =>
{
if (portDisposables.Count == 0 && Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
{
var port = new SerialPortRx(target, 115200);
portDisposables.Add(port);
portDisposables.Add(port.ErrorReceived.Subscribe(Console.WriteLine));
portDisposables.Add(port.IsOpenObservable.Subscribe(open => Console.WriteLine($"{target}: {(open ? "Open" : "Closed")}")));
// The callback is synchronous, so block until the port has opened
port.Open().GetAwaiter().GetResult();
}
else if (!Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
{
foreach (var disposable in portDisposables)
{
disposable.Dispose();
}
portDisposables.Clear();
}
});
SerialPortRx uses ReactiveUI.Primitives async observables for consumers that need asynchronous observer callbacks and full IObservableAsync<T> operators.
using CP.IO.Ports;
using ReactiveUI.Primitives;
using ReactiveUI.Primitives.Async;
var port = new SerialPortRx("COM3", 115200);
await using var lines = await port.LinesAsync.SubscribeAsync(
async (line, cancellationToken) =>
{
await ProcessLineAsync(line, cancellationToken);
});
await port.Open();
Concrete types expose async properties:
- SerialPortRx.DataReceivedAsync, DataReceivedBytesAsync, LinesAsync, BytesReceivedAsync, IsOpenObservableAsync, ErrorReceivedAsync
- TcpClientRx.DataReceivedAsync, DataReceivedBatchesAsync, BytesReceivedAsync
- UdpClientRx.DataReceivedAsync, DataReceivedBatchesAsync, BytesReceivedAsync

Interface consumers can use extension methods without requiring a new interface contract:
ISerialPortRx serial = port;
await using var state = await serial.IsOpenObservableAsync()
.WhereTrue()
.SubscribeAsync(_ => Console.WriteLine("Open"));
IPortRx common = port;
await using var bytes = await common.BytesReceivedAsync()
.SubscribeAsync(value => Console.WriteLine(value));
Async helper variants are also available:
var start = 0x21.AsObservableAsync();
var end = 0x0a.AsObservableAsync();
await using var framed = await port.DataReceivedAsync
.BufferUntil(start, end, timeOut: 100)
.SubscribeAsync(message => Console.WriteLine(message));
await using var names = await SerialPortRxMixins.PortNamesAsync()
.SubscribeAsync(ports => Console.WriteLine(string.Join(", ", ports)));
BufferUntil helps extract framed messages from the character stream between a start and end delimiter within a timeout.
// Example: messages start with '!' and end with '\n' and must complete within 100ms
var start = 0x21.AsObservable(); // '!'
var end = 0x0a.AsObservable(); // '\n'
port.DataReceived
.BufferUntil(start, end, timeOut: 100)
.Subscribe(msg => Console.WriteLine($"MSG: {msg}"));
A variant returns a default message on timeout:
port.DataReceived
.BufferUntil(start, end, defaultValue: Observable.Return("<timeout>"), timeOut: 100)
.Subscribe(msg => Console.WriteLine($"MSG: {msg}"));
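To make the framing idea concrete, here is a minimal, library-independent sketch of start/end delimiter extraction over a character sequence. `FrameExtractor` is a hypothetical name, the timeout is not modelled, and keeping the delimiters in the emitted message is an assumption of this sketch rather than a statement about what BufferUntil returns.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper illustrating start/end delimiter framing.
public static class FrameExtractor
{
    // Returns every span that begins with 'start' and ends with 'end'.
    // Characters outside a frame are ignored; an unfinished frame at the
    // end of the input is dropped. Assumes start and end are different.
    public static List<string> Extract(IEnumerable<char> input, char start, char end)
    {
        var frames = new List<string>();
        var buffer = new StringBuilder();
        var inFrame = false;

        foreach (var ch in input)
        {
            if (!inFrame)
            {
                if (ch != start)
                {
                    continue; // noise between frames
                }

                inFrame = true;
                buffer.Clear();
            }

            buffer.Append(ch);

            if (ch == end)
            {
                frames.Add(buffer.ToString());
                inFrame = false;
            }
        }

        return frames;
    }
}
```

For example, `FrameExtractor.Extract("xx!OK\nzz!42\n!partial", '!', '\n')` yields two frames, `"!OK\n"` and `"!42\n"`; the trailing `!partial` never sees an end delimiter and is discarded.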
// Write a heartbeat every 500ms but only while the port remains open
port.WhileIsOpen(TimeSpan.FromMilliseconds(500))
.Subscribe(_ => port.Write("PING\n"));
Use ReadAsync for binary protocols or fixed-length reads. Each byte successfully read is also pushed to BytesReceived.
var buffer = new byte[64];
int read = await port.ReadAsync(buffer, 0, buffer.Length);
Console.WriteLine($"Read {read} bytes");
port.BytesReceived.Subscribe(b => Console.WriteLine($"Byte: {b:X2}"));
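For fixed-length binary frames, a single read may return fewer bytes than requested, so callers often loop until the frame is complete. The helper below is a sketch of that loop, not part of the library: `ExactReader` and `ReadExactlyAsync` are hypothetical names, the read delegate is assumed to return `Task<int>`, and a result of 0 is assumed to mean that no more data will arrive.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: repeats a (buffer, offset, count) read until the
// requested number of bytes has been collected.
public static class ExactReader
{
    public static async Task<int> ReadExactlyAsync(
        Func<byte[], int, int, Task<int>> read, byte[] buffer, int count)
    {
        if (count > buffer.Length)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        var total = 0;
        while (total < count)
        {
            // Ask only for the bytes that are still missing
            var n = await read(buffer, total, count - total).ConfigureAwait(false);
            if (n <= 0)
            {
                break; // assumed to mean the source has nothing more to give
            }

            total += n;
        }

        return total; // may be less than count if the source ended early
    }
}
```

Any read method with a matching shape can be passed as the delegate, for example `(b, o, c) => stream.ReadAsync(b, o, c)` for a `System.IO.Stream`.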
Notes:
By default, EnableAutoDataReceive = true automatically feeds incoming data to DataReceived and DataReceivedBytes observables. Set this to false before calling Open() if you want to use synchronous read methods instead.
// Automatic mode (default) - data flows to observables
var port = new SerialPortRx("COM3", 115200);
port.DataReceived.Subscribe(ch => Console.Write(ch));
await port.Open();
// Manual mode - use synchronous reads
var port = new SerialPortRx("COM3", 115200) { EnableAutoDataReceive = false };
await port.Open();
string data = port.ReadExisting();
If you disable auto-receive but later want reactive streaming, call StartDataReception():
port.EnableAutoDataReceive = false;
await port.Open();
// Later, enable reactive streaming manually
var reception = port.StartDataReception(pollingIntervalMs: 10);
port.DataReceived.Subscribe(ch => Console.Write(ch));
// Stop when done
reception.Dispose();
When EnableAutoDataReceive = false, use these synchronous methods for manual data consumption:
var port = new SerialPortRx("COM3", 115200) { EnableAutoDataReceive = false, ReadTimeout = 1000 };
await port.Open();
// Read all available data as string
string existing = port.ReadExisting();
// Read a single byte (-1 if none available)
int b = port.ReadByte();
// Read a single character (-1 if none available)
int ch = port.ReadChar();
// Read into a byte buffer
var buffer = new byte[64];
int bytesRead = port.Read(buffer, 0, buffer.Length);
// Read into a char buffer
var charBuffer = new char[64];
int charsRead = port.Read(charBuffer, 0, charBuffer.Length);
// Read until newline (respects NewLine property)
string line = port.ReadLine();
// Read until a specific delimiter
string data = port.ReadTo(">");
Use ReadLineAsync to await a single complete line terminated by the configured NewLine. It supports single- and multi-character newline sequences and honors ReadTimeout when it is greater than 0.
port.NewLine = "\r\n"; // optional: default is "\n"
var line = await port.ReadLineAsync();
Console.WriteLine($"Line: {line}");
You can also pass a CancellationToken:
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var line = await port.ReadLineAsync(cts.Token);
Read data up to a specific delimiter asynchronously:
// Read until '>' delimiter
var data = await port.ReadToAsync(">");
Console.WriteLine($"Received: {data}");
// With cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var data = await port.ReadToAsync(">", cts.Token);
Subscribe to Lines to get a continuous stream of complete lines:
port.NewLine = "\n";
port.Lines.Subscribe(line => Console.WriteLine($"LINE: {line}"));
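Splitting on a multi-character NewLine means a terminator can straddle two incoming chunks, so a splitter has to keep the unfinished tail between calls. The class below sketches that idea in plain C#; `LineSplitter` and `Push` are hypothetical names and this is not the library's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: accumulates chunks and returns completed lines.
public sealed class LineSplitter
{
    private readonly StringBuilder _pending = new StringBuilder();
    private readonly string _newLine;

    public LineSplitter(string newLine)
    {
        if (string.IsNullOrEmpty(newLine))
        {
            throw new ArgumentException("NewLine must not be empty.", nameof(newLine));
        }

        _newLine = newLine;
    }

    // Appends a chunk and returns every line it completes, without the
    // terminator. Text after the last terminator is kept for the next call.
    public List<string> Push(string chunk)
    {
        var lines = new List<string>();
        _pending.Append(chunk);
        var text = _pending.ToString();
        var start = 0;
        int index;

        while ((index = text.IndexOf(_newLine, start, StringComparison.Ordinal)) >= 0)
        {
            lines.Add(text.Substring(start, index - start));
            start = index + _newLine.Length;
        }

        _pending.Clear();
        _pending.Append(text, start, text.Length - start);
        return lines;
    }
}
```

With a NewLine of `"\r\n"`, pushing `"AT\r"` returns nothing, and a following `"\nOK\r\nERR"` returns `AT` and `OK` while `ERR` stays buffered.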
The package includes a source generator that can turn serial protocol messages into strongly typed properties with classic and async observable streams. Mark a partial class with one or more SerialPortReactiveStream attributes, then connect it to an ISerialPortRx.
using CP.IO.Ports;
using CP.IO.Ports.SourceGeneration;
using ReactiveUI.Primitives;
using ReactiveUI.Primitives.Async;
[SerialPortReactiveStream("Temperature", typeof(double), @"^TEMP:(?<value>-?\d+(\.\d+)?)$")]
[SerialPortReactiveStream("DeviceReady", typeof(bool), @"^READY:(?<value>0|1)$", IgnoreCase = true)]
public partial class DeviceState
{
}
var port = new SerialPortRx("COM3", 115200);
var state = new DeviceState();
using var generatedBindings = state.ConnectReactiveSerialPort(port);
state.TemperatureObservable.Subscribe(value => Console.WriteLine($"Temperature: {value}"));
await using var ready = await state.DeviceReadyObservableAsync
.SubscribeAsync(value => Console.WriteLine($"Ready: {value}"));
await port.Open();
Generated members:
- Temperature and DeviceReady properties with private setters
- TemperatureObservable / DeviceReadyObservable
- TemperatureObservableAsync / DeviceReadyObservableAsync
- ConnectReactiveSerialPort(ISerialPortRx serialPort) to wire the generated bindings

By default, generated bindings listen to ISerialPortRx.Lines. Set Source to SerialPortReactiveSource.DataReceived, DataReceivedBytes, BytesReceived, or IsOpen when a property should be driven by a different stream.
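To see what the Temperature pattern accepts, the sketch below applies the same regular expression by hand and converts the named `value` group to a double. It only illustrates the matching step: `TemperatureLineParser` is a hypothetical name, and the use of invariant-culture parsing is an assumption, not a description of the generated code.

```csharp
using System;
using System.Globalization;
using System.Text.RegularExpressions;

// Hypothetical helper: matches a line against the Temperature pattern and
// parses the captured "value" group.
public static class TemperatureLineParser
{
    private static readonly Regex Pattern =
        new Regex(@"^TEMP:(?<value>-?\d+(\.\d+)?)$", RegexOptions.CultureInvariant);

    // Returns false when the line does not match; matching is case-sensitive
    // because RegexOptions.IgnoreCase is not set.
    public static bool TryParse(string line, out double temperature)
    {
        temperature = 0;
        var match = Pattern.Match(line);
        return match.Success
            && double.TryParse(
                match.Groups["value"].Value,
                NumberStyles.Float,
                CultureInfo.InvariantCulture,
                out temperature);
    }
}
```

`TemperatureLineParser.TryParse("TEMP:-12.5", out var t)` succeeds with `t` equal to -12.5, while `"TEMP:abc"` and `"temp:1"` are rejected.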
The generated members expose IObservable<T> and IObservableAsync<T>. When the consuming project also references R3 or R3Async, the ReactiveUI.Primitives bridge generator emits conversion extensions in ReactiveUI.Primitives.R3Bridge, so R3 support can stay at the application boundary without changing the generated serial binding code.
using CP.IO.Ports;
using CP.IO.Ports.SourceGeneration;
using ReactiveUI.Primitives;
using ReactiveUI.Primitives.R3Bridge;
[SerialPortReactiveStream("Temperature", typeof(double), @"^TEMP:(?<value>-?\d+(\.\d+)?)$")]
public partial class DeviceState
{
}
var state = new DeviceState();
// Available when R3 is referenced by the consuming project.
var temperatureAsR3 = state.TemperatureObservable.AsR3Observable();
- port.Write(string text) - Write a string
- port.WriteLine(string text) - Write a string followed by NewLine
- port.Write(byte[] buffer) - Write entire byte array
- port.Write(byte[] buffer, int offset, int count) - Write portion of byte array
- port.Write(char[] buffer) - Write entire char array
- port.Write(char[] buffer, int offset, int count) - Write portion of char array

On modern .NET targets, additional Span-based overloads are available:
// Write from ReadOnlySpan<byte>
ReadOnlySpan<byte> data = stackalloc byte[] { 0x01, 0x02, 0x03 };
port.Write(data);
// Write from ReadOnlyMemory<byte>
ReadOnlyMemory<byte> memory = new byte[] { 0x01, 0x02, 0x03 };
port.Write(memory);
// Write from ReadOnlySpan<char>
ReadOnlySpan<char> chars = "Hello".AsSpan();
port.Write(chars);
- Subscribe to port.ErrorReceived for exceptions and serial errors.
- Use port.IsOpenObservable to react to open/close transitions.
- Call port.Close() or dispose subscriptions (DisposeWith) to release the port.

// Discard pending input data
port.DiscardInBuffer();
// Discard pending output data
port.DiscardOutBuffer();
// Check buffer sizes
Console.WriteLine($"Bytes to read: {port.BytesToRead}");
Console.WriteLine($"Bytes to write: {port.BytesToWrite}");
On Windows targets, subscribe to pin state changes:
#if HasWindows
port.PinChanged.Subscribe(args =>
Console.WriteLine($"Pin changed: {args.EventType}"));
#endif
The TcpClientRx and UdpClientRx classes implement the same IPortRx interface for a similar reactive experience with sockets.
TCP example:
var tcp = new TcpClientRx("example.com", 80);
await tcp.Open();
var req = System.Text.Encoding.ASCII.GetBytes("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n");
tcp.Write(req, 0, req.Length);
var buf = new byte[1024];
var n = await tcp.ReadAsync(buf, 0, buf.Length);
Console.WriteLine(System.Text.Encoding.ASCII.GetString(buf, 0, n));
UDP example:
var udp = new UdpClientRx(12345);
await udp.Open();
var buf = new byte[16];
var n = await udp.ReadAsync(buf, 0, buf.Length);
Console.WriteLine($"UDP read {n} bytes");
Subscribe to batched byte arrays for throughput-sensitive pipelines:
// TCP batched chunks per read loop
new TcpClientRx("example.com", 80).DataReceivedBatches
.Subscribe(chunk => Console.WriteLine($"TCP chunk size: {chunk.Length}"));
// UDP per-datagram batches
new UdpClientRx(12345).DataReceivedBatches
.Subscribe(datagram => Console.WriteLine($"UDP datagram size: {datagram.Length}"));
The test suite uses TUnit on Microsoft.Testing.Platform. Run it with:
dotnet test --project src/SerialPortRx.Test/SerialPortRx.Test.csproj -c Debug -f net8.0
Serial integration tests expect a virtual COM port pair named COM1 and COM2. The source-generator tests do not require serial hardware.
using System;
using System.Collections.Generic;
using System.Linq;
using CP.IO.Ports;
using ReactiveUI.Primitives;
internal static class Program
{
private static async System.Threading.Tasks.Task Main()
{
const string comPortName = "COM1";
const string dataToWrite = "DataToWrite";
var rootDisposables = new List<IDisposable>();
var startChar = 0x21.AsObservable(); // '!'
var endChar = 0x0a.AsObservable(); // '\n'
var portDisposables = new List<IDisposable>();
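// No using declaration here: the subscription is tracked in rootDisposables and disposed explicitly below
var portNamesSubscription = SerialPortRx.PortNames().Subscribe(names =>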
{
if (portDisposables.Count == 0 && names.Contains(comPortName))
{
var port = new SerialPortRx(comPortName, 9600);
portDisposables.Add(port);
portDisposables.Add(port.ErrorReceived.Subscribe(Console.WriteLine));
portDisposables.Add(port.IsOpenObservable.Subscribe(open => Console.WriteLine($"{comPortName} {(open ? "Open" : "Closed")}")));
portDisposables.Add(port.DataReceived
.BufferUntil(startChar, endChar, 100)
.Subscribe(data => Console.WriteLine($"Data: {data}")));
portDisposables.Add(port.WhileIsOpen(TimeSpan.FromMilliseconds(500))
.Subscribe(_ => port.Write(dataToWrite)));
port.Open().GetAwaiter().GetResult();
}
else if (!names.Contains(comPortName))
{
foreach (var disposable in portDisposables)
{
disposable.Dispose();
}
portDisposables.Clear();
Console.WriteLine($"Port {comPortName} Disposed");
}
});
rootDisposables.Add(portNamesSubscription);
Console.ReadLine();
foreach (var disposable in portDisposables)
{
disposable.Dispose();
}
foreach (var disposable in rootDisposables)
{
disposable.Dispose();
}
}
}
This project is licensed under the MIT License; see the license file in the repository for details.
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
If you find this library useful and would like to support its development, consider sponsoring the project on GitHub Sponsors.
SerialPortRx - Empowering Industrial Automation with Reactive Technology
| Product | Compatible target frameworks | Additional computed target frameworks |
|---|---|---|
| .NET | net8.0, net8.0-windows10.0.19041, net9.0, net9.0-windows10.0.19041, net10.0, net10.0-windows10.0.19041 | net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows |
| .NET Framework | net462, net472, net481 | net463, net47, net471, net48 |
| Version | Downloads | Last Updated |
|---|---|---|
| 5.0.5 | 43 | 6/26/2026 |
Compatibility with .NET 8/9/10, TUnit/MTP tests, async observable bridges, and serial reactive stream source generation.