《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第三章 Spark RDD_第1頁(yè)
《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第三章 Spark RDD_第2頁(yè)
《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第三章 Spark RDD_第3頁(yè)
《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第三章 Spark RDD_第4頁(yè)
《通信數(shù)據(jù)分析與實(shí)戰(zhàn)》課件-第三章 Spark RDD_第5頁(yè)
已閱讀5頁(yè),還剩71頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

通信數(shù)據(jù)分析與實(shí)戰(zhàn)SparkRDD第三章第1節(jié)2知道RDD的作用理解RDD的五大特征學(xué)習(xí)目標(biāo)TARGETRDD的概述傳統(tǒng)的MapReduce雖然具有自動(dòng)容錯(cuò)、平衡負(fù)載和可拓展性的優(yōu)點(diǎn),但是其最大缺點(diǎn)是采用非循環(huán)式的數(shù)據(jù)流模型,使得在迭代計(jì)算式要進(jìn)行大量的磁盤(pán)IO操作。Spark中的RDD可以很好的解決這一缺點(diǎn)。RDD(ResilientDistributedDataset),即彈性分布式數(shù)據(jù)集,是一個(gè)容錯(cuò)的、并行的數(shù)據(jù)結(jié)構(gòu),可以讓用戶顯式地將數(shù)據(jù)存儲(chǔ)到磁盤(pán)和內(nèi)存中,并且還能控制數(shù)據(jù)的分區(qū)。對(duì)于迭代式計(jì)算和交互式數(shù)據(jù)挖掘,RDD可以將中間計(jì)算的數(shù)據(jù)結(jié)果保存在內(nèi)存中,若是后面需要中間結(jié)果參與計(jì)算時(shí),則可以直接從內(nèi)存中讀取,從而可以極大地提高計(jì)算速度。RDD是Spark提供的最重要的抽象概念,我們可以將RDD理解為一個(gè)分布式存儲(chǔ)在集群中的大型數(shù)據(jù)集合,不同RDD之間可以通過(guò)轉(zhuǎn)換操作形成依賴關(guān)系實(shí)現(xiàn)管道化,從而避免了中間結(jié)果的I/O操作,提高數(shù)據(jù)處理的速度和性能。RDD的五大特征

分區(qū)列表計(jì)算函數(shù)依賴其他RDDKV類(lèi)型分區(qū)器優(yōu)先位置列表RDD的五大特征分區(qū)列表每個(gè)RDD被分為多個(gè)分區(qū)(Partitions),這些分區(qū)運(yùn)行在集群中的不同節(jié)點(diǎn),每個(gè)分區(qū)都會(huì)被一個(gè)計(jì)算任務(wù)處理,分區(qū)數(shù)決定了并行計(jì)算的數(shù)量,創(chuàng)建RDD時(shí)可以指定RDD分區(qū)的個(gè)數(shù)。如果不指定分區(qū)數(shù)量,當(dāng)RDD從集合創(chuàng)建時(shí),默認(rèn)分區(qū)數(shù)量為該程序所分配到的資源的CPU核數(shù)(每個(gè)Core可以承載2~4個(gè)Partition),如果是從HDFS文件創(chuàng)建,默認(rèn)為文件的Block數(shù)。RDD的五大特征每個(gè)分區(qū)都有一個(gè)計(jì)算函數(shù)

Spark的RDD的計(jì)算函數(shù)是以分片為基本單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù),對(duì)具體的分片進(jìn)行計(jì)算。RDD的五大特征依賴于其他RDD

RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類(lèi)似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。RDD的五大特征K-V數(shù)據(jù)類(lèi)型的RDD分區(qū)器當(dāng)前Spark中實(shí)現(xiàn)了兩種類(lèi)型的分區(qū)函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于(Key,Value)的RDD,才會(huì)有Partitioner(分區(qū)),非(Key,Value)的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分區(qū)數(shù)量,也決定了parentRDDShuffle輸出時(shí)的分區(qū)數(shù)量。RDD的五大特征每個(gè)分區(qū)都有一個(gè)優(yōu)先位置列表優(yōu)先位置列表會(huì)存儲(chǔ)每個(gè)Partition的優(yōu)先位置,對(duì)于一個(gè)HDFS文件來(lái)說(shuō),就是每個(gè)Partition塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。10小結(jié)知道RDD的作用理解RDD的五大特征通信數(shù)據(jù)分析與實(shí)戰(zhàn)SparkRDD第三章第2節(jié)12掌握從本地系統(tǒng)創(chuàng)建RDD掌握并行集合創(chuàng)建RDD學(xué)習(xí)目標(biāo)TARGETRDD的創(chuàng)建方式從本地-HDFS文件系統(tǒng)加載數(shù)據(jù)創(chuàng)建通過(guò)并行集合創(chuàng)建RDD從文件系統(tǒng)加載數(shù)據(jù)創(chuàng)建RDDSpark可以從Hadoop支持的任何存儲(chǔ)源中加載數(shù)據(jù)去創(chuàng)建RDD,包括本地文件系統(tǒng)和HDFS等文件系統(tǒng)。我們通過(guò)Spark中的SparkContext對(duì)象調(diào)用textFile()方法加載數(shù)據(jù)創(chuàng)建RDD。scala>valtest=sc.textFile("file:///export/data/test.txt")test:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[1]attextFileat<console>:241、從本地文件中加載數(shù)據(jù)創(chuàng)建RDD從文件系統(tǒng)加載數(shù)據(jù)創(chuàng)建RDDscala>valtestRDD=sc.textFile("/data/test.txt")testRDD:org.apache.spark.rdd.RDD[String]=/data/test.txtMapPartitionsRDD[1]attextFileat<console>:242、從HDFS中加載數(shù)據(jù)創(chuàng)建RDD通過(guò)并行集合創(chuàng)建RDDscala>valarray=Array(1,2,3,4,5)array:Array[Int]=Array(1,2,3,4,5)scala>valarrRDD=sc.parallelize(array)arrRDD:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[6]atparallelizeat<console>:26Spark可以通過(guò)并行集合創(chuàng)建RDD。即從一個(gè)已經(jīng)存在的集合、數(shù)組上,通過(guò)SparkContext對(duì)象調(diào)用parallelize()方法創(chuàng)建RDD。17小結(jié)掌握從本地系統(tǒng)創(chuàng)建RDD掌握并行集合創(chuàng)建RDD通信數(shù)據(jù)分析與實(shí)戰(zhàn)SparkRDD第三章第3節(jié)19知道RDD的處理過(guò)程熟悉RDD的轉(zhuǎn)換算子熟悉RDD的行動(dòng)算子學(xué)習(xí)目標(biāo)TARGETRDD的處理過(guò)程Spark用Scala語(yǔ)言實(shí)現(xiàn)了RDD的API,程序開(kāi)發(fā)者可以通過(guò)調(diào)用API對(duì)RDD進(jìn)行操作處理。RDD經(jīng)過(guò)一系列的“轉(zhuǎn)換”操作,每一次轉(zhuǎn)換都會(huì)產(chǎn)生不同的RDD,以供給下一次“轉(zhuǎn)換”操作使用,直到最后一個(gè)RDD經(jīng)過(guò)“行動(dòng)”操作才會(huì)被真正計(jì)算處理,并輸出到外部數(shù)據(jù)源中,若是中間的數(shù)據(jù)結(jié)果需要復(fù)用,則可以進(jìn)行緩存處理,將數(shù)據(jù)緩存到內(nèi)存中。RDD的轉(zhuǎn)換算子RDD處理過(guò)程中的“轉(zhuǎn)換”操作主要用于根據(jù)已有RDD創(chuàng)建新的RDD,每一次通過(guò)Transformation算子計(jì)算后都會(huì)返回一個(gè)新RDD,供給下一個(gè)轉(zhuǎn)換算子使用。下面,通過(guò)一張表來(lái)列舉一些常用轉(zhuǎn)換算子操作的API,具體如下。RDD的轉(zhuǎn)換算子下面,我們通過(guò)結(jié)合具體的示例對(duì)這些轉(zhuǎn)換算子API進(jìn)行詳細(xì)講解。filter(func)操作會(huì)篩選出滿足函數(shù)func的元素,并返回一個(gè)新的數(shù)據(jù)集。假設(shè),有一個(gè)文件test.txt,下面,通過(guò)一張圖來(lái)描述如何通過(guò)filter算子操作,篩選出包含單詞“spark”的元素。test.txtRDD(lines)RDD(linesWithSpark)hadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparksparkyuminyuanlongpingsparksc.textFile()lines.filter()RDD的轉(zhuǎn)換算子

通過(guò)從test.txt文件中加載數(shù)據(jù)的方式創(chuàng)建RDD,然后通過(guò)filter操作篩選出滿足條件的元素,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:

scala>vallines=sc.textFile("file:///export/data/test.txt")

lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txt

MapPartitionsRDD[1]attextFileat<console>:24

scala>vallinesWithSpark=lines.filter(line=>line.contains("spark"))

linesWithSpark:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[2]at

filterat<console>:25RDD的轉(zhuǎn)換算子

map(func)操作將每個(gè)元素傳遞到函數(shù)func中,并將結(jié)果返回為一個(gè)新的數(shù)據(jù)集。有一個(gè)文件test.txt,接下來(lái),通過(guò)一張圖來(lái)描述如何通過(guò)map算子操作把文件內(nèi)容拆分成一個(gè)個(gè)的單詞并封裝在數(shù)組對(duì)象中,具體過(guò)程如下

test.txtRDD(lines)RDD(words)hadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshiArray(“Hadoop”,“spark”)Array(“tuyouyou”,”yuanshi”)Array(“spark”,”yumin”)Array(“yuanlongping”,“spark”)Array(“zhongnanshan”,“yuanshi”)sc.textFile()lines.map()RDD的轉(zhuǎn)換算子通過(guò)從test.txt文件中加載數(shù)據(jù)的方式創(chuàng)建RDD,然后通過(guò)map操作將文件的每一行內(nèi)容都拆分成一個(gè)個(gè)的單詞元素,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[4]attextFileat<console>:24

scala>valwords=lines.map(line=>line.split(""))words:org.apache.spark.rdd.RDD[Array[String]]=MapPartitionsRDD[13]amapat<console>:25RDD的轉(zhuǎn)換算子flatMap(func)與map(func)相似,但是每個(gè)輸入的元素都可以映射到0或者多個(gè)輸出的結(jié)果。有一個(gè)文件test.txt,接下來(lái),通過(guò)一張圖來(lái)描述如何通過(guò)flatMap算子操作,把文件內(nèi)容拆分成一個(gè)個(gè)的單詞test.txtRDD(lines)RDD(words)hadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshiArray(“Hadoop”,“spark”)Array(“tuyouyou”,”yuanshi”)Array(“spark”,”yumin”)Array(“yuanlongping”,“spark”)Array(“zhongnanshan”,“yuanshi”)sc.textFile()lines.map()hadoopsparkTuyouyouyuanshiSparkyuminyuanlongpingsparkZhongnanshanyuanshilines.flatmap()lines.flat()RDD的轉(zhuǎn)換算子通過(guò)從test.txt文件中加載數(shù)據(jù)的方式創(chuàng)建RDD,然后通過(guò)flatMap操作將文件的每一行內(nèi)容都拆分成一個(gè)個(gè)的單詞元素,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[5]attextFileat<console>:24

scala>valwords=lines.flatMap(line=>line.split(""))words:org.apache.spark.rdd.RDD[Array[String]]=MapPartitionsRDD[14]atmapat<console>:25RDD的轉(zhuǎn)換算子

groupByKey()主要用于(Key,Value)鍵值對(duì)的數(shù)據(jù)集,將具有相同Key的Value進(jìn)行分組,會(huì)返回一個(gè)新的(Key,Iterable)形式的數(shù)據(jù)集。同樣以文件test.txt為例,接下來(lái),通過(guò)一張圖來(lái)描述如何通過(guò)groupByKey算子操作,將文件內(nèi)容中的所有單詞進(jìn)行分組.

RDD(words)RDD(groupWords)(“Hadoop”,1)(“spark”,1)(“tuyouyou”,1)(“yuanshi”,1)(“spark”,1)(“yumin”,1)(“yuanlongping”,1)(“spark”,1)(“zhongnanshan”,1)(“yuanshi”,1)(“hadoop”,1)(“spark”,(1,1,1))(“tuyouyou”,1)(“yuanshi”,(1,1))(“yumin”,1)(“yaunlongping”,1)(“zhongnanshan”,1)Words.groupByKey()RDD的轉(zhuǎn)換算子通過(guò)groupByKey操作把(Key,Value)鍵值對(duì)類(lèi)型的RDD,按單詞將單詞出現(xiàn)的次數(shù)進(jìn)行分組,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[6]attextFileat<console>:24

scala>valwords=lines.flatMap(line=>line.split("")).map(word=>(word,1))ords:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[15]atmapat<console>:25

scala>valgroupWords=words.groupByKey()groupWords:org.apache.spark.rdd.RDD[(String,Iterable[Int])]=ShuffledRDD[16]atgroupByKeyat<console>:25RDD的轉(zhuǎn)換算子

reduceByKey()主要用于(Key,Value)鍵值對(duì)的數(shù)據(jù)集,返回的是一個(gè)新的(Key,Iterable)形式的數(shù)據(jù)集,該數(shù)據(jù)集是每個(gè)Key傳遞給函數(shù)func進(jìn)行聚合運(yùn)算后得到的結(jié)果。同樣以文件test.txt,接下來(lái),通過(guò)一張圖來(lái)描述如何通過(guò)reduceByKey算子操作統(tǒng)計(jì)單詞出現(xiàn)的次數(shù)。RDD(words)RDD(reduceWords)(“hadoop”,1)(“spark”,1)(“tuyouyou”,1)(“yuanshi”,1)(“spark”,1)(“yumin”,1)(“yuanlongping”,1)(“spark”,1)(“zhongnanshan”,1)(“yuanshi”,1)(“hadoop”,1)(“spark”,3)(“tuyouyou”,1)(“yuanshi”,2)(“yumin”,1)(“yaunlongping”,1)(“zhongnanshan”,1)Words.reduceByKey()RDD的轉(zhuǎn)換算子通過(guò)reduceByKey操作把(Key,Value)鍵值對(duì)類(lèi)型的RDD,按單詞Key將單詞出現(xiàn)的次數(shù)Value進(jìn)行聚合,這些元素組成的集合是一個(gè)新的RDD。接下來(lái),通過(guò)代碼來(lái)進(jìn)行演示,具體代碼如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[7]attextFileat<console>:24

scala>valwords=lines.flatMap(line=>line.split("")).map(word=>(word,1))words:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[16]atmapat<console>:25

scala>valreduceWords=words.reduceByKey((a,b)=>a+b)reduceWords:org.apache.spark.rdd.RDD[(String,Int)]=ShuffledRDD[17]atreduceByKeyat<console>:25RDD的行動(dòng)算子行動(dòng)算子主要是將在數(shù)據(jù)集上運(yùn)行計(jì)算后的數(shù)值返回到驅(qū)動(dòng)程序,從而觸發(fā)真正的計(jì)算。下面,通過(guò)一張表來(lái)列舉一些常用行動(dòng)算子操作的API,具體如下。RDD的行動(dòng)算子count()主要用于返回?cái)?shù)據(jù)集中的元素個(gè)數(shù)。假設(shè),現(xiàn)有一個(gè)arrRdd,如果要統(tǒng)計(jì)arrRdd元素的個(gè)數(shù),示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.count()es0:Long=5RDD的行動(dòng)算子first()主要用于返回?cái)?shù)組的第一個(gè)元素?,F(xiàn)有一個(gè)arrRdd,如果要獲取arrRdd中第一個(gè)元素,示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.first()res1:Int=1RDD的行動(dòng)算子

take()主要用于以數(shù)組的形式返回?cái)?shù)組集中的前n個(gè)元素?,F(xiàn)有一個(gè)arrRdd,如果要獲取arrRdd中的前三個(gè)元素,示例代碼如下scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.take(3)res2:Array[Int]=Array(1,2,3)RDD的行動(dòng)算子reduce()主要用于通過(guò)函數(shù)func(輸入兩個(gè)參數(shù)并返回一個(gè)值)聚合數(shù)據(jù)集中的元素?,F(xiàn)有一個(gè)arrRdd,如果要對(duì)arrRdd中的元素進(jìn)行聚合,示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.reduce((a,b)=>a+b)res3:Int=15RDD的行動(dòng)算子collect()主要用于以數(shù)組的形式返回?cái)?shù)據(jù)集中的所有元素?,F(xiàn)有一個(gè)rdd,如果希望rdd中的元素以數(shù)組的形式輸出,示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.collect()res4:Array[Int]=Array(1,2,3,4,5)RDD的行動(dòng)算子

foreach()主要用于將數(shù)據(jù)集中的每個(gè)元素傳遞到函數(shù)func中運(yùn)行?,F(xiàn)有一個(gè)arrRdd,如果希望遍歷輸出arrRdd中的元素,示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.foreach(x=>println(x))

1

2

3

4

539小結(jié)知道RDD的處理過(guò)程熟悉RDD的轉(zhuǎn)換算子熟悉RDD的行動(dòng)算子通信數(shù)據(jù)分析與實(shí)戰(zhàn)SparkRDD第三章第4節(jié)41知道RDD的分區(qū)作用熟悉RDD的分區(qū)方式了解RDD的自定義分區(qū)學(xué)習(xí)目標(biāo)TARGETRDD的分區(qū)在分布式程序中,網(wǎng)絡(luò)通信的開(kāi)銷(xiāo)是很大的,因此控制數(shù)據(jù)分布以獲得最少的網(wǎng)絡(luò)傳輸可以極大的提升程序的整體性能,Spark程序可以通過(guò)控制RDD分區(qū)方式來(lái)減少通信開(kāi)銷(xiāo)。Spark中所有的RDD都可以進(jìn)行分區(qū),系統(tǒng)會(huì)根據(jù)一個(gè)針對(duì)鍵的函數(shù)對(duì)元素進(jìn)行分區(qū)。雖然Spark不能控制每個(gè)鍵具體劃分到哪個(gè)節(jié)點(diǎn)上,但是可以確保相同的鍵出現(xiàn)在同一個(gè)分區(qū)上。RDD分區(qū)的作用RDD的分區(qū)RDD分區(qū)的默認(rèn)數(shù)目Mesos模式RDD的分區(qū)原則是分區(qū)的個(gè)數(shù)盡量等于集群中的CPU核心(Core)數(shù)目。對(duì)于不同的Spark部署模式而言,都可以通過(guò)設(shè)置spark.default.parallelism這個(gè)參數(shù)值來(lái)配置默認(rèn)的分區(qū)數(shù)目。默認(rèn)為本地機(jī)器的CPU數(shù)目,若設(shè)置了local[N],則默認(rèn)為NLocal模式Standalone/yarn模式默認(rèn)的分區(qū)數(shù)是8。在“集群中所有CPU核數(shù)總和”和“2”這兩者中取較大值作為默認(rèn)值RDD的分區(qū)RDD分區(qū)的方式Spark框架為RDD提供了兩種分區(qū)方式,分別是哈希分區(qū)(HashPartitioner)和范圍分區(qū)(RangePartitioner)。哈希分區(qū)是根據(jù)哈希值進(jìn)行分區(qū);范圍分區(qū)是將一定范圍的數(shù)據(jù)映射到一個(gè)分區(qū)中。這兩種分區(qū)方式已經(jīng)可以滿足大多數(shù)應(yīng)用場(chǎng)景的需求。與此同時(shí),Spark也支持自定義分區(qū)方式,即通過(guò)一個(gè)自定義的Partitioner對(duì)象來(lái)控制RDD的分區(qū),從而進(jìn)一步減少通信開(kāi)銷(xiāo)。RDD的分區(qū)RDD分區(qū)的方式需要注意的是,RDD的分區(qū)函數(shù)是針對(duì)(Key,Value)類(lèi)型的RDD,分區(qū)函數(shù)根據(jù)Key對(duì)RDD元素進(jìn)行分區(qū)。因此,當(dāng)需要對(duì)一些非(Key,Value)類(lèi)型的RDD進(jìn)行自定義分區(qū)時(shí),需要先把RDD元素轉(zhuǎn)換為(Key,Value)類(lèi)型,再通過(guò)分區(qū)函數(shù)進(jìn)行分區(qū)操作。如果想要實(shí)現(xiàn)自定義分區(qū),就需要定義一個(gè)類(lèi),使得這個(gè)自定義的類(lèi)繼承org.apache.spark.Partitioner類(lèi),并實(shí)現(xiàn)其中的3個(gè)方法,具體如下:(1).defnumPartitions:Int:用于返回創(chuàng)建的分區(qū)個(gè)數(shù)。(2).defgetPartition(Key:Any):用于對(duì)輸入的Key做處理,并返回該Key的分區(qū)ID,分區(qū)ID的范圍是0~numPartitions-1。(3).equals(other:Any):用于Spark判斷自定義的Partitioner對(duì)象和其他的Partitioner對(duì)象是否相同,從而判斷兩個(gè)RDD的分區(qū)方式是否相同。46小結(jié)知道RDD的分區(qū)作用熟悉RDD的分區(qū)方式了解RDD的自定義分區(qū)通信數(shù)據(jù)分析與實(shí)戰(zhàn)SparkRDD第三章第5節(jié)48知道RDD的窄依賴關(guān)系知道RDD的寬依賴關(guān)系學(xué)習(xí)目標(biāo)TARGETRDD的依賴關(guān)系寬依賴窄依賴RDD的依賴關(guān)系窄依賴是指父RDD的每一個(gè)分區(qū)最多被一個(gè)子RDD的分區(qū)使用,即OneToOneDependencies。窄依賴的表現(xiàn)一般分為兩類(lèi),第一類(lèi)表現(xiàn)為一個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū);第二類(lèi)表現(xiàn)為多個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū)。一個(gè)父RDD的一個(gè)分區(qū)不可能對(duì)應(yīng)一個(gè)子RDD的多個(gè)分區(qū)。為了便于理解,我們通常把窄依賴形象的比喻為獨(dú)生子女。窄依賴RDD的依賴關(guān)系RDD做map、filter和union算子操作時(shí),是屬于窄依賴的第一類(lèi)表現(xiàn);而RDD做join算子操作(對(duì)輸入進(jìn)行協(xié)同劃分)時(shí),是屬于窄依賴表現(xiàn)的第二類(lèi)。輸入?yún)f(xié)同劃分是指多個(gè)父RDD的某一個(gè)分區(qū)的所有Key,被劃分到子RDD的同一分區(qū)。當(dāng)子RDD做算子操作,因?yàn)槟硞€(gè)分區(qū)操作失敗導(dǎo)致數(shù)據(jù)丟失時(shí),只需要重新對(duì)父RDD中對(duì)應(yīng)的分區(qū)做算子操作即可恢復(fù)數(shù)據(jù)。窄依賴RDD的依賴關(guān)系寬依賴是指子RDD的每一個(gè)分區(qū)都會(huì)使用所有父RDD的所有分區(qū)或多個(gè)分區(qū),即OneToManyDependecies。為了便于理解,我們通常把寬依賴形象的比喻為超生。寬依賴父RDD做groupByKey和join(輸入未協(xié)同劃分)算子操作時(shí),子RDD的每一個(gè)分區(qū)都會(huì)依賴于所有父RDD的所有分區(qū)。當(dāng)子RDD做算子操作,因?yàn)槟硞€(gè)分區(qū)操作失敗導(dǎo)致數(shù)據(jù)丟失時(shí),則需要重新對(duì)父RDD中的所有分區(qū)進(jìn)行算子操作才能恢復(fù)數(shù)據(jù)。RDD的依賴關(guān)系Join算子操作既可以屬于窄依賴,也可以屬于寬依賴.

當(dāng)join算子操作后,分區(qū)數(shù)量沒(méi)有變化則為窄依賴(如joinwithinputsco-partitioned,輸入?yún)f(xié)同劃分)當(dāng)join算子操作后,分區(qū)數(shù)量發(fā)生變化則為寬依賴(如joinwithinputsnotco-partitioned,輸入非協(xié)同劃分)寬窄依賴的注意點(diǎn)54小結(jié)知道RDD的窄依賴關(guān)系知道RDD的寬依賴關(guān)系通信數(shù)據(jù)分析與實(shí)戰(zhàn)SparkRDD第三章第6節(jié)56熟悉RDD的持久化機(jī)制知道RDD的容錯(cuò)機(jī)制學(xué)習(xí)目標(biāo)TARGETRDD的機(jī)制容錯(cuò)機(jī)制持久化機(jī)制RDD的機(jī)制在Spark中,RDD是采用惰性求值,即每次調(diào)用行動(dòng)算子操作,都會(huì)從頭開(kāi)始計(jì)算,這對(duì)迭代計(jì)算來(lái)說(shuō)代價(jià)很大,因?yàn)榈?jì)算經(jīng)常需要多次重復(fù)的使用同一組數(shù)據(jù)集,所以為了避免重復(fù)計(jì)算的開(kāi)銷(xiāo),讓Spark對(duì)數(shù)據(jù)集進(jìn)行持久化操作。RDD的持久化操作有兩種方法,分別是cache()方法和persist()方法。persist()方法的存儲(chǔ)級(jí)別是通過(guò)StorageLevel對(duì)象設(shè)置的。cache()方法的存儲(chǔ)級(jí)別是使用默認(rèn)的存儲(chǔ)級(jí)別(即StorageLevel.MEMORY_ONLY)。持久化機(jī)制RDD的機(jī)制持久化機(jī)制RDD的機(jī)制持久化機(jī)制使用persist()方法對(duì)RDD進(jìn)行持久化RDD的機(jī)制持久化機(jī)制使用cache()方法對(duì)RDD進(jìn)行持久化RDD的機(jī)制容錯(cuò)機(jī)制持久化機(jī)制RDD的機(jī)制容錯(cuò)機(jī)制當(dāng)Spark集群中的某一個(gè)節(jié)點(diǎn)由于宕機(jī)導(dǎo)致數(shù)據(jù)丟失,則可以通過(guò)Spark中的RDD進(jìn)行容錯(cuò)恢復(fù)已經(jīng)丟失的數(shù)據(jù)。RDD提供了兩種故障恢復(fù)的方式,分別是血統(tǒng)(Lineage)方式和設(shè)置檢查點(diǎn)(checkpoint)方式。RDD的機(jī)制容錯(cuò)機(jī)制血統(tǒng)方式(Lineage)根據(jù)RDD之間依賴關(guān)系對(duì)丟失數(shù)據(jù)的RDD進(jìn)行數(shù)據(jù)恢復(fù)。若丟失數(shù)據(jù)的子RDD進(jìn)行窄依賴運(yùn)算,則只需要把丟失數(shù)據(jù)的父RDD的對(duì)應(yīng)分區(qū)進(jìn)行重新計(jì)算,不依賴其他節(jié)點(diǎn),并且在計(jì)算過(guò)程中不存在冗余計(jì)算;若丟失數(shù)據(jù)的RDD進(jìn)行寬依賴運(yùn)算,則需要父RDD所有分區(qū)都要進(jìn)行從頭到尾計(jì)算,計(jì)算過(guò)程中存在冗余計(jì)算。RDD的機(jī)制容錯(cuò)機(jī)制設(shè)置檢查點(diǎn)(checkPoint)方式本質(zhì)是將RDD寫(xiě)入磁盤(pán)存儲(chǔ)。當(dāng)RDD進(jìn)行寬依賴運(yùn)算時(shí),只要在中間階段設(shè)置一個(gè)進(jìn)行檢查點(diǎn)容錯(cuò),即Spark中的sparkContext調(diào)用setCheckpoint()方法,設(shè)置容錯(cuò)文件系統(tǒng)目錄作為檢查點(diǎn)checkpoint,將checkpoint的數(shù)據(jù)寫(xiě)入之前設(shè)置的容錯(cuò)文件系統(tǒng)中進(jìn)行持久化存儲(chǔ),若后面有節(jié)點(diǎn)宕機(jī)導(dǎo)致分區(qū)數(shù)據(jù)丟失,則以從做

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論