程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> 關於JAVA >> 任務列表分派給多個線程的策略及方法

任務列表分派給多個線程的策略及方法

編輯:關於JAVA

多線程下載由來已久,如 FlashGet、NetAnts 等工具,它們都是依懶於 HTTP 協議的支持(Range 字段指定請求內容范圍),首先能讀取出請求內容 (即欲下載的文件) 的大小,劃分出若干區塊,把區塊分段分發給每個線程去下載,線程從本段起始處下載數據及至段尾,多個線程下載的內容最終會寫入到同一個文件中。

只研究有用的,工作中的需求:要把多個任務分派給多個線程去執行,這其中就會有一個任務列表指派到線程的策略思考:已知:1. 一個待執行的任務列表,2. 指定要啟動的線程數;問題是:每個線程實際要執行哪些任務。

策略是:任務列表連續按線程數分段,先保證每線程平均能分配到的任務數,余下的任務從前至後依次附加到線程中--只是數量上,實際每個線程執行的任務都還是連續的。如果出現那種僧多(線程) 粥(任務) 少的情況,實際啟動的線程數就等於任務數,一挑一。這裡只實現了每個線程各掃自家門前雪,動作快的完成後眼見別的線程再累都是愛莫能助。

實現及演示代碼如下:由三個類實現,寫在了一個 java 文件中:TaskDistributor 為任務分發器,Task 為待執行的任務,WorkThread 為自定的工作線程。代碼中運用了命令模式,如若能配以監聽器,用上觀察者模式來控制 UI 顯示就更絕妙不過了,就能實現像下載中的區塊著色跳躍的動感了,在此定義下一步的著眼點了。

代碼中有較為詳細的注釋,看這些注釋和執行結果就很容易理解的。main() 是測試方法

package com.unmi.common;
import java.util.ArrayList;
import java.util.List;
/** 
* 指派任務列表給線程的分發器 
* @author Unmi
* QQ: 1125535 Email: [email protected] 
* MSN: [email protected] 2008-03-25 
*/
public class TaskDistributor {
  /** 
   * 測試方法 
   * @param args 
   */
  public static void main(String[] args) {
    //初始化要執行的任務列表
    List<Task> taskList = new ArrayList<Task>();
    for (int i = 0; i < 108; i++) {
      taskList.add(new Task(i));
    } 
    //設定要啟動的工作線程數為 5 個 
    int threadCount = 5;
    List<Task>[] taskListPerThread = distributeTasks(taskList, threadCount);
    System.out.println("實際要啟動的工作線程數:"+taskListPerThread.length);
    for (int i = 0; i < taskListPerThread.length; i++) {
      Thread workThread = new WorkThread(taskListPerThread[i],i);
      workThread.start();
    } 
  } 
  /** 
   * 把 List 中的任務分配給每個線程,先平均分配,剩於的依次附加給前面的線程
   * 返回的數組有多少個元素 (List<Task>) 就表明將啟動多少個工作線程
   * @param taskList 待分派的任務列表
   * @param threadCount  線程數
   * @return 列表的數組,每個數組中存有該線程要執行的任務列表
   */
  public static List<Task>[] distributeTasks(List<Task> taskList, int threadCount) {
    // 每個線程至少要執行的任務數,假如不為零則表示每個線程都會分配到任務
    int minTaskCount = taskList.size() / threadCount;
    // 平均分配後還剩下的任務數,不為零則還有任務依個附加到前面的線程中 
    int remainTaskCount = taskList.size() % threadCount;
    // 實際要啟動的線程數,如果工作線程比任務還多 
    // 自然只需要啟動與任務相同個數的工作線程,一對一的執行 
    // 畢竟不打算實現了線程池,所以用不著預先初始化好休眠的線程
    int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount;
    // 要啟動的線程數組,以及每個線程要執行的任務列表
    List<Task>[] taskListPerThread = new List[actualThreadCount];
    int taskIndex = 0;
    //平均分配後多余任務,每附加給一個線程後的剩余數,重新聲明與 remainTaskCount 
    //相同的變量,不然會在執行中改變 remainTaskCount 原有值,產生麻煩 
    int remainIndces = remainTaskCount;
    for (int i = 0; i < taskListPerThread.length; i++) {
      taskListPerThread[i] = new ArrayList<Task>();
      // 如果大於零,線程要分配到基本的任務
      if (minTaskCount > 0) {
        for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {
          taskListPerThread[i].add(taskList.get(j));
        } 
        taskIndex += minTaskCount;
      } 
      // 假如還有剩下的,則補一個到這個線程中 
      if (remainIndces > 0) {
        taskListPerThread[i].add(taskList.get(taskIndex++));
        remainIndces--;
      } 
    } 
    // 打印任務的分配情況 
    for (int i = 0; i < taskListPerThread.length; i++) {
      System.out.println("線程 " + i + " 的任務數:" + taskListPerThread[i].size() + " 區間[" 
          + taskListPerThread[i].get(0).getTaskId() + "," 
          + taskListPerThread[i].get(taskListPerThread[i].size() - 1).getTaskId() + "]");
    } 
    return taskListPerThread;
  } 
} 
/** 
* 要執行的任務,可在執行時改變它的某個狀態或調用它的某個操作 
* 例如任務有三個狀態,就緒,運行,完成,默認為就緒態 
* 要進一步完善,可為 Task 加上狀態變遷的監聽器,因之決定UI的顯示 
*/
class Task {
  public static final int READY = 0;
  public static final int RUNNING = 1;
  public static final int FINISHED = 2;
  private int status;
  //聲明一個任務的自有業務含義的變量,用於標識任務
  private int taskId;
  //任務的初始化方法 
  public Task(int taskId){
    this.status = READY;
    this.taskId = taskId;
  } 
  /** 
   * 執行任務
   */
  public void execute() {
    // 設置狀態為運行中 
    setStatus(Task.RUNNING);
    System.out.println("當前線程 ID 是:" + Thread.currentThread().getName() 
        +" | 任務 ID 是:"+this.taskId);
    // 附加一個延時 
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    } 
    // 執行完成,改狀態為完成 
    setStatus(FINISHED);
  } 
  public void setStatus(int status) {
    this.status = status;
  } 
  public int getTaskId() {
    return taskId;
  } 
} 
/** 
* 自定義的工作線程,持有分派給它執行的任務列表
*/
class WorkThread extends Thread {
  //本線程待執行的任務列表,你也可以指為任務索引的起始值 
  private List<Task> taskList = null;
  private int threadId;
  /** 
   * 構造工作線程,為其指派任務列表,及命名線程 ID
   * @param taskList 欲執行的任務列表
   * @param threadId 線程 ID
   */
  public WorkThread(List<Task> taskList,int threadId) {
    this.taskList = taskList;
    this.threadId = threadId;
  } 
  /** 
   * 執行被指派的所有任務
   */
  public void run() {
    for (Task task : taskList) {
      task.execute();
    } 
  } 
} 
package com.unmi.common;
import java.util.ArrayList;
import java.util.List;
/**
* 指派任務列表給線程的分發器
* @author Unmi
* QQ: 1125535 Email: [email protected]
* MSN: [email protected] 2008-03-25
*/
public class TaskDistributor {
/**
 * 測試方法
 * @param args
 */
public static void main(String[] args) {
 //初始化要執行的任務列表
 List<Task> taskList = new ArrayList<Task>();
 for (int i = 0; i < 108; i++) {
 taskList.add(new Task(i));
 }
 //設定要啟動的工作線程數為 5 個
 int threadCount = 5;
 List<Task>[] taskListPerThread = distributeTasks(taskList, threadCount);
 System.out.println("實際要啟動的工作線程數:"+taskListPerThread.length);
 for (int i = 0; i < taskListPerThread.length; i++) {
 Thread workThread = new WorkThread(taskListPerThread[i],i);
 workThread.start();
 }
}
/**
 * 把 List 中的任務分配給每個線程,先平均分配,剩於的依次附加給前面的線程
 * 返回的數組有多少個元素 (List<Task>) 就表明將啟動多少個工作線程
 * @param taskList 待分派的任務列表
 * @param threadCount  線程數
 * @return 列表的數組,每個數組中存有該線程要執行的任務列表
 */
public static List<Task>[] distributeTasks(List<Task> taskList, int threadCount) {
 // 每個線程至少要執行的任務數,假如不為零則表示每個線程都會分配到任務
 int minTaskCount = taskList.size() / threadCount;
 // 平均分配後還剩下的任務數,不為零則還有任務依個附加到前面的線程中
 int remainTaskCount = taskList.size() % threadCount;
 // 實際要啟動的線程數,如果工作線程比任務還多
 // 自然只需要啟動與任務相同個數的工作線程,一對一的執行
 // 畢竟不打算實現了線程池,所以用不著預先初始化好休眠的線程
 int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount;
 // 要啟動的線程數組,以及每個線程要執行的任務列表
 List<Task>[] taskListPerThread = new List[actualThreadCount];
 int taskIndex = 0;
 //平均分配後多余任務,每附加給一個線程後的剩余數,重新聲明與 remainTaskCount
 //相同的變量,不然會在執行中改變 remainTaskCount 原有值,產生麻煩
 int remainIndces = remainTaskCount;
 for (int i = 0; i < taskListPerThread.length; i++) {
 taskListPerThread[i] = new ArrayList<Task>();
 // 如果大於零,線程要分配到基本的任務
 if (minTaskCount > 0) {
  for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {
  taskListPerThread[i].add(taskList.get(j));
  }
  taskIndex += minTaskCount;
 }
 // 假如還有剩下的,則補一個到這個線程中
 if (remainIndces > 0) {
  taskListPerThread[i].add(taskList.get(taskIndex++));
  remainIndces--;
 }
 }
 // 打印任務的分配情況
 for (int i = 0; i < taskListPerThread.length; i++) {
 System.out.println("線程 " + i + " 的任務數:" + taskListPerThread[i].size() + " 區間["
  + taskListPerThread[i].get(0).getTaskId() + ","
  + taskListPerThread[i].get(taskListPerThread[i].size() - 1).getTaskId() + "]");
 }
 return taskListPerThread;
}
}
/**
* 要執行的任務,可在執行時改變它的某個狀態或調用它的某個操作
* 例如任務有三個狀態,就緒,運行,完成,默認為就緒態
* 要進一步完善,可為 Task 加上狀態變遷的監聽器,因之決定UI的顯示
*/
class Task {
public static final int READY = 0;
public static final int RUNNING = 1;
public static final int FINISHED = 2;
private int status;
//聲明一個任務的自有業務含義的變量,用於標識任務
private int taskId;
//任務的初始化方法
public Task(int taskId){
 this.status = READY;
 this.taskId = taskId;
}
/**
 * 執行任務
 */
public void execute() {
 // 設置狀態為運行中
 setStatus(Task.RUNNING);
 System.out.println("當前線程 ID 是:" + Thread.currentThread().getName()
  +" | 任務 ID 是:"+this.taskId);
 // 附加一個延時
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 // 執行完成,改狀態為完成
 setStatus(FINISHED);
}
public void setStatus(int status) {
 this.status = status;
}
public int getTaskId() {
 return taskId;
}
}
/**
* 自定義的工作線程,持有分派給它執行的任務列表
*/
class WorkThread extends Thread {
//本線程待執行的任務列表,你也可以指為任務索引的起始值
private List<Task> taskList = null;
private int threadId;
/**
 * 構造工作線程,為其指派任務列表,及命名線程 ID
 * @param taskList 欲執行的任務列表
 * @param threadId 線程 ID
 */
public WorkThread(List<Task> taskList,int threadId) {
 this.taskList = taskList;
 this.threadId = threadId;
}
/**
 * 執行被指派的所有任務
 */
public void run() {
 for (Task task : taskList) {
 task.execute();
 }
}
}

執行結果如下,注意觀察每個線程分配到的任務數量及區間。直到所有的線程完成了所分配到的任務後程序結束:

線程 0 的任務數:22 區間[0,21]

線程 1 的任務數:22 區間[22,43]

線程 2 的任務數:22 區間[44,65]

線程 3 的任務數:21 區間[66,86]

線程 4 的任務數:21 區間[87,107]

實際要啟動的工作線程數:5

當前線程 ID 是:Thread-0 | 任務 ID 是:0

當前線程 ID 是:Thread-1 | 任務 ID 是:22

當前線程 ID 是:Thread-2 | 任務 ID 是:44

當前線程 ID 是:Thread-3 | 任務 ID 是:66

當前線程 ID 是:Thread-4 | 任務 ID 是:87

當前線程 ID 是:Thread-0 | 任務 ID 是:1

當前線程 ID 是:Thread-1 | 任務 ID 是:23

當前線程 ID 是:Thread-2 | 任務 ID 是:45

...........................................................................

上面坦白來只算是基本功夫,貼出來還真見笑了。還有更為復雜的功能:

像多線程的下載工具的確更充分利用了網絡資源,而且像 FlashGet、NetAnts 都實現了:假如某個線程下載完了欲先所分配段的內容之後,會幫其他線程下載未完成數據,直到任務完成;或某一下載線程的未完成段區間已經很小了,用不著別人來幫忙時,這就涉及到任務的進一步分配。再如,以上兩個工具都能動態增加、減小或中止線程,越說越復雜了,它們原本比這復雜多了,這些實現可能定義各種隊列來實現,如未完成任務隊列、下載中任務隊列和已完成隊列。難以細究了。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved