![]() |
VOOZH | about |
dotnet add package Galosys.Foundation.Amqp --version 26.5.20.1
NuGet\Install-Package Galosys.Foundation.Amqp -Version 26.5.20.1
<PackageReference Include="Galosys.Foundation.Amqp" Version="26.5.20.1" />
<PackageVersion Include="Galosys.Foundation.Amqp" Version="26.5.20.1" />Directory.Packages.props
<PackageReference Include="Galosys.Foundation.Amqp" />Project file
paket add Galosys.Foundation.Amqp --version 26.5.20.1
#r "nuget: Galosys.Foundation.Amqp, 26.5.20.1"
#:package Galosys.Foundation.Amqp@26.5.20.1
#addin nuget:?package=Galosys.Foundation.Amqp&version=26.5.20.1Install as a Cake Addin
#tool nuget:?package=Galosys.Foundation.Amqp&version=26.5.20.1Install as a Cake Tool
成熟度: 🟢 稳定 — AMQP 抽象层,已被 RabbitMQ.Client 模块正式继承
AMQP 消息抽象层模块,定义统一的发送/消费抽象基类、配置模型、[AmqpHandler] 注解、Handler 自动扫描。用户代码仅依赖此模块的 API,切换底层实现只需改 PackageReference。
Galosys.Foundation.Amqp ← 本模块(抽象层)
▲
│
Galosys.Foundation.RabbitMQ.Client ← RabbitMQ 实现(ConnectionPool + BatchPublishPipeline + Publisher Confirms)
AmqpTemplate + [AmqpHandler] + IMessageHandler<T>,切换实现零改动[AmqpHandler("routingKey")] 方法级注解,启动时自动扫描注册BroadcastAsync fanout 发布DelayAsync 支持 x-delayed-message 插件AmqpProperties 流式 API:自定义 MessageId、Header、延迟、TTLAmqpBootstrapper(IHostedLifecycleService,自动扫描 Handler + 优雅关闭)通常不直接安装此包,而是选择实现包:
<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" Version="x.x.x" />
{
"RabbitMQ": {
"HostName": "localhost",
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/"
}
}
AppName默认取应用名称,通常无需显式配置。
// 方式一:模块自动发现(推荐)
await Host.CreateDefaultBuilder()
.UseModularization()
.Locate()
.RunAsync();
// 方式二:手动注册(需同时注册实现包)
services.AddRabbitMQ(configuration);
using RabbitMQ.Client;
public class OrderCreatedMessage
{
public string OrderNo { get; set; }
}
public class OrderService
{
private readonly AmqpTemplate _amqp;
public OrderService(AmqpTemplate amqp) => _amqp = amqp;
// 单条发送
public async Task SendAsync() =>
await _amqp.SendAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });
// 批量发送
public async Task SendBatchAsync() =>
await _amqp.SendAsync("order_created", new[]
{
new OrderCreatedMessage { OrderNo = "ORD001" },
new OrderCreatedMessage { OrderNo = "ORD002" }
});
// 广播消息(fanout)
public async Task BroadcastAsync() =>
await _amqp.BroadcastAsync("order_events", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });
// 延迟消息(需 x-delayed-message 插件)
public async Task DelayAsync() =>
await _amqp.DelayAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } }, delayMillionSeconds: 5000);
}
using RabbitMQ.Client;
[Handler]
public class OrderHandler : IMessageHandler
{
[AmqpHandler("order_created")]
public async Task<bool> HandleAsync(IMessage msg)
{
// 处理消息
return true; // true = ACK, false = Nack
}
// 广播消费
[AmqpHandler("order_events", Broadcast = true, Durable = true)]
public async Task<bool> OnBroadcastAsync(IMessage msg)
{
return true;
}
}
注册机制: 类标注
[Handler]→ DI 自动注册;方法标注[AmqpHandler]→ 启动时自动扫描订阅。类需实现IMessageHandler或IMessageHandler<T>。
{
"RabbitMQ": {
"HostName": "localhost",
"UserName": "guest",
"Password": "guest",
"VirtualHost": "/",
"AutoCreateEnabled": true,
"ConsumerDispatchConcurrency": 8,
"NetworkRecoveryInterval": 10,
"RequestedHeartbeat": 45,
"ContinuationTimeout": 20,
"Limit": {
"ProducerMaxConnections": 4,
"ConsumerMaxConnections": 4,
"MaxMessageSize": 65536,
"BatchSize": 100,
"BatchTimeoutMs": 50,
"PublishConfirmTimeoutMs": 3000,
"PipelineCapacity": 5000
},
"Handlers": {
"order_created": {
"Concurrency": 4,
"MaxRetryCount": 3,
"RetryBaseDelayMs": 1000,
"PrefetchMultiplier": 2,
"DurableQueue": false,
"QueueName": null
}
}
}
}
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
HostName |
string |
(必填) | RabbitMQ 主机地址,多主机空格/逗号分隔,自动 Ping 选最优 |
UserName |
string? |
null |
认证用户名 |
Password |
string? |
null |
认证密码 |
VirtualHost |
string? |
"/" |
虚拟主机 |
AppName |
string? |
应用名 | 客户端连接名称 |
AutoCreateEnabled |
bool |
true |
自动声明队列/交换器(阿里云限 TPS 场景可关闭) |
Limit |
AmqpLimitOptions |
见下表 | 连接池与限流配置 |
Handlers |
IDictionary<string, AmqpHandlerProperty> |
{} |
按 RoutingKey 粒度的消费者配置 |
ConsumerDispatchConcurrency |
ushort |
CPU核心数*4 |
消费者调度并发数 |
NetworkRecoveryInterval |
int |
10 |
自动重连间隔(秒) |
RequestedHeartbeat |
int |
45 |
心跳间隔(秒) |
ContinuationTimeout |
int |
20 |
RPC 续延超时(秒) |
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
ProducerMaxConnections |
int |
4 |
生产者连接池大小 |
ProducerChannelMaxPerConnection |
int |
10 |
每连接最大 Channel 数(生产者) |
ConsumerMaxConnections |
int |
4 |
消费者连接池大小 |
ConsumerChannelMaxPerConnection |
int |
50 |
每连接最大 Channel 数(消费者) |
MaxMessageSize |
int |
65536 |
最大消息体字节数(64KB) |
MaxTps |
int |
0 |
最大 TPS(0=不限) |
BatchSize |
int |
100 |
批量发送大小 |
BatchTimeoutMs |
int |
50 |
批量聚合等待时间(ms) |
PublishConfirmTimeoutMs |
int |
3000 |
Publisher Confirm 超时(ms) |
RetryCount |
int |
3 |
批量发送重试次数 |
RetryBaseDelayMs |
int |
1000 |
重试退避基础延迟(ms) |
PipelineCapacity |
int |
5000 |
发送管道容量(有界 Channel) |
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
Concurrency |
int |
1 |
并发消费线程数 |
MaxRetryCount |
int |
3 |
最大重试次数 |
RetryBaseDelayMs |
int |
1000 |
指数退避基础延迟(ms) |
PrefetchMultiplier |
int |
2 |
实际 Prefetch = Concurrency × PrefetchMultiplier |
DurableQueue |
bool |
false |
队列是否持久化(广播模式) |
QueueName |
string? |
null |
自定义队列名(null=自动命名) |
| 类名 | 说明 |
|---|---|
AmqpTemplate |
消息操作模板(Facade),提供 SendAsync、BroadcastAsync、DelayAsync、HandleAsync |
AmqpPublisher |
消息发布器抽象基类 |
AmqpConsumer |
消息消费者抽象基类 |
AmqpBootstrapper |
启动引导器,自动扫描 Handler + 优雅关闭 |
AmqpOptions |
配置选项类 |
AmqpLimitOptions |
连接池限流配置 |
AmqpHandlerProperty |
按 RoutingKey 的消费者配置 |
AmqpHandlerAttribute |
消费者方法注解 |
AmqpProperties |
消息属性,支持流式设置 Header/Delay/TTL |
IAmqpProperties |
消息属性接口 |
Galosys.Foundation.CoreMicrosoft.Extensions.HostingMicrosoft.Extensions.Options| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 net8.0 is compatible. net8.0-android net8.0-android was computed. net8.0-browser net8.0-browser was computed. net8.0-ios net8.0-ios was computed. net8.0-maccatalyst net8.0-maccatalyst was computed. net8.0-macos net8.0-macos was computed. net8.0-tvos net8.0-tvos was computed. net8.0-windows net8.0-windows was computed. net9.0 net9.0 was computed. net9.0-android net9.0-android was computed. net9.0-browser net9.0-browser was computed. net9.0-ios net9.0-ios was computed. net9.0-maccatalyst net9.0-maccatalyst was computed. net9.0-macos net9.0-macos was computed. net9.0-tvos net9.0-tvos was computed. net9.0-windows net9.0-windows was computed. net10.0 net10.0 was computed. net10.0-android net10.0-android was computed. net10.0-browser net10.0-browser was computed. net10.0-ios net10.0-ios was computed. net10.0-maccatalyst net10.0-maccatalyst was computed. net10.0-macos net10.0-macos was computed. net10.0-tvos net10.0-tvos was computed. net10.0-windows net10.0-windows was computed. |
Showing the top 3 NuGet packages that depend on Galosys.Foundation.Amqp:
| Package | Downloads |
|---|---|
|
Galosys.Foundation.EasyNetQ
Galosys.Foundation快速开发库 |
|
|
Galosys.Foundation.RabbitMQ.Client
Galosys.Foundation快速开发库 |
|
|
Galosys.Foundation.RabbitMQ.Benchmarks
Galosys.Foundation快速开发库 |
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 26.5.20.1 | 134 | 5/20/2026 |
| 26.5.19.1 | 134 | 5/19/2026 |
| 26.5.18.1 | 132 | 5/18/2026 |
| 26.5.15.1 | 130 | 5/15/2026 |
| 26.5.12.3 | 135 | 5/12/2026 |
| 26.5.12.2 | 130 | 5/12/2026 |
| 26.4.27.1-rc1 | 127 | 4/26/2026 |
| 26.4.25.1-rc1 | 123 | 4/25/2026 |
| 26.4.22.2-rc7 | 127 | 4/22/2026 |
| 26.4.22.2-rc6 | 128 | 4/22/2026 |
| 26.4.22.2-rc4 | 125 | 4/22/2026 |
| 26.4.22.2-rc3 | 126 | 4/22/2026 |
| 26.4.19.1-rc1 | 162 | 4/19/2026 |