程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> Java線程池(一),java線程池

Java線程池(一),java線程池

編輯:JAVA綜合教程

Java線程池(一),java線程池


為何使用線程池?

  • 第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。

  • 第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。

  • 第三:提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。

線程池如何處理任務?

先看線程池處理任務的機制圖:

 

 

 

 

 

 

 

 

 

 

 

 

 

從上圖中可以看出,當提交一個新任務到線程池時,線程池的處理流程如下:

  1. 線程池判斷核心線程池裡的線程是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務。如果核心線程池裡的線程都在執行任務,則進入下個流程。

  2. 線程池判斷工作隊列是否已經滿。如果工作隊列沒有滿,則將新提交的任務存儲在這個工作隊列裡。如果工作隊列滿了,則進入下個流程。

  3. 線程池判斷線程池的線程是否都處於工作狀態。如果沒有,則創建一個新的工作線程來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。

ThreadPoolExecutor執行execute方法分下面4種情況:

  1. 如果當前運行的線程少於corePoolSize,則創建新線程來執行任務(注意,執行這一步驟需要獲取全局鎖)。

  2. 如果運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue。

  3. 如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務(注意,執行這一步驟需要獲取全局鎖)。

  4. 如果創建新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。

下面是ThreadPoolExecutor的執行流程圖:

 

 

 

 

 

 

 

 

 

 

 

如何創建一個線程池?

創建線程池很簡單,Java提供了ThreadPoolExecutor類。下面的語句就能創建一個大小為10的線程池。

1 new ThreadPoolExecutor(10,10,1,TimeUnit.DAYS);

先來看看ThreadPoolExecutor類的方法簽名:

 1 public ThreadPoolExecutor(int corePoolSize,        //線程池基本大小
 2                           int maximumPoolSize,    //線程池最大線程數
 3                           long keepAliveTime,    //線程活動保持時間
 4                           TimeUnit unit,        //線程活動時間單位
 5                           BlockingQueue<Runnable> workQueue,
 6 
 7 public ThreadPoolExecutor(int corePoolSize,        //線程池基本大小
 8                           int maximumPoolSize,    //線程池最大線程數
 9                           long keepAliveTime,    //線程活動保持時間
10                           TimeUnit unit,        //線程活動時間單位
11                           BlockingQueue<Runnable> workQueue,    //任務隊列
12                           ThreadFactory threadFactory,        //產生線程的工廠
13                           RejectedExecutionHandler handler    //飽和策略);

下面詳細的講解一下這幾個參數:

  • corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閒的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads()方法,線程池會提前創建並啟動所有基本線程。

  • maximumPoolSize(線程池最大數量):線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是,如果使用了無界的任務隊列這個參數就沒什麼效果。

  • keepAliveTime(線程活動保持時間):線程池的工作線程空閒後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高線程的利用率。

  • TimeUnit(線程活動保持時間的單位):可選的單位有天、小時、分鐘、毫秒、微秒和納秒。

  • runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。可選項有:

    • ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。

    • LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。

    • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。

    • PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。

  • ThreadFactory:用於設置創建線程的工廠。

  • RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。

向線程池提交任務

ThreadPoolExecutor提供了兩個方法可以向線程池提交任務去執行submit()execute()。

  • execute()方法用於執行不需要返回結果的任務。所以無法判斷任務是否被線程池執行成功。

  • submit()方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值。get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後立即返回,這時候有可能任務沒有執行完。

看下面一段代碼:

 1 package com.alibaba.thread;
 2 
 3 import java.util.concurrent.*;
 4 
 5 /**
 6  * Created by zhouxuanyu on 2016/12/6.
 7  */
 8 public class CallableAndFuture {
 9     public static void main(String[] args) throws ExecutionException, InterruptedException {
10         ExecutorService executorService = Executors.newScheduledThreadPool(3);
11 
12         Runnable runnable = new Runnable() {
13             public void run() {
14                 System.out.print("runnable()");
15             }
16         };
17 
18         executorService.execute(runnable);  //execute()拿不到返回值
19 
20         Callable<String> callable = new Callable<String>() {
21             public String call() throws Exception {
22                 return "callable";
23             }
24         };
25 
26         Future<String> future = executorService.submit(callable);//submit()可以拿到返回值
27 
28         System.out.print(future.get());
29 
30     }
31 }

 

CompletionService

​  有了上面的基礎,我們可以知道,當我們需要異步執行某個任務的時候,可以將這個任務丟到線程池中,如果不需要結果返回,那麼可以使用execute();相反,則需要使用submit()。當有多個任務執行,比如十個,在交給線程池去執行時,我們固然可以為這十個任務關聯十個Future而拿到結果。但是這樣做實在比較low!

​ Java為我們提供了一個名叫CompletionService的接口,它是Executor和BlockQueue的結合體。你可以將Callable任務交給CompletionService去執行,然後使用類似隊列的take()和poll()方法拿到Future類型的結果。ExecutorCompletionService是CompletionService的一個實現,它將任務交給executor去執行。

​  先看一下ExecutorCompletionService的構造方法:

1 public ExecutorCompletionService(Executor executor,
2                                  BlockingQueue<Future<V>> completionQueue)

  由構造函數可以看到,executor是執行任務的執行器,而completionQueue是用來保存執行結果的隊列。當提交一個任務之後,會首先把這個任務包裝為一個QueueingFuture。QueueingFuture是FutureTask的子類。然後復寫了done()方法,將執行結果放入completionQueue中。

從上面可以知道,ExecutorCompletionService實現了哪一個任務先執行完就返回,而不是按任務添加的順序返回。

下面一個例子:

 1 package com.alibaba.thread;
 2 
 3 import java.util.Random;
 4 import java.util.concurrent.*;
 5 
 6 /**
 7  * Created by zhouxuanyu on 2016/12/14.
 8  */
 9 public class TestCompletionService {
10 
11     public static void main(String[] args){
12 
13         ExecutorService executorService = Executors.newFixedThreadPool(11); //創建一個大小為11的線程池
14         final BlockingQueue<Future<String>> blockingQueue = new LinkedBlockingQueue<Future<String>>();//創建一個結果隊列
15 
16         CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService,blockingQueue);
17 
18         //使用completionService向線程池中添加10個任務
19         for (int i = 0; i < 10; i++) {
20             completionService.submit(new Callable<String>() {
21                 public String call() throws Exception {
22                     int random = new Random().nextInt(10000);
23                     Thread.sleep(random);
24                     return Thread.currentThread().getName() + "---sleep---" + random;
25                 }
26             });
27         }
28 
29         //按照執行完成的順便取出已經完成的任務。
30         for (int i = 0; i < 10; i++) {
31             try {
32                 Future future = completionService.take();
33                 System.out.println(future.get(1000,TimeUnit.NANOSECONDS));
34             } catch (InterruptedException e) {
35                 e.printStackTrace();
36             } catch (ExecutionException e) {
37                 e.printStackTrace();
38             } catch (TimeoutException e) {
39                 e.printStackTrace();
40             }
41         }
42 
43         //關閉線程池
44         executorService.shutdown();
45     }
46 }

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved