RabbitMQ Publish/Subscribe

到目前為止我們已經學會了 RabbitMQ 中最基礎的應用,在上一篇文章中這種一個 Queue 搭配多個 Consumer 的模式叫做 Competing Consumers Pattern ,不過就如同它名稱所示 消費者競爭模式 每一個 Consumer 都是競爭關係代表信息被某一個 Consumer 取得之後其它的 Consumer 就再也沒有機會知道這條信息裡面有什麼內容。

但是今天如果我們想要同時分享一條信息給所有 Consumer讓所有 Consumer 可以處理同一條信息,我們就需要改用發布訂閱模式。

那麼什麼樣的場合會需要使用這樣的技術?例如在體育賽是的比分中我們需要將現場比分快速的分享給所有 Consumer 就可以用到這種技術。


Exchange

不過依照目前了解的知識還沒有辦法做到這件事情因此我們還需要先學習什麼是 Exchange

我們現在已經了解什麼是 Producer, Consumer, Queue 當我們的 Producer 在產生信息時需要明確寫入到特定的 Queue,之後 Consumer 需要明確指定要讀取那一個 Queue

由於信息一旦 Dequeue 後就沒辦法被其他人讀取了,所以想要做到分享最直觀的做法就是為每一個 Consumer 都建立一個新的 Queue, 例如 Consumer1 連接時就幫它分配 Queue1, Consumer2 連接時就幫它分配 Queue2

但是 Producer 要寫入信息就會碰到困難了因為 Consumer 數量是運行期間動態產生的所以沒辦法在開發期間知道要寫到哪一個 Queue, 所以我們之前開發中使用方法沒辦法適用。

因此這裡導入了 Exchange 這個概念,基本上就是在 ProducerQueue 之間多了一層 Exchange 來達到發布/訂閱的功能。

Exchange 從字面上來看就是交換的意思,經常會在兌換貨幣時看到這個詞,概念其實也差不多。例如你想要將台幣一萬元兌換成美元,你只要將這個 目標跟兌換所說明他們就會幫你處理,同樣的在 RabbitMQ 中你也只需要跟 Exchange 說明你的目標它就會把你的信息轉送到特定的 Queue。 就像兌換所有提供許多種貨幣轉換的選擇,Exchange 也有 direct, topic, headersfanout 這四種模式可以使用。


Fanout

這篇文章會先討論比較簡單的 fanout 模式,接下來我們開啟之前建立的專案並且修改一下 Send 專案。

首先我們需要建立一個 Exchange 並選擇 Fanout 模式

channel.ExchangeDeclare("score_exchange", ExchangeType.Fanout);

之前原本 Exchange 是直接設定成 string.Empty,這邊要改成我們剛剛建立的 Exchange 因為 Fanout 模式會無視條件發送到所有綁定的 Queue, 原本這邊 routingKey 是直接指定到之前建立好的 Queue,這邊要設定成 string.Empty

channel.BasicPublish(exchange: "score_exchange",
    routingKey: string.Empty,
    mandatory: true,
    basicProperties: null,
    body: body);

Send 專案這樣就修改完成了,我們之後發送的信息將會直接送給 score_exchange 並且會發送給所有綁定到 ExchangeQueue

這裡需要考慮一個問題如果目前 Exchange 內沒有綁定任何的 Queue 會發生什麼事情?在上方有個設定值 mandatory 當這個值為 false 時 如果沒有任何 Queue 會把目前傳入的信息捨棄掉,設定為 true 時則會傳回一個 BasicReturnEventProducer 讓我們可以寫一個 報錯日誌或者再次重新發送。

channel.BasicReturn += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(message);
    Console.WriteLine(" No Binding Queue");
    Console.WriteLine(" [x] Done");
};

接著修改 Receive 專案,流程也差不多需要指定 Consumer 要用哪一個 Exchange,不過還需要額外設定 QueueExchange 的關聯性。 我們在開始有提到我們並不清楚未來會有多少 Consumer 因此我們可以在建立 Queue 時不指定 Queue 的名稱代表讓 RabbitMQ 幫我們決定 Queue 的名稱, 建立後我們把這個動態生成的 QueueExchange 綁定在一起,之後就可以收到 Exchange 轉發給 Queue 的信息了。

channel.ExchangeDeclare("score_exchange", ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName, "score_exchange", string.Empty);

channel.BasicConsume(
    queue: queueName,
    autoAck: true,
    consumer: consumer);

接下來我們先運行 send 專案並發送一條信息,因為目前並沒有任何 Queue 所以會運行 BasicReturnEvent 並顯示 console 提示

dotnet run "First message."

info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client is trying to connect
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client acquired a persistent connection to 'localhost' and is subscribed to failure events
[x] Sent First message.
Press [enter] to exit.
First message.
No Binding Queue
[x] Done

接下來開啟兩個 shell 並運行 Receive 專案,可以開啟 RabbitMQ 後台會發現建立了兩個隨機名稱的 Queue 類似以下名稱

  • amq.gen-9GWIm2FJmKteuNY_xnE0uQ
  • amq.gen-mfDFASp4FmDK2KiX-1YsMw

接下來再次發送信息 Exchange 就會同時轉送到以上兩個 Queue,最後 Consumer 會會讀取自己專屬的信息並執行任務

# shell 1
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client is trying to connect
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client acquired a persistent connection to 'localhost' and is subscribed to failure events
[*] Waiting for messages.
Press [enter] to exit.
[x] Received First message.
[x] Done
# shell 2
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client is trying to connect
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
      RabbitMQ Client acquired a persistent connection to 'localhost' and is subscribed to failure events
[*] Waiting for messages.
Press [enter] to exit.
[x] Received First message.
[x] Done

Summary

今天學習 RabbitMQ 內提供的發布訂閱模式,並且了解 Fanout 類型會直接將所有信息分享給所有訂閱的 Queue,接下來還有其它種模式會在之後的文章討論

今天的進度 Github