程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> WCF後續之旅(11) 關於並發、回調的線程關聯性(Thread Affinity)

WCF後續之旅(11) 關於並發、回調的線程關聯性(Thread Affinity)

編輯:關於.NET

對於一般的多線程操作,比如異步地進行基於文件系統的IO操作;異步地調用Web Service;或者是異步地進行數據庫訪問等等,是和具體的線程無關的。也就是說,對於這些操作,任意創建一個新的線程來執行都是等效的。但是有些情況下,有些操作卻只能在固定的線程下執行。比如,在GUI應用下,對控件的訪問就需要在創建該控件的線程下執行;或者我們在某個固定的線程中通過TLS(Thread Local Storage)設置了一些Context信息,供具體的操作使用,我們把操作和某個固定的線程的依賴稱為線程關聯性(Thread Affinity)。在這種情況下,我們的異步操作就需要被Marshal到固定的線程執行。在WCF並發或者Callback的情況下也具有這樣的基於線程關聯性的問題。

一、從基於Windows Application客戶端的WCF回調失敗談起

在"我的WCF之旅"系列文章中,有一篇(WinForm Application中調用Duplex Service出現TimeoutException的原因和解決方案)專門介紹在一個Windows Application客戶端應用, 通過WCF 的Duplex通信方式進行回調失敗的文章.我們今天以此作為出發點介紹WCF在Thread Affinity下的表現和解決方案.

我們來創建一個WCF的應用來模擬該場景: 客戶端是一個基於Windows Form應用, 完成一個計算器的功能, 用戶輸入操作數,點擊"計算"按鈕, 後台通過調用WCF service, 並傳遞一個用於顯示計算結果的Callback對象; service進行相應的計算得到最後的運算結果,調用該Callback對象將運算結果顯示到客戶端界面.這是我們的WCF四層結構:

1、Contract:ICalculate & ICalculateCallback

namespace Artech.ThreadAffinity.Contracts
{
  [ServiceContract(CallbackContract = typeof(ICalculateCallback))]
  public interface ICalculate
  {
    [OperationContract]
    void Add(double op1, double op2);
  }
}

這是Service Contract,下面是Callback Contract,用於顯示運算結果:

namespace Artech.ThreadAffinity.Contracts
{
  public interface ICalculateCallback
  {
    [OperationContract]
    void DisplayResult(double result);
  }
}

2、Service:CalculateService

namespace Artech.ThreadAffinity.Services
{
  [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
  public class CalculateService:ICalculate
  {
    public static ListBox DisplayPanel
    { get; set; }

    #region ICalculate Members

    public void Add(double op1, double op2)
    {
      double result = op1 + op2;
      ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();

DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
      callback.DisplayResult(result);
    }

    #endregion
  }
}
由於需要進行callback, 我們把ConcurrencyMode 設為Reentrant。當得到運算的結果後,通過OperationContext.Current.GetCallbackChannel得到callback對象,並調用之。還有一點需要提的是,該service是通過一個Windows Form application進行host的。並且有一個ListBox列出所有service執行的結果,就像這樣:

3、Hosting

Hosting的代碼寫在Form的Load事件中:

private void HostForm_Load(object sender, EventArgs e)
{
  this._serviceHost = new ServiceHost(typeof(CalculateService));
  CalculateService.DisplayPanel = this.listBoxResult;
  CalculateService.SynchronizationContext = SynchronizationContext.Current;
  this._serviceHost.Opened += delegate
  {
this.Text = "The calculate service has been started up!";
  };

  this._serviceHost.Open();
}

我們注意到了CalculateService使用到的用於顯示所有預算結果的ListBox就是在這了通過static property傳遞的。

這麼配置文件

<configuration>
  <system.serviceModel>
    <services>
      <service name="Artech.ThreadAffinity.Services.CalculateService">
        <endpoint binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate" />
        <host>
          <baseAddresses>
            <add baseAddress="net.tcp://127.0.0.1:8888/calculateservice" />
          </baseAddresses>
        </host>
      </service>
    </services>
  </system.serviceModel>
</configuration>

4、Client

Client的界面很簡單:輸入兩個操作數,點擊“=”按鈕,將運算結果顯示出來。

先來看看client端對callback contract的實現:

namespace Clients
{
  public class CalculateCallback : ICalculateCallback
  {
    public static TextBox ResultPanel;

    #region ICalculateCallback Members

    public void DisplayResult(double result)
    {
      ResultPanel.Text = result.ToString();
    }

    #endregion
  }
}

這是配置:

<configuration>
  <system.serviceModel>
    <client>
      <endpoint address="net.tcp://127.0.0.1:8888/calculateservice"
        binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate"
        name="calculateservice" />
    </client>
  </system.serviceModel>
</configuration>

然後是我們“=”按鈕的單擊事件對運算的實現:

private void buttonCalculate_Click(object sender, EventArgs e)
{
  CalculateCallback.ResultPanel = this.textBoxResult;
  DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
  ICalculate calculator = channelFactory.CreateChannel();
  calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
}

CalculateCallback 用於顯示運算結果的TextBox通過statis property實現傳遞。

這個實現很簡單,貌似沒有什麼問題,但是我們運行程序,在客戶端就會拋出這樣的exception。可以看出是一個TimeoutException。

二、是什麼導致TimeoutException?

我們現在來分析是什麼導致了TimeoutException的拋出。原因很簡單:由於我們對service的調用的是在UI 線程調用的,所以在開始調用到最終得到結果,這個UI Thread會被鎖住;但是當service進行了相應的運算的到運算的結果後,需要調用callback對象對client進行回調,默認的情況下,Callback的執行是在UI線程執行的。當Callback試圖執行的時候,發現UI 線程被鎖,只能等待。這樣形成一個死鎖,UI線程需要等待CalculateService執行返回後才能解鎖,而CalculateService需要Callback執行完成;而Callback需要等到UI線程解鎖才能執行。

基於上門的原因,我們有兩種解決方案:

1、CalculateService不必等到Callback執行完成就返回,我們可以通過異步調用Callback。或者讓Client異步方式調用CalculateService,以便及時釋放UI線程,我們可以通過One-way的方式來進行service的調用。

2、讓Callback的執行不必綁定到UI線程

三、解決方案一:通過異步調用或者One-way回調

為了簡單起見,我們通過ThreadPool實現了異步回調:

public void Add(double op1, double op2)
{
  double result = op1 + op2;
  ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();

  ThreadPool.QueueUserWorkItem(delegate{ callback.DisplayResult(result); }, null);
}

這是一種方案,另一種是將Add操作設成One-way的:

namespace Artech.ThreadAffinity.Contracts
{
  [ServiceContract(CallbackContract = typeof(ICalculateCallback))]
  public interface ICalculate
  {
    [OperationContract(IsOneWay = true)]
    void Add(double op1, double op2);
  }
}

這兩種方案都可以解決問題。

四、方案二、通過解除Callback操作和UI線程的關聯性

現在我們才進入我們今天討論的主題:WCF並發操作的線程關聯性問題。在這之前,我們需要了解一個重要的對象:SynchonizationContext(System.Threading.SynchronizationContext)。SynchonizationContext就是為了解決這種線程關聯性問題而設計的。SynchonizationContext提供了兩個主要的API將操作和對應的Thread關聯:Post和Send。

public virtual void Post(SendOrPostCallback d, object state)

public virtual void Send(SendOrPostCallback d, object state)

Send和Post分別以同步和異步的方式將以Delegate表示的具體的操作和SynchonizationContext對象對應的Thread關聯,而SendOrPostCallback delegate對象代表你需要的線程關聯操作,state代表傳入delegate的參數:

public delegate void SendOrPostCallback(object state);

對於某些具有線程關聯的應用,比如Windows Form application,在程序啟動的時候,會設置當前的SynchonizationContext對象(Windows Form application使用的是繼承了SynchonizationContext的WindowsFormsSynchronizationContext :System.Windows.Forms.WindowsFormsSynchronizationContext)。當前SynchonizationContext被成功初始化後,你就可以通過SynchonizationContext的靜態屬性Current得到它。在你自己的應用中,如何有需要,你也可以自定義SynchonizationContext,並通過靜態方法SetSynchronizationContext將其設置為current SynchronizationContext。

對應WCF來說,無論是host一個service,還是在調用service時制定callback,在默認的情況下,service和callback的操作將自動和當前的SynchonizationContext進行關聯(如何有的話)。也就是說,如過我們的service被host到Windows Form application下,那麼service的操作將在UI 線程下執行;同理,如何我們在一個Windows Forms UI線程下調用duplex service並制定callback,那麼callback的最終執行將在UI線程。

關於WCF對線程關聯性的控制,可以通過ServiceBehavior或者CallbackBehavior的UseSynchronizationContext屬性進行設定,該屬性默認為true,這正式WCF默認具有線程關聯性的原因。

現在我們來實現我們的第二套方案:讓Callback的執行不必綁定到UI線程。為此我們只需要加上如何的CallbackBehavior attribute就可以了。

namespace Artech.ThreadAffinity.Clients
{
  [CallbackBehavior(UseSynchronizationContext = false)]
  public class CalculateCallback : ICalculateCallback
  {
    public static TextBox ResultPanel;

    #region ICalculateCallback Members

    public void DisplayResult(double result)
    {
      ResultPanel.Text = result.ToString();

    }

    #endregion
  }
}

但是現在我們運行我們的程序,將會出現如下的InvalidOperation異常:

原因很簡單,由於我們將callbaclk的UseSynchronizationContext 設置成false,那麼callback的操作將不會再UI線程下執行。但是我們需要運算的結果輸入到UI的TextBox上,對UI上控件的操作需要在UI線程上執行,顯然會拋出異常了。

為了我們引入SynchonizationContext到CalculateCallback中:將SynchonizationContext定義成一個static屬性,通過Post方法異步地實現對運算結果的顯示。

namespace Artech.ThreadAffinity.Clients
{
  [CallbackBehavior(UseSynchronizationContext = false)]
  public class CalculateCallback : ICalculateCallback
  {
    public static TextBox ResultPanel;
    public static SynchronizationContext SynchronizationContext;

    #region ICalculateCallback Members

    public void DisplayResult(double result)
    {
       SynchronizationContext.Post(delegate { ResultPanel.Text = result.ToString(); }, null);
    }

    #endregion
  }
}

SynchonizationContext在調用service的時候指定:

private void buttonCalculate_Click(object sender, EventArgs e)
{
  CalculateCallback.ResultPanel = this.textBoxResult;
  CalculateCallback.SynchronizationContext = SynchronizationContext.Current;

  DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
  ICalculate calculator = channelFactory.CreateChannel();
  calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
}

現在我們程序能夠正常運行了。

五、另一種可選方案:通過ISynchronizeInvoke的Invoke/BeginInvoke

熟悉Windows Form編程的讀者應該都知道,WinForm空間的基類Control(System.Windows.Forms.Control)都實現了System.ComponentModel.ISynchronizeInvoke接口,而Control對ISynchronizeInvoke的實現就是為了解決Control的操作必須在創建Control線程的問題,ISynchronizeInvoke定義Invoke和BeginInvoke方法方面我們以同步或者異步的方式操作Control:

public interface ISynchronizeInvoke
{
  // Methods
  [HostProtection(SecurityAction.LinkDemand, Synchronization=true, ExternalThreading=true)]
  IAsyncResult BeginInvoke(Delegate method, object[] args);
  object EndInvoke(IAsyncResult result);
  object Invoke(Delegate method, object[] args);

  // Properties
  bool InvokeRequired { get; }
}

如何我們放棄基於SynchonizationContext的解決方案,我們也可以通過基於ISynchronizeInvoke的方式來解決這個問題。為此我們這樣定義CalculateCallback:

namespace Artech.ThreadAffinity.Clients
{
  [CallbackBehavior(UseSynchronizationContext = false)]
  public class CalculateCallback : ICalculateCallback
  {
    public static TextBox ResultPanel;
    public delegate void DisplayResultDelegate(TextBox resultPanel, double result);

    #region ICalculateCallback Members

    public void DisplayResult(double result)
    {
      DisplayResultDelegate displayResultDelegate = new DisplayResultDelegate(DisplayResult);
      ResultPanel.BeginInvoke(displayResultDelegate, new object[] { ResultPanel, result });
    }

    private void DisplayResult(TextBox resultPanel, double result)
    {
      resultPanel.Text = result.ToString();
    }

    #endregion
  }
}

由於BeginInvoke方式只能接受一個具體的delegate對象(不能使用匿名方法),所以需要定義一個具體的Delegate(DisplayResultDelegate)和對應的方法(DisplayResult),參數通過一個object[]傳入。

從本質上將,這兩種方式的實現完全是一樣的,如何你查看System.Windows.Forms.WindowsFormsSynchronizationContext的代碼,你會發現其Send和Post方方法就是通過調用Invoke和BeginInvoke方式實現的。

六、Service Hosting的線程關聯性

我們花了很多的精力介紹了WCF Duplex通信中Callback操作的線程關聯性問題,實際上我們使用到更多的還是service操作的線程關聯性問題。就以我們上面的程序為例,我們通過一個Windows Form application來host我們的service,並且要求service的運算結束後將結果輸出到server端的Window form的ListBox中,對ListBox的操作肯定需要的Host程序的UI線程中執行。

按照我們一般的想法,我們的Service面向若干client,肯定是並發的接收client端的請求,以多線程的方式執行service的操作,那麼操作中UI 控件的操作肯定會出現錯誤。

我們的程序依然可以正常運行,其根本原因是WCF的service操作默認實現了對Host service的當前線程的SynchonizationContext實現了關聯。與Callback操作的線程關聯性通過CallbackBehavior的UseSynchronizationContext 進行控制一樣,service的線程關聯性通過ServiceBehavir的UseSynchronizationContext 進行設定。UseSynchronizationContext 的默認值為true。

如何我們將CalculateService的UseSynchronizationContext 設為false:

namespace Artech.ThreadAffinity.Services
{
  [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
  public class CalculateService:ICalculate
  {
    public static ListBox DisplayPanel
    { get; set; }

    #region ICalculate Members

    public void Add(double op1, double op2)
    {
      double result = op1 + op2;
      ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();

      DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
      callback.DisplayResult(result);
    }

    #endregion
  }
}

有control被不是創建它的線程操作,肯定會拋出一個InvalidOperationException,就像這樣:

我們一樣可以通過SynchonizationContext或者ISynchronizeInvoke的方式來解決這樣的問題,我們只討論前面一種,為此我們改變了CalculateService的定義:通過SynchonizationContext的Post方法實現對ListBox的訪問。

namespace Artech.ThreadAffinity.Services
{
  [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
  public class CalculateService:ICalculate
  {
    public static ListBox DisplayPanel
    { get; set; }

    public static SynchronizationContext SynchronizationContext
    { get; set; }

    #region ICalculate Members

    public void Add(double op1, double op2)
    {
      double result = op1 + op2;
      ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();
      SynchronizationContext.Post(delegate
      {
        DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
      }, null);

      callback.DisplayResult(result);
    }

    #endregion
  }
}

通過static屬性定義的SynchonizationContext在host的時候指定:

private void HostForm_Load(object sender, EventArgs e)
{
  this._serviceHost = new ServiceHost(typeof(CalculateService));
  CalculateService.DisplayPanel = this.listBoxResult;
  CalculateService.SynchronizationContext = SynchronizationContext.Current;
  this._serviceHost.Opened += delegate
  {
this.Text = "The calculate service has been started up!";
  };

  this._serviceHost.Open();
}

這樣我們的程序又可以正常運行了。

本文配套源碼

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved