程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 數據庫知識 >> DB2數據庫 >> DB2教程 >> MapReduce的詳細過程

MapReduce的詳細過程

編輯:DB2教程

MapReduce的詳細過程


寫在前面的話

MapReduce作為hadoop的編程框架,是工程師最常接觸的部分,也是除去了網絡環境和集群配 置之外對整個Job執行效率影響很大的部分,所以很有必要深入了解整個過程。本文寫作的目的在於使得讀者對整個MapReduce過程有比較細致的了解,當自己需要定制MapReduce行為時,知道該重寫 哪些類和方法。在寫作時,我貼了部分認為重要的源碼和接口,並跟著自己的理解,對於某些內容,結 合了自己在工作中遇到的問題,給出了實踐參考。

 

總體概覽

\\

比較High Level的來看,整個MapReduce過程分為三步:· Map:讀取輸入,做初步的處理,輸出形式的中間結果· Shuffle:按照key對中間結果進行排序聚合,輸出給reduce線程·Reduce:對相同key的輸入進行最終的處理,並將結果寫入到文件中。

 

用經典的WordCount例子來簡單說明一下上面的過程。假設我們現在要做的是統計一個文本中單詞的個數,我們將文件切分成幾個部分,然後創建多個Map線程,處理這些輸入,輸出的中間結果是的形式,shuffle過程將同樣Key的元組,也就是word相同的,分配到同樣的reduce線程中,reduce線程 匯總同一個word的元組個數,最終輸出。

我這麼一說,你是不是感覺已經理解MapReduce了?差不多吧,但是理解與深入理解是1與10000 的差距,下面讓我提幾個細節方面的問題:

1. 原始數據是怎麼切分的,又是以什麼形式傳遞給Map線程的?

2. 有多少個map線程,怎樣控制他們?

3. 輸出寫到磁盤的過程是怎樣的?

4. 如果要保證同一個中間結果key交給同一個reduce,要不要排序?什麼時候排序?

5. 滿足什麼條件的中間結果會調用一次reduce方法,滿足什麼條件的中間結果會交給一個reduce 線程?

6. 有多少reduce線程,怎樣控制他們? 7. 有多少輸出文件? ...

是不是有很多問題都看不懂啦?沒關系,下面我就詳細講解這個過程。

 

Yarn的資源分配與任務調度

 

 

之所以要講解這一部分,是因為MapReduce過程牽扯到了框架本身的東西,我們得知道計算線程 是怎麼來的,怎麼沒的。

Hadoop由1.0進化成2.0,變更還是很大的,1.0裡整個job的資源分配,任務調度和監控管理都是 由一個JobTracker來做的,擴展性很差,2.0對整個過程重新設計了一下,我們重點來看2.0的內容。

一個Job要在集群中運行起來,需要幾個條件,首先,運算資源,可能包括內存,cpu等,其次,得 有一個任務的調度算法,安排運行的先後順序,最後,得知道工作進行的順不順利,並把情況及時的反饋給上級,以便及時的做出響應。下面分別說明。

下面我們首先看看1.0時代hadoop集群是怎麼管理資源和調度任務的。

hadoop1.0的資源管理


 

hadoop1.0的資源管理

\

 

[本圖來自百度百科的“MapReduce”詞條]

對於一個集群來說,資源有很多維度,比如內存,CPU等,1.0時代將節點上的資源切成等份,使用 slot的概念來抽象,根據對資源占用情況的不同,又可細分為Map slot和reduceslot。slot代表一種運 行的能力,像許可證一樣,MapTask只有獲得了Map slot後才可以執行,ReduceTask同理。對於一個 節點,有多少slot是事先配置好的。

JobTracker和TaskTracker共同管理這些slot,其中JobTracker運行在NameNode上,負責資源 的分配和任務的調度,TaskTracker運行在Data Node上,負責所在節點上資源的監控和task的管理。 具體一點,當用戶的任務提交給jobtracker之後,jobtracker根據任務的情況決定要啟動多少MapTask 和ReduceTask,然後根據TaskTracker反饋的slot使用情況(以及其他的因素,比如根據數據的存儲情 況),決定給哪幾個TaskTracker分配多少個MapTask和多少個ReduceTask。接收到任務後,TaskTracker 負責啟動JVM來運行這些Task,並把運行情況實時反饋給JobTracker。

注意,TaskTracker只有監控權,沒有調度權,也就是它只能把運行情況反饋給JobTracker,在他這裡有多少個Task,當task失敗時,重啟task之類的管理權限,都在JobTracker那裡。JobTracker的 任務管理是Task級別的,也即JobTracker負責了集群資源的管理,job的調度,以及一個Job的每個Task 的調度與運行。

打個比方,JobTracker是一個極度專權的君王,TaskTracer是大臣,君王握有所有的權利,大臣們 被架空,君王說事情怎麼做,底下的就得怎麼做,大臣只管執行,並把進行情況告訴君王,如果事情搞砸了,大臣也不得擅作主張的重新做一遍,得上去請示君王,君王要麼再給他一次機會,要麼直接拖出 去砍了,換個人完成。

極度專權早晚累死,而且一個人的力量終歸是有限的,這也是1.0時代很大的問題。所以新時代采取了全新的設計。

Yarn的資源控制與任務調度

\\

Yarn用Container的概念來抽象資源,Container描述了自己的位置,自己擁有的CPU,內存等資 源的數量。Container跟任務完全獨立了,是一個完全硬件的抽象。比1.0裡使用計算時槽更加細粒度, 也更易於理解。

資源控制由ResourceManage(RM)和Node Manager(NM)兩個角色參與,其中Node Manager 管理所在node上的container,並把資源的使用情況匯報給ResourceManager,Resource Manager 通過Node Manager返回的信息,掌握著整個集群的資源情況。為了便於管理,Hadoop集群的管理 員可以建立多個隊列,每個隊列配置一定量的資源,用戶可以向一個或多個隊列提交Job。作為集群的 用戶,可以到50030端口查看集群的隊列的分配和負載情況。

\\

 

當一個用戶提交了一個job給ResourceManager, Resource Manager 並不是直接衡量它所需的 資源並調度,而是下放給一個Application Master(AM)的角色,這個AM全權負責用戶提交的這個Job,它會根據Job的情況向RM申請資源,RM告訴AM它可以使用的Container的信息,AM再將自己 Job的task放到這些Container中運行並監控。如果有失敗的task,AM可以根據情況選擇重啟task。

有幾個關鍵的點我列出來,以確保理解正確:

1. 集群的資源監控由RM與NM合作完成,任務調度與監控由RM與AM完成,結構更加清晰。

2. RM對任務的管理是Job級別的,即它只負責為整個Job分配資源,並交給AM去管理。RM得到了大大的解放。

3. 與TaskTracker相比,AM擁有更多的權利,它可以申請資源並全權負責task級別的運行情況。

4. 與TaskTracker相比,AM可以使用其他機器上的計算資源(即Container)。這些資源也不再有Map和Reduce的區別。

繼續上面的例子。我用壯丁來比喻Container,壯丁有很多屬性,比如家鄉(location),力氣( 內存),財產(CPU),君王(RM)通過錦衣衛(NM)來掌握各個地方(Node)壯丁的使用情況。 當有百姓提出一個要求(提交一個Job),比如興修水利,君王不再事無巨細的過問這件事情,而是叫 一個合適的大臣(AM)過來,比如此例中的水利大臣,問他需要多少人,多少錢,然後衡量一下國力, 播一些壯丁給他用。水利大臣可以使用全國范圍內的壯丁,對他們有絕對的領導權,讓他們干嘛就得干 嘛。事情要麼圓滿完成,水利大臣給君王報喜,要麼發現難度太大啊,嘗試了好多辦法都失敗了(job嘗 試次數到達一定數量),只好回去請罪。

君王遵循政務公開的原則,所有job的運行情況都可以通過50030端口查看:


\

\

好了,講了這麼一大通,我想關於Job怎麼跑起來,task怎麼來怎麼沒,應該有個概念了。用戶將自 己的代碼上傳到集群的一個client Node上,運行代碼,代碼裡會對自己的job進行配置,比如輸入在 哪,有哪些依賴的jar包,輸出寫到哪,以什麼格式寫,然後提交給ResourceManager,ResourceManager 會在一個Node上啟動ApplicationMaster負責用戶的這個Job,AM申請資源,得到RM的批准和分配 後,在得到的Container裡啟動MapTask和ReduceTask,這兩種task會調用我們編寫的Mapper和Reducer等代碼,完成任務。任務的運行情況可以通過web端口查看。

MapReduce計算框架最重要的兩個類是Mapper和Reducer,用戶可以繼承這兩個類完成自己的 業務邏輯,下面以這兩個類的輸入輸出為主線詳細講解整個過程。例子總是最容易被人理解的,所以講解過程有看不懂的,可以回來查看這個簡單的job。用戶想使用MapReduce的過程統計一組文件中每 個單詞出現的次數,與經典的WordCount不同的是,要求大寫字母開頭的單詞寫到一個文件裡面,小寫 的寫到另一個文件中。

Mapper的輸入

\

 

所謂源碼之前,了無秘密,先上mapper的源碼。

Mapper的源碼

 

 

public class Mapper { 
/*** The Context passed on to the {@link Mapper} implementations. */ 
public abstract class Contextimplements MapContext { 
} 
/*** Called once at the beginning of the task. */ 
protected void setup(Context context) throws IOException, InterruptedException { 
// NOTHING 
} 
/*** Called once for each key/value pair in the input split. Most applications * should override this, but the default is the identity function.*/ 
@SuppressWarnings("unchecked")protected void map(KEYIN key, VALUEIN value, 
Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); 
} 
/*** Called once at the end of the task. */ 
protected void cleanup(Context context) throws IOException, InterruptedException { 
// NOTHING 
}  
/*** Expert users can override this method for more complete control over the * execution of the Mapper.* @param context* @throws IOException*/ 
public void run(Context context) throws IOException, InterruptedException { 
setup(context);
try { 
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context); 
}
} finally { 
cleanup(context); } 
} 
	} 

可以簡單的說,Mapper的輸入來自於Context。我們先看一下MapContext的實現:

 

 

public class MapContextImplextends TaskInputOutputContextImpl implements MapContext { 
private RecordReader reader; private InputSplit split; 
public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader reader, RecordWriter writer, 
OutputCommitter committer, StatusReporter reporter, InputSplit split) { 
super(conf, taskid, writer, committer, reporter); this.reader = reader;this.split = split; 
} 
/** 
* Get the input split for this map.
*/ 
public InputSplit getInputSplit() { return split; 
} 
@Override 
public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); 
} 
@Override 
public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); 
} 
@Override 
public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); 
} 
} 

 

MapContextImpl類組合了兩個類型的對象,即InputSplit和RecordReader,並封裝了獲取輸入 的Key和Value的方法,在深入探討InputSplit和RecordReader之前,我們先看一下這個Context是 怎麼傳遞給我們編寫的Mapper函數的。下面是我從MapTask類中摘出的一段代碼:

public class MapTask extends Task {
private 
void runNewMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(),
reporter);
// make a mapper
org.apache.hadoop.mapreduce.Mapper mapper =
(org.apache.hadoop.mapreduce.Mapper)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat inputFormat =
(org.apache.hadoop.mapreduce.InputFormat)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader input =
new NewTrackingRecordReader
(split, inputFormat, reporter, taskContext);
...
org.apache.hadoop.mapreduce.MapContext
mapContext =
new MapContextImpl(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper.Context
mapperContext =
new WrappedMapper().getMapContext(
mapContext);
try {
input.initialize(split, mapperContext);
mapper.run(mapperContext);
...
}finally{
closeQuietly(input);
...
}

從代碼中可以看出,我們在客戶端提交Job之前所進行的配置以JobContext的形式傳遞給了MapTask, 在MapTask的runNewMapper()的方法中,它使用反射實例化出了用戶指定的Mapper類和inputFormat 類,並新建了InputSplit和RecorderReader用來實例化上面提交的MapContext,MapContext以參數的形式被傳遞給了包裝類WrappedMapper,在對input進行初始化後,以

mapper.run(mapperContext);

的形式正式調用我們用戶的代碼。

InputSplit

源碼中對InputSplit類的描述是:

 

/** 
			*  InputSplit represents the data to be processed by an 
			*  individual {@link Mapper}. * 
			*  

Typically, it presents a byte-oriented view on the input and is the * responsibility of {@link RecordReader} of the job to process this and present * a record-oriented view.*/ public abstract class InputSplit { 

          public abstract long getLength() throws IOException, InterruptedException; 
          public abstractString[] getLocations() throws IOException, InterruptedException; } 


 

 

更易於理解的表述是,InputSplit決定了一個Map Task需要處理的輸入。更進一步的,有多少個 InputSplit,就對應了多少個處理他們的Map Task。從接口上來看,InputSplit中並沒有存放文件的內 容,而是指定了文件的文件的位置以及長度。

既然inputsplit與MapTask之間是一一對應的,那麼我們就可以通過控制InputSplit的個數來調整 MapTask的並行性。當文件量一定時,InputSplit越小,並行性越強。inputsplit的大小並不是任意的, 雖然最大值和最小值都可以通過配置文件來指定,但是最大值是不能超過一個block大小的。

Block是什麼?用戶通過HDFS的接口,看到的是一個完整文件層面,在HDFS底層,文件會被切成固 定大小的Block,並冗余以達到可靠存儲的目的。一般默認大小是64MB,可以調節配置指定。

InputSplit是從字節的角度來描述輸入的,回頭查看一下Mapper,它裡面沒有這種東西啊,用到的 Key,Value是從哪來的?有請RecordReader。

RecordReader


 

RecordReader

按照慣例,先上源碼:


 

public abstract class RecordReader implements Closeable {
/**
* Called once at initialization.
* @param split the split that defines the range of records to read
* @param context the information about the task
* @throws IOException
* @throws InterruptedException
*/
public abstract void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException;
/**
* Read the next key, value pair.
* @return true if a key/value pair was read
* @throws IOException
* @throws InterruptedException
*/
public abstract
boolean nextKeyValue() throws IOException, InterruptedException;
/**
* Get the current key
* @return the current key or null if there is no current key
* @throws IOException
* @throws InterruptedException
*/
public abstract
KEYIN getCurrentKey() throws IOException, InterruptedException;
/**
* Get the current value.
* @return the object that was read
* @throws IOException
* @throws InterruptedException
*/
public abstract
VALUEIN getCurrentValue() throws IOException, InterruptedException;
/**
* The current progress of the record reader through its data.
* @return a number between 0.0 and 1.0 that is the fraction of the data read
* @throws IOException
* @throws InterruptedException
*/
public abstract float getProgress() throws IOException, InterruptedException;
/**
* Close the record reader.
*/
public abstract void close() throws IOException;
}


 

 

啊哈,InputSplit原來是RecordReader的一個參數啊。recordReader從InputSplit描述的輸入裡 取出一個KeyValue,作為mapper.map()方法的輸入,跑一遍Map方法。打個比方,InputSplit像一 桌大餐,吃還是得一口一口吃,怎樣算一口,就看RecordReader怎麼實現了。

好了,如果我想自己實現InputSplit和RecordReader,應該寫在哪呢?下面就講InputFormat。

InputFormat

上文我們提到了InputFormat,這個類我們在配置Job的時候經常會指定它的實現類。先來看接口。

public abstract class InputFormat { 
public abstract List getSplits(JobContext context) throws IOException, InterruptedException; 
public abstract RecordReader createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; 
}  


 

明白了吧,InputSplit是在getSplit函數裡面算出來的,RecordReader也是在這裡Create出來 的。如果你想以自己的方式讀取輸入,就可以自己寫一個InputFormat的實現類,重寫裡面的方法。

當然,如果你說我很懶,不想自己寫怎麼辦?好辦,之所以要用框架,很重要的一點就是人家提供 了默認實現啦。WordCount裡面一般用的是TextInputFormat,我們看一下它的實現。


public class TextInputFormat extends FileInputFormat
implements JobConfigurable {
public RecordReader getRecordReader(
InputSplit genericSplit, JobConf job,
Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
}
}


有沒有一下明白了的感覺?它實現了自己的getRecordReader方法,裡面從配置中取了Delimiter, 這個東西的默認值是"\n"!然後返回了以Delimiter劃分的一個LineRecordReader,知道為什麼你制定了InputFormat之後,Mapper裡面讀到的就是一行一行的輸入了吧。

在我們加強版的WordCount裡,也完全可以使用默認實現的TextInputFormat。關於Mapper的 輸入暫時就講這些,下面我們來看Mapper的輸出。

Mapper的輸出

注意到上文貼出的Mapper的默認實現的map方法中,是將Key和Value直接寫入到context當中,我們已經知道了context是從MapContextImpl來的,那這個Write方法是怎麼回事?

Context.Write的來歷

Write方法是它從MapContextImpl父類TaskInputOutputContextImpl繼承來的,看一下這個類 的部分代碼:

 

public abstract class TaskInputOutputContextImpl
extendsTaskAttemptContextImpl
implements TaskInputOutputContext {
private RecordWriter output;
private OutputCommitter committer;
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
RecordWriter output,
OutputCommitter committer,
StatusReporter reporter) {
super(conf, taskid, reporter);
this.output = output;
this.committer = committer;
}
/**
* Generate an output key/value pair.
*/
public void write(KEYOUT key,VALUEOUT value
) throws IOException, InterruptedException {
output.write(key, value);
}
} 

注意到了有個RecordWriter類,跟我們在上文分析過的RecordReader一看就是兄弟嘛,作用你 也肯定猜到了,就是將一個Key value對寫入到輸出文件中。回過頭來看它的輸出類是怎麼從MapTask 中傳入的:

 

 

org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}


 

判斷條件滿足時,說明這個Job沒有ReduceTask,這時RecordWriter被實例化成了 NewDirectOutputCollector,否則的話,實例化為NewOutputCollector。來具體看看這兩個內部類。

 

private class NewDirectOutputCollector
extends org.apache.hadoop.mapreduce.RecordWriter {
private final org.apache.hadoop.mapreduce.RecordWriter out;
...
out = outputFormat.getRecordWriter(taskContext);
...
out.write(key, value);
}


 

前者直接調用了OutputFormat來實例化自己,我們寫Job的時候一般會指定Job的OutputFormat,這個類在MapTask中是通過反射的方式引入的。可見,第一個分支的邏輯是會直接把map的輸出寫入到 我們整個Job的輸出當中。具體是怎麼個寫入的過程,我們留到reduce的輸出中講,畢竟那裡才是最常規的會寫輸出文件的地方。

 

private class NewOutputCollector
extends org.apache.hadoop.mapreduce.RecordWriter {
private final MapOutputCollector collector;
private final org.apache.hadoop.mapreduce.Partitioner partitioner;
private final int partitions;
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
@Override
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush();
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}
}


 

 

這個內部類有兩個成員變量,一個是MapOutputCollector,一個是Partitioner。最終的寫入調 用的是MapOutputCollector的Write方法完成的。Partitioner的名氣更大一些,我們先來介紹。

Partitioner

但凡了解一點MapReduce的人應該都知道這個類,它的作用是根據Key將Map的輸出分區,然後 發送給Reduce線程。有多少個Partition,就對應有多少個Reduce線程。Reduce線程的個數是可以 在配置文件中設定的。上面代碼的邏輯就是先讀一下這個配置,看一下需要分到少個分區,如果分區數 少於1,就實例化出一個Partitioner的默認實現,否則的話,用反射讀取用戶設置的實現類。

我們一般只重寫它的一個方法:getPartition,參數是一個Key Value對以及Partition的總數,比 較常見的實現是取Key的hashcode再對總的分區數取模。

注意,為了提高整個job的運行速度,reduce task應該盡可能均勻的接收Map的輸出。partition 作為Map輸出分配的唯一參考標准,映射規則至關重要,partition返回值一樣的Map的輸出,將會交 給一個reducetask,在實際工作中,我們就遇到了partition返回值不合理,好多Mapper的輸出都壓 在一個reduce的task上,造成這個reducetask執行非常緩慢,整體的job一直結束不了的情況。盡可 能均勻的分配partition!


 

MapOutputCollector

這個Collector我們可以自己實現,不過不是很常見。它有一個默認實現,叫MapOutputBuffer。有關MapOutputBuffer的分析,文獻[4]有非常清晰的解釋,值得一看。

MapOutputBuffer

Combiner的意思是局部的reduce,它可以在job配置的時候指定,實現的邏輯也跟reduce一致, Combiner的作用是可以減少Mapper和Reducer之間傳輸的數據量。以我們上面大小寫敏感的word count來說,同一台機器上的Mapper輸出,可以先合並一次,將n個合並成的形式,再傳遞給reducer。

我把這個類裡關鍵的方法列一下,源碼比較多,就不貼了,可以參照那篇帖子。

 

init

public void init(Context context) throws IOException, ClassNotFoundException; 

collect

 

public synchronized void collect(K key, V value, int partition) throws IOException; 

對外最常調用的接口。判定參數傳入的參數類型與用戶對job的配置一不一致(”Type mismatch in key from map\value“),當緩沖區沒有足夠的位置存放當前鍵值對時,將緩沖區的內容溢出寫 到磁盤,否則的話,序列化鍵值對,寫入到緩沖區數組,並將這個鍵值對的位置信息連同partition編號 寫入到index數組裡。

 

flush

 

public synchronized void collect(K key, V value, int partition) throws IOException; 

 

當map所有的輸出都收集完了之後,處理殘留在緩沖區,沒有溢寫到磁盤的數據。

sortAndSpill


 

private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException;


 

 

溢寫的關鍵邏輯,其中會調用排序函數和combiner。Combiner的邏輯與reducer的完全一樣,相 當於每個map線程的局部預處理,通過對局部數據的合並,來起到減少shuffle階段數據量的作用。

spillSingleRecord


 

private void spillSingleRecord(K key, V value, int partition) throws IOException;

 

當緩沖區沒有達到溢出條件,並且放不下當前這條記錄的時候會調用的方法,主要用來處理大鍵值 對的邊界條件。這種情況直接寫磁盤。

compare&&swap

 

public int compare(int mi, int mj) {
int kvi = this.offsetFor(mi % this.maxRec);
int kvj = this.offsetFor(mj % this.maxRec);
int kvip = this.kvmeta.get(kvi + 2);
int kvjp = this.kvmeta.get(kvj + 2);
return kvip != kvjp?kvip - kvjp:this.comparator.compare(this.kvbuffer, this.kvmeta.get(kvi + 1), this.kvmeta.get(kvi + 0) - this.kvmeta.get
(kvi + 1), this.kvbuffer, this.kvmeta.get(kvj + 1), this.kvmeta.get(kvj + 0) - this.kvmeta.get(kvj + 1));
}
public void swap(int mi, int mj) {
int iOff = mi % this.maxRec * 16;
int jOff = mj % this.maxRec * 16;
System.arraycopy(this.kvbuffer, iOff, this.META_BUFFER_TMP, 0, 16);
System.arraycopy(this.kvbuffer, jOff, this.kvbuffer, iOff, 16);
System.arraycopy(this.META_BUFFER_TMP, 0, this.kvbuffer, jOff, 16);
}


 

 

從這兩個函數可以猜出排序函數的行為。代碼裡出現的kvmeta就是上文中提到的index數組,他是 kvbuffer的一種int視角,比較的對象就是它的兩個元素,如果有亂序情況,交換的也是這兩個元素的位 置。

mergeParts


 

private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException;

當文件全部溢出之後,會調用這個方法將小文件合並成大文件。

 

\

 

合並前後的示意圖還是很形象的。最終在shuffle的時候,只要根據index查找對應的數據就可以了。

 

業務場景

我一直沒有想過MapTask是否會對輸出自動排序,直到有一天我正真需要自己動手修改業務代碼。

我在的組做的是數據處理,在我們的業務場景中,有兩種數據結構,event和session,用戶在電商網站上操作時,會在後台產生一系列的event,比如你查詢了一件商品,後台就有一個查詢event產生。event用guid和timestamp唯一標示,可能還含有其他的屬性(比如ip等),guid可以簡單的理解成用 戶的一種標示,event說白了是某個用戶在某一時刻產生的某種動作。session的意思某個用戶在一段連 續時間內產生的動作集合,比event的抽象層次更高,它用sessionId和timestamp來標示,也有諸如這 個session一共包含了多少個event這種統計信息。sessionId跟guid一樣,某個用戶在一定時間內是唯 一的,session的timestamp取的是這段時間這個用戶的第一個event的timestamp。

好了,我們需要寫一個MapReduce的job,輸入是event,輸出是session。在map階段,從event 裡面提取出key,然後同一個用戶產生的event,應該一起在reduce階段統計。既然有時序的問題,是 不是在統計之前應該先排個序?可我翻遍了代碼,都沒有找到對key排序的邏輯,是前輩代碼的巨大bug ?

當然不是,在我們將guid與timestamp作為key輸出時,MapTask已經按照這兩個字段做了排序。 注意,這種有序,指的只是當前MapTask局部輸出的有序。從Mapper的輸出,到真正Reducer的輸入,還有很重要的一個過程要走。

 

Shuffle

從語義上說,Shuffle應該是Map和Reduce中間的過程,從源碼的代碼結構上看,shuffle過程是 在reduceTask中得。前段時間在考公司的hadoop測試的時候,有這種變態的問題,說下面屬於reduce 過程的操作有。。至今不知道正確答案是什麼。

ReduceTask有三個Phase,即copyPhase,sortPhase和reducePhase,主流的做法應該是將前兩個phase歸為Shuffle階段,reducephase作為狹義的reduce過程。

ShuffleConsumerPlugin

Shuffle過程通過調用抽象類ShuffleConsumerPlugin來完成,它有個實現類,就叫做“Shuffle”。下面是Shuffle類最主要的run方法的實現:

 

@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
// Scale the maximum events we fetch per RPC call to mitigate OOM issues
// on the ApplicationMaster when a thundering herd of reducers fetch events
// TODO: This should not be necessary after HADOOP-8942
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
// Start the map-completion events fetcher thread
final EventFetcher eventFetcher =
new EventFetcher(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
// Start the map-output fetcher threads
boolean isLocal = localMapFiles != null;
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
Fetcher[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
fetchers[0] = new LocalFetcher(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
// Wait for shuffle to complete successfully
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
// Stop the event-fetcher thread
eventFetcher.shutDown();
// Stop the map-output fetcher threads
for (Fetcher fetcher : fetchers) {
fetcher.shutDown();
}
// stop the scheduler
scheduler.close();
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
// Finish the on-going merges...
RawKeyValueIterator kvIter = null;
try {
kvIter = merger.close();
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge " , e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}

 

Shuffle的時候,會先判斷是不是local run的,如果不是的話,會默認啟動5個Fetcher線程拉取 map的輸出,Fetcher會先找到一個主機,確定這台機器上它要拉取的map task的輸出,然後使用http協議獲取response的stream,交給MapOutput類型的對象去完成具體的下載任務。

當文件拉取完成,就會進入sort階段。注意到我們拉取到數據都是局部有序的,因此,排序的過程, 實際上也就是一個Merge的過程。Copy phase結束之後,Shuffle會調用

kvIter = merger.close();

方法來得到排序完成的map的key value輸出。

 

MapOutput

MapOutput有兩個實現類,即OnDiskMapOutput和InMemoryMapOutput,具體哪一個被實例化,是看當前要shuffle的數據適不適合放到內存中。

OnDiskMapOutput的行為如下所示:


 

final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ];
while (bytesLeft > 0) {
int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream reading " +
getMapId());
}
disk.write(buf, 0, n);
bytesLeft -= n;
metrics.inputBytes(n);
reporter.progress();
}

InMemoryMapOutput的行為如下:

 

 

public static void readFully(InputStream in, byte buf[],
int off, int len) throws IOException {
int toRead = len;
while (toRead > 0) {
int ret = in.read(buf, off, toRead);
if (ret < 0) {
throw new IOException( "Premature EOF from inputStream");
}
toRead -= ret;
off += ret;
}
}

 

代碼比較簡單,前者有個buffer,一邊讀一邊寫文件,後者將數據緩存在一個byte數組裡,跟類名 看上去的行為完全一致。

當MapOutput拷貝方法shuffle返回時,Fetcher會調用Scheduler的copySucceed方法做一些 收尾工作,比如將已經拷貝過的host從待拷貝列表中刪除。比較重要的一點是,它會調用Mapoutput的commit方法。兩種Mapoutput的實現在這裡的差異不大,都會調用MergeManagerImpl的closeXXXXFile 方法。

MapOutput負責的是將數據從集群中得其他機器上拉取過來,拉取到的數據怎麼Merge到一起, 就是MergeManagerImpl考慮的事情了。

 

MergeManagerImpl

MergeManagerImpl幾乎handle了所有與merge相關的實現。他有兩個(其實是三個)內部類, InMemeryMerger和OnDiskMerger,分別對應了前面的兩種不同的MapOutput。

我們先看一下這兩個Merger共同的父類MergeThread,比較容易理解它做得事情。

 

abstract class MergeThread extends Thread {
private LinkedList> pendingToBeMerged;
public synchronized void close() throws InterruptedException {
closed = true;
waitForMerge();
interrupt();
}
public void startMerge(Set inputs) {
if (!closed) {
numPending.incrementAndGet();
List toMergeInputs = new ArrayList();
Iterator iter=inputs.iterator();
for (int ctr = 0; iter.hasNext() && ctr < mergeFactor; ++ctr) {
toMergeInputs.add(iter.next());
iter.remove();
}
LOG.info(getName() + ": Starting merge with " + toMergeInputs.size() +
" segments, while ignoring " + inputs.size() + " segments");
synchronized(pendingToBeMerged) {
pendingToBeMerged.addLast(toMergeInputs);
pendingToBeMerged.notifyAll();
}
}
}
public synchronized void waitForMerge() throws InterruptedException {
while (numPending.get() > 0) {
wait();
}
}
public void run() {
while (true) {
List inputs = null;
try {
// Wait for notification to start the merge...
synchronized (pendingToBeMerged) {
while(pendingToBeMerged.size() <= 0) {
pendingToBeMerged.wait();
}
// Pickup the inputs to merge.
inputs = pendingToBeMerged.removeFirst();
}
// Merge
merge(inputs);
} catch (InterruptedException ie) {
numPending.set(0);
return;
} catch(Throwable t) {
numPending.set(0);
reporter.reportException(t);
return;
} finally {
synchronized (this) {
numPending.decrementAndGet();
notifyAll();
}
}
}
}
public abstract void merge(List inputs) throws IOException;
}

 

容易看出,MergeThread有一個LinkedList,用於存放MapOutput得到的輸出,startMerge方 法會將這些輸出添加到List中,run方法會不斷的從中取出Mapoutput的輸出,並調用merge方法, Close的時候會等待所有的merge過程結束。startMerge方法正是在MergeManagerImpl的 closeXXXXMergedFile調用的。

這樣整個過程就清晰一些了,Shuffle時調用Fetcher來下載Map的輸出,Fetcher根據數據量的大小,判斷是實例化InMemoryMapOutput還是OnDiskMapOutput,實例化出的MapOutput拉取完 畢後,Fetcher通過一個Shuffle的scheduler調用Mapoutput的commit方法,commit方法中調用到 MergeManagerImpl的closeXXXXMergedFile方法,這個方法又調用到MergeThread實現類中得 startMerge方法,下載到得數據最終就被傳遞到了MergeThread的實現類了。

剩下的問題,就是怎麼Merge了。

Merge

整個Merge的過程比較復雜,牽扯到得代碼也比較多,我按照我的理解,在邏輯的層面簡單敘述一下這個過程。

從整體上講,Merge的過程是先Merge掉InMemory的,InMemory的結果也會加入到onDisk的待 Merge隊列中,最後補上一記finalMerge,合並起InMemory剩余的與onDisk剩余的。每種Merger的Merge操作最終都是交給一個叫Merger的工具類的靜態方法實現的。

除了參數的不同,實際merge的過程是類似的。Merge就是將小文件合並成大文件,對於初始有序 的數據,為了減少比較次數,每次應該合並數據最少的兩組,也就是霍夫曼樹的思想。從源碼看,貌似 是自己用ArrayList實現了一個。

InMemory的Merge行為是,先將InMemoryMapOutput中的buffer結構化成一組segment, segment含有需要merge的數據,最重要的,它含有這些數據的長度信息,這個信息會再霍夫曼樹式的 merge用到。接下來它會new出一個path對象用來存放merge的結果,一個Writer來寫入,然後就會調用Merger工具類的相應merge方法進行實際的merge。在實際寫入文件的時候,會判斷有沒有指定 Combiner,也就是會不會對相同key的輸出進行進一步的合並。InMemoryMerger的最終結果會寫入到 文件,並將這個文件的信息注冊到onDiskMerger中,以便後續的合並。

onDiskMerger的行為與InMemoryMerger的行為基本一致,只是在調用Merger的時候給定了不 同的參數。

finalMerge的行為是,先判斷有沒有inMemory的output,有的話構造出segment,合並,最終 的結果是一個文件,添加到onDisk得output隊裡中,然後合並onDisk的ouput,比較特別的,finalMerge 是有返回值的,最終合並的結果輸出是RawKeyValueIterator類型,代表這一個reduce所接收到的所 有輸入。

 

MergeManagerImpl的close方法

在shuffle的run方法中,在copyPhase結束之後,調用了MergeManagerImpl的close方法,該 方法的實現如下:

public RawKeyValueIterator close() throws Throwable {
// Wait for on-going merges to complete
if (memToMemMerger != null) {
memToMemMerger.close();
}
inMemoryMerger.close();
onDiskMerger.close();
List> memory =
new ArrayList>(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
memory.addAll(inMemoryMapOutputs);
inMemoryMapOutputs.clear();
List disk = new ArrayList(onDiskMapOutputs);
onDiskMapOutputs.clear();
return finalMerge(jobConf, rfs, memory, disk);
}

可見,在inMemoryMerger和onDiskMerger的close之後,會最終返回finalMerge的結果。這個 RawKeyValueIterator會最為一個參數傳遞給reduce過程。Shuffle過程到此就算徹底結束了。

 

Reduce的輸入

Shuffle過程結束之後,reduce階段獲得了RawKeyValueIterator類型的輸入,ReduceTask的 run方法會調用runNewReducer方法,該方法的簽名如下:

 

private 
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator comparator,
Class keyClass,
ClassvalueClass
) throws IOException,InterruptedException,
ClassNotFoundException;


在這裡面,會用反射的方式實例化出Reducer。
Reducer與Mapper非常相似,我貼一下他的實現:

 

 

public class Reducer {
/**
* The Context passed on to the {@link Reducer} implementations.
*/
public abstract class Context
implements ReduceContext {
}
/**
* Called once at the start of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable values,Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}

 

 

Reducer的輸入輸出的信息同樣是封裝在Context中。Reducer與Mapper看上去很像,但是有很 多關鍵的不同。比如reduce方法的參數,還有run方法的實現。

注意到,reduce方法的第二個參數,不再是一個VALUE類型,而是一個迭代器,意指key相同的 value會一次性的扔給這個reduce方法,那麼到底怎樣算key相同呢?我們看到reduce方法是在run方法中調用的,第一個參數與Mapper相同,也是context的currentKey,第二個不一樣,是從context 獲得的values,在ReduceContextImpl中,這個getValues方法,直接返回一個迭代器。

從語義上說,Reducer的reduce方法應該每次處理Key相同的那一組輸入,那麼到底什麼樣的一組 key,算是相同的key呢?這個關鍵的問題由構造ReduceContext的一個不起眼的參數,GroupingComparator 來解決。

GroupingComparator

我們知道,哪些Mapper的輸出交給一個Reduce線程是由Partitioner決定的,但是這些輸入並不 是一次性處理的,舉個例子,我們在做大小寫敏感wordcount的時候,假設使用的partition策略是根據單詞首字母大小來指定reducer,有2個reducer的話,"an"和"car"都會交給同一個reduce線程, 但是統計每個單詞個數的時候,他倆是不能混起來的,也就是一個reduce線程實際上將整個輸入分成了好多組,在每一組上運行了一次reduce的過程。這個組怎麼分,就是GroupingComparator做得事情。針對word count這個實例,我們應該將完全相等的兩個單詞作為一組,運行一次reduce的方法。

我們看一下GroupingComparator接口的定義:

 

public interface RawComparator extends Comparator { 
/** 
			*  Compare two objects in binary. ?
			*  b1[s1:l1] is the first object, and b2[s2:l2] is the second object. * ?
			*  @param b1 The first byte array. ?
			*  @param s1 The position index in b1. The object under comparison's starting index. ?
			*  @param l1 The length of the object in b1. ?
			*  @param b2 The second byte array. ?
			*  @param s2 The position index in b2. The object under comparison's starting index. ?
			*  @param l2 The length of the object under comparison in b2. ?
			*  @return An integer result of the comparison. */ ?public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); ?} 


 

可見,它是從字節碼的角度來判定是否相等的,具體比較哪一部分,可以根據我們自己的實現來控 制。

經過了Shuffle過程,所有的輸入都已經按照Key排序好了。所以,在判定兩個Key要不要交給同一 個Reduce方法時,只要判定相鄰的兩個key就可以了。這個比較的過程,實際上在我們在reduce方法 中,對value的迭代器不斷的取next的時候,就悄悄發生了。

業務場景

接著前面的業務場景,還是event和session的問題,我已經講過一段時間內同一個guid的event應該劃分成一個session,也就是event的key是guid和timestamp時,guid一樣的event要按照timestamp排好序的順序交給一個reduce方法來處理,因此,我們自己實現的GroupingComparator應該只比較 event key的guid,按照guid來聚合。

Reduce的輸出

 

在構造ReduceContext的時候,傳入了兩個跟輸出相關的參數,一個是RecordWriter類型,一個 是OutputCommitter類型。但是,當查看這兩個對象構造的過程時,會發現他們的幕後boss居然是 OutputFormat!這貨看起來是不是非常眼熟?沒錯,我們在之前講Map的輸出時提到過一次,沒有展開 講。它跟InputFormat的功能其實很呼應。


 

OutputFormat

按照慣例,我們還是來看看它的接口。

 

public abstract class OutputFormat {
/**
* Get the {@link RecordWriter} for the given task.
*
* @param context the information about the current task.
* @return a {@link RecordWriter} to write the output for the job.
* @throws IOException
*/
public abstract RecordWriter
getRecordWriter(TaskAttemptContext context
) throws IOException, InterruptedException;
/**
* Check for validity of the output-specification for the job.
*
*

This is to validate the output specification for the job when it is * a job is submitted. Typically checks that it does not already exist, * throwing an exception when it already exists, so that output is not * overwritten.

* * @param context information about the job * @throws IOException when output should not be attempted */ public abstract void checkOutputSpecs(JobContext context ) throws IOException, InterruptedException; /** * Get the output committer for this output format. This is responsible * for ensuring the output is committed correctly. * @param context the task context * @return an output committer * @throws IOException * @throws InterruptedException */ public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context ) throws IOException, InterruptedException; }

 

從源碼的描述來看,OutputFormat主要做兩件事情,一是驗證job的輸出配置是否合理(比如查看目標路徑是否存在),二是提供一個RecordWriter的實現,來寫入最終的輸出。三個抽象方法,分別用 於返回RecordWriter,返回OutputCommitter,以及驗證輸出的配置。

你可能會想不通一個輸出為什麼要搞這麼復雜,反正一個reducer產生一個文件,指定一下目錄,直接往裡寫不就行了嗎?怎麼還要recordWriter,還要committer的。我們回想一下hadoop設計的初 衷,在不可靠的機器上,得到可靠的輸出。也就是,hadoop的設計者認為一個task它很可能是會運行 失敗的,我們常常需要嘗試多次,因此,除了寫入操作之外,我們應該先寫在臨時目錄,確定成功了, 再提交到正式的輸出目錄裡,這個工作其實就是committer做得,而recordWriter只要專注於寫入操作 就可以了。

我們當然可以從頭開始寫一個OutputFormat,但更一般的做法是,繼承自一個典型的實現 FileOutputFormat。

FileOutputFormat

下面是它對checkOutputSpecs的實現:

 

public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws FileAlreadyExistsException,
InvalidJobConfException, IOException {
// Ensure that the output directory is set and not already there
Path outDir = getOutputPath(job);
if (outDir == null && job.getNumReduceTasks() != 0) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
}
if (outDir != null) {
FileSystem fs = outDir.getFileSystem(job);
// normalize the output directory
outDir = fs.makeQualified(outDir);
setOutputPath(job, outDir);
// get delegation token for the outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] {outDir}, job);
// check its existence
if (fs.exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
}
}

 

 

最開始接觸hadoop跑測試的時候,經常遇到FileAlreadyExistsException這個錯誤,原因就是 沒有刪掉上一次跑的結果。直到現在,才知道原來是從這裡拋出的啊。hadoop之所以這樣設定,是為 了防止因為粗心覆蓋掉之前生成的數據,我覺得這是合理的。

FileOutputFormat還提供了一些好用的方法,比如下面這個:

public synchronized static String getUniqueFile(TaskAttemptContext context,
String name,
String extension) {
TaskID taskId = context.getTaskAttemptID().getTaskID();
int partition = taskId.getId();
StringBuilder result = new StringBuilder();
result.append(name);
result.append('-');
result.append(
TaskID.getRepresentingCharacter(taskId.getTaskType()));
result.append('-');
result.append(NUMBER_FORMAT.format(partition));
result.append(extension);
return result.toString();
}

這個方法提供了一種生成唯一輸出文件名的功能,之所以需要這個,是因為多個task都輸出,萬一起名沖突就壞了。命名的規則是以用戶提供的字符串開頭,然後是task的類型(map-m, reduce-r),最後是這個task所屬的partition。這種方式保證了在當前job生成的結果中文件名是唯一的。標示出task得類型和partition有很大的好處,我們在實際工作中就有過這種體會。我們有一個每小時都會運行的job,跑完一個小時的數據需要15分鐘的樣子,但是每天0點都會跑的特別慢,也不報錯,通過查看生成文件,我們發現標示為r的partition號為3的那個task總是最後生成文件,而且比其他partition的都要明顯大,最終確定了是交給這個partition的數據太多造成的。對於一開始用戶提供的前綴,當然可以是任何形式,但是我們強烈建議綴上時間戳。在我們的實踐中,當前小時生成的數據有可能需要拷貝回前面的文件夾,默認提供的命名方式只能保證在當前job生成的輸出文件是唯一的,沒法保證與之前的不沖突,我們的做法是在前綴上加時間戳,這樣可以方便的分辨哪些是後來加入的文件。

OutputCommitter
從源碼的注釋,我們知道OutputCommitter負責下面的工作:
? 在job啟動時setup job。例如,在job的啟動期間建立臨時的輸出目錄。
? 在job結束是clean up job。比如,job結束之後刪掉臨時輸出目錄。
? 建立task的臨時輸出
? 檢測一個task需不需要提交自己的輸出
? 提交task的輸出
? 丟棄task的輸出
這麼列出來,感覺比較空洞,我講一下我的理解。正如前文提到的,OutputCommitter的主要職責是建立起task執行的臨時目錄,然後驗證這個task需不需要將自己的輸出的結果提交,提交到哪裡。對於產生的臨時目錄和寫入的臨時文件,也要負責清理干淨。
OutputCommitter有很多需要實現的方法,我列一下:

 

public abstract class OutputCommitter {
public abstract void setupJob(JobContext jobContext) throws IOException;
public void cleanupJob(JobContext jobContext) throws IOException { }
public void commitJob(JobContext jobContext) throws IOException {
cleanupJob(jobContext);
}
public void abortJob(JobContext jobContext, JobStatus.State state)
throws IOException {
cleanupJob(jobContext);
}
public abstract void setupTask(TaskAttemptContext taskContext)
throws IOException;
public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
throws IOException;
public abstract void commitTask(TaskAttemptContext taskContext)
throws IOException;
public abstract void abortTask(TaskAttemptContext taskContext)
throws IOException;
public boolean isRecoverySupported() {
return false;
}
public void recoverTask(TaskAttemptContext taskContext)
throwsIOException
{}

 

 

方法名比較准確的反應方法需要實現的功能。下面我們看一下與FileOutputFormat對應的Committer。

FileOutputCommitter

前面已經提到了,OutputCommitter最重要的就是目錄的建立刪除以及拷貝,那麼要理解一個 Committer的行為,只要專注它是怎麼操作目錄的就可以了。

在FileOutputCommitter裡,有三個四種目錄,四種目錄分別包括

· 最終的Job輸出目錄

· 臨時的Job目錄

· task的提交目錄

· task的臨時目錄

task每次在task臨時目錄中工作,如果確定成功並且需要被提交,就會提交到task的提交目錄中。 task的提交目錄實際上跟臨時Job的目錄是一個目錄,當一個job的所有task都順利執行之後,會從臨時 job目錄提交到最終的輸出目錄。

之所以有這麼多跳,其實還是基於task很可能會執行失敗的假設,這種方式,在task失敗的時候, 可以直接清掉它的目錄重來,效率上肯定要差一些。因此我的同事寫過一個DirectFileOutputCommitter,當task執行成功時,直接提交到最終的工作目錄。這種方式雖然在一定程度上提高了效率,可有個風險, 當這個job失敗需要重新執行的時候,就得事先清一下最終的輸出目錄。

在實踐的時候,我們常常通過在一個目錄下生成"_SUCCESS"文件來標記這個目錄已經完成,一個很好的生成時機就是Committer的commitJob方法。

RecordWriter

這個類的介紹非常普通,它做的事情也很簡單,就是將一對KeyValue的pair寫入到輸出文件中。他的接口很簡單:

 

public abstract class RecordWriter {
/**
* Writes a key/value pair.
*
* @param key the key to write.
* @param value the value to write.
* @throws IOException
*/
public abstract void write(K key,V value
) throws IOException, InterruptedException;
/**
* Close this RecordWriter to future operations.
*
* @param context the context of the task
* @throws IOException
*/
public abstract void close(TaskAttemptContext context
) throws IOException, InterruptedException;
}
write方法用來寫入,close方法用來釋放資源。
FileOutputFormat中沒有提供getRecordWriter的實現,我們來看一下實際工作中用的比較多的FileOutputFormat最有名的子類,TextOutputFormat中是怎麼實現的。

 

 

public RecordWriter getRecordWriter(FileSystem ignored,
JobConf job,
String name,
Progressable progress)
throws IOException {
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator",
"\t");
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter(fileOut, keyValueSeparator);
} else {
Class codecClass =
getOutputCompressorClass(job, GzipCodec.class);
// create the named codec
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
// build the filename including the extension
Path file =
FileOutputFormat.getTaskOutputPath(job,
name + codec.getDefaultExtension());
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
}
}

代碼很好懂,key和value之間默認的分割符是"\t",輸出流用得是FSDataOutputStream,可壓縮,可不壓縮。LineRecordWriter是它的內部類,每次新起一行寫入新數據,key value之間用分隔符分割。至此,我就將MapReduce的過程全部講完了,中間還有沒說清楚的地方,後面我們繼續晚上,再自己的經驗積累一些後,應該會有更深刻的理解。

結語

終於將這篇博客寫完了,本來想著是入職之前完成的,結果拖了快兩個月。最初開始的時候也沒想 過會寫這麼多,看起源碼來,一環一環的,就總想再搞得明白一些。裡面摻雜了一些我工作中遇到的問 題,不是很多,不過我覺得還是有一定的參考意義的。希望這篇博客能幫助到對hadoop感興趣的你。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved