程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> 我的WCF之旅(12):使用MSMQ進行Reliable Messaging

我的WCF之旅(12):使用MSMQ進行Reliable Messaging

編輯:關於.NET

一、為什麼要使用MSMQ

在一個分布式的環境中,我們往往需要根據具體的情況采用不同的方式進行數據的傳輸。比如在一個Intranet內,我們一般通過TCP進行高效的數據通信;而在一個Internet的環境中,我們則通常使用Http進行跨平台的數據交換。而這些通信方式具有一個顯著的特點,那就是他們是基於Connection的,也就是說,交互雙方在進行通信的時候必須保證有一個可用的Connection存在於他們之間。而在某些時候,比如那些使用撥號連接的用戶、以及使用便攜式計算機的用戶,我們不能保證在他們和需要訪問的Server之間有一個的可靠的連接,在這種情況下,基於Messaging Queue的連接就顯得尤為重要了。我們今天就來談談在WCF中如何使用MSMQ。

MSMQ不僅僅是作為支持客戶端連接工具而存在,合理的使用MSMQ可以在很大程度上提升系統的Performance和Scalability。我們先來看看MSMQ能給我們帶來怎樣的好處:

1.MSMQ是基於Disconnection

MSMQ通過Message Queue進行通信,這種通信方式為離線工作成為了可能。比如在介紹MSMQ時都會提到的Order Delivery的例子:在一個基於B2C的系統中,訂單從各種各樣的客戶傳來,由於 客戶的各異性,不能保證每個客戶在每時每刻都和用於接收訂單的Server保持一個可靠的連接,我們有時候甚至允許客戶即使在離線的情況下也可以遞交訂單(雖然訂單不能發送到訂單的接收方,但是我們可以通過某種機制保證先在本地保存該訂單,一旦連接建立,則馬上向接收方遞交訂單),而MSMQ則有效地提供了這樣的機制:Server端建立一個Message Queue來接收來個客戶的訂單,客戶端通過向該Message Queue發送承載了訂單數據的Message實現訂單的遞交。如果在客戶離線的情況下,他仍然可以通過客戶端程序進行訂單遞交的操作,存儲著訂單數據的Message會被暫時保存在本地的Message Queue中,一旦客戶聯機,MSMQ將Message從中取出,發送到真正的接收方,而這個動作對於用戶的透明的。

2.MSMQ天生是One-way、異步的

在MSMQ中,Message始終以One-way的方式進行發送,所以MSMQ具有天生的異步特性。所以MSMQ使用於那些對於用戶的請求,Server端無需立即響應的場景。也就是說Server對數據的處理無需和Client的數據的發送進行同步,它可以獨自地按照自己的Schedule進行工作。這可以避免峰值負載。比如Server端可以在一個相對低負載的時段(比如深夜)來對接收到的Order進行批處理,而無需一天24小時一直進行Order的監聽、接收和處理。

3.MSMQ能夠提供高質量的Reliable Messaging

我們知道,在一般的情況下,如果Client端以異步的方式對Service進行調用就意味著:Client無法獲知Message是否成功抵達Service端;也不會獲得Service端執行的結果和出錯信息。但是我們仍然說MSMQ為我們提供了可靠的傳輸(Reliable Messaging),這主要是因為MSMQ為我們提供一些列Reliable Messaging的機制:

超時機制(Timeout):可以設置發送和接收的時間,超出該時間則被認為操作失敗。

確認機制(Acknowledgement):當Message成功抵達Destination Queue,或者被成功接收,向發送端發送一個Acknowledgement message用以確認操作的狀態。

日志機制(Journaling):當Message被發送或接收後,被Copy一份存放在Journal Queue中。

此外,MSMQ還提供了死信隊列(Dead letter Queue)用以保存發送失敗的message。這一切保證了保證了Reliable Messaging。

二、MSMQ在WCF的運用

在WCF中,MSMQ提供的數據傳輸功能被封裝在一個Binding中,提供WCF Endpoint之間、以及Endpoint和現有的基於MSMQ的Application進行通信的實現。為此WCF為我們提供了兩種不同的built-in binding:

NetMsmqBinding:從提供的功能和使用 方式上看,NetMsmqBinding和一般使用的binding,比如basicHttpBinding,netTcpBinding沒有什麼區別:在兩個Endpoint之間實現了數據的通信,所不同的是,它提供的是基於MSMQ的Reliable Messaging。從變成模式上看,和一般的binding完全一樣。

MsmqIntegrationBinding:從命名上我們可以看出,MsmqIntegrationBinding主要用於需要將我們的WCF Application和現有的基於MSMQ的Application集成的情況。MsmqIntegrationBinding實現了WCF Endpoint和某個Message Queue進行數據的通信,具體來說,就是實現了單一的向某個Message Queue 發送Message,和從某個Message Queue中接收Message的功能。從編程模式上看,也有所不同,比如Operation只接收一個MsmqMessage<T>的參數。

這是Client和Service通信的圖示:

三、MSMQ和Transaction

MSMQ提供對Transaction的支持。在一般的情況下,MSMQ通過Message Queue Transaction實現對Transaction的原生的支持,借助Message Queue Transaction,可以把基於一個或多個Message Queue的相關操作納入同一個Transaction中。

Message Queue Transaction僅僅限於基於Message Queue的操作,倘若操作涉及到另外一些資源,比如SQL Server, 則可以使用基於DTC的分布式Transaction。

對於WCF中MSMQ,由於Client和Service的相對獨立(可能Client發送Message到Service處理Message會相隔很長一段時間),所以Client和Service的操作只能納入不同的Transaction中,如下圖。

四、Sample1:NetMsmqBinding

我們首先做一個基於NetMsmqBinding Sample,實現的功能就是我們開篇所提出的Order Delivery。我們說過,NetMsmqBinding和一般的binding在實現的功能和變成模式上完全一樣。下面是我們熟悉的4層結構:

1.Contract

DataContract:Order & OrderItem

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
namespace Artech.QueuedService.Contract
{
    [DataContract]
    [KnownType(typeof(OrderItem))]
    public class Order
    {
        Private Fields#region Private Fields
        private Guid _orderNo;
        private DateTime _orderDate;
        private Guid _supplierID;
        private string _supplierName;
        private IList<OrderItem> _orderItems;
        #endregion
        Constructors#region Constructors
        public Order()
        {
            this._orderItems = new List<OrderItem>();
        }
        public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName)
        {
            this._orderNo = orderNo;
            this._orderDate = orderDate;
            this._supplierID = supplierID;
            this._supplierName = supplierName;
            this._orderItems = new List<OrderItem>();
        }
        #endregion
        Public Properties#region Public Properties
        [DataMember]
        public Guid OrderNo
        {
            get { return _orderNo; }
            set { _orderNo = value; }
        }
        [DataMember]
        public DateTime OrderDate
        {
            get { return _orderDate; }
            set { _orderDate = value; }
        }
        [DataMember]
        public Guid SupplierID
        {
            get { return _supplierID; }
            set { _supplierID = value; }
        }
        [DataMember]
        public string SupplierName
        {
            get { return _supplierName; }
            set { _supplierName = value; }
        }
        [DataMember]
        public IList<OrderItem> OrderItems
        {
            get { return _orderItems; }
            set { _orderItems = value; }
        }
        #endregion
        Public Methods#region Public Methods
        public override string ToString()
        {
            string description = string.Format("General Informaion:\n\tOrder No.\t: {0}\n\tOrder Date\t: {1}\n\tSupplier No.\t: {2}\n\tSupplier Name\t: {3}", this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName);
            StringBuilder productList = new StringBuilder();
            productList.AppendLine("\nProducts:");
            int index = 0;
            foreach (OrderItem item in this._orderItems)
            {
                productList.AppendLine(string.Format("\n{4}. \tNo.\t\t: {0}\n\tName\t\t: {1}\n\tPrice\t\t: {2}\n\tQuantity\t: {3}", item.ProductID, item.ProductName, item.UnitPrice, item.Quantity, ++index));
            }
            return description + productList.ToString();
        }
        #endregion
    }
}

3.Hosting

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>
    <bindings>
      <netMsmqBinding>
        <binding name="msmqBinding">
          <security>
            <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
            <message clientCredentialType="None" />
          </security>
        </binding>
      </netMsmqBinding>
    </bindings>
    <services>
      <service name="Artech.QueuedService.Service. OrderProcessorService">
        <endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"
          bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor" />
      </service>
    </services>
  </system.serviceModel>
</configuration>

在默認的情況下,netMsmqBinding 的msmqAuthenticationMode為WindowsDomain,由於基於WindowsDomain必須安裝AD,利於在本機模擬,我把msmqAuthenticationMode改為None,相應的ProtectionLevel和clientCredentialType改為None。

Program:

using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.ServiceModel;
using Artech.QueuedService.Service;
namespace Artech.QueuedService.Hosting
{
    class Program
    {
        static void Main(string[] args)
        {
            string path = @".\private$\orders";
            if(!MessageQueue.Exists(path))
            {
                MessageQueue.Create(path,true);
            }
            using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
            {
                host.Opened += delegate
                {
                    Console.WriteLine("Service has begun to listen\n\n");
                };
                host.Open();
                Console.Read();
            }
        }
    }
}

在Host Service之前,通過MessageQueue.Create創建一個Message Queue,第二個參數為表明Queue是否支持Transaction的indicator,這裡支持Transaction。

4.Client:

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>
    <bindings>
      <netMsmqBinding>
        <binding name="msmqBinding">
          <security>
            <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
            <message clientCredentialType="None" />
          </security>
        </binding>
      </netMsmqBinding>
    </bindings>
    <client>
      <endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"
        bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor"
        name="defaultEndpoint" />
    </client>
  </system.serviceModel>
</configuration>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;
using System.ServiceModel;
using System.Transactions;

namespace Artech.QueuedService.Client
{
  class Program
  {
    static void Main(string[] args)
    {
      ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
      IOrderProcessor channel = channelFactory.CreateChannel();

      Order order = new Order(Guid.NewGuid(),DateTime.Today,Guid.NewGuid(),"A Company");
      order.OrderItems.Add(new OrderItem(Guid.NewGuid(),"PC",5000,20));
      order.OrderItems.Add(new OrderItem(Guid.NewGuid(),"Printer",7000,2));

      Console.WriteLine("Submit order to server");
      using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
      {
         channel.Submit(order);
         scope.Complete();
      }   
      Console.Read();
    }
  }
}

先後運行Host和Client,Host端有下面的輸出:

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved