程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

python腳本通過dataX增量同步mysql數據至HIVE

編輯:Python

python通過dataX同步mysql數據

1、介紹

該腳本主要是使用python腳本通過dataX查詢mysql的數據增量同步至HIVE。

2、環境

1、python3
2、dataX
3、pyhive
4、pyspark

3、功能

(1)支持測試和生產的自由切換。
(2)支持增量同步。
(3)支持補歷史數據。
(4)運行環境簡單。
(5)支持HIVE隊列的切換。

4、優化

對於同步數據,該腳本基本已經都支持。還有優化空間就是:
1、連接HIVE時可以使用HA模式,不連接單節點。減少宕機風險。
2、日志打印的規范性。
3、dataX腳本的缺失。由於其他原因不能放出來。我後面見放一個demo出來。
4、可以通過變量的形式將要同步的表目標表傳進來。實現舉一反三的同步。
後續在其他腳本優化。

5、源碼

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 增量同步消息
from pyhive import hive
import os, sys,datetime
isPrd = True
hiveInfo = {
'host':'192.168.1.1','port':10000, 'user':'root','database':'ods','hdfs':'hdfs://192.168.1.1:8020'} \
if(isPrd) else {
'host':'192.168.1.122','port':10000, 'user':'root','database':'ods','hdfs':'hdfs://192.168.1.1:8020'}
sourceDbInfo = {
'url':'192.168.1.1:3306/db','user':'root','passwd':'123'} \
if(isPrd) else {
'url':'192.168.1.122:3306/db','user':'root','passwd':'root123'}
sys.path.append(os.getcwd())
UTF8 = "UTF-8";
class HiveClient:
def __init__(self):
self.conn = hive.connect(
host=hiveInfo.get('host'),
port=hiveInfo.get('port'),
username=hiveInfo.get('user'),
database=hiveInfo.get('database'),)
def query(self, sql):
sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "")
print(sql)
with self.conn.cursor() as cursor:
cursor.execute("set mapreduce.job.queuename=root.users.project")
cursor.execute(sql)
return cursor.fetchall()
def execute(self, sql):
sql = sql.replace("\n", "").replace("\t", "").replace("\r\n", "")
print(sql)
with self.conn.cursor() as cursor:
cursor.execute("set mapreduce.job.queuename=root.users.project")
cursor.execute(sql)
def close(self):
self.conn.close()
def __getMaxPk():
#增加分區
addPartion="alter table ods.ods_message_incr add if not exists partition (dt='{dt}') ".format(dt=dt)
HiveClient().execute(addPartion)
#獲取最大ID
sql = """select max(id) from ods.ods_message_incr where dt='{dt}'""".format(dt=dt)
data = HiveClient().query(sql)
HiveClient().close()
print(data)
if (data[0][0] == None):
return 0
return data[0][0]
# 增量同步推送消息
def syncPushMessage(dt):
maxPk = __getMaxPk();
datax_json_path = os.getcwd() + '/ods_message_incr.json'
etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
# 這是執行dataX 命令,後面是傳參。
commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s '" % (
datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs'));
print(commandStr)
os.system(commandStr)
# 補充缺失消息
def syncPushMessage_history(dt,maxPk):
datax_json_path = os.getcwd() + '/ods_message_incr.json'
etime = (datetime.datetime.strptime(dt, "%Y-%m-%d") + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
commandStr = "python /home/datax/bin/datax.py %s -p '-DmaxId=%s -Dsurl=%s -Dsuser=%s -Dspasswd=%s -Dstime=%s -Detime=%s -Dtime=%s -Dhdfs=%s '" % (
datax_json_path, maxPk, sourceDbInfo.get("url"), sourceDbInfo.get("user"), sourceDbInfo.get("passwd"), dt, etime, dt, hiveInfo.get('hdfs'));
print(commandStr)
os.system(commandStr)
if __name__ == '__main__':
if len(sys.argv) == 1:
dt = (datetime.datetime.now()).strftime('%Y-%m-%d')
syncPushMessage(dt)
elif len(sys.argv) == 2:
dt = sys.argv[1]
syncPushMessage(dt)
elif len(sys.argv) == 3:
dt = sys.argv[1]
maxPk = sys.argv[2]
syncPushMessage_history(dt, maxPk)
else:
print('參數輸入錯誤')
sys.exit(1)

6、最後

文章為原創,轉載請出示原地址。
感謝你的閱讀,如果這篇文章能幫到你。是我的榮幸!謝謝~


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved