解释:
RabbitMQ 是一个流行的开源消息队列系统,广泛用于实现异步通信、解耦组件、负载均衡等场景。在本篇博客中,我们将详细介绍如何在 .NET 6 中使用 RabbitMQ,包括生产者和消费者的实现,以及如何通过依赖注入来管理它们。
一、创建 .NET 6 应用
首先,确保你已经安装了 .NET 6 SDK。可以使用命令行工具创建一个新的 .NET 控制台应用:
dotnet new console -n RabbitMQDemo cd RabbitMQDemo
接着,你需要安装 RabbitMQ 的客户端库,通过 NuGet 包管理器来安装 RabbitMQ.Client:
dotnet add package RabbitMQ.Client; //这里我们选择 6.4.0 版本 dotnet add package Masuit.Tools.Core; //这个为一个
二、配置 RabbitMQ
接下来,我们需要定义连接 RabbitMQ 所需的配置选项。通常我们会将这些配置选项存储在一个类中。以下是配置类(RabbitMQServiceOptions)的实现:
////// RabbitMQ服务配置 /// public class RabbitMQServiceOptions { ////// 服务地址 /// public string Host { get; set; } ////// 端口 /// public int Port { get; set; } ////// 用户名 /// public string UserName { get; set; } ////// 密码 /// public string Password { get; set; } }
三、实现 RabbitMQ 连接工厂
为确保我们能够高效且安全地建立 RabbitMQ 连接,我们将创建一个名为 RabbitMQContext 的连接工厂类:
////// RabbitMQ连接工厂 /// public class RabbitMQContext { private static ConnectionFactory? factory; private static readonly object lockObj = new(); ////// 获取单个RabbitMQ连接 /// /// public static IConnection GetConnection(string hostName, int port, string userName, string password) { if (factory == null) { lock (lockObj) { factory ??= new ConnectionFactory { HostName = hostName, Port = port, UserName = userName, Password = password }; } } return factory.CreateConnection(); } }
四、实现 RabbitMQ 生产者
接下来我们来实现一个 RabbitMQ 生产者,用于发送消息到队列。创建一个名为 RabbitMQProducer 的类:
////// RabbitMQ 客户端,用于发送消息到 RabbitMQ 队列或交换机。 /// public class RabbitMQProducer { private readonly ILogger _logger; private readonly RabbitMQServiceOptions _options; ////// 初始化 RabbitMQ 客户端。 /// /// 日志记录器。 /// RabbitMQ 连接配置。 public RabbitMQProducer(ILogger logger, RabbitMQServiceOptions options) { _logger = logger; _options = options; } /**** * RabbitMQ 交换机类型说明: * 1. Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。 * 例如,如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发。 * 2. Fanout Exchange – 不处理路由键。只需将队列绑定到交换机上,发送到交换机的消息会被转发到所有绑定的队列。 * 类似于广播,所有绑定的队列都会收到消息。 * 3. Topic Exchange – 将路由键和某模式进行匹配。队列需要绑定到一个模式上。 * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。 * 例如,“audit.#”能够匹配到“audit.irs.corporate”,但“audit.*”只会匹配到“audit.irs”。 ****/ ////// 发布消息(工作队列模式,适用于多消费者负载均衡)。 /// /// 队列名称。 /// 消息内容。 public void WorkQueueSendMessage(string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到指定队列 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: string.Empty, routingKey: queueName, basicProperties: properties, body: body); } ////// 推送消息(简单模式,适用于单生产者和单消费者)。 /// /// 队列名称。 /// 消息内容。 public void SimpleSendMessage(string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到指定队列 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: string.Empty, routingKey: queueName, mandatory: false, basicProperties: properties, body: body); } ////// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。 /// /// 队列名称。 /// 消息内容。 public void FanoutSendMessage(string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Fanout 类型的交换机 var exchangeName = $"{queueName}_fanout_exchange"; channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,routingKey 无需指定 channel.QueueBind(queueName, exchangeName, routingKey: string.Empty); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,所有绑定队列都会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body); } ////// 发布消息(Fanout Exchange 模式,适用于广播消息到所有绑定队列)。 /// /// 交换机名称。 /// 队列名称。 /// 消息内容。 public void FanoutSendMessage(string exchangeName, string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Fanout 类型的交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,routingKey 无需指定 channel.QueueBind(queueName, exchangeName, routingKey: string.Empty); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,所有绑定队列都会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey: string.Empty, properties, body: body); } ////// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。 /// /// 队列名称。 /// 消息内容。 public void DirectSendMessage(string queueName, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Direct 类型的交换机 var exchangeName = $"{queueName}_direct_exchange"; channel.ExchangeDeclare(exchangeName, ExchangeType.Direct); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,并指定路由键 var routingKey = $"{queueName}"; channel.QueueBind(queueName, exchangeName, routingKey); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey, properties, body: body); } ////// 发布消息(Direct Exchange 模式,适用于路由键完全匹配的消息分发)。 /// /// 交换机名称。 /// 队列名称。 /// 路由键。 /// 消息内容。 public void DirectSendMessage(string exchangeName, string queueName, string routingKey, string message) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Direct 类型的交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,并指定路由键 channel.QueueBind(queueName, exchangeName, routingKey); // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,只有路由键完全匹配的队列才会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey, properties, body: body); } ////// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。 /// /// 队列名称。 /// 路由键。 /// 消息内容。 /// 绑定规则(可选)。 public void TopicSendMessage(string queueName, string routingKey, string message, IEnumerable? bindingKeys = null) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Topic 类型的交换机 var exchangeName = $"{queueName}_topic_exchange"; channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,并指定绑定规则 if (!bindingKeys.IsNullOrEmpty()) { foreach (string bindingKey in bindingKeys) { channel.QueueBind(queueName, exchangeName, bindingKey); } } // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey, properties, body: body); } ////// 发布消息(Topic Exchange 模式,适用于路由键模式匹配的消息分发)。 /// /// 交换机名称。 /// 队列名称。 /// 路由键。 /// 消息内容。 /// 绑定规则(可选)。 public void TopicSendMessage(string exchangeName, string queueName, string routingKey, string message, IEnumerable? bindingKeys = null) { using var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); using var channel = connection.CreateModel(); // 创建 Topic 类型的交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 将队列绑定到交换机,并指定绑定规则 if (!bindingKeys.IsNullOrEmpty()) { foreach (string bindingKey in bindingKeys) { channel.QueueBind(queueName, exchangeName, bindingKey); } } // 设置消息持久化,确保消息在 RabbitMQ 重启后不会丢失 var properties = channel.CreateBasicProperties(); properties.Persistent = true; // 发送消息到交换机,只有路由键与绑定规则匹配的队列才会收到消息 var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchangeName, routingKey, properties, body: body); } }
五、实现 RabbitMQ 消费者
接下来我们来实现一个 RabbitMQ 消费者,用于发送消息到队列。创建一个名为 RabbitMQConsumer 的类:
////// RabbitMQ 消费者,用于从 RabbitMQ 队列或交换机中消费消息。 /// public class RabbitMQConsumer { private readonly ILogger _logger; private readonly RabbitMQServiceOptions _options; ////// 初始化 RabbitMQ 消费者。 /// /// 日志记录器。 /// RabbitMQ 连接配置。 public RabbitMQConsumer(ILogger logger, RabbitMQServiceOptions options) { _logger = logger; _options = options; } ////// 简单消费者(简单模式,适用于单生产者和单消费者)。 /// /// 队列名称。 /// 消息处理逻辑。 public void SimpleConsumer(string queueName, Action handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); handler(message); }; // 开始消费,自动确认消息 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); } ////// 消费者(工作队列模式,适用于多消费者负载均衡)。 /// /// 队列名称。 /// 消息处理逻辑。 public void WorkConsumer(string queueName, Func handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明队列(如果不存在则创建),并设置为持久化 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false); // 设置限流,避免消费者一次性接收过多消息 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var result = handler(message); // 如果消息处理成功,手动确认消息 if (result) { channel.BasicAck(ea.DeliveryTag, false); } }; // 开始消费,手动确认消息 channel.BasicConsume(queueName, autoAck: false, consumer: consumer); } ////// 消费者(发布/订阅模式,适用于广播消息到所有绑定队列)。 /// /// 交换机名称。 /// 队列名称。 /// 消息处理逻辑。 public void PubSubConsumer(string exchangeName, string queueName, Action handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明 Fanout 类型的交换机 channel.ExchangeDeclare(exchange: exchangeName, type: "fanout"); // 声明队列(如果不存在则创建),并设置为持久化 var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 将队列绑定到交换机 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: ""); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); handler(message); // 手动确认消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // 开始消费,自动确认消息 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); } ////// 消费者(路由模式,适用于路由键完全匹配的消息分发)。 /// /// 交换机名称。 /// 队列名称。 /// 路由键。 /// 消息处理逻辑。 public void RoutingConsumer(string exchangeName, string queueName, string routingKey, Action handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明 Direct 类型的交换机 channel.ExchangeDeclare(exchange: exchangeName, type: "direct"); // 声明队列(如果不存在则创建),并设置为持久化 var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 将队列绑定到交换机,并指定路由键 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); handler(message); // 手动确认消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // 开始消费,自动确认消息 channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); } ////// 消费者(主题模式,适用于路由键模式匹配的消息分发)。 /// /// 交换机名称。 /// 队列名称。 /// 路由键。 /// 消息处理逻辑。 public void TopicConsumer(string exchangeName, string queueName, string routingKey, Action handler) { var connection = RabbitMQContext.GetConnection(_options.Host, _options.Port, _options.UserName, _options.Password); var channel = connection.CreateModel(); // 声明 Topic 类型的交换机 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Topic); // 声明队列(如果不存在则创建),并设置为持久化 var queueNameResult = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); // 将队列绑定到交换机,并指定路由键 channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingKey); // 创建消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { // 处理消息 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); handler(message); // 手动确认消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; // 开始消费,手动确认消息 channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); } }
六、整合到.Net6的注入依赖:
在 .NET 应用中,通常我们会使用依赖注入来管理服务的生命周期。以下是一个扩展类,用于将 RabbitMQ 的生产者和消费者注册到服务集合中:
////// RabbitMQ 服务集合扩展类,用于将 RabbitMQ 客户端和监听器添加到依赖注入容器。 /// public static class RabbitMQServiceCollectionExtensions { ////// 添加 RabbitMQ 服务到服务集合。 /// /// 服务集合。 /// 服务集合。 /// 当配置选项无效时抛出。 public static IServiceCollection AddRabbmitMQ(this IServiceCollection services) { // 从容器中获取配置的 RabbitMQServiceOptions var serviceProvider = services.BuildServiceProvider(); var serviceOptions = serviceProvider.GetRequiredService>().Value; // 验证服务选项是否有效 if (serviceOptions == null || serviceOptions.Host.IsNullOrEmpty() || serviceOptions.Port new RabbitMQProducer( sp.GetRequiredService>(), serviceOptions)); // 注册 RabbitMQ消费端作为单例服务 services.AddSingleton(sp => new RabbitMQConsumer( sp.GetRequiredService>(), serviceOptions)); return services; } }
七、注入 RabbitMQ到程序(Program):
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
// 加载 RabbitMQ 配置
builder.Services.Configure(builder.Configuration.GetSection("RabbitMQ"));
// 添加 RabbitMQ 客户端和监听器
builder.Services.AddRabbmitMQ();
var app = builder.Build();
// Configure the HTTP request pipeline.
app.UseAuthorization();
app.MapControllers();
app.Run();
八、生产者的Demo:
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
class Program
{
static void Main(string[] args)
{
// 配置 RabbitMQ 连接选项
var options = new RabbitMQServiceOptions
{
Host = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest"
};
// 创建日志记录器(这里使用控制台日志)
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger();
// 创建 RabbitMQ 生产者
var producer = new RabbitMQProducer(logger, options);
// 发送工作队列消息
producer.WorkQueueSendMessage("work_queue", "Hello Work Queue");
// 发送简单队列消息
producer.SimpleSendMessage("simple_queue", "Hello Simple Queue");
// 发送 Fanout 交换机消息(自动生成交换机名称)
producer.FanoutSendMessage("fanout_queue", "Hello Fanout Queue");
// 发送 Fanout 交换机消息(指定交换机名称)
producer.FanoutSendMessage("my_fanout_exchange", "fanout_queue", "Hello Fanout Queue");
// 发送 Direct 交换机消息(自动生成交换机名称)
producer.DirectSendMessage("direct_queue", "Hello Direct Queue");
// 发送 Direct 交换机消息(指定交换机名称和路由键)
producer.DirectSendMessage("my_direct_exchange", "direct_queue", "direct_key", "Hello Direct Queue");
// 发送 Topic 交换机消息(自动生成交换机名称)
producer.TopicSendMessage("topic_queue", "topic.key", "Hello Topic Queue", new List { "topic.*" });
// 发送 Topic 交换机消息(指定交换机名称和路由键)
producer.TopicSendMessage("my_topic_exchange", "topic_queue", "topic.key", "Hello Topic Queue", new List { "topic.*" });
Console.WriteLine("消息发送完成!");
}
}
-
通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息发送模式。每种模式都有其特定的应用场景,例如:
:适用于多消费者负载均衡。 - 工作队列模式
- 简单模式:适用于单生产者和单消费者。
- Fanout 交换机模式:适用于广播消息到所有绑定队列。
- Direct 交换机模式:适用于路由键完全匹配的消息分发。
- Topic 交换机模式:适用于路由键模式匹配的消息分发。
九、消费者Demo:
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System;
using System.Text;
class Program
{
static void Main(string[] args)
{
// 配置 RabbitMQ 连接选项
var options = new RabbitMQServiceOptions
{
Host = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest"
};
// 创建日志记录器(这里使用控制台日志)
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger();
// 创建 RabbitMQ 消费者
var consumer = new RabbitMQConsumer(logger, options);
// 简单消费者
consumer.SimpleConsumer("simple_queue", message =>
{
Console.WriteLine($"接收到简单队列消息: {message}");
});
// 工作队列消费者
consumer.WorkConsumer("work_queue", message =>
{
Console.WriteLine($"接收到工作队列消息: {message}");
return true; // 处理成功,手动确认消息
});
// 发布/订阅消费者
consumer.PubSubConsumer("my_fanout_exchange", "fanout_queue", message =>
{
Console.WriteLine($"接收到发布/订阅消息: {message}");
});
// 路由消费者
consumer.RoutingConsumer("my_direct_exchange", "direct_queue", "direct_key", message =>
{
Console.WriteLine($"接收到路由消息: {message}");
});
// 主题消费者
consumer.TopicConsumer("my_topic_exchange", "topic_queue", "topic.key", message =>
{
Console.WriteLine($"接收到主题消息: {message}");
});
Console.WriteLine("消费者已启动,等待接收消息...");
Console.ReadLine(); // 保持程序运行
}
}
-
通过以上示例代码,你可以轻松地在 .NET 6 中使用 RabbitMQ 实现多种消息消费模式。每种模式都有其特定的应用场景,例如:
:适用于单生产者和单消费者。 - 简单消费者
- 工作队列消费者:适用于多消费者负载均衡。
- 发布/订阅消费者:适用于广播消息到所有绑定队列。
- 路由消费者:适用于路由键完全匹配的消息分发。
- 主题消费者:适用于路由键模式匹配的消息分发。
到此这篇关于.NET6+中使用RabbitMQ详细指南的文章就介绍到这了,更多相关.net使用rabbitmq内容请搜索IT俱乐部以前的文章或继续浏览下面的相关文章希望大家以后多多支持IT俱乐部!
