通過 .NET 學習 RabbitMQ 建立可靠的訊息佇列系統:解耦 通過 .NET 學習 RabbitMQ 建立可靠的訊息佇列系統:解耦

Published on Monday, May 22, 2023

RabbitMQ Basic

這個系列會重新複習 .NET 中 RabbitMQ 的使用方式,首先我們使用 docker 建立測試 RabbitMQ 測試環境,需要注意 image 有兩個版本一種是自帶管理界面與另一種沒有界面的版本, 這次使用的 image 是 rabbitmq:3.11-management 是有界面的版本,RabbitMQ管理界面預設使用的 port 為 15672 RabbitMQ 服務為 5672

docker run -it --name myrabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management

接著打開瀏覽器進入管理界面 https://localhost:15672 可以使用預設的使用者進行登入 guest/guest

確定建立測試環境後我們來建立新專案,這邊參考 eShopOnContainers 額外建立一個專案 Services.Common 用來註冊 RabbitMQ 服務

mkdir rabbitmq-tutorial 
cd rabbitmq-tutorial
dotnet new sln -n RabbitMQTutorial
mkdir src
dotnet new console --output ./src/Send
dotnet new console --output ./src/Receive
dotnet new classlib --output ./src/Services.Common
dotnet sln add ./src/Send
dotnet sln add ./src/Receive
dotnet sln add ./src/Services.Common

dotnet add ./src/Services.Common/Services.Common.csproj package RabbitMQ.Client
dotnet add ./src/Send/Send.csproj reference ./src/Services.Common/Services.Common.csproj
dotnet add ./src/Receive/Receive.csproj reference ./src/Services.Common/Services.Common.csproj

在寫程式之前我們需要先了解 RabbitMQ 是如何建立與處理連線的,RabbitMQ 當中有兩種連線功能上看起來非常相似分別為 connectionchannel

  • connection: 為實際建立的 TCP 連線並且為『持久連接』也就是說每次要操作 RabbitMQ 會重複使用連線以達到更高的效率,當 Client 端完成工作時應該要 盡快的將 connection 釋放掉,詳細內容可以參考connection 文檔
  • channel: 因為在 Multi-Threads 環境底下如果為每一個 Thread 都建立 TCP 連線是非常不好的作法,所以才產生出 channel 的概念來達到多路復用, 建議是為每一個 Thread 都建立一個 channel 並且每一個 channel 都會分配一個 ID 所以每個 channel 都是獨立的並不會互相影響,最重要的是在建立 channel 之前必須要先建立 connection, 同理可知當我們關掉 connection 時所有相關的 channel 也會被關閉,詳細內容可以參考channel 文檔

了解基礎知識後首先我們建立一個 interface IRabbitMQPersistentConnection 會與我們之後建立的 Class 會註冊為 Singleton 確保之後可以共用一條 connection 才不會 建立過多 TCP 連線造成回應緩慢等問題

cd ./src/Services.Common/
rm Class1.cs
dotnet new interface -n IRabbitMQPersistentConnection

這裡定義了三個界面方法:

  • IsConnected: 用來確認目前是否有成功建立的 Connection
  • TryConnect: 如果 IsConnected 回傳是 false 則需要呼叫此方法建立新的連線
  • CreateModel: 這個方法則用用來建立新的 channel 在呼叫這個方法之前需要確定 IsConnected 為 true 才能繼續建立 channel
public interface IRabbitMQPersistentConnection : IDisposable
{
    bool IsConnected { get; }

    bool TryConnect();

    IModel CreateModel();
}

接著建立新的 class DefaultRabbitMQPersistentConnection 並且實做 IRabbitMQPersistentConnection 界面

dotnet new class -n DefaultRabbitMQPersistentConnection

這邊也是參考 eShopOnContainers 是如何實做此方法,之後只要將此 class 註冊為 Singleton 到系統內就能確保之後只會共用同一個 connection, 特別的是可以和 Polly 這個 package 整合在一起能夠達到失敗重連的效果

public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection
{
    private readonly IConnectionFactory _connectionFactory;
    private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
    private readonly int _retryCount;
    private IConnection _connection;
    public bool Disposed;

    readonly object _syncRoot = new();

    public DefaultRabbitMQPersistentConnection(
        IConnectionFactory connectionFactory,
        ILogger<DefaultRabbitMQPersistentConnection> logger,
        int retryCount = 5)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _retryCount = retryCount;
    }

    public bool IsConnected => _connection is { IsOpen: true } && !Disposed;

    public IModel CreateModel()
    {
        if (!IsConnected)
        {
            throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
        }

        return _connection.CreateModel();
    }

    public void Dispose()
    {
        if (Disposed) return;

        Disposed = true;

        try
        {
            _connection.ConnectionShutdown -= OnConnectionShutdown;
            _connection.CallbackException -= OnCallbackException;
            _connection.ConnectionBlocked -= OnConnectionBlocked;
            _connection.Dispose();
        }
        catch (IOException ex)
        {
            _logger.LogCritical(ex.ToString());
        }
    }

    public bool TryConnect()
    {
        _logger.LogInformation("RabbitMQ Client is trying to connect");

        lock (_syncRoot)
        {
            var policy = RetryPolicy.Handle<SocketException>()
                .Or<BrokerUnreachableException>()
                .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                    (ex, time) =>
                    {
                        _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s", $"{time.TotalSeconds:n1}");
                    }
                );

            policy.Execute(() =>
            {
                _connection = _connectionFactory
                    .CreateConnection();
            });

            if (IsConnected)
            {
                _connection.ConnectionShutdown += OnConnectionShutdown;
                _connection.CallbackException += OnCallbackException;
                _connection.ConnectionBlocked += OnConnectionBlocked;

                _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);

                return true;
            }
            else
            {
                _logger.LogCritical("Fatal error: RabbitMQ connections could not be created and opened");
                return false;
            }
        }
    }

    private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
    {
        if (Disposed) return;
        _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
        TryConnect();
    }

    void OnCallbackException(object sender, CallbackExceptionEventArgs e)
    {
        if (Disposed) return;
        _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
        TryConnect();
    }

    void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
    {
        if (Disposed) return;
        _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
        TryConnect();
    }
}

因為剛剛建立的 Send 與 Receive 兩個 Console 專案都會需要注入剛剛建立的 DefaultRabbitMQPersistentConnection,所以為了方便起見我們建立一個 class 來存放共用的擴充方法

dotnet new class CommonExtensions
public static class CommonExtensions
{
    public static IServiceCollection AddEventBus(this IServiceCollection services, IConfiguration configuration)
    {
        var eventBusSection = configuration.GetSection("EventBus");
        if (!eventBusSection.Exists())
        {
            return services;
        }
        
        services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
        {
            var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
            var factory = new ConnectionFactory()
            {
                HostName = configuration.GetConnectionString("EventBus"),
                DispatchConsumersAsync = true
            };
            var retryCount = eventBusSection.GetValue("RetryCount", 5);
            return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
        });
        
        return services;
    }
}

連線相關的程式到這裡已經準備好了接下來處理 Send 專案,首先需要使用剛剛寫的擴充方法註冊連線方法並且建立新的 connection 與 channel, 並且使用 BasicPublish 方法發送一串 Hello World! 訊息到 RabbitMQ 中

using System.Text;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Services.Common;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEventBus();
var provider = builder.Services.BuildServiceProvider();
var connection = provider.GetService<IRabbitMQPersistentConnection>();

if (!connection.IsConnected)
{
    connection.TryConnect();
}
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
    durable: false, 
    exclusive: false,
    autoDelete: false,
    arguments: null);
    
const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: string.Empty,
    routingKey: "hello",
    mandatory: true,
    basicProperties: null,
    body: body);
Console.WriteLine($" [x] Sent {message}");

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

完成後將 Send 專案運行起來可以看到以下輸出內容代表已經成功發送信息到 RabbitMQ 中了,可以多運行幾次信息會堆積在 RabbitMQ 等待之後接收程式來接收

info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client is trying to connect
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client acquired a persistent connection to 'localhost' and is subscribed to failure events
[x] Sent Hello World!
Press [enter] to exit.

最後是 Receive 專案,流程與 Send 專案差不多只不過後半段是使用 BasicConsume 方法接收信息,並且定義一個 event 說明接收到信息後要怎麼處理

using System.Text;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Services.Common;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEventBus();
var provider = builder.Services.BuildServiceProvider();
var connection = provider.GetService<IRabbitMQPersistentConnection>();

if (!connection.IsConnected)
{
    connection.TryConnect();
}
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "hello",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null);

Console.WriteLine(" [*] Waiting for messages.");

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
    return Task.CompletedTask;
};

channel.BasicConsume(
    queue: "hello",
    autoAck: true,
    consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

完成後將 Receive 專案運行起來可以看到以下輸出內容,可以發現底下接收了剛剛使用 Send 專案發送三次的信息

info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client is trying to connect
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client acquired a persistent connection to 'localhost' and is subscribed to failure events
[*] Waiting for messages.
Press [enter] to exit.
[x] Received Hello World!
[x] Received Hello World!
[x] Received Hello World!

Summary

本篇文章成功建立了發送與接收的專案學習到這樣的架構的彈性很大,不管有沒有人接收都可以持續發送信息暫存到 RabbitMQ 內,不像一般傳統的架構如果兩個系統 中壞了一個都會影響到整體的運行這就是 RabbitMQ 最主要的功能之一『解耦』。

今天的進度 Github