程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> 生產者-消費者模式,生產者消費者模式

生產者-消費者模式,生產者消費者模式

編輯:JAVA綜合教程

生產者-消費者模式,生產者消費者模式


  • 簡介

  生產者-消費者模式是經典的多線程設計模式,它為多線程間的協作提供了良好的解決方案。在生產者-消費者模式中,有兩類線程:若干個生產者線程和若干個消費者線程。生產者負責提交用戶請求,消費者用於具體的處理生產者提交的任務。生產者和消費者通過共享內存緩沖區進行數據通信。

  生產者-消費者模式的基本結構如下圖:

  通過上圖可以看出,生產者和消費者通過共享內存緩沖區進行通信,他們之間並不進行直接的通信,從而減少了他們之間的耦合,生產者不需要直到消費者的存在,消費者也不需要知道生產者的存在。內存緩沖區主要的功能是實現數據在多線程間的共享,此外,通過該緩沖區,還可以緩解生產者和消費者之間的性能差異。

  • 生產者-消費者模式實現

  下面以生產者-消費者模式的簡單實現,介紹該模式的優點:

  生產者代碼:

 1 public class Producer implements Runnable {
 2     
 3     private volatile boolean isRunnig = true;
 4     
 5     private BlockingQueue<PCData> queue;//緩沖隊列
 6     
 7     private static AtomicInteger count = new AtomicInteger();
 8     
 9     private static final int SLEEPTIME = 1000;
10     
11     
12     public Producer(BlockingQueue<PCData> queue) {
13         this.queue = queue;
14     }
15 
16 
17     @Override
18     public void run() {
19         PCData pcData = null;
20         Random r = new Random();
21         
22         System.out.println("start producer id:"+Thread.currentThread().getId());
23         
24         try {
25             while(true){
26                 Thread.sleep(r.nextInt(SLEEPTIME));
27                 pcData = new PCData(count.incrementAndGet());
28                 System.out.println(pcData +"is put into queue");
29                 if(!queue.offer(pcData, 2, TimeUnit.SECONDS)){
30                     System.err.println("fail to put data:"+pcData);
31                 }
32             }
33         } catch (Exception e) {
34             // TODO: handle exception
35             e.printStackTrace();
36             Thread.currentThread().interrupt();
37         }
38         
39     }
40     
41     public void stop(){
42         isRunnig = false;
43     }
44     
45 
46 }

  消費者代碼:

 1 public class Consumer implements Runnable {
 2     
 3     private BlockingQueue<PCData> queue;//緩沖隊列
 4     
 5     private static final int SLEEPTIME=1000;
 6     
 7     
 8     
 9     public Consumer(BlockingQueue<PCData> queue) {
10         this.queue = queue;
11     }
12 
13 
14 
15     @Override
16     public void run() {
17         System.out.println("start constomer id:"+Thread.currentThread().getId());
18         Random r = new Random();
19         try {
20             while(true){
21                 PCData data = queue.take();
22                 int re = data.getIntData()*data.getIntData();
23                 System.out.println(MessageFormat.format("{0}*{1}={2}", data.getIntData(),data.getIntData(),re));
24                 
25                 Thread.sleep(r.nextInt(SLEEPTIME));
26             }
27         } catch (Exception e) {
28             // TODO: handle exception
29             e.printStackTrace();
30             Thread.currentThread().interrupt();
31         }
32         
33     }
34 
35 }

  消費者、生產者之間的共享數據模型:

 1 public final class PCData {
 2     private final int intData;
 3 
 4     public PCData(int intData) {
 5         this.intData = intData;
 6     }
 7     
 8     public PCData(String strData){
 9         this.intData = Integer.valueOf(strData);
10     }
11     
12     public synchronized int getIntData() {
13         return intData;
14     }
15 
16     @Override
17     public String toString() {
18         return "data:" + intData;
19     }
20     
21     
22     
23 }

  在客戶端中,啟動3個消費者和3個生產者,並讓他們協作運行:

 1 public class Client {
 2     public static void main(String[] args) throws InterruptedException {
 3         BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10); 
 4         
 5         Producer  p1 = new Producer(queue);
 6         Producer  p2 = new Producer(queue);
 7         Producer  p3 = new Producer(queue);
 8         
 9         Consumer  c1 = new Consumer(queue);
10         Consumer  c2 = new Consumer(queue);
11         Consumer  c3 = new Consumer(queue);
12         
13         ExecutorService exe = Executors.newCachedThreadPool();
14         exe.execute(p1);
15         exe.execute(p2);
16         exe.execute(p3);
17         
18         exe.execute(c1);
19         exe.execute(c2);
20         exe.execute(c3);
21         
22         Thread.sleep(10*1000);
23         
24         p1.stop();
25         p2.stop();
26         p3.stop();
27         
28         Thread.sleep(3000);
29         exe.shutdown();
30     }
31 }    

  優點:生產者-消費者模式能很好的對生產者線程和消費者線程進行解耦,優化系統的整體結構。同時,由於緩沖區的存在,運行生產者和消費者在性能上存在一定的差異,從而一定程度上緩解了性能瓶頸對系統性能的影響。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved