程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> Java 高並發五:JDK並發包1具體引見

Java 高並發五:JDK並發包1具體引見

編輯:關於JAVA

Java 高並發五:JDK並發包1具體引見。本站提示廣大學習愛好者:(Java 高並發五:JDK並發包1具體引見)文章只能為提供參考,不一定能成為您想要的結果。以下是Java 高並發五:JDK並發包1具體引見正文


在[高並發Java 二] 多線程基本中,我們曾經初步提到了根本的線程同步操作。此次要提到的是在並發包中的同步掌握對象。

1. 各類同步掌握對象的應用

1.1 ReentrantLock

ReentrantLock感到上是synchronized的加強版,synchronized的特色是應用簡略,一切交給JVM行止理,然則功效上是比擬軟弱的。在JDK1.5之前,ReentrantLock的機能要好過synchronized,因為對JVM停止了優化,如今的JDK版本中,二者機能是平起平坐的。假如是簡略的完成,不要銳意去應用ReentrantLock。

比擬於synchronized,ReentrantLock在功效上加倍豐碩,它具有可重入、可中止、可限時、公正鎖等特色。

起首我們經由過程一個例子來講明ReentrantLock最後步的用法:

package test;

import java.util.concurrent.locks.ReentrantLock;

public class Test implements Runnable
{
 public static ReentrantLock lock = new ReentrantLock();
 public static int i = 0;

 @Override
 public void run()
 {
 for (int j = 0; j < 10000000; j++)
 {
 lock.lock();
 try
 {
 i++;
 }
 finally
 {
 lock.unlock();
 }
 }
 }
 
 public static void main(String[] args) throws InterruptedException
 {
 Test test = new Test();
 Thread t1 = new Thread(test);
 Thread t2 = new Thread(test);
 t1.start();
 t2.start();
 t1.join();
 t2.join();
 System.out.println(i);
 }

}

有兩個線程都對i停止++操作,為了包管線程平安,應用了 ReentrantLock,從用法上可以看出,與 synchronized比擬, ReentrantLock就略微龐雜一點。由於必需在finally中停止解鎖操作,假如不在 finally解鎖,有能夠代碼湧現異常鎖沒被釋放,而synchronized是由JVM來釋放鎖。

那末ReentrantLock究竟有哪些優良的特色呢?

1.1.1 可重入

單線程可以反復進入,但要反復加入

lock.lock();
lock.lock();
try
{
 i++;
 
} 
finally
{
 lock.unlock();
 lock.unlock();
}

因為ReentrantLock是重入鎖,所以可以重復獲得雷同的一把鎖,它有一個與鎖相干的獲得計數器,假如具有鎖的某個線程再次獲得鎖,那末獲得計數器就加1,然後鎖須要被釋放兩次能力取得真正釋放(重入鎖)。這模擬了 synchronized 的語義;假如線程進入由線程曾經具有的監控器掩護的 synchronized 塊,就許可線程持續停止,當線程加入第二個(或許後續) synchronized 塊的時刻,不釋放鎖,只要線程加入它進入的監控器掩護的第一個synchronized 塊時,才釋放鎖。

public class Child extends Father implements Runnable{
 final static Child child = new Child();//為了包管鎖獨一
 public static void main(String[] args) {
 for (int i = 0; i < 50; i++) {
  new Thread(child).start();
 }
 }
 
 public synchronized void doSomething() {
 System.out.println("1child.doSomething()");
 doAnotherThing(); // 挪用本身類中其他的synchronized辦法
 }
 
 private synchronized void doAnotherThing() {
 super.doSomething(); // 挪用父類的synchronized辦法
 System.out.println("3child.doAnotherThing()");
 }
 
 @Override
 public void run() {
 child.doSomething();
 }
}
class Father {
 public synchronized void doSomething() {
 System.out.println("2father.doSomething()");
 }
}

我們可以看到一個線程進入分歧的 synchronized辦法,是不會釋放之前獲得的鎖的。所以輸入照樣次序輸入。所以synchronized也是重入鎖

輸入:

1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
...

1.1.2.可中止

與synchronized分歧的是,ReentrantLock對中止是有呼應的。中止相干常識檢查[高並發Java 二] 多線程基本

通俗的lock.lock()是不克不及呼應中止的,lock.lockInterruptibly()可以或許呼應中止。

我們模仿出一個逝世鎖現場,然後用中止來處置逝世鎖

package test;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.locks.ReentrantLock;

public class Test implements Runnable
{
 public static ReentrantLock lock1 = new ReentrantLock();
 public static ReentrantLock lock2 = new ReentrantLock();

 int lock;

 public Test(int lock)
 {
 this.lock = lock;
 }

 @Override
 public void run()
 {
 try
 {
 if (lock == 1)
 {
 lock1.lockInterruptibly();
 try
 {
 Thread.sleep(500);
 }
 catch (Exception e)
 {
 // TODO: handle exception
 }
 lock2.lockInterruptibly();
 }
 else
 {
 lock2.lockInterruptibly();
 try
 {
 Thread.sleep(500);
 }
 catch (Exception e)
 {
 // TODO: handle exception
 }
 lock1.lockInterruptibly();
 }
 }
 catch (Exception e)
 {
 // TODO: handle exception
 }
 finally
 {
 if (lock1.isHeldByCurrentThread())
 {
 lock1.unlock();
 }
 if (lock2.isHeldByCurrentThread())
 {
 lock2.unlock();
 }
 System.out.println(Thread.currentThread().getId() + ":線程加入");
 }
 }

 public static void main(String[] args) throws InterruptedException
 {
 Test t1 = new Test(1);
 Test t2 = new Test(2);
 Thread thread1 = new Thread(t1);
 Thread thread2 = new Thread(t2);
 thread1.start();
 thread2.start();
 Thread.sleep(1000);
 //DeadlockChecker.check();
 }

 static class DeadlockChecker
 {
 private final static ThreadMXBean mbean = ManagementFactory
 .getThreadMXBean();
 final static Runnable deadlockChecker = new Runnable()
 {
 @Override
 public void run()
 {
 // TODO Auto-generated method stub
 while (true)
 {
 long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
 if (deadlockedThreadIds != null)
 {
 ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds);
 for (Thread t : Thread.getAllStackTraces().keySet())
 {
 for (int i = 0; i < threadInfos.length; i++)
 {
 if(t.getId() == threadInfos[i].getThreadId())
 {
  t.interrupt();
 }
 }
 }
 }
 try
 {
 Thread.sleep(5000);
 }
 catch (Exception e)
 {
 // TODO: handle exception
 }
 }

 }
 };
 
 public static void check()
 {
 Thread t = new Thread(deadlockChecker);
 t.setDaemon(true);
 t.start();
 }
 }

}

上述代碼有能夠會產生逝世鎖,線程1獲得lock1,線程2獲得lock2,然後彼此又想取得對方的鎖。

我們用jstack檢查運轉上述代碼後的情形

切實其實發明了一個逝世鎖。

DeadlockChecker.check();辦法用來檢測逝世鎖,然後把逝世鎖的線程中止。中止後,線程正常加入。

1.1.3.可限時

超時不克不及取得鎖,就前往false,不會永遠期待組成逝世鎖

應用lock.tryLock(long timeout, TimeUnit unit)來完成可限時鎖,參數為時光和單元。

舉個例子來講明下可限時:

package test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Test implements Runnable
{
 public static ReentrantLock lock = new ReentrantLock();

 @Override
 public void run()
 {
 try
 {
 if (lock.tryLock(5, TimeUnit.SECONDS))
 {
 Thread.sleep(6000);
 }
 else
 {
 System.out.println("get lock failed");
 }
 }
 catch (Exception e)
 {
 }
 finally
 {
 if (lock.isHeldByCurrentThread())
 {
 lock.unlock();
 }
 }
 }
 
 public static void main(String[] args)
 {
 Test t = new Test();
 Thread t1 = new Thread(t);
 Thread t2 = new Thread(t);
 t1.start();
 t2.start();
 }

}

應用兩個線程來爭取一把鎖,當某個線程取得鎖後,sleep6秒,每一個線程都只測驗考試5秒去取得鎖。

所以一定有一個線程沒法取得鎖。沒法取得後就直接加入了。

輸入:

get lock failed

1.1.4.公正鎖

應用方法:

public ReentrantLock(boolean fair)

public static ReentrantLock fairLock = new ReentrantLock(true);

普通意義上的鎖是不公正的,紛歧定先來的線程能先獲得鎖,後來的線程就後獲得鎖。不公正的鎖能夠會發生饑餓景象。

公正鎖的意思就是,這個鎖能包管線程是先來的先獲得鎖。固然公正鎖不會發生饑餓景象,然則公正鎖的機能會比非公正鎖差許多。

1.2 Condition

Condition與ReentrantLock的關系就相似於synchronized與Object.wait()/signal()

await()辦法會使以後線程期待,同時釋放以後鎖,當其他線程中應用signal()時或許signalAll()辦法時,線 程會從新取得鎖並持續履行。或許當線程被中止時,也能跳出期待。這和Object.wait()辦法很類似。

awaitUninterruptibly()辦法與await()辦法根本雷同,然則它其實不會再期待進程中呼應中止。 singal()辦法用於叫醒一個在期待中的線程。絕對的singalAll()辦法會叫醒一切在期待中的線程。這和Obejct.notify()辦法很相似。

這裡就不再具體引見了。舉個例子來講明:

package test;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Test implements Runnable
{
 public static ReentrantLock lock = new ReentrantLock();
 public static Condition condition = lock.newCondition();

 @Override
 public void run()
 {
 try
 {
 lock.lock();
 condition.await();
 System.out.println("Thread is going on");
 }
 catch (Exception e)
 {
 e.printStackTrace();
 }
 finally
 {
 lock.unlock();
 }
 }
 
 public static void main(String[] args) throws InterruptedException
 {
 Test t = new Test();
 Thread thread = new Thread(t);
 thread.start();
 Thread.sleep(2000);
 
 lock.lock();
 condition.signal();
 lock.unlock();
 }

}

上述例子很簡略,讓一個線程await住,讓主線程去叫醒它。condition.await()/signal只能在獲得鎖今後應用。

1.3.Semaphore

關於鎖來講,它是互斥的排他的。意思就是,只需我取得了鎖,沒人能再取得了。

而關於Semaphore來講,它許可多個線程同時進入臨界區。可以以為它是一個同享鎖,然則同享的額度是無限制的,額度用完了,其他沒有拿到額度的線程照樣要壅塞在臨界區外。當額度為1時,就相等於lock

上面舉個例子:

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;


public class Test implements Runnable
{
 final Semaphore semaphore = new Semaphore(5);
 @Override
 public void run()
 {
 try
 {
 semaphore.acquire();
 Thread.sleep(2000);
 System.out.println(Thread.currentThread().getId() + " done");
 }
 catch (Exception e)
 {
 e.printStackTrace();
 }finally {
 semaphore.release();
 }
 }
 
 public static void main(String[] args) throws InterruptedException
 {
 ExecutorService executorService = Executors.newFixedThreadPool(20);
 final Test t = new Test();
 for (int i = 0; i < 20; i++)
 {
 executorService.submit(t);
 }
 }

}

有一個20個線程的線程池,每一個線程都去 Semaphore的允許,Semaphore的允許只要5個,運轉後可以看到,5個一批,一批一批地輸入。

固然一個線程也能夠一次請求多個允許

public void acquire(int permits) throws InterruptedException

1.4 ReadWriteLock

ReadWriteLock是辨別功效的鎖。讀和寫是兩種分歧的功效,讀-讀不互斥,讀-寫互斥,寫-寫互斥。

如許的設計是並發量進步了,又包管了數據平安。

應用方法:

private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();

具體例子可以檢查 Java完成臨盆者花費者成績與讀者寫者成績,這裡就不睜開了。

1.5 CountDownLatch

倒數計時器
一種典范的場景就是火箭發射。在火箭發射前,為了包管滿有把握,常常還要停止各項裝備、儀器的檢討。 只要等一切檢討終了後,引擎能力焚燒。這類場景就異常合適應用CountDownLatch。它可使得焚燒線程
,期待一切檢討線程全體落成後,再履行

應用方法:

static final CountDownLatch end = new CountDownLatch(10);
end.countDown();
end.await();

表示圖:

一個簡略的例子:

package test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test implements Runnable
{
 static final CountDownLatch countDownLatch = new CountDownLatch(10);
 static final Test t = new Test();
 @Override
 public void run()
 {
 try
 {
 Thread.sleep(2000);
 System.out.println("complete");
 countDownLatch.countDown();
 }
 catch (Exception e)
 {
 e.printStackTrace();
 }
 }
 
 public static void main(String[] args) throws InterruptedException
 {
 ExecutorService executorService = Executors.newFixedThreadPool(10);
 for (int i = 0; i < 10; i++)
 {
 executorService.execute(t);
 }
 countDownLatch.await();
 System.out.println("end");
 executorService.shutdown();
 }

}

主線程必需期待10個線程全體履行完才會輸入"end"。

1.6 CyclicBarrier

和CountDownLatch類似,也是期待某些線程都做完今後再履行。與CountDownLatch差別在於這個計數器可以重復應用。好比,假定我們將計數器設置為10。那末湊齊第一批1 0個線程後,計數器就會歸零,然後接著湊齊下一批10個線程

應用方法:

public CyclicBarrier(int parties, Runnable barrierAction)

barrierAction就是當計數器一次計數完成後,體系會履行的舉措

await()

表示圖:

上面舉個例子:

package test;

import java.util.concurrent.CyclicBarrier;

public class Test implements Runnable
{
 private String soldier;
 private final CyclicBarrier cyclic;

 public Test(String soldier, CyclicBarrier cyclic)
 {
 this.soldier = soldier;
 this.cyclic = cyclic;
 }

 @Override
 public void run()
 {
 try
 {
 //期待一切兵士到齊
 cyclic.await();
 dowork();
 //期待一切兵士完成任務
 cyclic.await();
 }
 catch (Exception e)
 {
 // TODO Auto-generated catch block
 e.printStackTrace();
 }

 }

 private void dowork()
 {
 // TODO Auto-generated method stub
 try
 {
 Thread.sleep(3000);
 }
 catch (Exception e)
 {
 // TODO: handle exception
 }
 System.out.println(soldier + ": done");
 }

 public static class BarrierRun implements Runnable
 {

 boolean flag;
 int n;

 public BarrierRun(boolean flag, int n)
 {
 super();
 this.flag = flag;
 this.n = n;
 }

 @Override
 public void run()
 {
 if (flag)
 {
 System.out.println(n + "個義務完成");
 }
 else
 {
 System.out.println(n + "個聚集完成");
 flag = true;
 }

 }

 }

 public static void main(String[] args)
 {
 final int n = 10;
 Thread[] threads = new Thread[n];
 boolean flag = false;
 CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n));
 System.out.println("聚集");
 for (int i = 0; i < n; i++)
 {
 System.out.println(i + "報導");
 threads[i] = new Thread(new Test("兵士" + i, barrier));
 threads[i].start();
 }
 }

}

打印成果:

聚集
0報導
1報導
2報導
3報導
4報導
5報導
6報導
7報導
8報導
9報導
10個聚集完成
兵士5: done
兵士7: done
兵士8: done
兵士3: done
兵士4: done
兵士1: done
兵士6: done
兵士2: done
兵士0: done
兵士9: done
10個義務完成

1.7 LockSupport

供給線程壅塞原語

和suspend相似

LockSupport.park();
LockSupport.unpark(t1);

與suspend比擬 不輕易惹起線程解凍

LockSupport的思惟呢,和 Semaphore有點類似,外部有一個允許,park的時刻拿失落這個允許,unpark的時刻請求這個允許。所以假如unpark在park之前,是不會產生線程解凍的。

上面的代碼是[高並發Java 二] 多線程基本中suspend示例代碼,在應用suspend時會產生逝世鎖。

package test;

import java.util.concurrent.locks.LockSupport;
 
public class Test
{
 static Object u = new Object();
 static TestSuspendThread t1 = new TestSuspendThread("t1");
 static TestSuspendThread t2 = new TestSuspendThread("t2");
 
 public static class TestSuspendThread extends Thread
 {
 public TestSuspendThread(String name)
 {
  setName(name);
 }
 
 @Override
 public void run()
 {
  synchronized (u)
  {
  System.out.println("in " + getName());
  //Thread.currentThread().suspend();
  LockSupport.park();
  }
 }
 }
 
 public static void main(String[] args) throws InterruptedException
 {
 t1.start();
 Thread.sleep(100);
 t2.start();
// t1.resume();
// t2.resume();
 LockSupport.unpark(t1);
 LockSupport.unpark(t2);
 t1.join();
 t2.join();
 }
}

而應用 LockSupport則不會產生逝世鎖。

別的

park()可以或許呼應中止,但不拋出異常。中止呼應的成果是,park()函數的前往,可以從Thread.interrupted()獲得中止標記。

在JDK傍邊有年夜量處所應用到了park,固然LockSupport的完成也是應用unsafe.park()來完成的。

public static void park() {
        unsafe.park(false, 0L);
    }

1.8 ReentrantLock 的完成

上面來引見下ReentrantLock的完成,ReentrantLock的完成重要由3部門構成:

  1. CAS狀況
  2. 期待隊列
  3. park()
  4. ReentrantLock的父類中會有一個state變量來表現同步的狀況

    /**
     * The synchronization state.
     */
     private volatile int state;
    

    經由過程CAS操作來設置state來獲得鎖,假如設置成了1,則將鎖的持有者給以後線程

    final void lock() {
      if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
      else
      acquire(1);
     }
    

    假如拿鎖不勝利,則會做一個請求

    public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
     }
    

    起首,再去請求下嘗嘗看tryAcquire,由於此時能夠另外一個線程曾經釋放了鎖。

    假如照樣沒有請求到鎖,就addWaiter,意思是把本身加到期待隊列中去

    private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;
     if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
      }
     }
     enq(node);
     return node;
     }
    

    其間還會有屢次測驗考試去請求鎖,假如照樣請求不到,就會被掛起

    private final boolean parkAndCheckInterrupt() {
     LockSupport.park(this);
     return Thread.interrupted();
     }
    

    同理,假如在unlock操作中,就是釋放了鎖,然後unpark,這裡就不詳細講了。

    2. 並發容器及典范源碼剖析

    2.1 ConcurrentHashMap

    我們曉得HashMap不是一個線程平安的容器,最簡略的方法使HashMap釀成線程平安就是應用

    Collections.synchronizedMap,它是對HashMap的一個包裝

    public static Map m=Collections.synchronizedMap(new HashMap());

    同理關於List,Set也供給了類似辦法。

    然則這類方法只合適於並發量比擬小的情形。

    我們來看下synchronizedMap的完成

    private final Map<K,V> m; // Backing Map
     final Object mutex; // Object on which to synchronize
    
     SynchronizedMap(Map<K,V> m) {
      if (m==null)
      throw new NullPointerException();
      this.m = m;
      mutex = this;
     }
    
     SynchronizedMap(Map<K,V> m, Object mutex) {
      this.m = m;
      this.mutex = mutex;
     }
    
     public int size() {
      synchronized (mutex) {return m.size();}
     }
     public boolean isEmpty() {
      synchronized (mutex) {return m.isEmpty();}
     }
     public boolean containsKey(Object key) {
      synchronized (mutex) {return m.containsKey(key);}
     }
     public boolean containsValue(Object value) {
      synchronized (mutex) {return m.containsValue(value);}
     }
     public V get(Object key) {
      synchronized (mutex) {return m.get(key);}
     }
    
     public V put(K key, V value) {
      synchronized (mutex) {return m.put(key, value);}
     }
     public V remove(Object key) {
      synchronized (mutex) {return m.remove(key);}
     }
     public void putAll(Map<? extends K, ? extends V> map) {
      synchronized (mutex) {m.putAll(map);}
     }
     public void clear() {
      synchronized (mutex) {m.clear();}
     }
    
    

    它會將HashMap包裝在外面,然後將HashMap的每一個操作都加上synchronized。

    因為每一個辦法都是獲得統一把鎖(mutex),這就意味著,put和remove等操作是互斥的,年夜年夜削減了並發量。

    上面來看下ConcurrentHashMap是若何完成的

    public V put(K key, V value) {
     Segment<K,V> s;
     if (value == null)
      throw new NullPointerException();
     int hash = hash(key);
     int j = (hash >>> segmentShift) & segmentMask;
     if ((s = (Segment<K,V>)UNSAFE.getObject  // nonvolatile; recheck
      (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
      s = ensureSegment(j);
     return s.put(key, hash, value, false);
     }
    

    在 ConcurrentHashMap外部有一個Segment段,它將年夜的HashMap切分紅若干個段(小的HashMap),然後讓數據在每段上Hash,如許多個線程在分歧段上的Hash操作必定是線程平安的,所以只須要同步統一個段上的線程便可以了,如許完成了鎖的分別,年夜年夜增長了並發量。

    在應用ConcurrentHashMap.size時會比擬費事,由於它要統計每一個段的數據和,在這個時刻,要把每個段都加上鎖,然後再做數據統計。這個就是把鎖分別後的小小弊病,然則size辦法應當是不會被高頻率挪用的辦法。

    在完成上,不應用synchronized和lock.lock而是盡可能應用trylock,同時在HashMap的完成上,也做了一點優化。這裡就不提了。

    2.2 BlockingQueue

    BlockingQueue不是一個高機能的容器。然則它是一個異常好的同享數據的容器。是典范的臨盆者和花費者的完成。

    表示圖:

     

    詳細可以檢查Java完成臨盆者花費者成績與讀者寫者成績

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved