程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> RabbitMQ入門教程——發布/訂閱,rabbitmq入門教程

RabbitMQ入門教程——發布/訂閱,rabbitmq入門教程

編輯:C#入門知識

RabbitMQ入門教程——發布/訂閱,rabbitmq入門教程


什麼是發布訂閱

發布訂閱是一種設計模式定義了一對多的依賴關系,讓多個訂閱者對象同時監聽某一個主題對象。這個主題對象在自身狀態變化時,會通知所有的訂閱者對象,使他們能夠自動更新自己的狀態。

為了描述這種模式,我們將會構建一個簡單的日志系統。它包括兩個程序——第一個程序負責發送日志消息,第二個程序負責獲取消息並輸出內容。在我們的這個日志系統中,所有正在運行的接收方程序都會接受消息。我們用其中一個接收者(receiver)把日志寫入硬盤中,另外一個接受者(receiver)把日志輸出到屏幕上。最終,日志消息被廣播給所有的接受者(receivers)。

Exchanges

RabbitMQ消息模型的核心理念是生產者永遠不會直接發送任何消息給隊列,生產者只能發送消息給到exchange,exchange比較簡單,一邊從生產者就收消息,一邊把消息推送到隊列中。exchange必須清楚的知道消息應該按照什麼規則路由到對應的隊列中,而具體使用那種路由算法是由exchange type決定的。AMQP協議提供了四種交換機類型:

Name(交換機類型)

Default pre-declared names(預聲明的默認名稱)

Direct exchange(直連交換機)

(Empty string) and amq.direct

Fanout exchange(扇型交換機)

amq.fanout

Topic exchange(主題交換機)

amq.topic

Headers exchange(頭交換機)

amq.match (and amq.headers in RabbitMQ)

除交換機類型外,在聲明交換機時還可以附帶許多其他的屬性,其中最重要的幾個分別是:

  • Name
  • Durability (消息代理重啟後,交換機是否還存在)
  • Auto-delete (當所有與之綁定的消息隊列都完成了對此交換機的使用後,刪掉它)
  • Arguments(依賴代理本身)

    交換機可以有兩個狀態:持久(durable)、暫存(transient)。持久化的交換機會在消息代理(broker)重啟後依舊存在,而暫存的交換機則不會(它們需要在代理再次上線後重新被聲明)。然而並不是所有的應用場景都需要持久化的交換機。

    本文中具體講解下以下兩種交換機:直連交換機(前面幾個例子中使用的交換機類型),扇形交換機(本文中要使用的交換機類型)

    直連交換機

    直連交換機(direct exchange)可以使用消息攜帶的路由鍵(routing key)將消息投遞給對應的隊列中。用來處理消息的單播路由(unicast routing),也可以處理多播路由。

    那麼它具體是如何工作的呢

    • 將一個隊列綁定到某個交換機上,同時給該綁定指定一個路由鍵(routing key)
    • 當一個攜帶路由鍵為R的消息被發送到直連交換機時,交換機會把它路由給綁定值同樣為R的隊列。

    直連交換機經常用來循環分發任務給多個工作者,當這樣做時,一定要明白,這時消息的負載均衡是發生在消費者(consumer)之間的,而不是隊列(queue)中。

    直連交換機圖例:

    扇形交換機

    扇形交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,不關心所綁定的路由鍵(routing key)。扇形交換機用來處理消息的廣播路由(broadcast routing)。

    由於扇形交換機投遞消息到所有綁定他的隊列,以下幾個場景比較適合使用扇形交換機:

    • 大規模多用戶在線(MMO)游戲可以使用它來處理排行榜更新等全局事件
    • 體育新聞網站可以用它來近乎實時地將比分更新分發給移動客戶端
    • 分發系統使用它來廣播各種狀態和配置更新
    • 在群聊的時候,它被用來分發消息給參與群聊的用戶。(AMQP沒有內置presence的概念,因此XMPP可能會是個更好的選擇)

    扇形交換機圖例

    創建exchange

     

                        channel.ExchangeDeclare(exchange: "log_exchange", //exchange 名稱

                            type: ExchangeType.Fanout, //exchange 類型

                            durable: false,

                            autoDelete: false,

                            arguments: null);

     

    臨時隊列

    之前的幾個示例中我們在為每一個聲名的隊列都指定了一個名字,因為我們希望consumer指向正確的隊列。當我們希望在生產者和消費者之間共享隊列時,為隊列命名就非常的重要了。

    不過我們要實現的日志系統只是想要得到所有的消息,而且只對當前正在傳遞的消息感興趣,並不關心隊列的名稱,所以為了滿足我們的需求,要做兩件事情:

    無論什麼時間連接到RabbitMQ我們都需要一個新的空的隊列。為了達到目的我們可以使用隨機數創建隊列,或讓服務器給我們提供一個隨機的名稱。

    一旦消費者與RabbitMQ斷開,消費者所接受的隊列都應該被自動刪除。

    創建臨時隊列

     

                        //創建一個未命名的新的消息隊列,

                        QueueDeclareOk queue = channel.QueueDeclare(queue: "", //隊列名稱,為空時有系統自動分配

                            durable: false,

                            exclusive: false,

                            autoDelete: true,//自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

                            arguments: null);

                        //或

                        //queue = channel.QueueDeclare();

     

    綁定

    我們已經創建了一個扇型交換機(fanout)和一個隊列。現在我們需要告訴交換機如何發送消息給我們的隊列。交換器和隊列之間的聯系我們稱之為綁定(binding)

    創建交換機與隊列的關系

     

    //扇形交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,不關心所綁定的路由鍵(routing key)

                        //fanout exchange不需要指定routing key 指定了也沒用

                        //通過綁定告訴exchange 需要發送消息到哪些消息隊列

                        channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);

     

    完整代碼:

    生產者  Pub_SubProducer.cs

     

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    using RabbitMQ.Client;

     

    namespace RabbitMQProducer

    {

        public class Pub_SubProducer

        {

            const string EXCHANGE_NAME = "log_exchange";

            const string ROUTING_KEY = "";

     

            //直接發送消息到交換機

            public static void Publish()

            {

                var factory = new ConnectionFactory()

                {

                    HostName = "127.0.0.1"

                };

                using (var connection = factory.CreateConnection())

                {

                    using (IModel channel = connection.CreateModel())

                    {

                        channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名稱

                            type: ExchangeType.Fanout, //exchange 類型

                            durable: false,

                            autoDelete: false,

                            arguments: null);

     

                        Parallel.For(1, 100, item =>

                        {

                            string message = $"日志內容{DateTime.Now.ToString()}";

                            channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, basicProperties: null, body: Encoding.UTF8.GetBytes(message));

                            Console.WriteLine(message);

                        });

     

                        Console.WriteLine(" Press [enter] to exit.");

                        Console.ReadLine();

                    }

                }

            }

        }

    }

     

    消費者 Pub_SubConsumer.cs

     

    using RabbitMQ.Client;

    using System;

    using System.Collections.Generic;

    using System.Linq;

    using System.Text;

    using System.Threading.Tasks;

    using RabbitMQ.Client.Events;

    using System.IO;

     

    namespace RabbitMQConsumer

    {

        public class Pub_SubConsumer

        {

            const string EXCHANGE_NAME = "log_exchange";

            const string ROUTING_KEY = "";

            //輸出到屏幕

            public static void Subscribe()

            {

                var factory = new ConnectionFactory()

                {

                    HostName = "127.0.0.1"

                };

                using (var connection = factory.CreateConnection())

                {

                    using (IModel channel = connection.CreateModel())

                    {

                        channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名稱

                            type: ExchangeType.Fanout, //exchange 類型

                            durable: false,

                            autoDelete: false,

                            arguments: null);

     

                        //創建一個未命名的新的消息隊列,

                        QueueDeclareOk queue = channel.QueueDeclare(queue: "", //隊列名稱,為空時有系統自動分配

                            durable: false,

                            exclusive: false,

                            autoDelete: true,//自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

                            arguments: null);

                        //或

                        //queue = channel.QueueDeclare();

     

                        string queueName = queue.QueueName;

                        //扇形交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,不關心所綁定的路由鍵(routing key)

                        //fanout exchange不需要指定routing key 指定了也沒用

                        //通過綁定告訴exchange 需要發送消息到哪些消息隊列

                        channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);

     

                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                        consumer.Received += (sender, args) =>

                        {

                            string message = Encoding.UTF8.GetString(args.Body);

                            Console.WriteLine(message);

                        };

     

                        channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

     

                        Console.WriteLine(" Press [enter] to exit.");

                        Console.ReadLine();

                    }

                }

            }

     

            /// <summary>

            /// 輸出到文件

            /// </summary>

            public static void SubscribeFile()

            {

                var factory = new ConnectionFactory()

                {

                    HostName = "127.0.0.1"

                };

                using (var connection = factory.CreateConnection())

                {

                    using (IModel channel = connection.CreateModel())

                    {

                        channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名稱

                            type: ExchangeType.Fanout, //exchange 類型

                            durable: false,

                            autoDelete: false,

                            arguments: null);

     

                        //創建一個未命名的新的消息隊列,

                        QueueDeclareOk queue = channel.QueueDeclare(queue: "", //隊列名稱,為空時有系統自動分配

                            durable: false,

                            exclusive: false,

                            autoDelete: true,//自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

                            arguments: null);

                        //或

                        //queue = channel.QueueDeclare();

     

                        string queueName = queue.QueueName;

                        //扇形交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,不關心所綁定的路由鍵(routing key)

                        //fanout exchange不需要指定routing key 指定了也沒用

                        //通過綁定告訴exchange 需要發送消息到哪些消息隊列

                        channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);

     

                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

                        consumer.Received += (sender, args) =>

                        {

                            string message = Encoding.UTF8.GetString(args.Body);

     

                            //寫入日志到txt文件

                            using (StreamWriter writer = new StreamWriter(@"c:\log\log.txt", true, Encoding.UTF8))

                            {

                                writer.WriteLine(message);

                                writer.Close();

                            }

     

                            Console.WriteLine(message);

                        };

     

                        channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

     

                        Console.WriteLine(" Press [enter] to exit.");

                        Console.ReadLine();

                    }

                }

            }

        }

    }

     

    運行以上實例代碼發現,每個訂閱者實例 都能得到相同的內容。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved