VOOZH about

URL: https://www.nuget.org/packages/Galosys.Foundation.Amqp/

⇱ NuGet Gallery | Galosys.Foundation.Amqp 26.5.20.1




Galosys.Foundation.Amqp 26.5.20.1

dotnet add package Galosys.Foundation.Amqp --version 26.5.20.1
 
 
NuGet\Install-Package Galosys.Foundation.Amqp -Version 26.5.20.1
 
 
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Galosys.Foundation.Amqp" Version="26.5.20.1" />
 
 
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Galosys.Foundation.Amqp" Version="26.5.20.1" />
 
Directory.Packages.props
<PackageReference Include="Galosys.Foundation.Amqp" />
 
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Galosys.Foundation.Amqp --version 26.5.20.1
 
 
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Galosys.Foundation.Amqp, 26.5.20.1"
 
 
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Galosys.Foundation.Amqp@26.5.20.1
 
 
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Galosys.Foundation.Amqp&version=26.5.20.1
 
Install as a Cake Addin
#tool nuget:?package=Galosys.Foundation.Amqp&version=26.5.20.1
 
Install as a Cake Tool
The NuGet Team does not provide support for this client. Please contact its maintainers for support.

Galosys.Foundation.Amqp

成熟度: 🟢 稳定 — AMQP 抽象层,已被 RabbitMQ.Client 模块正式继承

AMQP 消息抽象层模块,定义统一的发送/消费抽象基类、配置模型、[AmqpHandler] 注解、Handler 自动扫描。用户代码仅依赖此模块的 API,切换底层实现只需改 PackageReference。

架构

Galosys.Foundation.Amqp ← 本模块(抽象层)
 ▲
 │
Galosys.Foundation.RabbitMQ.Client ← RabbitMQ 实现(ConnectionPool + BatchPublishPipeline + Publisher Confirms)

功能特性

  • 统一 APIAmqpTemplate + [AmqpHandler] + IMessageHandler<T>,切换实现零改动
  • 声明式消费[AmqpHandler("routingKey")] 方法级注解,启动时自动扫描注册
  • 广播消息BroadcastAsync fanout 发布
  • 延迟消息DelayAsync 支持 x-delayed-message 插件
  • 消息属性扩展AmqpProperties 流式 API:自定义 MessageId、Header、延迟、TTL
  • 生命周期管理AmqpBootstrapper(IHostedLifecycleService,自动扫描 Handler + 优雅关闭)
  • 每消息 DI Scope — 消费端自动创建独立 DI Scope,正确处理 Scoped 生命周期

安装

通常不直接安装此包,而是选择实现包:


<PackageReference Include="Galosys.Foundation.RabbitMQ.Client" Version="x.x.x" />

快速开始

1. 最小配置

{
 "RabbitMQ": {
 "HostName": "localhost",
 "UserName": "guest",
 "Password": "guest",
 "VirtualHost": "/"
 }
}

AppName 默认取应用名称,通常无需显式配置。

2. 注册服务

// 方式一:模块自动发现(推荐)
await Host.CreateDefaultBuilder()
 .UseModularization()
 .Locate()
 .RunAsync();

// 方式二:手动注册(需同时注册实现包)
services.AddRabbitMQ(configuration);

3. 发送消息

using RabbitMQ.Client;

public class OrderCreatedMessage
{
 public string OrderNo { get; set; }
}

public class OrderService
{
 private readonly AmqpTemplate _amqp;

 public OrderService(AmqpTemplate amqp) => _amqp = amqp;

 // 单条发送
 public async Task SendAsync() =>
 await _amqp.SendAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });

 // 批量发送
 public async Task SendBatchAsync() =>
 await _amqp.SendAsync("order_created", new[]
 {
 new OrderCreatedMessage { OrderNo = "ORD001" },
 new OrderCreatedMessage { OrderNo = "ORD002" }
 });

 // 广播消息(fanout)
 public async Task BroadcastAsync() =>
 await _amqp.BroadcastAsync("order_events", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } });

 // 延迟消息(需 x-delayed-message 插件)
 public async Task DelayAsync() =>
 await _amqp.DelayAsync("order_created", new[] { new OrderCreatedMessage { OrderNo = "ORD001" } }, delayMillionSeconds: 5000);
}

4. 消费消息

using RabbitMQ.Client;

[Handler]
public class OrderHandler : IMessageHandler
{
 [AmqpHandler("order_created")]
 public async Task<bool> HandleAsync(IMessage msg)
 {
 // 处理消息
 return true; // true = ACK, false = Nack
 }

 // 广播消费
 [AmqpHandler("order_events", Broadcast = true, Durable = true)]
 public async Task<bool> OnBroadcastAsync(IMessage msg)
 {
 return true;
 }
}

注册机制: 类标注 [Handler] → DI 自动注册;方法标注 [AmqpHandler] → 启动时自动扫描订阅。类需实现 IMessageHandlerIMessageHandler<T>

详细配置

完整配置示例

{
 "RabbitMQ": {
 "HostName": "localhost",
 "UserName": "guest",
 "Password": "guest",
 "VirtualHost": "/",
 "AutoCreateEnabled": true,
 "ConsumerDispatchConcurrency": 8,
 "NetworkRecoveryInterval": 10,
 "RequestedHeartbeat": 45,
 "ContinuationTimeout": 20,
 "Limit": {
 "ProducerMaxConnections": 4,
 "ConsumerMaxConnections": 4,
 "MaxMessageSize": 65536,
 "BatchSize": 100,
 "BatchTimeoutMs": 50,
 "PublishConfirmTimeoutMs": 3000,
 "PipelineCapacity": 5000
 },
 "Handlers": {
 "order_created": {
 "Concurrency": 4,
 "MaxRetryCount": 3,
 "RetryBaseDelayMs": 1000,
 "PrefetchMultiplier": 2,
 "DurableQueue": false,
 "QueueName": null
 }
 }
 }
}

根配置(AmqpOptions)

配置项 类型 默认值 说明
HostName string (必填) RabbitMQ 主机地址,多主机空格/逗号分隔,自动 Ping 选最优
UserName string? null 认证用户名
Password string? null 认证密码
VirtualHost string? "/" 虚拟主机
AppName string? 应用名 客户端连接名称
AutoCreateEnabled bool true 自动声明队列/交换器(阿里云限 TPS 场景可关闭)
Limit AmqpLimitOptions 见下表 连接池与限流配置
Handlers IDictionary<string, AmqpHandlerProperty> {} 按 RoutingKey 粒度的消费者配置
ConsumerDispatchConcurrency ushort CPU核心数*4 消费者调度并发数
NetworkRecoveryInterval int 10 自动重连间隔(秒)
RequestedHeartbeat int 45 心跳间隔(秒)
ContinuationTimeout int 20 RPC 续延超时(秒)

限流配置(AmqpLimitOptions)

配置项 类型 默认值 说明
ProducerMaxConnections int 4 生产者连接池大小
ProducerChannelMaxPerConnection int 10 每连接最大 Channel 数(生产者)
ConsumerMaxConnections int 4 消费者连接池大小
ConsumerChannelMaxPerConnection int 50 每连接最大 Channel 数(消费者)
MaxMessageSize int 65536 最大消息体字节数(64KB)
MaxTps int 0 最大 TPS(0=不限)
BatchSize int 100 批量发送大小
BatchTimeoutMs int 50 批量聚合等待时间(ms)
PublishConfirmTimeoutMs int 3000 Publisher Confirm 超时(ms)
RetryCount int 3 批量发送重试次数
RetryBaseDelayMs int 1000 重试退避基础延迟(ms)
PipelineCapacity int 5000 发送管道容量(有界 Channel)

Handler 配置(AmqpHandlerProperty)

配置项 类型 默认值 说明
Concurrency int 1 并发消费线程数
MaxRetryCount int 3 最大重试次数
RetryBaseDelayMs int 1000 指数退避基础延迟(ms)
PrefetchMultiplier int 2 实际 Prefetch = Concurrency × PrefetchMultiplier
DurableQueue bool false 队列是否持久化(广播模式)
QueueName string? null 自定义队列名(null=自动命名)

核心类

类名 说明
AmqpTemplate 消息操作模板(Facade),提供 SendAsyncBroadcastAsyncDelayAsyncHandleAsync
AmqpPublisher 消息发布器抽象基类
AmqpConsumer 消息消费者抽象基类
AmqpBootstrapper 启动引导器,自动扫描 Handler + 优雅关闭
AmqpOptions 配置选项类
AmqpLimitOptions 连接池限流配置
AmqpHandlerProperty 按 RoutingKey 的消费者配置
AmqpHandlerAttribute 消费者方法注解
AmqpProperties 消息属性,支持流式设置 Header/Delay/TTL
IAmqpProperties 消息属性接口

依赖

  • Galosys.Foundation.Core
  • Microsoft.Extensions.Hosting
  • Microsoft.Extensions.Options
Product Versions Compatible and additional computed target framework versions.
.NET net8.0 net8.0 is compatible.  net8.0-android net8.0-android was computed.  net8.0-browser net8.0-browser was computed.  net8.0-ios net8.0-ios was computed.  net8.0-maccatalyst net8.0-maccatalyst was computed.  net8.0-macos net8.0-macos was computed.  net8.0-tvos net8.0-tvos was computed.  net8.0-windows net8.0-windows was computed.  net9.0 net9.0 was computed.  net9.0-android net9.0-android was computed.  net9.0-browser net9.0-browser was computed.  net9.0-ios net9.0-ios was computed.  net9.0-maccatalyst net9.0-maccatalyst was computed.  net9.0-macos net9.0-macos was computed.  net9.0-tvos net9.0-tvos was computed.  net9.0-windows net9.0-windows was computed.  net10.0 net10.0 was computed.  net10.0-android net10.0-android was computed.  net10.0-browser net10.0-browser was computed.  net10.0-ios net10.0-ios was computed.  net10.0-maccatalyst net10.0-maccatalyst was computed.  net10.0-macos net10.0-macos was computed.  net10.0-tvos net10.0-tvos was computed.  net10.0-windows net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on Galosys.Foundation.Amqp:

Package Downloads
Galosys.Foundation.EasyNetQ

Galosys.Foundation快速开发库

Galosys.Foundation.RabbitMQ.Client

Galosys.Foundation快速开发库

Galosys.Foundation.RabbitMQ.Benchmarks

Galosys.Foundation快速开发库

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
26.5.20.1 134 5/20/2026
26.5.19.1 134 5/19/2026
26.5.18.1 132 5/18/2026
26.5.15.1 130 5/15/2026
26.5.12.3 135 5/12/2026
26.5.12.2 130 5/12/2026
26.4.27.1-rc1 127 4/26/2026
26.4.25.1-rc1 123 4/25/2026
26.4.22.2-rc7 127 4/22/2026
26.4.22.2-rc6 128 4/22/2026
26.4.22.2-rc4 125 4/22/2026
26.4.22.2-rc3 126 4/22/2026
26.4.19.1-rc1 162 4/19/2026