dotnet add package ActorSrcGen.Abstractions --version 2.4.3
NuGet\Install-Package ActorSrcGen.Abstractions -Version 2.4.3
<PackageReference Include="ActorSrcGen.Abstractions" Version="2.4.3" />
Directory.Packages.props:
<PackageVersion Include="ActorSrcGen.Abstractions" Version="2.4.3" />
Project file:
<PackageReference Include="ActorSrcGen.Abstractions" />
paket add ActorSrcGen.Abstractions --version 2.4.3
#r "nuget: ActorSrcGen.Abstractions, 2.4.3"
#:package ActorSrcGen.Abstractions@2.4.3
Install as a Cake Addin:
#addin nuget:?package=ActorSrcGen.Abstractions&version=2.4.3
Install as a Cake Tool:
#tool nuget:?package=ActorSrcGen.Abstractions&version=2.4.3
ActorSrcGen is a C# Source Generator that converts simple C# classes into TPL Dataflow-compatible pipelines. It simplifies working with TPL Dataflow by generating boilerplate code to handle errors without interrupting the pipeline, ideal for long-lived processes with ingesters that continually pump messages into the pipeline.
Install the package:
dotnet add package ActorSrcGen
Declare the pipeline class:
[Actor]
public partial class MyPipeline
{
}
The class must be partial to allow the source generator to add boilerplate code.
If you are using Visual Studio, you can see the generated part of the code under the ActorSrcGen analyzer.
Create ingester functions:
[Ingest(1)]
[NextStep(nameof(DoSomethingWithRequest))]
public async Task<string> ReceivePollRequest(CancellationToken cancellationToken)
{
return await GetTheNextRequest();
}
Ingesters define a Priority and are visited in priority order. If no messages are available, the pipeline sleeps for a second before retrying.
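The polling behaviour described above can be sketched without the generator. This is a minimal illustration, not the code ActorSrcGen emits: `Ingester`, `PriorityPump`, `PollOnceAsync`, and `RunAsync` are hypothetical names, and the sketch assumes that a lower priority number is visited first.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a method marked [Ingest(priority)].
public sealed record Ingester(int Priority, Func<CancellationToken, Task<string?>> Poll);

public static class PriorityPump
{
    // Visits ingesters in ascending priority order (assumption: lower number first)
    // and returns the first message found, or null if every ingester was empty.
    public static async Task<string?> PollOnceAsync(
        IEnumerable<Ingester> ingesters, CancellationToken ct)
    {
        foreach (var ingester in ingesters.OrderBy(i => i.Priority))
        {
            var msg = await ingester.Poll(ct);
            if (msg != null)
                return msg;
        }
        return null;
    }

    // Message pump: post whatever was found, otherwise sleep for a second and retry.
    public static async Task RunAsync(
        IEnumerable<Ingester> ingesters, Action<string> post, CancellationToken ct)
    {
        try
        {
            while (!ct.IsCancellationRequested)
            {
                var msg = await PollOnceAsync(ingesters, ct);
                if (msg != null)
                {
                    post(msg);
                    continue; // restart from the highest-priority ingester
                }
                await Task.Delay(TimeSpan.FromSeconds(1), ct);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation ends the pump.
        }
    }
}
```

Restarting from the top after each message means a busy high-priority ingester can starve lower-priority ones, which is usually the intent of a priority scheme.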
Implement pipeline steps:
[FirstStep("decode incoming poll request")]
[NextStep(nameof(ActOnTheRequest))]
public PollRequest DecodeRequest(string json)
{
Console.WriteLine(nameof(DecodeRequest));
var pollRequest = JsonSerializer.Deserialize<PollRequest>(json);
return pollRequest;
}
The first step controls the pipeline's interface. Now implement whatever other steps the pipeline needs. The output type of each step must match the input type of the step that follows it.
[Step]
[NextStep(nameof(DeliverResults))]
public PollResults ActOnTheRequest(PollRequest req)
{
Console.WriteLine(nameof(ActOnTheRequest));
var result = SomeApiClient.GetTheResults(req.Id);
return result;
}
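The type-matching rule comes from TPL Dataflow itself: a block can only be linked to a target that accepts its output type. The sketch below uses plain `TransformBlock` instances rather than generated code; `TypeMatchingSketch` and its block names are hypothetical.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TypeMatchingSketch
{
    public static async Task<int> RunAsync(string input)
    {
        // Step 1: string -> int
        var parse = new TransformBlock<string, int>(s => int.Parse(s));

        // Step 2: int -> int. Its input type must equal step 1's output type,
        // otherwise the LinkTo call below does not compile.
        var doubleIt = new TransformBlock<int, int>(n => n * 2);

        parse.LinkTo(doubleIt, new DataflowLinkOptions { PropagateCompletion = true });

        await parse.SendAsync(input);
        parse.Complete();
        return await doubleIt.ReceiveAsync();
    }
}
```

Calling `await TypeMatchingSketch.RunAsync("21")` returns 42.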
Define the last step:
[LastStep]
public bool DeliverResults(PollResults res)
{
return myQueue.TryPush(res);
}
Generated code example (here for a pipeline class named MyActor):
using System.Threading.Tasks.Dataflow;
using Gridsum.DataflowEx;
public partial class MyActor : Dataflow<string, bool>, IActor< string >
{
public MyActor(DataflowOptions dataflowOptions = null) : base(DataflowOptions.Default)
{
_DeliverResults = new TransformBlock<PollResults,bool>( (PollResults x) => {
try
{
return DeliverResults(x);
}
catch(Exception e)
{
LogMessage(LogLevel.Error, $"Error in DeliverResults: {e.Message}\nStack Trace: {e.StackTrace}");
return default;
}
},
new ExecutionDataflowBlockOptions() {
BoundedCapacity = 1,
MaxDegreeOfParallelism = 1
});
RegisterChild(_DeliverResults);
_ActOnTheRequest = new TransformBlock<PollRequest,PollResults>( (PollRequest x) => {
try
{
return ActOnTheRequest(x);
}
catch(Exception e)
{
LogMessage(LogLevel.Error, $"Error in ActOnTheRequest: {e.Message}\nStack Trace: {e.StackTrace}");
return default;
}
},
new ExecutionDataflowBlockOptions() {
BoundedCapacity = 1,
MaxDegreeOfParallelism = 1
});
RegisterChild(_ActOnTheRequest);
_DecodeRequest = new TransformBlock<string,PollRequest>( (string x) => {
try
{
return DecodeRequest(x);
}
catch(Exception e)
{
LogMessage(LogLevel.Error, $"Error in DecodeRequest: {e.Message}\nStack Trace: {e.StackTrace}");
return default;
}
},
new ExecutionDataflowBlockOptions() {
BoundedCapacity = 1,
MaxDegreeOfParallelism = 1
});
RegisterChild(_DecodeRequest);
_ActOnTheRequest.LinkTo(_DeliverResults, new DataflowLinkOptions { PropagateCompletion = true });
_DecodeRequest.LinkTo(_ActOnTheRequest, new DataflowLinkOptions { PropagateCompletion = true });
}
TransformBlock<PollResults,bool> _DeliverResults;
TransformBlock<PollRequest,PollResults> _ActOnTheRequest;
TransformBlock<string,PollRequest> _DecodeRequest;
public override ITargetBlock<string > InputBlock { get => _DecodeRequest ; }
public override ISourceBlock< bool > OutputBlock { get => _DeliverResults; }
public bool Call(string input) => InputBlock.Post(input);
public async Task<bool> Cast(string input) => await InputBlock.SendAsync(input);
public async Task<bool> AcceptAsync(CancellationToken cancellationToken)
{
try
{
var result = await _DeliverResults.ReceiveAsync(cancellationToken);
return result;
}
catch (OperationCanceledException operationCanceledException)
{
return await Task.FromCanceled<bool>(cancellationToken);
}
}
public async Task Ingest(CancellationToken ct)
{
// start the message pump
while (!ct.IsCancellationRequested)
{
var foundSomething = false;
try
{
// cycle through ingesters IN PRIORITY ORDER.
{
var msg = await ReceivePollRequest(ct);
if (msg != null)
{
Call(msg);
foundSomething = true;
// then jump back to the start of the pump
continue;
}
}
if (!foundSomething)
await Task.Delay(1000, ct);
}
catch (TaskCanceledException)
{
// if nothing was found on any of the receivers, then sleep for a while.
continue;
}
catch (Exception e)
{
LogMessage(LogLevel.Error, $"Exception in Ingest loop: {e.Message}\nStack Trace: {e.StackTrace}");
}
}
}
}
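Each generated block wraps its step in a try/catch and returns `default` on failure, so one bad message does not fault the block. A reduced sketch of that pattern, using only TPL Dataflow (the `ResilientStepSketch` name and the console logging are assumptions for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ResilientStepSketch
{
    public static async Task<List<int>> RunAsync(IEnumerable<string> inputs)
    {
        var parse = new TransformBlock<string, int>(s =>
        {
            try
            {
                return int.Parse(s);
            }
            catch (Exception e)
            {
                // Log and carry on; an unhandled exception would fault the block
                // and stop it from accepting further messages.
                Console.Error.WriteLine($"Error in parse: {e.Message}");
                return default; // 0 for int, null for reference types
            }
        });

        foreach (var input in inputs)
            await parse.SendAsync(input);
        parse.Complete();

        // Drain the block until it has completed and is empty.
        var results = new List<int>();
        while (await parse.OutputAvailableAsync())
            results.Add(await parse.ReceiveAsync());
        return results;
    }
}
```

With the inputs `"1"`, `"oops"`, `"3"` this yields `1`, `0`, `3`. The failed message still flows downstream as a default value, so later steps should be prepared to receive `null` or a zero value.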
Using the pipeline:
var actor = new MyActor(); // this is your pipeline
try
{
// call into the pipeline synchronously
if (actor.Call("""
{ "something": "here" }
"""))
Console.WriteLine("Called Synchronously");
// stop the pipeline after 10 secs
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
// kick off an endless process to keep ingesting input into the pipeline
var t = Task.Run(async () => await actor.Ingest(cts.Token), cts.Token);
// consume results from the last step via the AcceptAsync method
while (!cts.Token.IsCancellationRequested)
{
var result = await actor.AcceptAsync(cts.Token);
Console.WriteLine($"Result: {result}");
}
await t; // wait for the message pump task to finish
await actor.SignalAndWaitForCompletionAsync(); // wait for all pipeline tasks to complete
}
catch (OperationCanceledException)
{
Console.WriteLine("All Done!");
}
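In the generated class, `Call` uses `Post` and `Cast` uses `SendAsync`. Because the generated blocks set `BoundedCapacity = 1`, the two behave differently when the pipeline is busy: `Post` returns `false` straight away, while `SendAsync` waits for room. The sketch below shows this with a plain `ActionBlock`; `PostVersusSendSketch` and the gate are hypothetical and only there to hold the block full.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVersusSendSketch
{
    public static async Task<(bool first, bool second, bool third)> RunAsync()
    {
        // The gate keeps the first message in flight so the block stays full.
        var gate = new TaskCompletionSource<bool>();
        var block = new ActionBlock<string>(
            async _ => { await gate.Task; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool first = block.Post("a");              // accepted: capacity is available
        bool second = block.Post("b");             // declined immediately: block is full
        Task<bool> pending = block.SendAsync("c"); // postponed until there is room

        gate.SetResult(true);                      // let the first message finish
        bool third = await pending;                // now accepted

        block.Complete();
        await block.Completion;
        return (first, second, third);
    }
}
```

`RunAsync` returns `(true, false, true)`. Under these assumptions, a caller that must not lose messages would check the return value of `Call` or prefer `Cast`.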
Run the tests:
dotnet test
Run the tests with coverage:
dotnet test /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura
Built on DataflowEx and Bnaya.SourceGenerator.Template.
| Product | Compatible | Computed (additional target framework versions) |
|---|---|---|
| .NET | | net5.0, net5.0-windows, net6.0, net6.0-android, net6.0-ios, net6.0-maccatalyst, net6.0-macos, net6.0-tvos, net6.0-windows, net7.0, net7.0-android, net7.0-ios, net7.0-maccatalyst, net7.0-macos, net7.0-tvos, net7.0-windows, net8.0, net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows, net9.0, net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows, net10.0, net10.0-android, net10.0-browser, net10.0-ios, net10.0-maccatalyst, net10.0-macos, net10.0-tvos, net10.0-windows |
| .NET Core | | netcoreapp2.0, netcoreapp2.1, netcoreapp2.2, netcoreapp3.0, netcoreapp3.1 |
| .NET Standard | netstandard2.0 | netstandard2.1 |
| .NET Framework | | net461, net462, net463, net47, net471, net472, net48, net481 |
| MonoAndroid | | monoandroid |
| MonoMac | | monomac |
| MonoTouch | | monotouch |
| Tizen | | tizen40, tizen60 |
| Xamarin.iOS | | xamarinios |
| Xamarin.Mac | | xamarinmac |
| Xamarin.TVOS | | xamarintvos |
| Xamarin.WatchOS | | xamarinwatchos |
Showing the top 1 NuGet packages that depend on ActorSrcGen.Abstractions:
| Package | Downloads |
|---|---|
| ActorSrcGen: A C# Source Generator to adapt a simple class to allow it to use TPL Dataflow for robust high performance computation | |
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 2.4.3 | 352 | 12/6/2025 |
| 2.3.2 | 409 | 8/29/2024 |
| 2.3.1 | 335 | 8/28/2024 |
| 2.1.1 | 335 | 8/27/2024 |
| 2.0.1 | 334 | 8/23/2024 |
| 1.1.2 | 344 | 5/9/2024 |
| 1.1.1 | 335 | 5/8/2024 |
| 1.0.4 | 371 | 4/29/2024 |
| 1.0.3 | 339 | 4/29/2024 |
| 1.0.2 | 342 | 4/29/2024 |
| 1.0.1 | 344 | 4/28/2024 |
| 0.3.6 | 388 | 4/25/2024 |
| 0.3.5 | 369 | 4/24/2024 |
| 0.3.3 | 389 | 4/23/2024 |
| 0.3.0 | 800 | 11/4/2023 |
| 0.2.10 | 568 | 10/31/2023 |