版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
1、Spark SQL數(shù)據(jù)加載和保存實戰(zhàn)一:前置知識詳解:Spark SQL 重要是操作 DataFrame ,DataFrame 本身提供了 save 和 load 的操作, Load :可以創(chuàng)建 DataFrame ,Save :把 DataFrame中的數(shù)據(jù)保存到文件或者說與具體的格式來指明我們要讀取的文件的類型以及與具體的格式來指出我們要輸出的文件是什么類型。二: Spark SQL讀寫數(shù)據(jù)代碼實戰(zhàn):import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spa
2、rk.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List;public
3、 class SparkSQLLoadSaveOps public static void main(String args) SparkConfconfSparkConf().setMaster(local).setAppName(SparkSQLLoadSaveOps);JavaSparkContext sc = new JavaSparkContext(conf);SQLContext = new SQLContext(sc);/* read() 是 DataFrameReader類型, load 可以將數(shù)據(jù)讀取出來*/=newDataFrame peopleDF = sqlContex
4、t.read().format(json).load(E:SparkSparkinstanll_packageBig_Data_Softwarespa rk-1.6.0-bin-hadoop2.6examplessrcmainresourcespeople.json);/* 直接對 DataFrame進行操作* Json:是一種自解釋的格式,讀取Json 的時候怎么判斷其是什么格式?通過掃描整個 Json 。掃描之后才會知道元數(shù)據(jù)*/通過 mode 來指定輸出文件的是 append 。創(chuàng)建新文件來追加文件peopleDF.select(name).write().mode(SaveMode.A
5、ppend).save(E:personNames);讀取過程源碼分析如下:1. read方法返回 DataFrameReader,用于讀取數(shù)據(jù)。/*: Experimental :Returns a DataFrameReader that can be used to read data in as a DataFrame.sqlContext.read.parquet(/path/to/file.parquet)sqlContext.read.schema(schema).json(/path/to/file.json)* group genericdata* since 1.4.0*/
6、Experimental/ 創(chuàng)建 DataFrameReader實例,獲得了DataFrameReader引用def read: DataFrameReader = new DataFrameReader(this)然后再調(diào)用 DataFrameReader 類中的 format ,指出讀取文件的格式。/*Specifies the input data source format.since 1.4.0*/def format(source: String): DataFrameReader = this.source = sourcethis3.通過 DtaFrameReader中 loa
7、d 方法通過路徑把傳入過來的輸入變成DataFrame。/*Loads input in as a DataFrame, for data sources that require a path (e.g. data backed bya local or distributed file system).*since 1.4.0*/TODO: Remove this one in Spark 2.0. def load(path: String): DataFrame = option(path, path).load()至此,數(shù)據(jù)的讀取工作就完成了,下面就對DataFrame進行操作。下面
8、就是寫操作!1. 調(diào)用 DataFrame中 select函數(shù)進行對列篩選/*Selects a set of columns. This is a variant of select that can only selectexisting columns using column names (i.e. cannot construct expressions).*/ The following two are equivalent:df.select(colA, colB)df.select($colA, $colB)group dfopssince 1.3.0*/scala.annot
9、ation.varargsdef select(col: String, cols: String*): DataFrame = select(col +: cols).map(Column(_) : _*)然后通過 write 將結(jié)果寫入到外部存儲系統(tǒng)中。/*: Experimental :Interface for saving the content of the DataFrame out into external storage.group outputsince 1.4.0*/Experimentaldef write: DataFrameWriter = new DataFra
10、meWriter(this)在保持文件的時候 mode 指定追加文件的方式/* Specifies the behavior when data or table already exists. Options include:/ Overwrite是覆蓋- SaveMode.Overwrite: overwrite the existing data.創(chuàng)建新的文件,然后追加- SaveMode.Append: append the data.- SaveMode.Ignore: ignore the operation (i.e. no-op).- SaveMode.ErrorIfExist
11、s: default option, throw an exception at runtime.since 1.4.0*/def mode(saveMode: SaveMode): DataFrameWriter = this.mode = saveModethis最后, save() 方法觸發(fā) action ,將文件輸出到指定文件中。/*Saves the content of the DataFrame at the specified path.since 1.4.0*/def save(path: String): Unit = this.extraOptions += (path
12、- path)save()三: Spark SQL讀寫整個流程圖如下:這里寫圖片描述四:對于流程中部分函數(shù)源碼詳解:DataFrameReader.Load()Load ()返回 DataFrame 類型的數(shù)據(jù)集合,使用的數(shù)據(jù)是從默認的路徑讀取。/*Returns the dataset stored at path as a DataFrame,using the default data source configured by spark.sql.sources.default.group genericdatadeprecated As of 1.4.0, replaced by re
13、ad().load(path). This will be removed in Spark 2.0.*/deprecated(Use read.load(path). This will be removed in Spark 2.0., 1.4.0)def load(path: String): DataFrame = / 此時的 read 就是 DataFrameReaderread.load(path)追蹤 load 源碼進去,源碼如下:在 DataFrameReader中的方法。 Load() 通過路徑把輸入傳進來變成一個DataFrame。/*Loads input in as a
14、 DataFrame, for data sources that require a path (e.g. data backed bya local or distributed file system).*since 1.4.0*/TODO: Remove this one in Spark 2.0. def load(path: String): DataFrame = option(path, path).load()追蹤 load 源碼如下:/*Loads input in as a DataFrame, for data sources that dont require a p
15、ath (e.g. externalkey-value stores).*since 1.4.0*/def load(): DataFrame = 對傳入的 Source 進行解析val resolved = ResolvedDataSource(sqlContext,userSpecifiedSchema = userSpecifiedSchema,partitionColumns = Array.emptyString,provider = source,options = extraOptions.toMap)DataFrame(sqlContext, LogicalRelation(r
16、esolved.relation)DataFrameReader.format()1. Format:具體指定文件格式,這就獲得一個巨大的啟示是:如果是Json文件格式可以保持為Parquet等此類操作。Spark SQL在讀取文件的時候可以指定讀取文件的類型。例如,Json,Parquet./* Specifies the input data source format.Built-in options include“parquet”,”json ”,etc.* since 1.4.0*/def format(source: String): DataFrameReader = this
17、.source = source /FileTypethisDataFrame.write()1. 創(chuàng)建 DataFrameWriter實例/*: Experimental :Interface for saving the content of the DataFrame out into external storage.group outputsince 1.4.0*/Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)2.追蹤 DataFrameWriter源碼如下:以 DataFrame的方式向外部存儲系
18、統(tǒng)中寫入數(shù)據(jù)。/*: Experimental :Interface used to write a DataFrame to external storage systems (e.g. file systems,key-value stores, etc). Use DataFrame.write to access this.*since 1.4.0*/Experimentalfinal class DataFrameWriter privatesql(df: DataFrame) DataFrameWriter.mode()Overwrite 是覆蓋,之前寫的數(shù)據(jù)全都被覆蓋了。Appe
19、nd: 是追加,對于普通文件是在一個文件中進行追加,但是對于parquet格式的文件則創(chuàng)建新的文件進行追加。/*Specifies the behavior when data or table already exists. Options include:- SaveMode.Overwrite: overwrite the existing data.- SaveMode.Append: append the data.- SaveMode.Ignore: ignore the operation (i.e. no-op).默認操作- SaveMode.ErrorIfExists: de
20、fault option, throw an exception at runtime.since 1.4.0*/def mode(saveMode: SaveMode): DataFrameWriter = this.mode = saveModethis通過模式匹配接收外部參數(shù)/*Specifies the behavior when data or table already exists. Options include:- overwrite: overwrite the existing data.- append: append the data.- ignore: ignore
21、 the operation (i.e. no-op).- error: default option, throw an exception at runtime.*since 1.4.0*/def mode(saveMode: String): DataFrameWriter = this.mode = saveMode.toLowerCase match case overwrite = SaveMode.Overwritecase append = SaveMode.Appendcase ignore = SaveMode.Ignorecase error | default = Sa
22、veMode.ErrorIfExistscase _ = throw new IllegalArgumentException(sUnknown save mode: $saveMode. + Accepted modes are overwrite, append, ignore, error.)thisDataFrameWriter.save()save 將結(jié)果保存?zhèn)魅氲穆窂健?*Saves the content of the DataFrame at the specified path.since 1.4.0*/def save(path: String): Unit = this.
23、extraOptions += (path - path)save()追蹤 save 方法。/*Saves the content of the DataFrame as the specified table.since 1.4.0*/def save(): Unit = ResolvedDataSource(df.sqlContext,source,partitioningColumns.map(_.toArray).getOrElse(Array.emptyString),mode,extraOptions.toMap,df)其中 source 是 SQLConf 的 defaultDa
24、taSourceNameprivate var source: String = df.sqlContext.conf.defaultDataSourceName11其中 DEFAULT_DATA_SOURCE_NAME默認參數(shù)是parquet。/ This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf(spark.sql.sources.default, defaultValue = Some(pache.spark.sql.parquet), doc = The default
25、 data source to use in input/output.)DataFrame.Scala中部分函數(shù)詳解:1. toDF函數(shù)是將 RDD 轉(zhuǎn)換成 DataFrame/*Returns the object itself.group basicsince 1.3.0*/This is declared with parentheses to prevent the Scala compiler from treatingrdd.toDF(1) as invoking this toDF and then apply on the returned DataFrame. def toDF(): DataFrame = thisshow() 方法:將結(jié)果顯示出來/*Displays the DataFrame in a tabular form. For example:*yearmonth AVG(Adj Close) MAX(Adj Close)*1980120.5032180.595103*1981010.5232890.570307*1982020.4365040.475256
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025版高新技術(shù)企業(yè)知識產(chǎn)權(quán)轉(zhuǎn)讓合同3篇
- 二零二五年度化工原料搬運合作協(xié)議模板3篇
- 2025年度建筑施工現(xiàn)場文明施工與環(huán)境保護責(zé)任書3篇
- 正常分娩產(chǎn)婦護理
- 二零二五年度個體養(yǎng)殖戶與肉牛屠宰加工企業(yè)合同3篇
- 2024年標準頂管工程作業(yè)協(xié)議模板
- 2024版國際貿(mào)易操作實務(wù):合同屬性與標的的融合
- 2024水電站施工臨時道路及橋梁建設(shè)合同范本3篇
- 白內(nèi)障疾病的護理
- 2024房地產(chǎn)項目合作開發(fā)合同5篇
- 2024-2030年中國停車場建設(shè)行業(yè)發(fā)展趨勢投資策略研究報告
- 藍軍戰(zhàn)略課件
- 物業(yè)管理重難點分析及解決措施
- 北京郵電大學(xué)《數(shù)據(jù)庫系統(tǒng)》2022-2023學(xué)年第一學(xué)期期末試卷
- 湖北省黃岡市2023-2024學(xué)年高一上學(xué)期期末考試化學(xué)試題(含答案)
- 中國HDMI高清線行業(yè)市場動態(tài)分析及未來趨勢研判報告
- 物流公司安全生產(chǎn)監(jiān)督檢查管理制度
- DB22T 277-2011 建筑電氣防火檢驗規(guī)程
- 2024年基本公共衛(wèi)生服務(wù)工作計劃(三篇)
- 2024-2030年中國錸行業(yè)供需趨勢及發(fā)展規(guī)模分析報告
- 2025屆上海市復(fù)旦附中浦東分校物理高二上期末教學(xué)質(zhì)量檢測試題含解析
評論
0/150
提交評論