程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> Spark Rdd coalesce()方法和repartition()方法,rddcoalesce

Spark Rdd coalesce()方法和repartition()方法,rddcoalesce

編輯:JAVA綜合教程

Spark Rdd coalesce()方法和repartition()方法,rddcoalesce


在Spark的Rdd中,Rdd是分區的。

有時候需要重新設置Rdd的分區數量,比如Rdd的分區中,Rdd分區比較多,但是每個Rdd的數據量比較小,需要設置一個比較合理的分區。或者需要把Rdd的分區數量調大。還有就是通過設置一個Rdd的分區來達到設置生成的文件的數量。

有兩種方法是可以重設Rdd的分區:分別是 coalesce()方法和repartition()。

 這兩個方法有什麼區別,看看源碼就知道了:

  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
      new CoalescedRDD(this, numPartitions)
    }
  }

coalesce()方法的作用是返回指定一個新的指定分區的Rdd。

如果是生成一個窄依賴的結果,那麼不會發生shuffle。比如:1000個分區被重新設置成10個分區,這樣不會發生shuffle。

關於Rdd的依賴,這裡提一下。Rdd的依賴分為兩種:窄依賴和寬依賴。

窄依賴是指父Rdd的分區最多只能被一個子Rdd的分區所引用,即一個父Rdd的分區對應一個子Rdd的分區,或者多個父Rdd的分區對應一個子Rdd的分區。

而寬依賴就是寬依賴是指子RDD的分區依賴於父RDD的多個分區或所有分區,即存在一個父RDD的一個分區對應一個子RDD的多個分區。1個父RDD分區對應多個子RDD分區,這其中又分兩種情況:1個父RDD對應所有子RDD分區(未經協同劃分的Join)或者1個父RDD對應非全部的多個RDD分區(如groupByKey)。

如下圖所示:map就是一種窄依賴,而join則會導致寬依賴

回到剛才的分區,如果分區的數量發生激烈的變化,如設置numPartitions = 1,這可能會造成運行計算的節點比你想象的要少,為了避免這個情況,可以設置shuffle=true,

那麼這會增加shuffle操作。

關於這個分區的激烈的變化情況,比如分區數量從父Rdd的幾千個分區設置成幾個,有可能會遇到這麼一個錯誤。

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 77.0 failed 4 times, most recent failure: Lost task 1.3 in stage 77.0 (TID 6334, 192.168.8.61): java.io.IOException: Unable to acquire 16777216 bytes of memory
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:332)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:461)
        at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:139)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:489)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
        at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:99)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

這個錯誤只要把shuffle設置成true即可解決。

當把父Rdd的分區數量增大時,比如Rdd的分區是100,設置成1000,如果shuffle為false,並不會起作用。

這時候就需要設置shuffle為true了,那麼Rdd將在shuffle之後返回一個1000個分區的Rdd,數據分區方式默認是采用 hash partitioner。

最後來看看repartition()方法的源碼:

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

從源碼可以看出,repartition()方法就是coalesce()方法shuffle為true的情況。那麼如果說只是要減少父Rdd的分區數量,並且要設置的分區數量並不是很激烈,可以考慮直接使用coalesce方法來避免執行shuffle操作,提高效率。

如有錯誤遺漏的地方,請不吝賜教,我必將改正。

 

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved