Big Data Technology: Spark Fundamentals

Chapter 1 Spark Overview

What is Spark
1. Definition
Spark is a fast, general-purpose, scalable big data analytics engine based on in-memory computing.
2. History
Spark was born in 2009 at UC Berkeley's AMPLab, and the project is written in Scala. It was open-sourced in 2010, became an Apache incubator project in June 2013, and became an Apache top-level project in February 2014.

Spark Built-in Modules
(Module diagram: Spark SQL for structured data, Spark Streaming for real-time computation, Spark MLlib for machine learning and Spark GraphX for graph computation all sit on top of Spark Core, which runs on the standalone scheduler, YARN or Mesos.)
Spark Core: implements Spark's basic functionality and contains the API definition of the Resilient Distributed Dataset (RDD).
Spark SQL: the package Spark uses to work with structured data. Through Spark SQL we can query data with SQL or the Apache Hive dialect of SQL (HQL). Spark SQL supports many data sources, such as Hive tables, Parquet and JSON.
Spark Streaming: the Spark component for stream processing of real-time data. It provides an API for manipulating data streams that corresponds closely to the RDD API in Spark Core.
Spark MLlib: a library of common machine learning (ML) functionality, including classification, regression, clustering and collaborative filtering, along with extra support such as model evaluation and data import.
Cluster managers: Spark is designed to scale computation efficiently from one node to thousands of nodes. To achieve this while keeping maximum flexibility, Spark can run on a variety of cluster managers (Cluster Manager), including Hadoop YARN, Apache Mesos, and a simple scheduler that ships with Spark, called the standalone scheduler.
Spark is backed by many big-data vendors, including Hortonworks, IBM, Intel, Cloudera, MapR and Pivotal, and is deployed at large scale in industry, for example GraphX-based graph computing and Spark clusters of up to 8,000 nodes.

Spark Features
Fast: compared with Hadoop MapReduce, Spark's in-memory computation is more than 100 times faster and its disk-based computation more than 10 times faster. Spark implements an efficient DAG execution engine that processes data flows efficiently in memory; the intermediate results of a computation are kept in memory.
Easy to use: Spark offers APIs for Java, Python and Scala, plus more than 80 high-level operators, so users can build different applications quickly. Spark also provides interactive Python and Scala shells, which make it very convenient to try out solutions against a Spark cluster.
General-purpose: Spark provides a unified stack. It can be used for batch processing, interactive queries (Spark SQL), real-time stream processing (Spark Streaming), machine learning (Spark MLlib) and graph computation (GraphX), and these different kinds of processing can be combined seamlessly in one application, cutting both the development and maintenance effort and the cost of the deployment platform.
Compatible: Spark integrates easily with other open-source products. For example, it can use Hadoop YARN and Apache Mesos as its resource manager and scheduler, and it can process all data that Hadoop supports, including HDFS and HBase. This matters especially for users who already run a Hadoop cluster, because they can use Spark's processing power without migrating any data.

Chapter 2 Spark Running Modes

Spark installation addresses
Documentation: https://spark.apache.org/docs/2.1.1/
Downloads: https://spark.apache.org/downloads.html

Local Mode

Overview
Local mode runs on a single machine and is normally used for practice and testing on your own computer. The Master can be set in the following ways:
local: all computation runs in a single thread with no parallelism at all; typically used to run small test or practice code on the local machine.
local[K]: use K threads for the computation; for example, local[4] runs 4 worker threads. Usually K is set to the number of CPU cores to make full use of the CPU.
local[*]: sets the number of threads to the number of CPU cores automatically.

Installation and use
1) Upload and extract the Spark package
[atguigu@hadoop102 software]$ tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
[atguigu@hadoop102 module]$ mv spark-2.1.1-bin-hadoop2.7 spark
2) Official Pi example
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
3) Basic submission syntax
bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
Parameters:
--master: the Master address to connect to; defaults to local.
--class: the entry class of your application (for example org.apache.spark.examples.SparkPi).
--deploy-mode: whether to deploy your driver on a worker node (cluster) or locally as an external client (client). Default: client.
--conf: an arbitrary Spark configuration property in key=value format; if the value contains spaces, quote it as "key=value".
application-jar: the packaged application jar, including dependencies. The URL must be globally visible inside the cluster, for example an hdfs:// shared-storage path; with file://path, every node must have the same jar at that path.
application-arguments: arguments passed to the main() method.
--executor-memory 1G: each executor gets 1G of usable memory.
--total-executor-cores 2: the executors use 2 CPU cores in total.
This example estimates Pi with the Monte Carlo method.
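For reference, here is a minimal sketch of how such a Monte Carlo Pi job can be written against the RDD API. It is an illustrative approximation of what the bundled SparkPi example does rather than its exact source: random points are thrown into the unit square, and Pi is estimated from the fraction that lands inside the unit circle.

import scala.math.random
import org.apache.spark.{SparkConf, SparkContext}

object MonteCarloPi {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MonteCarloPi")
    val sc = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2  // plays the role of the trailing "100" argument above
    val n = 100000 * slices                                 // total number of random points
    // Count how many random points fall inside the unit circle.
    val count = sc.parallelize(1 to n, slices).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / n}")
    sc.stop()
  }
}

Submitted with bin/spark-submit as shown above, a larger argument means more slices and therefore more random points, so the estimate converges toward Pi.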
Official WordCount example
1) Create an input folder
[atguigu@hadoop102 spark]$ mkdir input
2) Under input, create two files, 1.txt and 2.txt, and put the following content in each:
hello atguigu
hello spark
3) Start spark-shell
[atguigu@hadoop102 spark]$ bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/09/29 08:50:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/09/29 08:50:58 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1538182253312).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
Open another terminal window and check the process:
[atguigu@hadoop102 spark]$ jps
3627 SparkSubmit
4047 Jps
You can open hadoop102:4040 to watch the job run.
4) Run the WordCount program
scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((hadoop,6), (oozie,3), (spark,3), (hive,3), (atguigu,3), (hbase,6))
scala>
You can open hadoop102:4040 to check how the job ran.

Submission flow
1) Task submission analysis:
(Figure: simplified general Spark execution flow. The Client submits the task; the Driver initializes sc and registers the application with the resource manager; the resource manager starts Executors; the Executors reverse-register with the Driver and execute the tasks, such as textFile(), flatMap(), map(), ...)

Driver
The Spark driver is the process that runs the main method of your program. It executes the user code that creates the SparkContext, creates RDDs, and performs RDD transformations and actions. If you use the Spark shell, a driver program is started for you when the shell launches: it is the SparkContext object preloaded in the shell under the name sc. If the driver terminates, the Spark application ends. The driver is mainly responsible for:
1) turning the user program into jobs;
2) tracking the status of the Executors;
3) scheduling tasks onto the executor nodes;
4) showing the application's running status in the UI.

Executor
A Spark Executor is a worker process responsible for running tasks in a Spark job; the tasks are independent of one another. Executors are started when the Spark application starts and exist for the whole lifetime of the application. If an Executor node fails or crashes, the Spark application keeps running: the tasks on the failed node are rescheduled onto other Executor nodes. Executors are mainly responsible for:
1) running the tasks that make up the Spark application and returning the results to the driver process;
2) providing in-memory storage, through their Block Manager, for the RDDs the user program asks to cache; RDDs are cached directly inside the Executor processes.

Data flow
textFile("input"): reads the data in the local input folder;
flatMap(_.split(" ")): flattening step; splits each line on spaces and maps it to individual words;
map((_,1)): operates on every element, mapping each word to a (word, 1) tuple;
reduceByKey(_+_): aggregates the values by key, adding them up;
collect: brings the results back to the driver for display.

WordCount case analysis
(Figure: sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect. 1.txt and 2.txt, each containing "hello atguigu" and "hello spark", are read by textFile(), split into individual words by flatMap(), mapped to pairs such as (hello,1), (atguigu,1), (spark,1) by map(), aggregated by reduceByKey() into (hello,4), (atguigu,2), (spark,2), and returned to the driver by collect().)
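To make the data flow above concrete, here is a minimal spark-shell sketch that materializes each intermediate step of the chain with collect. It assumes the same local input folder created earlier; the sample outputs in the comments assume the two files from step 2, and the exact ordering will vary from run to run.

scala> val lines = sc.textFile("input")
scala> lines.collect                   // one String per line, e.g. Array(hello atguigu, hello spark, ...)
scala> val words = lines.flatMap(_.split(" "))
scala> words.collect                   // flattened words, e.g. Array(hello, atguigu, hello, spark, ...)
scala> val pairs = words.map((_, 1))
scala> pairs.collect                   // e.g. Array((hello,1), (atguigu,1), (hello,1), (spark,1), ...)
scala> val counts = pairs.reduceByKey(_ + _)
scala> counts.collect                  // the action that triggers the computation, e.g. Array((hello,4), (atguigu,2), (spark,2))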
Standalone Mode

Overview
Build a Spark cluster consisting of a Master plus Slaves, and run Spark on that cluster.
(Figure: Standalone running mode. 1. the Client registers the task with the Master through the SparkContext; 2. the Master requests resources and starts ExecutorBackend processes on the Workers; 3. the Executors report their status to the Master; 4. the Executors register with the SparkContext; 5. the SparkContext assigns Tasks to the Executors; 6. the Executors report Task status until the job finishes; 7. the application is unregistered.)

Installation and use
1) Enter the conf folder of the Spark installation directory
[atguigu@hadoop102 module]$ cd spark/conf/
2) Rename the configuration file templates
[atguigu@hadoop102 conf]$ mv slaves.template slaves
[atguigu@hadoop102 conf]$ mv spark-env.sh.template spark-env.sh
3) Edit the slaves file and add the worker nodes:
[atguigu@hadoop102 conf]$ vim slaves
hadoop102
hadoop103
hadoop104
4) Edit the spark-env.sh file and add the following configuration:
[atguigu@hadoop102 conf]$ vim spark-env.sh
SPARK_MASTER_HOST=hadoop102
SPARK_MASTER_PORT=7077
5) Distribute the spark package
[atguigu@hadoop102 module]$ xsync spark/
6) Start the cluster
[atguigu@hadoop102 spark]$ sbin/start-all.sh
[atguigu@hadoop102 spark]$ util.sh
================atguigu@hadoop102================
3330 Jps
3238 Worker
3163 Master
================atguigu@hadoop103================
2966 Jps
2908 Worker
================atguigu@hadoop104================
2978 Worker
3036 Jps
Web UI: hadoop102:8080
Note: if you hit a "JAVA_HOME not set" exception, add the following line to the spark-config.sh file in the sbin directory:
export JAVA_HOME=XXXX
7) Official Pi example on the cluster
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
8) Start spark-shell against the cluster
/opt/module/spark/bin/spark-shell \
--master spark://hadoop102:7077 \
--executor-memory 1g \
--total-executor-cores 2
Parameter: --master spark://hadoop102:7077 specifies the master of the cluster to connect to.
Run the WordCount program:
scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((hadoop,6), (oozie,3), (spark,3), (hive,3), (atguigu,3), (hbase,6))
scala>
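The same cluster and resource settings passed to spark-submit above can also be set from application code through SparkConf. A minimal sketch follows; spark.executor.memory and spark.cores.max are standard Spark property names rather than anything taken from this document, and in practice it is usually preferable to leave them on the spark-submit command line so the jar stays portable.

import org.apache.spark.{SparkConf, SparkContext}

object StandaloneWordCount {
  def main(args: Array[String]): Unit = {
    // Equivalent, in code, to the --master / --executor-memory / --total-executor-cores flags above.
    val conf = new SparkConf()
      .setAppName("WC")
      .setMaster("spark://hadoop102:7077")   // standalone master URL
      .set("spark.executor.memory", "1g")    // memory per executor
      .set("spark.cores.max", "2")           // total cores used by this application
    val sc = new SparkContext(conf)
    val counts = sc.textFile("input").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}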
JobHistoryServer Configuration
1) Rename spark-defaults.conf.template
[atguigu@hadoop102 conf]$ mv spark-defaults.conf.template spark-defaults.conf
2) Edit spark-defaults.conf to enable event logging:
[atguigu@hadoop102 conf]$ vi spark-defaults.conf
spark.eventLog.enabled    true
spark.eventLog.dir        hdfs://hadoop102:9000/directory
Note: the directory on HDFS must exist in advance.
3) Edit spark-env.sh and add the following configuration:
[atguigu@hadoop102 conf]$ vi spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://hadoop102:9000/directory"
Parameter description:
spark.eventLog.dir: all information produced while an Application runs is recorded under the path specified by this property.
spark.history.ui.port=18080: the WEB UI is accessed on port 18080.
spark.history.fs.logDirectory=hdfs://hadoop102:9000/directory: once this property is configured, there is no need to specify the path explicitly when running start-history-server.sh; the Spark History Server page only shows the information under this path.
spark.history.retainedApplications=30: the number of Application history records to retain; when the number is exceeded, the oldest application information is deleted. This is the number of applications kept in memory, not the number shown on the page.
4) Distribute the configuration files
[atguigu@hadoop102 conf]$ xsync spark-defaults.conf
[atguigu@hadoop102 conf]$ xsync spark-env.sh
5) Start the history service
[atguigu@hadoop102 spark]$ sbin/start-history-server.sh
6) Run the task again
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
7) View the history service at hadoop102:18080
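The same two event-log properties can also be supplied per application instead of globally in spark-defaults.conf. A small sketch, assuming the HDFS directory from step 2 already exists; the keys are the same spark.eventLog.* properties described above.

import org.apache.spark.{SparkConf, SparkContext}

object EventLogExample {
  def main(args: Array[String]): Unit = {
    // Same properties as in spark-defaults.conf, but set for this application only.
    val conf = new SparkConf()
      .setAppName("EventLogExample")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "hdfs://hadoop102:9000/directory")  // must already exist on HDFS
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).count()  // any small job, just so an entry appears in the history server
    sc.stop()
  }
}

Passing --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://hadoop102:9000/directory to spark-submit has the same effect.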

HA Configuration
1) Install and start Zookeeper.
(Figure 1: HA architecture diagram)
2) Edit the spark-env.sh file:
[atguigu@hadoop102 conf]$ vi spark-env.sh
Comment out the following lines:
#SPARK_MASTER_HOST=hadoop102
#SPARK_MASTER_PORT=7077
Add the following:
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104
-Dspark.deploy.zookeeper.dir=/spark"
3) Distribute the configuration file
[atguigu@hadoop102 conf]$ xsync spark-env.sh
4) Start all nodes on hadoop102
[atguigu@hadoop102 spark]$ sbin/start-all.sh
5) Start the master separately on hadoop103
[atguigu@hadoop103 spark]$ sbin/start-master.sh
6) Spark HA cluster access
/opt/module/spark/bin/spark-shell \
--master spark://hadoop102:7077,hadoop103:7077 \
--executor-memory 2g \
--total-executor-cores 2

Yarn Mode

Overview
The Spark client connects to Yarn directly; no extra Spark cluster has to be built. There are two modes, yarn-client and yarn-cluster, which differ mainly in where the Driver program runs.
yarn-client: the Driver runs on the client; suited to interactive work and debugging, where you want to see the application's output immediately.
yarn-cluster: the Driver runs inside the ApplicationMaster started by the ResourceManager; suited to production environments.
(Figure: Yarn running mode. 1. the Client submits the application (App Submit) to the ResourceManager; 2. the RM selects a NodeManager and starts the ApplicationMaster, and the AM starts the Driver, that is, initializes sc; 3. the AM starts Executors on the NodeManagers and assigns Tasks; 4. the application status is reported until the job finishes; 5. the application is unregistered.)

Installation and use
1) Edit hadoop's yarn-site.xml and add the following configuration:
[atguigu@hadoop102 hadoop]$ vi yarn-site.xml
<!-- Whether to check the physical memory used by each task; if a task exceeds its allocation it is killed. Default is true; set it to false. -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<!-- Whether to check the virtual memory used by each task; if a task exceeds its allocation it is killed. Default is true; set it to false. -->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
2) Edit spark-env.sh and add the following configuration:
[atguigu@hadoop102 conf]$ vi spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop
3) Distribute the configuration files
[atguigu@hadoop102 conf]$ xsync /opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml
[atguigu@hadoop102 conf]$ xsync spark-env.sh
4) Run a program
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
Note: HDFS and the YARN cluster must be started before the task is submitted.

Log viewing
1) Edit spark-defaults.conf and add the following:
spark.yarn.historyServer.address=hadoop102:18080
spark.history.ui.port=18080
2) Restart the spark history service
[atguigu@hadoop102 spark]$ sbin/stop-history-server.sh
stopping org.apache.spark.deploy.history.HistoryServer
[atguigu@hadoop102 spark]$ sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark/logs/spark-atguigu-org.apache.spark.deploy.history.HistoryServer-1-hadoop102.out
3) Submit the task to Yarn and run it
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

Mesos Mode
The Spark client connects to Mesos directly, without building an extra Spark cluster. This mode sees relatively little use domestically; Yarn is used for scheduling far more often.

Comparison of the modes
Mode         Machines with Spark installed   Processes to start    Owner
Local        1                               none                  Spark
Standalone   3                               Master and Worker     Spark
Yarn         1                               Yarn and HDFS         Hadoop

Chapter 3 Hands-on Cases
The Spark Shell is mostly used just to test and verify programs. In a production environment, programs are usually written in an IDE, packaged into a jar, and then submitted to the cluster. The most common approach is to create a Maven project and let Maven manage the jar dependencies.

Writing the WordCount program
1) Create a Maven project named WordCount and add the dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
<build>
    <finalName>WordCount</finalName>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
2) Write the code
package com.atguigu

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {

    //1. Create a SparkConf and set the App name
    val conf = new SparkConf().setAppName("WC")

    //2. Create a SparkContext, the entry point for submitting a Spark App
    val sc = new SparkContext(conf)

    //3. Use sc to create an RDD and run the corresponding transformations and action
    sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1).sortBy(_._2, false).saveAsTextFile(args(1))

    //4. Close the connection
    sc.stop()
  }
}
3) Packaging plugin (so the jar carries its dependencies and a main class)
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>com.atguigu.WordCount</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
4) Package the project, upload the jar to the cluster, and run it
bin/spark-submit \
--class com.atguigu.WordCount \
--master spark://hadoop102:7077 \
WordCount.jar \
/word.txt \
/out

Local debugging
Debugging a Spark program locally uses the local submit mode, that is, the local machine acts as the running environment and serves as both Master and Worker. Just set breakpoints and run. Do the following:
Set an extra property when creating the SparkConf to indicate local execution:
val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
If the local operating system is Windows and the program uses anything Hadoop-related, such as writing files to HDFS, an exception will be thrown. The cause is not an error in the program but the Hadoop-related services it uses; the fix is to extract the provided hadoop-common-bin-2.7.3-x64.zip to any directory.
Then configure the Run Configuration in IDEA and add the HADOOP_HOME variable pointing to that directory.

(Author: Atguigu Big Data R&D Department)  Version: V1.2

Chapter 1 RDD Overview

What is RDD
RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. In code it is an abstract class that represents an immutable, partitionable collection whose elements can be computed in parallel.

RDD properties
1) A group of partitions (Partition), the basic units that make up the dataset;
2) A function for computing each partition;
3) The dependencies between RDDs;
4) A Partitioner, the RDD's partitioning function;
5) A list storing the preferred location for accessing each Partition (preferred location).

RDD characteristics
An RDD represents a read-only, partitioned dataset. The only way to change an RDD is through transformations: from an existing RDD a new RDD is derived, and the new RDD carries the information needed to derive it from other RDDs. Dependencies therefore exist between RDDs, and execution proceeds lazily along this lineage.

Resilient
Storage resilience: automatic switching between memory and disk;
Fault-tolerance resilience: lost data can be recovered automatically;
Computation resilience: a retry mechanism for failed computations;
Partitioning resilience: data can be repartitioned as needed.

Partitioned
An RDD is logically partitioned; the data of each partition exists only abstractly, and at computation time a compute function is used to obtain the data of each partition. If the RDD is built from an existing file system, compute reads the data from that file system; if the RDD is derived from another RDD, compute applies the transformation logic to the other RDD's data.

Read-only
An RDD is read-only; to change the data in an RDD you can only create new RDDs from existing ones. Converting one RDD into another is done through a rich set of operators, no longer just map and reduce as in MapReduce.
RDD operators fall into two classes: transformations, which convert one RDD into another and build up the RDD lineage, and actions, which trigger the computation of an RDD and either return a computed result or save the RDD to a storage system.

Dependencies
RDDs are converted through operators, and each new RDD carries the information needed to derive it from other RDDs; RDDs maintain this lineage, also called dependencies. Dependencies come in two kinds: narrow dependencies, where the partitions of parent and child RDD correspond one to one, and wide dependencies, where each partition of the downstream RDD depends on every partition of the upstream RDD (also called the parent RDD), a many-to-many relationship.

Caching
If the same RDD is used several times in an application, it can be cached. The RDD's partition data is computed from its lineage only the first time; wherever the RDD is used afterwards, the data is taken directly from the cache instead of being recomputed from the lineage, which speeds up later reuse. For example, if RDD-1 is transformed step by step into RDD-n and saved to HDFS, and an intermediate result of RDD-1 is cached, then the later derivation from RDD-1 to RDD-m no longer has to recompute the RDD-0 that came before it.

CheckPoint
Although RDD lineage naturally provides fault tolerance, in long-running iterative applications the lineage between RDDs grows longer and longer, and rebuilding through a very long lineage after a failure hurts performance. For this reason an RDD can be checkpointed to persistent storage, which cuts the earlier lineage: a checkpointed RDD no longer needs to know its parent RDDs, because it can read its data from the checkpoint.

Chapter 2 RDD Programming

Programming model
In Spark, RDDs are represented as objects, and RDDs are transformed by calling methods on those objects. After defining an RDD through a series of transformations, you call actions to trigger the RDD's computation. An action either returns a result to the application (count, collect and so on) or saves data to a storage system (saveAsTextFile and so on). In Spark, RDD computation is executed only when an action is encountered (lazy evaluation), so at runtime multiple transformations can be pipelined.
To use Spark, a developer writes a Driver program that is submitted to the cluster to schedule the Workers. The Driver defines one or more RDDs and calls actions on them, while the Workers execute the RDD partition computation tasks.

RDD creation
There are three ways to create an RDD in Spark: from a collection, from external storage, and from another RDD.

Creating from a collection
To create an RDD from a collection, Spark mainly provides two functions: parallelize and makeRDD.
1) Using parallelize()
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
2) Using makeRDD()
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
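Both functions also accept an optional number of partitions as a second argument; when it is omitted, the count comes from the scheduler's default parallelism. A short spark-shell sketch; the output of the last line is machine-dependent, so the comment only indicates what to expect.

scala> val rdd2 = sc.parallelize(1 to 8, 4)   // explicitly ask for 4 partitions
scala> rdd2.partitions.size                   // Int = 4
scala> val rdd3 = sc.makeRDD(1 to 8)          // partition count falls back to the default parallelism
scala> rdd3.partitions.size                   // machine-dependent, e.g. the number of local cores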
Creating from an external storage dataset
This includes the local file system and any dataset Hadoop supports, such as HDFS, Cassandra and HBase. For example:
scala> val rdd2 = sc.textFile("hdfs://hadoop102:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24

Creating from another RDD
See section 2.3.

RDD transformations (a key topic for interviews and development)
RDD transformations fall broadly into Value types and Key-Value types.

Value types

map(func)
1. Purpose: returns a new RDD obtained by applying func to every element of the source RDD.
2. Requirement: create an RDD from the array 1-10, then multiply every element by 2 to form a new RDD.
(1) Create an RDD
scala> var source = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
(2) Print it
scala> source.collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3) Multiply every element by 2 to form a new RDD
scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26
(4) Print the new RDD
scala> mapadd.collect()
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

mapPartitions(func)
1. Purpose: similar to map, but it runs independently on each partition (shard) of the RDD, so when running on an RDD of type T, func must have type Iterator[T] => Iterator[U]. If there are N elements and M partitions, map's function is called N times while mapPartitions' function is called M times, one call per partition.
2. Requirement: create an RDD and multiply every element by 2 to form a new RDD.
(1) Create an RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2) Multiply each element by 2, partition by partition, to form a new RDD
scala> rdd.mapPartitions(x => x.map(_ * 2))
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27
(3) Print the new RDD
scala> res3.collect
res4: Array[Int] = Array(2, 4, 6, 8)

mapPartitionsWithIndex(func)
1. Purpose: similar to mapPartitions, but func takes an extra integer parameter representing the partition index, so when running on an RDD of type T, func must have type (Int, Iterator[T]) => Iterator[U].
2. Requirement: create an RDD and pair each element with the index of the partition it belongs to, forming a new RDD.
(1) Create an RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2) Pair each element with its partition index to form a new RDD
scala> val indexRdd = rdd.mapPartitionsWithIndex((index, items) => (items.map((index, _))))
indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26
(3) Print the new RDD
scala> indexRdd.collect
res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))
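A common reason to reach for mapPartitions (or mapPartitionsWithIndex) instead of map is to pay a per-partition setup cost only once, for example building a parser, formatter or connection per partition rather than per element. A minimal self-contained sketch of that pattern, using only the standard library:

scala> val data = sc.parallelize(Seq(0L, 60000L, 120000L), 2)
scala> val formatted = data.mapPartitions { iter =>
     |   val fmt = new java.text.SimpleDateFormat("HH:mm:ss")   // built once per partition, not once per element
     |   iter.map(ts => fmt.format(new java.util.Date(ts)))
     | }
scala> formatted.collect()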
flatMap(func)
1. Purpose: similar to map, but each input element can be mapped to 0 or more output elements, so func should return a sequence rather than a single element.
2. Requirement: create an RDD of the elements 1-5 and use flatMap to build a new RDD in which each element of the original RDD is expanded into the range from 1 to itself (1->1, 2->1,2 ... 5->1,2,3,4,5).
(1) Create an RDD
scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
(2) Print it
scala> sourceFlat.collect()
res11: Array[Int] = Array(1, 2, 3, 4, 5)
(3) Create the new RDD from the original one (1->1, 2->1,2 ... 5->1,2,3,4,5)
scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26
(4) Print the new RDD
scala> flatMap.collect()
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

The difference between map() and mapPartitions()
1. map() processes one element at a time, while mapPartitions() processes the data of a whole partition at a time; only after the whole partition has been processed can the partition's data in the original RDD be released, which may lead to OOM.
2. When memory is large enough, mapPartitions() is recommended for better efficiency.

glom
1. Purpose: turns each partition into an array, forming a new RDD of type RDD[Array[T]].
2. Requirement: create an RDD with 4 partitions and put the data of each partition into an array.
(1) Create an RDD
scala> val rdd = sc.parallelize(1 to 16, 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
(2) Put the data of each partition into an array and collect it to the Driver for printing
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))

groupBy(func)
1. Purpose: groups by key; the grouping key is the value returned by applying func to each element, and elements with the same key end up in the same iterator.
2. Requirement: create an RDD and group its elements by the value of each element modulo 2.
(1) Create an RDD
scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
(2) Group by the remainder of dividing each element by 2
scala> val group = rdd.groupBy(_ % 2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
(3) Print the result
scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

filter(func)
1. Purpose: filters the RDD; returns a new RDD made of the elements of the source RDD for which func returns true.
2. Requirement: create an RDD of strings and filter out a new RDD containing the elements that contain the substring "xiao".
(1) Create an RDD
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
(2) Print it
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
(3) Filter the elements containing the substring "xiao" into a new RDD
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
(4) Print the new RDD
scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

sample(withReplacement, fraction, seed)
1. Purpose: randomly samples a fraction of the data. withReplacement says whether the sampling is done with replacement (true) or without (false), and seed specifies the random-number-generator seed.
2. Requirement: create an RDD of 1-10 and sample from it with and without replacement.
(1) Create the RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
(2) Print it
scala> rdd.collect()
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3) Sample with replacement
scala> var sample1 = rdd.sample(true, 0.4, 2)
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26
(4) Print the sample
scala> sample1.collect()
res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)
(5) Sample without replacement
scala> var sample2 = rdd.sample(false, 0.2, 3)
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26
(6) Print the sample
scala> sample2.collect()
res17: Array[Int] = Array(1, 9)
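A brief note on the seed parameter: on the same RDD, the same combination of withReplacement, fraction and seed produces the same sample again, which is handy for reproducible tests. A small spark-shell sketch; the concrete sampled values are simply whatever the generator happens to pick.

scala> val rdd = sc.parallelize(1 to 10)
scala> rdd.sample(false, 0.3, 5).collect()   // some subset, chosen by seed 5
scala> rdd.sample(false, 0.3, 5).collect()   // same seed, same subset again
scala> rdd.sample(false, 0.3, 7).collect()   // different seed, generally a different subset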
distinct([numTasks])
1. Purpose: returns a new RDD obtained by de-duplicating the source RDD. By default only 8 parallel tasks operate, but this can be changed with the optional numTasks parameter.
2. Requirement: create an RDD and use distinct() to de-duplicate it.
(1) Create an RDD
scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
(2) De-duplicate it into a new RDD
scala> val unionRDD = distinctRdd.distinct()
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26
(3) Print the new RDD
scala> unionRDD.collect()
res20: Array[Int] = Array(1, 9, 5, 6, 2)
(4) De-duplicate it into a new RDD with 2 parallel tasks
scala> val unionRDD = distinctRdd.distinct(2)
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26
(5) Print the new RDD
scala> unionRDD.collect()
res21: Array[Int] = Array(6, 2, 1, 9, 5)

coalesce(numPartitions)
1. Purpose: reduces the number of partitions; used after filtering a large dataset to improve the execution efficiency of the resulting small dataset.
2. Requirement: create an RDD with 4 partitions and coalesce it.
(1) Create an RDD
scala> val rdd = sc.parallelize(1 to 16, 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24
(2) Check the RDD's number of partitions
scala> rdd.partitions.size
res20: Int = 4
(3) Coalesce the RDD
scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26
(4) Check the new RDD's number of partitions
scala> coalesceRDD.partitions.size
res21: Int = 3

repartition(numPartitions)
1. Purpose: reshuffles all the data randomly through the network and repartitions it into numPartitions partitions.
2. Requirement: create an RDD with 4 partitions and repartition it.
(1) Create an RDD
scala> val rdd = sc.parallelize(1 to 16, 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24
(2) Check the RDD's number of partitions
scala> rdd.partitions.size
res22: Int = 4
(3) Repartition the RDD
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26
(4) Check the new RDD's number of partitions
scala> rerdd.partitions.size
res23: Int = 2

The difference between coalesce and repartition
1. coalesce repartitions without a shuffle by default; whether to shuffle can be chosen through its shuffle: Boolean = false/true parameter.
2. repartition is actually implemented by calling coalesce with the shuffle enabled. The source is as follows:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
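One practical consequence of the difference above, sketched in spark-shell: without a shuffle, coalesce can only merge partitions, so asking it for more partitions than the RDD already has leaves the count unchanged, whereas repartition (which shuffles) can both shrink and grow it. This reflects standard coalesce behavior rather than anything specific to this document's cluster.

scala> val rdd = sc.parallelize(1 to 16, 4)
scala> rdd.coalesce(2).partitions.size        // Int = 2, partitions merged without a shuffle
scala> rdd.coalesce(8).partitions.size        // Int = 4, cannot grow without a shuffle
scala> rdd.coalesce(8, true).partitions.size  // Int = 8, shuffle requested explicitly
scala> rdd.repartition(8).partitions.size     // Int = 8, repartition always shuffles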
