程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> Redis Sentinel源碼分析(二)

Redis Sentinel源碼分析(二)

編輯:C++入門知識

Base 2.8.7
接Redis Sentinel源碼分析(一)
sentinelTimer函數周期性運行,第一次在服務啟動後1ms執行,後續執行周期1000/server.hz(sentinelTimer函數會修改server.hz的值)
sentinelTimer內部包含sentinel模式需要定期執行的操作,包括check master、slave、sentinel的狀態,並根據配置的條件判斷是否需要fail over。

/* Periodic sentinel entry point: runs the sentinel housekeeping steps in a
 * fixed order and then picks a new value for server.hz. */
void sentinelTimer(void) {
	// Check whether TILT mode has to be entered.
	sentinelCheckTiltCondition(); 
	// Run the periodic work on every monitored master (and, through it, on
	// its slaves and sentinels): status checks, talking to other sentinels...
	sentinelHandleDictOfRedisInstances(sentinel.masters); 
	// Start the scripts that are waiting to run.
	sentinelRunPendingScripts(); 
	// Clean up scripts that have finished.
	sentinelCollectTerminatedScripts(); 
	// Kill scripts that have been running past their timeout.
	sentinelKillTimedoutScripts(); 
	// Re-randomise hz to REDIS_DEFAULT_HZ plus a random offset smaller than
	// REDIS_DEFAULT_HZ. hz drives how often the sentinel work runs; the
	// random component is meant to keep all sentinels from repeatedly
	// issuing vote requests at the very same moment.
	server.hz = REDIS_DEFAULT_HZ + rand() % REDIS_DEFAULT_HZ;
}

sentinelCheckTiltCondition函數會check是否進入TILT模式,所謂TILT模式即只收集數據,而不做fail-over
進入TILT模式的原因可能是:
1)sentinel的部分操作被阻塞(可能是系統負載導致)
2)系統時鐘異常
進入條件:兩次進入sentinelCheckTiltCondition的時間差值<0或者>2s
引入TILT模式的目的是為了避免錯誤地進行fail-over
/* Compares the current time with the time recorded by the previous call.
 * If the gap is negative or exceeds SENTINEL_TILT_TRIGGER, the sentinel is
 * put into TILT mode: the tilt flag is set, the tilt start time is recorded
 * and a "+tilt" event is emitted. The previous-call timestamp is refreshed
 * on every invocation, whether or not TILT mode was entered. */
void sentinelCheckTiltCondition(void) {
    mstime_t current = mstime();
    mstime_t elapsed = current - sentinel.previous_time;
    int clock_went_back = elapsed < 0;
    int gap_too_large = elapsed > SENTINEL_TILT_TRIGGER;

    if (clock_went_back || gap_too_large) {
        sentinel.tilt = 1;
        sentinel.tilt_start_time = mstime();
        sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
    }

    /* Remember when this check ran so the next call can measure the gap. */
    sentinel.previous_time = mstime();
}

sentinelHandleDictOfRedisInstances負責遍歷所有instance,並對每個instance執行周期性操作
/* Runs the periodic handler on every instance stored in `instances`.
 *
 * For each entry that is a master, the function also recurses into that
 * master's slaves and sentinels dictionaries. A master whose failover state
 * is SENTINEL_FAILOVER_STATE_UPDATE_CONFIG is remembered; if several match,
 * only the last one visited is kept. After the walk, the remembered master
 * (if any) is handed to sentinelFailoverSwitchToPromotedSlave, and only then
 * is the iterator released. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
    sentinelRedisInstance *promoted_master = NULL;
    dictIterator *iter = dictGetIterator(instances);
    dictEntry *entry;

    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *inst = dictGetVal(entry);

        /* Periodic work for this instance itself. */
        sentinelHandleRedisInstance(inst);

        /* Only masters own slaves/sentinels dictionaries to descend into. */
        if (!(inst->flags & SRI_MASTER)) continue;

        sentinelHandleDictOfRedisInstances(inst->slaves);
        sentinelHandleDictOfRedisInstances(inst->sentinels);

        /* Defer the switch until the walk over `instances` is finished. */
        if (inst->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG)
            promoted_master = inst;
    }

    if (promoted_master != NULL)
        sentinelFailoverSwitchToPromotedSlave(promoted_master);
    dictReleaseIterator(iter);
}

sentinelHandleRedisInstance包含了具體的周期性操作,包括針對sentinel、slave、master實例的操作
/* Runs the periodic work for a single instance: connection upkeep and
 * probing for every instance type, then (outside TILT mode) the
 * subjective-down check and, for masters only, the objective-down check
 * and the failover handling. Part of the body is elided in this excerpt. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* The steps below run for every kind of instance. */
    // Connection and subscription management.
    sentinelReconnectInstance(ri);
    // Talk to the instance (PING/INFO/PUBLISH).
    sentinelPingInstance(ri);
    // While in TILT mode and SENTINEL_TILT_PERIOD has not elapsed yet, do
    // nothing else; once it has elapsed, leave TILT mode and carry on.
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
    }	
    // Decide whether the instance is subjectively down (sdown).
    sentinelCheckSubjectivelyDown(ri);
    ......
    /* The steps below apply to master instances only. */
    if (ri->flags & SRI_MASTER) {
        // Check whether the master is objectively down (odown), i.e. whether
        // the configured quorum of sentinels considers it down.
        sentinelCheckObjectivelyDown(ri);
        // Check whether a failover is needed; if it is,
        // sentinelStartFailoverIfNeeded has already updated the master's
        // state through sentinelStartFailover.
        if (sentinelStartFailoverIfNeeded(ri))
            // Send SENTINEL is-master-down-by-addr to the other sentinels
            // (forced), registering a callback for their replies.
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        // Drive the failover.
        sentinelFailoverStateMachine(ri);
        // Regular (non-forced) query of the other sentinels; runs on every
        // pass, regardless of whether a failover was just started.
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

sentinelReconnectInstance函數負責建立連接、重連,包括和各個instance建立連接,針對master/slave instance,訂閱其“__sentinel__:hello”頻道
/* Establishes or re-establishes the connections to an instance. Does
 * nothing unless the instance is flagged SRI_DISCONNECTED. Most of the body
 * is elided in this excerpt. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    if (!(ri->flags & SRI_DISCONNECTED)) return;
    // Connect to the master/slave/sentinel instance when ri->cc is missing
    // (presumably the connection used for commands; setup elided here).
    if (ri->cc == NULL) {
       ......
    }
    // Masters and slaves only: when ri->pc is missing, set it up and
    // subscribe to SENTINEL_HELLO_CHANNEL ("__sentinel__:hello" according to
    // the article); messages are delivered to sentinelReceiveHelloMessages.
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && ri->pc == NULL) {
       ......
	retval = redisAsyncCommand(ri->pc,
			sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
				SENTINEL_HELLO_CHANNEL); 
        ......
    }
    ......
}

sentinelPingInstance會根據instance狀況,向其發送命令,可能是INFO/PING/PUBLISH
/* Sends at most one periodic command to the instance per call: INFO, PING
 * or a hello publication, tried in that order. The declarations of `now`,
 * `info_period` and `retval` are omitted from this excerpt. */
void sentinelPingInstance(sentinelRedisInstance *ri) {
    // Return right away if the instance is disconnected or if the number of
    // pending commands has reached SENTINEL_MAX_PENDING_COMMANDS.
    if (ri->flags & SRI_DISCONNECTED) return;
    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
    // For a slave whose master is in an abnormal state (SRI_O_DOWN or
    // SRI_FAILOVER_IN_PROGRESS), INFO is sent with a period of 1000
    // (presumably milliseconds) instead of the default SENTINEL_INFO_PERIOD.
    if ((ri->flags & SRI_SLAVE) &&
        (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }
    // Masters and slaves: send INFO when it has never been refreshed or when
    // more than info_period has passed since the last refresh. The reply is
    // handled by sentinelInfoReplyCallback, which (not shown here) is
    // expected to parse it and update the sentinelRedisInstance state.
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        /* Send INFO to masters and slaves, not sentinels. */
        retval = redisAsyncCommand(ri->cc,
            sentinelInfoReplyCallback, NULL, "INFO");
        if (retval != REDIS_OK) return;
        ri->pending_commands++;
    } 
    // Otherwise, for every kind of instance: send PING when more than
    // SENTINEL_PING_PERIOD has passed since the last pong. The reply is
    // handled by sentinelPingReplyCallback, which (not shown here) is
    // expected to judge the instance state from it.
	else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
        retval = redisAsyncCommand(ri->cc,
            sentinelPingReplyCallback, NULL, "PING");
        if (retval != REDIS_OK) return;
        ri->pending_commands++;
    // Otherwise, for masters and slaves: publish a hello message when more
    // than SENTINEL_PUBLISH_PERIOD has passed since the last publication.
    // Per the article the payload is: ip, port, runid, current_epoch,
    // master->name, master->ip, master->port (sentinelSendHello not shown).
    } else if ((ri->flags & SRI_SENTINEL) == 0 &&
               (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
    {
        sentinelSendHello(ri);
    }
}

sentinelCheckObjectivelyDown函數確認是否將master狀態從sdown改為odown
/* Decides whether a master that this sentinel sees as subjectively down
 * (sdown) should be considered objectively down (odown). The declarations
 * and the code that acts on `odown` are elided in this excerpt. */
void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    ......
    // Only when this sentinel itself sees the master as sdown does it start
    // evaluating odown.
    if (master->flags & SRI_S_DOWN) {
        // Start at 1: this branch is only reached when this sentinel already
        // considers the master down.
        quorum = 1; 
        di = dictGetIterator(master->sentinels);
        // Walk the sentinels dictionary and count those flagged as
        // reporting the master down (SRI_MASTER_DOWN).
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        // If the count reaches the quorum configured for this master, mark
        // it as odown.
        if (quorum >= master->quorum) odown = 1;
    }
	......
}

sentinelStartFailoverIfNeeded函數check是否需要做fail over,如果確認需要,則調用sentinelStartFailover修改自身狀態
/* Starts a failover for `master` when all preconditions hold.
 *
 * A failover is started only if the master is flagged SRI_O_DOWN, no
 * failover is already in progress for it, and at least twice its
 * failover_timeout has passed since failover_start_time.
 *
 * Returns 1 if sentinelStartFailover was called, 0 otherwise. */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
    /* Evaluated left to right with short-circuiting, so the clock is only
     * read once the two flag checks have passed. */
    int eligible =
        (master->flags & SRI_O_DOWN) &&
        !(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
        (mstime() - master->failover_start_time >=
         master->failover_timeout*2);

    if (!eligible) return 0;

    sentinelStartFailover(master);
    return 1;
}

在確認要進行failover後,調用sentinelStartFailover修改相關狀態數據
/* Marks `master` as having a failover in progress.
 *
 * Asserts that the instance is a master, moves its failover state to
 * SENTINEL_FAILOVER_STATE_WAIT_START, sets SRI_FAILOVER_IN_PROGRESS, bumps
 * the global current epoch and records it as the failover epoch, and stamps
 * the failover start and state-change times. */
void sentinelStartFailover(sentinelRedisInstance *master) {
    redisAssert(master->flags & SRI_MASTER);
    // Failover state: waiting to start.
    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
    // Flag the master as being failed over.
    master->flags |= SRI_FAILOVER_IN_PROGRESS;
    // A new failover gets a new epoch: increment the global epoch first,
    // then record the incremented value.
    master->failover_epoch = ++sentinel.current_epoch;
    // Start time is "now" plus a random offset below SENTINEL_MAX_DESYNC
    // (as in upstream sentinel.c), presumably to desynchronise sentinels.
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}
sentinelAskMasterStateToOtherSentinels是在檢測到master狀態為sdown後,sentinel向其它sentinel節點發送sentinel is-master-down-by-addr消息
/* Sends "SENTINEL is-master-down-by-addr" to the other sentinels, asking
 * each for its view of `master`.
 *
 * NOTE: this is an abridged excerpt. The declarations of di, de, ri, port
 * and retval, the iterator setup, and any use of `flags` are omitted. */
void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    // Walk the sentinels that monitor this master.
    while((de = dictNext(di)) != NULL) {
        // Send: SENTINEL is-master-down-by-addr <master_ip> <master_port>
        //       <current_epoch> <runid or "*">
        // If this sentinel has already started a failover (failover_state
        // is past SENTINEL_FAILOVER_STATE_NONE) it sends its own runid,
        // otherwise it sends "*".
        // Replies are delivered to sentinelReceiveIsMasterDownReply.

        // Render the numeric port as text for the %s placeholder below.
        ll2string(port,sizeof(port),master->addr->port);
        retval = redisAsyncCommand(ri->cc,
                    sentinelReceiveIsMasterDownReply, NULL,
                    "SENTINEL is-master-down-by-addr %s %s %llu %s",
                    master->addr->ip, port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    server.runid : "*");
        // Count the command as pending only if it was queued successfully.
        if (retval == REDIS_OK) ri->pending_commands++;
    }
    dictReleaseIterator(di);
}

其他sentinel節點接收到sentinel is-master-down-by-addr消息後,調用sentinelCommand處理
/* Handler for the SENTINEL command. Only the "is-master-down-by-addr"
 * branch is shown; the other subcommands and the local declarations are
 * elided in this excerpt. */
void sentinelCommand(redisClient *c) {
    ......
    // Handle the "SENTINEL is-master-down-by-addr" request.
    } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid> */
        ......
        // Look up the master by the address the requesting sentinel sent
        // (argv[2] is the ip; `port` comes from the elided parsing code).
        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);
        /* It exists? Is actually a master? Is subjectively down? It's down.
         * Note: if we are in tilt mode we always reply with "0". */
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;
        // If the request carries the requester's runid (argv[5] is not "*"),
        // take part in the leader vote for this master.
        if (ri && ri->flags & SRI_MASTER && strcasecmp(c->argv[5]->ptr,"*")) {
            leader = sentinelVoteLeader(ri,(uint64_t)req_epoch,
                                            c->argv[5]->ptr,
                                            &leader_epoch);
        }
        // Reply with three elements: isdown, leader (or "*" when there is
        // none) and leader_epoch; then free the leader string if one was
        // returned.
        addReplyMultiBulkLen(c,3);
        addReply(c, isdown ? shared.cone : shared.czero);
        addReplyBulkCString(c, leader ? leader : "*");
        addReplyLongLong(c, (long long)leader_epoch);
        if (leader) sdsfree(leader);
 }

sentinelReceiveIsMasterDownReply函數處理發送給其他sentinel的消息“SENTINEL is-master-down-by-addr”的回復
/* Reply callback for "SENTINEL is-master-down-by-addr". Updates the record
 * `ri` from the three reply elements (down flag, leader runid, leader
 * epoch). The code that obtains `ri` and `r` and validates the reply is
 * elided in this excerpt. */
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
	......
	// Element 0 says whether the replying sentinel sees the master as down:
	// set or clear SRI_MASTER_DOWN on its record accordingly.
	if (r->element[0]->integer == 1) {
		ri->flags |= SRI_MASTER_DOWN;
	} else {
		ri->flags &= ~SRI_MASTER_DOWN;
	}
	// Element 1 other than "*" means the sentinel returned the leader it
	// voted for: replace the stored leader and leader_epoch, logging the
	// vote when the epoch differs from the one previously stored.
	if (strcmp(r->element[1]->str,"*")) {
		sdsfree(ri->leader);
		if (ri->leader_epoch != r->element[2]->integer)
			redisLog(REDIS_WARNING,
				"%s voted for %s %llu", ri->name,
				r->element[1]->str,
				(unsigned long long) r->element[2]->integer);
		ri->leader = sdsnew(r->element[1]->str);
		ri->leader_epoch = r->element[2]->integer;
	}
}

sentinelFailoverStateMachine函數為故障轉移狀態機,其負責執行故障轉移
/* Failover state machine: dispatches to the handler that matches the
 * master's current failover_state.
 *
 * Asserts that `ri` is a master and does nothing unless it is flagged
 * SRI_FAILOVER_IN_PROGRESS. States without a case here are ignored. The
 * handlers themselves are not shown in this file; each is expected to move
 * failover_state forward when its step completes. */
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
	redisAssert(ri->flags & SRI_MASTER);

	if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
		switch (ri->failover_state) {
		/* Waiting for the failover to start. */
		case SENTINEL_FAILOVER_STATE_WAIT_START:
			sentinelFailoverWaitStart(ri);
			break;
		/* Choose the slave to promote. */
		case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
			sentinelFailoverSelectSlave(ri);
			break;
		/* Promote the chosen slave to master. */
		case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
			sentinelFailoverSendSlaveOfNoOne(ri);
			break;
		/* Wait for the promotion to take effect. */
		case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
			sentinelFailoverWaitPromotion(ri);
			break;
		/* Point the remaining slaves at the new master. */
		case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
			sentinelFailoverReconfNextSlave(ri);
			break;
		default:
			break;
		}
	}
}

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved