程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> Java並發編程系列之二十五:線程池

Java並發編程系列之二十五:線程池

編輯:JAVA綜合教程

Java並發編程系列之二十五:線程池


線程池簡介

在之前介紹Executor框架的文章中對線程池有一個初步的認識,實際上線程池這種設計思想在Java中很普遍,比如JVM中常量池,以及Web開發使用到的數據庫連接池。這些池本質上還是Java中的對象池,因為池中存放的都是Java對象。回到線程池,幾乎所有需要異步或者執行並發任務的程序都可以使用到線程池。使用線程池帶來的好處主要包括以下幾個方面:

一,提高資源利用率。由於線程池中的線程使可以重復利用的,所以達到了循環利用的目的
二,提高響應速度。由於線程的創建也是需要開銷的,如果請求到來的時候可以直接使用已經創建好的線程對象自然就能提高響應速度了
三,便於對線程進行統一的管理。線程屬於稀缺資源,如果無限制的創建,不僅會消耗大量的資源還會大大降低系統的穩定性。使用線程池則可以對線程進行統一的分配和監控。

線程池的實現原理

其實線程池一句話就可以概括:通過將事先創建好的線程存放起來,在需要的時候直接拿過來使用就可以了。但是,為了提高線程池的性能,實際的線程池要比這種簡化版復雜得多。在前面的線程池中,都只接收Runnable和Callable的任務,或者稱為工作單元。那麼就來分析當一個工作單元提交到線程池的時候具體發生了什麼。

線程池首先判斷核心線程池中線程是否都在執行任務。如果不是,則創建一個新的工作線程來執行任務。如果都在執行任務,也就是沒有空閒線程的話就進入下個流程 線程池繼續判斷工作隊列是否已經滿了。如果工作隊列沒有滿,則把新提交的任務放入該工作隊列中,如果工作隊列已經滿了,則進入下個流程 線程池判斷線程池中的線程是否都處於工作狀態。如果不是,則創建一個新的線程執行提交的任務,如果是,則執行飽和策略

下面是線程池的執行流程:

線程池的執行流程

在Java中實現的線程池的核心類是ThreadPoolExecutor,該類的execute方法的執行流程就是上面的過程。注意上面三個加粗的詞匯:核心線程池、工作隊列和飽和策略。細化到ThreadPoolExecutor執行execute方法的過程,對上面的過程補充如下:核心線程池對應corePoolSize變量的值,如果運行的線程小於corePoolSize,則創建新的線程執行任務(這個過程需要獲取全局鎖);如果運行的線程大於corePoolSize,則將任務加入BlockingQueue(對應工作隊列);如果無法加入則創建新的線程執行任務,這個步驟中,如果創建新線程後當前運行的線程數大於maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExcution()方法。

ThreadPoolExecutor為了避免執行新提交的任務獲取全局鎖,ThreadPoolExecutor在創建後會執行一個預熱過程,所謂預熱就是讓當前運行的線程數大於等於corePoolSize。這樣,後面新提交的任務都將直接加入到BlockingQueue。而這個過程是不需要獲取全局鎖的,自然就能提高線程池的性能。為了對ThreadPoolExecutor執行execute方法的過程一探究竟,來扒扒其源碼:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //如果當前正在運行的線程數小於corePoolSize,則創建新的線程
        //執行當前任務
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果當前運行的線程數大於等於corePoolSize或者線程創建失敗
        //則把當前任務放入工作隊列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //判斷之前是否已經添加過線程執行該任務(因為可能之前)
            //創建的線程已經死亡了)或者線程池是否已經關閉。如果
            //兩個答案都是肯定的,那麼選擇拒絕執行任務
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果線程池任務無法加入到工作隊列(說明工作隊列滿了)
        //創建一個線程執行任務。如果新創建後當前運行的線程數大於
        //maximumPoolSize則拒絕執行任務
        else if (!addWorker(command, false))
            reject(command);
    }

如果線程池能夠創建線程執行任務,那麼將調用addWorker方法,將線程池創建的線程封裝為Worker,Worker在執行完任務後還會循環獲取隊列中任務來執行。看看addWorker方法的源碼:

    private boolean addWorker(Runnable firstTask, boolean core){
        //省略部分代碼
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //這裡就將提交的任務封裝成為Worker了
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //使用加鎖的方式原子添加工作線程
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //在獲得鎖期間再次檢查線程池的運行狀態:如果
                    //線程池已經關閉或者任務為空則拋出異常
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        //加入Worker數組
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //如果添加成功則啟動線程執行任務
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

之後我們看看執行t.start()後會發生的事,因為Worker本身實現了Runnable,所以start後將調用Worker的run方法,源碼如下:

        public void run() {
           runWorker(this);
       }
       final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    task.run();
                    afterExecute(task, thrown);
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
   }

以上源碼其實就干了一件事:創建的線程在執行完提交的任務後會反復從BlockingQueue中獲取任務來執行。

使用線程池
前面分析了線程池的執行過程以及對源碼進行了剖析,下面我們自己創建一個線程池並熟悉線程池的使用。創建一個線程池需要幾個輸入參數:

corePoolSize:線程池的基本大小。當提交一個任務給線程池時,線程池會創建一個線程執行任務,但是即使線程池有空閒的線程也會創建新的線程,直到執行的任務數大於等於corePoolSize時就不再創建。 runnableTaskQueue:用於保存等待執行的任務的阻塞隊列。 maximumPoolSize:線程池的最大數量。如果線程池的工作隊列滿了,並且已經創建的線程數小於最大的線程數,那麼線程池會再創建新的線程執行任務。 ThreadFactory:用於設置創建線程的工廠。 RejectedExecutionHandler:飽和策略。當隊列和線程池都滿了,說明線程已經處於飽和狀態,那麼必須采取一種策略處理提交的任務。默認的策略是拒絕執行,也就是拋出異常。

提交任務的方式有兩種:execute()和submit()。區別在於後者提交的任務是有返回值的。返回值可以通過Future對象的get()方法得到。執行後需要調用線程池的關閉關閉線程池。主要有shutdown()和shutdownNow()兩個方法,原理都是遍歷線程池中的工作線程,然後逐個調用線程的interrupt()方法中斷線程。區別在於shutdownNow()方法首先會把線程池的狀態設為STOP,然後嘗試終止正在執行的線程和等待執行的線程,並返回等待執行的任務列表;而shutdown()方法只是將線程池的狀態設為SHUTDOWN,然後中斷所有沒有正在執行的線程。一般而言,如果要等到任務執行完再關閉線程池,則調用shutdown()方法,如果不一定要等到把任務執行完,那麼就執行shutdownNow()方法。

一個簡單的線程池的例子如下(建議造輪子):

/**
 * Created by rhwayfun on 16-4-7.
 */
//線程池接口
public interface ThreadPool {
    //執行任務
    void execute(Job job);
    //關閉線程池
    void shutdown();
}
//演示用線程池
package com.rhwayfun.patchwork.concurrency.r0407;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class DemoThreadPool implements ThreadPool {

    //阻塞隊列
    private BlockingQueue workQueue = null;
    //保存線程池的工作線程
    private List demoThreadList = Collections.synchronizedList(new ArrayList());
    //線程池狀態
    private boolean isShutdown = false;
    //線程池默認的大小
    private static final int DEFAULT_WORKER_NUM = 5;
    //線程池最大的大小
    private static final int MAX_WORKER_NUM = 10;
    //線程池最小的大小
    private static final int MIN_WORKER_NUM = 1;
    //工作者線程的數量
    private int workNum;
    //線程編號
    private AtomicLong threadNum = new AtomicLong();

    public DemoThreadPool(int num) {
        workNum = num > MAX_WORKER_NUM ? MAX_WORKER_NUM : num < MIN_WORKER_NUM ? MIN_WORKER_NUM : num;
        init(workNum);
    }

    public DemoThreadPool() {
        init(DEFAULT_WORKER_NUM);
    }

    /**
     * 線程池初始化
     * @param workNum
     */
    private void init(int workNum) {
        //初始化工作隊列
        workQueue = new ArrayBlockingQueue<>(DEFAULT_WORKER_NUM);
        //將指定數量的工作線程加入到列表中
        for (int i = 0; i < workNum; i++) {
            demoThreadList.add(new DemoThread(workQueue));
        }
        //啟動指定數量的工作線程
        for (DemoThread thread : demoThreadList) {
            Thread worker = new Thread(thread, "ThreadPool-Worker-" + threadNum.incrementAndGet());
            System.out.println("ThreadPool-Worker-" + threadNum.get() + " add to workQueue!");
            worker.start();
        }
    }

    /**
     * 執行一個任務
     * @param job
     */
    @Override
    public void execute(Runnable job) {
        if (isShutdown) throw new IllegalStateException("ThreadPool is shutdown!");
        if (demoThreadList != null) {
                try {
                    //添加一個任務到工作隊列中
                    workQueue.put(job);
                    System.out.println("ThreadPool receives a task!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
    }

    /**
     * 關閉線程池
     */
    @Override
    public void shutdown() {
            isShutdown = true;
            for (DemoThread t : demoThreadList) {
                t.stopToSelf();
            }
    }
}

//工作者線程
package com.rhwayfun.patchwork.concurrency.r0407;

import java.util.concurrent.BlockingQueue;

public class DemoThread implements Runnable {

    private BlockingQueue workQueue;
    private volatile boolean shutdown = false;

    public DemoThread(BlockingQueue workQueue) {
        this.workQueue = workQueue;
    }

    @Override
    public void run() {
        while (!shutdown){
            try {
                Runnable job = workQueue.take();
                job.run();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void stopToSelf(){
        shutdown = true;
        //調用interrupt方法讓等待在工作隊列打算出隊的線程從wait方法返回
        new Thread(this).interrupt();
    }
}
//測試程序
package com.rhwayfun.patchwork.concurrency.r0407;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class DemoThreadPoolTest {
    public static void main(String[] args) throws InterruptedException {
        //日期格式器
        final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");
        //創建線程池
        DemoThreadPool threadPool = new DemoThreadPool<>();
        //添加15個任務
        for (int i = 0; i < 15; i++){
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " get result " +format.format(new Date()));
                }
            };
            threadPool.execute(task);
        }
        threadPool.shutdown();
        TimeUnit.SECONDS.sleep(10);
        System.out.println("work done! Time is " + format.format(new Date()));
    }
}

以上程序的執行結果為:

ThreadPool-Worker-1 add to workQueue!
ThreadPool-Worker-2 add to workQueue!
ThreadPool-Worker-3 add to workQueue!
ThreadPool-Worker-4 add to workQueue!
ThreadPool-Worker-5 add to workQueue!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool-Worker-1 get result 19:36:48
ThreadPool-Worker-1 get result 19:36:48
ThreadPool-Worker-1 get result 19:36:48
ThreadPool-Worker-1 get result 19:36:48
ThreadPool-Worker-1 get result 19:36:48
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool-Worker-1 get result 19:36:48
ThreadPool-Worker-1 get result 19:36:48
ThreadPool-Worker-1 get result 19:36:48
ThreadPool-Worker-1 get result 19:36:48
ThreadPool-Worker-1 get result 19:36:48
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool receives a task!
ThreadPool-Worker-1 get result 19:36:48
ThreadPool-Worker-2 get result 19:36:48
ThreadPool-Worker-3 get result 19:36:48
ThreadPool-Worker-4 get result 19:36:48
ThreadPool-Worker-5 get result 19:36:48
work done! Time is 19:37:03

從線程池的實現上可以看到,當客戶端調用execute(Runnable task)時,會不斷將任務加入工作隊列BlockingQueue中,而每個工作者線程DemoThread則不斷從工作隊列中取任務執行,當工作隊列為空時,工作者線程進入等待狀態。執行完任務後線程調用shutdown()方法關閉線程池,要注意的是調用了工作者線程的stopToSelf方法停止從工作隊列中取任務執行,除了把shutdown變量設為false外,還調用工作者線程的interrupt方法中斷線程,進行如此操作的目的是將因為需要取任務而陷入等待的工作者線程進行中斷從而讓其從wait方法返回,然後停止執行。

可以看到,線程池的本質就是使用了一個線程安全的工作隊列連接線程和客戶端線程,客戶端線程將任務放入工作隊列後便直接返回,而工作者線程則不斷從工作隊列上取出任務並執行。當工作隊列為空時,所有的工作者線程都等待在工作隊列上,當有客戶端提交新的任務時便會通知任意一個工作者線程,隨著新提交的任務的增多,將有更多的工作者線程被喚醒。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved