程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 數據庫知識 >> MYSQL數據庫 >> MySQL綜合教程 >> python--同一mysql數據庫下批量遷移數據,python--mysql

python--同一mysql數據庫下批量遷移數據,python--mysql

編輯:MySQL綜合教程

python--同一mysql數據庫下批量遷移數據,python--mysql


最近接手些mysql數據庫維護,發現mysql在批量操作方面就是個渣渣啊,比起MS SQL SERVER簡直就是“不可同日而語”。

咨詢了下MySQL的高手,對於數據遷移這種問題,一種處理方式就是直接“一步到位” ,一次性將所有數據查詢插入到另外一個表,然後再刪除原表數據;另外一種處理方式就是使用pt--archiver工具來歸檔。

 

然並卵,“一步到位”法太刺激,pt--archiver工具用不順手,由於目前大部分的表都以自增id為主鍵,以此為此為前提自己寫個小腳本,厚臉拿出來供各位參考:

# coding: utf-8 import MySQLdb import time # common config EXEC_DETAIL_FILE = 'exec_detail.txt' DATETIME_FORMAT = '%Y-%m-%d %X' Default_MySQL_Host = '192.168.166.169' Default_MySQL_Port = 3358 Default_MySQL_User = "mysql_admin" Default_MySQL_Password = 'mysql@Admin@Pwd' Default_MySQL_Charset = "utf8" Default_MySQL_Connect_TimeOut = 120 # Transfer Config Transfer_Database_Name = "db001" Transfer_Source_Table_Name = "tb2001" Transfer_Target_Table_Name = "tb2001_his" Transfer_Condition = "dt <'2016-10-01'" Transfer_Rows_Per_Batch = 10000 Sleep_Second_Per_Batch = 0.5 def get_time_string(dt_time): """ 獲取指定格式的時間字符串 :param dt_time: 要轉換成字符串的時間 :return: 返回指定格式的字符串 """ global DATETIME_FORMAT return time.strftime(DATETIME_FORMAT, dt_time) def get_time_string(dt_time): return time.strftime("%Y-%m-%d %X", dt_time) def highlight(s): return "%s[30;2m%s%s[1m" % (chr(27), s, chr(27)) def print_warning_message(message): """ 以紅色字體顯示消息內容 :param message: 消息內容 :return: 無返回值 """ message = str(message) print(highlight('') + "%s[31;1m%s%s[0m" % (chr(27), message, chr(27))) def print_info_message(message): """ 以綠色字體輸出提醒性的消息 :param message: 消息內容 :return: 無返回值 """ message = str(message) print(highlight('') + "%s[32;2m%s%s[0m" % (chr(27), message, chr(27))) def write_file(file_path, message): """ 將傳入的message追加寫入到file_path指定的文件中 請先創建文件所在的目錄 :param file_path: 要寫入的文件路徑 :param message: 要寫入的信息 :return: """ file_handle = open(file_path, 'a') file_handle.writelines(message) # 追加一個換行以方便浏覽 file_handle.writelines(chr(13)) file_handle.close() def get_mysql_connection(): """ 根據默認配置返回數據庫連接 :return: 數據庫連接 """ conn = MySQLdb.connect( host=Default_MySQL_Host, port=Default_MySQL_Port, user=Default_MySQL_User, passwd=Default_MySQL_Password, connect_timeout=Default_MySQL_Connect_TimeOut, charset=Default_MySQL_Charset, db=Transfer_Database_Name ) return conn def mysql_exec(sql_script, sql_param=None): """ 執行傳入的腳本,返回影響行數 :param sql_script: :param sql_param: :return: 腳本最後一條語句執行影響行數 """ try: conn = get_mysql_connection() print_info_message("在服務器{0}上執行腳本:{1}".format( conn.get_host_info(), sql_script)) cursor = conn.cursor() if sql_param is not None: cursor.execute(sql_script, sql_param) else: cursor.execute(sql_script) affect_rows = cursor.rowcount conn.commit() cursor.close() conn.close() return affect_rows except Exception as ex: cursor.close() conn.rollback() raise Exception(str(ex)) def mysql_exec_many(sql_script_list): """ 執行傳入的腳本,返回影響行數 :param sql_script_list: 要執行的腳本List,List中每個元素為sql_script, sql_param對 :return: 返回執行每個腳本影響的行數列表 """ try: conn = get_mysql_connection() exec_result_list = [] for sql_script, sql_param in sql_script_list: print_info_message("在服務器{0}上執行腳本:{1}".format( conn.get_host_info(), sql_script)) cursor = conn.cursor() if sql_param is not None: cursor.execute(sql_script, sql_param) else: cursor.execute(sql_script) affect_rows = cursor.rowcount exec_result_list.append("影響行數:{0}".format(affect_rows)) conn.commit() cursor.close() conn.close() return exec_result_list except Exception as ex: cursor.close() conn.rollback() raise Exception(str(ex)) def mysql_query(sql_script, sql_param=None): """ 執行傳入的SQL腳本,並返回查詢結果 :param sql_script: :param sql_param: :return: 返回SQL查詢結果 """ try: conn = get_mysql_connection() print_info_message("在服務器{0}上執行腳本:{1}".format( conn.get_host_info(), sql_script)) cursor = conn.cursor() if sql_param != '': cursor.execute(sql_script, sql_param) else: cursor.execute(sql_script) exec_result = cursor.fetchall() cursor.close() conn.close() return exec_result except Exception as ex: cursor.close() conn.close() raise Exception(str(ex)) def get_column_info_list(table_name): sql_script = """ DESC {0} """.format(table_name) column_info_list = [] query_result = mysql_query(sql_script=sql_script, sql_param=None) for row in query_result: column_name = row[0] column_key = row[3] column_info = column_name, column_key column_info_list.append(column_info) return column_info_list def get_id_range(): """ 按照傳入的表獲取要刪除數據最大ID、最小ID、刪除總行數 :return: 返回要刪除數據最大ID、最小ID、刪除總行數 """ global Transfer_Condition global Transfer_Rows_Per_Batch sql_script = """ SELECT MAX(ID) AS MAX_ID, MIN(ID) AS MIN_ID, COUNT(1) AS Total_Count FROM {0} WHERE {1}; """.format(Transfer_Source_Table_Name, Transfer_Condition) query_result = mysql_query(sql_script=sql_script, sql_param=None) max_id, min_id, total_count = query_result[0] # 此處有一坑,可能出現total_count不為0 但是max_id 和min_id 為None的情況 # 因此判斷max_id和min_id 是否為NULL if (max_id is None) or (min_id is None): max_id, min_id, total_count = 0, 0, 0 return max_id, min_id, total_count def check_env(): try: source_columns_info_list = get_column_info_list(Transfer_Source_Table_Name) target_columns_info_list = get_column_info_list(Transfer_Target_Table_Name) if len(source_columns_info_list) != len(target_columns_info_list): print_info("源表和目標表的列數不對,不滿足遷移條件") return False column_count = len(source_columns_info_list) id_flag = False for column_id in range(column_count): source_column_name, source_column_key = source_columns_info_list[column_id] target_column_name, target_column_key = target_columns_info_list[column_id] if source_column_name != target_column_name: print_info("源表和目標表的列名不匹配,不滿足遷移條件") return False if source_column_name.lower() == 'id' \ and source_column_key.lower() == 'pri' \ and target_column_name.lower() == 'id' \ and target_column_key.lower() == 'pri': id_flag = True if not id_flag: print_info("未找到為主鍵的id列,不滿足遷移條件") return False return True except Exception as ex: print_info("執行出現異常,異常為{0}".format(ex.message)) return False def main(): flag = check_env() if not flag: return loop_trans_data() def trans_data(current_min_id, current_max_id): global Transfer_Source_Table_Name global Transfer_Target_Table_Name global Transfer_Condition global Transfer_Rows_Per_Batch print_info_message("*" * 70) copy_data_script = """ INSERT INTO {0} SELECT * FROM {1} WHERE ID>={2} AND ID<{3} AND {4} ; """.format(Transfer_Target_Table_Name, Transfer_Source_Table_Name, current_min_id, current_max_id, Transfer_Condition) delete_data_script = """ DELETE FROM {0} WHERE ID IN ( SELECT ID FROM {1} WHERE ID>={2} AND ID<{3}) AND ID>={4} AND ID<{5}; """.format(Transfer_Source_Table_Name, Transfer_Target_Table_Name, current_min_id, current_max_id, current_min_id, current_max_id) sql_script_list = [] tem_sql_script = copy_data_script, None sql_script_list.append(tem_sql_script) tem_sql_script = delete_data_script, None sql_script_list.append(tem_sql_script) exec_result_list = mysql_exec_many(sql_script_list) print_info_message("執行結果:") for item in exec_result_list: print_info_message(item) def loop_trans_data(): max_id, min_id, total_count = get_id_range() if min_id == max_id: print_info_message("無數據需要結轉") return current_min_id = min_id global Transfer_Rows_Per_Batch while current_min_id <= max_id: current_max_id = current_min_id + Transfer_Rows_Per_Batch trans_data(current_min_id, current_max_id) current_percent = (current_max_id - min_id) * 100.0 / (max_id - min_id) left_rows = max_id - current_max_id if left_rows < 0: left_rows = 0 current_percent_str = "%.2f" % current_percent info = "當前復制進度{0}/{1},剩余{2},進度為{3}%".format(current_max_id, max_id, left_rows, current_percent_str) print_info_message(info) time.sleep(Sleep_Second_Per_Batch) current_min_id = current_max_id print_info_message("*" * 70) print_info_message("執行完成") if __name__ == '__main__': main() View Code

 

按照各位場景的,需要修改數據庫連接信息:

還有需要遷移表的信息:

 

生成測試數據的mysql腳本:

CREATE TABLE `tb2001` ( `id` int(11) NOT NULL AUTO_INCREMENT, `c1` varchar(200) DEFAULT NULL, `dt` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; create table tb2001_his like tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM mysql.user; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; insert tb2001(c1,dt) select 'abc',date_add(localtime(),interval FLOOR(RAND() * 20000) hour) FROM tb2001; View Code

 

最終運行結果如下:

顯示簡單粗暴,有興趣的可以在此基礎上修改!

=================================================================

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved