程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> 揭密FutureTask,futuretask

揭密FutureTask,futuretask

編輯:JAVA綜合教程

揭密FutureTask,futuretask


      在java多線程編程中,我們經常使用線程池提交任務,並且通過Future來獲取任務執行的結果,以此達到異步或者並行執行的效果。在jdk1.7以前,FutureTask是Future唯一的實現類,1.7後加入了ForkJoinTask類。本文主要總結一下我對FutureTask的理解。

Future類

  Future接口定義了5個方法,分別是 

 boolean cancel(boolean mayInterruptIfRunning);
 boolean isCancelled(); 
 boolean isDone(); 
 V get() throws InterruptedException, ExecutionException; 
 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

    分別介紹一下這五個接口的用途:

  • boolean cancel(boolean mayInterruptInRunning) 取消一個正在執行中的任務,並且返回調用結果。如果取消成功則返回true,反之返回false。這裡要注意,即使方法返回true,當前任務也未必真的被取消了,後面會介紹。
  • boolean isCancelled() 返回當前任務是否被取消。
  • Boolean isDone() 返回當前任務是否執行完畢。這裡done的概念比較廣,包括了futureTask被執行後的任意狀態,例如正常執行完畢、執行異常或者任務被取消。
  • V get() 這個接口就是用來獲取futureTask執行結果,調用這個接口時會被阻塞,直到拿到結果或者異常。
  • V get(long timeout, TimeUnit unit) 這個接口多了一個超時時間,如果過了這個時間task仍然沒有結果返回,則拋出timeout異常

    寫個demo便於理解

 1 public class FutureDemo {
 2     public static void main(String[] args) {
 3         ExecutorService executorService = Executors.newCachedThreadPool();
 4         Future future = executorService.submit(new Callable<Object>() {
 5             @Override
 6             public Object call() throws Exception {
 7                 Long start = System.currentTimeMillis();
 8                 while (true) {
 9                     Long current = System.currentTimeMillis();
10                     if ((current - start) > 1000) {
11                         return 1;
12                     }
13                 }
14             }
15         });
16 
17         try {
18             Integer result = (Integer)future.get();
19             System.out.println(result);
20         }catch (Exception e){
21             e.printStackTrace();
22         }
23     }
24 }

   這裡我們模擬了1s鐘的CPU空轉,當執行future.get()的時候,主線程阻塞了大約一秒後,把結果打印出來:1。

        當然我們也可以使用V get(long timeout, TimeUnit unit),這個方法提供了一個超時時間的設置,如果超過當前時間任務線程還未返回,那麼就會停止阻塞狀態,並且拋出一個timeout異常。如下

1         try {
2             Integer result = (Integer) future.get(500, TimeUnit.MILLISECONDS);
3             System.out.println(result);
4         } catch (Exception e) {
5             e.printStackTrace();
6         }

        這裡我們設置的超時時間是500毫秒,由於一開始我們模擬了1s的CPU計算時間,這裡便會拋出超時異常,打印出堆棧信息

     

      當然,如果我們把超時時間設置的長一些,還是可以得到預期的結果的。

FutureTask內部實現機制

  剛我們測試了最常用的兩個方法,接下來我們來探一探FutureTask的內部實現機制。首先我們看一下FutureTask的繼承結構:

          

      FutureTask實現了RunnableFuture接口,而RunnableFuture繼承了Runnable和Future,也就是說FutureTask既可以當做一個Runnable,也可以當做一個Future。

  FutureTask內部定義了7個狀態,代表了FutureTask當前所處狀態。如下

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

      當一個任務剛提交的時候,狀態為NEW,由FutureTask的構造器可知:

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

  任務執行正常結束前,state會被設置成COMPLETING,代表任務即將完成,接下來很快就會被設置為NARMAL或者EXCEPTIONAL,這取決於調用Runnable中的call()方法是否拋出了異常。如果沒有異常,則state設為NARMAL,反之為EXCEPTIONAL。

  如果任務提交後,在任務執行結束之前調用cancel(boolean mayInterruptIfRunning) 取消任務,那麼有可能進入到後3個狀態。如果傳入的參數是false,state會被置為CANCELLED,反之如果傳入true,state先被置為INTERRUPTING,後被置為INTERRUPTED。

     總結下,FutureTask的狀態流轉過程,可以出現以下三種狀態:

        1. 正常執行完畢。 NEW -> COMPLETING -> NORMAL

    2. 執行中出現異常。NEW -> COMPLETING -> EXCEPTIONAL

        3. 任務執行過程中被取消,並且不響應中斷。NEW -> CANCELLED

   4. 任務執行過程中被取消,並且響應中斷。 NEW -> INTERRUPTING -> INTERRUPTED  

  那麼以上狀態為什麼會這麼流轉呢?接下來我們一起扒一扒FutureTask的源碼。我們從futureTask的方法看起。

1 public void run()

 1 public void run() {
 2         if (state != NEW ||
 3             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 4                                          null, Thread.currentThread()))
 5             return;
 6         try {
 7             Callable<V> c = callable;
 8             if (c != null && state == NEW) {
 9                 V result;
10                 boolean ran;
11                 try {
12                     result = c.call();
13                     ran = true;
14                 } catch (Throwable ex) {
15                     result = null;
16                     ran = false;
17                     setException(ex);
18                 }
19                 if (ran)
20                     set(result);
21             }
22         } finally {
23             // runner must be non-null until state is settled to
24             // prevent concurrent calls to run()
25             runner = null;
26             // state must be re-read after nulling runner to prevent
27             // leaked interrupts
28             int s = state;
29             if (s >= INTERRUPTING)
30                 handlePossibleCancellationInterrupt(s);
31         }
32     }

  翻譯一下,這個方法經歷了以下幾步

      1. 校驗Task狀態和當前線程引用runner,如果state不為NEW或者runner引用為null,直接返回。

  2. 調用runner的call()方法執行主邏輯,並且嘗試獲得返回值result。

  3. 如果拋出異常,調用setException(Throwable t)方法

  4. 如果沒有異常,調用set(V v)方法

  5. 一些掃尾工作

 那麼setException(Throwable t)和set(V v)做了什麼呢?我們看一下源碼

protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

  set(V v) 方法首先做一個CAS操作,將state字段由NEW->COMPLETING,這裡的CAS操作讀者可以自行百度原理。如果成功,那麼把執行結果v賦給成員變量outcome,再把state的值設置為NORMAL,最後做一些清理工作,喚醒所有等待線程並把callable對象置為null。

 protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

  同理,setException(Throwable t) 方法大同小異,只不過state字段流轉為NEW->COMPLETING->EXCEPTION。同時把異常對象賦予v。

  這裡我們就清楚了,當一個任務被提交後,狀態流轉中1、2是怎麼來的了。同時我們可以確定,outcome變量,存著是執行結果或者拋出的異常對象。

2  public V get() throws InterruptionException,ExecutionException

    get() 和 get(long timeout, TimeUnit unit)方法是獲取執行結果的兩個方法,我們這裡就看get()方法即可。首先貼源碼

  

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

  首先檢查state值,如果小於COMPLETING,則阻塞,阻塞時可能會拋出異常,這裡我們不糾結這個,往下看。如果沒有拋出異常,獲取執行後返回的state值,最後調用report(s)方法。接著我們看report方法,如果s為NORMAL,返回執行結果outcome,否則拋出異常。結合之前的run()方法,我們這裡可以得出,如果主邏輯正常執行完畢,則返回執行結果,如果拋出異常,那麼這裡會封裝該異常為ExecutionException並且拋出。如果任務執行過程中被取消了,則可能拋出CancellationException()。

3 public boolean cancel(boolean mayInterruptIfRunning)

  這個方法個人認為是最具爭議的方法。這裡我們先貼個demo

  

 1 public class FutureDemo {
 2     public static void main(String[] args) {
 3         ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
 4         // 預創建線程
 5         executorService.prestartCoreThread();
 6 
 7         Future future = executorService.submit(new Callable<Object>() {
 8             @Override
 9             public Object call() {
10                 System.out.println("start to run callable");
11                 Long start = System.currentTimeMillis();
12                 while (true) {
13                     Long current = System.currentTimeMillis();
14                     if ((current - start) > 1000) {
15                         System.out.println("當前任務執行已經超過1s");
16                         return 1;
17                     }
18                 }
19             }
20         });
21 
22         System.out.println(future.cancel(false));
23 
24         try {
25             Thread.currentThread().sleep(3000);
26             executorService.shutdown();
27         } catch (Exception e) {
28             //NO OP
29         }
30     }
31 }

我們多次測試後發現,出現了2種打印結果,如圖

                結果1

                結果2    

咦,兩個結果和預期的都好像不太一樣?第一種是任務壓根沒取消,第二種則是任務壓根沒提交成功,似乎和方法簽名cancel不太一致?

我們先看一下方法簽名上的作者注釋

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
  這裡我們可以看到,"嘗試"取消任務的執行,如果當前任務已經結束或者已經取消,則當前取消操作會失敗。如果任務還沒開始就被取消,那麼任務則不會被執行。
這裡我們就知道了,如果任務還沒開始執行時cancel(false)就被調用,那麼這個任務是不會被執行的,這就解釋了出現上圖結果2的情況。那如果任務已經開始執行,並且
調用cancel(false),是不會終止任務的。我們還是從源碼去分析cancel()究竟做了哪些事。
public boolean cancel(boolean mayInterruptIfRunning) {
        if (state != NEW)
            return false;
        if (mayInterruptIfRunning) {
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
                return false;
            Thread t = runner;
            if (t != null)
                t.interrupt();
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
    }

  執行邏輯如下

     1. 如果當前futureTask狀態不為NEW,直接返回false,表示取消操作失敗。

  2. 如果傳入true,代表可能會引發線程中斷。一個CAS操作,把狀態由NEW->INTERRUPTING,如果執行失敗則直接返回false。設置當前工作線程中斷標識為true,然後把futureTask狀態設置為INTERRUPTED。

  3. 如果傳入false,把futureTask狀態設置為CANCELLED。

  4. 做一些清理工作

    可見,cancel()方法僅僅是改變了futureTask的狀態位!如果傳入的是false,當前任務是不會被終止的,而是會繼續執行,直到異常或者執行完畢。如果傳入的是true,會調用當前線程的interrupt()方法,把中斷標志位設為true。所以cancel()方法其實個人理解是有歧義的,它並不能真正取消一個任務的執行。事實上,除非線程自己停止自己的任務,或者退出JVM,是沒有其他方法完全終止一個線程的任務的。cancel(true)方法也只是希望當前線程可以響應中斷而已,當線程被阻塞,拋出InterruptedException。同時,由之前的future.get()方法可知,如果一個futureTask被cancel()了,調用get()方法會拋出CancellationException。

總結

  理解FutureTask,我們使用Future類才能更加得心應手。這裡也只是作者自己的理解,如有不對之處,還望讀者批評指正。

 

 

作者:mayday芋頭

出處:http://www.cnblogs.com/maypattis/ 本博客中未標明轉載的文章歸作者mayday芋頭和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利

   

 

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved