在執行並發任務時,我們可以把任務傳遞給一個線程池,來替代為每個並發執行的任務都啟動一個新的線程,只要池裡有空閒的線程,任務就會分配一個線程執行。在線程池的內部,任務被插入一個阻塞隊列(BlockingQueue),線程池裡的線程會去取這個隊列裡的任務。
利用線程池有三個好處:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
創建一個線程池需要的幾個參數:
1、ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
2、LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
3、SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
4、PriorityBlockingQueue:一個具有優先級得無限阻塞隊列。
此外,我們還可以通過調用Ececutors中的某個靜態工廠方法來創建一個線程池(它們的內部實現原理都是相同的,僅僅是使用了不同的工作隊列或線程池大小):
newFixedThreadPool:創建一個定長的線程池,每當提交一個任務就創建一個線程,直到達到池的最大長度,這時線程池會保持長度不在變化
newCachedThreadPool:創建一個可緩存的線程池,如果當前的線程池的長度超過了處理的需要時,它可以靈活的回收空閒的線程,當需求增加時,它可以靈活的添加新的線程,並不會對池的長度做任何限制
newSingleThreadPool:創建一個單線程化的executor,它只會創建唯一的工作者線程來執行任務
newScheduledThreadPool:創建一個定長的線程池,而且支持定時的以及周期性的任務執行,類似於Timer
可以使用execute向線程池提交任務:
public class Test2
{
public static void main(String[] args)
{
BlockingQueue<Runnable> workQueue=new LinkedBlockingDeque<Runnable>();
ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, workQueue);
poolExecutor.execute(new Task1());
poolExecutor.execute(new Task2());
poolExecutor.shutdown();
}
}
class Task1 implements Runnable
{
public void run()
{
System.out.println("執行任務1");
}
}
class Task2 implements Runnable
{
public void run()
{
System.out.println("執行任務2");
}
}
也可以使用submit方法來提交任務,它會返回一個future,我們可以通過這個future來判斷任務是否執行成功,通過future的get方法獲取返回值,get方法會阻塞直到任務完成。
public class Test3
{
public static void main(String[] args) throws InterruptedException, ExecutionException
{
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<String>> resultList = new ArrayList<Future<String>>();
// 創建10個任務並執行
for (int i = 0; i < 10; i++)
{
// 使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中
Future<String> future = executorService.submit(new TaskWithResult(i));
resultList.add(future);
}
for (Future<String> future : resultList)
{
while (!future.isDone());// Future返回如果沒有完成,則一直循環等待,直到Future返回完成
{
System.out.println(future.get()); // 打印各個線程(任務)執行的結果
}
}
executorService.shutdown();
}
}
class TaskWithResult implements Callable<String>
{
private int id;
public TaskWithResult(int id)
{
this.id = id;
}
public String call() throws Exception
{
return "執行結果"+id;
}
}
可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池,但是它們的實現原理不同,shutdown的原理是只是將線程池的狀態設置成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的線程。shutdownNow的原理是遍歷線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將線程池的狀態設置成STOP,然後嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表。
只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow。
整個ThreadExecutor的任務處理經過下面4個步驟,如下圖所示:

1、如果當前的線程數<corePoolSize,提交的Runnable任務,會直接作為new Thread的參數,立即執行,當提交的任務數超過了corePoolSize,就進入第二部操作
2、將當前的任務提交到BlockingQueue阻塞隊列中,如果Block Queue是個有界隊列,當隊列滿了之後就進入第三步
3、如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,執行對應的runnable任務
4、如果第三步也無法處理,就會用RejectedExecutionHandler來做拒絕處理
Timer工具管理任務的定時以及周期性執行。示例代碼如下:
public class TimerTest
{
final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args)
{
TimerTask timerTask1=new TimerTask()
{
@Override
public void run()
{
System.out.println("任務1執行時間:"+sdf.format(new Date()));
try
{
Thread.sleep(3000);//模擬任務1執行時間為3秒
}
catch (InterruptedException e)
{
// TODO 自動生成的 catch 塊
e.printStackTrace();
}
}
};
System.out.println("當前時間:"+sdf.format(new Date()));
Timer timer=new Timer();
timer.schedule(timerTask1, new Date(),4000); //間隔4秒周期性執行
}
}
執行結果:

可以看到上述任務1以4秒為間隔周期性執行。但是Timer存在一些缺陷,主要是下面兩個方面的問題:
缺陷1:Timer只創建唯一的線程的來執行所有的Timer任務,如果一個time任務的執行很耗時,會導致其他的TimeTask的時效准確性出問題。看下面的例子:
public class TimerTest
{
final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args)
{
TimerTask timerTask1=new TimerTask()
{
@Override
public void run()
{
System.out.println("任務1執行時間:"+sdf.format(new Date()));
try
{
Thread.sleep(10000);
}
catch (InterruptedException e)
{
// TODO 自動生成的 catch 塊
e.printStackTrace();
}
}
};
TimerTask timerTask2=new TimerTask()
{
@Override
public void run()
{
System.out.println("任務2執行時間:"+sdf.format(new Date()));
}
};
System.out.println("當前時間:"+sdf.format(new Date()));
Timer timer=new Timer();
timer.schedule(timerTask1, new Date(),1000); //間隔1秒周期性執行
timer.schedule(timerTask2, new Date(),4000); //間隔4秒周期性執行
}
}
執行結果:

缺陷2:如果TimeTask拋出未檢查的異常,Timer將產生無法預料的行為。Timer線程並不捕獲線程,所有TimerTask拋出的未檢查的異常會終止timer線程。看下面的代碼:
public class TimerTest2
{
final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args)
{
TimerTask timerTask1=new TimerTask()
{
@Override
public void run()
{
System.out.println("任務1執行時間:"+sdf.format(new Date()));
throw new RuntimeException();
}
};
TimerTask timerTask2=new TimerTask()
{
@Override
public void run()
{
System.out.println("任務2執行時間:"+sdf.format(new Date()));
}
};
System.out.println("當前時間:"+sdf.format(new Date()));
Timer timer=new Timer();
timer.schedule(timerTask1, new Date(),1000); //周期1秒執行任務1
timer.schedule(timerTask2, new Date() ,3000); //周期3秒執行任務2
}
}
執行結果為:

針對上述的兩個問題,我們可以使用ScheduledThreadPoolExecutor來作為Timer的替代。
針對問題1,有下面代碼:
public class ScheduledThreadPoolExecutorTest
{
final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args)
{
TimerTask timerTask1 = new TimerTask()
{
@Override
public void run()
{
System.out.println("任務1執行時間:" + sdf.format(new Date()));
try
{
Thread.sleep(10000);
} catch (InterruptedException e)
{
// TODO 自動生成的 catch 塊
e.printStackTrace();
}
}
};
TimerTask timerTask2 = new TimerTask()
{
@Override
public void run()
{
System.out.println("任務2執行時間:" + sdf.format(new Date()));
}
};
System.out.println("當前時間:" + sdf.format(new Date()));
ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2);
poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000,TimeUnit.MILLISECONDS);
poolExecutor.scheduleAtFixedRate(timerTask2, 0, 4000,TimeUnit.MILLISECONDS);
}
}
執行的結果為:

針對問題2,有下面代碼:
public class ScheduledThreadPoolExecutorTest2
{
final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args)
{
TimerTask timerTask1 = new TimerTask()
{
@Override
public void run()
{
System.out.println("任務1執行時間:" + sdf.format(new Date()));
throw new RuntimeException();
}
};
TimerTask timerTask2 = new TimerTask()
{
@Override
public void run()
{
System.out.println("任務2執行時間:" + sdf.format(new Date()));
}
};
System.out.println("當前時間:" + sdf.format(new Date()));
ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2);
poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000, TimeUnit.MILLISECONDS);
poolExecutor.scheduleAtFixedRate(timerTask2, 0, 2000, TimeUnit.MILLISECONDS);
}
}
執行結果為:

1、Java並發編程實踐