程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 數據庫知識 >> SqlServer數據庫 >> 關於SqlServer >> sqlserver使用更改跟蹤實現數據同步

sqlserver使用更改跟蹤實現數據同步

編輯:關於SqlServer
 

SQL Server 2008 引入了更改跟蹤,這是一種輕量型解決方案,它為應用程序提供了一種有效的更改跟蹤機制。通常,若要使應用程序能夠查詢對數據庫中的數據所做的更改和訪問與這些更改相關的信息,應用程序開發人員必須實現自定義更改跟蹤機制。創建這些機制通常涉及多項工作,並且常常涉及使用觸發器、timestamp 列和新表組合來存儲跟蹤信息,同時還會涉及使用自定義清除過程。

通過更改跟蹤,可以很容易地編寫同步數據的應用,下面是一個使用更改跟蹤實現單向數據同步的示例。

1. 建立示例環境

-- ====================================================

-- 測試的數據庫

USE master;

GO

CREATE DATABASE DB_test;

GO

-- 啟用更改跟蹤

ALTER DATABASE DB_test SET

CHANGE_TRACKING = ON

(

AUTO_CLEANUP = ON, -- 打開自動清理選項

CHANGE_RETENTION = 1 HOURS -- 數據保存期為1 時

);

 

ALTER DATABASE DB_test SET

ALLOW_SNAPSHOT_ISOLATION ON; -- 允許在測試數據庫中使用 SNAPSHOT 事務隔離級別

GO

 

-- ====================================================

-- 測試的表

USE DB_test;

GO

-- a. 同步的源表

CREATE TABLE dbo.tb_source

(

pk_id int IDENTITY

PRIMARY KEY,

col1 int,

col2 varchar(10),

col3 nvarchar(max),

col4 xml

);

GO

-- 啟用更改跟蹤

ALTER TABLE dbo.tb_source

ENABLE CHANGE_TRACKING

WITH

(

TRACK_COLUMNS_UPDATED = ON -- 記錄UPDATE 的列信息

);

GO

 

-- b. 同步的目錄表

CREATE TABLE dbo.tb_Target

(

pk_id int

PRIMARY KEY,

col1 int,

col2 varchar(10),

col3 nvarchar(max),

col4 xml

);

GO

 

-- 記錄同步情況的表

CREATE TABLE dbo.tb_Change_Tracking

(

id int IDENTITY

PRIMARY KEY,

object_name sysname UNIQUE,

last_sync_version bigint,

last_update_date datetime

);

GO

 

2. 實現同步處理的存儲過程

-- ====================================================

-- 數據同步處理的存儲過程

USE DB_test;

GO

-- 數據同步的存儲過程- 同步未更新的數據

-- 單次更新,更新完成後退出

CREATE PROC dbo.p_SyncChangeData_tb_Srouce_Target

@last_sync_version bigint = NULL OUTPUT,

@min_valid_version bigint = NULL OUTPUT

AS

SET NOCOUNT ON;

-- ========================================

-- TRY...CATCH 中的標准事務處理模塊

-- a. 當前的事務數

DECLARE @__trancount int;

SELECT @__trancount = @@TRANCOUNT;

 

-- TRY...CATCH 處理

BEGIN TRY

-- ========================================

-- 源表信息

DECLARE @object_name sysname,@object_id int;

SELECT @object_name = N'dbo.tb_source',

@object_id = OBJECT_ID(@object_name);

-- ========================================

-- 最後一次同步的版本

IF @last_sync_version IS NULL

BEGIN

SELECT

@last_sync_version = last_sync_version

FROM dbo.tb_Change_Tracking

WHERE object_name = @object_name;

 

IF @@ROWCOUNT = 0

BEGIN

SET @last_sync_version = CHANGE_TRACKING_MIN_VALID_VERSION(@object_id);

 

INSERT dbo.tb_Change_Tracking

(

object_name,

last_sync_version

)

VALUES

(

@object_name,

@last_sync_version

);

END;

END;

 

-- ========================================

-- TRY...CATCH 中的標准事務處理模塊

-- b. 開啟事務, 或者設置事務保存點

SET TRANSACTION ISOLATION LEVEL SNAPSHOT; -- 使用快照隔離級別的事務

IF @__trancount = 0

BEGIN TRAN;

ELSE

SAVE TRAN __TRAN_SavePoint;

 

-- ========================================

-- 版本驗證

-- a. 驗證是否有數據變更(如果上次同步的版本號= 當前數據庫的最大版本號,則視為無數據變化)

IF @last_sync_version = CHANGE_TRACKING_CURRENT_VERSION()

GOTO lb_Return;

 

-- b. 驗證同步的版本號是否有效(如果上次同步的版本號< 當前可用的最小版本號,則視為無效)

IF @last_sync_version < CHANGE_TRACKING_MIN_VALID_VERSION(@object_id)

BEGIN

SET @min_valid_version = CHANGE_TRACKING_MIN_VALID_VERSION(@object_id);

GOTO lb_Return;

END;

 

-- c. 驗證同步的版本號是否有效(如果上次同步的版本號> 當前數據庫的最大版本號,則視為無效)

IF @last_sync_version > CHANGE_TRACKING_CURRENT_VERSION()

BEGIN

SET @last_sync_version = NULL;

GOTO lb_Return;

END;

 

-- ========================================

-- 同步數據

-- a. 插入

WITH

CHG AS

(

SELECT DATA.*

FROM dbo.tb_source DATA

INNER JOIN

CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG

ON CHG.pk_id = DATA.pk_id

WHERE CHG.SYS_CHANGE_OPERATION = N'I'

)

INSERT dbo.tb_Target

SELECT * FROM CHG;

 

-- b. 刪除

WITH

CHG AS

(

SELECT CHG.*

FROM CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG

WHERE CHG.SYS_CHANGE_OPERATION = N'D'

)

DELETE DATA

FROM dbo.tb_Target DATA

INNER JOIN CHG

ON CHG.pk_id = DATA.pk_id;

 

-- c. 更新

WITH

COL AS

(

SELECT *

FROM

(

SELECT name, column_id

FROM sys.columns

WHERE object_id = @object_id

) DATA

 

PIVOT

(

MAX(column_id)

FOR name IN( -- 源表的所有列的列表

[col1], [col2], [col3], [col4])

)P

),

CHG AS

(

SELECT DATA.*,__SYS_CHANGE_COLUMNS = CHG.SYS_CHANGE_COLUMNS

FROM dbo.tb_source DATA INNER JOIN

CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG

ON CHG.pk_id = DATA.pk_id

WHERE CHG.SYS_CHANGE_OPERATION = N'U'

)

UPDATE DATA SET -- 判斷列是否需要更新(也可以不判斷,直接更新所有的列)

[col1] = CASE

WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col1], CHG.__SYS_CHANGE_COLUMNS) = 1

THEN CHG.[col1]

ELSE DATA.[col1]

END,

[col2] = CASE

WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col2], CHG.__SYS_CHANGE_COLUMNS) = 1

THEN CHG.[col2]

ELSE DATA.[col2]

END,

[col3] = CASE

WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col3], CHG.__SYS_CHANGE_COLUMNS) = 1

THEN CHG.[col3]

ELSE DATA.[col3]

END,

[col4] = CASE

WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col4], CHG.__SYS_CHANGE_COLUMNS) = 1

THEN CHG.[col4]

ELSE DATA.[col4]

END

FROM dbo.tb_Target DATA

INNER JOIN CHG

ON CHG.pk_id = DATA.pk_id

CROSS JOIN COL;

 

-- ========================================

-- 更新最後一次同步的版本號

UPDATE dbo.tb_Change_Tracking SET

@last_sync_version = CHANGE_TRACKING_CURRENT_VERSION(),

last_sync_version = @last_sync_version,

last_update_date = GETDATE();

 

-- ========================================

-- TRY...CATCH 中的標准事務處理模塊

-- c. 提交事務

-- 有可提交的事務, 並且事務是在當前模塊中開啟的情況下, 才提交事務

IF XACT_STATE() = 1 AND @__trancount = 0

COMMIT;

 

lb_Return:

-- ========================================

-- TRY...CATCH 中的標准事務處理模塊

-- d. 為了防止TRY 塊中有遺漏的事務處理, 在TRY 模塊的結束部分做最終的事務處理

IF @__trancount = 0

BEGIN

IF XACT_STATE() = -1

ROLLBACK TRAN;

ELSE

BEGIN

WHILE @@TRANCOUNT > 0

COMMIT TRAN;

END;

END;

END TRY

BEGIN CATCH

-- ========================================

-- TRY...CATCH 中的標准事務處理模塊

-- e. CATCH 塊的事務回滾

IF XACT_STATE() <> 0

BEGIN

IF @__trancount = 0

ROLLBACK TRAN;

-- XACT_STATE 為-1 時, 不能回滾到事務保存點, 這種情況留給外層調用者做統一的事務回滾

-- 通過@@TRANCOUNT > @__trancount 的判斷, 即使在TRY 模塊中沒有設置事務保存點的情況下跳到此步驟, 也不會出錯

ELSE IF XACT_STATE() = 1 AND @@TRANCOUNT > @__trancount

ROLLBACK TRAN __TRAN_SavePoint;

END;

 

-- ========================================

-- 錯誤消息處理

-- a. 獲取錯誤信息

-- 這提提取了錯誤相關的全部信息, 可以根據實際需要調整

DECLARE @__error_number int,

@__error_message nvarchar(2048),

@__error_severity int,

@__error_state int,

@__error_line int,

@__error_procedure nvarchar(126),

@__user_name nvarchar(128),

@__host_name nvarchar(128);

 

SELECT @__error_number = ERROR_NUMBER(),

@__error_message = ERROR_MESSAGE(),

@__error_severity = ERROR_SEVERITY(),

@__error_state = ERROR_STATE(),

@__error_line = ERROR_LINE(),

@__error_procedure = ERROR_PROCEDURE(),

@__user_name = SUSER_SNAME(),

@__host_name = HOST_NAME();

 

-- c. 如果沒有打算在CATCH 模塊中對錯誤進行處理, 則應該拋出錯誤給調用者

RAISERROR

(

N'User: %s, Host: %s, Procedure: %s, Error %d, Level %d, State %d, Line %d, Message: %s ',

@__error_severity,

1,

@__user_name,

@__host_name,

@__error_procedure,

@__error_number,

@__error_severity,

@__error_state,

@__error_line,

@__error_message

);

END CATCH;

GO

 

-- 數據同步的存儲過程- 同步未更新的數據

-- 循環更新,直到手工結束

-- 可設置一個在SQL Agent服務啟動時工作的JOB 來調用這個存儲過程,這樣就可以實現自動同步

CREATE PROC dbo.p_SyncChangeData_Circle_tb_Srouce_Target

@interval int = 1 -- 循環檢測之間相隔的秒數(指定負數或NULL值會調整為)

AS

SET NOCOUNT ON;

 

IF @interval IS NULL OR @interval < 0

SET @interval = 1;

 

DECLARE @last_sync_version bigint,

@min_valid_version bigint,

@date_delay datetime;

 

WHILE 1 = 1

BEGIN

EXEC dbo.p_SyncChangeData_tb_Srouce_Target

@last_sync_version = @last_sync_version OUTPUT,

@min_valid_version = @min_valid_version OUTPUT;

 

IF @min_valid_version IS NOT NULL

BEGIN

RAISERROR(N'某些未同步的數據已經丟失', 16, 1);

BREAK;

END;

IF @last_sync_version IS NULL

BEGIN

RAISERROR(N'源數據異常,同步版本號小於已經同步的版本號', 16, 1);

BREAK;

END;

 

SET @date_delay = DATEADD(Second, @interval, GETDATE());

WAITFOR TIME @date_delay;

END

GO

 

3.同步測試

-- ====================================================

-- 測試

USE DB_test;

GO

-- A.01. 變更源表數據

INSERT dbo.tb_source

(

col1

)

VALUES

(

1

);

 

INSERT dbo.tb_source

(

col1

)

VALUES

(

2

),

(

3

);

 

UPDATE dbo.tb_source SET

col2 = 'update'

WHERE pk_id = 2;

 

-- A.02. 同步源表數據,並查詢同步結果

DECLARE @last_sync_version bigint,

@min_valid_version bigint;

 

EXEC dbo.p_SyncChangeData_tb_Srouce_Target

@last_sync_version = @last_sync_version OUTPUT,

@min_valid_version = @min_valid_version OUTPUT;

 

IF @min_valid_version IS NOT NULL

RAISERROR(N'某些未同步的數據已經丟失', 16, 1);

 

IF @last_sync_version IS NULL

RAISERROR(N'源數據異常,同步版本號小於已經同步的版本號', 16, 1);

 

SELECT * FROM dbo.tb_source;

SELECT * FROM dbo.tb_Target;

SELECT * FROM dbo.tb_Change_Tracking;

 

-- B.01. 變更源表數據

INSERT dbo.tb_source

(

col1

)

VALUES

(

11

);

 

UPDATE dbo.tb_source SET

col2 = 'update.1'

WHERE pk_id = 2;

 

UPDATE dbo.tb_source SET

col2 = 'update 3'

WHERE pk_id = 3;

 

DELETE FROM dbo.tb_source

WHERE pk_id < 3;

 

SET IDENTITY_INSERT dbo.tb_source ON;

 

INSERT dbo.tb_source

(

pk_id, col1, col2

)

VALUES

(

1, 11, 'insert'

);

SET IDENTITY_INSERT dbo.tb_source OFF;

 

-- B.02. 同步源表數據,並查詢同步結果

EXEC dbo.p_SyncChangeData_tb_Srouce_Target

@last_sync_version = @last_sync_version OUTPUT,

@min_valid_version = @min_valid_version OUTPUT;

 

IF @min_valid_version IS NOT NULL

RAISERROR(N'某些未同步的數據已經丟失', 16, 1);

IF @last_sync_version IS NULL

RAISERROR(N'源數據異常,同步版本號小於已經同步的版本號', 16, 1);

 

SELECT * FROM dbo.tb_source;

SELECT * FROM dbo.tb_Target;

SELECT * FROM dbo.tb_Change_Tracking;

GO

 

4.刪除示例測試環境

-- ====================================================

-- /* -- 刪除測試環境

USE DB_test;

GO

IF OBJECT_ID(N'dbo.p_SyncChangeData_tb_Srouce_Target') IS NOT NULL

DROP PROC dbo.p_SyncChangeData_tb_Srouce_Target;

 

IF OBJECT_ID(N'dbo.tb_source') IS NOT NULL

DROP TABLE dbo.tb_source;

 

IF OBJECT_ID(N'dbo.tb_Target') IS NOT NULL

DROP TABLE dbo.tb_Target;

 

IF OBJECT_ID(N'dbo.tb_Change_Tracking') IS NOT NULL

DROP TABLE dbo.tb_Change_Tracking;

GO

--*/

/*-- 刪除測試數據庫

USE master;

GO

ALTER DATABASE DB_test SET

SINGLE_USER

WITH

ROLLBACK AFTER 0;

GO

DROP DATABASE DB_test;

--*/

 

附:

關於使用更改跟蹤實現數據同步的一些補充說明:

1. 使用 SNAPSHOT 事務隔離級別

在進行同步的處理中,需要記錄當前已經處理的最後一個版本號;而在查詢更改跟蹤信息時,無法指定要查詢的更改跟蹤信息的結束版本號。這就要求同步處理過程中,無論是查詢更改的最後一個版本號,還是查詢更改跟蹤信息,都能保證是同一個變更版本的結束版本號。把所有的處理放在事務中,並使用SNAPSHOT事務隔離級別可以保證這點。

2. 自動同步

更改跟蹤的信息只能被查詢,所以在示例中,是在數據變更後,手動運行同步的存儲過程實現數據同步。如果要自動同步,則只能使用定時查詢的方式。本示例提供了一個名為“dbo.p_SyncChangeData_Circle_tb_Srouce_Target”的存儲過程,可以使用Job或者其他程序在SQL Server啟動時,調用這個存儲過程,則這個存儲過程會定時檢查數據變更,並實現數據變更的自動同步。
 

 
  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved