同步工具類主要包括閉鎖(如CountDownLatch),柵欄(如CyclicBarrier),信號量(如Semaphore)和阻塞隊列(如LinkedBlockingQueue)等;
使用同步工具類可以協調線程的控制流;
同步工具類封裝了一些狀態,這些狀態決定線程是繼續執行還是等待,此外同步工具類還提供了修改狀態的方法;
下面將簡單介紹以上同步工具類;
可以讓一個線程等待一組事件發生後(不一定要線程結束)繼續執行;
以CountDownLatch為例,內部包含一個計數器,一開始初始化為一個整數(事件個數),發生一個事件後,調用countDown方法,計數器減1,await用於等待計數器為0後繼續執行當前線程;
舉個例子如下,main線程等待其它子線程的事件發生後繼續執行main線程:
package concurrency;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class TaskTest implements Runnable {
private CountDownLatch latch;
private int sleepTime;
/**
*
*/
public TaskTest(int sleepTime, CountDownLatch latch) {
this.sleepTime = sleepTime;
this.latch = latch;
}
/**
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
CountDownLatchTest.print(" is running。");
TimeUnit.MILLISECONDS.sleep(sleepTime);
CountDownLatchTest.print(" finished。");
//計數器減減
latch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class CountDownLatchTest {
public static void main(String[] args) {
int count = 10;
final CountDownLatch latch = new CountDownLatch(count);
ExecutorService es = Executors.newFixedThreadPool(count);
for (int i = 0; i < count; i++) {
es.execute(new TaskTest((i + 1) * 1000, latch));
}
try {
CountDownLatchTest.print(" waiting...");
//主線程等待其它事件發生
latch.await();
//其它事件已發生,繼續執行主線程
CountDownLatchTest.print(" continue。。。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
es.shutdown();
}
}
public static void print(String str){
SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
}
}
結果打印如下:
[09:41:43]pool-1-thread-1 is running。 [09:41:43]pool-1-thread-6 is running。 [09:41:43]main waiting... [09:41:43]pool-1-thread-10 is running。 [09:41:43]pool-1-thread-4 is running。 [09:41:43]pool-1-thread-5 is running。 [09:41:43]pool-1-thread-2 is running。 [09:41:43]pool-1-thread-3 is running。 [09:41:43]pool-1-thread-7 is running。 [09:41:43]pool-1-thread-8 is running。 [09:41:43]pool-1-thread-9 is running。 [09:41:44]pool-1-thread-1 finished。 [09:41:45]pool-1-thread-2 finished。 [09:41:46]pool-1-thread-3 finished。 [09:41:47]pool-1-thread-4 finished。 [09:41:48]pool-1-thread-5 finished。 [09:41:49]pool-1-thread-6 finished。 [09:41:50]pool-1-thread-7 finished。 [09:41:51]pool-1-thread-8 finished。 [09:41:52]pool-1-thread-9 finished。 [09:41:53]pool-1-thread-10 finished。 [09:41:53]main continue。。。
此外,FutureTask也可用作閉鎖,其get方法會等待任務完成後返回結果,否則一直阻塞直到任務完成;
控制同時執行某個指定操作的數量,常用於實現資源池,如數據庫連接池,線程池...
以Semaphore為例,其內部維護一組資源,可以通過構造函數指定數目,其它線程在執行的時候,可以通過acquire方法獲取資源,有的話,繼續執行(使用結束後釋放資源),沒有資源的話將阻塞直到有其它線程調用release方法釋放資源;
舉個例子,如下代碼,十個線程競爭三個資源,一開始有三個線程可以直接運行,剩下的七個線程只能阻塞等到其它線程使用資源完畢才能執行;
package concurrency;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
public static void print(String str){
SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
}
public static void main(String[] args) {
// 線程數目
int threadCount = 10;
// 資源數目
Semaphore semaphore = new Semaphore(3);
ExecutorService es = Executors.newFixedThreadPool(threadCount);
// 啟動若干線程
for (int i = 0; i < threadCount; i++)
es.execute(new ConsumeResourceTask((i + 1) * 1000, semaphore));
}
}
class ConsumeResourceTask implements Runnable {
private Semaphore semaphore;
private int sleepTime;
/**
*
*/
public ConsumeResourceTask(int sleepTime, Semaphore semaphore) {
this.sleepTime = sleepTime;
this.semaphore = semaphore;
}
public void run() {
try {
//獲取資源
semaphore.acquire();
SemaphoreTest.print(" 占用一個資源...");
TimeUnit.MILLISECONDS.sleep(sleepTime);
SemaphoreTest.print(" 資源使用結束,釋放資源");
//釋放資源
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
[10:30:11]pool-1-thread-1 占用一個資源... [10:30:11]pool-1-thread-2 占用一個資源... [10:30:11]pool-1-thread-3 占用一個資源... [10:30:12]pool-1-thread-1 資源使用結束,釋放資源 [10:30:12]pool-1-thread-4 占用一個資源... [10:30:13]pool-1-thread-2 資源使用結束,釋放資源 [10:30:13]pool-1-thread-5 占用一個資源... [10:30:14]pool-1-thread-3 資源使用結束,釋放資源 [10:30:14]pool-1-thread-8 占用一個資源... [10:30:16]pool-1-thread-4 資源使用結束,釋放資源 [10:30:16]pool-1-thread-6 占用一個資源... [10:30:18]pool-1-thread-5 資源使用結束,釋放資源 [10:30:18]pool-1-thread-9 占用一個資源... [10:30:22]pool-1-thread-8 資源使用結束,釋放資源 [10:30:22]pool-1-thread-7 占用一個資源... [10:30:22]pool-1-thread-6 資源使用結束,釋放資源 [10:30:22]pool-1-thread-10 占用一個資源... [10:30:27]pool-1-thread-9 資源使用結束,釋放資源 [10:30:29]pool-1-thread-7 資源使用結束,釋放資源 [10:30:32]pool-1-thread-10 資源使用結束,釋放資源
柵欄用於等待其它線程,且會阻塞自己當前線程;
所有線程必須同時到達柵欄位置後,才能繼續執行;
舉個例子如下:
package concurrency;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class CyclicBarrierTaskTest implements Runnable {
private CyclicBarrier cyclicBarrier;
private int timeout;
public CyclicBarrierTaskTest(CyclicBarrier cyclicBarrier, int timeout) {
this.cyclicBarrier = cyclicBarrier;
this.timeout = timeout;
}
@Override
public void run() {
TestCyclicBarrier.print(" 正在running...");
try {
TimeUnit.MILLISECONDS.sleep(timeout);
TestCyclicBarrier.print(" 到達柵欄處,等待其它線程到達");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
TestCyclicBarrier.print(" 所有線程到達柵欄處,繼續執行各自線程任務...");
}
}
public class TestCyclicBarrier {
public static void print(String str) {
SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
System.out.println("[" + dfdate.format(new Date()) + "]"
+ Thread.currentThread().getName() + str);
}
public static void main(String[] args) {
int count = 5;
ExecutorService es = Executors.newFixedThreadPool(count);
CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() {
@Override
public void run() {
TestCyclicBarrier.print(" 所有線程到達柵欄處,可以在此做一些處理...");
}
});
for (int i = 0; i < count; i++)
es.execute(new CyclicBarrierTaskTest(barrier, (i + 1) * 1000));
}
}
[11:07:00]pool-1-thread-2 正在running... [11:07:00]pool-1-thread-1 正在running... [11:07:00]pool-1-thread-5 正在running... [11:07:00]pool-1-thread-3 正在running... [11:07:00]pool-1-thread-4 正在running... [11:07:01]pool-1-thread-1 到達柵欄處,等待其它線程到達 [11:07:02]pool-1-thread-2 到達柵欄處,等待其它線程到達 [11:07:03]pool-1-thread-3 到達柵欄處,等待其它線程到達 [11:07:04]pool-1-thread-4 到達柵欄處,等待其它線程到達 [11:07:05]pool-1-thread-5 到達柵欄處,等待其它線程到達 [11:07:05]pool-1-thread-5 所有線程到達柵欄處,可以在此做一些處理... [11:07:05]pool-1-thread-1 所有線程到達柵欄處,繼續執行各自線程任務... [11:07:05]pool-1-thread-2 所有線程到達柵欄處,繼續執行各自線程任務... [11:07:05]pool-1-thread-5 所有線程到達柵欄處,繼續執行各自線程任務... [11:07:05]pool-1-thread-3 所有線程到達柵欄處,繼續執行各自線程任務... [11:07:05]pool-1-thread-4 所有線程到達柵欄處,繼續執行各自線程任務...
阻塞隊列提供了可阻塞的入隊和出對操作,如果隊列滿了,入隊操作將阻塞直到有空間可用,如果隊列空了,出隊操作將阻塞直到有元素可用;
隊列可以為有界和無界隊列,無界隊列不會滿,因此入隊操作將不會阻塞;
下面將使用阻塞隊列LinkedBlockingQueue舉個生產者-消費者例子,生產者每隔1秒生產1個產品,然後有6個消費者在消費產品,可以發現,每隔1秒,只有一個消費者能夠獲取到產品消費,其它線程只能等待...
如下代碼:
package concurrency;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
//生產者
public class Producer implements Runnable {
private final BlockingQueue<String> fileQueue;
public Producer(BlockingQueue<String> queue) {
this.fileQueue = queue;
}
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(1000);
String produce = this.produce();
System.out.println(Thread.currentThread() + "生產:" + produce);
fileQueue.put(produce);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public String produce() {
SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
return dfdate.format(new Date());
}
public static void main(String[] args) {
BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
for (int i = 0; i < 1; i++) {
new Thread(new Producer(queue)).start();
}
for (int i = 0; i < 6; i++) {
new Thread(new Consumer(queue)).start();
}
}
}
// 消費者
class Consumer implements Runnable {
private final BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println(Thread.currentThread() + "prepare 消費");
System.out.println(Thread.currentThread() + "starting:"
+ queue.take());
System.out.println(Thread.currentThread() + "end 消費");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Thread[Thread-1,5,main]prepare 消費 Thread[Thread-3,5,main]prepare 消費 Thread[Thread-4,5,main]prepare 消費 Thread[Thread-2,5,main]prepare 消費 Thread[Thread-6,5,main]prepare 消費 Thread[Thread-5,5,main]prepare 消費 Thread[Thread-0,5,main]生產:11:36:36 Thread[Thread-1,5,main]starting:11:36:36 Thread[Thread-1,5,main]end 消費 Thread[Thread-1,5,main]prepare 消費 Thread[Thread-0,5,main]生產:11:36:37 Thread[Thread-4,5,main]starting:11:36:37 Thread[Thread-4,5,main]end 消費 Thread[Thread-4,5,main]prepare 消費 Thread[Thread-0,5,main]生產:11:36:38 Thread[Thread-3,5,main]starting:11:36:38 Thread[Thread-3,5,main]end 消費
...
參考資料:java並發編程實戰