程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> 編寫高質量代碼:改善Java程序的151個建議(第8章:多線程和並發___建議122~125),151122

編寫高質量代碼:改善Java程序的151個建議(第8章:多線程和並發___建議122~125),151122

編輯:JAVA綜合教程

編寫高質量代碼:改善Java程序的151個建議(第8章:多線程和並發___建議122~125),151122


建議122:使用線程異常處理器提升系統可靠性

  我們要編寫一個Socket應用,監聽指定端口,實現數據包的接收和發送邏輯,這在早期系統間進行數據交互是經常使用的,這類接口通常需要考慮兩個問題:一個是避免線程阻塞,保證接收的數據盡快處理;二是:接口的穩定性和可靠性問題,數據包很復雜,接口服務的系統也很多,一旦守候線程出現異常就會導致Socket停止,這是非常危險的,那我們有什麼辦法避免嗎?

  Java1.5版本以後在Thread類中增加了setUncaughtExceptionHandler方法,實現了線程異常的捕捉和處理。可能大家會有一個疑問:如果Socket應用出現了不可預測的異常是否可以自動重啟呢?其實使用線程異常處理器很容易解決,我們來看一個異常處理器應用實例,代碼如下: 

class TcpServer implements Runnable {
    // 創建後即運行
    public TcpServer() {
        Thread t = new Thread(this);
        t.setUncaughtExceptionHandler(new TcpServerExceptionHandler());
        t.start();
    }
    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            try {
                Thread.sleep(1000);
                System.out.println("系統正常運行:" + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 拋出異常
        throw new RuntimeException();
    }

    // 異常處理器
    private static class TcpServerExceptionHandler implements
            Thread.UncaughtExceptionHandler {

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            // 記錄線程異常信息
            System.out.println("線程" + t.getName() + " 出現異常,自行重啟,請分析原因。");
            e.printStackTrace();
            // 重啟線程,保證業務不中斷
            new TcpServer();
        }

    }
}

  這段代碼的邏輯比較簡單,在TcpServer類創建時即啟動一個線程,提供TCP服務,例如接收和發送文件,具體邏輯在run方法中實現。同時,設置了該線程出現運行期異常(也就是Uncaught Exception)時,由TcpServerExceptionHandler異常處理器來處理異常。那麼TcpServerExceptionHandler做什麼事呢?兩件事:

  • 記錄異常信息,以便查找問題
  • 重新啟動一個新線程,提供不間斷的服務

  有了這兩點,TcpServer就可以穩定的運行了,即使出現異常也能自動重啟,客戶端代碼比較簡單,只需要new TcpServer()即可,運行結果如下:

  

  從運行結果上可以看出,當Thread-0出現異常時,系統自動重啟了Thread-1線程,繼續提供服務,大大提高了系統的性能。

  這段程序只是一個示例程序,若要在實際環境中應用,則需要注意以下三個方面:

  • 共享資源鎖定:如果線程產生異常的原因是資源被鎖定,自動重啟應用知會增加系統的負擔,無法提供不間斷服務。例如一個即時通信服務(XMPP Server)出現信息不能寫入的情況,即使再怎麼啟動服務,也是無法解決問題的。在此情況下最好的辦法是停止所有的線程,釋放資源。
  • 髒數據引起系統邏輯混亂:異常的產生中斷了正在執行的業務邏輯,特別是如果正在處理一個原子操作(像即時通訊服務器的用戶驗證和簽到這兩個事件應該在一個操作中處理,不允許出現驗證成功,但簽到不成功的情況),但如果此時拋出了運行期異常就有可能會破壞正常的業務邏輯,例如出現用戶認證通過了,但簽到不成功的情況,在這種情境下重啟應用服務器,雖然可以提供服務,但對部分用戶卻產生了邏輯異常。
  • 內存溢出:線程異常了,但由該線程創建的對象並不會馬上回收,如果再重親啟動新線程,再創建一批對象,特別是加入了場景接管,就非常危險了,例如即時通信服務,重新啟動一個新線程必須保證原在線用戶的透明性,即用戶不會察覺服務重啟,在此種情況下,就需要在線程初始化時加載大量對象以保證用戶的狀態信息,但是如果線程反復重啟,很可能會引起OutOfMemory內存洩露問題。

建議123:volatile不能保證數據同步

  volatile關鍵字比較少用,原因無外乎兩點,一是在Java1.5之前該關鍵字在不同的操作系統上有不同的表現,所帶來的問題就是移植性較差;而且比較難設計,而且誤用較多,這也導致它的"名譽" 受損。

  我們知道,每個線程都運行在棧內存中,每個線程都有自己的工作內存(Working Memory,比如寄存器Register、高速緩沖存儲器Cache等),線程的計算一般是通過工作內存進行交互的,其示意圖如下圖所示:

  

  從示意圖上我們可以看到,線程在初始化時從主內存中加載所需的變量值到工作內存中,然後在線程運行時,如果是讀取,則直接從工作內存中讀取,若是寫入則先寫到工作內存中,之後刷新到主內存中,這是JVM的一個簡答的內存模型,但是這樣的結構在多線程的情況下有可能會出現問題,比如:A線程修改變量的值,也刷新到了主內存,但B、C線程在此時間內讀取的還是本線程的工作內存,也就是說它們讀取的不是最"新鮮"的值,此時就出現了不同線程持有的公共資源不同步的情況。

  對於此類問題有很多解決辦法,比如使用synchronized同步代碼塊,或者使用Lock鎖來解決該問題,不過,Java可以使用volatile更簡單地解決此類問題,比如在一個變量前加上volatile關鍵字,可以確保每個線程對本地變量的訪問和修改都是直接與內存交互的,而不是與本線程的工作內存交互的,保證每個線程都能獲得最"新鮮"的變量值,其示意圖如下:

  

  明白了volatile變量的原理,那我們思考一下:volatile變量是否能夠保證數據的同步性呢?兩個線程同時修改一個volatile是否會產生髒數據呢?我們看看下面代碼:

class UnsafeThread implements Runnable {
    // 共享資源
    private volatile int count = 0;

    @Override
    public void run() {
        // 增加CPU的繁忙程度,不必關心其邏輯含義
        for (int i = 0; i < 1000; i++) {
            Math.hypot(Math.pow(92456789, i), Math.cos(i));
        }
        count++;
    }

    public int getCount() {
        return count;
    }
}

  上面的代碼定義了一個多線程類,run方法的主要邏輯是共享資源count的自加運算,而且我們還為count變量加上了volatile關鍵字,確保是從內存中讀取和寫入的,如果有多個線程運行,也就是多個線程執行count變量的自加操作,count變量會產生髒數據嗎?想想看,我們已經為count加上了volatile關鍵字呀!模擬多線程的代碼如下:  

public static void main(String[] args) throws InterruptedException {
        // 理想值,並作為最大循環次數
        int value = 1000;
        // 循環次數,防止造成無限循環或者死循環
        int loops = 0;
        // 主線程組,用於估計活動線程數
        ThreadGroup tg = Thread.currentThread().getThreadGroup();
        while (loops++ < value) {
            // 共享資源清零
            UnsafeThread ut = new UnsafeThread();
            for (int i = 0; i < value; i++) {
                new Thread(ut).start();
            }
            // 先等15毫秒,等待活動線程為1
            do {
                Thread.sleep(15);
            } while (tg.activeCount() != 1);
            // 檢查實際值與理論值是否一致
            if (ut.getCount() != value) {
                // 出現線程不安全的情況
                System.out.println("循環到:" + loops + " 遍,出現線程不安全的情況");
                System.out.println("此時,count= " + ut.getCount());
                System.exit(0);
            }
        }

    }

  想讓volatite變量"出點丑",還是需要花點功夫的。此段程序的運行邏輯如下:

  • 啟動100個線程,修改共享資源count的值
  • 暫停15秒,觀察活動線程數是否為1(即只剩下主線程再運行),若不為1,則再等待15秒。
  • 判斷共享資源是否是不安全的,即實際值與理想值是否相同,若不相同,則發現目標,此時count的值為髒數據。
  • 如果沒有找到,繼續循環,直到達到最大循環為止。

運行結果如下:

    循環到:40 遍,出現線程不安全的情況
    此時,count= 999
  這只是一種可能的結果,每次執行都有可能產生不同的結果。這也說明我們的count變量沒有實現數據同步,在多個線程修改的情況下,count的實際值與理論值產生了偏差,直接說明了volatile關鍵字並不能保證線程的安全。
  在解釋原因之前,我們先說一下自加操作。count++表示的是先取出count的值然後再加1,也就是count=count+1,所以,在某個緊鄰時間片段內會發生如下神奇的事情:

(1)、第一個時間片段

  A線程獲得執行機會,因為有關鍵字volatile修飾,所以它從主內存中獲得count的最新值為998,接下來的事情又分為兩種類型:

  • 如果是單CPU,此時調度器暫停A線程執行,讓出執行機會給B線程,於是B線程也獲得了count的最新值998.
  • 如果是多CPU,此時線程A繼續執行,而線程B也同時獲得了count的最新值998.

(2)、第二個片段

  • 如果是單CPU,B線程執行完+1操作(這是一個原子處理),count的值為999,由於是volatile類型的變量,所以直接寫入主內存,然後A線程繼續執行,計算的結果也是999,重新寫入主內存中。
  • 如果是多CPU,A線程執行完加1動作後修改主內存的變量count為999,線程B執行完畢後也修改主內存中的變量為999

這兩個時間片段執行完畢後,原本期望的結果為1000,單運行後的值為999,這表示出現了線程不安全的情況。這也是我們要說明的:volatile關鍵字並不能保證線程安全,它只能保證當前線程需要該變量的值時能夠獲得最新的值,而不能保證線程修改的安全性。

順便說一下,在上面的代碼中,UnsafeThread類的消耗CPU計算是必須的,其目的是加重線程的負荷,以便出現單個線程搶占整個CPU資源的情景,否則很難模擬出volatile線程不安全的情況,大家可以自行模擬測試。

建議124:異步運算考慮使用Callable接口

  多線程應用有兩種實現方式,一種是實現Runnable接口,另一種是繼承Thread類,這兩個方法都有缺點:run方法沒有返回值,不能拋出異常(這兩個缺點歸根到底是Runnable接口的缺陷,Thread類也實現了Runnable接口),如果需要知道一個線程的運行結果就需要用戶自行設計,線程類本身也不能提供返回值和異常。但是從Java1.5開始引入了一個新的接口Callable,它類似於Runnable接口,實現它就可以實現多線程任務,Callable的接口定義如下:

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

  實現Callable接口的類,只是表明它是一個可調用的任務,並不表示它具有多線程運算能力,還是需要執行器來執行的,我們先編寫一個任務類,代碼如下: 

//稅款計算器
class TaxCalculator implements Callable<Integer> {
    // 本金
    private int seedMoney;

    // 接收主線程提供的參數
    public TaxCalculator(int _seedMoney) {
        seedMoney = _seedMoney;
    }

    @Override
    public Integer call() throws Exception {
        // 復雜計算,運行一次需要2秒
        TimeUnit.MILLISECONDS.sleep(2000);
        return seedMoney / 10;
    }
}

  這裡模擬了一個復雜運算:稅款計算器,該運算可能要花費10秒鐘的時間,此時不能讓用戶一直等著吧,需要給用戶輸出點什麼,讓用戶知道系統還在運行,這也是系統友好性的體現:用戶輸入即有輸出,若耗時較長,則顯示運算進度。如果我們直接計算,就只有一個main線程,是不可能有友好提示的,如果稅金不計算完畢,也不會執行後續動作,所以此時最好的辦法就是重啟一個線程來運算,讓main線程做進度提示,代碼如下:  

public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        // 生成一個單線程的異步執行器
        ExecutorService es = Executors.newSingleThreadExecutor();
        // 線程執行後的期望值
        Future<Integer> future = es.submit(new TaxCalculator(100));
        while (!future.isDone()) {
            // 還沒有運算完成,等待200毫秒
            TimeUnit.MICROSECONDS.sleep(200);
            // 輸出進度符號
            System.out.print("*");
        }
        System.out.println("\n計算完成,稅金是:" + future.get() + "  元 ");
        es.shutdown();
    }

  在這段代碼中,Executors是一個靜態工具類,提供了異步執行器的創建能力,如單線程異步執行器newSingleThreadExecutor、固定線程數量的執行器newFixedThreadPool等,一般它是異步計算的入口類。future關注的是線程執行後的結果,比如沒有運行完畢,執行結果是多少等。此段代碼的運行結果如下所示:

      **********************************************......

      計算完成,稅金是:10  元

  執行時,"*"會依次遞增,表示系統正在運算,為用戶提供了運算進度,此類異步計算的好處是:

  • 盡可能多的占用系統資源,提供快速運算
  • 可以監控線程的執行情況,比如是否執行完畢、是否有返回值、是否有異常等。
  • 可以為用戶提供更好的支持,比如例子中的運算進度等。

建議125:優先選擇線程池

  在Java1.5之前,實現多線程比較麻煩,需要自己啟動線程,並關注同步資源,防止出現線程死鎖等問題,在1.5版本之後引入了並行計算框架,大大簡化了多線程開發。我們知道一個線程有五個狀態:新建狀態(NEW)、可運行狀態(Runnable,也叫作運行狀態)、阻塞狀態(Blocked)、等待狀態(Waiting)、結束狀態(Terminated),線程的狀態只能由新建轉變為了運行狀態後才能被阻塞或等待,最後終結,不可能產生本末倒置的情況,比如把一個結束狀態的線程轉變為新建狀態,則會出現異常,例如如下代碼會拋出異常:

    public static void main(String[] args) throws InterruptedException {
        // 創建一個線程,新建狀態
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("線程正在運行");
            }
        });
        // 運行狀態
        t.start();
        // 是否是運行狀態,若不是則等待10毫秒
        while (!t.getState().equals(Thread.State.TERMINATED)) {
            TimeUnit.MICROSECONDS.sleep(10);
        }
        // 直接由結束轉變為雲心態
        t.start();
    }

  此段程序運行時會報java.lang.IllegalThreadStateException異常,原因就是不能從結束狀態直接轉變為運行狀態,我們知道一個線程的運行時間分為3部分:T1為線程啟動時間,T2為線程的運行時間,T3為線程銷毀時間,如果一個線程不能被重復使用,每次創建一個線程都需要經過啟動、運行、銷毀時間,這勢必增大系統的響應時間,有沒有更好的辦法降低線程的運行時間呢?

  T2是無法避免的,只有通過優化代碼來實現降低運行時間。T1和T2都可以通過線程池(Thread Pool)來縮減時間,比如在容器(或系統)啟動時,創建足夠多的線程,當容器(或系統)需要時直接從線程池中獲得線程,運算出結果,再把線程返回到線程池中___ExecutorService就是實現了線程池的執行器,我們來看一個示例代碼: 

public static void main(String[] args) throws InterruptedException {
        // 2個線程的線程池
        ExecutorService es = Executors.newFixedThreadPool(2);
        // 多次執行線程體
        for (int i = 0; i < 4; i++) {
            es.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
        // 關閉執行器
        es.shutdown();
    }

  此段代碼首先創建了一個包含兩個線程的線程池,然後在線程池中多次運行線程體,輸出運行時的線程名稱,結果如下:

        pool-1-thread-1
        pool-1-thread-2
        pool-1-thread-1
        pool-1-thread-2

   本次代碼執行了4遍線程體,按照我們之前闡述的" 一個線程不可能從結束狀態轉變為可運行狀態 ",那為什麼此處的2個線程可以反復使用呢?這就是我們要搞清楚的重點。

  線程池涉及以下幾個名詞:

  • 工作線程(Worker):線程池中的線程,只有兩個狀態:可運行狀態和等待狀態,沒有任務時它們處於等待狀態,運行時它們循環的執行任務。
  • 任務接口(Task):這是每個任務必須實現的接口,以供工作線程調度器調度,它主要規定了任務的入口、任務執行完的場景處理,任務的執行狀態等。這裡有兩種類型的任務:具有返回值(異常)的Callable接口任務和無返回值並兼容舊版本的Runnable接口任務。
  • 任務對列(Work Quene):也叫作工作隊列,用於存放等待處理的任務,一般是BlockingQuene的實現類,用來實現任務的排隊處理。

  我們首先從線程池的創建說起,Executors.newFixedThreadPool(2)表示創建一個具有兩個線程的線程池,源代碼如下:

public class Executors {
    //生成一個最大為nThreads的線程池執行器
  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

}

  這裡使用了LinkedBlockingQueue作為隊列任務管理器,所有等待處理的任務都會放在該對列中,需要注意的是,此隊列是一個阻塞式的單端隊列。線程池建立好了,那就需要線程在其中運行了,線程池中的線程是在submit第一次提交任務時建立的,代碼如下:

   public Future<?> submit(Runnable task) {
        //檢查任務是否為null
        if (task == null) throw new NullPointerException();
        //把Runnable任務包裝成具有返回值的任務對象,不過此時並沒有執行,只是包裝
        RunnableFuture<Object> ftask = newTaskFor(task, null);
        //執行此任務
        execute(ftask);
        //返回任務預期執行結果
        return ftask;
    }

  此處的代碼關鍵是execute方法,它實現了三個職責。

  • 創建足夠多的工作線程數,數量不超過最大線程數量,並保持線程處於運行或等待狀態。
  • 把等待處理的任務放到任務隊列中
  • 從任務隊列中取出任務來執行

  其中此處的關鍵是工作線程的創建,它也是通過new Thread方式創建的一個線程,只是它創建的並不是我們的任務線程(雖然我們的任務實現了Runnable接口,但它只是起了一個標志性的作用),而是經過包裝的Worker線程,代碼如下:  

private final class Worker implements Runnable {
// 運行一次任務
    private void runTask(Runnable task) {
        /* 這裡的task才是我們自定義實現Runnable接口的任務 */
        task.run();
        /* 該方法其它代碼略 */
    }
    // 工作線程也是線程,必須實現run方法
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);
        }
    }
    // 任務隊列中獲得任務
    Runnable getTask() {
        /* 其它代碼略 */
        for (;;) {
            return r = workQueue.take();
        }
    }
}

  此處為示意代碼,刪除了大量的判斷條件和鎖資源。execute方法是通過Worker類啟動的一個工作線程,執行的是我們的第一個任務,然後改線程通過getTask方法從任務隊列中獲取任務,之後再繼續執行,但問題是任務隊列是一個BlockingQuene,是阻塞式的,也就是說如果該隊列的元素為0,則保持等待狀態,直到有任務進入為止,我們來看LinkedBlockingQuene的take方法,代碼如下:  

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                // 如果隊列中的元素為0,則等待
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            // 等待狀態結束,彈出頭元素
            x = extract();
            c = count.getAndDecrement();
            // 如果隊列數量還多於一個,喚醒其它線程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        // 返回頭元素
        return x;
    }

  分析到這裡,我們就明白了線程池的創建過程:創建一個阻塞隊列以容納任務,在第一次執行任務時創建做夠多的線程(不超過許可線程數),並處理任務,之後每個工作線程自行從任務對列中獲得任務,直到任務隊列中的任務數量為0為止,此時,線程將處於等待狀態,一旦有任務再加入到隊列中,即召喚醒工作線程進行處理,實現線程的可復用性。

  使用線程池減少的是線程的創建和銷毀時間,這對於多線程應用來說非常有幫助,比如我們常用的Servlet容器,每次請求處理的都是一個線程,如果不采用線程池技術,每次請求都會重新創建一個新的線程,這會導致系統的性能符合加大,響應效率下降,降低了系統的友好性。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved