SparkSQL數(shù)據(jù)加載和保存實戰(zhàn)資料_第1頁
SparkSQL數(shù)據(jù)加載和保存實戰(zhàn)資料_第2頁
SparkSQL數(shù)據(jù)加載和保存實戰(zhàn)資料_第3頁
SparkSQL數(shù)據(jù)加載和保存實戰(zhàn)資料_第4頁
SparkSQL數(shù)據(jù)加載和保存實戰(zhàn)資料_第5頁
已閱讀5頁,還剩3頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

1、Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)一:前置知識詳解:Spark SQL 重要是操作 DataFrame ,DataFrame 本身提供了 save 和 load 的操作, Load :可以創(chuàng)建 DataFrame ,Save :把 DataFrame中的數(shù)據(jù)保存到文件或者說與具體的格式來指明我們要讀取的文件的類型以及與具體的格式來指出我們要輸出的文件是什么類型。二: Spark SQL讀寫數(shù)據(jù)代碼實戰(zhàn):import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spa

2、rk.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List;public

3、 class SparkSQLLoadSaveOps public static void main(String args) SparkConfconfSparkConf().setMaster(local).setAppName(SparkSQLLoadSaveOps);JavaSparkContext sc = new JavaSparkContext(conf);SQLContext = new SQLContext(sc);/* read() 是 DataFrameReader類型, load 可以將數(shù)據(jù)讀取出來*/=newDataFrame peopleDF = sqlContex

4、t.read().format(json).load(E:SparkSparkinstanll_packageBig_Data_Softwarespa rk-1.6.0-bin-hadoop2.6examplessrcmainresourcespeople.json);/* 直接對 DataFrame進行操作* Json:是一種自解釋的格式,讀取Json 的時候怎么判斷其是什么格式?通過掃描整個 Json 。掃描之后才會知道元數(shù)據(jù)*/通過 mode 來指定輸出文件的是 append 。創(chuàng)建新文件來追加文件peopleDF.select(name).write().mode(SaveMode.A

5、ppend).save(E:personNames);讀取過程源碼分析如下:1. read方法返回 DataFrameReader,用于讀取數(shù)據(jù)。/*: Experimental :Returns a DataFrameReader that can be used to read data in as a DataFrame.sqlContext.read.parquet(/path/to/file.parquet)sqlContext.read.schema(schema).json(/path/to/file.json)* group genericdata* since 1.4.0*/

6、Experimental/ 創(chuàng)建 DataFrameReader實例,獲得了DataFrameReader引用def read: DataFrameReader = new DataFrameReader(this)然后再調(diào)用 DataFrameReader 類中的 format ,指出讀取文件的格式。/*Specifies the input data source format.since 1.4.0*/def format(source: String): DataFrameReader = this.source = sourcethis3.通過 DtaFrameReader中 loa

7、d 方法通過路徑把傳入過來的輸入變成DataFrame。/*Loads input in as a DataFrame, for data sources that require a path (e.g. data backed bya local or distributed file system).*since 1.4.0*/TODO: Remove this one in Spark 2.0. def load(path: String): DataFrame = option(path, path).load()至此,數(shù)據(jù)的讀取工作就完成了,下面就對DataFrame進行操作。下面

8、就是寫操作!1. 調(diào)用 DataFrame中 select函數(shù)進行對列篩選/*Selects a set of columns. This is a variant of select that can only selectexisting columns using column names (i.e. cannot construct expressions).*/ The following two are equivalent:df.select(colA, colB)df.select($colA, $colB)group dfopssince 1.3.0*/scala.annot

9、ation.varargsdef select(col: String, cols: String*): DataFrame = select(col +: cols).map(Column(_) : _*)然后通過 write 將結(jié)果寫入到外部存儲系統(tǒng)中。/*: Experimental :Interface for saving the content of the DataFrame out into external storage.group outputsince 1.4.0*/Experimentaldef write: DataFrameWriter = new DataFra

10、meWriter(this)在保持文件的時候 mode 指定追加文件的方式/* Specifies the behavior when data or table already exists. Options include:/ Overwrite是覆蓋- SaveMode.Overwrite: overwrite the existing data.創(chuàng)建新的文件,然后追加- SaveMode.Append: append the data.- SaveMode.Ignore: ignore the operation (i.e. no-op).- SaveMode.ErrorIfExist

11、s: default option, throw an exception at runtime.since 1.4.0*/def mode(saveMode: SaveMode): DataFrameWriter = this.mode = saveModethis最后, save() 方法觸發(fā) action ,將文件輸出到指定文件中。/*Saves the content of the DataFrame at the specified path.since 1.4.0*/def save(path: String): Unit = this.extraOptions += (path

12、- path)save()三: Spark SQL讀寫整個流程圖如下:這里寫圖片描述四:對于流程中部分函數(shù)源碼詳解:DataFrameReader.Load()Load ()返回 DataFrame 類型的數(shù)據(jù)集合,使用的數(shù)據(jù)是從默認的路徑讀取。/*Returns the dataset stored at path as a DataFrame,using the default data source configured by spark.sql.sources.default.group genericdatadeprecated As of 1.4.0, replaced by re

13、ad().load(path). This will be removed in Spark 2.0.*/deprecated(Use read.load(path). This will be removed in Spark 2.0., 1.4.0)def load(path: String): DataFrame = / 此時的 read 就是 DataFrameReaderread.load(path)追蹤 load 源碼進去,源碼如下:在 DataFrameReader中的方法。 Load() 通過路徑把輸入傳進來變成一個DataFrame。/*Loads input in as a

14、 DataFrame, for data sources that require a path (e.g. data backed bya local or distributed file system).*since 1.4.0*/TODO: Remove this one in Spark 2.0. def load(path: String): DataFrame = option(path, path).load()追蹤 load 源碼如下:/*Loads input in as a DataFrame, for data sources that dont require a p

15、ath (e.g. externalkey-value stores).*since 1.4.0*/def load(): DataFrame = 對傳入的 Source 進行解析val resolved = ResolvedDataSource(sqlContext,userSpecifiedSchema = userSpecifiedSchema,partitionColumns = Array.emptyString,provider = source,options = extraOptions.toMap)DataFrame(sqlContext, LogicalRelation(r

16、esolved.relation)DataFrameReader.format()1. Format:具體指定文件格式,這就獲得一個巨大的啟示是:如果是Json文件格式可以保持為Parquet等此類操作。Spark SQL在讀取文件的時候可以指定讀取文件的類型。例如,Json,Parquet./* Specifies the input data source format.Built-in options include“parquet”,”json ”,etc.* since 1.4.0*/def format(source: String): DataFrameReader = this

17、.source = source /FileTypethisDataFrame.write()1. 創(chuàng)建 DataFrameWriter實例/*: Experimental :Interface for saving the content of the DataFrame out into external storage.group outputsince 1.4.0*/Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)2.追蹤 DataFrameWriter源碼如下:以 DataFrame的方式向外部存儲系

18、統(tǒng)中寫入數(shù)據(jù)。/*: Experimental :Interface used to write a DataFrame to external storage systems (e.g. file systems,key-value stores, etc). Use DataFrame.write to access this.*since 1.4.0*/Experimentalfinal class DataFrameWriter privatesql(df: DataFrame) DataFrameWriter.mode()Overwrite 是覆蓋,之前寫的數(shù)據(jù)全都被覆蓋了。Appe

19、nd: 是追加,對于普通文件是在一個文件中進行追加,但是對于parquet格式的文件則創(chuàng)建新的文件進行追加。/*Specifies the behavior when data or table already exists. Options include:- SaveMode.Overwrite: overwrite the existing data.- SaveMode.Append: append the data.- SaveMode.Ignore: ignore the operation (i.e. no-op).默認操作- SaveMode.ErrorIfExists: de

20、fault option, throw an exception at runtime.since 1.4.0*/def mode(saveMode: SaveMode): DataFrameWriter = this.mode = saveModethis通過模式匹配接收外部參數(shù)/*Specifies the behavior when data or table already exists. Options include:- overwrite: overwrite the existing data.- append: append the data.- ignore: ignore

21、 the operation (i.e. no-op).- error: default option, throw an exception at runtime.*since 1.4.0*/def mode(saveMode: String): DataFrameWriter = this.mode = saveMode.toLowerCase match case overwrite = SaveMode.Overwritecase append = SaveMode.Appendcase ignore = SaveMode.Ignorecase error | default = Sa

22、veMode.ErrorIfExistscase _ = throw new IllegalArgumentException(sUnknown save mode: $saveMode. + Accepted modes are overwrite, append, ignore, error.)thisDataFrameWriter.save()save 將結(jié)果保存?zhèn)魅氲穆窂健?*Saves the content of the DataFrame at the specified path.since 1.4.0*/def save(path: String): Unit = this.

23、extraOptions += (path - path)save()追蹤 save 方法。/*Saves the content of the DataFrame as the specified table.since 1.4.0*/def save(): Unit = ResolvedDataSource(df.sqlContext,source,partitioningColumns.map(_.toArray).getOrElse(Array.emptyString),mode,extraOptions.toMap,df)其中 source 是 SQLConf 的 defaultDa

24、taSourceNameprivate var source: String = df.sqlContext.conf.defaultDataSourceName11其中 DEFAULT_DATA_SOURCE_NAME默認參數(shù)是parquet。/ This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf(spark.sql.sources.default, defaultValue = Some(pache.spark.sql.parquet), doc = The default

25、 data source to use in input/output.)DataFrame.Scala中部分函數(shù)詳解:1. toDF函數(shù)是將 RDD 轉(zhuǎn)換成 DataFrame/*Returns the object itself.group basicsince 1.3.0*/This is declared with parentheses to prevent the Scala compiler from treatingrdd.toDF(1) as invoking this toDF and then apply on the returned DataFrame. def toDF(): DataFrame = thisshow() 方法:將結(jié)果顯示出來/*Displays the DataFrame in a tabular form. For example:*yearmonth AVG(Adj Close) MAX(Adj Close)*1980120.5032180.595103*1981010.5232890.570307*1982020.4365040.475256

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論