程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> 我的WCF之旅(13):創建基於MSMQ的Responsive Service

我的WCF之旅(13):創建基於MSMQ的Responsive Service

編輯:關於.NET

一、One-way MEP V.S. Responsible Service

我們知道MSMQ天生就具有異步的特性,它只能以One-way的MEP(Message Exchange Pattern)進行通信。Client和Service之間采用One-way MEP的話就意味著Client調用Service之後立即返回,它無法獲得Service的執行結果,也無法捕捉Service運行的Exception。下圖簡單表述了基於MSMQ的WCF Service中Client和Service的交互。

但是在有些場景 中,這是無法容忍的。再拿我在上一篇文章的Order Delivery的例子來說。Client向Service提交了Order,卻無法確認該Order是否被Service正確處理,這顯然是不能接受的。我們今天就來討論一下,如何創建一個Responsive Service來解決這個問題:Client不再是對Service的執行情況一無所知,它可以獲知Order是否被Service正確處理了。

二、Solution

雖然我們的目的很簡單:當Client向Service遞交了Order之後,能以某種方式獲知Order的執行結果;對於Service端來說,在正確把Order從Message Queue中獲取出來、並正確處理之後,能夠向Order的遞交者發送一個Acknowledge Message。為了簡單起見,這個Acknowledge Message包含兩組信息:

Order No.: 被處理的Order的一個能夠為一標志它的ID。

Exception: 如果處理失敗的Exception,如果成功處理為null。

要在WCF中實現這樣的目的,對於Request/Reply MEP來說是簡單而直接的:Client向Service遞交Order,並等待Service的Response,Service在處理接收到Order之後直接將處理結果 返回給Client就可以了。但是我們說過MSMQ天生就是異步的,我們只有采取一種間接的方式實現“曲線救國”。

我們的解決方案是:在每個Client Domain也創建一個基於MSMQ的本地的WCF Service,用於接收來自Order處理端發送的Acknowledge Message。對於處理Order 的Service來說,在正確處理Order之後,想對應的Client發送Acknowledge Message。下圖簡單演示整個過程:

三、Implementation

了解了上面的Solution之後,我們來看看該Solution在真正實現過程中有什麼樣的困難。對於處理Order的Service來說,在向Client端發送Acknowledge Message的時候,它必須要知道該Order對應的Client的Response Service的MSMQ的Address以及其他和Operation相關的Context信息(在這裡我們不需要,不過考慮到擴展性,我們把包括了address的Context的信息 封裝到一個了Class中,在這裡叫做:OrderResponseContext)。而這些Context卻不能在Configuration中進行配置,因為他可以同時面臨著很多個Client:比如每個Client用於接收Response 的Message Queue的address都不一樣。所以這個OrderResponseContext必須通過對應的Client來提供。基於此,我們具有兩面兩種解決方式:

方式一、修改Service Contract,把OrderResponseContext當成是Operation的一個參數

這是我們最容易想到的,比如我們原來的Operation這樣定義:

namespace Artech.ResponsiveQueuedService.Contract
{
  [ServiceContract]
  [ServiceKnownType(typeof(Order))]
  public interface IOrderProcessor
  {
    [OperationContract(IsOneWay = true)]
    void Submit(Order order);
  }
}

現在變成:

namespace Artech.ResponsiveQueuedService.Contract
{
  [ServiceContract]
  [ServiceKnownType(typeof(Order))]
  public interface IOrderProcessor
  {
    [OperationContract(IsOneWay = true)]
    void Submit(Order order, OrderResponseContext responseContext);
  }
}

雖然這種方式看起來不錯,但是卻不值得推薦。在一般情況下,我們的Contract需要是很穩定的,一經確定就不能輕易更改,因為Contract是被交互的多方共同支持的,牽一發動全身;此外,從Service Contract代表的是Service的一個Interface,他是對業務邏輯的抽象、和具體實現無關,而對於我們的例子來說,我們僅僅是定義一個遞交Order的Operation,從業務邏輯來看,OrderResponseContext和抽象的業務邏輯毫無關系。基於此,我們需要尋求一種和Service Contract無關的解決方式:

方式二、將OrderResponseContext放到Soap Message 的Header中

其實我們要解決的問題很簡單,就是要把OrderResponseContext的信息置於Soap Message中發送到Service。而我們知道,Soap的Header具有極強的可伸縮性,原則上,我們可以把任何控制信息置於Header中。基於WCF的編程模式很容易地幫助我們實現對Soap Header的插入和獲取:

我們可以通過下面的方式獲得當前Operation Context的Incoming Message Headers和Outgoing Message Headers

OperationContext.Current.IncomingMessageHeaders
OperationContext.Current.OutgoingMessageHeaders

如果我們要把一個OrderResponseContext 對象插入到當前Operation Context的Outgoing Message Headers中,我們可以通過下面的代碼來實現:

OrderResponseContext context = new OrderResponseContext();
MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>( context);
OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("name", "namespace"));

相應的,我們可以通過下面的代碼從Outgoing Message Headers OrderResponseContext的數據獲取的內容:

OrderResponseContextcontext=OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name","namespace"));

四、Sample

我們照例給出一個完整的Sample,下面是整個Solution的結構:

除了一貫使用的4層結構(Contract-Service-Hosting-Client),還為ResponseService增加了下面兩層:

Localservice: 作為Client Domain的ResponseService。

LocalHosting:Host Localservice。

1.Contract: Artech.ResponsiveQueuedService.Contract

Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.Contract
{
  [ServiceContract]
  [ServiceKnownType(typeof(Order))]
  public interface IOrderProcessor
  {
    [OperationContract(IsOneWay = true)]
    void Submit(Order order);
  }
}

Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse

using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.Contract
{
  [ServiceContract]
  public interface IOrderRessponse
  {
    [OperationContract(IsOneWay =true)]
    void SubmitOrderResponse(Guid orderNo,FaultException exception);
  }
}

接收來自Order processing端的Response:Order No.和Exception。

Data Contract: Artech.ResponsiveQueuedService.Contract.Order

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
namespace Artech.ResponsiveQueuedService.Contract
{
    [DataContract]
    public class Order
    {
        Private Fields#region Private Fields
        private Guid _orderNo;
        private DateTime _orderDate;
        private Guid _supplierID;
        private string _supplierName;
        #endregion
        Constructors#region Constructors
        public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName)
        {
            this._orderNo = orderNo;
            this._orderDate = orderDate;
            this._supplierID = supplierID;
            this._supplierName = supplierName;
        }
        #endregion
        Public Properties#region Public Properties
        [DataMember]
        public Guid OrderNo
        {
            get { return _orderNo; }
            set { _orderNo = value; }
        }
        [DataMember]
        public DateTime OrderDate
        {
            get { return _orderDate; }
            set { _orderDate = value; }
        }
        [DataMember]
        public Guid SupplierID
        {
            get { return _supplierID; }
            set { _supplierID = value; }
        }
        [DataMember]
        public string SupplierName
        {
            get { return _supplierName; }
            set { _supplierName = value; }
        }
        #endregion
        Public Methods#region Public Methods
        public override string ToString()
        {
            string description = string.Format("Order No.\t: {0}\n\tOrder Date\t: {1}\n\tSupplier No.\t: {2}\n\tSupplier Name\t: {3}", 
                this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName);
            return description;
        }
        #endregion
    }
}

對Order的封裝。

Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext

using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
using System.ServiceModel;

namespace Artech.ResponsiveQueuedService.Contract
{  
  [DataContract]
  public class OrderResponseContext
  {
    private Uri _responseAddress;

    [DataMember]
    public Uri ResponseAddress
    {
      get { return _responseAddress; }
      set { _responseAddress = value; }
    }

    public static OrderResponseContext Current
    {
      get
      {
        if (OperationContext.Current == null)
        {
          return null;
        }

        return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract");
      }
      set
      {
        MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>(value);
        OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract"));
      }
    }
  }
}

ResponseAddress代表Host在Client Domain的Response Service的Address。同過Current把OrderResponseContext插入到Outgoing Message Headers中、以及從Ingoing Message Headers取出OrderResponseContext對象。

2.Order Processing Service:Artech.ResponsiveQueuedService.Service

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
using System.Net.Security;
namespace Artech.ResponsiveQueuedService.Service
{
    public class OrderProcessorService:IOrderProcessor
    {
        private void ProcessOrder(Order order)
        {
            if (order.OrderDate < DateTime.Today)
            {
                throw new Exception();
            }
        }
        IOrderProcessor Members#region IOrderProcessor Members
        public void Submit(Order order)
        {
            Console.WriteLine("Begin to process the order of the order No.: {0}", order.OrderNo);
            FaultException exception= null;
            if (order.OrderDate < DateTime.Today)
            {
                exception = new FaultException(new FaultReason("The order has expried"), new FaultCode("sender"));
                Console.WriteLine("It's fail to process the order.\n\tOrder No.: {0}\n\tReason:{1}", order.OrderNo, "The order has expried");
            }
            else
            {
                Console.WriteLine("It's successful to process the order.\n\tOrder No.: {0}", order.OrderNo);
            }
            NetMsmqBinding binding = new NetMsmqBinding();
            binding.ExactlyOnce = false;
            binding.Security.Transport.MsmqAuthenticationMode = MsmqAuthenticationMode.None;
            binding.Security.Transport.MsmqProtectionLevel = ProtectionLevel.None;
            ChannelFactory<IOrderRessponse> channelFacotry = new ChannelFactory<IOrderRessponse>(binding);
            OrderResponseContext responseContext = OrderResponseContext.Current;
            IOrderRessponse channel = channelFacotry.CreateChannel(new EndpointAddress(responseContext.ResponseAddress));
            using (OperationContextScope contextScope = new OperationContextScope(channel as IContextChannel))
            {
                channel.SubmitOrderResponse(order.OrderNo, exception);
            }
        }
        #endregion
    }
}

在這裡我們模擬了這樣的場景:先通過Order Date判斷Order是否過期,如果過期創建一個FaultException,否則正確處理該Order,然後通過OrderResponseContext.Current從Incoming Message Header中獲取封裝在OrderResponseContext對象中的Response Address,創建Binding並調用Response Service.

3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
 <appSettings>
  <add key="msmqPath" value=".\private$\orderprocessor"/>
 </appSettings>
 <system.serviceModel>
  <bindings>
   <netMsmqBinding>
    <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
     <security>
      <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
     </security>
    </binding>
   </netMsmqBinding>
  </bindings>
  <services>
   <service name="Artech.ResponsiveQueuedService.Service.OrderProcessorService">
    <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
      bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" />
   </service>
  </services>
 </system.serviceModel>
</configuration>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Service;
using System.ServiceModel;
using System.Configuration;
using System.Messaging;

namespace Artech.ResponsiveQueuedService.Hosting
{
  class Program
  {
    static void Main(string[] args)
    {
      string path = ConfigurationManager.AppSettings["msmqPath"];
      if (!MessageQueue.Exists(path))
      {
        MessageQueue.Create(path);
      }

      using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
      {
        host.Opened += delegate
        {
          Console.WriteLine("The Order Processor service has begun to listen");
        };

        host.Open();

        Console.Read();
      }
    }
  }
}

4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.LocalService
{
    public class OrderRessponseService : IOrderRessponse
    {
        IOrderRessponse Members#region IOrderRessponse Members
        public void SubmitOrderResponse(Guid orderNo, FaultException exception)
        {
            if (exception == null)
            {
                Console.WriteLine("It's successful to process the order!\n\tOrder No.: {0}",orderNo);
            }
            else
            {
                Console.WriteLine("It's fail to process the order!\n\tOrder No.: {0}\n\tReason: {1}", orderNo, exception.Message);
            }
        }
        #endregion
    }
}

5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting

Configuration

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
 <appSettings>
  <add key="msmqPath" value=".\private$\orderresponse"/>
 </appSettings>
 <system.serviceModel>
  <bindings>
   <netMsmqBinding>
    <binding name="msmqBinding" exactlyOnce="false">
     <security>
      <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
     </security>
    </binding>
   </netMsmqBinding>
  </bindings>
  <services>
   <service name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService">
    <endpoint address="net.msmq://localhost/private/orderresponse" binding="netMsmqBinding"
      bindingConfiguration="msmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse" />
   </service>
  </services>
 </system.serviceModel>
</configuration>

Program

using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.LocalService;
using System.Configuration;
using System.ServiceModel;
using System.Messaging;

namespace Artech.ResponsiveQueuedService.LocalhHosting
{
  class Program
  {
    static void Main(string[] args)
    {
      string path = ConfigurationManager.AppSettings["msmqPath"];
      if (!MessageQueue.Exists(path))
      {
        MessageQueue.Create(path);
      }

      using (ServiceHost host = new ServiceHost(typeof(OrderRessponseService)))
      {
        host.Opened += delegate
        {
          Console.WriteLine("The Order Response service has begun to listen");
        };

        host.Open();

        Console.Read();
      }
    }
  }
}

6. Client: Artech.ResponsiveQueuedService.Client

Configuration:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
 <appSettings>
  <add key="msmqPath" value="net.msmq://localhost/private/orderresponse"/>
 </appSettings>
 <system.serviceModel>
  <bindings>
   <netMsmqBinding>
    <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
     <security>
      <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
     </security>
    </binding>
   </netMsmqBinding>
  </bindings>
  <client>
   <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
      bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" name="defaultEndpoint" />
  </client>
 </system.serviceModel>
</configuration>

Program:

using System;
using System.Collections.Generic;
using System.Text;
using System.Configuration;
using System.ServiceModel;
using Artech.ResponsiveQueuedService.Contract;
using System.Messaging;

namespace Artech.ResponsiveQueuedService.Clinet

{
  class Program
  {
    static void Main(string[] args)
    {
      Order order1 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A");
      Order order2 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A");

      string path = ConfigurationManager.AppSettings["msmqPath"];
      Uri address = new Uri(path);
      OrderResponseContext context = new OrderResponseContext();
      context.ResponseAddress = address;

      ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
      IOrderProcessor orderProcessor = channelFactory.CreateChannel();

      using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
      {
        Console.WriteLine("Submit the order of order No.: {0}", order1.OrderNo);
        OrderResponseContext.Current = context;
        orderProcessor.Submit(order1);
      }

      using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
      {
        Console.WriteLine("Submit the order of order No.: {0}", order2.OrderNo);
        OrderResponseContext.Current = context;
        orderProcessor.Submit(order2);
      }

      Console.Read();
    }
  }
}

我創建了兩個Order對象, 其中一個已經過期。從Configuration中取出Response Address並購建一個OrderResponseContext,然後分兩次將這兩個Order向Order Processing Service遞交。在調用Order Processing Order的Operation Context Scope中,通過OrderResponseContext.Current將OrderResponseContext對象插入Outcoming Message Header中。

我們現在運行一下整個程序,看看最終的輸出結果:

Client:

Order Processing:

Order Response:

本文配套源碼

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved