程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> MongoDB MapReduce 性能提升20倍的優化寶典

MongoDB MapReduce 性能提升20倍的優化寶典

編輯:C++入門知識

自從MongoDB被越來越多的大型關鍵項目采用後,數據分析也成為了越來越重要的話題。人們似乎已經厭倦了使用不同的軟件來進行分析(這都利用到了Hadoop),因為這些方法往往需要大規模的數據傳輸,而這些成本相當昂貴。

MongoDB提供了2種方式來對數據進行分析:Map Reduce(以下簡稱MR)和聚合框架(Aggregation Framework)。MR非常靈活且易於使用,它可以很好地與分片(sharding)結合使用,並允許大規模輸出。盡管在MongoDB v2.4版本中,由於JavaScript引擎從Spider切換到了V8,使得MR的性能有了大幅改進,但是與Agg Framework(使用C++)相比,MR的速度還是顯得比較慢。本文就來看看,有哪些方法可以讓MR的速度有所提升。

測試

首先我們來做個測試,插入1000萬文檔,這些文檔中包含了介於0和100萬之間的單一整數值,這意味著,平均每10個文檔具有相同的值。

代碼
> for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });} 
> db.uniques.findOne() 
{ "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 } 
> db.uniques.ensureIndex({dim0: 1}) 
> db.uniques.stats() 

        "ns" : "test.uniques", 
        "count" : 10000000, 
        "size" : 360000052, 
        "avgObjSize" : 36.0000052, 
        "storageSize" : 582864896, 
        "numExtents" : 18, 
        "nindexes" : 2, 
        "lastExtentSize" : 153874432, 
        "paddingFactor" : 1, 
        "systemFlags" : 1, 
        "userFlags" : 0, 
        "totalIndexSize" : 576040080, 
        "indexSizes" : { 
                "_id_" : 324456384, 
                "dim0_1" : 251583696 
        }, 
        "ok" : 1 


這裡我們想要得到文檔中唯一值的計數,可以通過下面的MR任務來輕松完成:

代碼
> db.runCommand( 
{ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout" }) 

        "result" : "mrout", 
        "timeMillis" : 1161960, 
        "counts" : { 
                "input" : 10000000, 
                "emit" : 10000000, 
                "reduce" : 1059138, 
                "output" : 999961 
        }, 
        "ok" : 1 


正如你看到的,輸出結果大約需要1200秒(在EC2 M3實例上測試),共輸出了1千萬maps、100萬reduces、999961個文檔。結果類似於:

代碼
> db.mrout.find() 
{ "_id" : 1, "value" : 10 } 
{ "_id" : 2, "value" : 5 } 
{ "_id" : 3, "value" : 6 } 
{ "_id" : 4, "value" : 10 } 
{ "_id" : 5, "value" : 9 } 
{ "_id" : 6, "value" : 12 } 
{ "_id" : 7, "value" : 5 } 
{ "_id" : 8, "value" : 16 } 
{ "_id" : 9, "value" : 10 } 
{ "_id" : 10, "value" : 13 } 
... 


下面就來看看如何進行優化。

使用排序

我在之前的這篇文章中簡要說明了使用排序對於MR的好處,這是一個鮮為人知的特性。在這種情況下,如果處理未排序的輸入,意味著MR引擎將得到隨機排序的值,基本上沒有機會在RAM中進行reduce,相反,它將不得不通過一個臨時collection來將數據寫回磁盤,然後按順序讀取並進行reduce。

下面來看看如果使用排序,會有什麼幫助:

代碼
> db.runCommand( 
{ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout", 
sort: {dim0: 1} }) 

        "result" : "mrout", 
        "timeMillis" : 192589, 
        "counts" : { 
                "input" : 10000000, 
                "emit" : 10000000, 
                "reduce" : 1000372, 
                "output" : 999961 
        }, 
        "ok" : 1 


現在時間降到了192秒,速度提升了6倍。其實reduces的數量是差不多的,但是它們在被寫入磁盤之前已經在RAM中完成了。

使用多線程

在MongoDB中,一個單一的MR任務並不能使用多線程——只有在多個任務中才能使用多線程。但是目前的多核CPU非常有利於在單一服務器上進行並行化工作,就像Hadoop。我們需要做的是,將輸入數據分割成若干塊,並為每個塊分配一個MR任務。splitVector命令可以幫助你非常迅速地找到分割點,如果你有更簡單的分割方法更好。

代碼
> db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000}) 

    "timeMillis" : 6006, 
    "splitKeys" : [ 
        { 
            "dim0" : 18171 
        }, 
        { 
            "dim0" : 36378 
        }, 
        { 
            "dim0" : 54528 
        }, 
        { 
            "dim0" : 72717 
        }, 
… 
        { 
            "dim0" : 963598 
        }, 
        { 
            "dim0" : 981805 
        } 
    ], 
    "ok" : 1 


從1千萬文檔中找出分割點,使用splitVector命令只需要大約5秒,這已經相當快了。所以,下面我們需要做的是找到一種方式來創建多個MR任務。從應用服務器方面來說,使用多線程和$gt / $lt查詢命令會非常方便。從shell方面來說,可以使用ScopedThread對象,它的工作原理如下:

代碼
> var t = new ScopedThread(mapred, 963598, 981805) 
> t.start() 
> t.join() 


現在我們可以放入一些JS代碼,這些代碼可以產生4個線程,下面來等待結果顯示:

代碼
> var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 }) 
> var keys = res.splitKeys 
> keys.length 
39 
> var mapred = function(min, max) { 
return db.runCommand({ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: "mrout" + min, 
sort: {dim0: 1}, 
query: { dim0: { $gte: min, $lt: max } } }) } 
> var numThreads = 4 
> var inc = Math.floor(keys.length / numThreads) + 1 
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } 
min:0 max:274736 
min:274736 max:524997 
min:524997 max:775025 
min:775025 max:{ "$maxKey" : 1 } 
connecting to: test 
connecting to: test 
connecting to: test 
connecting to: test 
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); } 

        "result" : "mrout0", 
        "timeMillis" : 205790, 
        "counts" : { 
                "input" : 2750002, 
                "emit" : 2750002, 
                "reduce" : 274828, 
                "output" : 274723 
        }, 
        "ok" : 1 


        "result" : "mrout274736", 
        "timeMillis" : 189868, 
        "counts" : { 
                "input" : 2500013, 
                "emit" : 2500013, 
                "reduce" : 250364, 
                "output" : 250255 
        }, 
        "ok" : 1 


        "result" : "mrout524997", 
        "timeMillis" : 191449, 
        "counts" : { 
                "input" : 2500014, 
                "emit" : 2500014, 
                "reduce" : 250120, 
                "output" : 250019 
        }, 
        "ok" : 1 


        "result" : "mrout775025", 
        "timeMillis" : 184945, 
        "counts" : { 
                "input" : 2249971, 
                "emit" : 2249971, 
                "reduce" : 225057, 
                "output" : 224964 
        }, 
        "ok" : 1 


第1個線程所做的工作比其他的要多一點,但時間仍達到了190秒,這意味著多線程並沒有比單線程快!

使用多個數據庫

這裡的問題是,線程之間存在太多鎖爭用。當鎖時,MR不是非常無私(每1000次讀取會進行yield)。由於MR任務做了大量寫操作,線程之間結束時會等待彼此。由於MongoDB的每個數據庫都有獨立的鎖,那麼讓我們來嘗試為每個線程使用不同的輸出數據庫:

代碼
> var mapred = function(min, max) { 
return db.runCommand({ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: { replace: "mrout" + min, db: "mrdb" + min }, 
sort: {dim0: 1}, 
query: { dim0: { $gte: min, $lt: max } } }) } 
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } 
min:0 max:274736 
min:274736 max:524997 
min:524997 max:775025 
min:775025 max:{ "$maxKey" : 1 } 
connecting to: test 
connecting to: test 
connecting to: test 
connecting to: test 
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); } 
... 

        "result" : { 
                "db" : "mrdb274736", 
                "collection" : "mrout274736" 
        }, 
        "timeMillis" : 105821, 
        "counts" : { 
                "input" : 2500013, 
                "emit" : 2500013, 
                "reduce" : 250364, 
                "output" : 250255 
        }, 
        "ok" : 1 

... 


所需時間減少到了100秒,這意味著與一個單獨的線程相比,速度約提高2倍。盡管不如預期,但已經很不錯了。在這裡,我使用了4個核心,只提升了2倍,如果使用8核CPU,大約會提升4倍。

使用純JavaScript模式

在線程之間分割輸入數據時,有一些非常有趣的東西:每個線程只擁有約25萬主鍵來輸出,而不是100萬。這意味著我們可以使用“純JS模式”——通過jsMode:true來啟用。開啟後,MongoDB不會在JS和BSON之間反復轉換,相反,它會從內部的一個50萬主鍵的JS字典來reduces所有對象。下面來看看該操作是否對速度提升有幫助。

代碼
> var mapred = function(min, max) { 
return db.runCommand({ mapreduce: "uniques", 
map: function () { emit(this.dim0, 1); }, 
reduce: function (key, values) { return Array.sum(values); }, 
out: { replace: "mrout" + min, db: "mrdb" + min }, 
sort: {dim0: 1}, 
query: { dim0: { $gte: min, $lt: max } }, 
jsMode: true }) } 
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() } 
min:0 max:274736 
min:274736 max:524997 
min:524997 max:775025 
min:775025 max:{ "$maxKey" : 1 } 
connecting to: test 
connecting to: test 
connecting to: test 
connecting to: test 
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); } 
... 

        "result" : { 
                "db" : "mrdb274736", 
                "collection" : "mrout274736" 
        }, 
        "timeMillis" : 70507, 
        "counts" : { 
                "input" : 2500013, 
                "emit" : 2500013, 
                "reduce" : 250156, 
                "output" : 250255 
        }, 
        "ok" : 1 

... 


現在時間降低到70秒。看來jsMode確實有幫助,尤其是當對象有很多字段時。該示例中是一個單一的數字字段,不過仍然提升了30%。

MongoDB v2.6版本中的改進

在MongoDB v2.6版本的開發中,移除了一段關於在JS函數調用時的一個可選“args”參數的代碼。該參數是不標准的,也不建議使用,它由於歷史原因遺留了下來(見SERVER-4654)。讓我們從Git庫中pull最新的MongoDB並編譯,然後再次運行測試用例:

代碼
... 

        "result" : { 
                "db" : "mrdb274736", 
                "collection" : "mrout274736" 
        }, 
        "timeMillis" : 62785, 
        "counts" : { 
                "input" : 2500013, 
                "emit" : 2500013, 
                "reduce" : 250156, 
                "output" : 250255 
        }, 
        "ok" : 1 

... 


從結果來看,時間降低到了60秒,速度大約提升了10-15%。同時,這種更改也改善了JS引擎的整體堆消耗量。

結論

回頭來看,對於同樣的MR任務,與最開始時的1200秒相比,速度已經提升了20倍。這種優化應該適用於大多數情況,即使一些技巧效果不那麼理想(比如使用多個輸出dbs /集合)。但是這些技巧可以幫助人們來提升MR任務的速度,未來這些特性也許會更加易用——比如,這個ticket 將會使splitVector命令更加可用,這個ticket將會改進同一數據庫中的多個MR任務。

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved