RabbitMQ RPC
今天是 RabbitMQ 基礎用法的最後一篇,之前在討論非同步任務了解到我們可以建立一個 Queue
並且背後建立多個 Worker
來分散執行的任務,這種模式有一個問題
那就是任務執行完成後發送信息的人沒有辦法知道任務已經完成或者是任務失敗等其他狀況,發送人還需要主動的去確認任務是否已經完成,例如一個報表導出的任務當你按下按鈕後
會發送一個信息任務到 Queue
中並且由 Worker
來執行任務,這時發送人可能就去操作網站其他功能了甚至也有忘記自己有發送報表導出的任務,之後當你回來查看結果時
最好的情況是報表已經導出完成可以直接下載,也有可能早就任務失敗或者卡住此時你又必須重新發送一次任務。
既然 Worker
只要綁定一組 Queue
之後只要有新信息會主動通知 Worker
來處理任務,那麼只要發送人也綁定一組特殊 Queue
之後 Worker
執行完任務後主動發送到這組特殊的 Queue
,
這樣發送人就可以在任務完成後收到通知。
首先我們修改一下 Receive
專案建立一個 rpc_queue
並關閉自動 Ack
與 Qos
讓每個 Worker 每次只能接收處理一個任務
channel.QueueDeclare(queue: "rpc_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new AsyncEventingBasicConsumer(channel);
channel.BasicConsume(
queue: "rpc_queue",
autoAck: false,
consumer: consumer);
接下來要處理接收信息後的 Event 我們需要在任務處理完成後發送一條完成信息給專門存放回應的 Queue
才能完成整個流程,要做到這件事情有兩種作法
第一個是為每一個發送者都專門建立一個回應 Queue
但是這種處理方式效能會比較差, 因此有第二種模式就是全部發送者都共用一個回應 Queue
並且搭配
使用 ReplyTo
與 CorrelationId
來避免發送者收到別的任務回應信息。
我們需要在信息發送之前就先確認好要回傳回應到哪一個 Queue
和哪一個發送者,這樣在 Worker
處理完後才能知道要回應給誰
ReplyTo
代表需要專門保存回應的Queue
CorrelationId
為一組隨機Guid
用來確定該返回給發送者
接下來繼續修該 Receive
專案並修改 Event 處理方法,我們將在 finally block 新增回應信息的邏輯
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
var message = Encoding.UTF8.GetString(body);
replyProps.CorrelationId = props.CorrelationId;
try
{
Console.WriteLine($" [x] Received {message}");
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine($" [x] {message} Done");
}
catch (Exception e)
{
Console.WriteLine(e);
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(message + "Success!");
channel.BasicPublish(exchange: string.Empty,
routingKey: props.ReplyTo,
basicProperties: replyProps,
body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
return Task.CompletedTask;
};
接下來處理 Send
專案這邊我們可以參考 RPC
的設計模式將方法包裝起來建立一個 RpcClient
Class 這樣就可以重複使用發送邏輯
cd ./src/Services.Common
dotnet new class RpcClient
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Services.Common;
public class RpcClient : IDisposable
{
private const string QUEUE_NAME = "rpc_queue";
private readonly IRabbitMQPersistentConnection _connection;
private readonly IModel _channel;
private readonly string _replyQueueName;
private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _callbackMapper = new();
public RpcClient(IRabbitMQPersistentConnection connection)
{
_connection = connection;
if (!_connection.IsConnected)
{
_connection.TryConnect();
}
_channel = _connection.CreateModel();
_replyQueueName = _channel.QueueDeclare().QueueName;
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
if (!_callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
return Task.CompletedTask;
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
return Task.CompletedTask;
};
_channel.BasicConsume(consumer: consumer,
queue: _replyQueueName,
autoAck: true);
}
public Task<string> CallAsync(string message, CancellationToken cancellationToken = default)
{
IBasicProperties props = _channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId;
props.ReplyTo = _replyQueueName;
var messageBytes = Encoding.UTF8.GetBytes(message);
var tcs = new TaskCompletionSource<string>();
_callbackMapper.TryAdd(correlationId, tcs);
_channel.BasicPublish(exchange: string.Empty,
routingKey: QUEUE_NAME,
basicProperties: props,
body: messageBytes);
Console.WriteLine($" [x] Sent {message}");
cancellationToken.Register(() => _callbackMapper.TryRemove(correlationId, out _));
return tcs.Task;
}
public void Dispose()
{
_connection.Dispose();
}
}
最後處理 Send
專案,這裡因為原本的發送邏輯已經搬到 RpcClient.cs
因此把就的邏輯刪除改成注入 RpcClient
來發送信息
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Services.Common;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEventBus();
builder.Services.AddScoped<RpcClient, RpcClient>();
var provider = builder.Services.BuildServiceProvider();
var rpcClient = provider.GetService<RpcClient>();
var response = await rpcClient.CallAsync(string.Join("", args));
Console.WriteLine(" [x] Got {0}", response);
完成後運行 Send
和 Receive
專案並發送測試信息
# shell 1
cd src/Receive
dotnet run
# shell 2
cd src/Send
dotnet run "Fifth message....."
這邊使用的是模式一因此在發送信息後會為個發送者建立一個專用的回應 Queue
所以你可以打開 RabbitMQ 後台會看到臨時的 Queue
並且會在完成回應後直接刪除
# shell 1
[*] Waiting for messages.
Press [enter] to exit.
2[x] Received Fifth message.....
3[x] Fifth message..... Done
# shell 2
1[x] Sent Fifth message.....
4[x] Got Fifth message.....Success!
要改成模式二只需要修改一下 _replyQueueName
讓之後的回應都使用這個 Queue
即可
_replyQueueName = _channel.QueueDeclare("replyQueue", false, false, false).QueueName;
Summary
今天學習了怎麼透過 RabbitMQ 建立出 RPC 的功能,這樣的好處是開發者使用時並不會知道你背後的邏輯是怎麼實踐的並且使用起來 跟使用本地方法一樣,並且可以快速的完成一個分布式的架構我們在建立微服務時就可以多多利用這個模式就可以忽略麻煩的程式與程式之間的溝通問題。
今天的進度 Github