程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> 關於C++ >> C++異步調用利器future/promise實現原理

C++異步調用利器future/promise實現原理

編輯:關於C++

前言

在異步編程中,各種回調將讓人眼花缭亂,代碼分散,維護起來十分困難。boost和C++11 的 future/promise 提供了一個很好的解決方案,使得代碼更加漂亮、易維護。

在工作中,我也用過幾次future/promise,但是還是十分生疏,所以決定學習下它的原理,用起來才更加順暢。

查了很多資料,發現很多語言都有這個機制,但是關於C++的promise的資料卻很少,只有一些使用的教程,而沒有找到原理方面的。

“源碼之前,了無秘密。”

所以還是決定從源碼入手學習!

本文針對的源碼是借鑒boost實現的版本,由於copyright的原因,就不貼出完整的源碼了,需要學習的朋友可以參見boost的實現。

一、基於Future/Promise的異步同步化編程依賴的組件

bind和callback,類似與boost庫的bind和function; shared_ptr、scoped_ptr、tuple、exception,參考boost庫中的shared_ptr、scoped_ptr、tuple、exception實現; Future和Promise,借鑒boost庫的future設計思想; when_all,通過Future、Promise、tuple來實現,針對異步並行的同步化。

二、Function和Bind的用法

下面提到的bind類似於boost的bind,而Callback類似於Function,一般跟bind搭配使用。

三、shared_ptr、scoped_ptr、tuple

shared_ptr:引用計數。

scoped_ptr:不可轉移所有權。

Tuple:很多的時候我們需要為函數返回多個值,我們可以使用class或struct來封裝要返回的多個值,然後返回封裝struct或class,但是使用這種方法的弊端就是增加的程序的代碼量,最好是能通過一種匿名的struct或class來解決這個問題。Boost::tuple就為我們提供了一種類似於匿名struct的方法為我們解決函數的多個返回值的問題。既增強了代碼的可讀性又不增加代碼量。其實std::pair就是boost::tuple的2個參數的特例,對boost::tuple你可以綁定更多的參數,或者你可以迭代實現無限多參數的情況。

四、EnableSharedFromThis

使用boost庫時,經常會看到如下的類:

class A:public enable_share_from_this

在什麼情況下要使類A繼承enable_share_from_this?

使用場合 :當類A被share_ptr管理,且在類A的成員函數裡需要把當前類對象作為參數傳給其他函數時,就需要傳遞一個指向自身的share_ptr。

我們就使類A繼承enable_share_from_this,然後通過其成員函數 share_from_this()返回當指向自身的share_ptr。

以上有2個疑惑:

1.把當前類對象作為參數傳給其他函數時,為什麼要傳遞share_ptr呢?直接傳遞this指針不可以嗎?

一個裸指針傳遞給調用者,誰也不知道調用者會干什麼?假如調用者delete了該對象,而share_tr此時還指向該對象。

2.這樣傳遞share_ptr可以嗎?share_ptr< this >

這樣會造成2個非共享的share_ptr指向一個對象,最後造成2次析構該對象。

部分代碼:

 1 class tcp_connection
 2   : public boost::enable_shared_from_this
 3 {
 4 public:
 5   typedef boost::shared_ptr pointer;
 6 
 7   static pointer create(boost::asio::io_service& io_service)
 8   {
 9     return pointer(new tcp_connection(io_service));
10   }
11 
12   tcp::socket& socket()
13   {
14     return socket_;
15   }
16 
17   void start()
18   {
19     message_ = make_daytime_string();
20 
21     boost::asio::async_write(socket_, boost::asio::buffer(message_),
22         boost::bind(&tcp_connection::handle_write, shared_from_this(),
23           boost::asio::placeholders::error,
24           boost::asio::placeholders::bytes_transferred));
25   }
26 
27 private:
28   tcp_connection(boost::asio::io_service& io_service)
29     : socket_(io_service)
30   {
31   }
32 
33   void handle_write(const boost::system::error_code& /*error*/,
34       size_t /*bytes_transferred*/)
35   {
36   }
37 
38   tcp::socket socket_;
39   std::string message_;
40 };

類tcp_connection繼承enable_share_from_this,在22行裡,它的成員函數start(),通過share_from_this返回指向自身的share_ptr。

五、Future和Promise

5.1 簡介

Promise對象可保存T類型的值,該值可被future對象讀取(可能在另一個線程中),這是promise提供的同步的一種手段。

在構造promise時,promise對象可以與共享狀態關聯起來,這個共享狀態可以存儲一個T類型或者一個由std::exception派生出的類的值,並可以通過get_future來獲取與promise對象關聯的對象,調用該函數之後,兩個對象共享相同的共享狀態(shared state)

Promise對象是異步provider,它可以在某一時刻設置共享狀態的值。

Future對象可以返回共享狀態的值,或者在必要的情況下阻塞調用者並等待共享狀態標識變為ready,然後才能獲取共享狀態的值。

5.2 關系圖總覽

這裡寫圖片描述

5.3從使用入手學習

下面是使用future/promise的一個例子(目前僅討論串行調用的情況):

//服務對外接口,串行調用
taf::Int32 
AServantImp::queryResultSerial(const std::string& sIn, std::string &sOut, taf::JceCurrentPtr current)
{
    //設置異步回包
    current->setResponse(false);

    // 向服務B發送異步請求,返回值的類型是
    // promise::Future,
    // 意思就是服務B未來會返回一個string類型的數據
    promise::Future f = sendBReq(_pPrxB, sIn, current);

    // f調用其成員函數then,給未來要到達的string類型的
    // 返回結果設置一個處理函數
    // 在handleBRspAndSendCReq中獲取返回結果,
    // 並return sendCReq(),即f2,然後f2通過鏈式法則調用then
    f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
    .then(promise::bind(&handleCRspAndReturnClient, current));

    return 0;
}

promise::Future 
sendBReq(BServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
    // 定義一個promise::Promise類型的變量promise,
    // 其目的是承諾會在promise裡面存放一個string類型的數據,
    // 然後把這個變量傳到BServantCallback對象中,
    // 然後發起異步調用
    // 最後返回promise.getFuture(),
    // 意思是promise承諾的string類型數據
    // 可以通過promise::Future類型的
    // promise.getFuture()來獲得

    promise::Promise promise;

    Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);

    prx->async_queryResult(cb, sIn);

    return promise.getFuture();     //返回一個future給f
}

//////////////////////////////////////////////////////
promise::Future 
sendCReq(CServantPrx prx, const std::string& sIn, taf::JceCurrentPtr current)
{
    //這個跟sendBReq的意思類似
    //……
}

//////////////////////////////////////////////////////
promise::Future 
handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future& future)
{
    std::string sResult("");
    std::string sException("");
    try
    {
        //此行代碼被執行的時候,promie承諾給future的數據已經達到
        //達到的數據分兩種情況,一是正常的數據,即請求服務B的結果數據返回過來了,
        //那麼調用future.get()會得到這個數據
        //二是異常的數據,即請求服務B的結果數據沒有返回,比如異步調用超時了
        //那麼調用future.get()會拋出異常,所以需要try-catch一下

        sResult = future.get();
        return sendCReq(prx, sResult, current);
    }
    catch (exception& e)
    {
        TLOGDEBUG("Exception:" << e.what() << endl);
        sException = e.what();
    } 

    promise::Promise promise;
    promise.setValue(sException); 
    return promise.getFuture();
}

//////////////////////////////////////////////////////
int 
handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future& future)
{
    int ret = 0;
    std::string sResult("");

    try
    {
        //與handleBRspAndSendCReq處理類似
        sResult = future.get();
    }
    catch (exception& e)
    {
        ret = -1;
        sResult = e.what();
        TLOGDEBUG("Exception:" << e.what() << endl);
    }

    //回包
    AServant::async_response_queryResultSerial(current, ret, sResult);
    return 0;
}

我們一步步看看發生了什麼吧~

5.4定義並初始化一個Future類型的變量

promise::Future f = sendBReq(_pPrxB, sIn, current); 

sendBReq通過promise.getFuture()返回了一個Future,用它來初始化f。

兩個問題:
1.promise.getFuture()是怎麼來的?
2.f是如何初始化的?

1.promise.getFuture()是怎麼來的?

promise::Promise promise;
Test::BServantPrxCallbackPtr cb = new BServantCallback(current, promise);
prx->async_queryResult(cb, sIn);
return promise.getFuture();     //返回一個future給f

promise內部有一個數據成員:

SharedPtr > m_future; 

該成員的默認構造:

Promise()
        : m_future(SharedPtr >(new detail::FutureObject()))
    {}

它使用了FutureObject來作為FutureObjectInterface的具體實現。

Promise的getFuture()方法用m_future構造了一個臨時對象Future< T >(該構造函數為private,因此需要Future將Promise設為友元)並返回,因此promise.getFuture()臨時對象中的m_future和promise中的m_future指向同一個FutureObject對象。

Future getFuture() const
{
    return Future (m_future);
}

2.f是如何初始化的?

Future< T >繼承自FutureBase< T >,繼承的數據成員:

typedef SharedPtr > FuturePtr;
FuturePtr m_future;

我們的目的就是用promise.getFuture()(一個匿名的臨時對象)的m_future來初始化f的m_future,使promise、promise.getFuture()和f的m_future均指向同一個對象,之後promise.getFuture()臨時對象析構,只剩下promise和f的m_future指向同一個對象,有了這個共享,我們就可以在promise中進行賦值(在BServantCallback中調用setValue進行賦值),而在f中進行讀取(通過f.get())!

Future< T >的3個public構造函數:

Future() {}

explicit Future(typename detail::FutureTraits::rvalue_source_type t)
: detail::FutureBase( SharedPtr >
(new detail::PromptFutureObject(t)) )
    {}

Future(const ExceptionPtr& e)
        : detail::FutureBase(e)
{}

對於第二個,由於T為string,detail::FutureTraits< T >::rvalue_source_type實際上就是 const std::string&。

從上面看,並沒有匹配的構造函數可用,且其父類也沒有拷貝構造函數,因此編譯器會進行成員逐個拷貝,最終將sendBReq中的m_future成員拷貝過來,該成員由shared_ptr進行管理,因此promise和f的m_future指向同一個對象的目的達到。

5.5綁定回調函數進行處理,處理完畢之後鏈式調用

當promise承諾的值設置好之後,需要回調函數進行處理。因此我們需要通過then來綁定回調函數。另外,為支持鏈式調用,then應該返回一個future,這個future一般是回調函數的返回值,在回調函數中通過promise.getFuture()來獲取。

Future< T >從FutureBase< T >繼承下來的成員函數:

get();
isDone();
hasValue();
hasException();
operator unspecified_bool_type() const; // Returns true if this future has been initialized.

以上函數都是轉調用m_future的相關函數,因此整個future的細節封裝在了FutureObject和PromptFutureObject當中,這裡先不深究。

Future< T >另外自己實現了一個函數then:

/**
* Register a callback which will be called once the future is satisfied. If an
* exception is thrown the callback will not be registered and then will not be called.
* 
* \throws std::bad_alloc if memory is unavailable.
*/
template 
Future::type> 
then(const Callback& callback) const
{
    typedef typename detail::resolved_type::type value_type;

    if (!this->m_future)
    {
        throwException(FutureUninitializedException(__FILE__, __LINE__));
    }

    Promise promise;

    this->m_future->registerCallback(
        bind(&detail::SequentialCallback::template run,
            owned(new detail::SequentialCallback(callback, promise))));

    return promise.getFuture();
}

該函數接受一個Callback對象作為參數,Callback封裝了一個函數,其返回值為R,參數為Future。

比如以下調用:

f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current));

其中handleBRspAndSendCReq的簽名如下:

promise::Future handleBRspAndSendCReq(CServantPrx prx, JceCurrentPtr current, const promise::Future& future);

bind綁定了函數handleBRspAndSendCReq的前兩個參數prx和current,剩下第三個參數future。

5.5.1 then的返回值類型

看完參數,我們來看看then的返回值類型:

Future::type> 

在上面例子中,handleBRspAndSendCReq的返回類型為promise::Future< std::string >,它被用來具現化then的模板參數R。

為什麼這裡的返回值類型不直接使用R,而要通過resolved_type來決議呢?

我們先看一下resolved_type的定義:

    template 
    struct resolved_type 
    {
        typedef T type;
    };

    template 
    struct resolved_type > 
    {
        typedef T type;
};

resolved_type< T >的type為T;
resolved_type< Future< T > >的type也為T。
無論是普通的T,還是Future< T >,通過resolved_type決議出的type成員都為T。

我們看以下另一個then的調用:

f.then(promise::bind(&handleCRspAndReturnClient, current));

其中handleCRspAndReturnClient的簽名如下:

int handleCRspAndReturnClient(JceCurrentPtr current, const promise::Future& future)

此時將用int來具現化模板參數R。

為了可進行鏈式調用,我們應該保證then返回的是一個Future,因此這時不能直接用R,而需要用Future< R >。如果R本身就是Future< T >,我們則可以通過resolved_type將T萃取出來。

5.5.2 then的函數體

首先明確下then的使命:
1.注冊一個回調函數,來處理就緒的future;
2.將回調函數的返回值帶回來,返回一個future給用戶做鏈式調用。

函數體內的操作:
1.首先保證m_future已經初始化。
2.定義了一個Promise變量:

Promise promise;

3.調用this->m_future->registerCallback()來注冊我們傳進來的callback函數,這裡使用了bind、SequentialCallback進行了包裝:

this->m_future->registerCallback(
    bind(&detail::SequentialCallback::template run,
             owned(new detail::SequentialCallback(callback, promise))));

4.返回promise的future:

return promise.getFuture();

具體的細節這裡暫不探討,目前只需要明白,這裡的promise承諾返回一個Future< value_type >而value_type是跟函數callback(比如handleBRspAndSendCReq)的返回值息息相關的(比如handleBRspAndSendCReq返回一個Future< std::string >,其value_type為string,handleCRspAndReturnClient返回一個int,其value_type為int)。

Promise和SequentialCallback的作用就是把callback的返回值給帶回來,最終返回給用戶來做鏈式調用。

2016/9/15更新:

帶回callback的返回值

現在簡單說下在then中,如何將callback的返回值帶回來並生成一個future返回:

// then()中的代碼片段
Promise promise;

this->m_future->registerCallback(bind(
&detail::SequentialCallback::template run,
owned(new detail::SequentialCallback(callback, promise))));

return promise.getFuture();

這裡多引入了一層promise,該promise**承諾帶回函數callback()的返回值**(如果該返回值是一個future,則還是帶回一個future,如果不是future,比如int,則帶回一個future< int >)。

可以看到,bind綁定的是SequentialCallback的成員函數run(template表示這是一個模板)。
通過第二個參數的SequentialCallback對象進行調用,該對象封裝了callback和一個promise。

當上一層的future就緒時,會調用回調函數,此時調用的是SequentialCallback 的run函數,而不是真正的callback。在run函數中,再調用真正的callback進行處理,並將callback的返回值設置到SequentialCallback對象的promise當中,而then返回的正是這個promise關聯的future。

因此,通過一層間接的future/promise,then成功地返回了callback的返回值的一個future。

Run有多個重載版本:

// For callback which returns void.
template 
typename enable_if >::type run(const FuturePtr& future)
{
    try 
    {
        m_callback(future);
        m_promise.set();
    } 
    catch (...) 
    {
        m_promise.setException(currentException());
    }
}

當callback返回值為void時,無需返回值,所以promise調用空的set。

// For callback which returns non-void non-future type
template 
typename enable_if_c::value && !is_future_type::value>::type
run(const FuturePtr& future)
{
    try 
    {
        m_promise.setValue(m_callback(future));
    } 
    catch (...) 
    {
        m_promise.setException(currentException());
    }
}

當callback返回值為非void且非future時,調用m_callback並將返回值設置到promise中,於是該值可以通過promise.getFuture().get()來獲取。

// For callback which returns future type.
template 
typename enable_if >::type run(const FuturePtr& future)
{
    try 
    {
        m_callback(future).then(
            bind(&ForwardValue::template run,
                owned(new ForwardValue(m_promise))));
    } 
    catch (...) 
    {
        m_promise.setException(currentException());
    }
}

當callback返回值為future時,則對該future使用then綁定到ForwardValue的run函數當中(類似的手法),run函數中再通過get()方法把future內的值取出來,並設置到then中定義的promise當中,於是then的返回值便以一個future的形式存放著callback的返回值。

至於如何對類型進行判斷,當然是利器traits技法,這裡不再展開。

5.5.3 then函數總結

我們重新看一下下面這個例子,理清這兩句代碼包含的內容:

promise::Future f = sendBReq(_pPrxB, sIn, current);
f.then(promise::bind(&handleBRspAndSendCReq,_pPrxC,current))
.then(promise::bind(&handleCRspAndReturnClient, current));

首先,我們在sendBReq中定義了一個Promise,並把它綁定到BServantCallback中。

當異步調用回包時,將回調BServantCallback,在裡面調用Promise的setValue進行賦值。

賦值完畢之後,將調用通過then綁定的回調handleBRspAndSendCReq來處理。

由於我們通過promise.getFuture()使得f和promise的m_future**指向了同一個對象**,所以我們在回調handleBRspAndSendCReq中可以通過f.get()來讀取該值。

f.get()只是完成了handleBRsp部分,在SendCReq的時候,類似於sendBReq,我又定義了一個Promise,我們需要把與之關聯的future(通過promise.getFuture()獲取)作為handleBRspAndSendCReq的返回值,並通過then中的Promise和SequentialCallback將這個future返回給用戶,從而用戶可以繼續調用then來指定handle。

總結:then幫我們把handle回調函數注冊到future當中,當future可讀時,將調用該handle進行處理,then還為我們把handle的返回值帶回來,以供鏈式調用。

5.6 future和promise的關系梳理

兩者都有一個m_future成員,其類型為

SharedPtr >

這裡寫圖片描述

由於為Future< T >需要針對void進行特化,為避免過多重復的代碼,把與特化無關的部分抽離出來形成FutureBase作為基類。

從上圖可以看出,Future和Promise均持有m_future,兩者正是通過該對象進行共享、關聯的(通過promise.getFuture()實現)。其中Promise對外提供了對m_future的set(寫)接口,而Future對外提供了m_future的get(讀)接口。

5.7 FutureObjectInterface的實現

下圖展示了FutureObjectInterface的具體實現:

這裡寫圖片描述

可以看到,FutureObjectInterface有FutureObject和PromptFutureObject兩種實現。

Promise的FutureObjectInterface 是一個FutureObject。

Future的FutureObjectInterface有兩種情況:直接用一個值來構造Future時(比如調用makeFuture來獲取一個future)用的是PromptFutureObject,而其他情況(比如通過Promise獲得的future)用的是FutureObject。

那麼,兩者有何區別呢?

對於第一個應用場景,future不是通過promise來獲取的,而是直接用一個立即數構造:<喎?/kf/ware/vc/" target="_blank" class="keylink">vcD4NCjxwcmUgY2xhc3M9"brush:java;"> explicit Future(typename detail::FutureTraits::rvalue_source_type t) : detail::FutureBase(SharedPtr > (new detail::PromptFutureObject(t))) {}

比如下面這個應用場景:

Future< int > hsF = makeFuture(-1);    //使用立即數構造
if (openSwitch)
{
    // sendBReq中進行異步調用,並通過promise.getFuture()返回一個future
    hsF = sendBReq();
}
// …

在handle中,我們可以通過以下判斷來決定是否處理:

int result = hsF.get();
if (result != -1)
{
    // handle
}

立即數構造這種用法,由於其值已經設定了,不需要等待promise填值進去,因此該future內部的PromptFutureObject是只讀的,也就不需要加鎖。如果還是使用FutureObject這個版本,將會在加鎖解鎖上做無用功。因此PromptFutureObject是針對這種場景進行優化的。

而當Future 與Promise共享同一個m_future時,由於Future和Promise可能在不同線程中,因此可能同時讀寫,這裡存在race condition,因此需要加鎖。FutureObject正是一個加鎖的版本。

關於FutureObject,有幾個需要注意的點:
1.在一開始介紹的時候我們說過,Future可以獲取共享狀態的值(通過get()方法),在必要的情況下阻塞調用者並等待共享狀態標識變為ready,然後才能獲取共享狀態的值。
這裡寫圖片描述
這裡寫圖片描述

2.setValue只能被調用一次,即共享狀態的值只能設置一次,如果試圖設置第二次,將拋出異常。
這裡寫圖片描述

3.在registerCallback時,根據m_is_done來判斷是否已經setValue,如果m_is_done為true,則直接調用callback(this->sharedFromThis())來處理,否則將callback加入m_pending_callbacks列表中,等待setValue之後調用。在setValue中,除了設置值之外,還會調用doPendingCallbacks()函數,在該函數中逐個調用m_pending_callbacks列表中的callback。

最後,關於when_all的實現(並行異步調用),後續有時間再補充~

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved