程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 如何在Weblogic的全局事務執行多線程操作

如何在Weblogic的全局事務執行多線程操作

編輯:關於JAVA

今天有人提出了一個詭異的要求,要求在全局事務中執行多線程操作。他們 全局事務中涉及兩個數據庫中的多個表,如果單線程那麼走完,相應時間上不滿 足要求,說白了就是比較慢,於是提出了這樣的要求。從JTA的規范來看, transaction(TX)和thread是密切相關的,TX一般是不能在應用線程間傳遞的, 即我主線程起一個全局事務,然後我把這個事務傳遞給其他我新起的線程,單純 的變量傳遞沒問題,但這個事務是不能被transaction manager(TM)識別的,TM 對TX的管理有他自己的方式。從weblogic的實現來看,TX被放在當前線程的 threadlocal中,普通應用線程不存在這樣的結構,所以簡單的變量傳遞,對於 TM而言是沒有意義的。那麼到底有沒有方法實現上面的需求的,我做了些測試, 使用weblogic內部的一些API可以實現這個需求。下面我們就來看看實現中的幾 個要點: :)

1:上面說了,簡單的變量傳遞對於weblogic的TM是沒有意義的。TM判斷事務 上下文(transaction context)的時候,會從當前線程的threadlocal檢查,如果 沒有,則說明當前線程沒有和任何TX關聯。那麼我們如何將我們手裡的TX放入當 前線程的threadlocal呢? weblogic的ExecuteThread是我們需要的那種線程, 但它是final的,我們不能繼承它,只能繼承它的父類了,也就是 weblogic.kernel.AuditableThread。

2:我們有繼承了AuditableThread,那麼我們怎麼把TX放入它的threadlocal 中呢?這個可以通過weblogic的TM實現中的一些API來實現,具體到這個類就是 weblogic.transaction.internal.TransactionManagerImpl。比如 interResume(tx),internalSuspend()。由於這個API不是package protect的, 我們自己的類必須也位於weblogic.transaction.internal這個包中。 interResume(tx),用於將當前線程和指定的TX做關聯,而internalSuspend()恰 恰相反,它用於解除這種關聯。

3:因為涉及到多線程,主線程需要決定何時提交或回滾事務,這個我們要自 己要實現一個線程結果檢查的方法(checkCompletion())。

下面就是我自己實現的測試代碼,在Weblogic81測試沒有問題。

1 package weblogic.transaction.internal;
2
3 import weblogic.transaction.TxHelper;
4 import  weblogic.transaction.internal.TransactionManagerImpl;
5 import  javax.transaction.Transaction;
6 import  java.util.ArrayList;
7
8 public class DriverTest  {
9
10     private static String INITIAL_CONTEXT_FACTORY  = "weblogic.jndi.WLInitialContextFactory";
11     private  static String PROVIDER_URL = "t3://localhost:8001";
12      private static String SQL_INSERT = "insert into test values (?)";
13     private static String ANO_SQL_INSERT =  "insert into test1 values(?)";
14
15     public  static void main(String args[])
16     {
17          DriverTest test = new DriverTest();
18          test.multiThreadXATest();
19     }
20
21      private Connection getConnection(String url, String dsName)  throws NamingException, SQLException
22     {
23          InitialContext ctx = initializeEnv(url);
24          DataSource ds = (DataSource)ctx.lookup(dsName);
25          ctx.close();
26         return  ds.getConnection();
27     }
28
29     private  UserTransaction getUserTransaction() throws NamingException,  SQLException
30     {
31         InitialContext  ctx = initializeEnv(null);
32         return  (UserTransaction)ctx.lookup ("javax/transaction/UserTransaction");
33     }
34
35      private InitialContext initializeEnv(String url) throws  NamingException
36     {
37         Properties  prop = new Properties();
38         if(url == null)
39             prop.put(Context.PROVIDER_URL,  PROVIDER_URL);
40         else
41              prop.put(Context.PROVIDER_URL, url);
42          prop.put(Context.INITIAL_CONTEXT_FACTORY,  INITIAL_CONTEXT_FACTORY);
43         return new  InitialContext(prop);
44     }
45
46     private  void executeInsertInPSMT(Connection conn, String sql)
47      {
48         PreparedStatement pstmt = null;
49          try{
50             pstmt =  conn.prepareStatement(sql);
51              pstmt.setString(1, "data_to_insert");
52              pstmt.executeUpdate();
53             pstmt.close ();
54         }catch(SQLException e){
55                  e.printStackTrace();
56         }
57     }
58
59     public void  multiThreadXATest()
60     {
61         ArrayList  result = new ArrayList();
62         try{
63              UserTransaction userTx = getUserTransaction ();
64             userTx.setTransactionTimeout (1000);
65             userTx.begin();
66              Transaction tx = TxHelper.getTransaction();
67              Connection conn = getConnection ("t3://localhost:8011", "TestXADS");
68              if(conn != null) conn.close();
SQLThread thread1 = new  SQLThread(tx,result,"t3://localhost:8011","TestXADS",  SQL_INSERT);
69             SQLThread thread2 = new  SQLThread(tx,result,"t3://localhost:8021","TestXADS_1",  ANO_SQL_INSERT);
70             thread1.start();
71              thread2.start();
72              while(result.size() != 2){
73                  Thread.currentThread().sleep(1);
74             }
75             if(checkCompletion(result)){
76                  userTx.commit();
77              }
78             else{
79                  userTx.rollback();
80             }
81         }catch(Exception e){
82              e.printStackTrace();
83         }
84      }
85
86     private boolean checkCompletion(ArrayList  result){
87         boolean toReturn = true;
88          for(int loop=0; loop<result.size(); loop++){
89              if((!((String)result.get(loop)).equals ("OK"))){
90                 toReturn =  false;
91                 break;
92              }
93         }
94         return  toReturn;
95     }
96
97     class SQLThread  extends weblogic.kernel.AuditableThread {
98
99          private Transaction tx = null;
100         private  ArrayList result = null;
101         private String  dsName = null;
102         private String url =  null;
103         private String sql =  null;
104
105         public SQLThread(Transaction  tx,ArrayList result,String ds, String url, String sql){
106              this.tx = tx;
107              this.result = result;
108             this.dsName  = ds;
109             this.url = url;
110              this.sql = sql;
111         }
112
113         public void run(){
114              Connection conn = null;
115              try{
116                  TransactionManagerImpl tm = (TransactionManagerImpl) TransactionManagerImpl.getTransactionManager();
117                  tm.internalResume((TransactionImpl)tx);
118                  DriverTest test = new DriverTest ();
119                 conn =  test.getConnection(url, dsName);
120                  test.executeInsertInPSMT(conn, sql);
121                  conn.close();
122                  tm.internalSuspend();
123                  result.add("OK");
124             }catch(Exception  e){
125                 result.add("NA");
126                  e.printStackTrace();
127              }finally{
128                 try {
129                     if(conn != null)
130                         conn.close ();
131                 }catch(Exception e) {
132                     e.printStackTrace ();
133                 }
134              }
135         }
136     }
137 }
138
139

下面是關於上面這段測試代碼的一些解釋和代碼中的限制:

1:為什麼會在66行出現Connection conn = getConnection ("t3://localhost:8011", "TestXADS");這個看似無用的語句?Weblogic的TM 實現中只有有XAResource參與到這個global transaction的server實例才有資格 充當這個global transaction的coordinator,其他的server實例只能充當sub- coordinator。而且總是第一個參與全局事務的 XAResource的實例充當 coordinator,因為coordinator的委任決定於TX開始後,第一次RMI request發 送給哪個server。Connection conn = getConnection ("t3://localhost:8001", "TestXADS") 用於指定這個global transaction的 coordinator為8011這個server。如果沒有這個語句,thread1,thread2啟動後, 它們開始XA操作時,每個XAResouce都會把自己當作這個TX的coordinator (Thread1委任8011,Thread2委任8021),這樣就會出現如下的異常,

javax.transaction.TransactionRolledbackException: Current  server is the coordinator and transaction is not found.It was  probably rolled back and forgotten already.
at  weblogic.rjvm.BasicOutboundRequest.sendReceive (BasicOutboundRequest.java:108)
at  weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke (ReplicaAwareRemoteRef.java:290)
at  weblogic.rmi.cluster.ReplicaAwareRemoteRef.invoke (ReplicaAwareRemoteRef.java:247)
at  weblogic.jdbc.common.internal.RmiDataSource_814_WLStub.getConnection (Unknown Source)
at  weblogic.transaction.internal.DriverTest1.getConnection (DriverTest1.java:39)
at  weblogic.transaction.internal.DriverTest1.access$0 (DriverTest1.java:34)
at  weblogic.transaction.internal.DriverTest1$SQLThread.run (DriverTest1.java:135)

2:某個全局事務中啟動的線程,不能同時操作同一個XAResource,比如 Thread1操作datasource1和 datasource2,thread2操作datasource2和 datasource3。Weblogic中,我們做XA操作的時候,需要同後端的 XA Resource Manager交互,交互中我們會多次調用xaStart(xid, flag),xaEnd(xid, flag) 這裡的flag可以使NOFLAGS、TMSUCESS、TMRESUME、TMSUSPEND等。如果我們在同 一個全局事務的多個線程中同時操作某個RESOURCE,那麼就可能我們不同線程先 後給這個RESOUCE的RM發送相同的FLAG,比如xaStart(xid, TMSUSPEND),即兩個 線程同時發送TMSUSPEND,這樣會引發XA_ERR,如下:

java.sql.SQLException: Unexpected exception while  enlisting XAConnection java.sql.SQLException: XA error:  XAER_RMERR : A resource manager error has occured in the  transaction branch start() failed on resource 'TestXAPool_1':  XAER_RMERR : A resource manager error has occured in the  transaction branch
oracle.jdbc.xa.OracleXAException
at  oracle.jdbc.xa.OracleXAResource.checkError (OracleXAResource.java:1017)
at  oracle.jdbc.xa.client.OracleXAResource.start (OracleXAResource.java:227)
at  weblogic.jdbc.wrapper.VendorXAResource.start (VendorXAResource.java:50)
at weblogic.jdbc.jta.DataSource.start (DataSource.java:629)
at  weblogic.transaction.internal.XAServerResourceInfo.start (XAServerResourceInfo.java:1142)
at  weblogic.transaction.internal.XAServerResourceInfo.xaStart (XAServerResourceInfo.java:1073)
at  weblogic.transaction.internal.XAServerResourceInfo.enlist (XAServerResourceInfo.java:241)
at  weblogic.transaction.internal.ServerTransactionImpl.enlistResource (ServerTransactionImpl.java:463)
at  weblogic.jdbc.jta.DataSource.enlist(DataSource.java:1392)
at  weblogic.jdbc.jta.DataSource.refreshXAConnAndEnlist (DataSource.java:1334)
at  weblogic.jdbc.jta.DataSource.getConnection(DataSource.java:396)
at  weblogic.jdbc.jta.DataSource.connect(DataSource.java:354)
at  weblogic.jdbc.common.internal.RmiDataSource.getConnection (RmiDataSource.java:305)
at  weblogic.jdbc.common.internal.RmiDataSource_WLSkel.invoke(Unknown  Source)
......

雖然測試中沒有什麼問題,但我不建議誰這麼去做,畢竟我們需要遵循規范 。寫這麼個例子,只是讓大家對weblogic的transaction加深些理解,而不是真 的要在生產系統中這樣去做。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved