本文實例為大家分享了RabbitMQ .NET消息隊列使用方法,供大家參考,具體內容如下
首先下載安裝包,我都環境是win7 64位:
去官網下載 otp_win64_19.0.exe 和rabbitmq-server-3.6.3.exe安裝好
然後開始編程了:
(1)創建生產者類:
class Program
{
private static void Main()
{
//建立RabbitMQ連接和通道
var connectionFactory = new ConnectionFactory
{
HostName = "127.0.0.1",
Port = 5672,
UserName = "guest",
Password = "guest",
Protocol = Protocols.DefaultProtocol,
AutomaticRecoveryEnabled = true, //自動重連
RequestedFrameMax = UInt32.MaxValue,
RequestedHeartbeat = UInt16.MaxValue //心跳超時時間
};
try
{
using (var connection = connectionFactory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
//創建一個新的,持久的交換區
channel.ExchangeDeclare("SISOExchange", ExchangeType.Direct, true, false, null);
//創建一個新的,持久的隊列, 沒有排他性,與不自動刪除
channel.QueueDeclare("SISOqueue", true, false, false, null);
// 綁定隊列到交換區
channel.QueueBind("SISOqueue", "SISOExchange", "optionalRoutingKey");
// 設置消息屬性
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啟影響
//准備開始推送
//發布的消息可以是任何一個(可以被序列化的)字節數組,如序列化對象,一個實體的ID,或只是一個字符串
var encoding = new UTF8Encoding();
for (var i = 0; i < 10; i++)
{
var msg = string.Format("這是消息 #{0}?", i + 1);
var msgBytes = encoding.GetBytes(msg);
//RabbitMQ消息模型的核心思想就是,生產者不把消息直接發送給隊列。實際上,生產者在很多情況下都不知道消息是否會被發送到一個隊列中。取而代之的是,生產者將消息發送到交換區。交換區是一個非常簡單的東西,它一端接受生產者的消息,另一端將他們推送到隊列中。交換區必須要明確的指導如何處理它接受到的消息。是放到一個隊列中,還是放到多個隊列中,亦或是被丟棄。這些規則可以通過交換區的類型來定義。
//可用的交換區類型有:direct,topic,headers,fanout。
//Exchange:用於接收消息生產者發送的消息,有三種類型的exchange:direct, fanout,topic,不同類型實現了不同的路由算法;
//RoutingKey:是RabbitMQ實現路由分發到各個隊列的規則,並結合Binging提供於Exchange使用將消息推送入隊列;
//Queue:是消息隊列,可以根據需要定義多個隊列,設置隊列的屬性,比如:消息移除、消息緩存、回調機制等設置,實現與Consumer通信;
channel.BasicPublish("SISOExchange", "optionalRoutingKey", properties, msgBytes);
}
channel.Close();
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
Console.WriteLine("消息發布!");
Console.ReadKey(true);
}
}
(1)創建消費者類:
class Program
{
private static void Main()
{
// 建立RabbitMQ連接和通道
var connectionFactory = new ConnectionFactory
{
HostName = "127.0.0.1",
Port = 5672,
UserName = "guest",
Password = "guest",
Protocol = Protocols.AMQP_0_9_1,
RequestedFrameMax = UInt32.MaxValue,
RequestedHeartbeat = UInt16.MaxValue
};
using (var connection = connectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
// 這指示通道不預取超過1個消息
channel.BasicQos(0, 1, false);
//創建一個新的,持久的交換區
channel.ExchangeDeclare("SISOExchange", ExchangeType.Direct, true, false, null);
//創建一個新的,持久的隊列
channel.QueueDeclare("sample-queue", true, false, false, null);
//綁定隊列到交換區
channel.QueueBind("SISOqueue", "SISOExchange", "optionalRoutingKey");
using (var subscription = new Subscription(channel, "SISOqueue", false))
{
Console.WriteLine("等待消息...");
var encoding = new UTF8Encoding();
while (channel.IsOpen)
{
BasicDeliverEventArgs eventArgs;
var success = subscription.Next(2000, out eventArgs);
if (success == false) continue;
var msgBytes = eventArgs.Body;
var message = encoding.GetString(msgBytes);
Console.WriteLine(message);
channel.BasicAck(eventArgs.DeliveryTag, false);
}
}
}
}
}
消費者--結果如圖:

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持幫客之家。