程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> 網頁編程 >> PHP編程 >> 關於PHP編程 >> php自己實現memcached的隊列類

php自己實現memcached的隊列類

編輯:關於PHP編程

php自己實現memcached的隊列類


 

 

add('1asdf');
 *      $obj->getQueueLength();
 *      $obj->read(11);
 *      $obj->get(8);
 */

class memcacheQueue{
    public static   $client;            //memcache客戶端連接
    public          $access;            //隊列是否可更新   
    private         $currentSide;       //當前輪值的隊列面:A/B
    private         $lastSide;          //上一輪值的隊列面:A/B
    private         $sideAHead;         //A面隊首值
    private         $sideATail;         //A面隊尾值
    private         $sideBHead;         //B面隊首值
    private         $sideBTail;         //B面隊尾值
    private         $currentHead;       //當前隊首值
    private         $currentTail;       //當前隊尾值
    private         $lastHead;          //上輪隊首值
    private         $lastTail;          //上輪隊尾值 
    private         $expire;            //過期時間,秒,1~2592000,即30天內;0為永不過期
    private         $sleepTime;         //等待解鎖時間,微秒
    private         $queueName;         //隊列名稱,唯一值
    private         $retryNum;          //重試次數,= 10 * 理論並發數

    const   MAXNUM      = 2000;                 //(單面)最大隊列數,建議上限10K
    const   HEAD_KEY    = '_lkkQueueHead_';     //隊列首kye
    const   TAIL_KEY    = '_lkkQueueTail_';     //隊列尾key
    const   VALU_KEY    = '_lkkQueueValu_';     //隊列值key
    const   LOCK_KEY    = '_lkkQueueLock_';     //隊列鎖key
    const   SIDE_KEY    = '_lkkQueueSide_';     //輪值面key

    /*
     * 構造函數
     * @param   [config]    array   memcache服務器參數
     * @param   [queueName] string  隊列名稱
     * @param   [expire]    string  過期時間
     * @return  NULL
     */
    public function __construct($queueName ='',$expire='',$config =''){
        if(empty($config)){
            self::$client = memcache_pconnect('localhost',11211);
        }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
            self::$client = memcache_pconnect($config['host'],$config['port']);
        }elseif(is_string($config)){//"127.0.0.1:11211"
            $tmp = explode(':',$config);
            $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
            $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
            self::$client = memcache_pconnect($conf['host'],$conf['port']);     
        }
        if(!self::$client) return false;

        ignore_user_abort(TRUE);//當客戶斷開連接,允許繼續執行
        set_time_limit(0);//取消腳本執行延時上限

        $this->access = false;
        $this->sleepTime = 1000;
        $expire = (empty($expire) && $expire!=0) ? 3600 : (int)$expire;
        $this->expire = $expire;
        $this->queueName = $queueName;
        $this->retryNum = 10000;

        $side = memcache_add(self::$client, $queueName . self::SIDE_KEY, 'A',false, $expire);
        $this->getHeadNTail($queueName);
        if(!isset($this->sideAHead) || empty($this->sideAHead)) $this->sideAHead = 0;
        if(!isset($this->sideATail) || empty($this->sideATail)) $this->sideATail = 0;
        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
        if(!isset($this->sideBHead) || empty($this->sideBHead)) $this->sideBHead = 0;
    }

    /*
     * 獲取隊列首尾值
     * @param   [queueName] string  隊列名稱
     * @return  NULL
     */
    private function getHeadNTail($queueName){
        $this->sideAHead = (int)memcache_get(self::$client, $queueName.'A'. self::HEAD_KEY);
        $this->sideATail = (int)memcache_get(self::$client, $queueName.'A'. self::TAIL_KEY);
        $this->sideBHead = (int)memcache_get(self::$client, $queueName.'B'. self::HEAD_KEY);
        $this->sideBTail = (int)memcache_get(self::$client, $queueName.'B'. self::TAIL_KEY);
    }

    /*
     * 獲取當前輪值的隊列面
     * @return  string  隊列面名稱
     */
    public function getCurrentSide(){
        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
        if($currentSide == 'A'){
            $this->currentSide = 'A';
            $this->lastSide = 'B';  

            $this->currentHead  = $this->sideAHead;
            $this->currentTail  = $this->sideATail;
            $this->lastHead     = $this->sideBHead;
            $this->lastTail     = $this->sideBTail;         
        }else{
            $this->currentSide = 'B';
            $this->lastSide = 'A';

            $this->currentHead  = $this->sideBHead;
            $this->currentTail  = $this->sideBTail;
            $this->lastHead     = $this->sideAHead;
            $this->lastTail     = $this->sideATail;                     
        }

        return $this->currentSide;
    }

    /*
     * 隊列加鎖
     * @return boolean
     */
    private function getLock(){
        if($this->access === false){
            while(!memcache_add(self::$client, $this->queueName .self::LOCK_KEY, 1, false, $this->expire) ){
                usleep($this->sleepTime);
                @$i++;
                if($i > $this->retryNum){//嘗試等待N次
                    return false;
                    break;
                }
            }
            return $this->access = true;
        }
        return false;
    }

    /*
     * 隊列解鎖
     * @return NULL
     */
    private function unLock(){
        memcache_delete(self::$client, $this->queueName .self::LOCK_KEY);
        $this->access = false;
    }

    /*
     * 添加數據
     * @param   [data]  要存儲的值
     * @return  boolean
     */
    public function add($data){
        $result = false;
        if(!$this->getLock()){
            return $result;
        } 
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();

        if($this->isFull()){
            $this->unLock();
            return false;
        }

        if($this->currentTail < self::MAXNUM){
            $value_key = $this->queueName .$this->currentSide . self::VALU_KEY . $this->currentTail;
            if(memcache_add(self::$client, $value_key, $data, false, $this->expire)){
                $this->changeTail();
                $result = true;
            }
        }else{//當前隊列已滿,更換輪值面
            $this->unLock();
            $this->changeCurrentSide();
            return $this->add($data);
        }

        $this->unLock();
        return $result;
    }

    /*
     * 取出數據
     * @param   [length]    int 數據的長度
     * @return  array
     */
    public function get($length=0){
        if(!is_numeric($length)) return false;
        if(empty($length)) $length = self::MAXNUM * 2;//默認讀取所有
        if(!$this->getLock()) return false;

        if($this->isEmpty()){
            $this->unLock();
            return false;
        }

        $keyArray   = $this->getKeyArray($length);
        $lastKey    = $keyArray['lastKey'];
        $currentKey = $keyArray['currentKey'];
        $keys       = $keyArray['keys'];
        $this->changeHead($this->lastSide,$lastKey);
        $this->changeHead($this->currentSide,$currentKey);

        $data   = @memcache_get(self::$client, $keys);
        foreach($keys as $v){//取出之後刪除
            @memcache_delete(self::$client, $v, 0);
        }
        $this->unLock();

        return $data;
    }

    /*
     * 讀取數據
     * @param   [length]    int 數據的長度
     * @return  array
     */
    public function read($length=0){
        if(!is_numeric($length)) return false;
        if(empty($length)) $length = self::MAXNUM * 2;//默認讀取所有
        $keyArray   = $this->getKeyArray($length);
        $data   = @memcache_get(self::$client, $keyArray['keys']);
        return $data;
    }

    /*
     * 獲取隊列某段長度的key數組
     * @param   [length]    int 隊列長度
     * @return  array
     */
    private function getKeyArray($length){
        $result = array('keys'=>array(),'lastKey'=>array(),'currentKey'=>array());
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();
        if(empty($length)) return $result;

        //先取上一面的key
        $i = $result['lastKey'] = 0;
        for($i=0;$i<$length;$i++){
            $result['lastKey'] = $this->lastHead + $i;
            if($result['lastKey'] >= $this->lastTail) break;
            $result['keys'][] = $this->queueName .$this->lastSide . self::VALU_KEY . $result['lastKey'];
        }

        //再取當前面的key
        $j = $length - $i;
        $k = $result['currentKey'] = 0;
        for($k=0;$k<$j;$k++){
            $result['currentKey'] = $this->currentHead + $k;
            if($result['currentKey'] >= $this->currentTail) break;
            $result['keys'][] = $this->queueName .$this->currentSide . self::VALU_KEY . $result['currentKey'];
        }

        return $result;
    }

    /*
     * 更新當前輪值面隊列尾的值
     * @return  NULL
     */
    private function changeTail(){
        $tail_key = $this->queueName .$this->currentSide . self::TAIL_KEY;
        memcache_add(self::$client, $tail_key, 0,false, $this->expire);//如果沒有,則插入;有則false;
        //memcache_increment(self::$client, $tail_key, 1);//隊列尾+1
        $v = memcache_get(self::$client, $tail_key) +1;
        memcache_set(self::$client, $tail_key,$v,false,$this->expire);
    }

    /*
     * 更新隊列首的值
     * @param   [side]      string  要更新的面
     * @param   [headValue] int     隊列首的值
     * @return  NULL
     */
    private function changeHead($side,$headValue){
        if($headValue < 1) return false;
        $head_key = $this->queueName .$side . self::HEAD_KEY;
        $tail_key = $this->queueName .$side . self::TAIL_KEY;
        $sideTail = memcache_get(self::$client, $tail_key);
        if($headValue < $sideTail){
            memcache_set(self::$client, $head_key,$headValue+1,false,$this->expire);
        }elseif($headValue >= $sideTail){
            $this->resetSide($side);
        }
    }

    /*
     * 重置隊列面,即將該隊列面的隊首、隊尾值置為0
     * @param   [side]  string  要重置的面
     * @return  NULL
     */
    private function resetSide($side){
        $head_key = $this->queueName .$side . self::HEAD_KEY;
        $tail_key = $this->queueName .$side . self::TAIL_KEY;
        memcache_set(self::$client, $head_key,0,false,$this->expire);
        memcache_set(self::$client, $tail_key,0,false,$this->expire);
    }

    /*
     * 改變當前輪值隊列面
     * @return  string
     */
    private function changeCurrentSide(){
        $currentSide = memcache_get(self::$client, $this->queueName . self::SIDE_KEY);
        if($currentSide == 'A'){
            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'B',false,$this->expire);
            $this->currentSide = 'B';
        }else{
            memcache_set(self::$client, $this->queueName . self::SIDE_KEY,'A',false,$this->expire);
            $this->currentSide = 'A';
        }
        return $this->currentSide;
    }

    /*
     * 檢查當前隊列是否已滿
     * @return  boolean
     */
    public function isFull(){
        $result = false;
        if($this->sideATail == self::MAXNUM && $this->sideBTail == self::MAXNUM){
            $result = true;
        }
        return $result;
    }

    /*
     * 檢查當前隊列是否為空
     * @return  boolean
     */
    public function isEmpty(){
        $result = true;
        if($this->sideATail > 0 || $this->sideBTail > 0){
            $result = false;
        }
        return $result;
    }

    /*
     * 獲取當前隊列的長度
     * 該長度為理論長度,某些元素由於過期失效而丟失,真實長度小於或等於該長度
     * @return  int
     */
    public function getQueueLength(){
        $this->getHeadNTail($this->queueName);
        $this->getCurrentSide();

        $sideALength = $this->sideATail - $this->sideAHead;
        $sideBLength = $this->sideBTail - $this->sideBHead;
        $result = $sideALength + $sideBLength;

        return $result;
    }

    /*
     * 清空當前隊列數據,僅保留HEAD_KEY、TAIL_KEY、SIDE_KEY三個key
     * @return  boolean
     */
    public function clear(){
        if(!$this->getLock()) return false;
        for($i=0;$iqueueName.'A'. self::VALU_KEY .$i, 0);
            @memcache_delete(self::$client, $this->queueName.'B'. self::VALU_KEY .$i, 0);
        }
        $this->unLock();
        $this->resetSide('A');
        $this->resetSide('B');
        return true;
    }

    /*
     * 清除所有memcache緩存數據
     * @return  NULL
     */
    public function memFlush(){
        memcache_flush(self::$client);
    }

}


 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved