Rystem.Queue is a batching queue library built on top of Rystem.BackgroundJob.
Items are stored in an IQueue<T> implementation and periodically flushed to an IQueueManager<T> implementation. The package includes in-memory FIFO and LIFO backends, plus an integration hook for custom queue providers.
It is most useful for:
The best real examples for this package come from the source itself and the unit test in src/Extensions/Queue/Test/Rystem.Queue.UnitTest/QueueTest.cs.
dotnet add package Rystem.Queue
The current 10.x package targets net10.0 and builds on top of Rystem.BackgroundJob.
The package is centered around five pieces.
| Piece | Purpose |
|---|---|
| IQueue<T> | Storage abstraction for queued items |
| IQueueManager<T> | Batch processor invoked when a flush happens |
| QueueProperty<T> | Per-queue settings for thresholds and schedules |
| QueueJobManager<T> | Internal background job that decides when to flush |
| AddQueueIntegration(...) | DI entry point that wires queue, manager, settings, and background job |
The two built-in queue backends are:
- MemoryQueue<T> for FIFO behavior
- MemoryStackQueue<T> for LIFO behavior
At a high level, the flow is:
- items are added to IQueue<T>
- the application calls WarmUpAsync() so the background job starts
- the worker flushes batches to IQueueManager<T>.ManageAsync(...)
IQueueManager<T> is the consumer that receives a flushed batch.
using System.Linq;
using Microsoft.Extensions.Logging;
using Rystem.Queue;
public sealed class SampleQueueManager : IQueueManager<Sample>
{
private readonly ILogger<SampleQueueManager> _logger;
public SampleQueueManager(ILogger<SampleQueueManager> logger)
{
_logger = logger;
}
public Task ManageAsync(IEnumerable<Sample> items)
{
_logger.LogInformation("Processing {Count} items", items.Count());
return Task.CompletedTask;
}
}
public interface IQueueManager<in T>
{
Task ManageAsync(IEnumerable<T> items);
}
The manager should be able to process the whole batch in one call.
AddQueueIntegration(...) registers IQueueManager<T> as Transient.
When a flush happens, QueueJobManager<T> resolves the manager from a fresh DI scope:
var service = _serviceProvider.CreateScope().ServiceProvider.GetService<IQueueManager<T>>();
That means scoped dependencies behave per flush, not for the lifetime of the application. The unit test manager in src/Extensions/Queue/Test/Rystem.Queue.UnitTest/QueueTest.cs demonstrates this by resolving singleton, scoped, and transient dependencies inside the manager.
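The per-flush scoping can be illustrated without the queue itself. The sketch below uses only Microsoft.Extensions.DependencyInjection; FlushContext and ScopePerFlushDemo are hypothetical names introduced for this example, and each CreateScope() call stands in for one flush.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical scoped dependency, used only for this illustration.
public sealed class FlushContext
{
    public Guid Id { get; } = Guid.NewGuid();
}

public static class ScopePerFlushDemo
{
    // Simulates two flushes, each resolving from its own DI scope, and
    // reports whether both received the same scoped instance.
    public static bool SameInstanceAcrossFlushes()
    {
        var services = new ServiceCollection();
        services.AddScoped<FlushContext>();
        using ServiceProvider provider = services.BuildServiceProvider();

        // One scope per simulated flush; the scope is disposed on return.
        Guid ResolveInNewScope()
        {
            using IServiceScope scope = provider.CreateScope();
            return scope.ServiceProvider.GetRequiredService<FlushContext>().Id;
        }

        Guid first = ResolveInNewScope();
        Guid second = ResolveInNewScope();
        return first == second;
    }
}
```

SameInstanceAcrossFlushes() returns false because every scope builds its own FlushContext. A singleton registration would return true instead, which is the distinction to keep in mind when a manager holds state between batches.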
Register the built-in FIFO queue with AddMemoryQueue<T, TQueueManager>():
services.AddMemoryQueue<Sample, SampleQueueManager>(options =>
{
options.MaximumBuffer = 1000;
options.MaximumRetentionCronFormat = "*/3 * * * * *";
options.BackgroundJobCronFormat = "*/1 * * * * *";
});
This uses MemoryQueue<T>, which is backed by ConcurrentQueue<T>.
If you want stack-like behavior instead, use AddMemoryStackQueue<T, TQueueManager>():
services.AddMemoryStackQueue<Sample, SampleQueueManager>(options =>
{
options.MaximumBuffer = 1000;
options.MaximumRetentionCronFormat = "*/3 * * * * *";
options.BackgroundJobCronFormat = "*/1 * * * * *";
});
This uses MemoryStackQueue<T>, which is backed by ConcurrentStack<T>.
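The ordering difference between the two backing collections can be seen with the BCL types alone. This is a sketch of ConcurrentQueue<T> and ConcurrentStack<T> behavior, not of the Rystem wrappers; OrderingDemo is a hypothetical name, and the assumption is that the wrappers hand items to the manager in the order their backing collection yields them.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class OrderingDemo
{
    // Enqueues 1, 2, 3 and drains the queue: oldest item comes out first.
    public static List<int> DrainQueue()
    {
        var queue = new ConcurrentQueue<int>();
        foreach (int i in new[] { 1, 2, 3 }) queue.Enqueue(i);

        var result = new List<int>();
        while (queue.TryDequeue(out int item)) result.Add(item);
        return result;
    }

    // Pushes 1, 2, 3 and drains the stack: newest item comes out first.
    public static List<int> DrainStack()
    {
        var stack = new ConcurrentStack<int>();
        foreach (int i in new[] { 1, 2, 3 }) stack.Push(i);

        var result = new List<int>();
        while (stack.TryPop(out int item)) result.Add(item);
        return result;
    }
}
```

DrainQueue() yields 1, 2, 3 and DrainStack() yields 3, 2, 1. If the manager depends on arrival order, the FIFO backend is the safer default.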
The common registration path is:
services.AddQueueIntegration<T, TQueueManager, TQueue>(options =>
{
// configure QueueProperty<T>
});
Internally it registers:
- QueueProperty<T> as singleton
- IQueue<T> as singleton
- IQueueManager<T> as transient
- QueueJobManager<T> through AddBackgroundJob(...)
The background queue worker is configured with:
x.Cron = settings.BackgroundJobCronFormat;
x.RunImmediately = false;
So queue processing always depends on the Rystem.BackgroundJob scheduler. It does not run at startup; the first check happens on the first scheduled tick.
QueueProperty<T> contains the queue settings:
public sealed class QueueProperty<T>
{
public int MaximumBuffer { get; set; } = 5000;
public string MaximumRetentionCronFormat { get; set; } = "*/1 * * * *";
public string BackgroundJobCronFormat { get; set; } = "*/1 * * * *";
}
| Property | Default | Purpose |
|---|---|---|
| MaximumBuffer | 5000 | Flush when the queued item count goes above this value |
| MaximumRetentionCronFormat | "*/1 * * * *" | Retention schedule used by QueueJobManager<T> when computing flush timing |
| BackgroundJobCronFormat | "*/1 * * * *" | How often the background worker checks the queue |
QueueProperty<T> is generic only so it can be registered separately per queue type.
Because the queue worker is implemented as a background job, it starts only after warm-up runs:
var app = builder.Build();
await app.Services.WarmUpAsync();
app.Run();
The unit test does the same with:
serviceProvider.WarmUpAsync().ToResult();
Without warm-up, the scheduled queue flushes never begin.
The internal queue worker flushes when:
await _queue.CountAsync() > _property.MaximumBuffer
Note the comparison is > rather than >=.
That is why the test uses 1001 items when MaximumBuffer = 1000:
for (int i = 0; i < 1001; i++)
await queue.AddAsync(new Sample { Id = i.ToString() });
After a short wait, the queue is empty again:
Assert.Equal(0, await queue.CountAsync());
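The strict comparison can be checked in isolation. The helper below is a hypothetical restatement of the quoted condition, not the library's code, and it covers only the buffer threshold, not the retention schedule.

```csharp
public static class FlushRule
{
    // Mirrors the quoted condition: flush only when count is strictly
    // greater than the configured maximum buffer.
    public static bool ShouldFlush(int count, int maximumBuffer) => count > maximumBuffer;
}
```

With MaximumBuffer = 1000, ShouldFlush(1000, 1000) is false and ShouldFlush(1001, 1000) is true, so a queue holding exactly MaximumBuffer items is not flushed by the buffer condition alone.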
The queue worker runs on BackgroundJobCronFormat, and that scheduled execution is the outer polling loop for flushes.
Inside the worker, MaximumRetentionCronFormat is parsed with Cronos to compute the next retention occurrence when flush logic runs. In practice, the background job cadence determines how often the queue can be inspected, so set BackgroundJobCronFormat to fire at least as often as the responsiveness you need.
The test-backed example configures:
options.MaximumRetentionCronFormat = "*/3 * * * * *";
options.BackgroundJobCronFormat = "*/1 * * * * *";
With that setup, the queue is checked every second and flushed during the scheduled processing loop.
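To preview when a schedule fires, the expressions can be evaluated with Cronos directly. This sketch assumes the six-field expressions are read in the seconds-first format (CronFormat.IncludeSeconds); how the worker itself calls Cronos is not shown in this README, and CronPreview is a hypothetical helper.

```csharp
using System;
using Cronos;

public static class CronPreview
{
    // Returns the next UTC occurrence strictly after fromUtc for a
    // six-field (seconds-first) cron expression, or null if there is none.
    // fromUtc must have DateTimeKind.Utc.
    public static DateTime? Next(string expression, DateTime fromUtc)
    {
        CronExpression cron = CronExpression.Parse(expression, CronFormat.IncludeSeconds);
        return cron.GetNextOccurrence(fromUtc);
    }
}
```

Starting from 2026-01-01 00:00:01 UTC, "*/3 * * * * *" next fires at 00:00:03 and "*/1 * * * * *" at 00:00:02. The five-field defaults ("*/1 * * * *") are standard cron expressions that fire once per minute, which is much slower than the test configuration.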
Inject IQueue<T> wherever items should be buffered.
using Rystem.Queue;
public sealed class OrderService
{
private readonly IQueue<Sample> _queue;
public OrderService(IQueue<Sample> queue)
{
_queue = queue;
}
public async Task EnqueueAsync()
{
for (int i = 0; i < 100; i++)
await _queue.AddAsync(new Sample { Id = i.ToString() });
}
}
public interface IQueue<T>
{
Task AddAsync(T entity);
Task<IEnumerable<T>> DequeueAsync(int? top = null);
Task<IEnumerable<T>> ReadAsync(int? top = null);
Task<int> CountAsync();
}
Typical usage:
await queue.AddAsync(new Sample { Id = "1" });
IEnumerable<Sample> preview = await queue.ReadAsync(top: 10);
IEnumerable<Sample> batch = await queue.DequeueAsync(top: 50);
int count = await queue.CountAsync();
The built-in in-memory implementations behave like this:
- MemoryQueue<T> is FIFO
- MemoryStackQueue<T> is LIFO
To plug in your own queue storage, implement IQueue<T> and register it through AddQueueIntegration(...).
using Rystem.Queue;
public sealed class ServiceBusQueue<T> : IQueue<T>
{
public Task AddAsync(T entity) => Task.CompletedTask;
public Task<IEnumerable<T>> ReadAsync(int? top = null) => Task.FromResult(Enumerable.Empty<T>());
public Task<IEnumerable<T>> DequeueAsync(int? top = null) => Task.FromResult(Enumerable.Empty<T>());
public Task<int> CountAsync() => Task.FromResult(0);
}
services.AddQueueIntegration<Sample, SampleQueueManager, ServiceBusQueue<Sample>>(options =>
{
options.MaximumBuffer = 1000;
options.MaximumRetentionCronFormat = "*/3 * * * * *";
options.BackgroundJobCronFormat = "*/1 * * * * *";
});
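The ServiceBusQueue<T> stub above only satisfies the interface. For a backend that actually stores items, a minimal sketch might look like the following. SketchQueue<T> is a hypothetical name, and the IQueue<T> declaration is a local copy of the interface listed earlier so the block compiles on its own; in a real project, remove it and reference Rystem.Queue.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Local copy of the interface listed earlier, only so this sketch is
// standalone. Replace with `using Rystem.Queue;` in a real project.
public interface IQueue<T>
{
    Task AddAsync(T entity);
    Task<IEnumerable<T>> DequeueAsync(int? top = null);
    Task<IEnumerable<T>> ReadAsync(int? top = null);
    Task<int> CountAsync();
}

// Hypothetical in-memory backend.
public sealed class SketchQueue<T> : IQueue<T>
{
    private readonly ConcurrentQueue<T> _items = new();

    public Task AddAsync(T entity)
    {
        _items.Enqueue(entity);
        return Task.CompletedTask;
    }

    // Removes and returns up to `top` items (all items when top is null).
    public Task<IEnumerable<T>> DequeueAsync(int? top = null)
    {
        int limit = top ?? int.MaxValue;
        var batch = new List<T>();
        while (batch.Count < limit && _items.TryDequeue(out var item))
            batch.Add(item);
        return Task.FromResult<IEnumerable<T>>(batch);
    }

    // Returns a snapshot of up to `top` items without removing them.
    public Task<IEnumerable<T>> ReadAsync(int? top = null)
    {
        IEnumerable<T> snapshot = _items.ToArray();
        if (top is int n) snapshot = snapshot.Take(n).ToList();
        return Task.FromResult(snapshot);
    }

    // Reports the number of items currently queued.
    public Task<int> CountAsync() => Task.FromResult(_items.Count);
}
```

After adding three items, ReadAsync(top: 2) returns two items and leaves the count at 3, while DequeueAsync(top: 2) returns two items and drops the count to 1. A remote backend would replace the ConcurrentQueue<T> with calls to its own storage while keeping the same observable behavior.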
When writing a custom backend, keep these semantics aligned with the queue worker:
- CountAsync() should reflect the current queued count as accurately as possible
- DequeueAsync() should remove the returned items
- ReadAsync() should not remove items
The most useful references for this package are:
This README is intentionally architecture-first because Rystem.Queue is more than just an in-memory queue. It is a small batching pipeline built from a queue abstraction, a batch manager abstraction, and a scheduled worker from Rystem.BackgroundJob.