程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 數據庫知識 >> MYSQL數據庫 >> MySQL綜合教程 >> 用Python對MySQL同步狀態進行監控

用Python對MySQL同步狀態進行監控

編輯:MySQL綜合教程

用Python對MySQL同步狀態進行監控


用Python對MySQL同步狀態進行監控

使用Python對MySQL數據庫服務器是否可訪問,及主從同步是否中斷進行監控,是一件非常簡單的事情。感謝Python給我們帶來了如此簡單,強大,快捷的開發環境。

本文使用到的Python模塊
使用telnetlib校驗服務器是否可被訪問
使用SMTP向管理員發送通知郵件
使用MySQL官方的驅動對數據庫進行訪問
使用optparse實現命令行參數的提取

實現原理
使用optparse模塊獲取命令行參數。讀取defaults-file設置文件內容(如果存在),使用參數覆蓋defaults-file的值(如果傳遞參數,如:–host, –user, –to之類)。

直接去連MySQL等待是否能進行訪問的返回結果太慢了,所以使用telnet對服務器的連通性進行驗證。可以設置等待時間,可控性高一些。

當服務器工作正常,使用MySQL與服務器進行連接,獲取主從同步的狀態。

將獲取服務器的異常狀態信息(服務器無法訪問,主從同步的狀態中斷),使用SMTP發送給管理員,並把造成中斷同步的異常信息一同發送到管理員的郵箱中。


Created with Rapha?l 2.1.2starttelnet host:portconnect?slave statusslave?endnotifyyesnoyesno

slavecheckpoint.py

coding=utf-8
"""
數據庫同步狀態偵測
MySQL數據庫同步復制狀態監測腳本。可配合Linux下的crond進行定時監測。如果同步
狀態異常,側使用郵件通知管理員,並將造成同步中斷的錯誤信息也包含到郵件當中,管
理員可即時通過錯誤信息直接定位異常。

實例:
python slavecheckpoint.py --defaults-file=/etc/slave.cnf [email protected]

===FILE:slave.cnf===========
[config]
smtp_host=smtp.163.com
from=消息中心<[email protected]>
host=localhost
"""

import mysql.connector
from mysql.connector import errorcode
import telnetlib
import smtplib
from email.mime.text import MIMEText
import optparse
from ConfigParser import ConfigParser
import os,time,sys

class SlaveStatu:
    __instance__ = None
    __error__ = []

    def __init__(self,*args,**kwargs):
        self.__config__ = {
            "host":"localhsot",
            "user":"root",
            "password":"",
            "port":3306,
            "smtp_host":"localhost",
            "smtp_user":"",
            "smtp_password":"",
            "from":"admin@localhost",
            "to":""
        }

        #優先讀取設置文件中的值
        if not kwargs["defaults_file"] is None:
            defaults_file = self.__read_defaults_file__( kwargs["defaults_file"] )
            del kwargs["defaults_file"]

        #使用參數的設置去覆蓋設置文件的值
        for key,val in kwargs.items():
            if not val is None and len(val) > 0:
                self.__config__[key] = val

    def __configParseMySQL__(self):
        """
        提取數據庫的設置
        :return: dict
        """
        return {
            "host"     : self.__config__["host"],
            "port"     : self.__config__["port"],
            "user"     : self.__config__["user"],
            "password" : self.__config__["password"]
        }

    def __configParseSMTP__(self):
        """
        提取SMTP郵件設置
        :return: dict
        """
        return {
            "smtp_host": self.__config__["smtp_host"],
            "smtp_user": self.__config__["smtp_user"],
            "smtp_password": self.__config__["smtp_password"],
            "from": self.__config__["from"],
            "to": self.__config__["to"]
        }

    def __read_defaults_file__( self, filePath ):
        """
        加載設置文件設置的值
        :param filePath: 設置文件路徑
        :return:
        """
        section = "config"
        if os.path.exists( filePath ):
            cnf = ConfigParser()
            cnf.read( filePath )
            options = cnf.options( section )

            for key in options:
                self.__config__[key] = cnf.get( section, key )


    def telnet( self, host, port, timeout=5 ):
        """
        測試服務器地址和端口是否暢通
        :param host: 服務器地址
        :param port: 服務器端口
        :param timeout: 測試超時時間
        :return: Boolean
        """
        try:
            tel = telnetlib.Telnet( host, port, timeout )
            tel.close()
            return True
        except:
            return False

    def connect(self):
        """
        創建數據庫鏈接
        """
        try:
            config = self.__configParseMySQL__()
            if self.telnet( config["host"],config["port"]):
                self.__instance__ = mysql.connector.connect( **config )
                return True
            else:
                raise Exception("unable connect")
        except:
            self.__error__.append( "無法連接服務器主機: {host}:{port}".format( host=config[
                    "host"], port=config["port"]) )
            return False

    def isSlave(self):
        """
        數據庫同步是否正常
        :return: None同步未開啟,False同步中斷,True同步正常
        """
        cur = self.__instance__.cursor(dictionary=True)
        cur.execute("SHOW SLAVE STATUS")
        result = cur.fetchone()
        cur.close()

        if result:
            if result["Slave_SQL_Running"] == "Yes" and result["Slave_IO_Running"] == "Yes":
                return True
            else:
                if result["Slave_SQL_Running"] == "No":
                    self.__error__.append( result["Last_SQL_Error"] )
                else:
                    self.__error__.append( result["Last_IO_Error"] )
                return False

    def get_last_error(self):
        """
        獲取第一個錯誤信息
        :return: String
        """
        if self.__error__:
            return self.__error__.pop(0)

    def notify(self,title,message):
        """
        發送消息提醒
        :param title: 消息的標題
        :param message: 消息的內容
        :return:
        """
        msg    = [title,message]
        pool   = []
        notify = notify_email( self.__configParseSMTP__() )
        pool.append( notify )

        for item in pool:
            item.ring( msg )

    def close(self):
        """
        關閉數據庫鏈接
        """
        if self.__instance__:
            self.__instance__.close()

class notify_email(object):
    def __init__(self,config):
        self.config = config

    def ring(self, message=[]):
        subject = message.pop(0)
        messageBody = "".join( message )
        mailList = self.config["to"].split(";")
        datetime = time.strftime("%Y-%m-%d %H:%M:%S")
        for to in mailList:
            body = """
            <p>管理員<strong>{admin}</strong>,你好:</p>
            <p>收到這封郵件說明你的數據庫同步出現異常,請您及時進行處理。</p>
            <p>異常信息:<br />{body}</p>
            <p>{date}</p>
            """.format( admin=to, body=messageBody, date=datetime )

            msg            = MIMEText( body, "html", "utf-8" )
            msg["From"]    = self.config["from"]
            msg["To"]      = to
            msg["Subject"] = subject
            smtp           = smtplib.SMTP()

            smtp.connect( self.config["smtp_host"] )
            if self.config.has_key("smtp_user"):
                smtp.login( self.config["smtp_user"], self.config["smtp_password"] )
            smtp.sendmail( self.config["from"], to, msg.as_string() )
            smtp.quit()

if __name__ == "__main__":
    #命令行參數列表
    usage = """usage: MySQLStat [options]"""

    opt = optparse.OptionParser(usage=usage)
    opt.add_option("-H","--host",dest="host",help="MySQL host (default: localhost)")
    opt.add_option("-u","--user",dest="user",help="MySQL user")
    opt.add_option("-p","--password",dest="password",help="MySQL password")
    opt.add_option("-P","--port",dest="port",help="MySQL port (default: 3306)")
    opt.add_option("","--smtp_host",dest="smtp_host",help="SMTP host (default: localhost)")
    opt.add_option("","--smtp_user",dest="smtp_user",help="SMTP user")
    opt.add_option("","--smtp_password",dest="smtp_password",help="SMTP password")
    opt.add_option("","--from",dest="from",help="Email from")
    opt.add_option("","--to",dest="to",help="Email to")
    opt.add_option("","--defaults-file",dest="defaults_file",help="config file path")
    (options,args) = opt.parse_args()

    options = options.__dict__
    Statu = SlaveStatu( **options )
    subject = "服務中心異常信息提醒"
    if Statu.connect() is False or Statu.isSlave() is False:
        Statu.notify( subject, Statu.get_last_error() )
    Statu.close()

server1.cnf 設置文件內容

[config]
smtp_host=smtp.aliyun.com
[email protected]
smtp_password=xxxxxx
from=管理中心<[email protected]>
host=xxx.xxx.xxx.xxx
user=root
password=123456

完成了以上的配置之後,我們在定時任務裡添加一條任務,就可以讓程序為我們監控MySQL的服務器狀態了。
crontab設置

*/2 * * * * python slavecheckpoint.py --defaults-file=server1.cnf [email protected]

github項目地址: https://github.com/yagas/checkpoint.git

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved