程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> Java並發包源碼分析,Java發包源碼分析

Java並發包源碼分析,Java發包源碼分析

編輯:JAVA綜合教程

Java並發包源碼分析,Java發包源碼分析


  並發是一種能並行運行多個程序或並行運行一個程序中多個部分的能力。如果程序中一個耗時的任務能以異步或並行的方式運行,那麼整個程序的吞吐量和可交互性將大大改善。現代的PC都有多個CPU或一個CPU中有多個核,是否能合理運用多核的能力將成為一個大規模應用程序的關鍵。

  Java基礎部分知識總結點擊Java並發基礎總結。Java多線程相關類的實現都在Java的並發包concurrent,concurrent包主要包含3部分內容,第一個是atomic包,裡面主要是一些原子類,比如AtomicInteger、AtomicIntegerArray等;第二個是locks包,裡面主要是鎖相關的類,比如ReentrantLock、Condition等;第三個就是屬於concurrent包的內容,主要包括線程池相關類(Executors)、阻塞集合類(BlockingQueue)、並發Map類(ConcurrentHashMap)、線程相關類(Thread、Runnable、Callable)等。

atomic包源碼分析

  atomic包是專門為線程安全設計的Java包,包含多個原子操作類。其基本思想就是在多線程環境下,當有多個線程同時執行這些類的實例的方法時,具有排他性,一個線程進入方法執行指令時,不會被其他的線程打斷,而別的線程就像自旋鎖一樣,一直等待該方法執行完成。

  原子變量的底層使用了處理器提供的原子指令,但是不同的CPU架構可能提供的原子指令不一樣,也有可能需要某種形式的內部鎖,所以該方法不能絕對保證線程不被阻塞。

  atomic包一共有12個類,四種原子更新方式,分別是原子更新基本類型、原子更新數組、原子更新引用和原子更新字段。JDK1.5中引入了底層的支持,在int、long和對象的引用等類型上都公開了CAS的操作,並且JVM把它們編譯為底層硬件提供的最有效的方法,在運行CAS的平台上,運行時把它們編譯為相應的機器指令。在java.util.concurrent.atomic包下面的所有的原子變量類型中,比如AtomicInteger,都使用了這些底層的JVM支持為數字類型的引用類型提供一種高效的CAS操作。

  Unsafe中的操作一般都是基於CAS來實現的,CAS就是Compare and Swap的意思,比較並操作。很多的cpu直接支持CAS指令。CAS是一項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。CAS有3個操作數,內存值V,舊的預期值A,要修改的新值B。當且僅當預期值A和內存值V相同時,將內存值V修改為B,否則什麼都不做。

/**
 * AtomicMain
 * atomic class test
 */
public class AtomicMain {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService   executor  = Executors.newCachedThreadPool();
        AtomicInteger     data      = new AtomicInteger(0);
        AtomicIntegerArray array    = new AtomicIntegerArray(10);
        AtomicReference   reference = new AtomicReference();

        /* AtomicInteger測試 */
        executor.execute(new AtomicIntegerTask(data));
        executor.execute(new AtomicIntegerTask(data));

        /* AtomicIntegerArray測試 */
        executor.execute(new AtomicIntegerArrayTask(array));
        executor.execute(new AtomicIntegerArrayTask(array));

        User user = new User("xxx", 18);
        reference.set(user);
        executor.execute(new AtomicReferenceTask(reference));

        /**
         * shutdown表示線程池不再接收新的任務了,
         * 而不是阻塞到線程池任務執行完成之後再返回
         */
        executor.shutdown();
        /* 延時保證線程池任務執行完畢 */
        Thread.sleep(100);

        System.out.println(data);

        for (int i = 0; i < 10; i++) {
            System.out.print(array.get(i) + " ");
        }
        System.out.println();

        System.out.println(user);
    }

    /**
     * AtomicInteger
     */
    static class AtomicIntegerTask implements Runnable {
        private AtomicInteger data;

        public AtomicIntegerTask(AtomicInteger data) {
            this.data = data;
        }

        public void run() {
            int cnt = 10;

            while (cnt-- > 0) {
                data.incrementAndGet();
            }
        }
    }

    /**
     * 傳進來的Array大小至少為10
     * AtomicIntegerArray是原子性的,保證對該array整個內存操作的原子性,
     * 也就是說不可能同時有A線程對array[0]操作,而B線程對array[1]操作
     */
    static class AtomicIntegerArrayTask implements Runnable {
        private AtomicIntegerArray array;

        public AtomicIntegerArrayTask(AtomicIntegerArray array) {
            this.array = array;
        }

        public void run() {
            int cnt = 10;

            while (cnt-- > 0) {
                for (int i = 0; i < 10; i++) {
                    array.getAndAdd(i, 1);
                }
            }
        }
    }

    static class AtomicReferenceTask implements Runnable {
        private AtomicReference reference;

        public AtomicReferenceTask(AtomicReference reference) {
            this.reference = reference;
        }

        public void run() {
            reference.set(new User("luoxn28", 23));
        }
    }

    static class User {
        public String name;
        public int    age;

        public User(String name, int age) {
            this.name = name;
            this.age  = age;
        }

        @Override
        public String toString() {
            return "User{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }
}

AtomicInteger. incrementAndGet流程

/**
 * 原子自增1
 * this表示AtomicInteger實例
 * valueOffset表示value數據域相對於this的內存地址的偏移位置
 */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        /* 獲取value在內存中的值,然後進行CAS操作 */
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

lock包源碼分析

  lock包裡面主要是鎖相關的類,比如ReentrantLock、Condition等。

  Lock接口主要有lock、lockInterruptibly、tryLock、unlock、newCondition等方法:

public interface Lock {

    /**
     * 獲取鎖,獲取不到時該線程一直處於休眠狀態
     */
    void lock();

    /**
     * 如果所可用則獲取鎖;否則線程處理休眠狀態,如果此時發生中斷,則拋出InterruptException異常
     */
    void lockInterruptibly() throws InterruptedException;

    /**
     * 如果鎖可用則獲取鎖並返回true,否則返回false
     */
    boolean tryLock();

    /**
     * tryLock的待超時時間版本
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * 釋放鎖
     */
    void unlock();

    /**
     * 返回用來與此 Lock 實例一起使用的 Condition 實例
     */
    Condition newCondition();
}

使用Lock示例:

public class LockMain {
    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();
        AtomicInteger data = new AtomicInteger(0);

        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new LockAddTask(lock, data));
        executor.execute(new LockAddTask(lock, data));

        executor.shutdown();
        Thread.sleep(10);

        System.out.println(data.get());
    }

    static class LockAddTask implements Runnable {
        private Lock lock;
        private AtomicInteger data;

        public LockAddTask(Lock lock, AtomicInteger data) {
            this.lock = lock;
            this.data = data;
        }

        public void run() {
            int cnt = 10;

            while (cnt-- > 0) {
                try {
                    lock.lockInterruptibly();
                    data.getAndIncrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

concurrent包源碼分析

BlockingQueue

public interface BlockingQueue<E> extends Queue<E> {

    /**
     * 底層調用的是offer,如果滿了拋出異常
     */
    boolean add(E e);

    /**
     * 當集合為滿時,一直等待
     */
    void put(E e) throws InterruptedException;

    /**
     * 當集合為滿時,一直等待到超時
     */
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;
    boolean offer(E e);

    /**
     * 當集合為空時,始終等待
     */
    E take() throws InterruptedException;

    /**
     * 當集合為空時,一直等到超時,如果還為空則返回null
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 底層調用的是poll,如果空了拋出異常
     */
    boolean remove(Object o);

    //...
}

ArrayBlockingQueue

  ArrayBlockingQueue是一個基於數組的有界阻塞隊列,按照FIFO(先進先出)原則對元素進行排序,在構造方法中會new一個數組,並且new ReentrantLock,並且初始化notEmpty和notFull兩個Condition。

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

  執行put操作時,首先獲取lock,如果數組已經滿了,則調用notFull.await等待;否則調用enqueue插入元素,插入成功後把count計數值加1,調用notEmpty.signal。判斷數組是否滿了是根據count是否等於數組長度來確定的,因為往數組中插入元素時,首先從下標為0位置開始插入,插到下標為array.length-1時,如果count小於array.length,則下一次從下標為0位置插入。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

  執行take操作時,首先獲取lock,如果數組為空,則調用notEmpty.await等待;否則調用dequeue取出元素,取出成功後把count計數值減1,調用notFull.signal。

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

  lock的wait/signal更多知識:http://www.cnblogs.com/alphablox/archive/2013/01/20/2868479.html

LinkedBlockingQueue

  LinkedBlockingQueue是基於鏈表結構的阻塞隊列,按照FIFO(先進先出)原則對元組進行排序,新元素是尾部插入,吞吐量通常高於ArrayBlockingQueue。該類中包含一個takeLock和基於takeLock的Condition對象notEmpty,一個putLock鎖,和基於putLock的Condition對象notFull。在構造方法中會新new一個Node,last和head都指向該Node節點。

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

  執行put操作時,首先獲取putLock,如果鏈表節點數已經達到上限,則調用notFull.await等待;否則調用enqueue插入元素,插入成功後把count值原子加1,如果鏈表節點數未達到上限,則調用notFull.signal。然後獲取takeLock,再調用notEmpty.signal通知。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}
private void enqueue(Node<E> node) {
    last = last.next = node;
}
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

  執行take操作時,首先獲取takeLock,如果鏈表為空,則調用notEmpty.await等待;否則調用dequeue取出元素,然後把count值原子減1,如果此時鏈表非空,則調用notEmpty.signal。然後獲取putLock,再調用putLock.signal通知。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

ConcurrentHashMap

       ConcurrentHashMap是concurrent包中一個重要的類,其高效支並發操作,被廣泛使用,Spring框架的底層數據結構就是使用ConcurrentHashMap實現的。同HashTable相比,它的鎖粒度更細,而不是像HashTable一樣為每個方法都添加了synchronized鎖。

       Java8中的ConcurrentHashMap廢棄了Segment(鎖段)的概念,而是用CAS和synchronized方法來實現。利用CAS來獲取table數組中的單個Node節點,獲取成功進行更新操作時,再使用synchronized處理對應Node節點所對應鏈表(或紅黑樹)中的數據。

使用ConcurrentHashMap程序示例

/**
 * HashMapMain test
 */
public class HashMapMain {
    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<String, String>();
        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(new HashMapPutTask(hashMap));
        executorService.execute(new HashMapPutTask(hashMap));
        executorService.execute(new HashMapPutTask(hashMap));

        executorService.shutdown();
        /**
         * Main thread wait for other thread over.
         */
        Thread.sleep(2000);

        Set<Map.Entry<String, String>> set = hashMap.entrySet();
        Iterator<Map.Entry<String, String>> iter = set.iterator();
        int i = 0;
        while (iter.hasNext()) {
            Map.Entry<String, String> keyValue = iter.next();
            System.out.println(++i + " -> " + keyValue.getKey() + ": " + keyValue.getValue());
        }
    }

    static class HashMapPutTask implements Runnable {
        private ConcurrentHashMap<String, String> hashMap;

        public HashMapPutTask(ConcurrentHashMap<String, String> hashMap) {
            this.hashMap = hashMap;
        }

        public void run() {
            int cnt = 10;

            while (cnt-- > 0) {
                String key   = UUID.randomUUID().toString();
                String value = UUID.randomUUID().toString();
                hashMap.put(key, value);
            }
        }
    }
}

幾個核心的內部類:

Node

  Node是最核心的內部類,它包裝了key-value鍵值對,所有插入ConcurrentHashMap的數據都包裝在這裡面。它與HashMap中的定義很相似,但是但是有一些差別它對value和next屬性設置了volatile同步鎖,它不允許調用setValue方法直接改變Node的value域,它增加了find方法輔助map.get()方法。

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    // ...
}

TreeNode

  樹節點類,另外一個核心的數據結構。當鏈表長度過長的時候,會轉換為TreeNode。但是與HashMap不相同的是,它並不是直接轉換為紅黑樹,而是把這些結點包裝成TreeNode放在TreeBin對象中,由TreeBin完成對紅黑樹的操作。而且TreeNode在ConcurrentHashMap繼承自Node類,而並非HashMap中的繼承自LinkedHashMap.Entry<K,V>類,也就是說TreeNode帶有next指針,這樣做的目的是方便基於TreeBin的訪問。

TreeBin

       這個類並不負責包裝用戶的key、value信息,而是包裝的很多TreeNode節點。它代替了TreeNode的根節點,也就是說在實際的ConcurrentHashMap“數組”中,存放的是TreeBin對象,而不是TreeNode對象,這是與HashMap的區別。另外這個類還帶有了讀寫鎖。

put操作

  ConcurrentHashMap最常用的就是put和get兩個方法。現在來介紹put方法,這個put方法依然沿用HashMap的put方法的思想,根據hash值計算這個新插入的點在table中的位置i,如果i位置是空的,直接放進去,否則進行判斷,如果i位置是樹節點,按照樹的方式插入新的節點,否則把i插入到鏈表的末尾。ConcurrentHashMap中依然沿用這個思想,有一個最重要的不同點就是ConcurrentHashMap不允許key或value為null值。另外由於涉及到多線程,put方法就要復雜一點。在多線程中可能有以下兩個情況

  • 如果一個或多個線程正在對ConcurrentHashMap進行擴容操作,當前線程也要進入擴容的操作中。這個擴容的操作之所以能被檢測到,是因為transfer方法中在空結點上插入forward節點,如果檢測到需要插入的位置被forward節點占有,就幫助進行擴容;
  • 如果檢測到要插入的節點是非空且不是forward節點,就對這個節點加鎖,這樣就保證了線程安全。盡管這個有一些影響效率,但是還是會比hashTable的synchronized要好得多。

  整體流程就是首先定義不允許key或value為null的情況放入  對於每一個放入的值,首先利用spread方法對key的hashcode進行一次hash計算,由此來確定這個值在table中的位置。如果這個位置是空的,那麼直接放入,而且不需要加鎖操作。

  如果這個位置存在結點,說明發生了hash碰撞,首先進入sychnorized同步代碼塊,然後判斷這個節點的類型。如果是鏈表節點(fh>0),則得到的結點就是hash值相同的節點組成的鏈表的頭節點。需要依次向後遍歷確定這個新加入的值所在位置。如果遇到hash值與key值都與新加入節點是一致的情況,則只需要更新value值即可。否則依次向後遍歷,直到鏈表尾插入這個結點。  如果加入這個節點以後鏈表長度大於8,就把這個鏈表轉換成紅黑樹。如果這個節點的類型已經是樹節點的話,直接調用樹節點的插入方法進行插入新的值。

public V put(K key, V value) {
    return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
        //不允許 key或value為null
    if (key == null || value == null) throw new NullPointerException();
    //計算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    //死循環 何時插入成功 何時跳出
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果table為空的話,初始化table
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //根據hash值計算出在table裡面的位置 
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果這個位置沒有值 ,直接放進去,不需要加鎖
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //當遇到表連接點時,需要進行整合表的操作
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //結點上鎖  這裡的結點可以理解為hash值相同組成的鏈表的頭結點
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //fh〉0 說明這個節點是一個鏈表的節點 不是樹的節點
                    if (fh >= 0) {
                        binCount = 1;
                        //在這裡遍歷鏈表所有的結點
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //如果hash值和key值相同  則修改對應結點的value值
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //如果遍歷到了最後一個結點,那麼就證明新的節點需要插入 就把它插入在鏈表尾部
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //如果這個節點是樹節點,就按照樹的方式插入值
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //如果鏈表長度已經達到臨界值8 就需要把鏈表轉換為樹結構
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //將當前ConcurrentHashMap的元素數量+1
    addCount(1L, binCount);
    return null;
}

get方法

  get方法比較簡單,給定一個key來確定value的時候,必須滿足兩個條件  key相同  hash值相同,對於節點可能在鏈表或樹上的情況,需要分別去查找。

public V get(Object key) {  
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;  
    //計算hash值  
    int h = spread(key.hashCode());  
    //根據hash值確定節點位置  
    if ((tab = table) != null && (n = tab.length) > 0 &&  
        (e = tabAt(tab, (n - 1) & h)) != null) {  
        //如果搜索到的節點key與傳入的key相同且不為null,直接返回這個節點    
        if ((eh = e.hash) == h) {  
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))  
                return e.val;  
        }  
        //如果eh<0 說明這個節點在樹上 直接尋找  
        else if (eh < 0)  
            return (p = e.find(h, key)) != null ? p.val : null;  
         //否則遍歷鏈表 找到對應的值並返回  
        while ((e = e.next) != null) {  
            if (e.hash == h &&  
                ((ek = e.key) == key || (ek != null && key.equals(ek))))  
                return e.val;  
        }  
    }  
    return null;  
}  

 

參考:

  1、ConcurrentHashMap源碼分析(JDK8版本)

  2、Java並發基礎總結

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved