在java多線程編程中,我們經常使用線程池提交任務,並且通過Future來獲取任務執行的結果,以此達到異步或者並行執行的效果。在jdk1.7以前,FutureTask是Future唯一的實現類,1.7後加入了ForkJoinTask類。本文主要總結一下我對FutureTask的理解。
Future接口定義了5個方法,分別是
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
分別介紹一下這五個接口的用途:
寫個demo便於理解
1 public class FutureDemo {
2 public static void main(String[] args) {
3 ExecutorService executorService = Executors.newCachedThreadPool();
4 Future future = executorService.submit(new Callable<Object>() {
5 @Override
6 public Object call() throws Exception {
7 Long start = System.currentTimeMillis();
8 while (true) {
9 Long current = System.currentTimeMillis();
10 if ((current - start) > 1000) {
11 return 1;
12 }
13 }
14 }
15 });
16
17 try {
18 Integer result = (Integer)future.get();
19 System.out.println(result);
20 }catch (Exception e){
21 e.printStackTrace();
22 }
23 }
24 }
這裡我們模擬了1s鐘的CPU空轉,當執行future.get()的時候,主線程阻塞了大約一秒後,把結果打印出來:1。
當然我們也可以使用V get(long timeout, TimeUnit unit),這個方法提供了一個超時時間的設置,如果超過當前時間任務線程還未返回,那麼就會停止阻塞狀態,並且拋出一個timeout異常。如下
1 try {
2 Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS);
3 System.out.println(result);
4 } catch (Exception e) {
5 e.printStackTrace();
6 }
這裡我們設置的超時時間是500毫秒,由於一開始我們模擬了1s的CPU計算時間,這裡便會拋出超時異常,打印出堆棧信息

當然,如果我們把超時時間設置的長一些,還是可以得到預期的結果的。
剛我們測試了最常用的兩個方法,接下來我們來探一探FutureTask的內部實現機制。首先我們看一下FutureTask的繼承結構:

FutureTask實現了RunnableFuture接口,而RunnableFuture繼承了Runnable和Future,也就是說FutureTask既可以當做一個Runnable,也可以當做一個Future。
FutureTask內部定義了7個狀態,代表了FutureTask當前所處狀態。如下
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
當一個任務剛提交的時候,狀態為NEW,由FutureTask的構造器可知:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
任務執行正常結束前,state會被設置成COMPLETING,代表任務即將完成,接下來很快就會被設置為NARMAL或者EXCEPTIONAL,這取決於調用Runnable中的call()方法是否拋出了異常。如果沒有異常,則state設為NARMAL,反之為EXCEPTIONAL。
如果任務提交後,在任務執行結束之前調用cancel(boolean mayInterruptIfRunning) 取消任務,那麼有可能進入到後3個狀態。如果傳入的參數是false,state會被置為CANCELLED,反之如果傳入true,state先被置為INTERRUPTING,後被置為INTERRUPTED。
總結下,FutureTask的狀態流轉過程,可以出現以下三種狀態:
1. 正常執行完畢。 NEW -> COMPLETING -> NORMAL
2. 執行中出現異常。NEW -> COMPLETING -> EXCEPTIONAL
3. 任務執行過程中被取消,並且不響應中斷。NEW -> CANCELLED
4. 任務執行過程中被取消,並且響應中斷。 NEW -> INTERRUPTING -> INTERRUPTED
那麼以上狀態為什麼會這麼流轉呢?接下來我們一起扒一扒FutureTask的源碼。我們從futureTask的方法看起。
1 public void run()
1 public void run() {
2 if (state != NEW ||
3 !UNSAFE.compareAndSwapObject(this, runnerOffset,
4 null, Thread.currentThread()))
5 return;
6 try {
7 Callable<V> c = callable;
8 if (c != null && state == NEW) {
9 V result;
10 boolean ran;
11 try {
12 result = c.call();
13 ran = true;
14 } catch (Throwable ex) {
15 result = null;
16 ran = false;
17 setException(ex);
18 }
19 if (ran)
20 set(result);
21 }
22 } finally {
23 // runner must be non-null until state is settled to
24 // prevent concurrent calls to run()
25 runner = null;
26 // state must be re-read after nulling runner to prevent
27 // leaked interrupts
28 int s = state;
29 if (s >= INTERRUPTING)
30 handlePossibleCancellationInterrupt(s);
31 }
32 }
翻譯一下,這個方法經歷了以下幾步
1. 校驗Task狀態和當前線程引用runner,如果state不為NEW或者runner引用為null,直接返回。
2. 調用runner的call()方法執行主邏輯,並且嘗試獲得返回值result。
3. 如果拋出異常,調用setException(Throwable t)方法
4. 如果沒有異常,調用set(V v)方法
5. 一些掃尾工作
那麼setException(Throwable t)和set(V v)做了什麼呢?我們看一下源碼
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
set(V v) 方法首先做一個CAS操作,將state字段由NEW->COMPLETING,這裡的CAS操作讀者可以自行百度原理。如果成功,那麼把執行結果v賦給成員變量outcome,再把state的值設置為NORMAL,最後做一些清理工作,喚醒所有等待線程並把callable對象置為null。
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
同理,setException(Throwable t) 方法大同小異,只不過state字段流轉為NEW->COMPLETING->EXCEPTION。同時把異常對象賦予v。
這裡我們就清楚了,當一個任務被提交後,狀態流轉中1、2是怎麼來的了。同時我們可以確定,outcome變量,存著是執行結果或者拋出的異常對象。
2 public V get() throws InterruptionException,ExecutionException
get() 和 get(long timeout, TimeUnit unit)方法是獲取執行結果的兩個方法,我們這裡就看get()方法即可。首先貼源碼
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
首先檢查state值,如果小於COMPLETING,則阻塞,阻塞時可能會拋出異常,這裡我們不糾結這個,往下看。如果沒有拋出異常,獲取執行後返回的state值,最後調用report(s)方法。接著我們看report方法,如果s為NORMAL,返回執行結果outcome,否則拋出異常。結合之前的run()方法,我們這裡可以得出,如果主邏輯正常執行完畢,則返回執行結果,如果拋出異常,那麼這裡會封裝該異常為ExecutionException並且拋出。如果任務執行過程中被取消了,則可能拋出CancellationException()。
3 public boolean cancel(boolean mayInterruptIfRunning)
這個方法個人認為是最具爭議的方法。這裡我們先貼個demo
1 public class FutureDemo {
2 public static void main(String[] args) {
3 ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
4 // 預創建線程
5 executorService.prestartCoreThread();
6
7 Future future = executorService.submit(new Callable<Object>() {
8 @Override
9 public Object call() {
10 System.out.println("start to run callable");
11 Long start = System.currentTimeMillis();
12 while (true) {
13 Long current = System.currentTimeMillis();
14 if ((current - start) > 1000) {
15 System.out.println("當前任務執行已經超過1s");
16 return 1;
17 }
18 }
19 }
20 });
21
22 System.out.println(future.cancel(false));
23
24 try {
25 Thread.currentThread().sleep(3000);
26 executorService.shutdown();
27 } catch (Exception e) {
28 //NO OP
29 }
30 }
31 }
我們多次測試後發現,出現了2種打印結果,如圖

結果1

結果2
咦,兩個結果和預期的都好像不太一樣?第一種是任務壓根沒取消,第二種則是任務壓根沒提交成功,似乎和方法簽名cancel不太一致?
我們先看一下方法簽名上的作者注釋
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
這裡我們可以看到,"嘗試"取消任務的執行,如果當前任務已經結束或者已經取消,則當前取消操作會失敗。如果任務還沒開始就被取消,那麼任務則不會被執行。
這裡我們就知道了,如果任務還沒開始執行時cancel(false)就被調用,那麼這個任務是不會被執行的,這就解釋了出現上圖結果2的情況。那如果任務已經開始執行,並且
調用cancel(false),是不會終止任務的。我們還是從源碼去分析cancel()究竟做了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) {
if (state != NEW)
return false;
if (mayInterruptIfRunning) {
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if (t != null)
t.interrupt();
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
}
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
finishCompletion();
return true;
}
執行邏輯如下
1. 如果當前futureTask狀態不為NEW,直接返回false,表示取消操作失敗。
2. 如果傳入true,代表可能會引發線程中斷。一個CAS操作,把狀態由NEW->INTERRUPTING,如果執行失敗則直接返回false。設置當前工作線程中斷標識為true,然後把futureTask狀態設置為INTERRUPTED。
3. 如果傳入false,把futureTask狀態設置為CANCELLED。
4. 做一些清理工作
可見,cancel()方法僅僅是改變了futureTask的狀態位!如果傳入的是false,當前任務是不會被終止的,而是會繼續執行,直到異常或者執行完畢。如果傳入的是true,會調用當前線程的interrupt()方法,把中斷標志位設為true。所以cancel()方法其實個人理解是有歧義的,它並不能真正取消一個任務的執行。事實上,除非線程自己停止自己的任務,或者退出JVM,是沒有其他方法完全終止一個線程的任務的。cancel(true)方法也只是希望當前線程可以響應中斷而已,當線程被阻塞,拋出InterruptedException。同時,由之前的future.get()方法可知,如果一個futureTask被cancel()了,調用get()方法會拋出CancellationException。
理解FutureTask,我們使用Future類才能更加得心應手。這裡也只是作者自己的理解,如有不對之處,還望讀者批評指正。
作者:mayday芋頭
出處:http://www.cnblogs.com/maypattis/ 本博客中未標明轉載的文章歸作者mayday芋頭和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利