通過 .NET 學習 RabbitMQ 建立可靠的訊息佇列系統:RPC 模式 通過 .NET 學習 RabbitMQ 建立可靠的訊息佇列系統:RPC 模式

Published on Sunday, June 4, 2023

RabbitMQ RPC

今天是 RabbitMQ 基礎用法的最後一篇,之前在討論非同步任務了解到我們可以建立一個 Queue 並且背後建立多個 Worker 來分散執行的任務,這種模式有一個問題 那就是任務執行完成後發送信息的人沒有辦法知道任務已經完成或者是任務失敗等其他狀況,發送人還需要主動的去確認任務是否已經完成,例如一個報表導出的任務當你按下按鈕後 會發送一個信息任務到 Queue 中並且由 Worker 來執行任務,這時發送人可能就去操作網站其他功能了甚至也有忘記自己有發送報表導出的任務,之後當你回來查看結果時 最好的情況是報表已經導出完成可以直接下載,也有可能早就任務失敗或者卡住此時你又必須重新發送一次任務。

既然 Worker 只要綁定一組 Queue 之後只要有新信息會主動通知 Worker 來處理任務,那麼只要發送人也綁定一組特殊 Queue 之後 Worker 執行完任務後主動發送到這組特殊的 Queue, 這樣發送人就可以在任務完成後收到通知。

首先我們修改一下 Receive 專案建立一個 rpc_queue 並關閉自動 AckQos 讓每個 Worker 每次只能接收處理一個任務

channel.QueueDeclare(queue: "rpc_queue",
    durable: false,
    exclusive: false,
    autoDelete: false,
    arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new AsyncEventingBasicConsumer(channel);
channel.BasicConsume(
    queue: "rpc_queue",
    autoAck: false,
    consumer: consumer);

接下來要處理接收信息後的 Event 我們需要在任務處理完成後發送一條完成信息給專門存放回應的 Queue 才能完成整個流程,要做到這件事情有兩種作法 第一個是為每一個發送者都專門建立一個回應 Queue 但是這種處理方式效能會比較差, 因此有第二種模式就是全部發送者都共用一個回應 Queue 並且搭配 使用 ReplyToCorrelationId 來避免發送者收到別的任務回應信息。

我們需要在信息發送之前就先確認好要回傳回應到哪一個 Queue 和哪一個發送者,這樣在 Worker 處理完後才能知道要回應給誰

  • ReplyTo 代表需要專門保存回應的 Queue
  • CorrelationId 為一組隨機 Guid 用來確定該返回給發送者

接下來繼續修該 Receive 專案並修改 Event 處理方法,我們將在 finally block 新增回應信息的邏輯

consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var props = ea.BasicProperties;
    var replyProps = channel.CreateBasicProperties();
    var message = Encoding.UTF8.GetString(body);
    replyProps.CorrelationId = props.CorrelationId;

    try
    {
        Console.WriteLine($" [x] Received {message}");
        int dots = message.Split('.').Length - 1;
        Thread.Sleep(dots * 1000);
        Console.WriteLine($" [x] {message} Done");
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
    }
    finally
    {
        var responseBytes = Encoding.UTF8.GetBytes(message + "Success!");
        channel.BasicPublish(exchange: string.Empty,
            routingKey: props.ReplyTo,
            basicProperties: replyProps,
            body: responseBytes);
        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    }

    return Task.CompletedTask;
};

接下來處理 Send 專案這邊我們可以參考 RPC 的設計模式將方法包裝起來建立一個 RpcClient Class 這樣就可以重複使用發送邏輯

cd ./src/Services.Common
dotnet new class RpcClient
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Services.Common;

public class RpcClient : IDisposable
{
    private const string QUEUE_NAME = "rpc_queue";

    private readonly IRabbitMQPersistentConnection _connection;
    private readonly IModel _channel;
    private readonly string _replyQueueName;
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper = new();

    public RpcClient(IRabbitMQPersistentConnection connection)
    {
        _connection = connection;
        
        if (!_connection.IsConnected)
        {
            _connection.TryConnect();
        }
        
        _channel = _connection.CreateModel();
        _replyQueueName = _channel.QueueDeclare().QueueName;
        
        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            if (!_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
                return Task.CompletedTask;
            var body = ea.Body.ToArray();
            var response = Encoding.UTF8.GetString(body);
            tcs.TrySetResult(response);
            
            return Task.CompletedTask;
        };

        _channel.BasicConsume(consumer: consumer,
            queue: _replyQueueName,
            autoAck: true);
    }
    
    public Task<string> CallAsync(string message, CancellationToken cancellationToken = default)
    {
        IBasicProperties props = _channel.CreateBasicProperties();
        var correlationId = Guid.NewGuid().ToString();
        props.CorrelationId = correlationId;
        props.ReplyTo = _replyQueueName;
        var messageBytes = Encoding.UTF8.GetBytes(message);
        var tcs = new TaskCompletionSource<string>();
        _callbackMapper.TryAdd(correlationId, tcs);

        _channel.BasicPublish(exchange: string.Empty,
            routingKey: QUEUE_NAME,
            basicProperties: props,
            body: messageBytes);
        
        Console.WriteLine($" [x] Sent {message}");

        cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out _));
        return tcs.Task;
    }

    public void Dispose()
    {
        _connection.Dispose();
    }
}

最後處理 Send 專案,這裡因為原本的發送邏輯已經搬到 RpcClient.cs 因此把就的邏輯刪除改成注入 RpcClient 來發送信息

using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Services.Common;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEventBus();
builder.Services.AddScoped<RpcClient, RpcClient>();
var provider = builder.Services.BuildServiceProvider();
var rpcClient = provider.GetService<RpcClient>();
var response = await rpcClient.CallAsync(string.Join("", args));
Console.WriteLine(" [x] Got {0}", response);

完成後運行 SendReceive 專案並發送測試信息

# shell 1
cd src/Receive
dotnet run
# shell 2
cd src/Send
dotnet run "Fifth message....."

這邊使用的是模式一因此在發送信息後會為個發送者建立一個專用的回應 Queue 所以你可以打開 RabbitMQ 後台會看到臨時的 Queue 並且會在完成回應後直接刪除

# shell 1
[*] Waiting for messages.
Press [enter] to exit.
2[x] Received Fifth message.....
3[x] Fifth message..... Done
# shell 2
1[x] Sent Fifth message.....
4[x] Got Fifth message.....Success!

要改成模式二只需要修改一下 _replyQueueName 讓之後的回應都使用這個 Queue 即可

_replyQueueName = _channel.QueueDeclare("replyQueue", false, false, false).QueueName;

Summary

今天學習了怎麼透過 RabbitMQ 建立出 RPC 的功能,這樣的好處是開發者使用時並不會知道你背後的邏輯是怎麼實踐的並且使用起來 跟使用本地方法一樣,並且可以快速的完成一個分布式的架構我們在建立微服務時就可以多多利用這個模式就可以忽略麻煩的程式與程式之間的溝通問題。

今天的進度 Github