程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 數據庫知識 >> MYSQL數據庫 >> MySQL綜合教程 >> kettle中通過 時間戳(timestamp)方式 來實現數據庫的增量同步操作(一)

kettle中通過 時間戳(timestamp)方式 來實現數據庫的增量同步操作(一)

編輯:MySQL綜合教程

這個實驗主要思想是在創建數據庫表的時候,

通過增加一個額外的字段,也就是時間戳字段,

例如在同步表 tt1 和表 tt2 的時候,

通過檢查那個表是最新更新的,那個表就作為新表,而另外的表最為舊表被新表中的數據進行更新。

實驗數據如下:

mysql database 5.1

test.tt1( id int primary key , name varchar(50) );

mysql.tt2( id int primary key, name varchar(50) );

 

快照表,可以將其存放在test數據庫中,

同樣可以為了簡便,可以將其創建為temporary 表類型。

 

數據如圖 kettle-1

kettle-1

============================================================

 

主流程如圖 kettle-2

 kettle-2

 

在prepare中,向 tt1,tt2 表中增加 時間戳字段,

由於tt1,tt2所在的數據庫是不同的,所以分別創建兩個數據庫的連接。

prepare

 

kettle-3

 

 在執行這個job之後,就會在數據庫查詢的時候看到下面的字段:

 

kettle-4

然後, 我們來對tt1表做一個 insert 操作 一個update操作吧~

kettle-5

在原表上無論是insert操作還是update操作,對應的updateTime都會發生變更。

如果tt1 表 和 tt2 表中 updateTime 字段為最新時間的話,則說明該表是新表 。

 

下面只要是對應main_thread的截圖:

kettle-6

 

在這裡介紹一下Main的層次:

Main

START

Main.prepare

Main.main_thread

{

  START

  main_thread.create_tempTable

  main_thread.insert_tempTable

  main_thread.tt1_tt2_syn

  SUCCESS

}

Main.finish

SUCCESS

 

在main_thread中的過程是這樣的:

作為一個局部的整體,使它每隔200s內進行一次循環,

這樣的話,如果在其中有指定的表 tt1 或是 tt2 對應被更新或是插入的話,

該表中的updateTime字段就會被捕捉到,並且進行同步。

如果沒有更新出現,則會走switch的 default 路線對應的是write to log.

繼續循環。

 

首先創建一個快照表,然後將tt1,tt2表中的最大(最新)時間戳的值插入到快照表中。

然後,通過一個transformation來判斷那個表的updateTime值最新,

來選擇對應是 tt1表來更新 tt2 還是 tt2 表來更新 tt1 表;

 

main_thread.create_tempTable.JOB:

 

 

main_thread.insert_tempTable.Job:

PS: 對於第二個SQL 應該改成(不修改會出錯的)

set @var1 = ( select MAX(updatetime) from tt2);

insert into test.temp values ( 2 , @var1 ) ;

因為conn對應的是連接mysql(數據庫實例名稱),

但是我們把快照表和tt1 表都存到了test(數據庫實例名稱)裡面。

 

在上面這個圖中對應的語句是想實現,在temp表中插入兩行記錄元組。

其中id為1 的元組對應的temp.lastTime 字段 是 從tt1 表中選出的 updateTime 值為最新的,

id 為2的元組對應的 temp.lastTime 字段 是 從 tt2 表中選出的 updateTime 值為最新的 字段。

當然 , id 是用來給後續 switch 操作提供參考的,用於標示最新 updateTime 是來自 tt1 還是 tt2,

同樣也可以使用 tableName varchar(50) 這種字段 來存放 最新updateTime 對應的 數據庫.數據表的名稱也可以的。

 

main_thread.tt1_tt2_syn.Transformation:

首先,創建連接 test 數據庫的 temp 表的連接,

選擇 temp表中 對應 lastTime 值最新的所在的記錄

所對應的 id 號碼。

首先將temp中 lastTime 字段進行 降序排列,

然後選擇id , 並且將選擇記錄僅限定成一行。

然後根據id的值進行 switch選擇。

在這裡LZ很想使用,SQL Executor,

但是它無法返回對應的id值。

但是表輸入可以返回對應的id值,

並被switch接收到。

 

 

下圖是對應 switch id = 1 的時候:即 tt1 更新 tt2

注意合並行比較 的新舊數據源 的選擇

和Insert/Update 中的Target table的選擇

 

下圖是對應 switch id = 2 的時候:即 tt2 更新 tt1

注意合並行比較 的新舊數據源 的選擇

和Insert/Update 中的Target table的選擇 

 

 

 

但是考慮到增加一個 column 會浪費很多的空間,

所以咋最終結束同步之後使用 finish操作步驟來將該 updateTime這個字段進行刪除操作即可。

這個與Main中的prepare的操作是相對應的。

Main.finish

 

 

 

 

這樣的話,實驗環境已經搭建好了,

接下來進行,實驗的數據測試了,寫到下一個博客中。

 當然,觸發器也是一種同步的好方法,寫到後續博客中吧~

 

時間戳的方式相比於觸發器,較為簡單並且通用,

但是 數據庫表中的時間戳字段,很費空間,並且無法對應刪除操作,

也就是說 表中刪除一行記錄, 該表應該作為新表來更新其余表,但是由於記錄刪除 時間戳無所依附所以無法記錄到。

 

 

 

 

 

 

 

 

 

 

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved