  雙十一剛過不久,大家都知道在天貓、京東、蘇寧等等電商網站上有很多秒殺活動,例如在某一個時刻搶購一個原價1999現在秒殺價只要999的手機時,會迎來一個用戶請求的高峰期,可能會有幾十萬幾百萬的並發量,來搶這個手機,在高並發的情形下會對數據庫服務器或者是文件服務器應用服務器造成巨大的壓力,嚴重時說不定就宕機了,另一個問題是,秒殺的東西都是有量的,例如一款手機只有10台的量秒殺,那麼,在高並發的情況下,成千上萬條數據更新數據庫(例如10台的量被人搶一台就會在數據集某些記錄下 減1),那次這個時候的先後順序是很亂的,很容易出現10台的量,搶到的人就不止10個這種嚴重的問題。那麼,以後所說的問題我們該如何去解決呢?

       接下來我所分享的技術就可以拿來處理以上的問題: 分布式鎖任務隊列






  以上就是實現 分布式鎖 和 任務隊列 的簡單思路,如果你看完有點模稜兩可,那請看接下來的代碼實現。




(2)系統級的鎖當進程無論何種原因時出現crash時,操作系統會自己回收鎖,所以不會出現資源丟失,但分布式鎖不用,若一次性設置很長時間,一旦由於各種原因出現進程crash 或者其他異常導致unlock未被調用時,則該鎖在剩下的時間就會變成垃圾鎖,導致其他進程或者進程重啟後無法進入加鎖區域。


  這裡先取得當前時間,然後再獲取到鎖失敗時的等待超時的時刻(是個時間戳),再獲取到鎖的最大生存時刻是多少。這裡redis的key用這種格式:"Lock:鎖的標識名",這裡就開始進入循環了,先是插入數據到redis裡,使用setnx()函數,這函數的意思是,如果該鍵不存在則插入數據,將最大生存時刻作為值存儲,假如插入成功,則對該鍵進行失效時間的設置,並將該鍵放在$lockedName數組裡,返回true,也就是上鎖成功;如果該鍵存在,則不會插入操作了,這裡有一步嚴謹的操作,那就是取得當前鍵的剩余時間,假如這個時間小於0,表示key上沒有設置生存時間(key是不會不存在的,因為前面setnx會自動創建)如果出現這種狀況,那就是進程的某個實例setnx成功後 crash 導致緊跟著的expire沒有被調用,這時可以直接設置expire並把鎖納為己用。如果沒設置鎖失敗的等待時間 或者 已超過最大等待時間了,那就退出循環,反之則 隔 $waitIntervalUs 後繼續 請求。  這就是加鎖的整一個代碼分析。

   * 加鎖
   * @param [type] $name      鎖的標識名
   * @param integer $timeout    循環獲取鎖的等待超時時間,在此時間內會一直嘗試獲取鎖直到超時,為0表示失敗後直接返回不等待
   * @param integer $expire     當前鎖的最大生存時間(秒),必須大於0,如果超過生存時間鎖仍未被釋放,則系統會自動強制釋放
   * @param integer $waitIntervalUs 獲取鎖失敗後掛起再試的時間間隔(微秒)
   * @return [type]         [description]
  public function lock($name, $timeout = 0, $expire = 15, $waitIntervalUs = 100000) {
    if ($name == null) return false;

    $now = time();
    $timeoutAt = $now + $timeout;
    $expireAt = $now + $expire;

    $redisKey = "Lock:{$name}";
    while (true) {
      $result = $this->redisString->setnx($redisKey, $expireAt);

      if ($result != false) {
        $this->redisString->expire($redisKey, $expireAt);
        $this->lockedNames[$name] = $expireAt;
        return true;

      $ttl = $this->redisString->ttl($redisKey);

      //ttl小於0 表示key上沒有設置生存時間(key是不會不存在的,因為前面setnx會自動創建)
      //如果出現這種狀況,那就是進程的某個實例setnx成功後 crash 導致緊跟著的expire沒有被調用
      if ($ttl < 0) {
        $this->redisString->set($redisKey, $expireAt);
        $this->lockedNames[$name] = $expireAt;
        return true;

      //如果沒設置鎖失敗的等待時間 或者 已超過最大等待時間了,那就退出
      if ($timeout <= 0 || $timeoutAt < microtime(true)) break;

      //隔 $waitIntervalUs 後繼續 請求


    return false;


   * 解鎖
   * @param [type] $name [description]
   * @return [type]    [description]
  public function unlock($name) {
    if ($this->isLocking($name)) {
      if ($this->redisString->deleteKey("Lock:$name")) {
        return true;
    return false;
   * 釋放當前所有獲得的鎖
   * @return [type] [description]
  public function unlockAll() {
    $allSuccess = true;
    foreach ($this->lockedNames as $name => $expireAt) {
      if (false === $this->unlock($name)) {
        $allSuccess = false;  
    return $allSuccess;


(3)這個隊列和普通隊列不一樣,入隊時的id是用來區分重復入隊的,隊列裡面只會有一條記錄,同一個id後入的覆蓋前入的,而不是追加, 如果需求要求重復入隊當做不用的任務,請使用不同的id區分


   * 入隊一個 Task
   * @param [type] $name     隊列名稱
   * @param [type] $id      任務id(或者其數組)
   * @param integer $timeout    入隊超時時間(秒)
   * @param integer $afterInterval [description]
   * @return [type]         [description]
  public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
    if (empty($name) || empty($id) || $timeout <= 0) return false;

    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
      Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id");
      return false;
    //入隊時以當前時間戳作為 score
    $score = microtime(true) + $afterInterval;
    foreach ((array)$id as $item) {
      if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
        $this->_redis->zset->add("Queue:$name", $score, $item);

    return true;


  接著來看一下出隊的代碼分析:出隊一個Task,需要指定它的$id 和 $score,如果$score與隊列中的匹配則出隊,否則認為該Task已被重新入隊過,當前操作按失敗處理。首先和對參數進行合法性檢測,接著又用到加鎖的功能了,然後及時出隊了,先使用getScore()從Redis裡獲取到該id的score,然後將傳入的$score和Redis裡存儲的score進行對比,如果兩者相等就進行出隊操作,也就是使用zset裡的delete()方法刪掉該任務id,最後當前就是解鎖了。這就是出隊的代碼分析。

   * 出隊一個Task,需要指定$id 和 $score
   * 如果$score 與隊列中的匹配則出隊,否則認為該Task已被重新入隊過,當前操作按失敗處理
   * @param [type] $name  隊列名稱 
   * @param [type] $id   任務標識
   * @param [type] $score  任務對應score,從隊列中獲取任務時會返回一個score,只有$score和隊列中的值匹配時Task才會被出隊
   * @param integer $timeout 超時時間(秒)
   * @return [type]      Task是否成功,返回false可能是redis操作失敗,也有可能是$score與隊列中的值不匹配(這表示該Task自從獲取到本地之後被其他線程入隊過)
  public function dequeue($name, $id, $score, $timeout = 10) {
    if (empty($name) || empty($id) || empty($score)) return false;
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
      Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
      return false;
    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
    $result = false;
    if ($serverScore == $score) {
      $result = (float)$this->_redis->zset->delete("Queue:$name", $id);
      if ($result == false) {
        Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id");

    return $result;

  學過數據結構這門課的朋友都應該知道,隊列操作還有彈出頂部某個值的方法等等,這裡處理入隊出隊操作,我還實現了 獲取隊列頂部若干個Task 並將其出隊的方法,想了解的朋友可以看這段代碼,假如看不太明白就留言,這裡我不再對其進行分析了。

   * 獲取隊列頂部若干個Task 並將其出隊
   * @param [type] $name  隊列名稱
   * @param integer $count  數量
   * @param integer $timeout 超時時間
   * @return [type]      返回數組[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
  public function pop($name, $count = 1, $timeout = 10) {
    if (empty($name) || $count <= 0) return []; 
    if (!$this->_redis->lock->lock("Queue:$name")) {
      Log::get('queue')->error("pop faild because of pop failure: name = $name, count = $count");
      return false;
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);

    //將其放在$result數組裡 並 刪除掉redis對應的id
    foreach ($array as $id => $score) {
      $result[] = ['id'=>$id, 'score'=>$score];
      $this->_redis->zset->delete("Queue:$name", $id);


    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;


