程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 數據庫知識 >> DB2數據庫 >> DB2教程 >> 為 InfoSphere Warehouse 提供實時數據的高效解決方案

為 InfoSphere Warehouse 提供實時數據的高效解決方案

編輯:DB2教程

簡介

信息是現代企業的重要資源,是企業運用科學管理、決策分析的基礎,於是企業如何通過各種技術手段,並把數據轉換為信息、知識,已經成了提高其核心競爭力的主要瓶頸。而 ETL 則是一個主要的技術手段。ETL(數據的提取、轉換和加載)過程的設計和實現是數據倉庫解決方案中極其重要的一部分。由於傳統的 ETL 過程中數據抽取是需要加載所有源數據庫中的數據,這樣對於需要經常進行數據集中的案例,將帶來無可忍受的低效率。例如一個有 50G 數據量的數據庫, 如果只有 0.01%(也就是大約 50M)的數據較上次加載有更新,但是為了抽取這部分數據,仍然需要抽取所有 50G 的數據,這將是非常低效的。在這篇文章中,我們將介紹通過結合 InfoSphere Replication Server 和 InfoSphere DataStage, 實現數據倉庫的實時更新,並且僅僅需要抽取更新了的數據。

ETL 過程簡介

ETL 過程就是數據流動的過程,從不同的數據源流向不同的目標數據集中地。它是構建數據倉庫的重要一環,用戶從數據源抽取出所需的數據,經過數據清洗 , 最終按照預先定義好的數據倉庫模型,將數據加載到數據倉庫中。它包涵三個階段:E(Extract),T(Transform)和 L(Load)。

提取(Extract):從不同的數據庫(DB2,Oracle,flat file 等)中讀取源數據。通過接口提取源數據,例如 ODBC、專用數據庫接口和平面文件提取器,並參照元數據來決定數據的提取及其提取方式。

轉換(Transform):開發者將提取的數據,按照業務需要轉換為目標數據結構,並實現匯總。

裝載(Load):加載經轉換和匯總的數據到目標數據倉庫中,可實現 SQL 或批量加載。

InfoSphere Replication Server 簡介

IBM InfoSphere Replication Server 是一個高速移動大量數據的企業軟件應用程序,用於幫助企業連接分布在全球的業務、對客戶進行快速響應以及從影響關鍵數據庫系統的問題中恢復。只所以能夠高效的提取數據是因為它用可恢復日志來記錄數據庫裡數據的變化,Capture 程序負責連續讀取數據庫的恢復日志並捕獲對源數據庫更改(指對數據的插入、刪除和更新操作),Apply 程序負責把這些變化的數據寫入到目標數據庫中。利用 Replication Server 的這一功能就可從大量的數據量中只提取出較上次更新的數據。

Replication Server 和 Event publisher 的架構

InfoSphere Replication Server 中提供了兩種不同類型的復制:Q 復制和 SQL 復制。

InfoSphere Data Event Publisher 捕獲“更改的數據”事件並以 WebSphere MQ 消息的形式發布這些事件,其他應用程序可以使用這些消息來驅動後續處理。

SQL 復制

Capture 捕獲數據變化後存儲在一個臨時中間表(staging tables),apply 程序把這些更新復制到相應的目標表。隨著數據量的加大和客戶對實時數據復制的要求,Q 復制應運而生。它的架構如圖 1 所示:

圖 1. SQL 復制架構圖
為 InfoSphere Warehouse 提供實時數據的高效解決方案

查看原圖(大圖)

Q 復制

一個高吞吐量低延遲的方案,它不用中間表來存儲已經提交的事務性數據,而是捕獲對源表的更改並將已提交的數據轉換為消息,即用 WebShpere MQ 消息隊列在源和目標數據庫間傳送數據。它的架構如圖 2 所示:

圖 2. Q 復制架構圖
為 InfoSphere Warehouse 提供實時數據的高效解決方案

查看原圖(大圖)

Event publisher(EP)

不同於 Q 復制,EP 不需要啟動 apply 程序,捕獲對源表的更改並將已落實的事務性數據轉換為“可擴展標記語言”(XML)格式或定界格式(CSV: comma-separated value)的消息,以供用戶直接從接受隊列讀取消息。在本文中,我們將利用 EP 的這個特點和 DataStage 整合為數據倉庫提供實時高效的數據。它的構架如圖 3 所示:

圖 3. EP 架構圖
為 InfoSphere Warehouse 提供實時數據的高效解決方案

查看原圖(大圖)

IBM InfoSphere DataStage 簡介

IBM InfoSphere DataStage 是一款強大的基於圖形化界面的 ETL 工具,它可以從多個不同的業務系統,多個平台的數據源中抽取數據、轉換數據、裝載數據到各種目標系統中。它有如下特點:

基於圖形化的開發環境,無需手工編碼便可快速開發 ETL 作業,實現復雜的數據合並和轉換邏輯。並且可以在開發新的作業時快捷的重用已有作業中的邏輯。

支持廣泛的數據源。DataStage 幾乎支持所有的主流的數據庫、企業級應用程序、文件作為數據源進行讀取或寫入數據。例如:DB2、Oracle、SQL Server、UniData、Informix、PeopleSoft、SAP、SIEbel、順序文件(如 CSV)、XML 文件等等。它也支持以多種常用的方式進行數據讀取和寫入,例如 FTP、SFTP、JMS 等等。

強大的並行處理能力,能夠對數據通過分割、管道等方式進行處理,提高硬件的使用效率,從而提高作業的性能。

支持對數據進行批量和實時處理操作。

InfoSphere Replication Server 和 InfoSphere DataStage 的整合

DataStage 可以讀取在不同數據庫中數據,但是沒有能力通過讀取可恢復日志只捕獲較上次更新的數據;另一方面,Replication Server 有能力捕獲更新的數據卻沒有類似 DataStage 轉換數據的功能,並且不像 DataStage, 支持對如此多的數據庫,企業級應用程序和文件進行讀寫。所以本文將結合兩者的優勢,為 Warehouse 提供實時高效的數據,

整合原理

首先,利用 Replication Server 的 Event Publisher(EP),Q capture 從可恢復日志中捕獲更新的數據,並且把數據變化寫到 MQ 隊列中;接著,MQ 消息通過 MQ 觸發器觸發了 DataStage 作業;最後,DataStage 的作業從 MQ 隊列裡直接讀取數據進行處理。

EP 支持兩種類型的 MQ 消息:XML 和 CSV,XML 格式有好的移植性和靈活性而 CSV 有很好的性能,在這裡我們將以 CSV 作為樣例。DataStage 可以通過使用 MQ Connector stage 讀取隊列中的消息,然後基於所選的消息格式來解析消息,最後完成必要的轉換。

具體的架構圖如圖 4 所示:

圖 4. 總體架構圖
為 InfoSphere Warehouse 提供實時數據的高效解決方案

查看原圖(大圖)

下面將具體介紹其實現。

具體實現

所需軟件:

IBM InfoSphere Replication Server 9.7

IBM InfoSphere Information Server 8.1

Event Publisher 的配置

如果 source 是 Oracle,需要通過 Replication Server Oracle capture feature 來完成對變化數據的提取,請參考“參考資料”部分。在本文中,我們 source 以 DB2 為例:

1. 創建 DB2 對象

在本文中創建數據庫 SOURCE,和表”DEMO”.”CUSTOMER”,並 import 數據

清單 1. 創建表及導入數據

 CREATE TABLE "DEMO"."CUSTOMER" (  
 "CUSTOMER_ID" INTEGER NOT NULL , 
 "SEX" CHAR(1) , 
 "BIRTHDAY" TIMESTAMP , 
 "SSN" VARCHAR(30) , 
 "CITY" VARCHAR(25) , 
 "STATE" VARCHAR(25) , 
 "ZIP" VARCHAR(15) , 
 "PHONE" VARCHAR(15) , 
 "PRI_LANGUAGE" VARCHAR(15) , 
 "LAST_UPDATE" TIMESTAMP , 
 "FIRST_NAME" VARCHAR(20) , 
 "MIDDLE_NAME" VARCHAR(10) , 
 "LAST_NAME" VARCHAR(20) ) ; 
 ALTER TABLE "DEMO "."CUSTOMER" ADD PRIMARY KEY ("CUSTOMER_ID"); 
 DB2 import from customer.ixf of ixf insert into ”DEMO”.”CUSTOMER” 

2. 創建 MQ 對象

創建 Q manager:crtmqm QManager

啟動 Q manager:strmqm QManager

創建隊列 : runmqsc QMamanger < mq.in

清單 2. 創建 MQ 對象

 define qlocal (ADMINQ) 
 define qlocal (RESTARTQ) 
 define qlocal (q1) 
 define qmodel (IBMQREP.SPILL.MODELQ) DEFSOPT(shared) 
 MAXDEPTH(500000) MSGDLVSQ(fifo) DEFTYPE(permdyn) 
 
 define channel(CHANNEL1) chltype(svrconn) trptype(tcp) mcauser('mqm') 
 define listener(listener1) trptype(tcp) control(qmgr) port (2264) 
 start listener (listener1) 
 end 

3. setup Event Publisher

3.1 創建 control tables:asnclp – f cncap.in

清單 3. 創建控制表

 asnclp session set to q replication; 
 set run script now stop on sql error on; 
 set qmanager " QManager " for capture schema; 
 set server capture to db source; 
 create control tables for capture server 
 using restartq "RESTARTQ" adminq "ADMINQ" ; 
 quit; 

3.2 創建 pubqmap 和 pub:asnclp – f crtqmappub.in

清單 4. 創建 pubqmap 及 pub

 asnclp session set to Q replication; 
 set output capture script "qpubmap.sql" ; 
 set log "qpub.log"; 
 set server capture to db source ; 
 set run script now stop on sql error on; 
 set qmanager " QManager " for capture schema; 
 create pubqmap pubqmap1 using sendq "q1" message 
 format delimited message content type T; 
 create pub using pubqmap PUBQMAP1 
 (pubname eventpub1 "DEMO"."CUSTOMER") ; 
 quit; 

4. 啟動 capture:

清單 5. 啟動 capture

 asnqcap capture_server=source capture_schema=ASN 
 2010-04-22-15.48.28.549339 ASN0600I "Q Capture" : "" : 
 "Initial" : Program "mqpub 9.7.0" is starting. 
 
 2010-04-22-15.48.38.537012 ASN7010I "Q Capture" : "ASN" : 
 "WorkerThread" : The program successfully activated publication 
 or Q subscription "EVENTPUB1" (send queue "q1", 
 publishing or replication queue map "PUBQMAP1") 
 for source table "DEMO.CUSTOMER". 
 
 2010-04-22-15.48.38.641830 ASN7000I "Q Capture" : "ASN" : 
 "WorkerThread" : "0" subscriptions are active. 
 "0" subscriptions are inactive. "1" subscriptions that 
 were new and were successfully activated. "0" subscriptions 
 that were new could not be activated and are now inactive. 
 
 2010-04-22-15.48.38.747328 ASN0572I "Q Capture" : "ASN" : 
 "WorkerThread" : The "mqpub 9.7.0" 
 program initialized successfully. 

5. 對源表做些插入、刪除和更新操作

清單 6. 更新源表

 connect to source; 
 insert into "DEMO"."CUSTOMER" values ('9000', 'F','1960-05-06-00.00.00.000000', 
'467897085', 'Boise','Idaho','83701-83733','(71)657-9085','English', 
 '2006-08-18-21.52.29.000000','IRVING','H','STERN'); 
 
 insert into "DEMO"."CUSTOMER" values ('9100', 'F','1960-05-06-00.00.00.000000', 
'467897085','Boise', 'Idaho','83701-83733','(71)657-9085','English', 
 '2006-08-18-21.52.29.000000','IRVING','H','STERN'); 
 
 update "DEMO"."CUSTOMER" set CUSTOMER_ID=5000 where CUSTOMER_ID=2075; 
 update "DEMO"."CUSTOMER" set FIRST_NAME='meggy' where FIRST_NAME='EILEEN'; 
 update "DEMO"."CUSTOMER" set LAST_NAME='Leee' where LAST_NAME='JOHNSON'; 
 delete from "DEMO"."CUSTOMER" where CUSTOMER_ID=6057; 
 delete from "DEMO"."CUSTOMER" where CUSTOMER_ID=8354; 
 connect reset; 

DataStage 作業開發

通過使用 Event Publisher, 我們可以把數據表裡由於 DML 操作發生的數據變化實時放入到 MQ 消息隊列中,它將生成如下的消息:

清單 7. Event Publisher 生成的 MQ 消息示例

 10,"IBM","2010098","181602875000","DEMO","CUSTOMER", 
 "ISRT","0000:0000:0000:0000:5914","0000:0000:0000:04f2:d60f", 
 "2010-04-08-17.16.02",,0000,,,,,,,,,,,,,,9000,"F", 
 "1960-05-06-00.00.00.000000","467897085","Boise","Idaho", 
 "83701-83733","(71)657-9085","English","2006-08-18-21.52.29.000000", 
 "IRVING","H","STERN" 
 
 10,"IBM","2010098","181603343000","DEMO","CUSTOMER", 
 "REPL","0000:0000:0000:0000:5916","0000:0000:0000:04f2:daaf", 
 "2010-04-08-17.16.02",,0000,2075,"F","1950-04-16-20.45.00.000000", 
 "205674075","Atlantaa","Georgia","30302-30399","(206)567-6075", 
 "English","2007-04-19-20.45.00.000000","SALLY","", 
 "STERN",5000,"F","1950-04-16-20.45.00.000000","205674075", 
 "Atlantaa","Georgia","30302-30399","(206)567-6075", 
 "English","2007-04-19-20.45.00.000000","SALLY","","STERN" 
 
 10,"IBM","2010098","181603343000","DEMO","CUSTOMER", 
 "DLET","0000:0000:0000:0000:591a","0000:0000:0000:04f2:f768", 
 "2010-04-08-17.16.02",,0000,8354,"F","2009-07-06-00.00.00.000000", 
 "34124","Beijing","fadsfs","10083","13439097809","Chinese", 
 "2009-07-28-00.00.00.000000","","","SHI LIXIAN",,,,,,,,,,,,, 

我們可以發現在這些消息裡包含了對所變化數據的描述。例如:在第一條消息中,第五個字段記錄了數據變化發生表的 schema:“DEMO”;第六個字段記錄了表名“CUSTOMER”;第七個字段“ISRT”記錄的是對應的 DML 操作 , “ISRT”說明這是一個 Insert 操作引起了數據變化,之後的字段描述了數據變化發生的時間以及數據是如何變化的。該消息表明一條如下的新記錄被插入到“DEMO.CUSTOMER”表中:

 9000,"F","1960-05-06-00.00.00.000000","467897085", 
 "Boise","Idaho","83701-83733","(71)657-9085","English", 
 "2006-08-18-21.52.29.000000","IRVING","H","STERN" 

同樣,我們可以從第二和第三條消息中看出這裡面所包含的數據變化信息:

第二條消息描述了一個“update”操作發生在表“DEMO.CUSTOMER”中,第三消息描述了一個“delete”操作發生在表“DEMO.CUSTOMER”中。

接下來,我們將開發一個 DataStage 作業,用來讀取 MQ 中的消息,並且對消息進行處理,最後把數據放入數據倉庫。如下圖 5 所示:

圖 5. DataStage 作業示例
為 InfoSphere Warehouse 提供實時數據的高效解決方案

查看原圖(大圖)

首先,我們將使用 MQ Connector 從 MQ 隊列中讀取消息(這些消息如前面所示包含了數據的變更情況)。MQ Connector 支持兩種方式訪問 MQ Server, 一種是“server”方式,對應 DataStage 是和 MQ Server 在一台機器上的情況;另一種是“client”方式,對應 DataStage 跟 MQ Server 不在同一台機器,DataStage 需要通過 MQ ClIEnt 訪問 MQ Server。在該例中,我們使用了 Server 的方式訪問 MQ Server。如圖所示:

圖 6. MQ Connector 設置
為 InfoSphere Warehouse 提供實時數據的高效解決方案

查看原圖(大圖)

接著,我們需要使用 DataStage Transform Stage 對 MQ 的消息進行處理。並行實現一些邏輯轉換。在 Transform 中,我們先要知道某條消息包括了一個什麼的操作,以便把相同的操作應用到數據倉庫。如圖所示,我們定義了一個約束,針對不同的操作 “DLET”,“REPL”和“ISRT”定義了不同的數據輸出。其中 , “Operator”是一個變量,我們使用 DataStage 內部函數 FIEld() 讀出每條消息的第五個字段,並且使用 Trim() 函數去掉雙引號後賦給該變量。

圖 7. 添加 Transform Stage 約束
為 InfoSphere Warehouse 提供實時數據的高效解決方案

查看原圖(大圖)

然後,我們從消息中讀取包含的數據信息,並且這些數據信息映射到對應的的輸出中。

圖 8. 映射 Transform Stage 的輸入與輸出
為 InfoSphere Warehouse 提供實時數據的高效解決方案

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved