隨著互聯網信息技術日新月異的發展,一個海量數據爆炸的時代已經到來。如何有效地處理、分析這些海量的數據資源,成為各大技術廠商爭在激烈的競爭中脫穎而出的一個利器。可以說,如果不能很好的快速處理分析這些海量的數據資源,將很快被市場無情地所淘汰。當然,處理分析這些海量數據目前可以借鑒的方案有很多:首先,在分布式計算方面有Hadoop裡面的MapReduce並行計算框架,它主要針對的是離線的數據挖掘分析。此外還有針對實時在線流式數據處理方面的,同樣也是分布式的計算框架Strom,也能很好的滿足數據實時性分析、處理的要求。最後還有Spring Batch,這個完全面向批處理的框架,可以大規模的應用於企業級的海量數據處理。
在這裡,我就不具體展開說明這些框架如何部署、以及如何開發使用的詳細教程說明。我想在此基礎上更進一步:我們能否借鑒這些開源框架背後的技術背景,為服務的企業或者公司,量身定制一套符合自身數據處理要求的批處理框架。
首先我先描述一下,目前我所服務的公司所面臨的一個用戶數據存儲處理的一個現狀背景。目前移動公司一個省內在網用戶數據規模達到幾千萬的規模數量級,而且每個省已經根據地市區域對用戶數據進行劃分,我們把這批數據存儲在傳統的關系型數據庫上面(基於Oracle,地市是分區)。移動公司的計費結算系統會根據用戶手機話費的余額情況,實時的通知業務處理系統,給手機用戶進行停機、復機的操作。業務處理系統收到計費結算系統的請求,會把要處理的用戶數據往具體的交換機網元上派發不同的交換機指令,這裡簡單的可以稱為Hlr停復機指令(下面開始本文都簡稱Hlr指令)。目前面臨的現狀是,在日常情況下,傳統的C++多進程的後台處理程序還能勉強的“准實時”地處理這些數據請求,但是,如果一旦到了每個月的月初幾天,要處理的數據量往往會暴增,而C++後台程序處理的效率並不高。這時問題來了,往往會有用戶投訴,自己繳費了,為什麼沒有復機?或者某些用戶明明已經欠費了,但是還沒有及時停機。這樣的結果會直接降低客戶對移動運營商支撐的滿意度,於此同時,移動運營商本身也可能流失這些客戶資源。
自己認真評估了一下,造成上述問題的幾個瓶頸所在。
針對上述的問題,本人想到了幾個優化方案。
基於以上幾點考慮,得出如下圖所示的設計方案的組件圖:

下面就具體說明一下,其中關鍵模塊如何協同工作的。

<?xml version="1.0" encoding="GBK"?>
<batchtask>
<!-- 批處理異步線程池參數配置 -->
<jobpool name="newlandframework_batchtask">
<attribute name="corePoolSize" value="15" />
<attribute name="maxPoolSize" value="30" />
<attribute name="keepAliveTime" value="1000" />
<attribute name="workQueueSize" value="200" />
</jobpool>
</batchtask>
其中corePoolSize表示保留的線程池大小,workQueueSize表示的是阻塞隊列的大小,maxPoolSize表示的是線程池的最大大小,keepAliveTime指的是空閒線程結束的超時時間。其中創建線程池方法ThreadPoolExecutor裡面有個參數是unit,它表示一個枚舉,即keepAliveTime的單位。說了半天,這幾個參數到底什麼關系呢?我舉一個例子說明一下,當出現需要處理的任務的時候,ThreadPoolExecutor會分配corePoolSize數量的線程池去處理,如果不夠的話,會把任務放入阻塞隊列,阻塞隊列的大小是workQueueSize,當然這個時候還可能不夠,怎麼辦。只能叫來“臨時工線程”幫忙處理一下,這個時候“臨時工線程”的數量是maxPoolSize-corePoolSize,當然還會繼續不夠,這個時候ThreadPoolExecutor線程池會采取4種處理策略。
現在具體說一下是那些處理策略。首先是ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。然後是ThreadPoolExecutor.CallerRunsPolicy 中,線程調用運行該任務的 execute 本身。此策略提供簡單的反饋控制機制,能夠減緩新任務的提交速度。其次是,ThreadPoolExecutor.DiscardPolicy 中,不能執行的任務將被刪除。最後是ThreadPoolExecutor.DiscardOldestPolicy 中,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然後重試執行程序(如果再次失敗,則重復此過程)。如果要處理的任務沒有那麼多了,ThreadPoolExecutor線程池會根據keepAliveTime設置的時間單位來回收多余的“臨時工線程”。你可以把keepAliveTime理解成專門是為maxPoolSize-corePoolSize的“臨時工線程”專用的。
線程池參數的設定。正常情況下我們要如何設置線程池的參數呢?我們應該這樣設置:I、workQueueSize阻塞隊列的大小至少大於等於corePoolSize的大小。II、maxPoolSize線程池的大小至少大於等於corePoolSize的大小。III、corePoolSize是你期望處理的默認線程數,個人覺得線程池機制的話,至少大於1吧?不然的話,你這個線程池等於單線程處理任務了,這樣就失去了線程池設計存在的意義了。
介紹完畢了幾個核心模塊主要的功能,那下面就依次介紹一下主要模塊的詳細設計思路。
create table notify_users
(
home_city number(3) /*手機用戶的歸屬地市編碼*/,
msisdn number(15) /*手機號碼*/,
user_id number(15) /*手機用戶的用戶標識*/
);
/**
* @filename:NotifyUsers.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:要進行批處理通知的用戶對象
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.model;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
public class NotifyUsers {
public NotifyUsers() {
}
// 用戶歸屬地市編碼(這裡具體是:591表示福州/592表示廈門)
private Integer homeCity;
// 用戶的手機號碼
private Integer msisdn;
// 用戶標識
private Integer userId;
public Integer getHomeCity() {
return homeCity;
}
public void setHomeCity(Integer homeCity) {
this.homeCity = homeCity;
}
public Integer getMsisdn() {
return msisdn;
}
public void setMsisdn(Integer msisdn) {
this.msisdn = msisdn;
}
public Integer getUserId() {
return userId;
}
public void setUserId(Integer userId) {
this.userId = userId;
}
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("homeCity", homeCity).append("userId", userId)
.append("msisdn", msisdn).toString();
}
}
我們通過並行查詢加載模塊BatchQueryLoader調用異步並行查詢執行器BatchQueryExecutor,來並行地加載不同數據源的查詢結果集合。StatementWrapper則是對JDBC裡面Statement的封裝。具體代碼如下所示:/**
* @filename:StatementWrapper.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Statement封裝類
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import java.sql.Connection;
import java.sql.Statement;
public class StatementWrapper {
private final String sql;
private final Statement statement;
private final Connection con;
public StatementWrapper(String sql, Statement statement, Connection con) {
this.sql = sql;
this.statement = statement;
this.con = con;
}
public String getSql() {
return sql;
}
public Statement getStatement() {
return statement;
}
public Connection getCon() {
return con;
}
}
定義兩個並行加載的異常類BatchQueryInterruptedException、BatchQueryExecutionException
/**
* @filename:BatchQueryInterruptedException.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:並行查詢加載InterruptedException異常類
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
public class BatchQueryInterruptedException extends RuntimeException {
public BatchQueryInterruptedException(final String errorMessage, final Object... args) {
super(String.format(errorMessage, args));
}
public BatchQueryInterruptedException(final Exception cause) {
super(cause);
}
}
/**
* @filename:BatchQueryExecutionException.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:並行查詢加載ExecutionException異常類
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
public class BatchQueryExecutionException extends RuntimeException {
public BatchQueryExecutionException(final String errorMessage, final Object... args) {
super(String.format(errorMessage, args));
}
public BatchQueryExecutionException(final Exception cause) {
super(cause);
}
}
再抽象出一個批量查詢接口,主要是為了後續能擴展在不同的數據庫之間進行批量加載。接口類BatchQuery定義如下
/**
* @filename:BatchQuery.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:異步查詢接口定義
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
public interface BatchQuery<IN, OUT> {
OUT query(IN input) throws Exception;
}
好了,現在封裝一個異步並行查詢執行器BatchQueryExecutor
/**
* @filename:BatchQueryExecutor.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:異步並行查詢執行器
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections.Closure;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.functors.ForClosure;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class BatchQueryExecutor {
private final static int FUTUREQUERYNUMBER = 1;
public BatchQueryExecutor() {
}
public <IN, OUT> List<OUT> executeQuery(final Collection<IN> inputs,final BatchQuery<IN, OUT> executeUnit) {
ListenableFuture<List<OUT>> futures = submitBatchTaskFutures(inputs,executeUnit);
delegateAsynTask(futures);
return getAsynResults(futures);
}
private <IN, OUT> ListenableFuture<List<OUT>> submitBatchTaskFutures(
final Collection<IN> inputs, final BatchQuery<IN, OUT> executeUnit) {
final Set<ListenableFuture<OUT>> result = new HashSet<ListenableFuture<OUT>>(
inputs.size());
final ListeningExecutorService service = MoreExecutors
.listeningDecorator(Executors.newFixedThreadPool(inputs.size()));
Closure futureQuery = new Closure() {
public void execute(Object input) {
final IN p = (IN) input;
result.add(service.submit(new Callable<OUT>() {
@Override
public OUT call() throws Exception {
return executeUnit.query(p);
}
}));
}
};
Closure parallelTask = new ForClosure(FUTUREQUERYNUMBER, futureQuery);
CollectionUtils.forAllDo(inputs, parallelTask);
service.shutdown();
return Futures.allAsList(result);
}
private <OUT> OUT getAsynResults(final ListenableFuture<OUT> futures) {
try {
return futures.get();
} catch (InterruptedException ex) {
throw new BatchQueryInterruptedException(ex);
} catch (ExecutionException ex) {
throw new BatchQueryExecutionException(ex);
}
}
private <TYPE> void delegateAsynTask(
final ListenableFuture<TYPE> allFutures) {
Futures.addCallback(allFutures, new FutureCallback<TYPE>() {
@Override
public void onSuccess(final TYPE result) {
System.out.println("並行加載查詢執行成功");
}
@Override
public void onFailure(final Throwable thrown) {
System.out.println("並行加載查詢執行失敗");
}
});
}
}
最後的並行查詢加載模塊BatchQueryLoader直接就是調用上面的異步並行查詢執行器BatchQueryExecutor,完成不同數據源的數據並行異步加載,代碼如下
/**
* @filename:BatchQueryLoader.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:並行查詢加載模塊
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
public class BatchQueryLoader {
private final Collection<StatementWrapper> statements = new ArrayList<StatementWrapper>();
public void attachLoadEnv(final String sql, final Statement statement,
final Connection con) {
statements.add(new StatementWrapper(sql, statement, con));
}
public Collection<StatementWrapper> getStatements() {
return statements;
}
public void close() throws SQLException {
Iterator<StatementWrapper> iter = statements.iterator();
while (iter.hasNext()) {
iter.next().getCon().close();
}
}
public List<ResultSet> executeQuery() throws SQLException {
List<ResultSet> result;
if (1 == statements.size()) {
StatementWrapper entity = statements.iterator().next();
result = Arrays.asList(entity.getStatement().executeQuery(
entity.getSql()));
return result;
} else {
BatchQueryExecutor query = new BatchQueryExecutor();
result = query.executeQuery(statements,
new BatchQuery<StatementWrapper, ResultSet>() {
@Override
public ResultSet query(final StatementWrapper input)
throws Exception {
return input.getStatement().executeQuery(
input.getSql());
}
});
return result;
}
}
}
/**
* @filename:BatchTaskConfiguration.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:批處理線程池參數配置
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
public class BatchTaskConfiguration {
private String name;
private int corePoolSize;
private int maxPoolSize;
private int keepAliveTime;
private int workQueueSize;
public void setName(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
public int getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public int getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public int getKeepAliveTime() {
return keepAliveTime;
}
public void setKeepAliveTime(int keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}
public int getWorkQueueSize() {
return workQueueSize;
}
public void setWorkQueueSize(int workQueueSize) {
this.workQueueSize = workQueueSize;
}
public int hashCode() {
return new HashCodeBuilder(1, 31).append(name).toHashCode();
}
public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("name", name).append("corePoolSize", corePoolSize)
.append("maxPoolSize", maxPoolSize)
.append("keepAliveTime", keepAliveTime)
.append("workQueueSize", workQueueSize).toString();
}
public boolean equals(Object o) {
boolean res = false;
if (o != null
&& BatchTaskConfiguration.class.isAssignableFrom(o.getClass())) {
BatchTaskConfiguration s = (BatchTaskConfiguration) o;
res = new EqualsBuilder().append(name, s.getName()).isEquals();
}
return res;
}
}
當然了,你進行參數配置的時候,還可以指定多個線程池,於是要設計一個:批處理線程池工廠類BatchTaskThreadFactoryConfiguration,來依次循環保存若干個線程池的參數配置
/**
* @filename:BatchTaskThreadFactoryConfiguration.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:線程池參數配置工廠
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import java.util.Map;
import java.util.HashMap;
public class BatchTaskThreadFactoryConfiguration {
// 批處理線程池參數配置
private Map<String, BatchTaskConfiguration> batchTaskMap = new HashMap<String, BatchTaskConfiguration>();
public BatchTaskThreadFactoryConfiguration() {
}
public void joinBatchTaskConfiguration(BatchTaskConfiguration batchTaskConfiguration) {
if (batchTaskMap.containsKey(batchTaskConfiguration.getName())) {
return;
}else{
batchTaskMap.put(batchTaskConfiguration.getName(), batchTaskConfiguration);
}
}
public Map<String, BatchTaskConfiguration> getBatchTaskMap() {
return batchTaskMap;
}
}
剩下的是,加載運行時參數配置模塊BatchTaskConfigurationLoader
/**
* @filename:BatchTaskConfigurationLoader.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:線程池參數配置加載
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import java.io.InputStream;
import org.apache.commons.digester.Digester;
public final class BatchTaskConfigurationLoader {
private static final String BATCHTASK_THREADPOOL_CONFIG = "./newlandframework/batchtask/parallel/batchtask-configuration.xml";
private static BatchTaskThreadFactoryConfiguration config = null;
private BatchTaskConfigurationLoader() {
}
// 單例模式為了控制並發要進行同步控制
public static BatchTaskThreadFactoryConfiguration getConfig() {
if (config == null) {
synchronized (BATCHTASK_THREADPOOL_CONFIG) {
if (config == null) {
try {
InputStream is = getInputStream();
config = (BatchTaskThreadFactoryConfiguration) getDigester().parse(getInputStream());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return config;
}
private static InputStream getInputStream() {
return BatchTaskConfigurationLoader.class.getClassLoader()
.getResourceAsStream(BATCHTASK_THREADPOOL_CONFIG);
}
private static Digester getDigester() {
Digester digester = new Digester();
digester.setValidating(false);
digester.addObjectCreate("batchtask", BatchTaskThreadFactoryConfiguration.class);
// 加載批處理異步批處理線程池參數配置
digester.addObjectCreate("*/jobpool", BatchTaskConfiguration.class);
digester.addSetProperties("*/jobpool");
digester.addSetProperty("*/jobpool/attribute", "name", "value");
digester.addSetNext("*/jobpool", "joinBatchTaskConfiguration");
return digester;
}
}
上面的這些模塊主要是針對線程池的運行參數可以調整而設計准備的。
BatchTaskRunner這個接口,主要定義了批處理框架要初始化和回收資源的動作。
/**
* @filename:BatchTaskRunner.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:批處理資源管理定義接口
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import java.io.Closeable;
public interface BatchTaskRunner extends Closeable {
public void initialize();
public void close();
}
我們還要重新實現一個線程工廠類BatchTaskThreadFactory,用來管理我們線程池當中的線程。我們可以把線程池當中的線程放到線程組裡面,進行統一管理。比如線程池中的線程,它的運行狀態監控等等處理,你可以通過重新生成一個監控線程,
來運行、跟蹤線程組裡面線程的運行情況。當然你還可以重新封裝一個JMX(Java Management Extensions)的MBean對象,通過JMX方式對線程池進行監控處理,本文的後面,有給出運用JMX技術,進行批處理線程池任務完成情況監控的實現,實現線程池中線程運行狀態的監控可以參考一下。這裡就不具體給出,線程池線程狀態監控的JMX模塊代碼了。言歸正傳,線程工廠類BatchTaskThreadFactory的實現如下
/**
* @filename:BatchTaskThreadFactory.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:線程池工廠
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ThreadFactory;
public class BatchTaskThreadFactory implements ThreadFactory {
final private static String BATCHTASKFACTORYNAME = "batchtask-pool";
final private String name;
final private ThreadGroup threadGroup;
final private AtomicInteger threadNumber = new AtomicInteger(0);
public BatchTaskThreadFactory() {
this(BATCHTASKFACTORYNAME);
}
public BatchTaskThreadFactory(String name) {
this.name = name;
SecurityManager security = System.getSecurityManager();
threadGroup = (security != null) ? security.getThreadGroup() : Thread.currentThread().getThreadGroup();
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(threadGroup, runnable);
thread.setName(String.format("BatchTask[%s-%d]", threadGroup.getName(),
threadNumber.incrementAndGet()));
System.out.println(String.format("BatchTask[%s-%d]", threadGroup.getName(),
threadNumber.incrementAndGet()));
if (thread.isDaemon()) {
thread.setDaemon(false);
}
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
}
下面是關鍵模塊:並行異步批處理模塊BatchTaskReactor的實現代碼,主要還是對ThreadPoolExecutor進行地封裝,考慮使用有界的數組阻塞隊列ArrayBlockingQueue,還是為了防止:生產者無休止的請求服務,導致內存崩潰,最終做到內存使用可控
采取的措施。
/**
* @filename:BatchTaskReactor.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:批處理並行異步線程池處理模塊
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.parallel;
import java.util.Set;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public final class BatchTaskReactor implements BatchTaskRunner {
private Map<String, ExecutorService> threadPools = new ConcurrentHashMap<String, ExecutorService>();
private static BatchTaskReactor context;
private static Lock REACTORLOCK = new ReentrantLock();
public static final String BATCHTASK_THREADPOOL_NAME = "newlandframework_batchtask";
private BatchTaskReactor() {
initialize();
}
// 防止並發重復創建批處理反應器對象
public static BatchTaskReactor getReactor() {
if (context == null) {
try {
REACTORLOCK.lock();
if (context == null) {
context = new BatchTaskReactor();
}
} finally {
REACTORLOCK.unlock();
}
}
return context;
}
public ExecutorService getBatchTaskThreadPoolName() {
return getBatchTaskThreadPool(BATCHTASK_THREADPOOL_NAME);
}
public ExecutorService getBatchTaskThreadPool(String poolName) {
if (!threadPools.containsKey(poolName)) {
throw new IllegalArgumentException(String.format(
"批處理線程池名稱:[%s]參數配置不存在", poolName));
}
return threadPools.get(poolName);
}
public Set<String> getBatchTaskThreadPoolNames() {
return threadPools.keySet();
}
// 關閉線程池,同時等待異步執行的任務返回執行結果
public void close() {
for (Entry<String, ExecutorService> entry : threadPools.entrySet()) {
entry.getValue().shutdown();
System.out.println(String.format("關閉批處理線程池:[%s]成功", entry.getKey()));
}
threadPools.clear();
}
// 初始化批處理線程池
public void initialize() {
BatchTaskThreadFactoryConfiguration poolFactoryConfig = BatchTaskConfigurationLoader.getConfig();
if (poolFactoryConfig != null) {
initThreadPool(poolFactoryConfig);
}
}
private void initThreadPool(BatchTaskThreadFactoryConfiguration poolFactoryConfig) {
for (Entry<String, BatchTaskConfiguration> entry : poolFactoryConfig.getBatchTaskMap().entrySet()) {
BatchTaskConfiguration config = entry.getValue();
// 使用有界的阻塞隊列,考慮為了防止生產者無休止的請求服務,導致內存崩潰,最終做到內存使用可控
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(config.getWorkQueueSize());
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
config.getCorePoolSize(), config.getMaxPoolSize(),
config.getKeepAliveTime(), TimeUnit.SECONDS, queue,
new BatchTaskThreadFactory(entry.getKey()),new ThreadPoolExecutor.CallerRunsPolicy());
threadPools.put(entry.getKey(), threadPool);
System.out.println(String.format("批處理線程池:[%s]創建成功",config.toString()));
}
}
}
/**
* @filename:BusinessEvent.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:業務事件任務接口定義
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.model;
public interface BusinessEvent {
// 執行具體批處理的任務
public int execute(Integer userId);
}
然後具體的Hlr指令發送任務模塊HlrBusinessEvent要實現這個接口類的方法,完成用戶停復機Hlr指令的派發。代碼如下:
/**
* @filename:HlrBusinessEvent.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Hlr指令派發任務接口定義
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.model;
import org.apache.commons.lang.math.RandomUtils;
public class HlrBusinessEvent implements BusinessEvent {
// 交換機上的指令執行成功失敗標識0表示成功 1表示失敗
public final static int TASKSUCC = 0;
public final static int TASKFAIL = 1;
private final static int ELAPSETIME = 1000;
@Override
public int execute(Integer userId) {
// 這裡為了舉例,隨機產生1000以內的隨機數
int millis = RandomUtils.nextInt(ELAPSETIME);
// 簡單模擬往交換機發送停機/復機的指令
try {
Thread.sleep(millis);
String strContent = String.format(
"線程標識[%s]用戶標識:[%d]執行交換機指令工單耗時:[%d]毫秒", Thread
.currentThread().getName(), userId, millis);
System.out.println(strContent);
// 這裡為了演示直接簡單根據隨機數是不是偶數簡單模擬交換機指令執行的結果
return (millis % 2 == 0) ? TASKSUCC : TASKFAIL;
} catch (InterruptedException e) {
e.printStackTrace();
return TASKFAIL;
}
}
}
實際運行情況中,我們可能要監控一下指令發送的時長,於是再設計一個:針對Hlr指令發送任務模塊HlrBusinessEvent,切面嵌入代理的Hlr指令時長計算代理類:HlrBusinessEventAdvisor,具體的代碼如下:
/**
* @filename:HlrBusinessEventAdvisor.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Hlr指令派發時長計算代理類
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.model;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.lang.time.StopWatch;
public class HlrBusinessEventAdvisor implements MethodInterceptor {
public HlrBusinessEventAdvisor() {
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
// 計算一下指令派發時長
StopWatch sw = new StopWatch();
sw.start();
Object obj = invocation.proceed();
sw.stop();
System.out.println("執行交換機指令工單耗時: [" + sw.getTime() + "] 毫秒");
return obj;
}
}
剩下的,我們由於是要,異步並行計算得到執行結果,於是我們設計一個:批處理Hlr任務執行模塊HlrBusinessEventTask,它要實現java.util.concurrent.Callable接口的方法call,它會返回一個異步任務的執行結果。
/**
* @filename:HlrBusinessEventTask.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:Hlr指令派任務執行類
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.model;
import java.util.concurrent.Callable;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
public class HlrBusinessEventTask implements Callable<Integer> {
private NotifyUsers user = null;
private final static String MAPPERMETHODNAME = "execute";
public HlrBusinessEventTask(NotifyUsers user) {
this.user = user;
}
@Override
public Integer call() throws Exception {
synchronized (this) {
ProxyFactory weaver = new ProxyFactory(new HlrBusinessEvent());
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor();
advisor.setMappedName(MAPPERMETHODNAME);
advisor.setAdvice(new HlrBusinessEventAdvisor());
weaver.addAdvisor(advisor);
BusinessEvent proxyObject = (BusinessEvent) weaver.getProxy();
Integer result = new Integer(proxyObject.execute(user.getUserId()));
// 返回執行結果
return result;
}
}
}
/**
* @filename:NotifyUsersBatchTask.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:通知用戶批處理任務管理類
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.commons.collections.Closure;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.functors.IfClosure;
import org.apache.commons.lang.StringUtils;
import newlandframework.batchtask.jmx.BatchTaskMonitor;
import newlandframework.batchtask.model.NotifyUsers;
import newlandframework.batchtask.parallel.BatchQueryLoader;
import newlandframework.batchtask.parallel.BatchTaskReactor;
public class NotifyUsersBatchTask {
public NotifyUsersBatchTask() {
}
private ArrayList<DataSource> dataSource;
// 基於JMX的任務完成情況監控計數器
private BatchTaskMonitor monitor = new BatchTaskMonitor(BatchTaskReactor.BATCHTASK_THREADPOOL_NAME);
// 支持同時加載多個數據源
public NotifyUsersBatchTask(ArrayList<DataSource> dataSource) {
this.dataSource = dataSource;
}
// 批處理任務執行成功計數器
class NotifyTaskSuccCounter implements Closure {
public static final String NOTIFYTASKSUCCCOUNTER = "TASKSUCCCOUNTER";
private int numberSucc = 0;
public void execute(Object input) {
monitor.increaseBatchTaskCounter(NOTIFYTASKSUCCCOUNTER);
numberSucc++;
}
public int getSuccNumber() {
return numberSucc;
}
}
// 批處理任務執行失敗計數器
class NotifyTaskFailCounter implements Closure {
public static final String NOTIFYTASKFAILCOUNTER = "TASKFAILCOUNTER";
private int numberFail = 0;
public void execute(Object input) {
monitor.increaseBatchTaskCounter(NOTIFYTASKFAILCOUNTER);
numberFail++;
}
public int getFailNumber() {
return numberFail;
}
}
// 並行加載查詢多個水平分庫的數據集合
public List<NotifyUsers> query() throws SQLException {
BatchQueryLoader loader = new BatchQueryLoader();
String strSQL = "select home_city, msisdn, user_id from notify_users";
for (int i = 0; i < dataSource.size(); i++) {
Connection con = dataSource.get(i).getConnection();
Statement st = con.createStatement();
loader.attachLoadEnv(strSQL, st, con);
}
List<ResultSet> list = loader.executeQuery();
System.out.println("查詢出記錄總數為:" + list.size());
final List<NotifyUsers> listNotifyUsers = new ArrayList<NotifyUsers>();
for (int i = 0; i < list.size(); i++) {
ResultSet rs = list.get(i);
while (rs.next()) {
NotifyUsers users = new NotifyUsers();
users.setHomeCity(rs.getInt(1));
users.setMsisdn(rs.getInt(2));
users.setUserId(rs.getInt(3));
listNotifyUsers.add(users);
}
}
// 釋放連接資源
loader.close();
return listNotifyUsers;
}
// 批處理數據集合,任務分派
public void batchNotify(List<NotifyUsers> list,
final ExecutorService excutor) {
System.out.println("處理記錄總數為:" + list.size());
System.out.println(StringUtils.center("記錄明細如下", 40, "-"));
NotifyTaskSuccCounter cntSucc = new NotifyTaskSuccCounter();
NotifyTaskFailCounter cntFail = new NotifyTaskFailCounter();
BatchTaskPredicate predicate = new BatchTaskPredicate(excutor);
Closure batchAction = new IfClosure(predicate, cntSucc, cntFail);
CollectionUtils.forAllDo(list, batchAction);
System.out.println("批處理一共處理:" + list.size() + "記錄,處理成功:"
+ cntSucc.getSuccNumber() + "條記錄,處理失敗:" + cntFail.getFailNumber() + "條記錄");
}
}
異步處理任務執行提交模塊BatchTaskPredicate,主要是從線程池中采集異步提交要處理的任務,然後根據異步的執行結果,反饋給線程池:這個任務執行成功還是執行失敗了。具體代碼如下:
/**
* @filename:BatchTaskPredicate.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:批處理異步任務提交執行任務模塊
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.Predicate;
import newlandframework.batchtask.model.HlrBusinessEvent;
import newlandframework.batchtask.model.HlrBusinessEventTask;
import newlandframework.batchtask.model.NotifyUsers;
public class BatchTaskPredicate implements Predicate {
private ExecutorService excutor = null;
public BatchTaskPredicate(ExecutorService excutor) {
this.excutor = excutor;
}
public boolean evaluate(Object object) {
if (object instanceof NotifyUsers) {
NotifyUsers users = (NotifyUsers) object;
Future<Integer> future = excutor.submit(new HlrBusinessEventTask(users));
try {
// 設定5s超時
Integer result = future.get(5, TimeUnit.SECONDS);
return result.intValue() == HlrBusinessEvent.TASKSUCC;
} catch (Exception e) {
// 如果失敗試圖取消對此任務的執行
future.cancel(true);
e.printStackTrace();
return false;
}
} else {
return false;
}
}
}
最後,我們通過,通知用戶批處理任務管理類NotifyUsersBatchTask,它構造的時候,可以通過指定數據庫連接池,批量加載多個數據源的數據對象。這裡我們假設並行加載cms/ccs兩個數據源對應的notify_users表的數據,它的spring配置batchtask-multidb.xml配置內容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>newlandframework/batchtask/jdbc-cms.properties</value>
<value>newlandframework/batchtask/jdbc-ccs.properties</value>
</list>
</property>
</bean>
<bean id="dtSource-cms" destroy-method="close" class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="${jdbc.cms.driverClassName}"/>
<property name="url" value="${jdbc.cms.url}"/>
<property name="username" value="${jdbc.cms.username}"/>
<property name="password" value="${jdbc.cms.password}"/>
</bean>
<bean id="dtSource-ccs" destroy-method="close" class="org.apache.commons.dbcp.BasicDataSource">
<property name="driverClassName" value="${jdbc.ccs.driverClassName}"/>
<property name="url" value="${jdbc.ccs.url}"/>
<property name="username" value="${jdbc.ccs.username}"/>
<property name="password" value="${jdbc.ccs.password}"/>
</bean>
<bean id="notifyUsers" class="newlandframework.batchtask.NotifyUsersBatchTask">
<constructor-arg name="dataSource">
<list>
<ref bean="dtSource-ccs"/>
<ref bean="dtSource-cms"/>
</list>
</constructor-arg>
</bean>
</beans>
/**
* @filename:BatchTaskMonitorMBean.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:JMX批處理任務監控接口
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.jmx;
public interface BatchTaskMonitorMBean {
public int getBatchTaskCounter(String taskName);
}
我們再來實現這個接口,於是設計得到BatchTaskMonitor模塊
/**
* @filename:BatchTaskMonitor.java
*
* Newland Co. Ltd. All rights reserved.
*
* @Description:JMX批處理任務監控模塊
* @author tangjie
* @version 1.0
*
*/
package newlandframework.batchtask.jmx;
import javax.management.AttributeChangeNotification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.ObjectName;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import java.util.concurrent.atomic.AtomicInteger;
import java.lang.management.ManagementFactory;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
public class BatchTaskMonitor extends NotificationBroadcasterSupport
implements BatchTaskMonitorMBean {
private static final String TASKMONITOR_NAME = "newlandframework.batchtask.jmx.{0}:type=BatchTaskMonitor";
Map<String, AtomicInteger> batchTaskCounter;
private int sequenceTaskNumber = 0;
// 注冊MBean,內置計數器,實時監控批處理任務的成功/失敗情況
public BatchTaskMonitor(String taskName) {
batchTaskCounter = new HashMap<String, AtomicInteger>();
try {
registerMBean(taskName);
} catch (InstanceAlreadyExistsException e) {
System.out.println("InstanceAlreadyExistsException BatchTaskMonitor Register Fail");
} catch (MBeanRegistrationException e) {
System.out.println("MBeanRegistrationException BatchTaskMonitor Register Fail");
} catch (NotCompliantMBeanException e) {
System.out.println("NotCompliantMBeanException BatchTaskMonitor Register Fail");
} catch (MalformedObjectNameException e) {
System.out.println("MalformedObjectNameException BatchTaskMonitor Register Fail");
}
}
private void registerMBean(String taskName)
throws InstanceAlreadyExistsException, MBeanRegistrationException,
NotCompliantMBeanException, MalformedObjectNameException {
String strObjectName = MessageFormat.format(TASKMONITOR_NAME, taskName);
ManagementFactory.getPlatformMBeanServer().registerMBean(this,
new ObjectName(strObjectName));
}
// 批處理任務計數器遞增
public void increaseBatchTaskCounter(String taskName) {
if (batchTaskCounter.containsKey(taskName)) {
notifyMessage(taskName, batchTaskCounter.get(taskName).incrementAndGet());
} else {
batchTaskCounter.put(taskName, new AtomicInteger(1));
}
}
private void notifyMessage(String taskName, int batchNewTaskCounter) {
sendNotification(new AttributeChangeNotification(this,
sequenceTaskNumber++, System.currentTimeMillis(),
"batchTaskCounter \"" + taskName + "\" incremented",
"batchTaskCounter", "int", batchNewTaskCounter - 1,
batchNewTaskCounter));
}
// 獲取計數器的計數結果
public int getBatchTaskCounter(String taskName) {
if (batchTaskCounter.containsKey(taskName)) {
return batchTaskCounter.get(taskName).intValue();
} else {
return 0;
}
}
}
其中,計數器的名稱,我已經在NotifyUsersBatchTask模塊中已經指定了。批處理任務執行成功計數器叫做:String NOTIFYTASKSUCCCOUNTER = "TASKSUCCCOUNTER"。批處理任務執行失敗計數器叫做String NOTIFYTASKFAILCOUNTER = "TASKFAILCOUNTER"。這樣我們就可以通過JConsole實現,監控線程池任務的運行處理情況了。
try {
// 初始化並行異步任務執行反應器
BatchTaskReactor reactor = BatchTaskReactor.getReactor();
final ExecutorService excutor = reactor.getBatchTaskThreadPool(BatchTaskReactor.BATCHTASK_THREADPOOL_NAME);
List<NotifyUsers> listNotifyUsers = null;
NotifyUsersBatchTask notifyTask = (NotifyUsersBatchTask) context.getBean("notifyUsers");
// 並行查詢水平分庫的結果
listNotifyUsers = notifyTask.query();
StopWatch sw = new StopWatch();
sw.start();
// 並行異步批處理查詢結果集合
notifyTask.batchNotify(listNotifyUsers, excutor);
sw.stop();
reactor.close();
String strContent = String.format("=========批處理並行任務執行結束,耗時[%d]毫秒=========", sw.getTime());
System.out.println(strContent);
} catch (SQLException e) {
e.printStackTrace();
}
我們再來運行一下,看下結果如何?先在數據庫中分別插入福州591、廈門592一共80條的待處理數據(實際上,你可以插得更多,越多越能體現出這種異步並行批處理框架的價值)。運行截圖如下:

正如我們所預想地那樣。很好。
現在,我們再通過JMX技術,查看監控一下,並行批處理異步線程池任務的完成情況吧。我們先連接上我們的MBean對象BatchTaskMonitor。

發現裡面有個暴露的操作方法getBatchTaskCounter(根據計數器名稱返回計數結果)。我們在上面紅圈的輸入框內,輸入統計失敗任務個數的計數器TASKFAILCOUNTER,然後點擊確定。最後運行結果如下所示:

發現我們批處理任務,目前已經處理失敗了196個啦!正如我們希望的那樣,可視化實時監控的感覺非常好。
寫在最後
最終,我們通過並行異步加載技術和線程池機制設計出了一個精簡的批處理框架。上面的代碼雖然不算多,但是,有它很獨特的應用場景,麻雀雖小五髒俱全。相信它對於其他的同行朋友,還是很有借鑒意義的。況且現在的服務器都是多核、多CPU的配置,我們要很好地利用這一硬件資源。對於IO密集型的應用,可以根據上面的思路,加以改良,相信一定能收到不錯的效果!
好了,不知不覺地寫了這麼多的內容和代碼。本文的前期准備、編碼、調試、文章編寫工作,也消耗了本人大量的腦力和精力。不過還是挺開心的,想著能把自己的一些想法通過博客的方式沉澱下來,對別人有借鑒意義,而對自己則是一種“學習和總結”。路漫漫其修遠兮,吾將上下而求索。故在此,拋磚引玉。如果本人有說地不對的地方,希望各位園友批評指正!不吝賜教!