Java 高並發五:JDK並發包1具體引見。本站提示廣大學習愛好者:(Java 高並發五:JDK並發包1具體引見)文章只能為提供參考,不一定能成為您想要的結果。以下是Java 高並發五:JDK並發包1具體引見正文
在[高並發Java 二] 多線程基本中,我們曾經初步提到了根本的線程同步操作。此次要提到的是在並發包中的同步掌握對象。
1. 各類同步掌握對象的應用
1.1 ReentrantLock
ReentrantLock感到上是synchronized的加強版,synchronized的特色是應用簡略,一切交給JVM行止理,然則功效上是比擬軟弱的。在JDK1.5之前,ReentrantLock的機能要好過synchronized,因為對JVM停止了優化,如今的JDK版本中,二者機能是平起平坐的。假如是簡略的完成,不要銳意去應用ReentrantLock。
比擬於synchronized,ReentrantLock在功效上加倍豐碩,它具有可重入、可中止、可限時、公正鎖等特色。
起首我們經由過程一個例子來講明ReentrantLock最後步的用法:
package test;
import java.util.concurrent.locks.ReentrantLock;
public class Test implements Runnable
{
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
@Override
public void run()
{
for (int j = 0; j < 10000000; j++)
{
lock.lock();
try
{
i++;
}
finally
{
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException
{
Test test = new Test();
Thread t1 = new Thread(test);
Thread t2 = new Thread(test);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(i);
}
}
有兩個線程都對i停止++操作,為了包管線程平安,應用了 ReentrantLock,從用法上可以看出,與 synchronized比擬, ReentrantLock就略微龐雜一點。由於必需在finally中停止解鎖操作,假如不在 finally解鎖,有能夠代碼湧現異常鎖沒被釋放,而synchronized是由JVM來釋放鎖。
那末ReentrantLock究竟有哪些優良的特色呢?
1.1.1 可重入
單線程可以反復進入,但要反復加入
lock.lock();
lock.lock();
try
{
i++;
}
finally
{
lock.unlock();
lock.unlock();
}
因為ReentrantLock是重入鎖,所以可以重復獲得雷同的一把鎖,它有一個與鎖相干的獲得計數器,假如具有鎖的某個線程再次獲得鎖,那末獲得計數器就加1,然後鎖須要被釋放兩次能力取得真正釋放(重入鎖)。這模擬了 synchronized 的語義;假如線程進入由線程曾經具有的監控器掩護的 synchronized 塊,就許可線程持續停止,當線程加入第二個(或許後續) synchronized 塊的時刻,不釋放鎖,只要線程加入它進入的監控器掩護的第一個synchronized 塊時,才釋放鎖。
public class Child extends Father implements Runnable{
final static Child child = new Child();//為了包管鎖獨一
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
new Thread(child).start();
}
}
public synchronized void doSomething() {
System.out.println("1child.doSomething()");
doAnotherThing(); // 挪用本身類中其他的synchronized辦法
}
private synchronized void doAnotherThing() {
super.doSomething(); // 挪用父類的synchronized辦法
System.out.println("3child.doAnotherThing()");
}
@Override
public void run() {
child.doSomething();
}
}
class Father {
public synchronized void doSomething() {
System.out.println("2father.doSomething()");
}
}
我們可以看到一個線程進入分歧的 synchronized辦法,是不會釋放之前獲得的鎖的。所以輸入照樣次序輸入。所以synchronized也是重入鎖
輸入:
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
...
1.1.2.可中止
與synchronized分歧的是,ReentrantLock對中止是有呼應的。中止相干常識檢查[高並發Java 二] 多線程基本
通俗的lock.lock()是不克不及呼應中止的,lock.lockInterruptibly()可以或許呼應中止。
我們模仿出一個逝世鎖現場,然後用中止來處置逝世鎖
package test;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.locks.ReentrantLock;
public class Test implements Runnable
{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
public Test(int lock)
{
this.lock = lock;
}
@Override
public void run()
{
try
{
if (lock == 1)
{
lock1.lockInterruptibly();
try
{
Thread.sleep(500);
}
catch (Exception e)
{
// TODO: handle exception
}
lock2.lockInterruptibly();
}
else
{
lock2.lockInterruptibly();
try
{
Thread.sleep(500);
}
catch (Exception e)
{
// TODO: handle exception
}
lock1.lockInterruptibly();
}
}
catch (Exception e)
{
// TODO: handle exception
}
finally
{
if (lock1.isHeldByCurrentThread())
{
lock1.unlock();
}
if (lock2.isHeldByCurrentThread())
{
lock2.unlock();
}
System.out.println(Thread.currentThread().getId() + ":線程加入");
}
}
public static void main(String[] args) throws InterruptedException
{
Test t1 = new Test(1);
Test t2 = new Test(2);
Thread thread1 = new Thread(t1);
Thread thread2 = new Thread(t2);
thread1.start();
thread2.start();
Thread.sleep(1000);
//DeadlockChecker.check();
}
static class DeadlockChecker
{
private final static ThreadMXBean mbean = ManagementFactory
.getThreadMXBean();
final static Runnable deadlockChecker = new Runnable()
{
@Override
public void run()
{
// TODO Auto-generated method stub
while (true)
{
long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
if (deadlockedThreadIds != null)
{
ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds);
for (Thread t : Thread.getAllStackTraces().keySet())
{
for (int i = 0; i < threadInfos.length; i++)
{
if(t.getId() == threadInfos[i].getThreadId())
{
t.interrupt();
}
}
}
}
try
{
Thread.sleep(5000);
}
catch (Exception e)
{
// TODO: handle exception
}
}
}
};
public static void check()
{
Thread t = new Thread(deadlockChecker);
t.setDaemon(true);
t.start();
}
}
}
上述代碼有能夠會產生逝世鎖,線程1獲得lock1,線程2獲得lock2,然後彼此又想取得對方的鎖。
我們用jstack檢查運轉上述代碼後的情形
切實其實發明了一個逝世鎖。
DeadlockChecker.check();辦法用來檢測逝世鎖,然後把逝世鎖的線程中止。中止後,線程正常加入。
1.1.3.可限時
超時不克不及取得鎖,就前往false,不會永遠期待組成逝世鎖
應用lock.tryLock(long timeout, TimeUnit unit)來完成可限時鎖,參數為時光和單元。
舉個例子來講明下可限時:
package test;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Test implements Runnable
{
public static ReentrantLock lock = new ReentrantLock();
@Override
public void run()
{
try
{
if (lock.tryLock(5, TimeUnit.SECONDS))
{
Thread.sleep(6000);
}
else
{
System.out.println("get lock failed");
}
}
catch (Exception e)
{
}
finally
{
if (lock.isHeldByCurrentThread())
{
lock.unlock();
}
}
}
public static void main(String[] args)
{
Test t = new Test();
Thread t1 = new Thread(t);
Thread t2 = new Thread(t);
t1.start();
t2.start();
}
}
應用兩個線程來爭取一把鎖,當某個線程取得鎖後,sleep6秒,每一個線程都只測驗考試5秒去取得鎖。
所以一定有一個線程沒法取得鎖。沒法取得後就直接加入了。
輸入:
get lock failed
1.1.4.公正鎖
應用方法:
public ReentrantLock(boolean fair)
public static ReentrantLock fairLock = new ReentrantLock(true);
普通意義上的鎖是不公正的,紛歧定先來的線程能先獲得鎖,後來的線程就後獲得鎖。不公正的鎖能夠會發生饑餓景象。
公正鎖的意思就是,這個鎖能包管線程是先來的先獲得鎖。固然公正鎖不會發生饑餓景象,然則公正鎖的機能會比非公正鎖差許多。
1.2 Condition
Condition與ReentrantLock的關系就相似於synchronized與Object.wait()/signal()
await()辦法會使以後線程期待,同時釋放以後鎖,當其他線程中應用signal()時或許signalAll()辦法時,線 程會從新取得鎖並持續履行。或許當線程被中止時,也能跳出期待。這和Object.wait()辦法很類似。
awaitUninterruptibly()辦法與await()辦法根本雷同,然則它其實不會再期待進程中呼應中止。 singal()辦法用於叫醒一個在期待中的線程。絕對的singalAll()辦法會叫醒一切在期待中的線程。這和Obejct.notify()辦法很相似。
這裡就不再具體引見了。舉個例子來講明:
package test;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Test implements Runnable
{
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
@Override
public void run()
{
try
{
lock.lock();
condition.await();
System.out.println("Thread is going on");
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException
{
Test t = new Test();
Thread thread = new Thread(t);
thread.start();
Thread.sleep(2000);
lock.lock();
condition.signal();
lock.unlock();
}
}
上述例子很簡略,讓一個線程await住,讓主線程去叫醒它。condition.await()/signal只能在獲得鎖今後應用。
1.3.Semaphore
關於鎖來講,它是互斥的排他的。意思就是,只需我取得了鎖,沒人能再取得了。
而關於Semaphore來講,它許可多個線程同時進入臨界區。可以以為它是一個同享鎖,然則同享的額度是無限制的,額度用完了,其他沒有拿到額度的線程照樣要壅塞在臨界區外。當額度為1時,就相等於lock
上面舉個例子:
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Test implements Runnable
{
final Semaphore semaphore = new Semaphore(5);
@Override
public void run()
{
try
{
semaphore.acquire();
Thread.sleep(2000);
System.out.println(Thread.currentThread().getId() + " done");
}
catch (Exception e)
{
e.printStackTrace();
}finally {
semaphore.release();
}
}
public static void main(String[] args) throws InterruptedException
{
ExecutorService executorService = Executors.newFixedThreadPool(20);
final Test t = new Test();
for (int i = 0; i < 20; i++)
{
executorService.submit(t);
}
}
}
有一個20個線程的線程池,每一個線程都去 Semaphore的允許,Semaphore的允許只要5個,運轉後可以看到,5個一批,一批一批地輸入。
固然一個線程也能夠一次請求多個允許
public void acquire(int permits) throws InterruptedException
1.4 ReadWriteLock
ReadWriteLock是辨別功效的鎖。讀和寫是兩種分歧的功效,讀-讀不互斥,讀-寫互斥,寫-寫互斥。
如許的設計是並發量進步了,又包管了數據平安。
應用方法:
private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
具體例子可以檢查 Java完成臨盆者花費者成績與讀者寫者成績,這裡就不睜開了。
1.5 CountDownLatch
倒數計時器
一種典范的場景就是火箭發射。在火箭發射前,為了包管滿有把握,常常還要停止各項裝備、儀器的檢討。 只要等一切檢討終了後,引擎能力焚燒。這類場景就異常合適應用CountDownLatch。它可使得焚燒線程
,期待一切檢討線程全體落成後,再履行
應用方法:
static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();
表示圖:
一個簡略的例子:
package test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test implements Runnable
{
static final CountDownLatch countDownLatch = new CountDownLatch(10);
static final Test t = new Test();
@Override
public void run()
{
try
{
Thread.sleep(2000);
System.out.println("complete");
countDownLatch.countDown();
}
catch (Exception e)
{
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException
{
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++)
{
executorService.execute(t);
}
countDownLatch.await();
System.out.println("end");
executorService.shutdown();
}
}
主線程必需期待10個線程全體履行完才會輸入"end"。
1.6 CyclicBarrier
和CountDownLatch類似,也是期待某些線程都做完今後再履行。與CountDownLatch差別在於這個計數器可以重復應用。好比,假定我們將計數器設置為10。那末湊齊第一批1 0個線程後,計數器就會歸零,然後接著湊齊下一批10個線程
應用方法:
public CyclicBarrier(int parties, Runnable barrierAction)
barrierAction就是當計數器一次計數完成後,體系會履行的舉措
await()
表示圖:
上面舉個例子:
package test;
import java.util.concurrent.CyclicBarrier;
public class Test implements Runnable
{
private String soldier;
private final CyclicBarrier cyclic;
public Test(String soldier, CyclicBarrier cyclic)
{
this.soldier = soldier;
this.cyclic = cyclic;
}
@Override
public void run()
{
try
{
//期待一切兵士到齊
cyclic.await();
dowork();
//期待一切兵士完成任務
cyclic.await();
}
catch (Exception e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void dowork()
{
// TODO Auto-generated method stub
try
{
Thread.sleep(3000);
}
catch (Exception e)
{
// TODO: handle exception
}
System.out.println(soldier + ": done");
}
public static class BarrierRun implements Runnable
{
boolean flag;
int n;
public BarrierRun(boolean flag, int n)
{
super();
this.flag = flag;
this.n = n;
}
@Override
public void run()
{
if (flag)
{
System.out.println(n + "個義務完成");
}
else
{
System.out.println(n + "個聚集完成");
flag = true;
}
}
}
public static void main(String[] args)
{
final int n = 10;
Thread[] threads = new Thread[n];
boolean flag = false;
CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n));
System.out.println("聚集");
for (int i = 0; i < n; i++)
{
System.out.println(i + "報導");
threads[i] = new Thread(new Test("兵士" + i, barrier));
threads[i].start();
}
}
}
打印成果:
聚集
0報導
1報導
2報導
3報導
4報導
5報導
6報導
7報導
8報導
9報導
10個聚集完成
兵士5: done
兵士7: done
兵士8: done
兵士3: done
兵士4: done
兵士1: done
兵士6: done
兵士2: done
兵士0: done
兵士9: done
10個義務完成
1.7 LockSupport
供給線程壅塞原語
和suspend相似
LockSupport.park();
LockSupport.unpark(t1);
與suspend比擬 不輕易惹起線程解凍
LockSupport的思惟呢,和 Semaphore有點類似,外部有一個允許,park的時刻拿失落這個允許,unpark的時刻請求這個允許。所以假如unpark在park之前,是不會產生線程解凍的。
上面的代碼是[高並發Java 二] 多線程基本中suspend示例代碼,在應用suspend時會產生逝世鎖。
package test;
import java.util.concurrent.locks.LockSupport;
public class Test
{
static Object u = new Object();
static TestSuspendThread t1 = new TestSuspendThread("t1");
static TestSuspendThread t2 = new TestSuspendThread("t2");
public static class TestSuspendThread extends Thread
{
public TestSuspendThread(String name)
{
setName(name);
}
@Override
public void run()
{
synchronized (u)
{
System.out.println("in " + getName());
//Thread.currentThread().suspend();
LockSupport.park();
}
}
}
public static void main(String[] args) throws InterruptedException
{
t1.start();
Thread.sleep(100);
t2.start();
// t1.resume();
// t2.resume();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}
而應用 LockSupport則不會產生逝世鎖。
別的
park()可以或許呼應中止,但不拋出異常。中止呼應的成果是,park()函數的前往,可以從Thread.interrupted()獲得中止標記。
在JDK傍邊有年夜量處所應用到了park,固然LockSupport的完成也是應用unsafe.park()來完成的。
public static void park() {
unsafe.park(false, 0L);
}
1.8 ReentrantLock 的完成
上面來引見下ReentrantLock的完成,ReentrantLock的完成重要由3部門構成:
ReentrantLock的父類中會有一個state變量來表現同步的狀況
/** * The synchronization state. */ private volatile int state;
經由過程CAS操作來設置state來獲得鎖,假如設置成了1,則將鎖的持有者給以後線程
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
假如拿鎖不勝利,則會做一個請求
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
起首,再去請求下嘗嘗看tryAcquire,由於此時能夠另外一個線程曾經釋放了鎖。
假如照樣沒有請求到鎖,就addWaiter,意思是把本身加到期待隊列中去
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
其間還會有屢次測驗考試去請求鎖,假如照樣請求不到,就會被掛起
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
同理,假如在unlock操作中,就是釋放了鎖,然後unpark,這裡就不詳細講了。
2. 並發容器及典范源碼剖析
2.1 ConcurrentHashMap
我們曉得HashMap不是一個線程平安的容器,最簡略的方法使HashMap釀成線程平安就是應用
Collections.synchronizedMap,它是對HashMap的一個包裝
public static Map m=Collections.synchronizedMap(new HashMap());
同理關於List,Set也供給了類似辦法。
然則這類方法只合適於並發量比擬小的情形。
我們來看下synchronizedMap的完成
private final Map<K,V> m; // Backing Map
final Object mutex; // Object on which to synchronize
SynchronizedMap(Map<K,V> m) {
if (m==null)
throw new NullPointerException();
this.m = m;
mutex = this;
}
SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}
public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public boolean containsKey(Object key) {
synchronized (mutex) {return m.containsKey(key);}
}
public boolean containsValue(Object value) {
synchronized (mutex) {return m.containsValue(value);}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}
public void putAll(Map<? extends K, ? extends V> map) {
synchronized (mutex) {m.putAll(map);}
}
public void clear() {
synchronized (mutex) {m.clear();}
}
它會將HashMap包裝在外面,然後將HashMap的每一個操作都加上synchronized。
因為每一個辦法都是獲得統一把鎖(mutex),這就意味著,put和remove等操作是互斥的,年夜年夜削減了並發量。
上面來看下ConcurrentHashMap是若何完成的
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}
在 ConcurrentHashMap外部有一個Segment段,它將年夜的HashMap切分紅若干個段(小的HashMap),然後讓數據在每段上Hash,如許多個線程在分歧段上的Hash操作必定是線程平安的,所以只須要同步統一個段上的線程便可以了,如許完成了鎖的分別,年夜年夜增長了並發量。
在應用ConcurrentHashMap.size時會比擬費事,由於它要統計每一個段的數據和,在這個時刻,要把每個段都加上鎖,然後再做數據統計。這個就是把鎖分別後的小小弊病,然則size辦法應當是不會被高頻率挪用的辦法。
在完成上,不應用synchronized和lock.lock而是盡可能應用trylock,同時在HashMap的完成上,也做了一點優化。這裡就不提了。
2.2 BlockingQueue
BlockingQueue不是一個高機能的容器。然則它是一個異常好的同享數據的容器。是典范的臨盆者和花費者的完成。
表示圖:
詳細可以檢查Java完成臨盆者花費者成績與讀者寫者成績