Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第3章 Spark RDD編程_第1頁
Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第3章 Spark RDD編程_第2頁
Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第3章 Spark RDD編程_第3頁
Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第3章 Spark RDD編程_第4頁
Spark大數(shù)據(jù)分析技術(shù)(Python版)課件 第3章 Spark RDD編程_第5頁
已閱讀5頁,還剩35頁未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

SparkRDD編程1

RDD創(chuàng)建的方式2 RDD轉(zhuǎn)換操作3

RDD行動(dòng)操作4RDD之間的依賴關(guān)系5RDD的持久化6案例實(shí)戰(zhàn):SparkRDD實(shí)現(xiàn)詞頻統(tǒng)計(jì)1RDD創(chuàng)建的方式RDD是Spark對(duì)具體數(shù)據(jù)對(duì)象的一種抽象(封裝),本質(zhì)上是一個(gè)只讀的分區(qū)記錄集合,每個(gè)分區(qū)(partition)就是一個(gè)數(shù)據(jù)集片段,每個(gè)分區(qū)對(duì)應(yīng)一個(gè)Task任務(wù)來執(zhí)行。Spark中的所有操作都是基于RDD進(jìn)行的,一個(gè)Spark應(yīng)用可以看作一個(gè)由“RDD創(chuàng)建”到“一系列RDD轉(zhuǎn)化操作”,再到“RDD存儲(chǔ)”的過程。1RDD創(chuàng)建的方式使用程序中數(shù)據(jù)集創(chuàng)建RDD可通過調(diào)用SparkContext對(duì)象的parallelize()方法并行化程序中的Python類型的數(shù)據(jù)集來創(chuàng)建RDD。>>>arr=[1,2,3,4,5,6]>>>rdd=sc.parallelize(arr)#把a(bǔ)rr這個(gè)數(shù)據(jù)集并行化到節(jié)點(diǎn)上來創(chuàng)建RDDSpark會(huì)為每一個(gè)分區(qū)運(yùn)行一個(gè)Task任務(wù)來進(jìn)行處理。Spark默認(rèn)會(huì)根據(jù)集群的情況來設(shè)置partition的數(shù)量。當(dāng)調(diào)用parallelize()方法的時(shí),若不指定分區(qū)數(shù),則使用系統(tǒng)給出的分區(qū)數(shù)。>>>rdd4=sc.parallelize([1,2,3,4,5,6],3)>>>rdd4.getNumPartitions()#獲取rdd的分區(qū)數(shù)3在上述語句中,使用了Spark提供的SparkContext對(duì)象,名稱為sc,這是PySpark啟動(dòng)的時(shí)候自動(dòng)創(chuàng)建的,在交互式編程環(huán)境中可以直接使用。1RDD創(chuàng)建的方式調(diào)用SparkContext對(duì)象的textFile()方法讀取指定位置的文件即可創(chuàng)建RDD。textFile()方法支持針對(duì)目錄、文本文件、壓縮文件以及通配符匹配的文件創(chuàng)建RDD。Spark支持的一些常見文件格式如表所示。使用文本文件創(chuàng)建RDD讀取HDFS中的文本文件創(chuàng)建RDD讀取HDFS中的"/user/hadoop/input/data.txt"文件創(chuàng)建RDD之前,需要先啟動(dòng)Hadoop系統(tǒng),命令如下:$cd/usr/local/hadoop$./sbin/start-dfs.sh#啟動(dòng)Hadoop#讀取HDFS上的文件創(chuàng)建RDD>>>rdd=sc.textFile("/user/hadoop/input/data.txt")1RDD創(chuàng)建的方式使用文本文件創(chuàng)建RDD讀取HDFS中的文本文件創(chuàng)建RDD使用textFile()方法讀取文件創(chuàng)建RDD時(shí),可指定分區(qū)的個(gè)數(shù),語法格式如下:>>>rdd=sc.textFile("/user/hadoop/input/data.txt",3)#創(chuàng)建包含3個(gè)分區(qū)的RDD對(duì)象讀取本地的文本文件創(chuàng)建RDD從Linux本地讀取文件也是通過sc.textFile("路徑")方法,但需要在路徑前面加上“file:”讀取Linux本地的"/home/hadoop/data.txt"文件創(chuàng)建一個(gè)RDD。>>>rdd1=sc.textFile("file:/home/hadoop/data.txt")#讀取本地文件讀取目錄創(chuàng)建RDD/home/hadoop/input目錄中有文件text1.txt和文件text2.txt,text1.txt中的內(nèi)容為:HelloSpark;text2.txt中的內(nèi)容為:HelloPython。1RDD創(chuàng)建的方式使用文本文件創(chuàng)建RDD讀取目錄創(chuàng)建RDD/home/hadoop/input目錄中有文件text1.txt和文件text2.txt,text1.txt中的內(nèi)容為:HelloSpark;text2.txt中的內(nèi)容為:HelloPython。>>>rddw1=sc.textFile("file:/home/hadoop/input")#讀取本地文件夾>>>rddw1.collect()['HelloPython','HelloSpark']使用wholeTextFiles()方法讀取目錄創(chuàng)建RDDSparkContext對(duì)象的wholeTextFiles()讀取給定目錄(文件夾)中的所有文件,可在輸入路徑中使用通配字符(如part-*.txt)。>>>rddw2=sc.wholeTextFiles("file:/home/hadoop/input")#讀取本地文件夾>>>rddw2.collect()[('file:/home/hadoop/input/text2.txt','HelloPython\n'),('file:/home/hadoop/input/text1.txt','HelloSpark\n')]1RDD創(chuàng)建的方式使用JSON文件創(chuàng)建RDDJSON(JavaScriptObjectNotation)是一種輕量級(jí)的數(shù)據(jù)交換格式,JSON文件在許多不同的編程API中都被支持。JSON格式的四條規(guī)則:(1)并列的數(shù)據(jù)之間用逗號(hào)“,”分隔;(2)映射(鍵值對(duì))用冒號(hào)“:”表示;(3)并列數(shù)據(jù)的集合(數(shù)組)用方括號(hào)“[]”表示;(4)映射(鍵值對(duì))的集合(對(duì)象)用大括號(hào)“{}”表示;(5)元素值可具有的類型:string,number,object(對(duì)象),array(數(shù)組),true,false,null。JSON文件的創(chuàng)建的一種方法:新建一個(gè)文本檔案,.txt結(jié)尾的;在文檔中寫入JSON數(shù)據(jù),保存;將文檔后綴.txt修改成.json就可以成為JSON文件了。讀取JSON文件創(chuàng)建RDD的最簡(jiǎn)單方法是是將JSON文件作為文本文件讀取。>>>jsonStr=sc.textFile("file:/home/hadoop/student.json")1RDD創(chuàng)建的方式使用CSV文件創(chuàng)建RDDCSV(CommaSeparatedValues,逗號(hào)分隔值)文件是一種用來存儲(chǔ)表格數(shù)據(jù)(數(shù)字和文本)的純文本格式文件,文檔的內(nèi)容是由“,”分隔的一列列的數(shù)據(jù)構(gòu)成,它可以被導(dǎo)入各種電子表格和數(shù)據(jù)庫中。#使用textFile()方法讀取grade.csv文件創(chuàng)建RDD>>>gradeRDD=sc.textFile("file:/home/hadoop/grade.csv")#創(chuàng)建RDD在CSV文件中,列與列之間以逗號(hào)分隔。CSV文件由任意數(shù)目的記錄組成,記錄間以某種換行符分隔,一行即為一條記錄。如果CSV文件的所有數(shù)據(jù)字段均沒有包含換行符,可以使用textFile()方法讀取并解析數(shù)據(jù)。

SparkRDD編程1 RDD創(chuàng)建的方式2 RDD轉(zhuǎn)換操作3

RDD行動(dòng)操作4RDD之間的依賴關(guān)系5RDD的持久化6案例實(shí)戰(zhàn):SparkRDD實(shí)現(xiàn)詞頻統(tǒng)計(jì)2RDD轉(zhuǎn)換操作映射操作映射操作方法主要有map()、flatMap()、mapValues()、flatMapValues()和mapPartitions()。map(func)映射轉(zhuǎn)換操作map(func)將一個(gè)RDD中的每個(gè)元素執(zhí)行func函數(shù)計(jì)算得到新元素,這些新元素組成的RDD作為map(func)的返回結(jié)果。>>>rdd1=sc.parallelize([1,2,3,4])>>>result=rdd1.map(lambdax:x*2)#用map()對(duì)rdd1中的每個(gè)數(shù)進(jìn)行乘2操作>>>result.collect()#以列表形式返回RDD中的所有元素[2,4,6,8]上述代碼中,向map()操作傳入了一個(gè)匿名函數(shù)lambdax:x*2,其中,x為函數(shù)的參數(shù)名稱,也可以使用其他字符,如y,x+2為函數(shù)解析式,用來實(shí)現(xiàn)函數(shù)變化。collect()為行動(dòng)操作,將生成的RDD對(duì)象result轉(zhuǎn)化為list類型,同時(shí)可實(shí)現(xiàn)查看RDD中數(shù)據(jù)的效果。2RDD轉(zhuǎn)換操作映射操作flatMap(func)映射轉(zhuǎn)換操作flatMap(func)類似于map(func),但又有所不同。flatMap(func)中的func函數(shù)會(huì)返回0到多個(gè)元素,flatMap(func)將func函數(shù)會(huì)返回的0到多個(gè)元素合并生成一個(gè)RDD作為flatMap(func)的返回值。mapValues()映射轉(zhuǎn)換操作mapValues(func)對(duì)鍵值對(duì)(key,value)組成的RDD對(duì)象每個(gè)value都應(yīng)用一個(gè)函數(shù),返回新的RDD,但是,key不會(huì)發(fā)生變化。鍵值對(duì)RDD是指RDD中的每個(gè)元素都是(key,value)二元組,key稱為鍵,value稱值。flatMapValues()flatMapValues(<func)轉(zhuǎn)換操作把鍵值對(duì)RDD的每個(gè)值都傳給一個(gè)函數(shù)處理,而鍵保持不變,并生成拍平的列表。對(duì)于每個(gè)輸入元素,返回0個(gè)乃至多個(gè)輸出元素。使用flatMapValues(<func)是會(huì)保留原RDD的分區(qū)情況。2RDD轉(zhuǎn)換操作映射操作mapPartitions()映射轉(zhuǎn)換操作mapPartitions(func)是對(duì)每個(gè)分區(qū)數(shù)據(jù)執(zhí)行指定函數(shù)。>>>rdd=sc.parallelize([1,2,3,4],2)>>>rdd.glom().collect()#查看每個(gè)分區(qū)中的數(shù)據(jù)[[1,2],[3,4]]>>>deff(x):yieldsum(x)>>>rdd.mapPartitions(f).collect()#對(duì)每個(gè)分區(qū)中的數(shù)據(jù)執(zhí)行f函數(shù)操作[3,7]2RDD轉(zhuǎn)換操作去重操作filter(func)過濾轉(zhuǎn)換操作filter(func)使用過濾函數(shù)func過濾RDD中的元素,func函數(shù)的返回值為Boolean類型,filter(func)返回由使func函后返回值為true的元素組成新的RDD。>>>rdd4=sc.parallelize([1,2,2,3,4,3,5,7,9])>>>rdd4.filter(lambdax:x>4).collect()#對(duì)rdd4進(jìn)行過濾,得到大于4的數(shù)據(jù)[5,7,9]distinct([numPartitions])去重轉(zhuǎn)換操作distinct([numPartitions])對(duì)RDD中的數(shù)據(jù)進(jìn)行去重操作,返回一個(gè)新的RDD,其中,可選參數(shù)numPartitions用來設(shè)置操作的并行任務(wù)數(shù)量。>>>Rdd=sc.parallelize([1,2,1,5,3,5,4,8,6,4])>>>distinctRdd=Rdd.distinct()>>>distinctRdd.collect()[1,2,5,3,4,8,6]2RDD轉(zhuǎn)換操作排序操作sortByKey(ascending,[numPartitions])排序轉(zhuǎn)換操作sortByKey(ascending,numPartitions)是對(duì)RDD中的數(shù)據(jù)集進(jìn)行排序操作,對(duì)(K,V)鍵值對(duì)類型的數(shù)據(jù)按照鍵K進(jìn)行排序,返回一個(gè)按照K進(jìn)行排序后的(K,V)鍵值對(duì)類型的的RDD。參數(shù)ascending用來指定是升序還是降序,默認(rèn)值是true,按升序排序。參數(shù)numPartitions用來指定排序分區(qū)的并行任務(wù)個(gè)數(shù)。>>>rdd=sc.parallelize([("WangLi",1),("LiHua",3),("LiuFei",2),("XuFeng",1)])>>>rdd.collect()[('WangLi',1),('LiHua',3),('LiuFei',2),('XuFeng',1)]#False降序>>>rdd1=rdd.sortByKey(False)>>>rdd1.collect()[('XuFeng',1),('WangLi',1),('LiuFei',2),('LiHua',3)]2RDD轉(zhuǎn)換操作排序操作sortBy(keyfunc,[ascending:Boolean=true],[numPartitions])轉(zhuǎn)換操作sortBy()使用keyfunc函數(shù)先對(duì)數(shù)據(jù)進(jìn)行處理,按照處理后的數(shù)據(jù)排序,默認(rèn)為升序。sortBy()可以指定按鍵Key還是按值Value進(jìn)行排序。第一個(gè)參數(shù)keyfunc是一個(gè)函數(shù),sortBy()按keyfunc對(duì)RDD中的每個(gè)元素計(jì)算的結(jié)果對(duì)RDD中的元素進(jìn)行排序。第二個(gè)參數(shù)是ascending,決定排序后RDD中的元素是升序還是降序,默認(rèn)是True,按升序排列。第三個(gè)參數(shù)是numPartitions,該參數(shù)決定排序后的RDD的分區(qū)個(gè)數(shù),默認(rèn)排序后的分區(qū)個(gè)數(shù)和排序之前的個(gè)數(shù)相等。>>>goods=sc.parallelize(["radio3050","soap360","cup650","bowl480"])>>>goodsTup=goods.map(lambdax:(x.split("")[0],int(x.split("")[1]),int(x.split("")[2])))>>>goodsTup.sortBy(lambdax:x[0]).foreach(print)#按商品名稱進(jìn)行排序('bowl',4,80)('cup',6,50)('radio',30,50)('soap',3,60)2RDD轉(zhuǎn)換操作分組聚合操作groupBy()分組操作groupBy(<function>)轉(zhuǎn)化操作返回一個(gè)按指定條件(用函數(shù)表示)對(duì)元素進(jìn)行分組的RDD。參數(shù)<function>可以是有名稱的函數(shù),也可以是匿名函數(shù),用來指定對(duì)所有元素進(jìn)行分組的鍵,或者指定對(duì)元素進(jìn)行求值以確定其所屬分組的表達(dá)式。注意,groupBy()返回的是一個(gè)可迭代對(duì)象。>>>rdd=sc.parallelize([1,2,3,4,5,6,7,8])>>>res=rdd.groupBy(lambdax:x%2).collect()>>>forx,yinres:#輸出迭代器的具體值

print(x)print(y)print(sorted(y))print("*"*44)

1<pyspark.resultiterable.ResultIterableobjectat0x7fe71012ea60>[1,3,5,7]********************************************0<pyspark.resultiterable.ResultIterableobjectat0x7fe70de43bb0>[2,4,6,8]********************************************2RDD轉(zhuǎn)換操作分組聚合操作groupByKey()分組聚合操作groupByKey()對(duì)一個(gè)由鍵值對(duì)(K,V)組成的RDD進(jìn)行分組聚合操作,返回由(K,Seq[V])鍵值對(duì)組成的新RDD,Seq[V]表示由鍵K相同的值V所組成的序列。>>>rdd=sc.parallelize([("Spark",1),("Spark",1),("Hadoop",1),("Hadoop",1)])>>>rdd.groupByKey().map(lambdax:(x[0],list(x[1]))).collect()[('Spark',[1,1]),('Hadoop',[1,1])]>>>rdd.groupByKey().map(lambdax:(x[0],len(list(x[1])))).collect()[('Spark',2),('Hadoop',2)]2RDD轉(zhuǎn)換操作分組聚合操作groupWith(other,*others)分組聚合操作groupWith(otherRDD,*others)把多個(gè)RDD按key進(jìn)行分組;輸出(key,迭代器)形式的數(shù)據(jù)。分組后的數(shù)據(jù)是有順序的,每個(gè)key對(duì)應(yīng)的value是按列出RDD的順序排放的,如果參數(shù)RDD沒有這個(gè)key,則對(duì)應(yīng)位置取空值。>>>w=sc.parallelize([("a","w"),("b","w")])>>>x=sc.parallelize([("a","x"),("b","x")])>>>y=sc.parallelize([("a","y")])>>>z=sc.parallelize([("b","z")])>>>w.groupWith(x,y,z)#迭代輸出每個(gè)分組>>>[(x,tuple(map(list,y)))forx,yinlist(w.groupWith(x,y,z).collect())][('b',(['w'],['x'],[],['z'])),('a',(['w'],['x'],['y'],[]))]2RDD轉(zhuǎn)換操作分組聚合操作reduceByKey(func)分組聚合操作reduceByKey(func)對(duì)一個(gè)由鍵值對(duì)(K,V)組成的RDD進(jìn)行聚合操作,對(duì)K相同的值V,使用指定的reduce函數(shù)func聚合到一起。>>>rdd=sc.parallelize([("Spark",1),("Spark",2),("Hadoop",1),("Hadoop",5)])>>>rdd.reduceByKey(lambdax,y:x+y).collect()[('Spark',3),('Hadoop',6)]2RDD轉(zhuǎn)換操作分組聚合操作combineByKey()分組聚合操作combineByKey(createCombiner,mergeValue,mergeCombiners)是對(duì)鍵值對(duì)RDD中的每個(gè)鍵值對(duì)(K,V)按照K進(jìn)行聚合操作,即合并相同鍵的值。聚合操作的邏輯是通過自定義函數(shù)提供給combineByKey()。把鍵值對(duì)(K,V)類型的RDD轉(zhuǎn)換為鍵值對(duì)(K,C)類型的RDD。三個(gè)參數(shù)含義如下:createCombiner:類型是函數(shù),在遍歷(K,V)時(shí),若combineByKey()是第一次遇到鍵為K的鍵值對(duì),則將對(duì)該(K,V)鍵值對(duì)調(diào)用createCombiner()函數(shù)將V轉(zhuǎn)換為C(聚合對(duì)象類型),C會(huì)作為鍵K的累加器的初始值。mergeValue:類型是函數(shù),在遍歷(K,V)是,若comineByKey()不是第一次遇到鍵為K的鍵值對(duì),則將對(duì)該(K,V)鍵值對(duì)調(diào)用mergeValue()函數(shù)將V累加到聚合對(duì)象C中。mergeCombiners:類型是函數(shù),combineByKey()是在分布式環(huán)境中執(zhí)行的,RDD的每個(gè)分區(qū)單獨(dú)進(jìn)行combineBykey()操作,最后需要對(duì)各個(gè)分區(qū)進(jìn)行最后的聚合。2RDD轉(zhuǎn)換操作分組聚合操作combineByKey()分組聚合操作>>>defcreateCombiner(value):

#定義createCombiner函數(shù)return(value,1)>>>defmergeValue(acc,value):#定義mergeValue函數(shù)return(acc[0]+value,acc[1]+1)>>>defmergeCombiners(acc1,acc2):#定義mergeCombiners函數(shù)return(acc1[0]+acc2[0],acc1[1]+acc2[1])#創(chuàng)建成績(jī)RDD對(duì)象>>>Rdd=sc.parallelize([('ID1',80),('ID2',85),('ID1',90),('ID2',95),('ID3',99)],2)>>>combineByKeyRdd=RbineByKey(createCombiner,mergeValue,mergeCombiners)>>>combineByKeyRdd.collect()[('ID1',(170,2)),('ID2',(180,2)),('ID3',(99,1))]#求平均成績(jī)>>>avgRdd=combineByKeyRdd.map(lambdax:(x[0],float(x[1][0])/x[1][1]))>>>avgRdd.collect()[('ID1',85.0),('ID2',90.0),('ID3',99.0)]2RDD轉(zhuǎn)換操作集合操作union(otherDataset)合并轉(zhuǎn)換操作union(otherDataset)對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD,不進(jìn)行去重操作。intersection(otherRDD)交集且去重轉(zhuǎn)換操作intersection(otherRDD)對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD,且去重。subtract(otherRDD)差集轉(zhuǎn)換操作subtract(otherRDD)相當(dāng)于進(jìn)行集合的差集操作,即RDD去除其與otherRDD相同的元素。cartesian(otherRDD)笛卡爾積轉(zhuǎn)換操作cartesian(otherRDD)是對(duì)兩個(gè)RDD進(jìn)行笛卡爾積操作。2RDD轉(zhuǎn)換操作抽樣操作sample()抽樣sample(withReplacement,fraction,seed)抽樣操作以指定的抽樣種子seed從RDD的數(shù)據(jù)中抽樣出抽取比例為fraction的數(shù)據(jù),withReplacement表示抽出的數(shù)據(jù)是否放回,True為有放回的抽樣,F(xiàn)alse為無放回的抽樣,相同的seed得到的隨機(jī)序列一樣。>>>SampleRDD=sc.parallelize(list(range(1,1000)))>>>SampleRDD.sample(False,0.01,1).collect()#輸出取樣[14,100,320,655,777,847,858,884,895,935]2RDD轉(zhuǎn)換操作抽樣操作sampleByKey()抽樣sampleByKey(withReplacement,fractions,seed)按key值按比例抽樣,withReplacement表示是否有放回,fractions表示抽樣比例,seed抽樣種子。>>>fractions={"a":0.5,"b":0.1}>>>rdd=sc.parallelize(fractions.keys(),3).cartesian(sc.parallelize(range(0,10),2))>>>sample=dict(rdd.sampleByKey(False,fractions,2).groupByKey(3).collect())>>>[(iter[0],list(iter[1]))foriterinsample.items()][('b',[5,9]),('a',[1,4,5,7])]2RDD轉(zhuǎn)換操作連接操作join(otherDataset,[numPartitions])連接操作join()對(duì)兩個(gè)鍵值對(duì)數(shù)據(jù)的RDD進(jìn)行內(nèi)連接,將兩個(gè)RDD中鍵相同的(K,V)和(K,W)進(jìn)行連接,返回(K,(V,W))鍵值對(duì)。>>>pairRDD1=sc.parallelize([("Scala",2),("Scala",3),("Java",4),("Python",8)])>>>pairRDD2=sc.parallelize([("Scala",3),("Java",5),("HBase",4),("Java",10)])>>>pairRDD3=pairRDD1.join(pairRDD2)>>>pairRDD3.collect()[('Java',(4,5)),('Java',(4,10)),('Scala',(2,3)),('Scala',(3,3))]2RDD轉(zhuǎn)換操作連接操作leftOuterJoin()左外連接操作leftOuterJoin()可用來對(duì)兩個(gè)鍵值對(duì)的RDD進(jìn)行左外連接,保留第一個(gè)RDD的所有鍵。在leftOuterJoin()左連接中,如果右邊RDD中有對(duì)應(yīng)的鍵,連接結(jié)果中顯示為Some類型,表示有值可以引用;如果沒有對(duì)應(yīng)的鍵,則為None值。rightOuterJoin()右外連接操作rightOuterJoin()可用來對(duì)兩個(gè)鍵值對(duì)的RDD進(jìn)行右外連接,確保第二個(gè)RDD的鍵必須存在,即保留第二個(gè)RDD的所有鍵。fullOuterJoin()全外連接操作fullOuterJoin()是一種全外連接,會(huì)保留兩個(gè)連接的RDD中所有鍵的連接結(jié)果。2RDD轉(zhuǎn)換操作打包操作zip(otherDataset)轉(zhuǎn)換操作將兩個(gè)RDD打包成(K,V)鍵值對(duì)形式的RDD,要求兩個(gè)RDD的分區(qū)數(shù)量以及每個(gè)分區(qū)中元素的數(shù)量都相同。>>>rdd1=sc.parallelize([1,2,3],3)>>>rdd2=sc.parallelize(["a","b","c"],3)>>>zipRDD=rdd1.zip(rdd2)>>>zipRDD.collect()[(1,'a'),(2,'b'),(3,'c')]獲取鍵值對(duì)RDD的鍵和值對(duì)一個(gè)鍵值對(duì)的RDD,調(diào)用keys()返回一個(gè)僅包含鍵的RDD,調(diào)用values()返回一個(gè)僅包含值的RDD。>>>zipRDD.keys().collect()[1,2,3]>>>zipRDD.values().collect()['a','b','c']2RDD轉(zhuǎn)換操作coalesce(numPartitions:Int)重新分區(qū)轉(zhuǎn)換操作coalesce(numPartitions,shuffle)作用:默認(rèn)使用HashPartitioner哈希分區(qū)方式對(duì)RDD進(jìn)行重新分區(qū),返回一個(gè)新的RDD,且該RDD的分區(qū)個(gè)數(shù)等于numPartitions個(gè)數(shù)。參數(shù)說明:numPartitions:擬要生成的新RDD的分區(qū)個(gè)數(shù)。shuffle:是否進(jìn)行shuffle,默認(rèn)為False,重設(shè)分區(qū)個(gè)數(shù)只能比RDD原有分區(qū)數(shù)?。蝗绻鹲huffle為True,重設(shè)的分區(qū)數(shù)不受原有RDD分區(qū)數(shù)的限制。>>>rdd=sc.parallelize(range(1,17),4)#創(chuàng)建RDD,分區(qū)數(shù)量為4>>>rdd.getNumPartitions()#查看RDD分區(qū)個(gè)數(shù)4>>>coalRDD=rdd.coalesce(5)#重新分區(qū),分區(qū)數(shù)量為5>>>coalRDD.getNumPartitions()4重新分區(qū)操作2RDD轉(zhuǎn)換操作repartition(numPartitions)重新分區(qū)轉(zhuǎn)換操作repartition(numPartitions)其實(shí)就是coalesce()方法的第二個(gè)參數(shù)shuffle為True的簡(jiǎn)單實(shí)現(xiàn)。>>>coalRDD2=coalRDD1.repartition(2)#轉(zhuǎn)換成2個(gè)分區(qū)的RDD>>>coalRDD2.getNumPartitions()#查看coalRDD2分區(qū)個(gè)數(shù)2Spark支持自定義分區(qū)方式,即通過一個(gè)自定義的分區(qū)函數(shù)對(duì)RDD進(jìn)行分區(qū)。需要注意,Spark的分區(qū)函數(shù)針對(duì)的是(key,value)鍵值對(duì)類型的RDD,分區(qū)函數(shù)根據(jù)key對(duì)RDD的元素進(jìn)行分區(qū)。因此,當(dāng)需要對(duì)一些非(key,value)類型的RDD進(jìn)行自定義分區(qū)時(shí),需要先把RDD轉(zhuǎn)換成(key,value)類型的RDD,然后再使用分區(qū)函數(shù)。重新分區(qū)操作

SparkRDD編程1 RDD創(chuàng)建的方式2 RDD轉(zhuǎn)換操作3RDD行動(dòng)操作4RDD之間的依賴關(guān)系5RDD的持久化6案例實(shí)戰(zhàn):SparkRDD實(shí)現(xiàn)詞頻統(tǒng)計(jì)3RDD行動(dòng)操作行動(dòng)操作則是向驅(qū)動(dòng)器程序返回結(jié)果或把結(jié)果寫入外部系統(tǒng)的操作,會(huì)觸發(fā)實(shí)際的計(jì)算。行動(dòng)操作接受RDD,但是返回非RDD,即輸出一個(gè)結(jié)果值,并把結(jié)果值返回到驅(qū)動(dòng)器程序中。統(tǒng)計(jì)操作sum()求和:sum()返回RDD對(duì)象中數(shù)據(jù)的和。max()求最大值和min()求最小值mean()求平均值stdev()求標(biāo)準(zhǔn)差stats()描述統(tǒng)計(jì)count()求數(shù)據(jù)個(gè)數(shù)countByValue()求數(shù)據(jù)出現(xiàn)的次數(shù)countByKey()求key相同的鍵值對(duì)數(shù)量:countByKey()返回鍵值對(duì)(key,value)類型的RDD中key相同的鍵值對(duì)數(shù)量,返回值的類型是字典類型。3RDD行動(dòng)操作取數(shù)據(jù)操作collect()返回RDD中的所有元素first()返回RDD的第1個(gè)元素collect()方法以列表形式返回RDD中的所有元素。>>>rddInt=sc.parallelize([1,2,3,4,5,6,2,5,1])#創(chuàng)建RDD>>>rddList=rddInt.collect()>>>rddList[1,2,3,4,5,6,2,5,1]返回RDD的第1個(gè)元素。first()不考慮元素的順序,是一個(gè)非確定性的操作,尤其是在完全分布式的環(huán)境中。take(n)返回RDD的前n個(gè)元素take(n)返回RDD的前n個(gè)元素。選取的元素沒有特定的順序。事實(shí)上,take(n)返回的元素是不確定的,這意味著再次運(yùn)行該操作時(shí),返回的元素可能會(huì)不同,尤其是在完全分布式的環(huán)境中。3RDD行動(dòng)操作取數(shù)據(jù)操作top(n)返回一個(gè)RDD的前n個(gè)元素lookup(key)行動(dòng)操作top(n)以列表的形式返回RDD中按照指定排序(默認(rèn)降序)方式排序后的最前面的n個(gè)元素。lookup()用于(K,V)鍵值對(duì)類型的RDD,查找指定鍵K的值V,返回RDD中該K對(duì)應(yīng)的所有V值。reduce(<function>)歸約操作reduce(<function>)使用指定的滿足交換律或結(jié)合律的運(yùn)算符(由函數(shù)定義)來歸約RDD中的所有元素,這里的交換律和結(jié)合律表示操作與執(zhí)行的順序無關(guān),這是分布式處理所要求的,因?yàn)樵诜植际教幚碇?,順序無法保證。參數(shù)<function>指定接收兩個(gè)輸入的匿名函數(shù)(lambdax,y:....)。聚合操作3RDD行動(dòng)操作fold(zeroValue,<function>)歸約操作fold()使用給定的function和zeroValue把RDD中每個(gè)分區(qū)的元素規(guī)約,然后把每個(gè)分區(qū)的聚合結(jié)果再規(guī)約。盡管reduce()和fold()的功能相似,但還是有區(qū)別的,fold()不滿足交換律,需要給定初始值(zeroValue)。

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論