程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

超硬核!11 個非常實用的 Python 和 Shell 拿來就用腳本實例!

編輯:Python

來源:傑哥的IT之旅

這次再來給大家分享一波我工作中用到的幾個腳本,主要分為:Python和Shell兩個部分。

Python 腳本部分實例:企業微信告警、FTP 客戶端、SSH 客戶端、Saltstack 客戶端、vCenter 客戶端、獲取域名 ssl 證書過期時間、發送今天的天氣預報以及未來的天氣趨勢圖;

Shell 腳本部分實例:SVN 完整備份、Zabbix 監控用戶密碼過期、構建本地 YUM 以及上篇文章中有讀者的需求(負載高時,查出占用比較高的進程腳本並存儲或推送通知);

篇幅有些長,還請大家耐心翻到文末。

Python 腳本部分

企業微信告警

此腳本通過企業微信應用,進行微信告警,可用於 Zabbix 監控。

# -*- coding: utf-8 -*-
import requests
import json
class DLF:
    def __init__(self, corpid, corpsecret):
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self._token = self._get_token()
    def _get_token(self):
        '''
        獲取企業微信API接口的access_token
        :return:
        '''
        token_url = self.url + "/gettoken?corpid=%s&corpsecret=%s" %(self.corpid, self.corpsecret)
        try:
            res = requests.get(token_url).json()
            token = res['access_token']
            return token
        except Exception as e:
            return str(e)
    def _get_media_id(self, file_obj):
        get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
        data = {"media": file_obj}
        try:
            res = requests.post(url=get_media_url, files=data)
            media_id = res.json()['media_id']
            return media_id
        except Exception as e:
            return str(e)
    def send_text(self, agentid, content, touser=None, toparty=None):
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {
                "content": content
            }
        }
        try:
            res = requests.post(send_msg_url, data=json.dumps(send_data))
        except Exception as e:
            return str(e)
    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        media_id = self._get_media_id(file_obj)
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        send_data = {
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                "media_id": media_id
           }
        }
        try:
            res = requests.post(send_msg_url, data=json.dumps(send_data))
        except Exception as e:
            return str(e)

FTP 客戶端

通過 ftplib 模塊操作 ftp 服務器,進行上傳下載等操作。

# -*- coding: utf-8 -*-
from ftplib import FTP
from os import path
import copy
class FTPClient:
    def __init__(self, host, user, passwd, port=21):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        self.res = {'status': True, 'msg': None}
        self._ftp = None
        self._login()
    def _login(self):
        '''
        登錄FTP服務器
        :return: 連接或登錄出現異常時返回錯誤信息
        '''
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            return e
    def upload(self, localpath, remotepath=None):
        '''
        上傳ftp文件
        :param localpath: local file path
        :param remotepath: remote file path
        :return:
        '''
        if not localpath: return 'Please select a local file. '
        # 讀取本地文件
        # fp = open(localpath, 'rb')
        # 如果未傳遞遠程文件路徑,則上傳到當前目錄,文件名稱同本地文件
        if not remotepath:
            remotepath = path.basename(localpath)
        # 上傳文件
        self._ftp.storbinary('STOR ' + remotepath, localpath)
        # fp.close()
    def download(self, remotepath, localpath=None):
        '''
        localpath
        :param localpath: local file path
        :param remotepath: remote file path
        :return:
        '''
        if not remotepath: return 'Please select a remote file. '
        # 如果未傳遞本地文件路徑,則下載到當前目錄,文件名稱同遠程文件
        if not localpath:
            localpath = path.basename(remotepath)
        # 如果localpath是目錄的話就和remotepath的basename拼接
        if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))
        # 寫入本地文件
        fp = open(localpath, 'wb')
        # 下載文件
        self._ftp.retrbinary('RETR ' + remotepath, fp.write)
        fp.close()
    def nlst(self, dir='/'):
        '''
        查看目錄下的內容
        :return: 以列表形式返回目錄下的所有內容
        '''
        files_list = self._ftp.nlst(dir)
        return files_list
    def rmd(self, dir=None):
        '''
        刪除目錄
        :param dir: 目錄名稱
        :return: 執行結果
        '''
        if not dir: return 'Please input dirname'
        res = copy.deepcopy(self.res)
        try:
            del_d = self._ftp.rmd(dir)
            res['msg'] = del_d
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res
    def mkd(self, dir=None):
        '''
        創建目錄
        :param dir: 目錄名稱
        :return: 執行結果
        '''
        if not dir: return 'Please input dirname'
        res = copy.deepcopy(self.res)
        try:
            mkd_d = self._ftp.mkd(dir)
            res['msg'] = mkd_d
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res
    def del_file(self, filename=None):
        '''
        刪除文件
        :param filename: 文件名稱
        :return: 執行結果
        '''
        if not filename: return 'Please input filename'
        res = copy.deepcopy(self.res)
        try:
            del_f = self._ftp.delete(filename)
            res['msg'] = del_f
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res
    def get_file_size(self, filenames=[]):
        '''
        獲取文件大小,單位是字節
        判斷文件類型
        :param filename: 文件名稱
        :return: 執行結果
        '''
        if not filenames: return {'msg': 'This is an empty directory'}
        res_l = []
        for file in filenames:
            res_d = {}
            # 如果是目錄或者文件不存在就會報錯
            try:
                size = self._ftp.size(file)
                type = 'f'
            except:
                # 如果是路徑的話size顯示 - , file末尾加/ (/dir/)
                size = '-'
                type = 'd'
                file = file + '/'
            res_d['filename'] = file
            res_d['size'] = size
            res_d['type'] = type
            res_l.append(res_d)
        return res_l
    def rename(self, old_name=None, new_name=None):
        '''
        重命名
        :param old_name: 舊的文件或者目錄名稱
        :param new_name: 新的文件或者目錄名稱
        :return: 執行結果
        '''
        if not old_name or not new_name: return 'Please input old_name and new_name'
        res = copy.deepcopy(self.res)
        try:
            rename_f = self._ftp.rename(old_name, new_name)
            res['msg'] = rename_f
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res
    def close(self):
        '''
        退出ftp連接
        :return:
        '''
        try:
            # 向服務器發送quit命令
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            # 客戶端單方面關閉連接
            self._ftp.close()

SSH 客戶端

此腳本僅用於通過 key 連接,如需要密碼連接,簡單修改下即可。

# -*- coding: utf-8 -*-
import paramiko
class SSHClient:
    def __init__(self, host, port, user, pkey):
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        self._connect()
    def _connect(self):
        self.ssh = paramiko.SSHClient()
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
        except:
            return 'ssh connect fail'
    def execute_command(self, command):
        stdin, stdout, stderr = self.ssh.exec_command(command)
        out = stdout.read()
        err = stderr.read()
        return out, err
    def close(self):
        self.ssh.close()

Saltstack 客戶端

通過 api 對 Saltstack 服務端進行操作,執行命令。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests
import json
import copy
class SaltApi:
    """
    定義salt api接口的類
    初始化獲得token
    """
    def __init__(self):
        self.url = "http://172.85.10.21:8000/"
        self.username = "saltapi"
        self.password = "saltapi"
        self.headers = {"Content-type": "application/json"}
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username, 'password': self.password, 'eauth': 'pam'}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token
    def get_data(self, url, params):
        '''
        請求url獲取數據
        :param url: 請求的url地址
        :param params: 傳遞給url的參數
        :return: 請求的結果
        '''
        send_data = json.dumps(params)
        request = requests.post(url, data=send_data, headers=self.headers)
        response = request.json()
        result = dict(response)
        return result['return'][0]
    def get_auth_keys(self):
        '''
        獲取所有已經認證的key
        :return:
        '''
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        result = self.get_data(self.url, data)
        try:
            return result['data']['return']['minions']
        except Exception as e:
            return str(e)
    def get_grains(self, tgt, arg='id'):
        """
        獲取系統基礎信息
        :tgt: 目標主機
        :return:
        """
        data = copy.deepcopy(self.params)
        if tgt:
            data['tgt'] = tgt
        else:
            data['tgt'] = '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        result = self.get_data(self.url, data)
        return result
    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
        """
        執行saltstack 模塊命令,類似於salt '*' cmd.run 'command'
        :param tgt: 目標主機
        :param fun: 模塊方法 可為空
        :param arg: 傳遞參數 可為空
        :return: 執行結果
        """
        data = copy.deepcopy(self.params)
        if not tgt: return {'status': False, 'msg': 'target host not exist'}
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async: data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        result = self.get_data(self.url, data)
        return result
    def jobs(self, fun='detail', jid=None):
        """
        任務
        :param fun: active, detail
        :param jod: Job ID
        :return: 任務執行結果
        """
        data = {'client': 'runner'}
        data['fun'] = fun
        if fun == 'detail':
            if not jid: return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        result = self.get_data(self.url, data)
        return result

vCenter 客戶端

通過官方 SDK 對 vCenter 進行日常操作,此腳本是我用於 cmdb 平台的,自動獲取主機信息,存入數據庫。

from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit
class Vmware:
    def __init__(self, ip, user, password, port, idc, vcenter_id):
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id
    def get_obj(self, content, vimtype, name=None):
        '''
        列表返回,name 可以指定匹配的對象
        '''
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        obj = [ view for view in container.view ]
        return obj
    def get_esxi_info(self):
        # 宿主機信息
        esxi_host = {}
        res = {"connect_status": True, "msg": None}
        try:
            # connect this thing
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            res['connect_status'] = False
            try:
                res['msg'] = ("%s Caught vmodl fault : " + e.msg) % (self.ip)
            except Exception as e:
                res['msg'] = '%s: connection error' % (self.ip)
            return res
        # disconnect this thing
        atexit.register(Disconnect, si)
        content = si.RetrieveContent()
        esxi_obj = self.get_obj(content, [vim.HostSystem])
        for esxi in esxi_obj:
            esxi_host[esxi.name] = {}
            esxi_host[esxi.name]['idc_id'] = self.idc_id
            esxi_host[esxi.name]['vcenter_id'] = self.vcenter_id
            esxi_host[esxi.name]['server_ip'] = esxi.name
            esxi_host[esxi.name]['manufacturer'] = esxi.summary.hardware.vendor
            esxi_host[esxi.name]['server_model'] = esxi.summary.hardware.model
            for i in esxi.summary.hardware.otherIdentifyingInfo:
                if isinstance(i, vim.host.SystemIdentificationInfo):
                    esxi_host[esxi.name]['server_sn'] = i.identifierValue
            # 系統名稱
            esxi_host[esxi.name]['system_name'] = esxi.summary.config.product.fullName
            # cpu總核數
            esxi_cpu_total = esxi.summary.hardware.numCpuThreads
            # 內存總量 GB
            esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024
            # 獲取硬盤總量 GB
            esxi_disk_total = 0
            for ds in esxi.datastore:
                esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024
            # 默認配置4核8G100G,根據這個配置計算剩余可分配虛擬機
            default_configure = {
                'cpu': 4,
                'memory': 8,
                'disk': 100
            }
            esxi_host[esxi.name]['vm_host'] = []
            vm_usage_total_cpu = 0
            vm_usage_total_memory = 0
            vm_usage_total_disk = 0
            # 虛擬機信息
            for vm in esxi.vm:
                host_info = {}
                host_info['vm_name'] = vm.name
                host_info['power_status'] = vm.runtime.powerState
                host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + '核'
                host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB'
                host_info['system_info'] = vm.config.guestFullName
                disk_info = ''
                disk_total = 0
                for d in vm.config.hardware.device:
                    if isinstance(d, vim.vm.device.VirtualDisk):
                        disk_total += d.capacityInKB / 1024 / 1024
                        disk_info += d.deviceInfo.label + ": " +  str((d.capacityInKB) / 1024 / 1024) + ' GB' + ','
                host_info['disk_info'] = disk_info
                esxi_host[esxi.name]['vm_host'].append(host_info)
                # 計算當前宿主機可用容量:總量 - 已分配的
                if host_info['power_status'] == 'poweredOn':
                    vm_usage_total_cpu += vm.config.hardware.numCPU
                    vm_usage_total_disk += disk_total
                    vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)
            esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
            esxi_memory_free = esxi_memory_total - vm_usage_total_memory
            esxi_disk_free = esxi_disk_total - vm_usage_total_disk
            esxi_host[esxi.name]['cpu_info'] = 'Total: %d核, Free: %d核' % (esxi_cpu_total, esxi_cpu_free)
            esxi_host[esxi.name]['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free)
            esxi_host[esxi.name]['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free)
            # 計算cpu 內存 磁盤按照默認資源分配的最小值,即為當前可分配資源
            if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100:
                free_allocation_vm_host = 0
            else:
                free_allocation_vm_host = int(min(
                    [
                        esxi_cpu_free / default_configure['cpu'],
                        esxi_memory_free / default_configure['memory'],
                        esxi_disk_free / default_configure['disk']
                    ]
                ))
            esxi_host[esxi.name]['free_allocation_vm_host'] = free_allocation_vm_host
        esxi_host['connect_status'] = True
        return esxi_host
    def write_to_db(self):
        esxi_host = self.get_esxi_info()
        # 連接失敗
        if not esxi_host['connect_status']:
            return esxi_host
        del esxi_host['connect_status']
        for machine_ip in esxi_host:
            # 物理機信息
            esxi_host_dict = esxi_host[machine_ip]
            # 虛擬機信息
            virtual_host = esxi_host[machine_ip]['vm_host']
            del esxi_host[machine_ip]['vm_host']
            obj = models.EsxiHost.objects.create(**esxi_host_dict)
            obj.save()
            for host_info in virtual_host:
                host_info['management_host_id'] = obj.id
                obj2 = models.virtualHost.objects.create(**host_info)
                obj2.save()

獲取域名 ssl 證書過期時間

用於 zabbix 告警

import re
import sys
import time
import subprocess
from datetime import datetime
from io import StringIO
def main(domain):
    f = StringIO()
    comm = f"curl -Ivs https://{domain} --connect-timeout 10"
    result = subprocess.getstatusoutput(comm)
    f.write(result[1])
    try:
        m = re.search('start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n', f.getvalue(), re.S)
        start_date = m.group(1)
        expire_date = m.group(2)
        common_name = m.group(3)
        issuer = m.group(4)
    except Exception as e:
        return 999999999
    # time 字符串轉時間數組
    start_date = time.strptime(start_date, "%b %d %H:%M:%S %Y GMT")
    start_date_st = time.strftime("%Y-%m-%d %H:%M:%S", start_date)
    # datetime 字符串轉時間數組
    expire_date = datetime.strptime(expire_date, "%b %d %H:%M:%S %Y GMT")
    expire_date_st = datetime.strftime(expire_date,"%Y-%m-%d %H:%M:%S")
    # 剩余天數
    remaining = (expire_date-datetime.now()).days
    return remaining 
if __name__ == "__main__":
    domain = sys.argv[1] 
    remaining_days = main(domain)
    print(remaining_days)

發送今天的天氣預報以及未來的天氣趨勢圖

此腳本用於給老婆大人發送今天的天氣預報以及未來的天氣趨勢圖,現在微信把網頁端禁止了,沒法發送到微信了,我是通過企業微信進行通知的,需要把你老婆大人拉到企業微信,無興趣的小伙伴跳過即可。

# -*- coding: utf-8 -*-
    import requests
    import json
    import datetime
    def weather(city):
        url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city
        try:
            data = requests.get(url).json()['data']
            city = data['city']
            ganmao = data['ganmao']
            today_weather = data['forecast'][0]
            res = "老婆今天是{}\n今天天氣概況\n城市: {:<10}\n
  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved