
Hardcore! 11 very practical Python and shell script examples


Source: Jay's IT Journey

This time I'm sharing a few scripts I use at work, split into two parts: Python and shell.

Python script examples: an enterprise WeChat alert, an FTP client, an SSH client, a Saltstack client, a vCenter client, a check for a domain's SSL certificate expiration time, and a script that sends today's weather forecast plus a chart of the upcoming weather trend.

Shell script examples: an SVN full backup, Zabbix monitoring of user password expiration, building a local YUM repository, and a script requested by readers of the previous article (when load is high, find the processes with high resource usage and store them or push a notification).

It's a little long, so please read patiently to the end.

Python scripts

Enterprise wechat alert

This script sends alerts through an enterprise WeChat application and can be used for Zabbix monitoring.

# -*- coding: utf-8 -*-
import requests
import json
class DLF:
    def __init__(self, corpid, corpsecret):
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self._token = self._get_token()
    def _get_token(self):
        '''
        Get the enterprise WeChat API access_token
        :return:
        '''
        token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
        try:
            res = requests.get(token_url).json()
            token = res['access_token']
            return token
        except Exception as e:
            return str(e)
    def _get_media_id(self, file_obj):
        get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
        data = {"media": file_obj}
        try:
            res = requests.post(url=get_media_url, files=data)
            media_id = res.json()['media_id']
            return media_id
        except Exception as e:
            return str(e)
    def send_text(self, agentid, content, touser=None, toparty=None):
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {
                "content": content
            }
        }
        try:
            # Return the parsed API response so the caller can inspect the result
            res = requests.post(send_msg_url, data=json.dumps(send_data))
            return res.json()
        except Exception as e:
            return str(e)
    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        media_id = self._get_media_id(file_obj)
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                "media_id": media_id
            }
        }
        try:
            # Return the parsed API response so the caller can inspect the result
            res = requests.post(send_msg_url, data=json.dumps(send_data))
            return res.json()
        except Exception as e:
            return str(e)
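
As a rough illustration of the message body that `send_text` posts, the sketch below builds the same JSON but leaves out `touser` and `toparty` when they are `None`. The helper name `build_text_payload` and the sample agent ID are hypothetical and not part of the original script; the source does not say how the API treats null recipients, so dropping them is only a cautious assumption.

```python
import json


def build_text_payload(agentid, content, touser=None, toparty=None):
    """Build the JSON body for a text message, skipping unset recipients."""
    payload = {
        "msgtype": "text",
        "agentid": agentid,
        "text": {"content": content},
    }
    # Only include recipient fields that were actually supplied
    if touser is not None:
        payload["touser"] = touser
    if toparty is not None:
        payload["toparty"] = toparty
    return json.dumps(payload, ensure_ascii=False)


if __name__ == "__main__":
    # "1000002" is a placeholder agent ID, not a real one
    print(build_text_payload("1000002", "disk usage above 90%", toparty="1"))
```

The resulting string could be passed as `data=` to `requests.post` in place of `json.dumps(send_data)`.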

FTP client

Uses the ftplib module to work with an FTP server: upload and download.

# -*- coding: utf-8 -*-
from ftplib import FTP
from os import path
import copy
class FTPClient:
    def __init__(self, host, user, passwd, port=21):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        self.res = {'status': True, 'msg': None}
        self._ftp = None
        self._login()
    def _login(self):
        '''
        Log in to the FTP server
        :return: the exception if connecting or logging in fails
        '''
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            return e
    def upload(self, localpath, remotepath=None):
        '''
        Upload a file to the FTP server
        :param localpath: local file path
        :param remotepath: remote file path
        :return:
        '''
        if not localpath: return 'Please select a local file. '
        # If no remote path is given, upload to the current directory
        # under the same name as the local file
        if not remotepath:
            remotepath = path.basename(localpath)
        # storbinary() expects an open binary file object, not a path string
        with open(localpath, 'rb') as fp:
            self._ftp.storbinary('STOR ' + remotepath, fp)
    def download(self, remotepath, localpath=None):
        '''
        Download a file from the FTP server
        :param localpath: local file path
        :param remotepath: remote file path
        :return:
        '''
        if not remotepath: return 'Please select a remote file. '
        # If no local path is given, download to the current directory
        # under the same name as the remote file
        if not localpath:
            localpath = path.basename(remotepath)
        # If localpath is a directory, join it with the basename of remotepath
        if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))
        # Write the download to the local file
        with open(localpath, 'wb') as fp:
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)
    def nlst(self, dir='/'):
        '''
        List the contents of a directory
        :return: everything in the directory, as a list
        '''
        files_list = self._ftp.nlst(dir)
        return files_list
    def rmd(self, dir=None):
        '''
         Delete directory
        :param dir:  Directory name
        :return:  Execution results
        '''
        if not dir: return 'Please input dirname'
        res = copy.deepcopy(self.res)
        try:
            del_d = self._ftp.rmd(dir)
            res['msg'] = del_d
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res
    def mkd(self, dir=None):
        '''
         Create directory
        :param dir:  Directory name
        :return:  Execution results
        '''
        if not dir: return 'Please input dirname'
        res = copy.deepcopy(self.res)
        try:
            mkd_d = self._ftp.mkd(dir)
            res['msg'] = mkd_d
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res
    def del_file(self, filename=None):
        '''
         Delete file
        :param filename:  File name
        :return:  Execution results
        '''
        if not filename: return 'Please input filename'
        res = copy.deepcopy(self.res)
        try:
            del_f = self._ftp.delete(filename)
            res['msg'] = del_f
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res
    def get_file_size(self, filenames=None):
        '''
        Get each entry's size in bytes and determine its type
        :param filenames: list of file or directory names
        :return: list of dicts with filename, size and type
        '''
        if not filenames: return {'msg': 'This is an empty directory'}
        res_l = []
        for file in filenames:
            res_d = {}
            # size() raises an error for a directory or a missing file
            try:
                size = self._ftp.size(file)
                type = 'f'
            except Exception:
                # Treat it as a directory: show size as '-' and append '/' (/dir/)
                size = '-'
                type = 'd'
                file = file + '/'
            res_d['filename'] = file
            res_d['size'] = size
            res_d['type'] = type
            res_l.append(res_d)
        return res_l
    def rename(self, old_name=None, new_name=None):
        '''
         rename
        :param old_name:  Old file or directory name
        :param new_name:  New file or directory name
        :return:  Execution results
        '''
        if not old_name or not new_name: return 'Please input old_name and new_name'
        res = copy.deepcopy(self.res)
        try:
            rename_f = self._ftp.rename(old_name, new_name)
            res['msg'] = rename_f
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res
    def close(self):
        '''
        Log out and close the FTP connection
        :return:
        '''
        try:
            # Send the QUIT command to the server
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            # Close the connection on the client side regardless
            self._ftp.close()
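
ftplib's `storbinary` reads from an open binary file object, so an upload helper has to open the local file before handing it over. The sketch below shows that step in isolation. `upload_file` and `FakeFTP` are hypothetical names introduced here; `FakeFTP` only stands in for a real `ftplib.FTP` connection so the example runs without a server.

```python
import os
import tempfile


def upload_file(ftp, localpath, remotepath=None):
    """Upload localpath through an ftplib.FTP-like object."""
    # Default to the local file name when no remote path is given
    if not remotepath:
        remotepath = os.path.basename(localpath)
    # Open in binary mode and pass the file object, not the path
    with open(localpath, "rb") as fp:
        ftp.storbinary("STOR " + remotepath, fp)
    return remotepath


class FakeFTP:
    """Stand-in that records what storbinary() receives (no network)."""

    def __init__(self):
        self.stored = {}

    def storbinary(self, cmd, fp):
        self.stored[cmd] = fp.read()


if __name__ == "__main__":
    with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as tmp:
        tmp.write(b"hello")
    fake = FakeFTP()
    print(upload_file(fake, tmp.name), fake.stored)
    os.remove(tmp.name)
```

With a real server, an `ftplib.FTP` instance that has already logged in would take the place of `FakeFTP()`.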

SSH client

This script only connects with a key; if you need password authentication, a small modification is enough.

# -*- coding: utf-8 -*-
import paramiko
class SSHClient:
    def __init__(self, host, port, user, pkey):
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        self._connect()
    def _connect(self):
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
        except:
            return 'ssh connect fail'
    def execute_command(self, command):
        stdin, stdout, stderr = self.ssh.exec_command(command)
        out = stdout.read()
        err = stderr.read()
        return out, err
    def close(self):
        self.ssh.close()
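
The password variant mentioned above mostly comes down to which keyword arguments reach paramiko's `connect()`. The sketch below is one possible way to arrange that, not the author's code: `build_connect_kwargs` and `connect` are hypothetical helpers, and disabling key and agent lookup for password logins is an assumption made here so that only the password is tried.

```python
def build_connect_kwargs(host, port, user, password=None, pkey=None, timeout=10):
    """Return keyword arguments for paramiko.SSHClient.connect()."""
    # Require exactly one authentication method
    if (password is None) == (pkey is None):
        raise ValueError("pass exactly one of password or pkey")
    kwargs = {"hostname": host, "port": port, "username": user, "timeout": timeout}
    if password is not None:
        kwargs["password"] = password
        # Assumption: skip key files and the SSH agent for password logins
        kwargs["look_for_keys"] = False
        kwargs["allow_agent"] = False
    else:
        kwargs["pkey"] = pkey
    return kwargs


def connect(**kwargs):
    """Open a connection; paramiko is imported lazily so the helper above runs without it."""
    import paramiko

    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(**kwargs)
    return ssh
```

A call such as `connect(**build_connect_kwargs("10.0.0.1", 22, "root", password="..."))` would then return a client usable like `self.ssh` in the class above; the address is a placeholder.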

Saltstack client

Operates a Saltstack server through its API and runs commands.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests
import json
import copy
class SaltApi:
    """
    Class wrapping the salt-api interface
    Fetches a token on initialization
    """
    def __init__(self):
        self.url = "http://172.85.10.21:8000/"
        self.username = "saltapi"
        self.password = "saltapi"
        self.headers = {"Content-type": "application/json"}
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username, 'password': self.password, 'eauth': 'pam'}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token
    def get_data(self, url, params):
        '''
        POST to the given URL and return the data
        :param url: URL to request
        :param params: parameters sent to the URL
        :return: result of the request
        '''
        send_data = json.dumps(params)
        request = requests.post(url, data=send_data, headers=self.headers)
        response = request.json()
        result = dict(response)
        return result['return'][0]
    def get_auth_keys(self):
        '''
        Get all accepted minion keys
        :return:
        '''
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        result = self.get_data(self.url, data)
        try:
            return result['data']['return']['minions']
        except Exception as e:
            return str(e)
    def get_grains(self, tgt, arg='id'):
        """
        Get basic system information (grains)
        :param tgt: target host(s)
        :return:
        """
        data = copy.deepcopy(self.params)
        if tgt:
            data['tgt'] = tgt
        else:
            data['tgt'] = '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        result = self.get_data(self.url, data)
        return result
    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
        """
        Run a Saltstack module function, similar to salt '*' cmd.run 'command'
        :param tgt: target host(s)
        :param fun: module function, defaults to cmd.run
        :param arg: arguments for the function, may be empty
        :return: execution results
        """
        data = copy.deepcopy(self.params)
        if not tgt: return {'status': False, 'msg': 'target host not exist'}
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async: data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        result = self.get_data(self.url, data)
        return result
    def jobs(self, fun='detail', jid=None):
        """
        Query jobs
        :param fun: active or detail
        :param jid: Job ID (required for detail)
        :return: job execution results
        """
        data = {'client': 'runner'}
        if fun == 'detail':
            if not jid: return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        elif fun == 'active':
            # List the currently running jobs
            data['fun'] = 'jobs.active'
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        result = self.get_data(self.url, data)
        return result

vCenter client

Performs routine vCenter operations through the official SDK. This script is for a CMDB platform: it collects host information automatically and stores it in a database.

from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit
class Vmware:
    def __init__(self, ip, user, password, port, idc, vcenter_id):
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id
    def get_obj(self, content, vimtype, name=None):
        '''
        Return a list of all objects of the given type(s); name is meant for selecting a matching object but is not used here
        '''
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        obj = [ view for view in container.view ]
        return obj
    def get_esxi_info(self):
        #  Host information
        esxi_host = {}
        res = {"connect_status": True, "msg": None}
        try:
            # connect this thing
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            res['connect_status'] = False
            try:
                res['msg'] = ("%s Caught vmodl fault : " + e.msg) % (self.ip)
            except Exception as e:
                res['msg'] = '%s: connection error' % (self.ip)
            return res
        # disconnect this thing
        atexit.register(Disconnect, si)
        content = si.RetrieveContent()
        esxi_obj = self.get_obj(content, [vim.HostSystem])
        for esxi in esxi_obj:
            esxi_host[esxi.name] = {}
            esxi_host[esxi.name]['idc_id'] = self.idc_id
            esxi_host[esxi.name]['vcenter_id'] = self.vcenter_id
            esxi_host[esxi.name]['server_ip'] = esxi.name
            esxi_host[esxi.name]['manufacturer'] = esxi.summary.hardware.vendor
            esxi_host[esxi.name]['server_model'] = esxi.summary.hardware.model
            for i in esxi.summary.hardware.otherIdentifyingInfo:
                if isinstance(i, vim.host.SystemIdentificationInfo):
                    esxi_host[esxi.name]['server_sn'] = i.identifierValue
            # System name
            esxi_host[esxi.name]['system_name'] = esxi.summary.config.product.fullName
            # Total CPU count (logical threads)
            esxi_cpu_total = esxi.summary.hardware.numCpuThreads
            # Total memory in GB
            esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024
            # Total datastore capacity in GB
            esxi_disk_total = 0
            for ds in esxi.datastore:
                esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024
            # Default VM size is 4 cores / 8 GB RAM / 100 GB disk; the number of VMs that can still be allocated is based on it
            default_configure = {
                'cpu': 4,
                'memory': 8,
                'disk': 100
            }
            esxi_host[esxi.name]['vm_host'] = []
            vm_usage_total_cpu = 0
            vm_usage_total_memory = 0
            vm_usage_total_disk = 0
            #  Virtual machine information
            for vm in esxi.vm:
                host_info = {}
                host_info['vm_name'] = vm.name
                host_info['power_status'] = vm.runtime.powerState
                host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + ' cores'
                host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB'
                host_info['system_info'] = vm.config.guestFullName
                disk_info = ''
                disk_total = 0
                for d in vm.config.hardware.device:
                    if isinstance(d, vim.vm.device.VirtualDisk):
                        disk_total += d.capacityInKB / 1024 / 1024
                        disk_info += d.deviceInfo.label + ": " +  str((d.capacityInKB) / 1024 / 1024) + ' GB' + ','
                host_info['disk_info'] = disk_info
                esxi_host[esxi.name]['vm_host'].append(host_info)
                # Track resources assigned to powered-on VMs; free capacity = total - assigned
                if host_info['power_status'] == 'poweredOn':
                    vm_usage_total_cpu += vm.config.hardware.numCPU
                    vm_usage_total_disk += disk_total
                    vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)
            esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
            esxi_memory_free = esxi_memory_total - vm_usage_total_memory
            esxi_disk_free = esxi_disk_total - vm_usage_total_disk
            esxi_host[esxi.name]['cpu_info'] = 'Total: %d cores, Free: %d cores' % (esxi_cpu_total, esxi_cpu_free)
            esxi_host[esxi.name]['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free)
            esxi_host[esxi.name]['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free)
            # Divide free CPU, memory and disk by the default VM size; the smallest quotient is the number of VMs that can still be allocated
            if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100:
                free_allocation_vm_host = 0
            else:
                free_allocation_vm_host = int(min(
                    [
                        esxi_cpu_free / default_configure['cpu'],
                        esxi_memory_free / default_configure['memory'],
                        esxi_disk_free / default_configure['disk']
                    ]
                ))
            esxi_host[esxi.name]['free_allocation_vm_host'] = free_allocation_vm_host
        esxi_host['connect_status'] = True
        return esxi_host
    def write_to_db(self):
        esxi_host = self.get_esxi_info()
        #  The connection fails
        if not esxi_host['connect_status']:
            return esxi_host
        del esxi_host['connect_status']
        for machine_ip in esxi_host:
            #  Physical machine information
            esxi_host_dict = esxi_host[machine_ip]
            #  Virtual machine information
            virtual_host = esxi_host[machine_ip]['vm_host']
            del esxi_host[machine_ip]['vm_host']
            obj = models.EsxiHost.objects.create(**esxi_host_dict)
            obj.save()
            for host_info in virtual_host:
                host_info['management_host_id'] = obj.id
                obj2 = models.virtualHost.objects.create(**host_info)
                obj2.save()
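
The capacity rule in `get_esxi_info` (free resources divided by the default 4-core / 8 GB / 100 GB VM size, taking the smallest quotient) can be tried out on plain numbers. The function name `allocatable_vms` and the sample figures below are made up for illustration; only the rule itself comes from the script.

```python
DEFAULT_VM = {"cpu": 4, "memory": 8, "disk": 100}


def allocatable_vms(cpu_free, memory_free, disk_free, unit=None):
    """Number of default-sized VMs that still fit into the free resources."""
    unit = unit or DEFAULT_VM
    counts = [
        cpu_free // unit["cpu"],
        memory_free // unit["memory"],
        disk_free // unit["disk"],
    ]
    # The scarcest resource decides; never report a negative count
    return max(0, int(min(counts)))


if __name__ == "__main__":
    # 20 free cores, 50 GB free memory, 900 GB free disk (sample values)
    print(allocatable_vms(20, 50, 900))
```

Here CPU is the limiting resource: 20 cores fit five 4-core VMs, while memory and disk would allow six and nine.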

Get a domain's SSL certificate expiration time

Used for Zabbix alerting.

import re
import sys
import time
import subprocess
from datetime import datetime
from io import StringIO
def main(domain):
    f = StringIO()
    comm = f"curl -Ivs https://{domain} --connect-timeout 10"
    result = subprocess.getstatusoutput(comm)
    f.write(result[1])
    try:
        m = re.search('start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n', f.getvalue(), re.S)
        start_date = m.group(1)
        expire_date = m.group(2)
        common_name = m.group(3)
        issuer = m.group(4)
    except Exception as e:
        return 999999999
    # Parse the start date string into a struct_time with time.strptime
    start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
    start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
    # Parse the expiry date string into a datetime
    expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
    expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S")
    # Days remaining
    remaining = (expire_date-datetime.now()).days
    return remaining 
if __name__ == "__main__":
    domain = sys.argv[1] 
    remaining_days = main(domain)
    print(remaining_days)
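
The script above depends on the exact text that curl prints. As an alternative sketch (not the author's method), the same number can be obtained with Python's standard `ssl` module. `fetch_not_after` and `days_remaining` are hypothetical names. Note that `ssl.create_default_context()` verifies the certificate, so the handshake raises an error for a certificate that has already expired or is untrusted; a caller would need to handle that case.

```python
import socket
import ssl
from datetime import datetime, timezone


def fetch_not_after(domain, port=443, timeout=10):
    """Return the certificate's notAfter string, e.g. 'Jan  5 09:34:43 2018 GMT'."""
    context = ssl.create_default_context()
    with socket.create_connection((domain, port), timeout=timeout) as sock:
        with context.wrap_socket(sock, server_hostname=domain) as tls:
            return tls.getpeercert()["notAfter"]


def days_remaining(not_after, now=None):
    """Whole days between now and the notAfter timestamp (negative once expired)."""
    expires = datetime.fromtimestamp(ssl.cert_time_to_seconds(not_after), tz=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return (expires - now).days
```

Calling `days_remaining(fetch_not_after(domain))` would then play the role of `main(domain)` in the script above.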

Send today's weather forecast and future weather trend chart

This script sends today's weather forecast and a chart of the upcoming weather trend to my wife. WeChat has since blocked its web client, so the messages can no longer go to personal WeChat; I send them through enterprise WeChat instead, which means your wife has to be added to your enterprise WeChat. Skip this one if you're not interested.

# -*- coding: utf-8 -*-
import requests
import json
import datetime
def weather(city):
    url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city
    try:
        data = requests.get(url).json()['data']
        city = data['city']
        ganmao = data['ganmao']
        today_weather = data['forecast'][0]
        # Message text: health tip first, then today's details
        res = "Honey, today: {}\nToday's weather\nCity: {:<10}\nDate: {:<10}\nHigh: {:<10}\nLow: {:<10}\nWind force: {:<10}\nWind direction: {:<10}\nWeather: {:<10}\n\nThe recent temperature trend chart will be sent shortly, please check.".format(
            ganmao,
            city,
            datetime.datetime.now().strftime('%Y-%m-%d'),
            today_weather['high'].split()[1],
            today_weather['low'].split()[1],
            today_weather['fengli'].split('[')[2].split(']')[0],
            today_weather['fengxiang'],today_weather['type'],
        )
        return {"source_data": data, "res": res}
    except Exception as e:
        return str(e)

Get the weather forecast trend chart

# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import re
import datetime
def Future_weather_states(forecast, save_path, day_num=5):
    '''
    Plot the trend chart for the upcoming weather forecast
    :param forecast: weather forecast data
    :param save_path: where to save the chart image
    :param day_num: number of upcoming days to plot
    :return: None; the chart is written to save_path
    '''
    future_forecast = forecast
    temps = {}
    for i in range(day_num):
        data = []
        date = future_forecast[i]["date"]
        date = int(re.findall(r"\d+", date)[0])
        data.append(int(re.findall(r"\d+", future_forecast[i]["high"])[0]))
        data.append(int(re.findall(r"\d+", future_forecast[i]["low"])[0]))
        data.append(future_forecast[i]["type"])
        temps[date] = data
    data_list = sorted(temps.items())
    date = []
    high_temperature = []
    low_temperature = []
    for each in data_list:
        date.append(each[0])
        high_temperature.append(each[1][0])
        low_temperature.append(each[1][1])
    # Plot once, after all points have been collected
    plt.plot(date, high_temperature, "r", date, low_temperature, "b")
    current_date = datetime.datetime.now().strftime('%Y-%m')
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
    plt.xlabel(current_date)
    plt.ylabel("℃")
    plt.legend(["High", "Low"])
    plt.xticks(date)
    plt.title("Temperature trend in recent days")
    plt.savefig(save_path)

Send to enterprise WeChat

plugins/send_wechat.py contains the same DLF class shown in the "Enterprise wechat alert" section above.

Main script

# -*- coding: utf-8 -*-
from plugins.weather_forecast import weather
from plugins.trend_chart import Future_weather_states
from plugins.send_wechat import DLF
import os
# Enterprise WeChat credentials
corpid = "xxx"
corpsecret = "xxx"
agentid = "xxx"
# Path where the weather trend chart is saved
_path = os.path.dirname(os.path.abspath(__file__))
save_path = os.path.join(_path ,'./tmp/weather_forecast.jpg')
# Get the weather forecast
content = weather("Daxing")
# Send the text message
dlf = DLF(corpid, corpsecret)
dlf.send_text(agentid=agentid, content=content['res'], toparty='1')
# Generate the weather trend chart
Future_weather_states(content['source_data']['forecast'], save_path)
# Send the image message, closing the file afterwards
with open(save_path, 'rb') as file_obj:
    dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)

Shell scripts

SVN full backup

Performs a full SVN backup with hotcopy; backups are kept for 7 days.

#!/bin/bash
# Filename   :  svn_backup_repos.sh
# Date       :  2020/12/14
# Author     :  JakeTian      
# Email      :  [email protected]***.com
# Crontab    :  59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1
# Notes      :  Add the script to crontab so it runs every day
# Description:  SVN Full backup
set -e
SRC_PATH="/opt/svndata"
DST_PATH="/data/svnbackup"
LOG_FILE="$DST_PATH/logs/svn_backup.log"
SVN_BACKUP_C="/bin/svnadmin hotcopy"
SVN_LOOK_C="/bin/svnlook youngest"
TODAY=$(date +'%F')
cd $SRC_PATH
ALL_REPOS=$(find ./ -maxdepth 1 -type d ! -name 'httpd' -a ! -name 'bak' | tr -d './')
# Create the backup directory and the backup log directory
test -d $DST_PATH || mkdir -p $DST_PATH
test -d $DST_PATH/logs || mkdir $DST_PATH/logs
test -d $DST_PATH/$TODAY || mkdir $DST_PATH/$TODAY
#  Backup repos file
for repo in $ALL_REPOS
do
    $SVN_BACKUP_C $SRC_PATH/$repo $DST_PATH/$TODAY/$repo
    #  Determine if the backup is complete
    if $SVN_LOOK_C $DST_PATH/$TODAY/$repo;then
        echo "$TODAY: $repo Backup Success" >> $LOG_FILE 
    else
        echo "$TODAY: $repo Backup Fail" >> $LOG_FILE
    fi
done
# Back up the user password file and the permission file
cp -p authz access.conf $DST_PATH/$TODAY
# Rotate the log file
mv $LOG_FILE $LOG_FILE-$TODAY
# Delete the backup from seven days ago
seven_days_ago=$(date -d "7 days ago" +'%F')
rm -rf $DST_PATH/$seven_days_ago
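
The script removes only the directory dated exactly seven days ago, so a day on which cron did not run leaves an old backup behind. As a hedged sketch of a more forgiving retention step, the Python function below removes every date-named directory at or beyond the cutoff. `prune_old_backups` is a hypothetical helper; it assumes the `YYYY-MM-DD` directory names that `date +'%F'` produces.

```python
import os
import shutil
from datetime import date, timedelta


def prune_old_backups(backup_root, keep_days=7, today=None):
    """Remove YYYY-MM-DD directories dated keep_days ago or earlier; return their names."""
    today = today or date.today()
    cutoff = today - timedelta(days=keep_days)
    removed = []
    for name in sorted(os.listdir(backup_root)):
        full = os.path.join(backup_root, name)
        try:
            stamp = date.fromisoformat(name)
        except ValueError:
            continue  # not a dated backup directory (e.g. logs)
        if os.path.isdir(full) and stamp <= cutoff:
            shutil.rmtree(full)
            removed.append(name)
    return removed
```

Run against `/data/svnbackup`, it would leave the `logs` directory untouched because that name does not parse as a date.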

Zabbix monitoring of user password expiration

Used by Zabbix to monitor password expiration for Linux system users (those whose shell is /bin/bash or /bin/sh). It triggers when a password has 7 days of validity left, and includes automatic discovery of users.

#!/bin/bash
diskarray=(`awk -F':' '$NF ~ /\/bin\/bash/||/\/bin\/sh/{print $1}' /etc/passwd`)
length=${#diskarray[@]}
printf "{\n"
printf  '\t'"\"data\":["
for ((i=0;i<$length;i++))
do
    printf '\n\t\t{'
    printf "\"{#USER_NAME}\":\"${diskarray[$i]}\"}"
    if [ $i -lt $[$length-1] ];then
            printf ','
    fi
done
printf  "\n\t]\n"
printf "}\n"
Check for user password expiration
#!/bin/bash
export LANG=en_US.UTF-8
# Unix time 7 days from now: passwords expiring before this trigger the alert
SEVEN_DAYS_LATER=$(date -d '+7 day' +'%s')
user="$1"
# Convert a date formatted like "Sep 09, 2018" to Unix time
expires_date=$(sudo chage -l "$user" | awk -F':' '/Password expires/{print $NF}' | sed -n 's/^ //p')
if [[ "$expires_date" != "never" ]];then
    expires_date=$(date -d "$expires_date" +'%s')
    if [ "$expires_date" -le "$SEVEN_DAYS_LATER" ];then
        echo "1"
    else
        echo "0"
    fi
else
    echo "0"
fi
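
The two shell scripts above can be mirrored in Python to make the logic easier to test: one function builds the discovery JSON with the `{#USER_NAME}` macro, the other decides whether a password expires within seven days. Both function names are hypothetical. The shell check matches exactly `/bin/bash` or `/bin/sh` here, which is slightly stricter than the awk pattern, and the date format is assumed to be the `Sep 09, 2018` style mentioned in the script's comment.

```python
import json
from datetime import datetime, timedelta

LOGIN_SHELLS = ("/bin/bash", "/bin/sh")


def discover_users(passwd_text):
    """Build Zabbix low-level discovery JSON from /etc/passwd content."""
    users = []
    for line in passwd_text.splitlines():
        fields = line.strip().split(":")
        # A passwd entry has 7 colon-separated fields; the last is the shell
        if len(fields) == 7 and fields[6] in LOGIN_SHELLS:
            users.append({"{#USER_NAME}": fields[0]})
    return json.dumps({"data": users})


def expires_within(expires_text, days=7, now=None):
    """True if the password expires within the given number of days (or already has)."""
    text = expires_text.strip()
    if text == "never":
        return False
    expires = datetime.strptime(text, "%b %d, %Y")
    now = now or datetime.now()
    return expires <= now + timedelta(days=days)
```

`discover_users(open("/etc/passwd").read())` would produce the discovery output, and `expires_within` corresponds to the script printing `1` or `0`.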

Build a local YUM repository

Syncs yum repositories with rsync and serves them over HTTP with nginx as a yum site.

However, the CentOS 6 mirrors have been unusable recently; they seem to have been taken down in China. If you find a working address, substitute it.

#!/bin/bash
# Update the yum mirrors
RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
DIR="/app/yumData"
LogDir="$DIR/logs"
Centos6Base="$DIR/Centos6/x86_64/Base"
Centos7Base="$DIR/Centos7/x86_64/Base"
Centos6Epel="$DIR/Centos6/x86_64/Epel"
Centos7Epel="$DIR/Centos7/x86_64/Epel"
Centos6Salt="$DIR/Centos6/x86_64/Salt"
Centos7Salt="$DIR/Centos7/x86_64/Salt"
Centos6Update="$DIR/Centos6/x86_64/Update"
Centos7Update="$DIR/Centos7/x86_64/Update"
Centos6Docker="$DIR/Centos6/x86_64/Docker"
Centos7Docker="$DIR/Centos7/x86_64/Docker"
Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7"
Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7"
Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0"
Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0"
MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"
#  Create a directory if it doesn't exist
check_dir(){
    for dir in $*
    do
        test -d $dir || mkdir -p $dir
    done
}
# Check the result of the rsync run that just finished
check_rsync_status(){
    if [ $? -eq 0 ];then
        echo "rsync success" >> $1
    else
        echo "rsync fail" >> $1
    fi
}
check_dir $DIR $LogDir $Centos6Base $Centos7Base $Centos6Epel $Centos7Epel $Centos6Salt $Centos7Salt $Centos6Update $Centos7Update $Centos6Docker $Centos7Docker $Centos6Mysql5_7 $Centos7Mysql5_7 $Centos6Mysql8_0 $Centos7Mysql8_0
# Base yumrepo
#$RsyncCommand "$MirrorDomain"/repo/centos/6/os/x86_64/ $Centos6Base >> "$LogDir/centos6Base.log" 2>&1
# check_rsync_status "$LogDir/centos6Base.log"
$RsyncCommand "$MirrorDomain"/repo/centos/7/os/x86_64/ $Centos7Base >> "$LogDir/centos7Base.log" 2>&1
check_rsync_status "$LogDir/centos7Base.log"
# Epel yumrepo
# $RsyncCommand "$MirrorDomain"/repo/epel/6/x86_64/ $Centos6Epel >> "$LogDir/centos6Epel.log" 2>&1
# check_rsync_status "$LogDir/centos6Epel.log"
$RsyncCommand "$MirrorDomain"/repo/epel/7/x86_64/ $Centos7Epel >> "$LogDir/centos7Epel.log" 2>&1
check_rsync_status "$LogDir/centos7Epel.log"
# SaltStack yumrepo
# $RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/6/x86_64/ $Centos6Salt >> "$LogDir/centos6Salt.log" 2>&1
# ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
# check_rsync_status "$LogDir/centos6Salt.log"
$RsyncCommand "$MirrorDomain"/repo/salt/yum/redhat/7/x86_64/ $Centos7Salt >> "$LogDir/centos7Salt.log" 2>&1
check_rsync_status "$LogDir/centos7Salt.log"
# ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest
# Docker yumrepo
$RsyncCommand "$MirrorDomain"/repo/docker-ce/linux/centos/7/x86_64/stable/ $Centos7Docker >> "$LogDir/centos7Docker.log" 2>&1
check_rsync_status "$LogDir/centos7Docker.log"
# centos update yumrepo
# $RsyncCommand "$MirrorDomain"/repo/centos/6/updates/x86_64/ $Centos6Update >> "$LogDir/centos6Update.log" 2>&1
# check_rsync_status "$LogDir/centos6Update.log"
$RsyncCommand "$MirrorDomain"/repo/centos/7/updates/x86_64/ $Centos7Update >> "$LogDir/centos7Update.log" 2>&1
check_rsync_status "$LogDir/centos7Update.log"
# mysql 5.7 yumrepo
# $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" >> "$LogDir/centos6Mysql5.7.log" 2>&1
# check_rsync_status "$LogDir/centos6Mysql5.7.log"
$RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" >> "$LogDir/centos7Mysql5.7.log" 2>&1
check_rsync_status "$LogDir/centos7Mysql5.7.log"
# mysql 8.0 yumrepo
# $RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" >> "$LogDir/centos6Mysql8.0.log" 2>&1
# check_rsync_status "$LogDir/centos6Mysql8.0.log"
$RsyncCommand "$MirrorDomain"/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" >> "$LogDir/centos7Mysql8.0.log" 2>&1
check_rsync_status "$LogDir/centos7Mysql8.0.log"
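
The script above repeats the same rsync call and status check for every repository. As a minimal sketch, the two steps could be folded into one helper so that the exit status is tested directly on the rsync command rather than through `$?` on the next line. The name `sync_repo` and its argument order are my own assumptions. The rsync flags are copied from `RsyncCommand` above.

```shell
# Hypothetical helper, not part of the original script.
# Sync one repository and append the outcome to its log file.
# Arguments: <remote rsync path> <local directory> <log file>
sync_repo() {
    local remote="$1" dest="$2" log="$3"
    # Make sure the target and log directories exist
    mkdir -p "$dest" "$(dirname "$log")"
    if rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000 \
            "$remote" "$dest" >> "$log" 2>&1; then
        echo "rsync success" >> "$log"
    else
        echo "rsync fail" >> "$log"
        return 1
    fi
}

# Possible usage with the variables defined in the script above:
# sync_repo "$MirrorDomain/repo/centos/7/os/x86_64/" "$Centos7Base" "$LogDir/centos7Base.log"
```

Because the helper returns non-zero on failure, a caller can also decide to stop or to send an alert when one repository fails to sync.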

Answer to a reader's request

When the load is high, find the processes with high resource usage, then store the results or push a notification.

#!/bin/bash
# Number of physical CPUs
physical_cpu_count=$(grep -E 'physical id' /proc/cpuinfo | sort | uniq | wc -l)
# Number of cores per physical CPU
physical_cpu_cores=$(grep -E 'cpu cores' /proc/cpuinfo | uniq | awk '{print $NF}')
# Total number of cores
total_cpu_cores=$((physical_cpu_count*physical_cpu_cores))
# Load thresholds for 1, 5 and 15 minutes; exceeding any one of them triggers the collection
one_min_load_threshold="$total_cpu_cores"
five_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores"' * "0.8"}')
fifteen_min_load_threshold=$(awk 'BEGIN {print '"$total_cpu_cores"' * "0.7"}')
# Load averages over the last 1, 5 and 15 minutes
one_min_load=$(uptime | awk '{print $(NF-2)}' | tr -d ',')
five_min_load=$(uptime | awk '{print $(NF-1)}' | tr -d ',')
fifteen_min_load=$(uptime | awk '{print $NF}' | tr -d ',')
# Collect the current CPU, memory and disk I/O information and write it to log files
# If you need to send a message or call something else, write your own function
get_info(){
    log_dir="cpu_high_script_log"
    test -d "$log_dir" || mkdir "$log_dir"
    ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log
    ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log
    iostat -dx 1 10 > "$log_dir"/disk_io_10.log
}
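# awk's system() runs the command through /bin/sh, so the exported function
# is only visible there when /bin/sh is bash
export -f get_info
echo "$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold" | \
awk '{ if ($1>=$2 || $3>=$4 || $5>=$6) system("get_info") }'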
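
The threshold test above can also be written as shell functions that are called directly, which avoids going through awk's `system()`. This is only a sketch under my own assumptions: the function names `load_exceeds` and `any_load_high` are hypothetical, and the 0.8 and 0.7 factors are taken from the script above. awk is still used for the decimal comparison, because bash arithmetic handles integers only.

```shell
# Hypothetical helpers, not part of the original script.
# Succeed (exit status 0) when the first number is >= the second.
load_exceeds() {
    awk -v load="$1" -v limit="$2" 'BEGIN { exit !(load + 0 >= limit + 0) }'
}

# Succeed when any load average reaches its threshold.
# Thresholds: cores for 1 minute, cores*0.8 for 5 minutes, cores*0.7 for 15 minutes.
# Arguments: <1-min load> <5-min load> <15-min load> <total cores>
any_load_high() {
    local cores="$4"
    local five_limit fifteen_limit
    five_limit=$(awk -v c="$cores" 'BEGIN { print c * 0.8 }')
    fifteen_limit=$(awk -v c="$cores" 'BEGIN { print c * 0.7 }')
    load_exceeds "$1" "$cores" ||
        load_exceeds "$2" "$five_limit" ||
        load_exceeds "$3" "$fifteen_limit"
}

# Possible usage on Linux, reading the three averages from /proc/loadavg
# and reusing total_cpu_cores and get_info from the script above:
# read -r one five fifteen _ < /proc/loadavg
# any_load_high "$one" "$five" "$fifteen" "$total_cpu_cores" && get_info
```

Reading `/proc/loadavg` returns all three averages from a single snapshot, whereas the script above calls `uptime` three times.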

That's all I have to share today.

I hope you can apply what you have learned from these examples to your own scenarios and improve your work efficiency.

