程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> 關於C語言 >> C高級 跨平台協程庫,程庫

C高級 跨平台協程庫,程庫

編輯:關於C語言

C高級 跨平台協程庫,程庫


1.0 協程庫引言

  協程對於上層語言還是比較常見的. 例如C# 中 yield retrun, lua 中 coroutine.yield 等來構建同步並發的程序.

本文就是探討如何從底層實現開發級別的協程庫. 在說協程之前, 簡單溫故一下進程和纖程關系.

進程擁有一個完整的虛擬地址空間,不依賴於線程而獨立存在. 線程是進程的一部分,沒有自己的地址空間,

與進程內的其他線程一起共享分配給該進程的所有資源。進程和纖程是1對多關系, 協程同線程關系也是類似.

一個線程中可以有多個協程. 協程同線程相比區別再於, 線程是操作系統控制調度(異步並發),

而纖程是程序自身控制調度(同步串行). 簡單總結協程特性如下:

  1. 相比線程具有更優的性能(假定, 程序寫的沒有明顯失誤) , 省略了操作系統的切換操作

  2. 相比線程具有更少的內存空間, 線程是操作系統對象很耗資源, 協程是用戶態資源, 占用系統層資源很少.

  3. 對比線程開發, 邏輯結構更復雜, 需要開發人員了解程序運行走向.

舉個例子 數碼寶貝例子 : 滾球獸 ->  亞古獸->  暴龍獸->  機械暴龍獸 -> 戰斗暴龍獸

'類比協程進化史' if .. else / switch -> goto -> setjmp / logjump -> coroutine -> .......

協程開發是串行程序開發中構建異步效果的開發模型.

本文參照博文和資料記錄

  C 的 coroutine 庫 : http://blog.codingnow.com/2012/07/c_coroutine.html

  纖程 : http://blog.codingnow.com/2005/10/fiber.html

  cloudwu/coroutine :  https://github.com/cloudwu/coroutine

這裡補充說明一下, 為什麼需要再造輪子. 也是有''歷史''原因額. 有一個騰訊寫的libco協程庫, 但是用的是匯編加cpp混編的.

而雲風的coroutine是運行在linux 和 mac OS上的, window上沒法跑. 因此需要一個支持linux 加 window上純c運行的庫.

這就是設計這個庫的歷史原因. 主要思想還是參照雲風關於協程的理解, 我只是有幸站在絕頂高手的腳底下, 興風作浪~~~~

   一流高手和絕頂高手的差距在哪裡? https://www.zhihu.com/question/43704220

 

2.0 協程庫操作系統相關知識儲備

2.1 window fiber 儲備

  window fiber也叫纖程. 官方說明是 "Microsoft公司給Windows添加了一種纖程,以便能夠非常容易地將現有的UNIX服務器應用程序移植到Windows中".

這就是纖程概念的由來.

  window核心編程中關於fiber介紹 http://www.cnblogs.com/wz19860913/archive/2008/08/26/1276816.html

  Microsoft fiber desc  https://msdn.microsoft.com/en-us/library/windows/desktop/ms682661(v=vs.85).aspx

而我們這裡會詳細解釋其中關於window fiber常用api. 先浏覽關於當前線程開啟纖程相關接口說明.

//
// Fiber creation flags
//
#define FIBER_FLAG_FLOAT_SWITCH 0x1     // context switch floating point

/*
 * VS編譯器特性約定
 * 1. 其參數都是從右向左通過堆棧傳遞的
 * 2. 函數調用在返回前要由被調用者清理堆棧(被調用函數彈出的時候銷毀堆棧)
 */
#define WINAPI      __stdcall

/*
 * 將當前線程轉成纖程, 返回轉換成功的主纖程對象域
 * lpParameter    : 轉換的時候傳入到主線程中用戶數據
 * dwFlags        : 附加參數, 默認填寫 FIBER_FLAG_FLOAT_SWITCH
 *                : 返回轉換成功後的主纖程對象域
 */
WINBASEAPI __out_opt LPVOID WINAPI ConvertThreadToFiberEx(
    __in_opt LPVOID lpParameter,
    __in DWORD dwFlags
);

// 得到當前纖程中用戶傳入的數據, 就是上面 lpParameter
__inline PVOID GetFiberData( void )    { return *(PVOID *) (ULONG_PTR) __readfsdword (0x10);}

// 得到當前運行纖程對象
__inline PVOID GetCurrentFiber( void ) { return (PVOID) (ULONG_PTR) __readfsdword (0x10);}
                                                          
/*
 * 將當前纖程轉換成線程, 對映ConvertThreadToFiberEx操作系列函數. 返回原始環境
 *                : 返回成功狀態, TRUE標識成功
 */
WINBASEAPI BOOL WINAPI ConvertFiberToThread(VOID);

 

下面是關於如何創建纖程並切換(啟動)官方接口說明.

// 標識纖程執行體的注冊函數聲明, lpFiberParameter 可以通過 GetFiberData 得到
typedef VOID (WINAPI *PFIBER_START_ROUTINE)(LPVOID lpFiberParameter);
typedef PFIBER_START_ROUTINE LPFIBER_START_ROUTINE;

/*
 * 創建一個沒有啟動纖程對象並返回
 * dwStackCommitSize    : 當前纖程棧大小, 0標識默認大小
 * dwStackReserveSize    : 當前纖程初始化化保留大小, 0標識默認大小
 * dwFlags                : 纖程創建狀態, 默認FIBER_FLAG_FLOAT_SWITCH, 支持浮點數操作
 * lpStartAddress        : 指定纖程運行的載體.等同於纖程執行需要指明執行函數
 * lpParameter            : 纖程執行的時候, 傳入的用戶數據, 在纖程中GetFiberData可以得到
 *                        : 返回創建好的纖程對象 
 */                                              
WINBASEAPI __out_opt LPVOID WINAPI CreateFiberEx(
    __in     SIZE_T dwStackCommitSize,
    __in     SIZE_T dwStackReserveSize,
    __in     DWORD dwFlags,
    __in     LPFIBER_START_ROUTINE lpStartAddress,
    __in_opt LPVOID lpParameter
);

// 銷毀一個申請的纖程資源和CreateFiberEx成對出現
WINBASEAPI VOID WINAPI DeleteFiber(__in LPVOID lpFiber);

// 纖程跳轉, 跳轉到lpFiber指定的纖程
WINBASEAPI VOID WINAPI SwitchToFiber(__in LPVOID lpFiber);

 

 我們通過上面api 寫一個基礎的演示demo , fiber_handle.c,  實踐能補充猜想.

#include <Windows.h>
#include <stdio.h>
#include <stdlib.h>

// fiber one run
static void WINAPI _fiber_one_run(LPVOID pars) {
    LPVOID * fibers = pars;
    puts("_fiber_one_run start");
    
    fibers[1] = GetCurrentFiber();
    // 切換到主纖程中
    SwitchToFiber(fibers[0]);

    puts("_fiber_one_run end");
    SwitchToFiber(fibers[0]);
}

/*
 * test 纖程練習
 */
int main(int argc, char * argv[]) {
    PVOID fibers[2];
    // A pointer to a variable that is passed to the fiber. The fiber can retrieve this data by using the GetFiberData macro.
    fibers[0] = ConvertThreadToFiberEx(NULL, FIBER_FLAG_FLOAT_SWITCH);
    
    // 創建普通纖程, 當前還是在主纖程中
    fibers[1] = CreateFiberEx(0, 0, FIBER_FLAG_FLOAT_SWITCH, _fiber_one_run, fibers);
    
    puts("main ConvertThreadToFiberEx start");
    SwitchToFiber(fibers[1]);
    
    puts("main ConvertThreadToFiber SwitchToFiber");
    SwitchToFiber(fibers[1]);
    puts("main ConvertThreadToFiber SwitchToFiber two");

    DeleteFiber(fibers[1]);
    ConvertFiberToThread();
    puts("main ConvertThreadToFiber SwitchToFiber two end");

    return 0;
}

 示例演示結果

到這兒關於window 纖程部分儲備完畢.

自己看一遍, 練習一遍, 基本上就能熟練掌握window fiber 對象了. 哎, 如果人如何NB. 我的猜測是

  遇到更NB人 && 不懶

 

2.2 linux ucontext 儲備

  同樣對於linux, 同樣有一套機制ucp, 上下文記錄機制. 翻譯了其中用的api 

#include <ucontext.h>

/*
 * 得到當前程序運行此處上下文信息
 * ucp        : 返回當前程序上下文並保存在ucp指向的內存中
 *            : -1標識失敗, 0標識成功
 */
int getcontext(ucontext_t * ucp);

/*
 * 設置到執行程序上下文對象中. 
 * ucp        : 准備跳轉的上下文對象
 *            : 失敗返回-1. 成功不返回
 */
int setcontext(const ucontext_t * ucp);

/*
 * 重新設置ucp上下文. 
 * ucp        : 待設置的上下文對象
 * func        : 新上下文執行函數體, 其實gcc認為聲明是void * func(void)
 * argc        : func 函數參數個數
 * ...        : 傳入func中的參數
 */
void makecontext(ucontext_t * ucp, void * func(), int argc, ...);

/*
 * 保存當前上下文對象 oucp, 並且跳轉到執行上下文件對象 ucp 中
 * oucp        : 保存當前上下文對象
 * ucp        : 執行的上下文對象
 *            : 失敗返回-1, 成功不返回
 */
int swapcontext (ucontext_t * oucp, ucontext_t * ucp);

 

相比window fiber確實很清爽. 擴充一下, 關於ucontext_t 一種結構實現

/* Userlevel context.  */
typedef struct ucontext {
     unsigned long int uc_flags;
     struct ucontext * uc_link;                // 下一個執行的序列, NULL不繼續執行了
     stack_t uc_stack;                         // 當前上下文, 堆棧信息
     mcontext_t uc_mcontext;
     __sigset_t uc_sigmask;
    struct _libc_fpstate __fpregs_mem;
} ucontext_t;

/* Alternate, preferred interface.  */
typedef struct sigaltstack {
    void * ss_sp;                             // 指向當前堆棧信息首地址
    int ss_flags;
    size_t ss_size;                           // 當前堆棧大小
} stack_t;

上面加了中文注釋的部分, 就是我們開發中需要用到的幾個字段. 設置執行順序, 指定當前上下文堆棧信息.

有了這些知識, 我們在linux上練練上, 采用官方 man 手冊中提供的一段代碼, 演示一下結果. ucontext_demo.c 

#include <ucontext.h>
#include <stdio.h>
#include <stdlib.h>

static ucontext_t uctx_main, uctx_func1, uctx_func2;

#define handle_error(msg) \
    do { perror(msg); exit(EXIT_FAILURE); } while (0)

static void _func1(void) {
    printf("func1: started\n");
    printf("func1: swapcontext(&uctx_func1, &uctx_func2)\n");
    if (swapcontext(&uctx_func1, &uctx_func2) == -1)
        handle_error("swapcontext");
    printf("func1: returning\n");
}

static void _func2(void) {
    printf("func2: started\n");
    printf("func2: swapcontext(&uctx_func2, &uctx_func1)\n");
    if (swapcontext(&uctx_func2, &uctx_func1) == -1)
        handle_error("swapcontext");
    printf("func2: returning\n");
}

int main(int argc, char * argv[]) {
    char func1_stack[16384];
    char func2_stack[16384];

    if (getcontext(&uctx_func1) == -1)
        handle_error("getcontext");
    uctx_func1.uc_stack.ss_sp = func1_stack;
    uctx_func1.uc_stack.ss_size = sizeof(func1_stack);
    uctx_func1.uc_link = &uctx_main;
    makecontext(&uctx_func1, _func1, 0);

    if (getcontext(&uctx_func2) == -1)
        handle_error("getcontext");
    uctx_func2.uc_stack.ss_sp = func2_stack;
    uctx_func2.uc_stack.ss_size = sizeof(func2_stack);
    uctx_func2.uc_link = &uctx_func1;
    makecontext(&uctx_func2, _func2, 0);

    printf("main: swapcontext(&uctx_main, &uctx_func2)\n");
    if (swapcontext(&uctx_main, &uctx_func2) == -1)
        handle_error("swapcontext");

    printf("main: exiting\n");
    return 0;
}

參照下面編譯操作

run 結果

通過上練test, 對於linux ucontext api 基本全部熟悉了.

上面代碼埋了一個小坑, _func1, _func2都沒有傳參, 大家試試為上面函數傳參結果會如何, x86和x64都試試.

恭喜, 到這裡基本上操作系統提供上下文切換(高級 longjmp/setjmp)知識點都儲備完畢, 後面就可以不用看了.

 

3.0 協程庫封裝

3.1 協程庫統一接口封裝

  備注 : 協程,纖程,上下文 認為是一個概念.

  到這裡基本上就是開發級別封裝庫了, 還是存在相當大含金量的. 先提供統一接口 coroutine.h 

#ifndef _H_COROUTINE
#define _H_COROUTINE

typedef enum costatus {       // 纖程存在狀態
    CS_Dead       = 0,        // 纖程死亡狀態
    CS_Ready      = 1,        // 纖程已經就緒
    CS_Running    = 2,        // 纖程正在運行
    CS_Suspend    = 3,        // 纖程暫停等待
} costatus_e;

typedef struct comng * comng_t;

/*
 * 創建運行纖程的主體, 等同於纖程創建需要執行的函數體.
 * schedule : co_start 函數返回的結果
 * ud       : 用戶自定義數據
 */
typedef void (* co_f)(comng_t comng, void * ud);

/*
 * 開啟纖程系統, 並創建主纖程
 *            : 返回開啟的纖程調度系統管理器
 */
extern comng_t co_start(void);

/*
 * 關閉開啟的纖程系統
 * comng    : co_start 返回的纖程管理器
 */
extern void co_close(comng_t comng);

/*
 * 創建一個纖程對象,並返回創建纖程的id. 創建好的纖程狀態是CS_Ready
 * comng    : co_start 返回的纖程管理器
 * func     : 纖程運行的主體
 * ud       : 用戶傳入的數據, co_f 中 ud 會使用
 *          : 返回創建好的纖程標識id
 */
extern int co_create(comng_t comng, co_f func, void * ud);

/*
 * 激活創建的纖程對象.
 * comng    : 纖程管理器對象
 * id       : co_create 創建的纖程對象
 */
extern void co_resume(comng_t comng, int id);

/*
 * 中斷當前運行的的纖程, 並將CPU交給主纖程處理調度.
 * comng    : 纖程管理器對象
 */
extern void co_yield(comng_t comng);

/*
 * 得到當前纖程運行的狀態
 * comng    : 纖程管理器對象
 * id       : co_create 創建的纖程對象
 *          : 返回狀態具體參照 costatus_e
 */
extern costatus_e co_status(comng_t comng, int id);

/*
 * 得到當前纖程系統中運行的纖程, 返回 < 0表示沒有纖程在運行
 * comng    : 纖程管理器對象
 *          : 返回當前運行的纖程標識id, 
 */
extern int co_running(comng_t comng);

#endif // !_H_COROUTINE

核心思路是

  co_create   -> CS_Ready

  co_resume   -> CS_Running

  co_yield   -> CS_Suspend

纖程運行完畢就是 CS_Dead. 主協程默認一直運行不參與狀態變化中. 協調控制所有子協程.

這裡我們先入為主的給出一個演示內容 main.c 

#include <stdio.h>
#include "coroutine.h"

struct args {
    int n;
};

static void _foo(void * comng, void * ud) {
    struct args * arg = ud;
    int start = arg->n;
    int i;
    for (i = 0;i<5;i++) {
        printf("coroutine %d : %d\n", co_running(comng), start + i);
        co_yield(comng);
    }
}

static void _test(void * comng) {
    struct args arg1 = { 0 };
    struct args arg2 = { 100 };

    int co1 = co_create(comng, _foo, &arg1);
    int co2 = co_create(comng, _foo, &arg2);
    printf("main start\n");
    while (co_status(comng, co1) && co_status(comng, co2)) {
        co_resume(comng, co1);
        co_resume(comng, co2);
    }
    printf("main end\n");
}

/*
 * test coroutine demo
 */
int main(int argc, char * argv[]) {
    void * comng = co_start();
    _test(comng);
    co_close(comng);

    return 0;
}

演示結果

同樣在window 上演示結果 也是如此

協程總的邏輯就是, 得到資源運行, 阻塞, 其它協程得到資源運行 這種定向跳轉. 關於協程設計的總方針就是以上那些.

 

3.2 window實現封裝

   coroutine-window.c 

#include "coroutine.h"
#include <Windows.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>

// 纖程棧大小
#define _INT_STACK        (1024 * 1024)
// 默認初始化創建纖程數目
#define _INT_COROUTINE    (16)

/*
 * 單個纖程單元 coroutine , 還有纖程集管理器 comng
 */
struct coroutine;

struct comng {
    PVOID main;                 // 纖程管理器中保存的臨時纖程對象
    int running;                // 當前纖程管理器中運行的纖程id
    int nco;                    // 當前纖程集輪詢中當前索引
    int cap;                    // 纖程集容量,
    struct coroutine ** co;     // 保存的纖程集
};

struct coroutine {
    PVOID ctx;                    // 操作系統纖程對象                
    co_f func;                    // 纖程執行的函數體
    void * ud;                    // 纖程執行的額外參數
    costatus_e status;            // 當前纖程運行狀態
    struct comng * comng;         // 當前纖程集管理器
};

/*
 * 開啟纖程系統, 並創建主纖程
 *            : 返回開啟的纖程調度系統管理器
 */
inline comng_t
co_start(void) {
    struct comng * comng = malloc(sizeof(struct comng));
    assert(NULL != comng);
    comng->nco = 0;
    comng->running = -1;
    comng->co = calloc(comng->cap = _INT_COROUTINE, sizeof(struct coroutine *));
    assert(NULL != comng->co);
    // 開啟Window協程
    comng->main = ConvertThreadToFiberEx(NULL, FIBER_FLAG_FLOAT_SWITCH);
    return comng;
}

// 銷毀一個纖程
static inline void _co_delete(struct coroutine * co) {
    DeleteFiber(co->ctx);
    free(co);
}

/*
 * 關閉開啟的纖程系統
 * comng    : co_start 返回的纖程管理器
 */
void 
co_close(comng_t comng) {
    int i;
    for (i = 0; i < comng->cap; ++i) {
        struct coroutine * co = comng->co[i];
        if (co) {
            _co_delete(co);
            comng->co[i] = NULL;
        }
    }
    free(comng->co);
    comng->co = NULL;
    free(comng);
    ConvertFiberToThread();
}

// 創建一個纖程對象
static inline struct coroutine * _co_new(comng_t comng, co_f func, void * ud) {
    struct coroutine * co = malloc(sizeof(struct coroutine));
    assert(co && comng && func);
    co->func = func;
    co->ud = ud;
    co->comng = comng;
    co->status = CS_Ready;
    return co;
}

/*
 * 創建一個纖程對象,並返回創建纖程的id. 創建好的纖程狀態是CS_Ready
 * comng    : co_start 返回的纖程管理器
 * func     : 纖程運行的主體
 * ud       : 用戶傳入的數據, co_f 中 ud 會使用
 *          : 返回創建好的纖程標識id
 */
int 
co_create(comng_t comng, co_f func, void * ud) {
    struct coroutine * co = _co_new(comng, func, ud);
    // 下面是普通情況, 可以找見
    if (comng->nco < comng->cap) {
        int i;
        for (i = 0; i < comng->cap; ++i) {
            int id = (i + comng->nco) % comng->cap;
            if (NULL == comng->co[id]) {
                comng->co[id] = co;
                ++comng->nco;
                return id;
            }
        }
        assert(i == comng->cap);
        return -1;
    }

    // 需要重新分配空間, 構造完畢後返回
    comng->co = realloc(comng->co, sizeof(struct coroutine *) * comng->cap * 2);
    assert(NULL != comng->co);
    memset(comng->co + comng->cap, 0, sizeof(struct coroutine *) * comng->cap);
    comng->cap <<= 1;
    comng->co[comng->nco] = co;
    return comng->nco++;
}

static inline VOID WINAPI _comain(LPVOID ptr) {
    struct comng * comng = ptr;
    int id = comng->running;
    struct coroutine * co = comng->co[id];
    co->func(comng, co->ud);
    _co_delete(co);
    comng->co[id] = NULL;
    --comng->nco;
    comng->running = -1;
}

/*
 * 激活創建的纖程對象.
 * comng    : 纖程管理器對象
 * id       : co_create 創建的纖程對象
 */
void 
co_resume(comng_t comng, int id) {
    struct coroutine * co;
    assert(comng->running == -1 && id >= 0 && id < comng->cap);
    co = comng->co[id];
    if(NULL == co || co->status == CS_Dead)
        return;
    switch(co->status) {
    case CS_Ready:
        comng->running = id;
        co->status = CS_Running;
        co->ctx = CreateFiberEx(_INT_STACK, 0, FIBER_FLAG_FLOAT_SWITCH, _comain, comng);
        comng->main = GetCurrentFiber();
        SwitchToFiber(co->ctx);
        break;
    case CS_Suspend:
        comng->running = id;
        co->status = CS_Running;
        comng->main = GetCurrentFiber();
        SwitchToFiber(co->ctx);
        break;
    default:
        assert(0);
    }
}

/*
 * 中斷當前運行的的纖程, 並將CPU交給主纖程處理調度.
 * comng    : 纖程管理器對象
 */
inline void 
co_yield(comng_t comng) {
    struct coroutine * co;
    int id = comng->running;
    assert(id >= 0);
    co = comng->co[id];
    co->status = CS_Suspend;
    comng->running = -1;
    co->ctx = GetCurrentFiber();
    SwitchToFiber(comng->main);
}

/*
 * 得到當前纖程運行的狀態
 * comng    : 纖程管理器對象
 * id       : co_create 創建的纖程對象
 *          : 返回狀態具體參照 costatus_e
 */
inline costatus_e 
co_status(comng_t comng, int id) {
    assert(comng && id >=0 && id < comng->cap);
    return comng->co[id] ? comng->co[id]->status : CS_Dead;
}

/*
 * 得到當前纖程系統中運行的纖程, 返回 < 0表示沒有纖程在運行
 * comng    : 纖程管理器對象
 *          : 返回當前運行的纖程標識id,
 */
inline int 
co_running(comng_t comng) {
    return comng->running;
}

 

實現是非常四平八穩,  利用

struct comng {
    PVOID main;                 // 纖程管理器中保存的臨時纖程對象
    int running;                // 當前纖程管理器中運行的纖程id
    int nco;                    // 當前纖程集輪詢中當前索引
    int cap;                    // 纖程集容量,
    struct coroutine ** co;     // 保存的纖程集
};

comng :: co 中保存所有的協程對象, 不夠就realloc, 夠直接返回. 其中查詢不是用的協程對象思路就是, 循環查找.

協程之間的跳轉采用 先記錄當前環境, 後跳轉思路

    co->ctx = GetCurrentFiber();
    SwitchToFiber(comng->main);

思路還是主要參照雲風大仙的, 實現起來還是很直白小巧的. 容易理解, 極力歡迎嘗試. 寫起來還是很爽的, 抄起來提高很快.

 

3.3 linux實現封裝

   coroutine-linux.c 

#include "coroutine.h"
#include <ucontext.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdint.h>

// 纖程棧大小
#define _INT_STACK        (1024 * 1024)
// 默認初始化創建纖程數目
#define _INT_COROUTINE    (16)

/*
 * 單個纖程單元 coroutine , 還有纖程集管理器 comng
 */
struct coroutine;

struct comng {
    char stack[_INT_STACK];
    ucontext_t main;            // 纖程管理器中保存的臨時纖程對象
    int running;                // 當前纖程管理器中運行的纖程id
    int nco;                    // 當前纖程集輪詢中當前索引
    int cap;                    // 纖程集容量,
    struct coroutine ** co;     // 保存的纖程集
};

struct coroutine {
    char * stack;
    ucontext_t ctx;               // 操作系統纖程對象
    ptrdiff_t cap;
    ptrdiff_t size;                
    co_f func;                    // 纖程執行的函數體
    void * ud;                    // 纖程執行的額外參數
    costatus_e status;            // 當前纖程運行狀態
    struct comng * comng;         // 當前纖程集管理器
};

/*
 * 開啟纖程系統, 並創建主纖程
 *            : 返回開啟的纖程調度系統管理器
 */
inline comng_t
co_start(void) {
    struct comng * comng = malloc(sizeof(struct comng));
    assert(NULL != comng);
    comng->nco = 0;
    comng->running = -1;
    comng->co = calloc(comng->cap = _INT_COROUTINE, sizeof(struct coroutine *));
    assert(NULL != comng->co);
    return comng;
}

// 銷毀一個纖程
static inline void _co_delete(struct coroutine * co) {
    free(co->stack);
    free(co);
}

/*
 * 關閉開啟的纖程系統
 * comng    : co_start 返回的纖程管理器
 */
void 
co_close(comng_t comng) {
    int i;
    for (i = 0; i < comng->cap; ++i) {
        struct coroutine * co = comng->co[i];
        if (co) {
            _co_delete(co);
            comng->co[i] = NULL;
        }
    }
    free(comng->co);
    comng->co = NULL;
    free(comng);
}

// 創建一個纖程對象
static inline struct coroutine * _co_new(comng_t comng, co_f func, void * ud) {
    struct coroutine * co = malloc(sizeof(struct coroutine));
    assert(co && comng && func);
    co->func = func;
    co->ud = ud;
    co->comng = comng;
    co->status = CS_Ready;
    co->cap = 0;
    co->size = 0;
    co->stack = NULL;
    return co;
}

/*
 * 創建一個纖程對象,並返回創建纖程的id. 創建好的纖程狀態是CS_Ready
 * comng    : co_start 返回的纖程管理器
 * func     : 纖程運行的主體
 * ud       : 用戶傳入的數據, co_f 中 ud 會使用
 *          : 返回創建好的纖程標識id
 */
int 
co_create(comng_t comng, co_f func, void * ud) {
    struct coroutine * co = _co_new(comng, func, ud);
    // 下面是普通情況, 可以找見
    if (comng->nco < comng->cap) {
        int i;
        for (i = 0; i < comng->cap; ++i) {
            int id = (i + comng->nco) % comng->cap;
            if (NULL == comng->co[id]) {
                comng->co[id] = co;
                ++comng->nco;
                return id;
            }
        }
        assert(i == comng->cap);
        return -1;
    }

    // 需要重新分配空間, 構造完畢後返回
    comng->co = realloc(comng->co, sizeof(struct coroutine *) * comng->cap * 2);
    assert(NULL != comng->co);
    memset(comng->co + comng->cap, 0, sizeof(struct coroutine *) * comng->cap);
    comng->cap <<= 1;
    comng->co[comng->nco] = co;
    return comng->nco++;
}

static inline void _comain(uint32_t low32, uint32_t hig32) {
    uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hig32 << 32);
    struct comng * comng = (struct comng *)ptr;
    int id = comng->running;
    struct coroutine * co = comng->co[id];
    co->func(comng, co->ud);
    _co_delete(co);
    comng->co[id] = NULL;
    --comng->nco;
    comng->running = -1;
}

/*
 * 激活創建的纖程對象.
 * comng    : 纖程管理器對象
 * id       : co_create 創建的纖程對象
 */
void 
co_resume(comng_t comng, int id) {
    struct coroutine * co;
    uintptr_t ptr;
    assert(comng->running == -1 && id >= 0 && id < comng->cap);
    co = comng->co[id];
    if(NULL == co || co->status == CS_Dead)
        return;
    switch(co->status) {
    case CS_Ready:
        comng->running = id;
        co->status = CS_Running;
        getcontext(&co->ctx);
        co->ctx.uc_stack.ss_sp = comng->stack;
        co->ctx.uc_stack.ss_size = _INT_STACK;
        co->ctx.uc_link = &comng->main;
        ptr = (uintptr_t)comng;
        makecontext(&co->ctx, (void (*)())_comain, 2, (uint32_t)ptr, (uint32_t)(ptr >> 32));
        swapcontext(&comng->main, &co->ctx);
        break;
    case CS_Suspend:
        comng->running = id;
        co->status = CS_Running;
        // stack add is high -> low
        memcpy(comng->stack + _INT_STACK - co->size, co->stack, co->size);
        swapcontext(&comng->main, &co->ctx);
        break;
    default:
        assert(0);
    }
}

// 保存當前運行的堆棧信息
static void _save_stack(struct coroutine * co, char * top) {
    char dummy = 0;
    assert(top - &dummy <= _INT_STACK);
    if(co->cap < top - &dummy) {
        free(co->stack);
        co->cap = top - &dummy;
        co->stack = malloc(co->cap);
        assert(co->stack);
    }
    co->size = top - &dummy;
    memcpy(co->stack, &dummy, co->size);
}

/*
 * 中斷當前運行的的纖程, 並將CPU交給主纖程處理調度.
 * comng    : 纖程管理器對象
 */
inline void 
co_yield(comng_t comng) {
    struct coroutine * co;
    int id = comng->running;
    assert(id >= 0);
    co = comng->co[id];
    assert((char *)&co > comng->stack);
    _save_stack(co, comng->stack + _INT_STACK);
    co->status = CS_Suspend;
    comng->running = -1;
    swapcontext(&co->ctx, &comng->main);
}

/*
 * 得到當前纖程運行的狀態
 * comng    : 纖程管理器對象
 * id       : co_create 創建的纖程對象
 *          : 返回狀態具體參照 costatus_e
 */
inline costatus_e 
co_status(comng_t comng, int id) {
    assert(comng && id >=0 && id < comng->cap);
    return comng->co[id] ? comng->co[id]->status : CS_Dead;
}

/*
 * 得到當前纖程系統中運行的纖程, 返回 < 0表示沒有纖程在運行
 * comng    : 纖程管理器對象
 *          : 返回當前運行的纖程標識id,
 */
inline int 
co_running(comng_t comng) {
    return comng->running;
}

對於linux上關於協程啟動部分 static inline void _comain(uint32_t low32, uint32_t hig32)

函數聲明方式, 主要為了解決gcc x64 編譯接收的內存地址, 高地位順序問題.

        ptr = (uintptr_t)comng;
        makecontext(&co->ctx, (void (*)())_comain, 2, (uint32_t)ptr, (uint32_t)(ptr >> 32));

上面在實際調用中, 如果只用一個comng參數傳過去, 到了_comain 中接收的 comng地址順序就會錯位. 以上就是linux上解決makecontext傳地址錯誤的思路.

 _save_stack 保存當前堆棧信息一個技巧性函數調用. 其它思路等同於window封裝的那套庫代碼.

 

4.0 協程庫融合

  最終形態 coroutine.c

#include "coroutine.h" #include <string.h> #include <assert.h> #include <stdlib.h> // 纖程棧大小 #define _INT_STACK (1024 * 1024) // 默認初始化創建纖程數目 #define _INT_COROUTINE (16) /* * 單個纖程單元 coroutine , 還有纖程集管理器 comng */ struct coroutine; #if defined(__GNUC__) #include <ucontext.h> #include <stddef.h> #include <stdint.h> struct comng { char stack[_INT_STACK]; ucontext_t main; // 纖程管理器中保存的臨時纖程對象 int running; // 當前纖程管理器中運行的纖程id int nco; // 當前纖程集輪詢中當前索引 int cap; // 纖程集容量, struct coroutine ** co; // 保存的纖程集 }; struct coroutine { char * stack; ucontext_t ctx; // 操作系統纖程對象 ptrdiff_t cap; ptrdiff_t size; co_f func; // 纖程執行的函數體 void * ud; // 纖程執行的額外參數 costatus_e status; // 當前纖程運行狀態 struct comng * comng; // 當前纖程集管理器 }; /* * 開啟纖程系統, 並創建主纖程 * : 返回開啟的纖程調度系統管理器 */ inline comng_t co_start(void) { struct comng * comng = malloc(sizeof(struct comng)); assert(NULL != comng); comng->nco = 0; comng->running = -1; comng->co = calloc(comng->cap = _INT_COROUTINE, sizeof(struct coroutine *)); assert(NULL != comng->co); return comng; } // 銷毀一個纖程 static inline void _co_delete(struct coroutine * co) { free(co->stack); free(co); } // 創建一個纖程對象 static inline struct coroutine * _co_new(comng_t comng, co_f func, void * ud) { struct coroutine * co = malloc(sizeof(struct coroutine)); assert(co && comng && func); co->func = func; co->ud = ud; co->comng = comng; co->status = CS_Ready; co->cap = 0; co->size = 0; co->stack = NULL; return co; } static inline void _comain(uint32_t low32, uint32_t hig32) { uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hig32 << 32); struct comng * comng = (struct comng *)ptr; int id = comng->running; struct coroutine * co = comng->co[id]; co->func(comng, co->ud); _co_delete(co); comng->co[id] = NULL; --comng->nco; comng->running = -1; } /* * 激活創建的纖程對象. * comng : 纖程管理器對象 * id : co_create 創建的纖程對象 */ void co_resume(comng_t comng, int id) { struct coroutine * co; uintptr_t ptr; assert(comng->running == -1 && id >= 0 && id < comng->cap); co = comng->co[id]; if(NULL == co || co->status == CS_Dead) return; switch(co->status) { case CS_Ready: comng->running = id; co->status = CS_Running; getcontext(&co->ctx); co->ctx.uc_stack.ss_sp = comng->stack; co->ctx.uc_stack.ss_size = _INT_STACK; co->ctx.uc_link = &comng->main; ptr = (uintptr_t)comng; makecontext(&co->ctx, (void (*)())_comain, 2, (uint32_t)ptr, (uint32_t)(ptr >> 32)); swapcontext(&comng->main, &co->ctx); break; case CS_Suspend: comng->running = id; co->status = CS_Running; // stack add is high -> low memcpy(comng->stack + _INT_STACK - co->size, co->stack, co->size); swapcontext(&comng->main, &co->ctx); break; default: assert(0); } } // 保存當前運行的堆棧信息 static void _save_stack(struct coroutine * co, char * top) { char dummy = 0; assert(top - &dummy <= _INT_STACK); if(co->cap < top - &dummy) { free(co->stack); co->cap = top - &dummy; co->stack = malloc(co->cap); assert(co->stack); } co->size = top - &dummy; memcpy(co->stack, &dummy, co->size); } /* * 中斷當前運行的的纖程, 並將CPU交給主纖程處理調度. * comng : 纖程管理器對象 */ inline void co_yield(comng_t comng) { struct coroutine * co; int id = comng->running; assert(id >= 0); co = comng->co[id]; assert((char *)&co > comng->stack); _save_stack(co, comng->stack + _INT_STACK); co->status = CS_Suspend; comng->running = -1; swapcontext(&co->ctx, &comng->main); } #endif #if defined(_MSC_VER) #include <Windows.h> #define inline __inline struct comng { PVOID main; // 纖程管理器中保存的臨時纖程對象 int running; // 當前纖程管理器中運行的纖程id int nco; // 當前纖程集輪詢中當前索引 int cap; // 纖程集容量, struct coroutine ** co; // 保存的纖程集 }; struct coroutine { PVOID ctx; // 操作系統纖程對象 co_f func; // 纖程執行的函數體 void * ud; // 纖程執行的額外參數 costatus_e status; // 當前纖程運行狀態 struct comng * comng; // 當前纖程集管理器 }; /* * 開啟纖程系統, 並創建主纖程 * : 返回開啟的纖程調度系統管理器 */ inline comng_t co_start(void) { struct comng * comng = malloc(sizeof(struct comng)); assert(NULL != comng); comng->nco = 0; comng->running = -1; comng->co = calloc(comng->cap = _INT_COROUTINE, sizeof(struct coroutine *)); assert(NULL != comng->co); // 開啟Window協程 comng->main = ConvertThreadToFiberEx(NULL, FIBER_FLAG_FLOAT_SWITCH); return comng; } // 銷毀一個纖程 static inline void _co_delete(struct coroutine * co) { DeleteFiber(co->ctx); free(co); } // 創建一個纖程對象 static inline struct coroutine * _co_new(comng_t comng, co_f func, void * ud) { struct coroutine * co = malloc(sizeof(struct coroutine)); assert(co && comng && func); co->func = func; co->ud = ud; co->comng = comng; co->status = CS_Ready; return co; } static inline VOID WINAPI _comain(LPVOID ptr) { struct comng * comng = ptr; int id = comng->running; struct coroutine * co = comng->co[id]; co->func(comng, co->ud); _co_delete(co); comng->co[id] = NULL; --comng->nco; comng->running = -1; } /* * 激活創建的纖程對象. * comng : 纖程管理器對象 * id : co_create 創建的纖程對象 */ void co_resume(comng_t comng, int id) { struct coroutine * co; assert(comng->running == -1 && id >= 0 && id < comng->cap); co = comng->co[id]; if(NULL == co || co->status == CS_Dead) return; switch(co->status) { case CS_Ready: comng->running = id; co->status = CS_Running; co->ctx = CreateFiberEx(_INT_STACK, 0, FIBER_FLAG_FLOAT_SWITCH, _comain, comng); comng->main = GetCurrentFiber(); SwitchToFiber(co->ctx); break; case CS_Suspend: comng->running = id; co->status = CS_Running; comng->main = GetCurrentFiber(); SwitchToFiber(co->ctx); break; default: assert(0); } } /* * 中斷當前運行的的纖程, 並將CPU交給主纖程處理調度. * comng : 纖程管理器對象 */ inline void co_yield(comng_t comng) { struct coroutine * co; int id = comng->running; assert(id >= 0); co = comng->co[id]; co->status = CS_Suspend; comng->running = -1; co->ctx = GetCurrentFiber(); SwitchToFiber(comng->main); } #endif /* * 關閉開啟的纖程系統 * comng : co_start 返回的纖程管理器 */ void co_close(comng_t comng) { int i; for (i = 0; i < comng->cap; ++i) { struct coroutine * co = comng->co[i]; if (co) { _co_delete(co); comng->co[i] = NULL; } } free(comng->co); comng->co = NULL; free(comng); } /* * 創建一個纖程對象,並返回創建纖程的id. 創建好的纖程狀態是CS_Ready * comng : co_start 返回的纖程管理器 * func : 纖程運行的主體 * ud : 用戶傳入的數據, co_f 中 ud 會使用 * : 返回創建好的纖程標識id */ int co_create(comng_t comng, co_f func, void * ud) { struct coroutine * co = _co_new(comng, func, ud); // 下面是普通情況, 可以找見 if (comng->nco < comng->cap) { int i; for (i = 0; i < comng->cap; ++i) { int id = (i + comng->nco) % comng->cap; if (NULL == comng->co[id]) { comng->co[id] = co; ++comng->nco; return id; } } assert(i == comng->cap); return -1; } // 需要重新分配空間, 構造完畢後返回 comng->co = realloc(comng->co, sizeof(struct coroutine *) * comng->cap * 2); assert(NULL != comng->co); memset(comng->co + comng->cap, 0, sizeof(struct coroutine *) * comng->cap); comng->cap <<= 1; comng->co[comng->nco] = co; return comng->nco++; } /* * 得到當前纖程運行的狀態 * comng : 纖程管理器對象 * id : co_create 創建的纖程對象 * : 返回狀態具體參照 costatus_e */ inline costatus_e co_status(comng_t comng, int id) { assert(comng && id >=0 && id < comng->cap); return comng->co[id] ? comng->co[id]->status : CS_Dead; } /* * 得到當前纖程系統中運行的纖程, 返回 < 0表示沒有纖程在運行 * comng : 纖程管理器對象 * : 返回當前運行的纖程標識id, */ inline int co_running(comng_t comng) { return comng->running; } View Code

 主要做的操作, 是通過 _MSC_VER 和 __GNUC__ 區分編譯器, 執行相關操作.

無數的前戲到這裡基本就是完工了. 精彩往往很短暫, 遇見都是幸運.

  <<心願>> http://music.163.com/#/song?id=379785

 

5.0 最後的話

All knowledge is, in final analysis, history.
All sciences are, in the abstract, mathematics.
All judgements are, in their rationale, statistics.

 

 

  

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved