RabbitMQ Asynchronous Task

在上一篇文章內容中了解到我們能將信息傳送到 RabbitMQ 中,之後有人需要時才會把信息轉交給他,如果沒有人需要則會繼續存放在 RabbitMQ 內繼續等待接收。 這些操作在 RabbitMQ 當中有各自的專有名詞,首先是傳送到信息 RabbitMQ 中的系統通常稱為 producer 與需要取得 RabbitMQ 中信息的系統稱為 consumer 和 RabbitMQ 將信息保存的地方稱為 Queue

所以更正式一點的說法是昨天建立的 Send 專案是一個 producer 用途是用來產生信息並發送到 RabbitMQ 的 Queue 當中等待接收,最後 Receive 專案會訂閱目標 QueueQueue 內部有新信息時則發送給訂閱者也就是 Receive 專案如果沒有新信息 Receive 專案會繼續等待。

今天來看一下使用 RabbitMQ 的另一個場景『異步任務』,例如在導出報表時背後可能因為數據量過大或者需要轉檔因此須要執行一段時間才能回應給會員, 這時如果使用 RabbitMQ 就可以把導出操作視為一個任務讓它自己在背後運行,在導出期間我們可以自由的操作其他功能並不會有網頁凍住讓會員有不好的體驗。

先開啟昨天的方案修改一下 Send 專案可以根據之後傳入的參數傳送對應的內容

var message = GetMessage(args);

static string GetMessage(string[] args)
{
    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
}

接著修改 Receive 專案會查看信息傳入的 . 的數量來決定 sleep 的秒數來模擬程式在背後運行的效果

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);
    Console.WriteLine(" [x] Done");
    return Task.CompletedTask;
};

完成後我們運行 Send 專案發送五個信息到 RabbitMQ Queue

cd src/Send
dotnet run "First message."
dotnet run "Second message.."
dotnet run "Third message..."
dotnet run "Fourth message...."
dotnet run "Fifth message....."

接著運行 Receive 專案等待一段時間會依序執行任務

cd src/Receive
dotnet run

info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client is trying to connect
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client acquired a persistent connection to 'localhost' and is subscribed to failure events
 [*] Waiting for messages.
 Press [enter] to exit.
 [x] Received First message.
 [x] Done
 [x] Received Second message..
 [x] Done
 [x] Received Third message...
 [x] Done
 [x] Received Fourth message....
 [x] Done
 [x] Received Fifth message.....
 [x] Done

不過目前程式會有一個大問題,當我們 Receive 專案異常時我們的信息會丟失掉,你可以再次發送五個任務並且在 Receive 專案執行第一個任務期間直接用 Ctrl + C 中斷運行,並且再次運行 Receive 專案你會發現後面第二到第五的任務會沒有運行於是任務已經丟失掉了,如果在支付的場景下這樣會造成會員款項丟失的問題。

為了解決這個問題 RabbitMQ 有提出一個功能叫做 Message acknowledgment 簡稱為 ack 我們之後也會很常看到 ack 這個詞,從字面上來看是代表 承認或認可信息屬實的意思,我們在 Receive 專案下方會看到 autoAck: true 這個設定,代表我們一旦將 Queue 的信息交給 consumer 後就直接把 信息從 Queue 裡面刪除,並不會管 consumer 是否有正常完成任務。

所以我們可以知道 autoAck: true 這個設定並不是很安全,我們可以將此設定值改成 false 這時的流程就會改變成發送給 consumerQueue 裡面的 信息不會直接刪除,取而代之我們必須在 consumer 執行完成後額外跟 Queue 通知說任務已經完成,最後 Queue 才會安心的把任務刪除。

我們需要將 Receive 專案的 autoAck 修改成 false,並且在 Received Event 執行完成後手動通知也就是使用 BasicAck 這個方法

var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
    int dots = message.Split('.').Length - 1;
    Thread.Sleep(dots * 1000);
    Console.WriteLine(" [x] Done");
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false ); 
    return Task.CompletedTask;
};

channel.BasicConsume(
    queue: "hello",
    autoAck: false,
    consumer: consumer);

修改後再次發送五個任務到 Queue 當中,並且同樣也在 Receive 專案運行任務時直接中斷,不過這次再次運行後會發現會繼續從剛剛中斷的任務繼續執行, 這樣就不會有丟失信息的問題了。


持久化

雖然已經解決資料丟失的問題,不過還有另一個可能就是 RabbitMQ 發生異常那麼目前存在記憶體內的信息也會丟失掉,值這時我們就需要開啟 RabbitMQ 的信息持久化功能, 類似 Docker 中持久化的概念,我們可以在收到信息時同時寫一份到硬碟內部這樣一來如果 RabbitMQ 的機器當機了,只要快速將機器重啟後 RabbitMQ 會去硬碟內部 讀取之前的資料回到到當機前的狀態。

要使用持久化的功能需要重新建立一個新的 Queue 並且設定 durable 參數,並且修改之前所有的設定改用新的 Queue,需要注意的雖然 Queue 已經設定為持久了 不過這樣還不夠,因為信息同時也要開啟持久化設定才會將信息保存到硬碟內,把 Queue 設定為持久並不代表信息是持久的同樣如果只有把信息設定為持久 Queue 沒有設定, 那麼也會被認為是臨時 Queue 只要 RabbitMQ 一關閉 Queue 也會被直接刪除。

channel.QueueDeclare(queue: "task_queue",
    durable: true, 
    exclusive: false,
    autoDelete: false,
    arguments: null);
    
...

var properties = channel.CreateBasicProperties();
properties.Persistent = true; // persistent
    
channel.BasicPublish(exchange: string.Empty,
    routingKey: "task_queue",
    mandatory: true,
    basicProperties: properties,
    body: body);
channel.QueueDeclare(queue: "task_queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: null);
    
...
    
channel.BasicConsume(
    queue: "task_queue",
    autoAck: false,
    consumer: consumer);

測試的方法也很簡單你可以再次發送五個信息,然後直接重啟 Docker 容器來模擬 RabbitMQ 異常,重啟後如果運行 Receive 專案還能繼續接收任務就代表一開始的 信息已經成功持久化了。

不過這樣的作法還是會有非常低的機率會丟失信息,就是在寫入硬碟期間如果發生故障還是會丟失一小部份的信息,要解決這個問題需要在 producer 端的 channel 開啟 ConfirmSelect 這樣就會在發佈後再次去確認是否真的有確實寫入到硬碟內才能保證不會有任何信息丟失,但是開啟後性能會降低可以按照需求判斷是否需要開啟。


Summary

今天學習了 ack 和持久化的使用方式,基本上這兩個設定在正式環境是必須要使用才能保證信息的安全,不過目前是用的場景非常簡單只有用到一個 producer 和 一個 consumer 但是大多數環境可以同使用多個 consumer 來增加任務運行的速度之後的文章會根據這部份來討論。

今天的進度 Github