Spark SQL數據加載和保管實例解說。本站提示廣大學習愛好者:(Spark SQL數據加載和保管實例解說)文章只能為提供參考,不一定能成為您想要的結果。以下是Spark SQL數據加載和保管實例解說正文
一、前置知識詳解
Spark SQL重要是操作DataFrame,DataFrame自身提供了save和load的操作,
Load:可以創立DataFrame,
Save:把DataFrame中的數據保管到文件或許說與詳細的格式來指明我們要讀取的文件的類型以及與詳細的格式來指出我們要輸入的文件是什麼類型。
二、Spark SQL讀寫數據代碼實戰
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
public class SparkSQLLoadSaveOps {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext = new SQLContext(sc);
/**
* read()是DataFrameReader類型,load可以將數據讀取出來
*/
DataFrame peopleDF = sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");
/**
* 直接對DataFrame停止操作
* Json: 是一種自解釋的格式,讀取Json的時分怎樣判別其是什麼格式?
* 經過掃描整個Json。掃描之後才會知道元數據
*/
//經過mode來指定輸入文件的是append。創立新文件來追加文件
peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
}
}
讀取進程源碼剖析如下:
1. read辦法前往DataFrameReader,用於讀取數據。
/**
* :: Experimental ::
* Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
* {{{
* sqlContext.read.parquet("/path/to/file.parquet")
* sqlContext.read.schema(schema).json("/path/to/file.json")
* }}}
*
* @group genericdata
* @since 1.4.0
*/
@Experimental
//創立DataFrameReader實例,取得了DataFrameReader援用
def read: DataFrameReader = new DataFrameReader(this)
2. 然後再調用DataFrameReader類中的format,指出讀取文件的格式。
/**
* Specifies the input data source format.
*
* @since 1.4.0
*/
def format(source: String): DataFrameReader = {
this.source = source
this
}
3. 經過DtaFrameReader中load辦法經過途徑把傳入過去的輸出變成DataFrame。
/**
* Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
* a local or distributed file system).
*
* @since 1.4.0
*/
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
option("path", path).load()
}
至此,數據的讀取任務就完成了,上面就對DataFrame停止操作。
上面就是寫操作!!!
1. 調用DataFrame中select函數停止對列挑選
/**
* Selects a set of columns. This is a variant of `select` that can only select
* existing columns using column names (i.e. cannot construct expressions).
*
* {{{
* // The following two are equivalent:
* df.select("colA", "colB")
* df.select($"colA", $"colB")
* }}}
* @group dfops
* @since 1.3.0
*/
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)) : _*)
2. 然後經過write將後果寫入到內部存儲零碎中。
/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */ @Experimental def write: DataFrameWriter = new DataFrameWriter(this)
3. 在堅持文件的時分mode指定追加文件的方式
/**
* Specifies the behavior when data or table already exists. Options include:
// Overwrite是掩蓋
* - `SaveMode.Overwrite`: overwrite the existing data.
//創立新的文件,然後追加
* - `SaveMode.Append`: append the data.
* - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
* - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter = {
this.mode = saveMode
this
}
4. 最後,save()辦法觸發action,將文件輸入到指定文件中。
/**
* Saves the content of the [[DataFrame]] at the specified path.
*
* @since 1.4.0
*/
def save(path: String): Unit = {
this.extraOptions += ("path" -> path)
save()
}
三、Spark SQL讀寫整個流程圖如下

四、關於流程中局部函數源碼詳解
DataFrameReader.Load()
1. Load()前往DataFrame類型的數據集合,運用的數據是從默許的途徑讀取。
/**
* Returns the dataset stored at path as a DataFrame,
* using the default data source configured by spark.sql.sources.default.
*
* @group genericdata
* @deprecated As of 1.4.0, replaced by `read().load(path)`. This will be removed in Spark 2.0.
*/
@deprecated("Use read.load(path). This will be removed in Spark 2.0.", "1.4.0")
def load(path: String): DataFrame = {
//此時的read就是DataFrameReader
read.load(path)
}
2. 追蹤load源碼出來,源碼如下:
在DataFrameReader中的辦法。Load()經過途徑把輸出傳出去變成一個DataFrame。
/**
* Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
* a local or distributed file system).
*
* @since 1.4.0
*/
// TODO: Remove this one in Spark 2.0.
def load(path: String): DataFrame = {
option("path", path).load()
}
3. 追蹤load源碼如下:
/**
* Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
* key-value stores).
*
* @since 1.4.0
*/
def load(): DataFrame = {
//對傳入的Source停止解析
val resolved = ResolvedDataSource(
sqlContext,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = Array.empty[String],
provider = source,
options = extraOptions.toMap)
DataFrame(sqlContext, LogicalRelation(resolved.relation))
}
DataFrameReader.format()
1. Format:詳細指定文件格式,這就取得一個宏大的啟示是:假如是Json文件格式可以堅持為Parquet等此類操作。
Spark SQL在讀取文件的時分可以指定讀取文件的類型。例如,Json,Parquet.
/**
* Specifies the input data source format.Built-in options include “parquet”,”json”,etc.
*
* @since 1.4.0
*/
def format(source: String): DataFrameReader = {
this.source = source //FileType
this
}
DataFrame.write()
1. 創立DataFrameWriter實例
/** * :: Experimental :: * Interface for saving the content of the [[DataFrame]] out into external storage. * * @group output * @since 1.4.0 */ @Experimental def write: DataFrameWriter = new DataFrameWriter(this) 1
2. 追蹤DataFrameWriter源碼如下:
以DataFrame的方式向內部存儲零碎中寫入數據。
/**
* :: Experimental ::
* Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
* key-value stores, etc). Use [[DataFrame.write]] to access this.
*
* @since 1.4.0
*/
@Experimental
final class DataFrameWriter private[sql](df: DataFrame) {
DataFrameWriter.mode()
1. Overwrite是掩蓋,之前寫的數據全都被掩蓋了。
Append:是追加,關於普通文件是在一個文件中停止追加,但是關於parquet格式的文件則創立新的文件停止追加。
/**
* Specifies the behavior when data or table already exists. Options include:
* - `SaveMode.Overwrite`: overwrite the existing data.
* - `SaveMode.Append`: append the data.
* - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
//默許操作
* - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: SaveMode): DataFrameWriter = {
this.mode = saveMode
this
}
2. 經過形式婚配接納內部參數
/**
* Specifies the behavior when data or table already exists. Options include:
* - `overwrite`: overwrite the existing data.
* - `append`: append the data.
* - `ignore`: ignore the operation (i.e. no-op).
* - `error`: default option, throw an exception at runtime.
*
* @since 1.4.0
*/
def mode(saveMode: String): DataFrameWriter = {
this.mode = saveMode.toLowerCase match {
case "overwrite" => SaveMode.Overwrite
case "append" => SaveMode.Append
case "ignore" => SaveMode.Ignore
case "error" | "default" => SaveMode.ErrorIfExists
case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
"Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
}
this
}
DataFrameWriter.save()
1. save將後果保管傳入的途徑。
/**
* Saves the content of the [[DataFrame]] at the specified path.
*
* @since 1.4.0
*/
def save(path: String): Unit = {
this.extraOptions += ("path" -> path)
save()
}
2. 追蹤save辦法。
/**
* Saves the content of the [[DataFrame]] as the specified table.
*
* @since 1.4.0
*/
def save(): Unit = {
ResolvedDataSource(
df.sqlContext,
source,
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
mode,
extraOptions.toMap,
df)
}
3. 其中source是SQLConf的defaultDataSourceName
private var source: String = df.sqlContext.conf.defaultDataSourceName
其中DEFAULT_DATA_SOURCE_NAME默許參數是parquet。
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
defaultValue = Some("org.apache.spark.sql.parquet"),
doc = "The default data source to use in input/output.")
DataFrame.scala中局部函數詳解:
1. toDF函數是將RDD轉換成DataFrame
/**
* Returns the object itself.
* @group basic
* @since 1.3.0
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
def toDF(): DataFrame = this
2. show()辦法:將後果顯示出來
/**
* Displays the [[DataFrame]] in a tabular form. For example:
* {{{
* year month AVG('Adj Close) MAX('Adj Close)
* 1980 12 0.503218 0.595103
* 1981 01 0.523289 0.570307
* 1982 02 0.436504 0.475256
* 1983 03 0.410516 0.442194
* 1984 04 0.450090 0.483521
* }}}
* @param numRows Number of rows to show
* @param truncate Whether truncate long strings. If true, strings more than 20 characters will
* be truncated and all cells will be aligned right
*
* @group action
* @since 1.5.0
*/
// scalastyle:off println
def show(numRows: Int, truncate: Boolean): Unit = println(showString(numRows, truncate))
// scalastyle:on println
追蹤showString源碼如下:showString中觸發action搜集數據。
/**
* Compose the string representing rows for output
* @param _numRows Number of rows to show
* @param truncate Whether truncate long strings and align cells right
*/
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
val numRows = _numRows.max(0)
val sb = new StringBuilder
val takeResult = take(numRows + 1)
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)
val numCols = schema.fieldNames.length
以上就是本文的全部內容,希望對大家的學習有所協助,也希望大家多多支持。