程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> 並發遍歷二叉樹

並發遍歷二叉樹

編輯:C++入門知識

單線程遍歷二叉樹是數據結構書中必講的算法, 但多線程訪問呢?


我最近寫了一個多線程遍歷二叉樹的算法, 當然既然是多線程,就不保證一定的順序, 只要節點能訪問到就算數.


算法的基本思路
1) 使用 java.util.concurrent.ThreadPoolExecutor 構造線程池
2) 當線程池有空余線程把樹的右子樹拆分給另外一個線程處理, 否則自己處理右子數

要考慮的情況
1) 如果二叉樹非常不平衡(左子樹很深,右子樹很淺), 會出現一個線程忙碌,而另外一個線程早結束的情況
       工作線程過一定時間間隔就會檢查線程池是否有空閒線程, 有則分離當前節點的右子樹並委托給ThreadPool
2)判斷所有工作都結束

      啟動工作任務時用 HashSet 記錄, 工作任務結束時清楚HashSet 記錄, 當HashSet size 為0 時,所有工作結束.

注意事項
1) 即使任務結束,ThreadPool 還是在運行的( waiting task ), 要調用 shutdown 通知 ThreadPool 結束
2) 主線程結束, 程序並不會結束, 想讓主線程等待 ThreadPool 結束, 要調用ThreadPool 的 awaitTermination 方法
程序
[cpp]
package com.pnp.javathreadpool; 
 
import java.util.HashSet; 
import java.util.Random; 
import java.util.Stack; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
 
public class ConcurTravBTree { 
    class BTreeNode<T extends Comparable> { 
 
        public BTreeNode<T> left; 
        public BTreeNode<T> right; 
        public T value; 
 
        public BTreeNode(BTreeNode<T> left, BTreeNode<T> right, T value) { 
            this.left = left; 
            this.right = right; 
            this.value = value; 
        } 
 
        public BTreeNode(T value) { 
            this(null, null, value); 
        } 
    } 
 
    public class BTree<T extends Comparable> { 
        String dumpStr; 
 
        public BTreeNode<T> root; 
 
        public BTree() { 
        } 
 
        public BTree(BTreeNode<T> root) { 
            this.root = root; 
        } 
 
        public void orderAdd(T v) { 
            if (root == null) 
                root = new BTreeNode(v); 
            else 
                orderAdd(root, v); 
        } 
 
        public void orderAdd(BTreeNode<T> node, T v) { 
            if (node.value.compareTo(v) > 0) { 
                if (node.left == null) 
                    node.left = new BTreeNode(v); 
                else 
                    orderAdd(node.left, v); 
            } else { 
                if (node.right == null) 
                    node.right = new BTreeNode(v); 
                else 
                    orderAdd(node.right, v); 
            } 
        } 
         
        public void preOrderTraverse() { 
            preOrderTraverse(root); 
        } 
         
        public void preOrderTraverse( BTreeNode<T> node) { 
            if (node != null) { 
                //System.out.println(node.value);  
                try { 
                    Thread.sleep(100); 
                } catch (InterruptedException e) { 
                    e.printStackTrace(); 
                } 
                preOrderTraverse(node.left); 
                preOrderTraverse(node.right); 
            } 
        }        
    } 
 
    static int global_id = 0; 
    public static final int POOLSIZE = 4; 
     
    ThreadPoolExecutor threadPool; 
    BTree<Integer> tree; 
    HashSet<Integer> tasks = new HashSet<Integer>();  
     
 
    public void init() { 
        tree = new BTree<Integer>(); 
 
        /*
        int[] data = new int[] { 1, 3, 5 };
        for (int i = 0; i < data.length; i++)
            tree.orderAdd(data[i]);
        */ 
         
        Random r = new Random(); 
        for( int i=0; i< 500; i++) { 
            tree.orderAdd( r.nextInt(500)); 
        } 
 
        threadPool = new ThreadPoolExecutor(POOLSIZE, // corePoolSize  
                POOLSIZE + 2, // maximumPoolSize  
                60, // keepAliveTime  
                TimeUnit.SECONDS, // keepAliveTime unit  
                new ArrayBlockingQueue<Runnable>(POOLSIZE + 2), // workQueue  
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler  
    } 
 
    public void start() { 
        threadPool.execute(new Visitor(tree.root)); 
    } 
 
    class Visitor implements Runnable { 
        private int id; 
        private BTreeNode<Integer> node; 
 
        public Visitor(BTreeNode<Integer> node) { 
            this.id = ++global_id; 
            this.node = node; 
        } 
 
        public boolean handleNode(BTreeNode<Integer> node) { 
            if (node == null) 
                return true; 
 
            if (ConcurTravBTree.this.threadPool.getActiveCount() <= ConcurTravBTree.this.threadPool 
                    .getCorePoolSize()) { 
                System.out.println("--- ActiveCount " 
                        + ConcurTravBTree.this.threadPool.getActiveCount()); 
                ConcurTravBTree.this.threadPool.execute(new Visitor(node)); 
                return true; 
            } 
            return false; 
        } 
 
        public void run() { 
            System.out.println("[ job" + id + " ] ---start  "); 
            tasks.add(id); 
            if (handleNode(node.right)) // assign Node to a task  
                node.right = null; //  
 
            // pre-order traverse  
            Stack<BTreeNode<Integer>> stack = new Stack<BTreeNode<Integer>>(); 
            while (node != null || !stack.isEmpty()) { 
                long start = System.currentTimeMillis(); 
                while (node != null) { 
                    //System.out.println("[ job" + id + " ]" + node.value);  
                    try { 
                        Thread.sleep(100); 
                    } catch (InterruptedException e) { 
                        e.printStackTrace(); 
                    } 
 
                    stack.push(node); 
                    node = node.left; 
                } 
                node = stack.pop(); 
                node = node.right; 
 
                long cur = System.currentTimeMillis(); 
                if (cur - start > 1000) { 
                    if (handleNode(node)) { 
                        node = null; 
                    } 
                } 
            } 
            System.out.println("[ job" + id + " ] ---end"); 
            tasks.remove(id); 
            if ( tasks.size() == 0) { 
                threadPool.shutdown(); 
            } 
        } 
    } 
 
    public static void main(String[] args) { 
        ConcurTravBTree tt = new ConcurTravBTree(); 
        System.out.println("init tree..."); 
        tt.init(); 
 
        System.out.println("start preOrderTraverse traverse..."); 
        long start = System.currentTimeMillis(); 
        tt.tree.preOrderTraverse(); 
        long end = System.currentTimeMillis(); 
        System.out.println("---- preOrderTraverse cost " + (end - start)); 
     
        System.out.println("start concurrent traverse..."); 
        long cstart = System.currentTimeMillis(); 
        tt.start(); 
        try { 
            tt.threadPool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
        long cend = System.currentTimeMillis(); 
        System.out.println("---- current traverse cost " + (cend - cstart)); 
    } 

package com.pnp.javathreadpool;

import java.util.HashSet;
import java.util.Random;
import java.util.Stack;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConcurTravBTree {
 class BTreeNode<T extends Comparable> {

  public BTreeNode<T> left;
  public BTreeNode<T> right;
  public T value;

  public BTreeNode(BTreeNode<T> left, BTreeNode<T> right, T value) {
   this.left = left;
   this.right = right;
   this.value = value;
  }

  public BTreeNode(T value) {
   this(null, null, value);
  }
 }

 public class BTree<T extends Comparable> {
  String dumpStr;

  public BTreeNode<T> root;

  public BTree() {
  }

  public BTree(BTreeNode<T> root) {
   this.root = root;
  }

  public void orderAdd(T v) {
   if (root == null)
    root = new BTreeNode(v);
   else
    orderAdd(root, v);
  }

  public void orderAdd(BTreeNode<T> node, T v) {
   if (node.value.compareTo(v) > 0) {
    if (node.left == null)
     node.left = new BTreeNode(v);
    else
     orderAdd(node.left, v);
   } else {
    if (node.right == null)
     node.right = new BTreeNode(v);
    else
     orderAdd(node.right, v);
   }
  }
  
  public void preOrderTraverse() {
   preOrderTraverse(root);
  }
  
  public void preOrderTraverse( BTreeNode<T> node) {
   if (node != null) {
    //System.out.println(node.value);
    try {
     Thread.sleep(100);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    preOrderTraverse(node.left);
    preOrderTraverse(node.right);
   }
  }  
 }

 static int global_id = 0;
 public static final int POOLSIZE = 4;
 
 ThreadPoolExecutor threadPool;
 BTree<Integer> tree;
 HashSet<Integer> tasks = new HashSet<Integer>();
 

 public void init() {
  tree = new BTree<Integer>();

  /*
  int[] data = new int[] { 1, 3, 5 };
  for (int i = 0; i < data.length; i++)
   tree.orderAdd(data[i]);
  */
  
  Random r = new Random();
  for( int i=0; i< 500; i++) {
   tree.orderAdd( r.nextInt(500));
  }

  threadPool = new ThreadPoolExecutor(POOLSIZE, // corePoolSize
    POOLSIZE + 2, // maximumPoolSize
    60, // keepAliveTime
    TimeUnit.SECONDS, // keepAliveTime unit
    new ArrayBlockingQueue<Runnable>(POOLSIZE + 2), // workQueue
    new ThreadPoolExecutor.CallerRunsPolicy()); // handler
 }

 public void start() {
  threadPool.execute(new Visitor(tree.root));
 }

 class Visitor implements Runnable {
  private int id;
  private BTreeNode<Integer> node;

  public Visitor(BTreeNode<Integer> node) {
   this.id = ++global_id;
   this.node = node;
  }

  public boolean handleNode(BTreeNode<Integer> node) {
   if (node == null)
    return true;

   if (ConcurTravBTree.this.threadPool.getActiveCount() <= ConcurTravBTree.this.threadPool
     .getCorePoolSize()) {
    System.out.println("--- ActiveCount "
      + ConcurTravBTree.this.threadPool.getActiveCount());
    ConcurTravBTree.this.threadPool.execute(new Visitor(node));
    return true;
   }
   return false;
  }

  public void run() {
   System.out.println("[ job" + id + " ] ---start  ");
   tasks.add(id);
   if (handleNode(node.right)) // assign Node to a task
    node.right = null; //

   // pre-order traverse
   Stack<BTreeNode<Integer>> stack = new Stack<BTreeNode<Integer>>();
   while (node != null || !stack.isEmpty()) {
    long start = System.currentTimeMillis();
    while (node != null) {
     //System.out.println("[ job" + id + " ]" + node.value);
     try {
      Thread.sleep(100);
     } catch (InterruptedException e) {
      e.printStackTrace();
     }

     stack.push(node);
     node = node.left;
    }
    node = stack.pop();
    node = node.right;

    long cur = System.currentTimeMillis();
    if (cur - start > 1000) {
     if (handleNode(node)) {
      node = null;
     }
    }
   }
   System.out.println("[ job" + id + " ] ---end");
   tasks.remove(id);
   if ( tasks.size() == 0) {
    threadPool.shutdown();
   }
  }
 }

 public static void main(String[] args) {
  ConcurTravBTree tt = new ConcurTravBTree();
  System.out.println("init tree...");
  tt.init();

  System.out.println("start preOrderTraverse traverse...");
  long start = System.currentTimeMillis();
  tt.tree.preOrderTraverse();
  long end = System.currentTimeMillis();
  System.out.println("---- preOrderTraverse cost " + (end - start));
 
  System.out.println("start concurrent traverse...");
  long cstart = System.currentTimeMillis();
  tt.start();
  try {
   tt.threadPool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  long cend = System.currentTimeMillis();
  System.out.println("---- current traverse cost " + (cend - cstart));
 }
}

在程序中,我調用 Thread.sleep() 模擬處理時間, 否則啟動線程池, 線程調度的開銷將大於節點訪問時間, 體現不出多線程的優勢.

在i3上跑得的輸出: 似乎效率提高了1倍
[cpp]
init tree... 
start preOrderTraverse traverse... 
---- preOrderTraverse cost 50023 
start concurrent traverse... 
[ job1 ] ---start   
--- ActiveCount 1 
[ job2 ] ---start   
--- ActiveCount 2 
[ job3 ] ---start   
--- ActiveCount 3 
[ job4 ] ---start   
--- ActiveCount 4 
[ job3 ] ---end 
[ job5 ] ---start   
--- ActiveCount 4 
[ job5 ] ---end 
[ job6 ] ---start   
--- ActiveCount 4 
[ job4 ] ---end 
[ job7 ] ---start   
[ job6 ] ---end 
[ job7 ] ---end 
[ job1 ] ---end 
[ job2 ] ---end 
---- current traverse cost 27116 

init tree...
start preOrderTraverse traverse...
---- preOrderTraverse cost 50023
start concurrent traverse...
[ job1 ] ---start 
--- ActiveCount 1
[ job2 ] ---start 
--- ActiveCount 2
[ job3 ] ---start 
--- ActiveCount 3
[ job4 ] ---start 
--- ActiveCount 4
[ job3 ] ---end
[ job5 ] ---start 
--- ActiveCount 4
[ job5 ] ---end
[ job6 ] ---start 
--- ActiveCount 4
[ job4 ] ---end
[ job7 ] ---start 
[ job6 ] ---end
[ job7 ] ---end
[ job1 ] ---end
[ job2 ] ---end
---- current traverse cost 27116

有時間把此程序放到4核的機器上跑, 看究竟效率能提高多少?

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved