大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第1頁(yè)
大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第2頁(yè)
大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第3頁(yè)
大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第4頁(yè)
大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第5頁(yè)
已閱讀5頁(yè),還剩119頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

大數(shù)據(jù)導(dǎo)論第十二章大數(shù)據(jù)導(dǎo)論第十二章1CONTENTS目錄PART01SPARKSQL簡(jiǎn)介PART02SPARKSQL執(zhí)行流程PART03基礎(chǔ)數(shù)據(jù)模型DATAFRAMEPART04使用SparkSQL的方式PART05SPARKSQL數(shù)據(jù)源PART06SPARKSQLCLI介紹PART07在Pyspark中使用SparkSQLPART08在Java中連接SparkSQLPART09習(xí)題CONTENTS目錄PART01SPARKSQL簡(jiǎn)介2PART01SparkSQL簡(jiǎn)介SparkSQL是一個(gè)用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,為Spark提供了查詢(xún)結(jié)構(gòu)化數(shù)據(jù)的能力。SparkSQL可被視為一個(gè)分布式的SQL查詢(xún)引擎,可以實(shí)現(xiàn)對(duì)多種數(shù)據(jù)格式和數(shù)據(jù)源進(jìn)行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。PART01SparkSQL簡(jiǎn)介SparkSQL3SparkSQL簡(jiǎn)介SparkSQL介紹:SparkSQL是為了處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)Spark模塊。不同于SparkRDD的基本API,SparkSQL接口擁有更多關(guān)于數(shù)據(jù)結(jié)構(gòu)本身與執(zhí)行計(jì)劃等更多信息。在Spark內(nèi)部,SparkSQL可以利用這些信息更好地對(duì)操作進(jìn)行優(yōu)化。SparkSQL提供了三種訪(fǎng)問(wèn)接口:SQL,DataFrameAPI和DatasetAPI。當(dāng)計(jì)算引擎被用來(lái)執(zhí)行一個(gè)計(jì)算時(shí),有不同的API和語(yǔ)言種類(lèi)可供選擇。這種統(tǒng)一性意味著開(kāi)發(fā)人員可以來(lái)回輕松切換各種最熟悉的API來(lái)完成同一個(gè)計(jì)算工作。SparkSQL簡(jiǎn)介SparkSQL介紹:4SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)數(shù)據(jù)兼容方面:能加載和查詢(xún)來(lái)自各種來(lái)源的數(shù)據(jù)。性能優(yōu)化方面:除了采取內(nèi)存列存儲(chǔ)、代碼生成等優(yōu)化技術(shù)外,還引進(jìn)成本模型對(duì)查詢(xún)進(jìn)行動(dòng)態(tài)評(píng)估、獲取最佳物理計(jì)劃等;組件擴(kuò)展方面:無(wú)論是SQL的語(yǔ)法解析器、分析器還是優(yōu)化器都可以重新定義,進(jìn)行擴(kuò)展。標(biāo)準(zhǔn)連接:SparkSQL包括具有行業(yè)標(biāo)準(zhǔn)JDBC和ODBC連接的服務(wù)器模式。SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)5SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)集成:無(wú)縫地將SQL查詢(xún)與Spark程序混合。SparkSQL允許將結(jié)構(gòu)化數(shù)據(jù)作為Spark中的分布式數(shù)據(jù)集(RDD)進(jìn)行查詢(xún),在Python,Scala和Java中集成了API。這種緊密的集成使得SQL查詢(xún)以及復(fù)雜的分析算法可以輕松地運(yùn)行??蓴U(kuò)展性:對(duì)于交互式查詢(xún)和長(zhǎng)查詢(xún)使用相同的引擎。SparkSQL利用RDD模型來(lái)支持查詢(xún)?nèi)蒎e(cuò),使其能夠擴(kuò)展到大型作業(yè),不需擔(dān)心為歷史數(shù)據(jù)使用不同的引擎。SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)6PART02SparkSQL執(zhí)行流程PART02SparkSQL執(zhí)行流程7SparkSQL執(zhí)行流程類(lèi)似于關(guān)系型數(shù)據(jù)庫(kù),SparkSQL語(yǔ)句也是由Projection(a1,a2,a3)、DataSource(tableA)、Filter(condition)三部分組成,分別對(duì)應(yīng)SQL查詢(xún)過(guò)程中的Result、DataSource、Operation,也就是說(shuō)SQL語(yǔ)句按Result-->DataSource-->Operation的次序來(lái)描述的。SparkSQL執(zhí)行流程類(lèi)似于關(guān)系型數(shù)據(jù)庫(kù),SparkS8SparkSQL執(zhí)行流程解析(Parse)對(duì)讀入的SQL語(yǔ)句進(jìn)行解析,分辨出SQL語(yǔ)句中哪些詞是關(guān)鍵詞(如SELECT、FROM、WHERE),哪些是表達(dá)式、哪些是Projection、哪些是DataSource等,從而判斷SQL語(yǔ)句是否規(guī)范;綁定(Bind)將SQL語(yǔ)句和數(shù)據(jù)庫(kù)的數(shù)據(jù)字典(列、表、視圖等)進(jìn)行綁定,如果相關(guān)的Projection、DataSource等都存在,則這個(gè)SQL語(yǔ)句是可以執(zhí)行的;SparkSQL執(zhí)行流程解析(Parse)9SparkSQL執(zhí)行流程優(yōu)化(Optimize)一般的數(shù)據(jù)庫(kù)會(huì)提供幾個(gè)執(zhí)行計(jì)劃,這些計(jì)劃一般都有運(yùn)行統(tǒng)計(jì)數(shù)據(jù),數(shù)據(jù)庫(kù)會(huì)在這些計(jì)劃中選擇一個(gè)最優(yōu)計(jì)劃;執(zhí)行(Execute)按Operation-->DataSource-->Result的次序來(lái)執(zhí)行計(jì)劃。在執(zhí)行過(guò)程有時(shí)候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運(yùn)行剛運(yùn)行過(guò)的SQL語(yǔ)句,可能直接從數(shù)據(jù)庫(kù)的緩沖池中獲取返回結(jié)果。SparkSQL執(zhí)行流程優(yōu)化(Optimize)10PART03基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是由“命名列”(類(lèi)似關(guān)系表的字段定義)所組織起來(lái)的一個(gè)分布式數(shù)據(jù)集合,可以把它看成是一個(gè)關(guān)系型數(shù)據(jù)庫(kù)的表。PART03基礎(chǔ)數(shù)據(jù)模型DataFrameDataFra11基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是SparkSQL的核心,它將數(shù)據(jù)保存為行構(gòu)成的集合,行對(duì)應(yīng)列有相應(yīng)的列名。DataFrame與RDD的主要區(qū)別在于,DataFrame帶有Schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱(chēng)和類(lèi)型。這使得SparkSQL可以掌握更多的結(jié)構(gòu)信息,從而能夠?qū)ataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)?;A(chǔ)數(shù)據(jù)模型DataFrameDataFrame是Spark12基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame與RDD的對(duì)比:基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame與RDD的對(duì)13PART04使用SparkSQL的方式PART04使用SparkSQL的方式14使用SparkSQL的方式使用SparkSQL,首先利用sqlContext從外部數(shù)據(jù)源加載數(shù)據(jù)為DataFrame;然后,利用DataFrame上豐富的API進(jìn)行查詢(xún)、轉(zhuǎn)換;最后,將結(jié)果進(jìn)行展現(xiàn)或存儲(chǔ)為各種外部數(shù)據(jù)形式。Spark

SQL

為Spark提供了查詢(xún)結(jié)構(gòu)化數(shù)據(jù)的能力,查詢(xún)時(shí)既可以使用SQL也可以使用DataFrame

API(RDD)。通過(guò)ThriftServer,Spark

SQL支持多語(yǔ)言編程包括Java、Scala、Python及R。使用SparkSQL的方式使用SparkSQL,首先利用15使用SparkSQL的方式使用SparkSQL的方式16使用SparkSQL的方式加載數(shù)據(jù)①.從Hive中的users表構(gòu)造DataFrame:users=sqlContext.table("users")②.加載S3上的JSON文件:logs=sqlContext.load("s3n://path/to/data.json","json")③.加載HDFS上的Parquet文件:clicks=sqlContext.load("hdfs://path/to/data.parquet","parquet")使用SparkSQL的方式加載數(shù)據(jù)①.從Hive中的us17使用SparkSQL的方式加載數(shù)據(jù)④.通過(guò)JDBC訪(fǎng)問(wèn)MySQL:comments=sqlContext.jdbc("jdbc:mysql://localhost/comments","user")⑤.將普通RDD轉(zhuǎn)變?yōu)镈ataFrame:rdd=sparkContext.textFile(“article.txt”).flatMap(_.split("")).map((_,1)).reduceByKey(_+_)wordCounts=sqlContext.createDataFrame(rdd,["word","count"])使用SparkSQL的方式加載數(shù)據(jù)④.通過(guò)JDBC訪(fǎng)問(wèn)M18使用SparkSQL的方式加載數(shù)據(jù)⑥.將本地?cái)?shù)據(jù)容器轉(zhuǎn)變?yōu)镈ataFrame:data=[("Alice",21),("Bob",24)]people=sqlContext.createDataFrame(data,["name","age"])⑦.將Pandas

DataFrame轉(zhuǎn)變?yōu)镾park

DataFrame(Python

API特有功能):sparkDF

=

sqlContext.createDataFrame(pandasDF)使用SparkSQL的方式加載數(shù)據(jù)⑥.將本地?cái)?shù)據(jù)容器轉(zhuǎn)變19使用SparkSQL的方式使用DataFrame①.創(chuàng)建一個(gè)只包含"年輕"用戶(hù)的DataFrame:young=users.filter(users.age<21)②.也可以使用Pandas風(fēng)格的語(yǔ)法:young=users[users.age<21]③.將所有人的年齡加1:young.select(,young.age+1)使用SparkSQL的方式使用DataFrame①.創(chuàng)建20使用SparkSQL的方式使用DataFrame④.統(tǒng)計(jì)年輕用戶(hù)中各性別人數(shù):young.groupBy("gender").count()⑤.將所有年輕用戶(hù)與另一個(gè)名為logs的DataFrame聯(lián)接起來(lái):young.join(logs,logs.userId==users.userId,"left_outer")使用SparkSQL的方式使用DataFrame④.統(tǒng)計(jì)21使用SparkSQL的方式保存結(jié)果①.追加至HDFS上的Parquet文件:young.save(path="hdfs://path/to/data.parquet",source="parquet",mode="append")②.覆寫(xiě)S3上的JSON文件:young.save(path="s3n://path/to/data.json",source="json",mode="append")使用SparkSQL的方式保存結(jié)果①.追加至HDFS上的22使用SparkSQL的方式保存結(jié)果③.保存為SQL表:young.saveAsTable(tableName="young",source="parquet"mode="overwrite")④.轉(zhuǎn)換為PandasDataFrame(PythonAPI特有功能):pandasDF=young.toPandas()⑤.以表格形式打印輸出:young.show()使用SparkSQL的方式保存結(jié)果③.保存為SQL表:23SparkSQL通過(guò)DataFrame接口使用多種數(shù)據(jù)源。應(yīng)用程序可以直接使用關(guān)系型轉(zhuǎn)換對(duì)DataFrame進(jìn)行操作,也可以用來(lái)創(chuàng)建臨時(shí)表。把DataFrame注冊(cè)為臨時(shí)表后就可以使用SQL對(duì)其數(shù)據(jù)進(jìn)行查詢(xún)PART05SparkSQL數(shù)據(jù)源SparkSQL通過(guò)DataFrame接口使用多種數(shù)據(jù)源。24通用Load方法SparkSQL使用數(shù)據(jù)源的最簡(jiǎn)單模式就是對(duì)缺省數(shù)據(jù)源類(lèi)型來(lái)進(jìn)行各種操作。缺省數(shù)據(jù)源類(lèi)型是parquet文件df=spark.read.load("examples/src/main/resources/users.parquet")df.select("name","favorite_color").write.save("namesAndFavColors.parquet")也可以通過(guò)load()函數(shù)的參數(shù)來(lái)說(shuō)明數(shù)據(jù)源的類(lèi)型:df=spark.read.load("examples/src/main/resources/people.json",format="json")df.select("name","age").write.save("namesAndAges.parquet",format="parquet")通用Load方法SparkSQL使用數(shù)據(jù)源的最簡(jiǎn)單模式就是25通用Load方法也可以不把文件上載到DataFrame,然后再對(duì)其進(jìn)行查詢(xún)操作,而是直接對(duì)文件進(jìn)行SQL查詢(xún):df=spark.sql("SELECT*FROMparquet.examples/src/main/resources/users.parquet")通用Load方法也可以不把文件上載到DataFrame,然后26通用Save方法Save方法Save()操作的可選參數(shù)SaveMode用來(lái)說(shuō)明如何處理已有數(shù)據(jù)。需要提醒的是這些保存操作不是原子操作,如果選擇的是Overwrite模式,Save()操作會(huì)首先刪除已有的數(shù)據(jù),然后才寫(xiě)入新的數(shù)據(jù)。SaveMode的可選值:Scala/Java任何語(yǔ)言解釋SaveMode.ErrorIfExists"error"(缺省值)如果有數(shù)據(jù)存在,就拋異常。SaveMode.Append"append"如果有數(shù)據(jù)/表存在,就添加新數(shù)據(jù)到老數(shù)據(jù)后面。SaveMode.Overwrite"overwrite"如果有數(shù)據(jù)/表存在,就用新數(shù)據(jù)覆蓋老數(shù)據(jù)。SaveMode.Ignore"ignore"如果有數(shù)據(jù)存在,就不寫(xiě)入新數(shù)據(jù)。通用Save方法Save方法Save()操作的可選參數(shù)Sav27通用Save方法基于文件的數(shù)據(jù)源,可以用path可選參數(shù)定義一個(gè)數(shù)據(jù)庫(kù)表的路徑,比如:df.write.option(“path”,“/some/path”).saveAsTable(“t”)即使數(shù)據(jù)庫(kù)表刪除后,該數(shù)據(jù)庫(kù)表路徑和數(shù)據(jù)也不會(huì)被刪除。如果程序不指定數(shù)據(jù)庫(kù)表路徑,Spark會(huì)把數(shù)據(jù)寫(xiě)到一個(gè)缺省的數(shù)據(jù)庫(kù)表路徑下。但是在這種情況下,當(dāng)數(shù)據(jù)庫(kù)被刪除的時(shí)候,缺省數(shù)據(jù)庫(kù)表的路徑將會(huì)被刪除。通用Save方法基于文件的數(shù)據(jù)源,可以用path可選參數(shù)定義28通用Save方法對(duì)基于文件的數(shù)據(jù)源還可以對(duì)其輸出進(jìn)行分桶、排序和分割,分桶和排序只能用于當(dāng)輸出方式為持久數(shù)據(jù)庫(kù)表的時(shí)候。df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")當(dāng)進(jìn)行分割時(shí),可以使用DatasetAPI接口的save()和saveAsTable()方法:df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")通用Save方法對(duì)基于文件的數(shù)據(jù)源還可以對(duì)其輸出進(jìn)行分桶、排29通用Save方法還可以對(duì)同一張表同時(shí)進(jìn)行分割和分桶:df=spark.read.parquet("examples/src/main/resources/users.parquet")(df.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed"))通用Save方法還可以對(duì)同一張表同時(shí)進(jìn)行分割和分桶:df=30Parquet文件數(shù)據(jù)源Parquet文件數(shù)據(jù)源SparkSQL支持對(duì)Parquet文件的讀寫(xiě)操作,并自動(dòng)保持源數(shù)據(jù)的模式。為了兼容性,在寫(xiě)Parquet文件的時(shí)候,所有的列會(huì)自動(dòng)轉(zhuǎn)換為可為空的列。把DataFrame保存為Parquet文件,格式信息會(huì)自動(dòng)保留。peopleDF=spark.read.json("examples/src/main/resources/people.json")df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")Parquet文件數(shù)據(jù)源Parquet文件數(shù)據(jù)源Spark31讀取上面存儲(chǔ)的Parquet文件為DataFrameParquet文件也可以用來(lái)創(chuàng)建臨時(shí)表供SQL查詢(xún)使用parquetFile=spark.read.parquet("people.parquet")parquetFile.createOrReplaceTempView("parquetFile")teenagers=spark.sql("SELECTnameFROMparquetFileWHEREage>=13ANDage<=19")teenagers.show()Parquet文件數(shù)據(jù)源讀取上面存儲(chǔ)的Parquet文件為DataFrameParq32JSONDataSets數(shù)據(jù)源JSONDataSets數(shù)據(jù)源SparkSQL可以自動(dòng)根據(jù)JSONDataSet的格式把其上載為DataFrame。用路徑指定JSONdataset;路徑下可以是一個(gè)文件,也可以是多個(gè)文件:sc=spark.sparkContextpath="examples/src/main/resources/people.json"peopleDF=spark.read.json(path)使用的結(jié)構(gòu)可以調(diào)用printSchema()方法打?。簆eopleDF.printSchema()JSONDataSets數(shù)據(jù)源JSONDataSets33利用DataFrame創(chuàng)建一個(gè)臨時(shí)表:使用Spark的sql方法進(jìn)行SQL查詢(xún):peopleDF.createOrReplaceTempView("people")teenagerNamesDF=spark.sql("SELECTnameFROMpeopleWHEREageBETWEEN13AND19")teenagerNamesDF.show()JSONDataSets數(shù)據(jù)源利用DataFrame創(chuàng)建一個(gè)臨時(shí)表:使用Spark的sql34JSONdataset的DataFrame也可以是RDD[String]格式,每個(gè)JSON對(duì)象為一個(gè)string:jsonStrings=['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']otherPeopleRDD=sc.parallelize(jsonStrings)otherPeople=spark.read.json(otherPeopleRDD)otherPeople.show()JSONDataSets數(shù)據(jù)源JSONdataset的DataFrame也可以是RDD[35Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源SparkSQL支持對(duì)Hive中的數(shù)據(jù)進(jìn)行讀寫(xiě)。首先創(chuàng)建一個(gè)支持Hive的SparkSession對(duì)象,包括與Hivemetastore的連接,支持Hive的序列化和反序列化操作,支持用戶(hù)定義的Hive操作等。warehouse_location=abspath('spark-warehouse')spark=SparkSession.builder\.appName("PythonSparkSQLHiveintegrationexample")\.config("spark.sql.warehouse.dir",warehouse_location)\.enableHiveSupport().getOrCreate()warehouse_location指定數(shù)據(jù)庫(kù)和表的缺省位置:Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源SparkSQL支持對(duì)Hi36Hive表數(shù)據(jù)源spark.sql("CREATETABLEIFNOTEXISTSsrc(keyINT,valueSTRING)USINGhive")spark.sql("LOADDATALOCALINPATH'examples/src/main/resources/kv1.txt'INTOTABLEsrc")基于新創(chuàng)建的SparkSession創(chuàng)建表和上載數(shù)據(jù)到表中:spark.sql("SELECT*FROMsrc").show()spark.sql("SELECTCOUNT(*)FROMsrc").show()使用HiveQL進(jìn)行查詢(xún):Hive表數(shù)據(jù)源spark.sql("CREATETABL37Hive表數(shù)據(jù)源sqlDF=spark.sql("SELECTkey,valueFROMsrcWHEREkey<10ORDERBYkey")SQL查詢(xún)的結(jié)果也是DataFrames,可以對(duì)結(jié)果進(jìn)行所有DataFrame的操作:stringsDS=sqlDF.rdd.map(lambdarow:"Key:%d,Value:%s"%(row.key,row.value))forrecordinstringsDS.collect():print(record)DataFrames中的元素的類(lèi)型是Row,所以可以按次序訪(fǎng)問(wèn)列:Hive表數(shù)據(jù)源sqlDF=spark.sql("SEL38Hive表數(shù)據(jù)源Record=Row("key","value")recordsDF=spark.createDataFrame([Record(i,"val_"+str(i))foriinrange(1,101)])recordsDF.createOrReplaceTempView("records")也可以在同一個(gè)SparkSession內(nèi)用DataFrames來(lái)創(chuàng)建臨時(shí)表:Hive表數(shù)據(jù)源Record=Row("key","v39Hive表數(shù)據(jù)源創(chuàng)建Hive表的時(shí)候需要說(shuō)明表通過(guò)哪種方式讀寫(xiě)文件系統(tǒng),以及如何序列化和反序列化數(shù)據(jù)。屬性名解釋fileFormat文件格式定義了存儲(chǔ)的格式,包括serde,inputformat和outputformat。目前支持6種文件格式:sequencefile,rcfile,orc,parquet,textfile和avro。inputFormat,outputFormat必須按對(duì)出現(xiàn),分別定義存儲(chǔ)的inputformat和outputformat。不能與fileFormat同時(shí)使用。serde說(shuō)明serde類(lèi)的名字。當(dāng)文件格式本身沒(méi)有serde時(shí)候使用。Hive表數(shù)據(jù)源創(chuàng)建Hive表的時(shí)候需要說(shuō)明表通過(guò)哪種方式讀40SparkCLI指使用命令界面直接輸入SQL命令,然后發(fā)送到Spark集群進(jìn)行執(zhí)行,在界面中顯示運(yùn)行過(guò)程和最終的結(jié)果。PART06SparkSQLCLI介紹SparkCLI指使用命令界面直接輸入SQL命令,然后發(fā)送41SparkSQLCLI介紹SparkCLI指的是使用命令界面直接輸入SQL命令,然后發(fā)送到Spark集群進(jìn)行執(zhí)行,在界面中顯示運(yùn)行過(guò)程和最終的結(jié)果。SparkSQL已經(jīng)集成在SparkShell中,因此,只要啟動(dòng)SparkShell,就可以使用SparkSQL的Shell交互接口:bin/spark-shell--masterspark://hdp-node-01:7077或者,可以啟動(dòng)SparkSQL界面,使用起來(lái)更方便:bin/spark-sql--masterspark://hdp-node-01:7077SparkSQLCLI介紹SparkCLI指的是使用命42SparkSQLCLI介紹SparkSQL所有功能的入口是SQLContext類(lèi)及其子類(lèi)。為了創(chuàng)建一個(gè)基本的SQLContext,需要一個(gè)SparkContext。scala>valsqlContext=neworg.apache.spark.sql.SQLContext(sc)sqlContext:org.apache.spark.sql.SQLContext=org.apache.spark.sql.SQLContext@1943a343scala>importsqlContext.implicits._importsqlContext.implicits._SQLContextSparkSQLCLI介紹SparkSQL所有功能的入43SparkSQLCLI介紹下面的操作基于一個(gè)簡(jiǎn)單的數(shù)據(jù)文件people.json,文件的內(nèi)容如下:{"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}數(shù)據(jù)文件下面語(yǔ)句從本地文件people.json讀取數(shù)據(jù)創(chuàng)建DataFrame:{valdf=sqlContext.read.json("file:///data/people.json")………df:org.apache.spark.sql.DataFrame=[age:bigint,name:string]創(chuàng)建DataFramesSparkSQLCLI介紹下面的操作基于一個(gè)簡(jiǎn)單的數(shù)據(jù)文44Pyspark是針對(duì)Spark的PythonAPI。Spark使用py4j來(lái)實(shí)現(xiàn)Python與Java的互操作,從而實(shí)現(xiàn)使用Python編寫(xiě)Spark程序。Spark也同樣提供了Pyspark,一個(gè)Spark的PythonShell,可以以交互的方式使用Python編寫(xiě)Spark程序。PART07在Pyspark中使用SparkSQLPyspark是針對(duì)Spark的PythonAPI。Spa45在Pyspark中使用SparkSQL在終端上啟動(dòng)Python

Spark

Shell:./bin/pyspark使用JSON文件作為數(shù)據(jù)源,創(chuàng)建JSON文件/home/sparksql/courses.json,并輸入下面的內(nèi)容:實(shí)例描述{"name":"Linux","type":"basic","length":10}{"name":"TCPIP","type":"project","length":15}{"name":"Python","type":"project","length":8}{"name":"GO","type":"basic","length":2}{"name":"Ruby","type":"basic","length":5}在Pyspark中使用SparkSQL在終端上啟動(dòng)Pyth46在Pyspark中使用SparkSQL首先使用SQLContext模塊,其作用是提供SparkSQL處理的功能。在PysparkShell中逐步輸入下面步驟的內(nèi)容:引入pyspark.sql中的SQLContext:frompyspark.sqlimportSQLContext創(chuàng)建SQLContext對(duì)象使用pyspark的SparkContext對(duì)象,創(chuàng)建SQLContext對(duì)象:sqlContext=SQLContext(sc)在Pyspark中使用SparkSQL首先使用SQLCon47在Pyspark中使用SparkSQLDataFrame對(duì)象可以由RDD創(chuàng)建,也可以從Hive表或JSON文件等數(shù)據(jù)源創(chuàng)建。創(chuàng)建DataFrame,指明來(lái)源自JSON文件:df=sqlContext.read.json("/home/shiyanlou/courses.json")創(chuàng)建DataFrame對(duì)象在Pyspark中使用SparkSQLDataFrame對(duì)48在Pyspark中使用SparkSQL首先打印當(dāng)前DataFrame里的內(nèi)容和數(shù)據(jù)表的格式:df.select("name").show()#展示了所有的課程名df.select("name","length").show()#展示了所有的課程名及課程長(zhǎng)度對(duì)DataFrame進(jìn)行操作show()函數(shù)將打印出JSON文件中存儲(chǔ)的數(shù)據(jù)表;使用printSchema()函數(shù)打印數(shù)據(jù)表的格式。然后對(duì)DataFrame的數(shù)據(jù)進(jìn)行各種操作:df.show()df.printSchema()在Pyspark中使用SparkSQL首先打印當(dāng)前Data49在Pyspark中使用SparkSQLdf.filter(df['type']=='basic').select('name','type').show()#展示了課程類(lèi)型為基礎(chǔ)課(basic)的課程名和課程類(lèi)型df.groupBy("type").count().show()#計(jì)算所有基礎(chǔ)課和項(xiàng)目課的數(shù)量。首先需要將DataFrame注冊(cè)為T(mén)able才可以在該表上執(zhí)行SQL語(yǔ)句:df.registerTempTable('courses')coursesRDD=sqlContext.sql("SELECTnameFROMcoursesWHERElength>=5andlength<=10")names=coursesRDD.rdd.map(lambdap:"Name:"+)fornameinnames.collect():printname執(zhí)行SQL語(yǔ)句在Pyspark中使用SparkSQLdf.filter(50在Pyspark中使用SparkSQLParquet是SparkSQL讀取的默認(rèn)數(shù)據(jù)文件格式,把從JSON中讀取的DataFrame保存為Parquet格式,只保存課程名稱(chēng)和長(zhǎng)度兩項(xiàng)數(shù)據(jù):df.select("name","length").write.save("/tmp/courses.parquet",format="parquet")保存DataFrame為其他格式將創(chuàng)建hdfs://master:9000/tmp/courses.parquet文件夾并存入課程名稱(chēng)和長(zhǎng)度數(shù)據(jù)。在Pyspark中使用SparkSQLParquet是Sp51SparkSQL實(shí)現(xiàn)了ThriftJDBC/ODBCserver,所以Java程序可以通過(guò)JDBC遠(yuǎn)程連接SparkSQL發(fā)送SQL語(yǔ)句并執(zhí)行。PART08在Java中連接SparkSQLSparkSQL實(shí)現(xiàn)了ThriftJDBC/ODBCs52在Java中連接SparkSQL首先將${HIVE_HOME}/conf/hive-site.xml拷貝到${SPARK_HOME}/conf目錄下。另外,因?yàn)镠ive元數(shù)據(jù)信息存儲(chǔ)在MySQL中,所以Spark在訪(fǎng)問(wèn)這些元數(shù)據(jù)信息時(shí)需要MySQL連接驅(qū)動(dòng)的支持。添加驅(qū)動(dòng)的方式有三種:在${SPARK_HOME}/conf目錄下的spark-defaults.conf中添加:spark.jars/opt/lib2/mysql-connector-java-5.1.26-bin.jar

;可以實(shí)現(xiàn)添加多個(gè)依賴(lài)jar比較方便:spark.driver.extraClassPath/opt/lib2/mysql-connector-java-5.1.26-bin.jar;設(shè)置配置在Java中連接SparkSQL首先將${HIVE_HOM53在運(yùn)行時(shí)添加--jars

/opt/lib2/mysql-connector-java-5.1.26-bin.jar做完上面的準(zhǔn)備工作后,SparkSQL和Hive就繼承在一起了,SparkSQL可以讀取Hive中的數(shù)據(jù)。設(shè)置配置啟動(dòng)Thrift在Spark根目錄下執(zhí)行:./sbin/start-thriftserver.sh開(kāi)啟thrift服務(wù)器,它可以接受所有spark-submit的參數(shù),并且還可以接受--hiveconf參數(shù)。不添加任何參數(shù)表示以local方式運(yùn)行,默認(rèn)的監(jiān)聽(tīng)端口為10000在Java中連接SparkSQL在運(yùn)行時(shí)添加--jars

/opt/lib2/mysql-54添加依賴(lài)打開(kāi)Eclipse用JDBC連接HiveServer2。新建一個(gè)Maven項(xiàng)目,在pom.xml添加以下依賴(lài):<dependency>

<groupId>org.apache.hive</groupId>

<artifactId>hive-jdbc</artifactId>

<version>1.2.1</version>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>2.4.1</version>

</dependency>

在Java中連接SparkSQL添加依賴(lài)打開(kāi)Eclipse用JDBC連接HiveServe55添加依賴(lài)<dependency>

<groupId>jdk.tools</groupId>

<artifactId>jdk.tools</artifactId>

<version>1.6</version>

<scope>system</scope>

<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>

</dependency>

在Java中連接SparkSQL添加依賴(lài)<dependency>

在Java中連接Spar56JDBC連接HiveServer2的相關(guān)參數(shù):驅(qū)動(dòng):org.apache.hive.jdbc.HiveDriverurl:jdbc:hive2://31:10000/default用戶(hù)名:hadoop(啟動(dòng)thriftserver的linux用戶(hù)名)密碼:“”(默認(rèn)密碼為空)JDBC連接參數(shù)在Java中連接SparkSQLJDBC連接HiveServer2的相關(guān)參數(shù):JDBC連接57import

java.sql.Connection;

import

java.sql.DriverManager;

import

java.sql.ResultSet;

import

java.sql.SQLException;

import

java.sql.Statement;

public

class

Test1

{

public

static

void

main(String[]

args)

throws

SQLException

{

String

url

=

"jdbc:hive2://31:10000/default";

try

{

Class.forName("org.apache.hive.jdbc.HiveDriver");

}

catch

(ClassNotFoundException

e)

{

e.printStackTrace();

}

Connection

conn

=

DriverManager.getConnection(url,"hadoop","");

Statement

stmt

=

conn.createStatement();

String

sql

=

"SELECT

*

FROM

doc1

limit

10";

System.out.println("Running"+sql);

ResultSet

res

=

stmt.executeQuery(sql);

while(res.next()){

System.out.println("id:

"+res.getInt(1)+"\ttype:

"+res.getString(2)+"\tauthors:

"+res.getString(3)+"\ttitle:

"+res.getString(4)+"\tyear:"

+

res.getInt(5));

}

}

}

JDBC連接參數(shù)在Java中連接SparkSQLimport

java.sql.Connection;

J58PART09作業(yè)

PART09作業(yè)59作業(yè)作業(yè):什么是SparkSQL?其主要目的是什么?SparkSQL的執(zhí)行流程有哪幾個(gè)步驟?在SparkSQL中,什么是DataFrame?使用DataFrame的優(yōu)勢(shì)是什么?DataFrame與RDD的主要區(qū)別是什么?使用SparkSQL的方式有哪幾種?使用SparkSQL的步驟是什么?常用的SparkSQL的數(shù)據(jù)源有哪些?Parquet文件格式是什么?它的主要特點(diǎn)是什么?為了使Java程序可以通過(guò)JDBC遠(yuǎn)程連接SparkSQL,需要做哪些準(zhǔn)備工作?連接數(shù)據(jù)庫(kù)的語(yǔ)句是什么?有哪些參數(shù)?作業(yè)作業(yè):什么是SparkSQL?其主要目的是什么?60作業(yè)作業(yè):請(qǐng)按下述要求寫(xiě)出相應(yīng)的SparkSQL語(yǔ)句:從一個(gè)本地JSON文件創(chuàng)建DataFrame;打印DataFrame元數(shù)據(jù);按照列屬性過(guò)濾DataFrame的數(shù)據(jù);返回某列滿(mǎn)足條件的數(shù)據(jù);把DataFrame注冊(cè)成數(shù)據(jù)庫(kù)表。作業(yè)作業(yè):請(qǐng)按下述要求寫(xiě)出相應(yīng)的SparkSQL語(yǔ)句:61謝謝FORYOURLISTENINGHandgeCO.LTD.2016.12.09謝謝FORYOURLISTENINGHandgeCO.62

大數(shù)據(jù)導(dǎo)論第十二章大數(shù)據(jù)導(dǎo)論第十二章63CONTENTS目錄PART01SPARKSQL簡(jiǎn)介PART02SPARKSQL執(zhí)行流程PART03基礎(chǔ)數(shù)據(jù)模型DATAFRAMEPART04使用SparkSQL的方式PART05SPARKSQL數(shù)據(jù)源PART06SPARKSQLCLI介紹PART07在Pyspark中使用SparkSQLPART08在Java中連接SparkSQLPART09習(xí)題CONTENTS目錄PART01SPARKSQL簡(jiǎn)介64PART01SparkSQL簡(jiǎn)介SparkSQL是一個(gè)用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,為Spark提供了查詢(xún)結(jié)構(gòu)化數(shù)據(jù)的能力。SparkSQL可被視為一個(gè)分布式的SQL查詢(xún)引擎,可以實(shí)現(xiàn)對(duì)多種數(shù)據(jù)格式和數(shù)據(jù)源進(jìn)行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。PART01SparkSQL簡(jiǎn)介SparkSQL65SparkSQL簡(jiǎn)介SparkSQL介紹:SparkSQL是為了處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)Spark模塊。不同于SparkRDD的基本API,SparkSQL接口擁有更多關(guān)于數(shù)據(jù)結(jié)構(gòu)本身與執(zhí)行計(jì)劃等更多信息。在Spark內(nèi)部,SparkSQL可以利用這些信息更好地對(duì)操作進(jìn)行優(yōu)化。SparkSQL提供了三種訪(fǎng)問(wèn)接口:SQL,DataFrameAPI和DatasetAPI。當(dāng)計(jì)算引擎被用來(lái)執(zhí)行一個(gè)計(jì)算時(shí),有不同的API和語(yǔ)言種類(lèi)可供選擇。這種統(tǒng)一性意味著開(kāi)發(fā)人員可以來(lái)回輕松切換各種最熟悉的API來(lái)完成同一個(gè)計(jì)算工作。SparkSQL簡(jiǎn)介SparkSQL介紹:66SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)數(shù)據(jù)兼容方面:能加載和查詢(xún)來(lái)自各種來(lái)源的數(shù)據(jù)。性能優(yōu)化方面:除了采取內(nèi)存列存儲(chǔ)、代碼生成等優(yōu)化技術(shù)外,還引進(jìn)成本模型對(duì)查詢(xún)進(jìn)行動(dòng)態(tài)評(píng)估、獲取最佳物理計(jì)劃等;組件擴(kuò)展方面:無(wú)論是SQL的語(yǔ)法解析器、分析器還是優(yōu)化器都可以重新定義,進(jìn)行擴(kuò)展。標(biāo)準(zhǔn)連接:SparkSQL包括具有行業(yè)標(biāo)準(zhǔn)JDBC和ODBC連接的服務(wù)器模式。SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)67SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)集成:無(wú)縫地將SQL查詢(xún)與Spark程序混合。SparkSQL允許將結(jié)構(gòu)化數(shù)據(jù)作為Spark中的分布式數(shù)據(jù)集(RDD)進(jìn)行查詢(xún),在Python,Scala和Java中集成了API。這種緊密的集成使得SQL查詢(xún)以及復(fù)雜的分析算法可以輕松地運(yùn)行??蓴U(kuò)展性:對(duì)于交互式查詢(xún)和長(zhǎng)查詢(xún)使用相同的引擎。SparkSQL利用RDD模型來(lái)支持查詢(xún)?nèi)蒎e(cuò),使其能夠擴(kuò)展到大型作業(yè),不需擔(dān)心為歷史數(shù)據(jù)使用不同的引擎。SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)68PART02SparkSQL執(zhí)行流程PART02SparkSQL執(zhí)行流程69SparkSQL執(zhí)行流程類(lèi)似于關(guān)系型數(shù)據(jù)庫(kù),SparkSQL語(yǔ)句也是由Projection(a1,a2,a3)、DataSource(tableA)、Filter(condition)三部分組成,分別對(duì)應(yīng)SQL查詢(xún)過(guò)程中的Result、DataSource、Operation,也就是說(shuō)SQL語(yǔ)句按Result-->DataSource-->Operation的次序來(lái)描述的。SparkSQL執(zhí)行流程類(lèi)似于關(guān)系型數(shù)據(jù)庫(kù),SparkS70SparkSQL執(zhí)行流程解析(Parse)對(duì)讀入的SQL語(yǔ)句進(jìn)行解析,分辨出SQL語(yǔ)句中哪些詞是關(guān)鍵詞(如SELECT、FROM、WHERE),哪些是表達(dá)式、哪些是Projection、哪些是DataSource等,從而判斷SQL語(yǔ)句是否規(guī)范;綁定(Bind)將SQL語(yǔ)句和數(shù)據(jù)庫(kù)的數(shù)據(jù)字典(列、表、視圖等)進(jìn)行綁定,如果相關(guān)的Projection、DataSource等都存在,則這個(gè)SQL語(yǔ)句是可以執(zhí)行的;SparkSQL執(zhí)行流程解析(Parse)71SparkSQL執(zhí)行流程優(yōu)化(Optimize)一般的數(shù)據(jù)庫(kù)會(huì)提供幾個(gè)執(zhí)行計(jì)劃,這些計(jì)劃一般都有運(yùn)行統(tǒng)計(jì)數(shù)據(jù),數(shù)據(jù)庫(kù)會(huì)在這些計(jì)劃中選擇一個(gè)最優(yōu)計(jì)劃;執(zhí)行(Execute)按Operation-->DataSource-->Result的次序來(lái)執(zhí)行計(jì)劃。在執(zhí)行過(guò)程有時(shí)候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運(yùn)行剛運(yùn)行過(guò)的SQL語(yǔ)句,可能直接從數(shù)據(jù)庫(kù)的緩沖池中獲取返回結(jié)果。SparkSQL執(zhí)行流程優(yōu)化(Optimize)72PART03基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是由“命名列”(類(lèi)似關(guān)系表的字段定義)所組織起來(lái)的一個(gè)分布式數(shù)據(jù)集合,可以把它看成是一個(gè)關(guān)系型數(shù)據(jù)庫(kù)的表。PART03基礎(chǔ)數(shù)據(jù)模型DataFrameDataFra73基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是SparkSQL的核心,它將數(shù)據(jù)保存為行構(gòu)成的集合,行對(duì)應(yīng)列有相應(yīng)的列名。DataFrame與RDD的主要區(qū)別在于,DataFrame帶有Schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱(chēng)和類(lèi)型。這使得SparkSQL可以掌握更多的結(jié)構(gòu)信息,從而能夠?qū)ataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)。基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是Spark74基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame與RDD的對(duì)比:基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame與RDD的對(duì)75PART04使用SparkSQL的方式PART04使用SparkSQL的方式76使用SparkSQL的方式使用SparkSQL,首先利用sqlContext從外部數(shù)據(jù)源加載數(shù)據(jù)為DataFrame;然后,利用DataFrame上豐富的API進(jìn)行查詢(xún)、轉(zhuǎn)換;最后,將結(jié)果進(jìn)行展現(xiàn)或存儲(chǔ)為各種外部數(shù)據(jù)形式。Spark

SQL

為Spark提供了查詢(xún)結(jié)構(gòu)化數(shù)據(jù)的能力,查詢(xún)時(shí)既可以使用SQL也可以使用DataFrame

API(RDD)。通過(guò)ThriftServer,Spark

SQL支持多語(yǔ)言編程包括Java、Scala、Python及R。使用SparkSQL的方式使用SparkSQL,首先利用77使用SparkSQL的方式使用SparkSQL的方式78使用SparkSQL的方式加載數(shù)據(jù)①.從Hive中的users表構(gòu)造DataFrame:users=sqlContext.table("users")②.加載S3上的JSON文件:logs=sqlContext.load("s3n://path/to/data.json","json")③.加載HDFS上的Parquet文件:clicks=sqlContext.load("hdfs://path/to/data.parquet","parquet")使用SparkSQL的方式加載數(shù)據(jù)①.從Hive中的us79使用SparkSQL的方式加載數(shù)據(jù)④.通過(guò)JDBC訪(fǎng)問(wèn)MySQL:comments=sqlContext.jdbc("jdbc:mysql://localhost/comments","user")⑤.將普通RDD轉(zhuǎn)變?yōu)镈ataFrame:rdd=sparkContext.textFile(“article.txt”).flatMap(_.split("")).map((_,1)).reduceByKey(_+_)wordCounts=sqlContext.createDataFrame(rdd,["word","count"])使用SparkSQL的方式加載數(shù)據(jù)④.通過(guò)JDBC訪(fǎng)問(wèn)M80使用SparkSQL的方式加載數(shù)據(jù)⑥.將本地?cái)?shù)據(jù)容器轉(zhuǎn)變?yōu)镈ataFrame:data=[("Alice",21),("Bob",24)]people=sqlContext.createDataFrame(data,["name","age"])⑦.將Pandas

DataFrame轉(zhuǎn)變?yōu)镾park

DataFrame(Python

API特有功能):sparkDF

=

sqlContext.createDataFrame(pandasDF)使用SparkSQL的方式加載數(shù)據(jù)⑥.將本地?cái)?shù)據(jù)容器轉(zhuǎn)變81使用SparkSQL的方式使用DataFrame①.創(chuàng)建一個(gè)只包含"年輕"用戶(hù)的DataFrame:young=users.filter(users.age<21)②.也可以使用Pandas風(fēng)格的語(yǔ)法:young=users[users.age<21]③.將所有人的年齡加1:young.select(,young.age+1)使用SparkSQL的方式使用DataFrame①.創(chuàng)建82使用SparkSQL的方式使用DataFrame④.統(tǒng)計(jì)年輕用戶(hù)中各性別人數(shù):young.groupBy("gender").count()⑤.將所有年輕用戶(hù)與另一個(gè)名為logs的DataFrame聯(lián)接起來(lái):young.join(logs,logs.userId==users.userId,"left_outer")使用SparkSQL的方式使用DataFrame④.統(tǒng)計(jì)83使用SparkSQL的方式保存結(jié)果①.追加至HDFS上的Parquet文件:young.save(path="hdfs://path/to/data.parquet",source="parquet",mode="append")②.覆寫(xiě)S3上的JSON文件:young.save(path="s3n://path/to/data.json",source="json",mode="append")使用SparkSQL的方式保存結(jié)果①.追加至HDFS上的84使用SparkSQL的方式保存結(jié)果③.保存為SQL表:young.saveAsTable(tableName="young",source="parquet"mode="overwrite")④.轉(zhuǎn)換為PandasDataFrame(PythonAPI特有功能):pandasDF=young.toPandas()⑤.以表格形式打印輸出:young.show()使用SparkSQL的方式保存結(jié)果③.保存為SQL表:85SparkSQL通過(guò)DataFrame接口使用多種數(shù)據(jù)源。應(yīng)用程序可以直接使用關(guān)系型轉(zhuǎn)換對(duì)DataFrame進(jìn)行操作,也可以用來(lái)創(chuàng)建臨時(shí)表。把DataFrame注冊(cè)為臨時(shí)表后就可以使用SQL對(duì)其數(shù)據(jù)進(jìn)行查詢(xún)PART05SparkSQL數(shù)據(jù)源SparkSQL通過(guò)DataFrame接口使用多種數(shù)據(jù)源。86通用Load方法SparkSQL使用數(shù)據(jù)源的最簡(jiǎn)單模式就是對(duì)缺省數(shù)據(jù)源類(lèi)型來(lái)進(jìn)行各種操作。缺省數(shù)據(jù)源類(lèi)型是parquet文件df=spark.read.load("examples/src/main/resources/users.parquet")df.select("name","favorite_color").write.save("namesAndFavColors.parquet")也可以通過(guò)load()函數(shù)的參數(shù)來(lái)說(shuō)明數(shù)據(jù)源的類(lèi)型:df=spark.read.load("examples/src/main/resources/people.json",format="json")df.select("name","age").write.save("namesAndAges.parquet",format="parquet")通用Load方法SparkSQL使用數(shù)據(jù)源的最簡(jiǎn)單模式就是87通用Load方法也可以不把文件上載到DataFrame,然后再對(duì)其進(jìn)行查詢(xún)操作,而是直接對(duì)文件進(jìn)行SQL查詢(xún):df=spark.sql("SELECT*FROMparquet.examples/src/main/resources/users.parquet")通用Load方法也可以不把文件上載到DataFrame,然后88通用Save方法Save方法Save()操作的可選參數(shù)SaveMode用來(lái)說(shuō)明如何處理已有數(shù)據(jù)。需要提醒的是這些保存操作不是原子操作,如果選擇的是Overwrite模式,Save()操作會(huì)首先刪除已有的數(shù)據(jù),然后才寫(xiě)入新的數(shù)據(jù)。SaveMode的可選值:Scala/Java任何語(yǔ)言解釋SaveMode.ErrorIfExists"error"(缺省值)如果有數(shù)據(jù)存在,就拋異常。SaveMode.Append"append"如果有數(shù)據(jù)/表存在,就添加新數(shù)據(jù)到老數(shù)據(jù)后面。SaveMode.Overwrite"overwrite"如果有數(shù)據(jù)/表存在,就用新數(shù)據(jù)覆蓋老數(shù)據(jù)。SaveMode.Ignore"ignore"如果有數(shù)據(jù)存在,就不寫(xiě)入新數(shù)據(jù)。通用Save方法Save方法Save()操作的可選參數(shù)Sav89通用Save方法基于文件的數(shù)據(jù)源,可以用path可選參數(shù)定義一個(gè)數(shù)據(jù)庫(kù)表的路徑,比如:df.write.option(“path”,“/some/path”).saveAsTable(“t”)即使數(shù)據(jù)庫(kù)表刪除后,該數(shù)據(jù)庫(kù)表路徑和數(shù)據(jù)也不會(huì)被刪除。如果程序不指定數(shù)據(jù)庫(kù)表路徑,Spark會(huì)把數(shù)據(jù)寫(xiě)到一個(gè)缺省的數(shù)據(jù)庫(kù)表路徑下。但是在這種情況下,當(dāng)數(shù)據(jù)庫(kù)被刪除的時(shí)候,缺省數(shù)據(jù)庫(kù)表的路徑將會(huì)被刪除。通用Save方法基于文件的數(shù)據(jù)源,可以用path可選參數(shù)定義90通用Save方法對(duì)基于文件的數(shù)據(jù)源還可以對(duì)其輸出進(jìn)行分桶、排序和分割,分桶和排序只能用于當(dāng)輸出方式為持久數(shù)據(jù)庫(kù)表的時(shí)候。df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")當(dāng)進(jìn)行分割時(shí),可以使用DatasetAPI接口的save()和saveAsTable()方法:df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")通用Save方法對(duì)基于文件的數(shù)據(jù)源還可以對(duì)其輸出進(jìn)行分桶、排91通用Save方法還可以對(duì)同一張表同時(shí)進(jìn)行分割和分桶:df=spark.read.parquet("examples/src/main/resources/users.parquet")(df.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed"))通用Save方法還可以對(duì)同一張表同時(shí)進(jìn)行分割和分桶:df=92Parquet文件數(shù)據(jù)源Parquet文件數(shù)據(jù)源SparkSQL支持對(duì)Parquet文件的讀寫(xiě)操作,并自動(dòng)保持源數(shù)據(jù)的模式。為了兼容性,在寫(xiě)Parquet文件的時(shí)候,所有的列會(huì)自動(dòng)轉(zhuǎn)換為可為空的列。把DataFrame保存為Parquet文件,格式信息會(huì)自動(dòng)保留。peopleDF=spark.read.json("examples/src/main/resources/people.json")df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")Parquet文件數(shù)據(jù)源Parquet文件數(shù)據(jù)源Spark93讀取上面存儲(chǔ)的Parquet文件為DataFrameParquet文件也可以用來(lái)創(chuàng)建臨時(shí)表供SQL查詢(xún)使用parquetFile=spark.read.parquet("people.parquet")parquetFile.createOrReplaceTempView("parquetFile")teenagers=spark.sql("SELECTnameFROMparquetFileWHEREage>=13ANDage<=19")teenagers.show()Parquet文件數(shù)據(jù)源讀取上面存儲(chǔ)的Parquet文件為DataFrameParq94JSONDataSets數(shù)據(jù)源JSONDataSets數(shù)據(jù)源SparkSQL可以自動(dòng)根據(jù)JSONDataSet的格式把其上載為DataFrame。用路徑指定JSONdataset;路徑下可以是一個(gè)文件,也可以是多個(gè)文件:sc=spark.sparkContextpath="examples/src/main/resources/people.json"peopleDF=spark.read.json(path)使用的結(jié)構(gòu)可以調(diào)用printSchema()方法打印:peopleDF.printSchema()JSONDataSets數(shù)據(jù)源JSONDataSets95利用DataFrame創(chuàng)建一個(gè)臨時(shí)表:使用Spark的sql方法進(jìn)行SQL查詢(xún):peopleDF.createOrReplaceTempView("people")teenagerNamesDF=spark.sql("SELECTnameFROMpeopleWHEREageBETWEEN13AND19")teenagerNamesDF.show()JSONDataSets數(shù)據(jù)源利用DataFrame創(chuàng)建一個(gè)臨時(shí)表:使用Spark的sql96JSONdataset的DataFrame也可以是RDD[String]格式,每個(gè)JSON對(duì)象為一個(gè)string:jsonStrings=['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']otherPeopleRDD=sc.parallelize(jsonStrings)otherPeople=spark.read.json(otherPeopleRDD)otherPeople.show()JSONDataSets數(shù)據(jù)源JSONdataset的DataFrame也可以是RDD[97Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源SparkSQL支持對(duì)Hive中的數(shù)據(jù)進(jìn)行讀寫(xiě)。首先創(chuàng)建一個(gè)支持Hive的SparkSession對(duì)象,包括與Hivemetastore的連接,支持Hive的序列化和反序列化操作,支持用戶(hù)定義的Hive操作等。warehouse_location=abspath('spark-warehouse')spark=SparkSession.builder\.appName("PythonSparkSQLHiveintegrationexample")\.config("spark.sql.warehouse.dir",warehouse_location)\.enableHiveSupport().getOrCreate()warehouse_location指定數(shù)據(jù)庫(kù)和表的缺省位置:Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源SparkSQL支持對(duì)Hi98Hive表數(shù)據(jù)源spark.sql("CREATETABLEIFNOTEXISTSsrc(keyINT,valueSTRING)USINGhive")spark.sql("LOADDATALOCALINPATH'examples/src/main/resources/kv1.txt'INTOTABLEsrc")基于新創(chuàng)建的SparkSession創(chuàng)建表和上載數(shù)據(jù)到表中:spark.sql("SELECT*FROMsrc").show()spark.sql("SELECTCOUNT(*)FROMsrc").show()使用HiveQL進(jìn)行查詢(xún):Hive表數(shù)據(jù)源spark.sql("CREATETABL99Hive表數(shù)據(jù)源sqlDF=spark.sql("SELECTkey,valueFROMsrcWHEREkey<10ORDERBYkey")SQL查詢(xún)的結(jié)果也是DataFrames,可以對(duì)結(jié)果進(jìn)行所有DataFrame的操作:stringsDS=sqlDF.rdd.map(lambdaro

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論