![大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第1頁(yè)](http://file4.renrendoc.com/view/c1887399f3ca73c0a55ef3ab2d36bc72/c1887399f3ca73c0a55ef3ab2d36bc721.gif)
![大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第2頁(yè)](http://file4.renrendoc.com/view/c1887399f3ca73c0a55ef3ab2d36bc72/c1887399f3ca73c0a55ef3ab2d36bc722.gif)
![大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第3頁(yè)](http://file4.renrendoc.com/view/c1887399f3ca73c0a55ef3ab2d36bc72/c1887399f3ca73c0a55ef3ab2d36bc723.gif)
![大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第4頁(yè)](http://file4.renrendoc.com/view/c1887399f3ca73c0a55ef3ab2d36bc72/c1887399f3ca73c0a55ef3ab2d36bc724.gif)
![大數(shù)據(jù)導(dǎo)論思維第12章SPARKSQL課件_第5頁(yè)](http://file4.renrendoc.com/view/c1887399f3ca73c0a55ef3ab2d36bc72/c1887399f3ca73c0a55ef3ab2d36bc725.gif)
版權(quán)說(shuō)明:本文檔由用戶(hù)提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
大數(shù)據(jù)導(dǎo)論第十二章大數(shù)據(jù)導(dǎo)論第十二章1CONTENTS目錄PART01SPARKSQL簡(jiǎn)介PART02SPARKSQL執(zhí)行流程PART03基礎(chǔ)數(shù)據(jù)模型DATAFRAMEPART04使用SparkSQL的方式PART05SPARKSQL數(shù)據(jù)源PART06SPARKSQLCLI介紹PART07在Pyspark中使用SparkSQLPART08在Java中連接SparkSQLPART09習(xí)題CONTENTS目錄PART01SPARKSQL簡(jiǎn)介2PART01SparkSQL簡(jiǎn)介SparkSQL是一個(gè)用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,為Spark提供了查詢(xún)結(jié)構(gòu)化數(shù)據(jù)的能力。SparkSQL可被視為一個(gè)分布式的SQL查詢(xún)引擎,可以實(shí)現(xiàn)對(duì)多種數(shù)據(jù)格式和數(shù)據(jù)源進(jìn)行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。PART01SparkSQL簡(jiǎn)介SparkSQL3SparkSQL簡(jiǎn)介SparkSQL介紹:SparkSQL是為了處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)Spark模塊。不同于SparkRDD的基本API,SparkSQL接口擁有更多關(guān)于數(shù)據(jù)結(jié)構(gòu)本身與執(zhí)行計(jì)劃等更多信息。在Spark內(nèi)部,SparkSQL可以利用這些信息更好地對(duì)操作進(jìn)行優(yōu)化。SparkSQL提供了三種訪(fǎng)問(wèn)接口:SQL,DataFrameAPI和DatasetAPI。當(dāng)計(jì)算引擎被用來(lái)執(zhí)行一個(gè)計(jì)算時(shí),有不同的API和語(yǔ)言種類(lèi)可供選擇。這種統(tǒng)一性意味著開(kāi)發(fā)人員可以來(lái)回輕松切換各種最熟悉的API來(lái)完成同一個(gè)計(jì)算工作。SparkSQL簡(jiǎn)介SparkSQL介紹:4SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)數(shù)據(jù)兼容方面:能加載和查詢(xún)來(lái)自各種來(lái)源的數(shù)據(jù)。性能優(yōu)化方面:除了采取內(nèi)存列存儲(chǔ)、代碼生成等優(yōu)化技術(shù)外,還引進(jìn)成本模型對(duì)查詢(xún)進(jìn)行動(dòng)態(tài)評(píng)估、獲取最佳物理計(jì)劃等;組件擴(kuò)展方面:無(wú)論是SQL的語(yǔ)法解析器、分析器還是優(yōu)化器都可以重新定義,進(jìn)行擴(kuò)展。標(biāo)準(zhǔn)連接:SparkSQL包括具有行業(yè)標(biāo)準(zhǔn)JDBC和ODBC連接的服務(wù)器模式。SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)5SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)集成:無(wú)縫地將SQL查詢(xún)與Spark程序混合。SparkSQL允許將結(jié)構(gòu)化數(shù)據(jù)作為Spark中的分布式數(shù)據(jù)集(RDD)進(jìn)行查詢(xún),在Python,Scala和Java中集成了API。這種緊密的集成使得SQL查詢(xún)以及復(fù)雜的分析算法可以輕松地運(yùn)行??蓴U(kuò)展性:對(duì)于交互式查詢(xún)和長(zhǎng)查詢(xún)使用相同的引擎。SparkSQL利用RDD模型來(lái)支持查詢(xún)?nèi)蒎e(cuò),使其能夠擴(kuò)展到大型作業(yè),不需擔(dān)心為歷史數(shù)據(jù)使用不同的引擎。SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)6PART02SparkSQL執(zhí)行流程PART02SparkSQL執(zhí)行流程7SparkSQL執(zhí)行流程類(lèi)似于關(guān)系型數(shù)據(jù)庫(kù),SparkSQL語(yǔ)句也是由Projection(a1,a2,a3)、DataSource(tableA)、Filter(condition)三部分組成,分別對(duì)應(yīng)SQL查詢(xún)過(guò)程中的Result、DataSource、Operation,也就是說(shuō)SQL語(yǔ)句按Result-->DataSource-->Operation的次序來(lái)描述的。SparkSQL執(zhí)行流程類(lèi)似于關(guān)系型數(shù)據(jù)庫(kù),SparkS8SparkSQL執(zhí)行流程解析(Parse)對(duì)讀入的SQL語(yǔ)句進(jìn)行解析,分辨出SQL語(yǔ)句中哪些詞是關(guān)鍵詞(如SELECT、FROM、WHERE),哪些是表達(dá)式、哪些是Projection、哪些是DataSource等,從而判斷SQL語(yǔ)句是否規(guī)范;綁定(Bind)將SQL語(yǔ)句和數(shù)據(jù)庫(kù)的數(shù)據(jù)字典(列、表、視圖等)進(jìn)行綁定,如果相關(guān)的Projection、DataSource等都存在,則這個(gè)SQL語(yǔ)句是可以執(zhí)行的;SparkSQL執(zhí)行流程解析(Parse)9SparkSQL執(zhí)行流程優(yōu)化(Optimize)一般的數(shù)據(jù)庫(kù)會(huì)提供幾個(gè)執(zhí)行計(jì)劃,這些計(jì)劃一般都有運(yùn)行統(tǒng)計(jì)數(shù)據(jù),數(shù)據(jù)庫(kù)會(huì)在這些計(jì)劃中選擇一個(gè)最優(yōu)計(jì)劃;執(zhí)行(Execute)按Operation-->DataSource-->Result的次序來(lái)執(zhí)行計(jì)劃。在執(zhí)行過(guò)程有時(shí)候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運(yùn)行剛運(yùn)行過(guò)的SQL語(yǔ)句,可能直接從數(shù)據(jù)庫(kù)的緩沖池中獲取返回結(jié)果。SparkSQL執(zhí)行流程優(yōu)化(Optimize)10PART03基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是由“命名列”(類(lèi)似關(guān)系表的字段定義)所組織起來(lái)的一個(gè)分布式數(shù)據(jù)集合,可以把它看成是一個(gè)關(guān)系型數(shù)據(jù)庫(kù)的表。PART03基礎(chǔ)數(shù)據(jù)模型DataFrameDataFra11基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是SparkSQL的核心,它將數(shù)據(jù)保存為行構(gòu)成的集合,行對(duì)應(yīng)列有相應(yīng)的列名。DataFrame與RDD的主要區(qū)別在于,DataFrame帶有Schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱(chēng)和類(lèi)型。這使得SparkSQL可以掌握更多的結(jié)構(gòu)信息,從而能夠?qū)ataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)?;A(chǔ)數(shù)據(jù)模型DataFrameDataFrame是Spark12基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame與RDD的對(duì)比:基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame與RDD的對(duì)13PART04使用SparkSQL的方式PART04使用SparkSQL的方式14使用SparkSQL的方式使用SparkSQL,首先利用sqlContext從外部數(shù)據(jù)源加載數(shù)據(jù)為DataFrame;然后,利用DataFrame上豐富的API進(jìn)行查詢(xún)、轉(zhuǎn)換;最后,將結(jié)果進(jìn)行展現(xiàn)或存儲(chǔ)為各種外部數(shù)據(jù)形式。Spark
SQL
為Spark提供了查詢(xún)結(jié)構(gòu)化數(shù)據(jù)的能力,查詢(xún)時(shí)既可以使用SQL也可以使用DataFrame
API(RDD)。通過(guò)ThriftServer,Spark
SQL支持多語(yǔ)言編程包括Java、Scala、Python及R。使用SparkSQL的方式使用SparkSQL,首先利用15使用SparkSQL的方式使用SparkSQL的方式16使用SparkSQL的方式加載數(shù)據(jù)①.從Hive中的users表構(gòu)造DataFrame:users=sqlContext.table("users")②.加載S3上的JSON文件:logs=sqlContext.load("s3n://path/to/data.json","json")③.加載HDFS上的Parquet文件:clicks=sqlContext.load("hdfs://path/to/data.parquet","parquet")使用SparkSQL的方式加載數(shù)據(jù)①.從Hive中的us17使用SparkSQL的方式加載數(shù)據(jù)④.通過(guò)JDBC訪(fǎng)問(wèn)MySQL:comments=sqlContext.jdbc("jdbc:mysql://localhost/comments","user")⑤.將普通RDD轉(zhuǎn)變?yōu)镈ataFrame:rdd=sparkContext.textFile(“article.txt”).flatMap(_.split("")).map((_,1)).reduceByKey(_+_)wordCounts=sqlContext.createDataFrame(rdd,["word","count"])使用SparkSQL的方式加載數(shù)據(jù)④.通過(guò)JDBC訪(fǎng)問(wèn)M18使用SparkSQL的方式加載數(shù)據(jù)⑥.將本地?cái)?shù)據(jù)容器轉(zhuǎn)變?yōu)镈ataFrame:data=[("Alice",21),("Bob",24)]people=sqlContext.createDataFrame(data,["name","age"])⑦.將Pandas
DataFrame轉(zhuǎn)變?yōu)镾park
DataFrame(Python
API特有功能):sparkDF
=
sqlContext.createDataFrame(pandasDF)使用SparkSQL的方式加載數(shù)據(jù)⑥.將本地?cái)?shù)據(jù)容器轉(zhuǎn)變19使用SparkSQL的方式使用DataFrame①.創(chuàng)建一個(gè)只包含"年輕"用戶(hù)的DataFrame:young=users.filter(users.age<21)②.也可以使用Pandas風(fēng)格的語(yǔ)法:young=users[users.age<21]③.將所有人的年齡加1:young.select(,young.age+1)使用SparkSQL的方式使用DataFrame①.創(chuàng)建20使用SparkSQL的方式使用DataFrame④.統(tǒng)計(jì)年輕用戶(hù)中各性別人數(shù):young.groupBy("gender").count()⑤.將所有年輕用戶(hù)與另一個(gè)名為logs的DataFrame聯(lián)接起來(lái):young.join(logs,logs.userId==users.userId,"left_outer")使用SparkSQL的方式使用DataFrame④.統(tǒng)計(jì)21使用SparkSQL的方式保存結(jié)果①.追加至HDFS上的Parquet文件:young.save(path="hdfs://path/to/data.parquet",source="parquet",mode="append")②.覆寫(xiě)S3上的JSON文件:young.save(path="s3n://path/to/data.json",source="json",mode="append")使用SparkSQL的方式保存結(jié)果①.追加至HDFS上的22使用SparkSQL的方式保存結(jié)果③.保存為SQL表:young.saveAsTable(tableName="young",source="parquet"mode="overwrite")④.轉(zhuǎn)換為PandasDataFrame(PythonAPI特有功能):pandasDF=young.toPandas()⑤.以表格形式打印輸出:young.show()使用SparkSQL的方式保存結(jié)果③.保存為SQL表:23SparkSQL通過(guò)DataFrame接口使用多種數(shù)據(jù)源。應(yīng)用程序可以直接使用關(guān)系型轉(zhuǎn)換對(duì)DataFrame進(jìn)行操作,也可以用來(lái)創(chuàng)建臨時(shí)表。把DataFrame注冊(cè)為臨時(shí)表后就可以使用SQL對(duì)其數(shù)據(jù)進(jìn)行查詢(xún)PART05SparkSQL數(shù)據(jù)源SparkSQL通過(guò)DataFrame接口使用多種數(shù)據(jù)源。24通用Load方法SparkSQL使用數(shù)據(jù)源的最簡(jiǎn)單模式就是對(duì)缺省數(shù)據(jù)源類(lèi)型來(lái)進(jìn)行各種操作。缺省數(shù)據(jù)源類(lèi)型是parquet文件df=spark.read.load("examples/src/main/resources/users.parquet")df.select("name","favorite_color").write.save("namesAndFavColors.parquet")也可以通過(guò)load()函數(shù)的參數(shù)來(lái)說(shuō)明數(shù)據(jù)源的類(lèi)型:df=spark.read.load("examples/src/main/resources/people.json",format="json")df.select("name","age").write.save("namesAndAges.parquet",format="parquet")通用Load方法SparkSQL使用數(shù)據(jù)源的最簡(jiǎn)單模式就是25通用Load方法也可以不把文件上載到DataFrame,然后再對(duì)其進(jìn)行查詢(xún)操作,而是直接對(duì)文件進(jìn)行SQL查詢(xún):df=spark.sql("SELECT*FROMparquet.examples/src/main/resources/users.parquet")通用Load方法也可以不把文件上載到DataFrame,然后26通用Save方法Save方法Save()操作的可選參數(shù)SaveMode用來(lái)說(shuō)明如何處理已有數(shù)據(jù)。需要提醒的是這些保存操作不是原子操作,如果選擇的是Overwrite模式,Save()操作會(huì)首先刪除已有的數(shù)據(jù),然后才寫(xiě)入新的數(shù)據(jù)。SaveMode的可選值:Scala/Java任何語(yǔ)言解釋SaveMode.ErrorIfExists"error"(缺省值)如果有數(shù)據(jù)存在,就拋異常。SaveMode.Append"append"如果有數(shù)據(jù)/表存在,就添加新數(shù)據(jù)到老數(shù)據(jù)后面。SaveMode.Overwrite"overwrite"如果有數(shù)據(jù)/表存在,就用新數(shù)據(jù)覆蓋老數(shù)據(jù)。SaveMode.Ignore"ignore"如果有數(shù)據(jù)存在,就不寫(xiě)入新數(shù)據(jù)。通用Save方法Save方法Save()操作的可選參數(shù)Sav27通用Save方法基于文件的數(shù)據(jù)源,可以用path可選參數(shù)定義一個(gè)數(shù)據(jù)庫(kù)表的路徑,比如:df.write.option(“path”,“/some/path”).saveAsTable(“t”)即使數(shù)據(jù)庫(kù)表刪除后,該數(shù)據(jù)庫(kù)表路徑和數(shù)據(jù)也不會(huì)被刪除。如果程序不指定數(shù)據(jù)庫(kù)表路徑,Spark會(huì)把數(shù)據(jù)寫(xiě)到一個(gè)缺省的數(shù)據(jù)庫(kù)表路徑下。但是在這種情況下,當(dāng)數(shù)據(jù)庫(kù)被刪除的時(shí)候,缺省數(shù)據(jù)庫(kù)表的路徑將會(huì)被刪除。通用Save方法基于文件的數(shù)據(jù)源,可以用path可選參數(shù)定義28通用Save方法對(duì)基于文件的數(shù)據(jù)源還可以對(duì)其輸出進(jìn)行分桶、排序和分割,分桶和排序只能用于當(dāng)輸出方式為持久數(shù)據(jù)庫(kù)表的時(shí)候。df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")當(dāng)進(jìn)行分割時(shí),可以使用DatasetAPI接口的save()和saveAsTable()方法:df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")通用Save方法對(duì)基于文件的數(shù)據(jù)源還可以對(duì)其輸出進(jìn)行分桶、排29通用Save方法還可以對(duì)同一張表同時(shí)進(jìn)行分割和分桶:df=spark.read.parquet("examples/src/main/resources/users.parquet")(df.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed"))通用Save方法還可以對(duì)同一張表同時(shí)進(jìn)行分割和分桶:df=30Parquet文件數(shù)據(jù)源Parquet文件數(shù)據(jù)源SparkSQL支持對(duì)Parquet文件的讀寫(xiě)操作,并自動(dòng)保持源數(shù)據(jù)的模式。為了兼容性,在寫(xiě)Parquet文件的時(shí)候,所有的列會(huì)自動(dòng)轉(zhuǎn)換為可為空的列。把DataFrame保存為Parquet文件,格式信息會(huì)自動(dòng)保留。peopleDF=spark.read.json("examples/src/main/resources/people.json")df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")Parquet文件數(shù)據(jù)源Parquet文件數(shù)據(jù)源Spark31讀取上面存儲(chǔ)的Parquet文件為DataFrameParquet文件也可以用來(lái)創(chuàng)建臨時(shí)表供SQL查詢(xún)使用parquetFile=spark.read.parquet("people.parquet")parquetFile.createOrReplaceTempView("parquetFile")teenagers=spark.sql("SELECTnameFROMparquetFileWHEREage>=13ANDage<=19")teenagers.show()Parquet文件數(shù)據(jù)源讀取上面存儲(chǔ)的Parquet文件為DataFrameParq32JSONDataSets數(shù)據(jù)源JSONDataSets數(shù)據(jù)源SparkSQL可以自動(dòng)根據(jù)JSONDataSet的格式把其上載為DataFrame。用路徑指定JSONdataset;路徑下可以是一個(gè)文件,也可以是多個(gè)文件:sc=spark.sparkContextpath="examples/src/main/resources/people.json"peopleDF=spark.read.json(path)使用的結(jié)構(gòu)可以調(diào)用printSchema()方法打?。簆eopleDF.printSchema()JSONDataSets數(shù)據(jù)源JSONDataSets33利用DataFrame創(chuàng)建一個(gè)臨時(shí)表:使用Spark的sql方法進(jìn)行SQL查詢(xún):peopleDF.createOrReplaceTempView("people")teenagerNamesDF=spark.sql("SELECTnameFROMpeopleWHEREageBETWEEN13AND19")teenagerNamesDF.show()JSONDataSets數(shù)據(jù)源利用DataFrame創(chuàng)建一個(gè)臨時(shí)表:使用Spark的sql34JSONdataset的DataFrame也可以是RDD[String]格式,每個(gè)JSON對(duì)象為一個(gè)string:jsonStrings=['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']otherPeopleRDD=sc.parallelize(jsonStrings)otherPeople=spark.read.json(otherPeopleRDD)otherPeople.show()JSONDataSets數(shù)據(jù)源JSONdataset的DataFrame也可以是RDD[35Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源SparkSQL支持對(duì)Hive中的數(shù)據(jù)進(jìn)行讀寫(xiě)。首先創(chuàng)建一個(gè)支持Hive的SparkSession對(duì)象,包括與Hivemetastore的連接,支持Hive的序列化和反序列化操作,支持用戶(hù)定義的Hive操作等。warehouse_location=abspath('spark-warehouse')spark=SparkSession.builder\.appName("PythonSparkSQLHiveintegrationexample")\.config("spark.sql.warehouse.dir",warehouse_location)\.enableHiveSupport().getOrCreate()warehouse_location指定數(shù)據(jù)庫(kù)和表的缺省位置:Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源SparkSQL支持對(duì)Hi36Hive表數(shù)據(jù)源spark.sql("CREATETABLEIFNOTEXISTSsrc(keyINT,valueSTRING)USINGhive")spark.sql("LOADDATALOCALINPATH'examples/src/main/resources/kv1.txt'INTOTABLEsrc")基于新創(chuàng)建的SparkSession創(chuàng)建表和上載數(shù)據(jù)到表中:spark.sql("SELECT*FROMsrc").show()spark.sql("SELECTCOUNT(*)FROMsrc").show()使用HiveQL進(jìn)行查詢(xún):Hive表數(shù)據(jù)源spark.sql("CREATETABL37Hive表數(shù)據(jù)源sqlDF=spark.sql("SELECTkey,valueFROMsrcWHEREkey<10ORDERBYkey")SQL查詢(xún)的結(jié)果也是DataFrames,可以對(duì)結(jié)果進(jìn)行所有DataFrame的操作:stringsDS=sqlDF.rdd.map(lambdarow:"Key:%d,Value:%s"%(row.key,row.value))forrecordinstringsDS.collect():print(record)DataFrames中的元素的類(lèi)型是Row,所以可以按次序訪(fǎng)問(wèn)列:Hive表數(shù)據(jù)源sqlDF=spark.sql("SEL38Hive表數(shù)據(jù)源Record=Row("key","value")recordsDF=spark.createDataFrame([Record(i,"val_"+str(i))foriinrange(1,101)])recordsDF.createOrReplaceTempView("records")也可以在同一個(gè)SparkSession內(nèi)用DataFrames來(lái)創(chuàng)建臨時(shí)表:Hive表數(shù)據(jù)源Record=Row("key","v39Hive表數(shù)據(jù)源創(chuàng)建Hive表的時(shí)候需要說(shuō)明表通過(guò)哪種方式讀寫(xiě)文件系統(tǒng),以及如何序列化和反序列化數(shù)據(jù)。屬性名解釋fileFormat文件格式定義了存儲(chǔ)的格式,包括serde,inputformat和outputformat。目前支持6種文件格式:sequencefile,rcfile,orc,parquet,textfile和avro。inputFormat,outputFormat必須按對(duì)出現(xiàn),分別定義存儲(chǔ)的inputformat和outputformat。不能與fileFormat同時(shí)使用。serde說(shuō)明serde類(lèi)的名字。當(dāng)文件格式本身沒(méi)有serde時(shí)候使用。Hive表數(shù)據(jù)源創(chuàng)建Hive表的時(shí)候需要說(shuō)明表通過(guò)哪種方式讀40SparkCLI指使用命令界面直接輸入SQL命令,然后發(fā)送到Spark集群進(jìn)行執(zhí)行,在界面中顯示運(yùn)行過(guò)程和最終的結(jié)果。PART06SparkSQLCLI介紹SparkCLI指使用命令界面直接輸入SQL命令,然后發(fā)送41SparkSQLCLI介紹SparkCLI指的是使用命令界面直接輸入SQL命令,然后發(fā)送到Spark集群進(jìn)行執(zhí)行,在界面中顯示運(yùn)行過(guò)程和最終的結(jié)果。SparkSQL已經(jīng)集成在SparkShell中,因此,只要啟動(dòng)SparkShell,就可以使用SparkSQL的Shell交互接口:bin/spark-shell--masterspark://hdp-node-01:7077或者,可以啟動(dòng)SparkSQL界面,使用起來(lái)更方便:bin/spark-sql--masterspark://hdp-node-01:7077SparkSQLCLI介紹SparkCLI指的是使用命42SparkSQLCLI介紹SparkSQL所有功能的入口是SQLContext類(lèi)及其子類(lèi)。為了創(chuàng)建一個(gè)基本的SQLContext,需要一個(gè)SparkContext。scala>valsqlContext=neworg.apache.spark.sql.SQLContext(sc)sqlContext:org.apache.spark.sql.SQLContext=org.apache.spark.sql.SQLContext@1943a343scala>importsqlContext.implicits._importsqlContext.implicits._SQLContextSparkSQLCLI介紹SparkSQL所有功能的入43SparkSQLCLI介紹下面的操作基于一個(gè)簡(jiǎn)單的數(shù)據(jù)文件people.json,文件的內(nèi)容如下:{"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}數(shù)據(jù)文件下面語(yǔ)句從本地文件people.json讀取數(shù)據(jù)創(chuàng)建DataFrame:{valdf=sqlContext.read.json("file:///data/people.json")………df:org.apache.spark.sql.DataFrame=[age:bigint,name:string]創(chuàng)建DataFramesSparkSQLCLI介紹下面的操作基于一個(gè)簡(jiǎn)單的數(shù)據(jù)文44Pyspark是針對(duì)Spark的PythonAPI。Spark使用py4j來(lái)實(shí)現(xiàn)Python與Java的互操作,從而實(shí)現(xiàn)使用Python編寫(xiě)Spark程序。Spark也同樣提供了Pyspark,一個(gè)Spark的PythonShell,可以以交互的方式使用Python編寫(xiě)Spark程序。PART07在Pyspark中使用SparkSQLPyspark是針對(duì)Spark的PythonAPI。Spa45在Pyspark中使用SparkSQL在終端上啟動(dòng)Python
Spark
Shell:./bin/pyspark使用JSON文件作為數(shù)據(jù)源,創(chuàng)建JSON文件/home/sparksql/courses.json,并輸入下面的內(nèi)容:實(shí)例描述{"name":"Linux","type":"basic","length":10}{"name":"TCPIP","type":"project","length":15}{"name":"Python","type":"project","length":8}{"name":"GO","type":"basic","length":2}{"name":"Ruby","type":"basic","length":5}在Pyspark中使用SparkSQL在終端上啟動(dòng)Pyth46在Pyspark中使用SparkSQL首先使用SQLContext模塊,其作用是提供SparkSQL處理的功能。在PysparkShell中逐步輸入下面步驟的內(nèi)容:引入pyspark.sql中的SQLContext:frompyspark.sqlimportSQLContext創(chuàng)建SQLContext對(duì)象使用pyspark的SparkContext對(duì)象,創(chuàng)建SQLContext對(duì)象:sqlContext=SQLContext(sc)在Pyspark中使用SparkSQL首先使用SQLCon47在Pyspark中使用SparkSQLDataFrame對(duì)象可以由RDD創(chuàng)建,也可以從Hive表或JSON文件等數(shù)據(jù)源創(chuàng)建。創(chuàng)建DataFrame,指明來(lái)源自JSON文件:df=sqlContext.read.json("/home/shiyanlou/courses.json")創(chuàng)建DataFrame對(duì)象在Pyspark中使用SparkSQLDataFrame對(duì)48在Pyspark中使用SparkSQL首先打印當(dāng)前DataFrame里的內(nèi)容和數(shù)據(jù)表的格式:df.select("name").show()#展示了所有的課程名df.select("name","length").show()#展示了所有的課程名及課程長(zhǎng)度對(duì)DataFrame進(jìn)行操作show()函數(shù)將打印出JSON文件中存儲(chǔ)的數(shù)據(jù)表;使用printSchema()函數(shù)打印數(shù)據(jù)表的格式。然后對(duì)DataFrame的數(shù)據(jù)進(jìn)行各種操作:df.show()df.printSchema()在Pyspark中使用SparkSQL首先打印當(dāng)前Data49在Pyspark中使用SparkSQLdf.filter(df['type']=='basic').select('name','type').show()#展示了課程類(lèi)型為基礎(chǔ)課(basic)的課程名和課程類(lèi)型df.groupBy("type").count().show()#計(jì)算所有基礎(chǔ)課和項(xiàng)目課的數(shù)量。首先需要將DataFrame注冊(cè)為T(mén)able才可以在該表上執(zhí)行SQL語(yǔ)句:df.registerTempTable('courses')coursesRDD=sqlContext.sql("SELECTnameFROMcoursesWHERElength>=5andlength<=10")names=coursesRDD.rdd.map(lambdap:"Name:"+)fornameinnames.collect():printname執(zhí)行SQL語(yǔ)句在Pyspark中使用SparkSQLdf.filter(50在Pyspark中使用SparkSQLParquet是SparkSQL讀取的默認(rèn)數(shù)據(jù)文件格式,把從JSON中讀取的DataFrame保存為Parquet格式,只保存課程名稱(chēng)和長(zhǎng)度兩項(xiàng)數(shù)據(jù):df.select("name","length").write.save("/tmp/courses.parquet",format="parquet")保存DataFrame為其他格式將創(chuàng)建hdfs://master:9000/tmp/courses.parquet文件夾并存入課程名稱(chēng)和長(zhǎng)度數(shù)據(jù)。在Pyspark中使用SparkSQLParquet是Sp51SparkSQL實(shí)現(xiàn)了ThriftJDBC/ODBCserver,所以Java程序可以通過(guò)JDBC遠(yuǎn)程連接SparkSQL發(fā)送SQL語(yǔ)句并執(zhí)行。PART08在Java中連接SparkSQLSparkSQL實(shí)現(xiàn)了ThriftJDBC/ODBCs52在Java中連接SparkSQL首先將${HIVE_HOME}/conf/hive-site.xml拷貝到${SPARK_HOME}/conf目錄下。另外,因?yàn)镠ive元數(shù)據(jù)信息存儲(chǔ)在MySQL中,所以Spark在訪(fǎng)問(wèn)這些元數(shù)據(jù)信息時(shí)需要MySQL連接驅(qū)動(dòng)的支持。添加驅(qū)動(dòng)的方式有三種:在${SPARK_HOME}/conf目錄下的spark-defaults.conf中添加:spark.jars/opt/lib2/mysql-connector-java-5.1.26-bin.jar
;可以實(shí)現(xiàn)添加多個(gè)依賴(lài)jar比較方便:spark.driver.extraClassPath/opt/lib2/mysql-connector-java-5.1.26-bin.jar;設(shè)置配置在Java中連接SparkSQL首先將${HIVE_HOM53在運(yùn)行時(shí)添加--jars
/opt/lib2/mysql-connector-java-5.1.26-bin.jar做完上面的準(zhǔn)備工作后,SparkSQL和Hive就繼承在一起了,SparkSQL可以讀取Hive中的數(shù)據(jù)。設(shè)置配置啟動(dòng)Thrift在Spark根目錄下執(zhí)行:./sbin/start-thriftserver.sh開(kāi)啟thrift服務(wù)器,它可以接受所有spark-submit的參數(shù),并且還可以接受--hiveconf參數(shù)。不添加任何參數(shù)表示以local方式運(yùn)行,默認(rèn)的監(jiān)聽(tīng)端口為10000在Java中連接SparkSQL在運(yùn)行時(shí)添加--jars
/opt/lib2/mysql-54添加依賴(lài)打開(kāi)Eclipse用JDBC連接HiveServer2。新建一個(gè)Maven項(xiàng)目,在pom.xml添加以下依賴(lài):<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.4.1</version>
</dependency>
在Java中連接SparkSQL添加依賴(lài)打開(kāi)Eclipse用JDBC連接HiveServe55添加依賴(lài)<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.6</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
在Java中連接SparkSQL添加依賴(lài)<dependency>
在Java中連接Spar56JDBC連接HiveServer2的相關(guān)參數(shù):驅(qū)動(dòng):org.apache.hive.jdbc.HiveDriverurl:jdbc:hive2://31:10000/default用戶(hù)名:hadoop(啟動(dòng)thriftserver的linux用戶(hù)名)密碼:“”(默認(rèn)密碼為空)JDBC連接參數(shù)在Java中連接SparkSQLJDBC連接HiveServer2的相關(guān)參數(shù):JDBC連接57import
java.sql.Connection;
import
java.sql.DriverManager;
import
java.sql.ResultSet;
import
java.sql.SQLException;
import
java.sql.Statement;
public
class
Test1
{
public
static
void
main(String[]
args)
throws
SQLException
{
String
url
=
"jdbc:hive2://31:10000/default";
try
{
Class.forName("org.apache.hive.jdbc.HiveDriver");
}
catch
(ClassNotFoundException
e)
{
e.printStackTrace();
}
Connection
conn
=
DriverManager.getConnection(url,"hadoop","");
Statement
stmt
=
conn.createStatement();
String
sql
=
"SELECT
*
FROM
doc1
limit
10";
System.out.println("Running"+sql);
ResultSet
res
=
stmt.executeQuery(sql);
while(res.next()){
System.out.println("id:
"+res.getInt(1)+"\ttype:
"+res.getString(2)+"\tauthors:
"+res.getString(3)+"\ttitle:
"+res.getString(4)+"\tyear:"
+
res.getInt(5));
}
}
}
JDBC連接參數(shù)在Java中連接SparkSQLimport
java.sql.Connection;
J58PART09作業(yè)
PART09作業(yè)59作業(yè)作業(yè):什么是SparkSQL?其主要目的是什么?SparkSQL的執(zhí)行流程有哪幾個(gè)步驟?在SparkSQL中,什么是DataFrame?使用DataFrame的優(yōu)勢(shì)是什么?DataFrame與RDD的主要區(qū)別是什么?使用SparkSQL的方式有哪幾種?使用SparkSQL的步驟是什么?常用的SparkSQL的數(shù)據(jù)源有哪些?Parquet文件格式是什么?它的主要特點(diǎn)是什么?為了使Java程序可以通過(guò)JDBC遠(yuǎn)程連接SparkSQL,需要做哪些準(zhǔn)備工作?連接數(shù)據(jù)庫(kù)的語(yǔ)句是什么?有哪些參數(shù)?作業(yè)作業(yè):什么是SparkSQL?其主要目的是什么?60作業(yè)作業(yè):請(qǐng)按下述要求寫(xiě)出相應(yīng)的SparkSQL語(yǔ)句:從一個(gè)本地JSON文件創(chuàng)建DataFrame;打印DataFrame元數(shù)據(jù);按照列屬性過(guò)濾DataFrame的數(shù)據(jù);返回某列滿(mǎn)足條件的數(shù)據(jù);把DataFrame注冊(cè)成數(shù)據(jù)庫(kù)表。作業(yè)作業(yè):請(qǐng)按下述要求寫(xiě)出相應(yīng)的SparkSQL語(yǔ)句:61謝謝FORYOURLISTENINGHandgeCO.LTD.2016.12.09謝謝FORYOURLISTENINGHandgeCO.62
大數(shù)據(jù)導(dǎo)論第十二章大數(shù)據(jù)導(dǎo)論第十二章63CONTENTS目錄PART01SPARKSQL簡(jiǎn)介PART02SPARKSQL執(zhí)行流程PART03基礎(chǔ)數(shù)據(jù)模型DATAFRAMEPART04使用SparkSQL的方式PART05SPARKSQL數(shù)據(jù)源PART06SPARKSQLCLI介紹PART07在Pyspark中使用SparkSQLPART08在Java中連接SparkSQLPART09習(xí)題CONTENTS目錄PART01SPARKSQL簡(jiǎn)介64PART01SparkSQL簡(jiǎn)介SparkSQL是一個(gè)用來(lái)處理結(jié)構(gòu)化數(shù)據(jù)的Spark組件,為Spark提供了查詢(xún)結(jié)構(gòu)化數(shù)據(jù)的能力。SparkSQL可被視為一個(gè)分布式的SQL查詢(xún)引擎,可以實(shí)現(xiàn)對(duì)多種數(shù)據(jù)格式和數(shù)據(jù)源進(jìn)行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。PART01SparkSQL簡(jiǎn)介SparkSQL65SparkSQL簡(jiǎn)介SparkSQL介紹:SparkSQL是為了處理結(jié)構(gòu)化數(shù)據(jù)的一個(gè)Spark模塊。不同于SparkRDD的基本API,SparkSQL接口擁有更多關(guān)于數(shù)據(jù)結(jié)構(gòu)本身與執(zhí)行計(jì)劃等更多信息。在Spark內(nèi)部,SparkSQL可以利用這些信息更好地對(duì)操作進(jìn)行優(yōu)化。SparkSQL提供了三種訪(fǎng)問(wèn)接口:SQL,DataFrameAPI和DatasetAPI。當(dāng)計(jì)算引擎被用來(lái)執(zhí)行一個(gè)計(jì)算時(shí),有不同的API和語(yǔ)言種類(lèi)可供選擇。這種統(tǒng)一性意味著開(kāi)發(fā)人員可以來(lái)回輕松切換各種最熟悉的API來(lái)完成同一個(gè)計(jì)算工作。SparkSQL簡(jiǎn)介SparkSQL介紹:66SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)數(shù)據(jù)兼容方面:能加載和查詢(xún)來(lái)自各種來(lái)源的數(shù)據(jù)。性能優(yōu)化方面:除了采取內(nèi)存列存儲(chǔ)、代碼生成等優(yōu)化技術(shù)外,還引進(jìn)成本模型對(duì)查詢(xún)進(jìn)行動(dòng)態(tài)評(píng)估、獲取最佳物理計(jì)劃等;組件擴(kuò)展方面:無(wú)論是SQL的語(yǔ)法解析器、分析器還是優(yōu)化器都可以重新定義,進(jìn)行擴(kuò)展。標(biāo)準(zhǔn)連接:SparkSQL包括具有行業(yè)標(biāo)準(zhǔn)JDBC和ODBC連接的服務(wù)器模式。SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)67SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)集成:無(wú)縫地將SQL查詢(xún)與Spark程序混合。SparkSQL允許將結(jié)構(gòu)化數(shù)據(jù)作為Spark中的分布式數(shù)據(jù)集(RDD)進(jìn)行查詢(xún),在Python,Scala和Java中集成了API。這種緊密的集成使得SQL查詢(xún)以及復(fù)雜的分析算法可以輕松地運(yùn)行??蓴U(kuò)展性:對(duì)于交互式查詢(xún)和長(zhǎng)查詢(xún)使用相同的引擎。SparkSQL利用RDD模型來(lái)支持查詢(xún)?nèi)蒎e(cuò),使其能夠擴(kuò)展到大型作業(yè),不需擔(dān)心為歷史數(shù)據(jù)使用不同的引擎。SparkSQL簡(jiǎn)介SparkSQL具有如下特點(diǎn)68PART02SparkSQL執(zhí)行流程PART02SparkSQL執(zhí)行流程69SparkSQL執(zhí)行流程類(lèi)似于關(guān)系型數(shù)據(jù)庫(kù),SparkSQL語(yǔ)句也是由Projection(a1,a2,a3)、DataSource(tableA)、Filter(condition)三部分組成,分別對(duì)應(yīng)SQL查詢(xún)過(guò)程中的Result、DataSource、Operation,也就是說(shuō)SQL語(yǔ)句按Result-->DataSource-->Operation的次序來(lái)描述的。SparkSQL執(zhí)行流程類(lèi)似于關(guān)系型數(shù)據(jù)庫(kù),SparkS70SparkSQL執(zhí)行流程解析(Parse)對(duì)讀入的SQL語(yǔ)句進(jìn)行解析,分辨出SQL語(yǔ)句中哪些詞是關(guān)鍵詞(如SELECT、FROM、WHERE),哪些是表達(dá)式、哪些是Projection、哪些是DataSource等,從而判斷SQL語(yǔ)句是否規(guī)范;綁定(Bind)將SQL語(yǔ)句和數(shù)據(jù)庫(kù)的數(shù)據(jù)字典(列、表、視圖等)進(jìn)行綁定,如果相關(guān)的Projection、DataSource等都存在,則這個(gè)SQL語(yǔ)句是可以執(zhí)行的;SparkSQL執(zhí)行流程解析(Parse)71SparkSQL執(zhí)行流程優(yōu)化(Optimize)一般的數(shù)據(jù)庫(kù)會(huì)提供幾個(gè)執(zhí)行計(jì)劃,這些計(jì)劃一般都有運(yùn)行統(tǒng)計(jì)數(shù)據(jù),數(shù)據(jù)庫(kù)會(huì)在這些計(jì)劃中選擇一個(gè)最優(yōu)計(jì)劃;執(zhí)行(Execute)按Operation-->DataSource-->Result的次序來(lái)執(zhí)行計(jì)劃。在執(zhí)行過(guò)程有時(shí)候甚至不需要讀取物理表就可以返回結(jié)果,比如重新運(yùn)行剛運(yùn)行過(guò)的SQL語(yǔ)句,可能直接從數(shù)據(jù)庫(kù)的緩沖池中獲取返回結(jié)果。SparkSQL執(zhí)行流程優(yōu)化(Optimize)72PART03基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是由“命名列”(類(lèi)似關(guān)系表的字段定義)所組織起來(lái)的一個(gè)分布式數(shù)據(jù)集合,可以把它看成是一個(gè)關(guān)系型數(shù)據(jù)庫(kù)的表。PART03基礎(chǔ)數(shù)據(jù)模型DataFrameDataFra73基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是SparkSQL的核心,它將數(shù)據(jù)保存為行構(gòu)成的集合,行對(duì)應(yīng)列有相應(yīng)的列名。DataFrame與RDD的主要區(qū)別在于,DataFrame帶有Schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱(chēng)和類(lèi)型。這使得SparkSQL可以掌握更多的結(jié)構(gòu)信息,從而能夠?qū)ataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)。基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame是Spark74基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame與RDD的對(duì)比:基礎(chǔ)數(shù)據(jù)模型DataFrameDataFrame與RDD的對(duì)75PART04使用SparkSQL的方式PART04使用SparkSQL的方式76使用SparkSQL的方式使用SparkSQL,首先利用sqlContext從外部數(shù)據(jù)源加載數(shù)據(jù)為DataFrame;然后,利用DataFrame上豐富的API進(jìn)行查詢(xún)、轉(zhuǎn)換;最后,將結(jié)果進(jìn)行展現(xiàn)或存儲(chǔ)為各種外部數(shù)據(jù)形式。Spark
SQL
為Spark提供了查詢(xún)結(jié)構(gòu)化數(shù)據(jù)的能力,查詢(xún)時(shí)既可以使用SQL也可以使用DataFrame
API(RDD)。通過(guò)ThriftServer,Spark
SQL支持多語(yǔ)言編程包括Java、Scala、Python及R。使用SparkSQL的方式使用SparkSQL,首先利用77使用SparkSQL的方式使用SparkSQL的方式78使用SparkSQL的方式加載數(shù)據(jù)①.從Hive中的users表構(gòu)造DataFrame:users=sqlContext.table("users")②.加載S3上的JSON文件:logs=sqlContext.load("s3n://path/to/data.json","json")③.加載HDFS上的Parquet文件:clicks=sqlContext.load("hdfs://path/to/data.parquet","parquet")使用SparkSQL的方式加載數(shù)據(jù)①.從Hive中的us79使用SparkSQL的方式加載數(shù)據(jù)④.通過(guò)JDBC訪(fǎng)問(wèn)MySQL:comments=sqlContext.jdbc("jdbc:mysql://localhost/comments","user")⑤.將普通RDD轉(zhuǎn)變?yōu)镈ataFrame:rdd=sparkContext.textFile(“article.txt”).flatMap(_.split("")).map((_,1)).reduceByKey(_+_)wordCounts=sqlContext.createDataFrame(rdd,["word","count"])使用SparkSQL的方式加載數(shù)據(jù)④.通過(guò)JDBC訪(fǎng)問(wèn)M80使用SparkSQL的方式加載數(shù)據(jù)⑥.將本地?cái)?shù)據(jù)容器轉(zhuǎn)變?yōu)镈ataFrame:data=[("Alice",21),("Bob",24)]people=sqlContext.createDataFrame(data,["name","age"])⑦.將Pandas
DataFrame轉(zhuǎn)變?yōu)镾park
DataFrame(Python
API特有功能):sparkDF
=
sqlContext.createDataFrame(pandasDF)使用SparkSQL的方式加載數(shù)據(jù)⑥.將本地?cái)?shù)據(jù)容器轉(zhuǎn)變81使用SparkSQL的方式使用DataFrame①.創(chuàng)建一個(gè)只包含"年輕"用戶(hù)的DataFrame:young=users.filter(users.age<21)②.也可以使用Pandas風(fēng)格的語(yǔ)法:young=users[users.age<21]③.將所有人的年齡加1:young.select(,young.age+1)使用SparkSQL的方式使用DataFrame①.創(chuàng)建82使用SparkSQL的方式使用DataFrame④.統(tǒng)計(jì)年輕用戶(hù)中各性別人數(shù):young.groupBy("gender").count()⑤.將所有年輕用戶(hù)與另一個(gè)名為logs的DataFrame聯(lián)接起來(lái):young.join(logs,logs.userId==users.userId,"left_outer")使用SparkSQL的方式使用DataFrame④.統(tǒng)計(jì)83使用SparkSQL的方式保存結(jié)果①.追加至HDFS上的Parquet文件:young.save(path="hdfs://path/to/data.parquet",source="parquet",mode="append")②.覆寫(xiě)S3上的JSON文件:young.save(path="s3n://path/to/data.json",source="json",mode="append")使用SparkSQL的方式保存結(jié)果①.追加至HDFS上的84使用SparkSQL的方式保存結(jié)果③.保存為SQL表:young.saveAsTable(tableName="young",source="parquet"mode="overwrite")④.轉(zhuǎn)換為PandasDataFrame(PythonAPI特有功能):pandasDF=young.toPandas()⑤.以表格形式打印輸出:young.show()使用SparkSQL的方式保存結(jié)果③.保存為SQL表:85SparkSQL通過(guò)DataFrame接口使用多種數(shù)據(jù)源。應(yīng)用程序可以直接使用關(guān)系型轉(zhuǎn)換對(duì)DataFrame進(jìn)行操作,也可以用來(lái)創(chuàng)建臨時(shí)表。把DataFrame注冊(cè)為臨時(shí)表后就可以使用SQL對(duì)其數(shù)據(jù)進(jìn)行查詢(xún)PART05SparkSQL數(shù)據(jù)源SparkSQL通過(guò)DataFrame接口使用多種數(shù)據(jù)源。86通用Load方法SparkSQL使用數(shù)據(jù)源的最簡(jiǎn)單模式就是對(duì)缺省數(shù)據(jù)源類(lèi)型來(lái)進(jìn)行各種操作。缺省數(shù)據(jù)源類(lèi)型是parquet文件df=spark.read.load("examples/src/main/resources/users.parquet")df.select("name","favorite_color").write.save("namesAndFavColors.parquet")也可以通過(guò)load()函數(shù)的參數(shù)來(lái)說(shuō)明數(shù)據(jù)源的類(lèi)型:df=spark.read.load("examples/src/main/resources/people.json",format="json")df.select("name","age").write.save("namesAndAges.parquet",format="parquet")通用Load方法SparkSQL使用數(shù)據(jù)源的最簡(jiǎn)單模式就是87通用Load方法也可以不把文件上載到DataFrame,然后再對(duì)其進(jìn)行查詢(xún)操作,而是直接對(duì)文件進(jìn)行SQL查詢(xún):df=spark.sql("SELECT*FROMparquet.examples/src/main/resources/users.parquet")通用Load方法也可以不把文件上載到DataFrame,然后88通用Save方法Save方法Save()操作的可選參數(shù)SaveMode用來(lái)說(shuō)明如何處理已有數(shù)據(jù)。需要提醒的是這些保存操作不是原子操作,如果選擇的是Overwrite模式,Save()操作會(huì)首先刪除已有的數(shù)據(jù),然后才寫(xiě)入新的數(shù)據(jù)。SaveMode的可選值:Scala/Java任何語(yǔ)言解釋SaveMode.ErrorIfExists"error"(缺省值)如果有數(shù)據(jù)存在,就拋異常。SaveMode.Append"append"如果有數(shù)據(jù)/表存在,就添加新數(shù)據(jù)到老數(shù)據(jù)后面。SaveMode.Overwrite"overwrite"如果有數(shù)據(jù)/表存在,就用新數(shù)據(jù)覆蓋老數(shù)據(jù)。SaveMode.Ignore"ignore"如果有數(shù)據(jù)存在,就不寫(xiě)入新數(shù)據(jù)。通用Save方法Save方法Save()操作的可選參數(shù)Sav89通用Save方法基于文件的數(shù)據(jù)源,可以用path可選參數(shù)定義一個(gè)數(shù)據(jù)庫(kù)表的路徑,比如:df.write.option(“path”,“/some/path”).saveAsTable(“t”)即使數(shù)據(jù)庫(kù)表刪除后,該數(shù)據(jù)庫(kù)表路徑和數(shù)據(jù)也不會(huì)被刪除。如果程序不指定數(shù)據(jù)庫(kù)表路徑,Spark會(huì)把數(shù)據(jù)寫(xiě)到一個(gè)缺省的數(shù)據(jù)庫(kù)表路徑下。但是在這種情況下,當(dāng)數(shù)據(jù)庫(kù)被刪除的時(shí)候,缺省數(shù)據(jù)庫(kù)表的路徑將會(huì)被刪除。通用Save方法基于文件的數(shù)據(jù)源,可以用path可選參數(shù)定義90通用Save方法對(duì)基于文件的數(shù)據(jù)源還可以對(duì)其輸出進(jìn)行分桶、排序和分割,分桶和排序只能用于當(dāng)輸出方式為持久數(shù)據(jù)庫(kù)表的時(shí)候。df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")當(dāng)進(jìn)行分割時(shí),可以使用DatasetAPI接口的save()和saveAsTable()方法:df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")通用Save方法對(duì)基于文件的數(shù)據(jù)源還可以對(duì)其輸出進(jìn)行分桶、排91通用Save方法還可以對(duì)同一張表同時(shí)進(jìn)行分割和分桶:df=spark.read.parquet("examples/src/main/resources/users.parquet")(df.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed"))通用Save方法還可以對(duì)同一張表同時(shí)進(jìn)行分割和分桶:df=92Parquet文件數(shù)據(jù)源Parquet文件數(shù)據(jù)源SparkSQL支持對(duì)Parquet文件的讀寫(xiě)操作,并自動(dòng)保持源數(shù)據(jù)的模式。為了兼容性,在寫(xiě)Parquet文件的時(shí)候,所有的列會(huì)自動(dòng)轉(zhuǎn)換為可為空的列。把DataFrame保存為Parquet文件,格式信息會(huì)自動(dòng)保留。peopleDF=spark.read.json("examples/src/main/resources/people.json")df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")Parquet文件數(shù)據(jù)源Parquet文件數(shù)據(jù)源Spark93讀取上面存儲(chǔ)的Parquet文件為DataFrameParquet文件也可以用來(lái)創(chuàng)建臨時(shí)表供SQL查詢(xún)使用parquetFile=spark.read.parquet("people.parquet")parquetFile.createOrReplaceTempView("parquetFile")teenagers=spark.sql("SELECTnameFROMparquetFileWHEREage>=13ANDage<=19")teenagers.show()Parquet文件數(shù)據(jù)源讀取上面存儲(chǔ)的Parquet文件為DataFrameParq94JSONDataSets數(shù)據(jù)源JSONDataSets數(shù)據(jù)源SparkSQL可以自動(dòng)根據(jù)JSONDataSet的格式把其上載為DataFrame。用路徑指定JSONdataset;路徑下可以是一個(gè)文件,也可以是多個(gè)文件:sc=spark.sparkContextpath="examples/src/main/resources/people.json"peopleDF=spark.read.json(path)使用的結(jié)構(gòu)可以調(diào)用printSchema()方法打印:peopleDF.printSchema()JSONDataSets數(shù)據(jù)源JSONDataSets95利用DataFrame創(chuàng)建一個(gè)臨時(shí)表:使用Spark的sql方法進(jìn)行SQL查詢(xún):peopleDF.createOrReplaceTempView("people")teenagerNamesDF=spark.sql("SELECTnameFROMpeopleWHEREageBETWEEN13AND19")teenagerNamesDF.show()JSONDataSets數(shù)據(jù)源利用DataFrame創(chuàng)建一個(gè)臨時(shí)表:使用Spark的sql96JSONdataset的DataFrame也可以是RDD[String]格式,每個(gè)JSON對(duì)象為一個(gè)string:jsonStrings=['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']otherPeopleRDD=sc.parallelize(jsonStrings)otherPeople=spark.read.json(otherPeopleRDD)otherPeople.show()JSONDataSets數(shù)據(jù)源JSONdataset的DataFrame也可以是RDD[97Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源SparkSQL支持對(duì)Hive中的數(shù)據(jù)進(jìn)行讀寫(xiě)。首先創(chuàng)建一個(gè)支持Hive的SparkSession對(duì)象,包括與Hivemetastore的連接,支持Hive的序列化和反序列化操作,支持用戶(hù)定義的Hive操作等。warehouse_location=abspath('spark-warehouse')spark=SparkSession.builder\.appName("PythonSparkSQLHiveintegrationexample")\.config("spark.sql.warehouse.dir",warehouse_location)\.enableHiveSupport().getOrCreate()warehouse_location指定數(shù)據(jù)庫(kù)和表的缺省位置:Hive表數(shù)據(jù)源Hive表數(shù)據(jù)源SparkSQL支持對(duì)Hi98Hive表數(shù)據(jù)源spark.sql("CREATETABLEIFNOTEXISTSsrc(keyINT,valueSTRING)USINGhive")spark.sql("LOADDATALOCALINPATH'examples/src/main/resources/kv1.txt'INTOTABLEsrc")基于新創(chuàng)建的SparkSession創(chuàng)建表和上載數(shù)據(jù)到表中:spark.sql("SELECT*FROMsrc").show()spark.sql("SELECTCOUNT(*)FROMsrc").show()使用HiveQL進(jìn)行查詢(xún):Hive表數(shù)據(jù)源spark.sql("CREATETABL99Hive表數(shù)據(jù)源sqlDF=spark.sql("SELECTkey,valueFROMsrcWHEREkey<10ORDERBYkey")SQL查詢(xún)的結(jié)果也是DataFrames,可以對(duì)結(jié)果進(jìn)行所有DataFrame的操作:stringsDS=sqlDF.rdd.map(lambdaro
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶(hù)所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶(hù)上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶(hù)上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶(hù)因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年P(guān)A12項(xiàng)目提案報(bào)告模范
- 2025年光伏電站建設(shè)與運(yùn)營(yíng)管理合同
- 2025年微博平臺(tái)廣告投放合作合同
- 2025年會(huì)議場(chǎng)地使用租約協(xié)議參考
- 2025年獸藥購(gòu)銷(xiāo)合同樣本
- 2025年企業(yè)借款擔(dān)保合同標(biāo)準(zhǔn)文本
- 2025年二手住宅居間合同樣本
- 2025年醫(yī)療美容公司股權(quán)融資協(xié)議
- 2025年企業(yè)文化建設(shè)合同樣本
- 2025年鄉(xiāng)村道路路基工程承包合同樣本
- 腰椎間盤(pán)突出癥課件(共100張課件)
- 學(xué)校食堂菜譜及定價(jià)方案
- 2024-2030年中國(guó)非物質(zhì)文化遺產(chǎn)行業(yè)市場(chǎng)深度分析及競(jìng)爭(zhēng)格局與投資策略研究報(bào)告
- 情感表達(dá) 課件 2024-2025學(xué)年人教版(2024)初中美術(shù)七年級(jí)上冊(cè)
- DB50T 662-2015 公交首末站規(guī)劃設(shè)計(jì)規(guī)范
- 2024年上半年教師資格證《初中道德與法治》真題及答案
- 區(qū)塊鏈應(yīng)用操作員技能大賽考試題庫(kù)大全-上(單選題)
- 2024屆中國(guó)航空發(fā)動(dòng)機(jī)集團(tuán)限公司校園招聘高頻考題難、易錯(cuò)點(diǎn)模擬試題(共500題)附帶答案詳解
- 人教版小學(xué)數(shù)學(xué)“數(shù)與代數(shù)”的梳理
- 2024至2030年中國(guó)女裝行業(yè)市場(chǎng)發(fā)展監(jiān)測(cè)及投資前景展望報(bào)告
- 海洋工程裝備制造經(jīng)濟(jì)效益和社會(huì)效益分析報(bào)告
評(píng)論
0/150
提交評(píng)論