RabbitMQ Publish/Subscribe
到目前為止我們已經學會了 RabbitMQ 中最基礎的應用,在上一篇文章中這種一個 Queue
搭配多個 Consumer
的模式叫做 Competing Consumers Pattern
,不過就如同它名稱所示 消費者競爭模式
每一個 Consumer
都是競爭關係代表信息被某一個 Consumer
取得之後其它的 Consumer
就再也沒有機會知道這條信息裡面有什麼內容。
但是今天如果我們想要同時分享一條信息給所有 Consumer
讓所有 Consumer
可以處理同一條信息,我們就需要改用發布訂閱模式。
那麼什麼樣的場合會需要使用這樣的技術?例如在體育賽是的比分中我們需要將現場比分快速的分享給所有 Consumer
就可以用到這種技術。
Exchange
不過依照目前了解的知識還沒有辦法做到這件事情因此我們還需要先學習什麼是 Exchange
。
我們現在已經了解什麼是 Producer
, Consumer
, Queue
當我們的 Producer
在產生信息時需要明確寫入到特定的 Queue
,之後 Consumer
需要明確指定要讀取那一個 Queue
。
由於信息一旦 Dequeue
後就沒辦法被其他人讀取了,所以想要做到分享最直觀的做法就是為每一個 Consumer
都建立一個新的 Queue
,
例如 Consumer1
連接時就幫它分配 Queue1
, Consumer2
連接時就幫它分配 Queue2
。
但是 Producer
要寫入信息就會碰到困難了因為 Consumer
數量是運行期間動態產生的所以沒辦法在開發期間知道要寫到哪一個 Queue
,
所以我們之前開發中使用方法沒辦法適用。
因此這裡導入了 Exchange
這個概念,基本上就是在 Producer
和 Queue
之間多了一層 Exchange
來達到發布/訂閱的功能。
Exchange
從字面上來看就是交換的意思,經常會在兌換貨幣時看到這個詞,概念其實也差不多。例如你想要將台幣一萬元兌換成美元,你只要將這個
目標跟兌換所說明他們就會幫你處理,同樣的在 RabbitMQ 中你也只需要跟 Exchange
說明你的目標它就會把你的信息轉送到特定的 Queue
。
就像兌換所有提供許多種貨幣轉換的選擇,Exchange
也有 direct
, topic
, headers
和 fanout
這四種模式可以使用。
Fanout
這篇文章會先討論比較簡單的 fanout
模式,接下來我們開啟之前建立的專案並且修改一下 Send
專案。
首先我們需要建立一個 Exchange
並選擇 Fanout
模式
channel.ExchangeDeclare("score_exchange", ExchangeType.Fanout);
之前原本 Exchange
是直接設定成 string.Empty
,這邊要改成我們剛剛建立的 Exchange
因為 Fanout
模式會無視條件發送到所有綁定的 Queue
,
原本這邊 routingKey
是直接指定到之前建立好的 Queue
,這邊要設定成 string.Empty
。
channel.BasicPublish(exchange: "score_exchange",
routingKey: string.Empty,
mandatory: true,
basicProperties: null,
body: body);
Send
專案這樣就修改完成了,我們之後發送的信息將會直接送給 score_exchange
並且會發送給所有綁定到 Exchange
的 Queue
。
這裡需要考慮一個問題如果目前 Exchange
內沒有綁定任何的 Queue
會發生什麼事情?在上方有個設定值 mandatory
當這個值為 false 時
如果沒有任何 Queue
會把目前傳入的信息捨棄掉,設定為 true
時則會傳回一個 BasicReturnEvent
給 Producer
讓我們可以寫一個
報錯日誌或者再次重新發送。
channel.BasicReturn += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(message);
Console.WriteLine(" No Binding Queue");
Console.WriteLine(" [x] Done");
};
接著修改 Receive
專案,流程也差不多需要指定 Consumer
要用哪一個 Exchange
,不過還需要額外設定 Queue
與 Exchange
的關聯性。
我們在開始有提到我們並不清楚未來會有多少 Consumer
因此我們可以在建立 Queue
時不指定 Queue
的名稱代表讓 RabbitMQ 幫我們決定 Queue
的名稱,
建立後我們把這個動態生成的 Queue
跟 Exchange
綁定在一起,之後就可以收到 Exchange
轉發給 Queue
的信息了。
channel.ExchangeDeclare("score_exchange", ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName, "score_exchange", string.Empty);
channel.BasicConsume(
queue: queueName,
autoAck: true,
consumer: consumer);
接下來我們先運行 send
專案並發送一條信息,因為目前並沒有任何 Queue
所以會運行 BasicReturnEvent
並顯示 console 提示
dotnet run "First message."
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
RabbitMQ Client is trying to connect
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
RabbitMQ Client acquired a persistent connection to 'localhost' and is subscribed to failure events
[x] Sent First message.
Press [enter] to exit.
First message.
No Binding Queue
[x] Done
接下來開啟兩個 shell 並運行 Receive
專案,可以開啟 RabbitMQ 後台會發現建立了兩個隨機名稱的 Queue
類似以下名稱
- amq.gen-9GWIm2FJmKteuNY_xnE0uQ
- amq.gen-mfDFASp4FmDK2KiX-1YsMw
接下來再次發送信息 Exchange
就會同時轉送到以上兩個 Queue
,最後 Consumer
會會讀取自己專屬的信息並執行任務
# shell 1
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
RabbitMQ Client is trying to connect
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
RabbitMQ Client acquired a persistent connection to 'localhost' and is subscribed to failure events
[*] Waiting for messages.
Press [enter] to exit.
[x] Received First message.
[x] Done
# shell 2
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
RabbitMQ Client is trying to connect
info: Services.Common.DefaultRabbitMQPersistentConnection[0]
RabbitMQ Client acquired a persistent connection to 'localhost' and is subscribed to failure events
[*] Waiting for messages.
Press [enter] to exit.
[x] Received First message.
[x] Done
Summary
今天學習 RabbitMQ 內提供的發布訂閱模式,並且了解 Fanout
類型會直接將所有信息分享給所有訂閱的 Queue
,接下來還有其它種模式會在之後的文章討論
今天的進度 Github