




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
Spark編程基礎(chǔ)2021年,某公司為了提高員工工作的積極性,將對(duì)公司員工進(jìn)行一次調(diào)薪,需要根據(jù)員工在2020年的薪資情況及在職表現(xiàn)重新調(diào)整薪資,對(duì)于愛崗敬業(yè)的公司員工,公司擬根據(jù)其業(yè)績分析情況予以不同程度漲薪。公司有員工2020年上半年薪資文件(Employee_salary_first_half.csv)和下半年薪資文件(Employee_salary_second_half.csv),兩份文件的數(shù)據(jù)格式和數(shù)據(jù)字段均相同,以員工2020年上半年的薪資文件Employee_salary_first_half.csv為例,文件共有10個(gè)數(shù)據(jù)字段。任務(wù)背景字段名稱說明
字段名稱說明EmpID員工IDGROSS總薪資Name姓名Net_Pay實(shí)際薪資Gender性別Deduction薪資扣除部分Date_of_Birth出生日期Designation職位Age年齡Department部門為了保證較高的數(shù)據(jù)處理效率,將使用Spark統(tǒng)計(jì)每一位員工2020年的薪資情況。本章將介紹SparkRDD的創(chuàng)建方法、RDD和鍵值對(duì)RDD的轉(zhuǎn)換操作和行動(dòng)操作的基礎(chǔ)使用,并通過Spark編程實(shí)現(xiàn)員工薪資數(shù)據(jù)的統(tǒng)計(jì)分析。任務(wù)背景1查詢上半年實(shí)際薪資排名前3的員工信息目錄讀取員工薪資數(shù)據(jù)創(chuàng)建RDD2查詢上半年或下半年實(shí)際薪資大于20萬元的員工姓名3RDD是一個(gè)容錯(cuò)的、只讀的、可進(jìn)行并行操作的數(shù)據(jù)結(jié)構(gòu),是一個(gè)分布在集群各個(gè)節(jié)點(diǎn)中的存放元素的集合。RDD的創(chuàng)建有3種不同的方法。第一種是將程序中已存在的Seq集合(如集合、列表、數(shù)組)轉(zhuǎn)換成RDD。第二種是對(duì)已有RDD進(jìn)行轉(zhuǎn)換得到新的RDD,這兩種方法都是通過內(nèi)存中已有的集合創(chuàng)建RDD的。第三種是直接讀取外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)創(chuàng)建RDD。為方便后續(xù)的員工薪資分析,本節(jié)的任務(wù)是讀取員工上半年和下半年的薪資數(shù)據(jù)創(chuàng)建RDD。任務(wù)描述parallelize()方法有兩個(gè)輸入?yún)?shù),說明如下。要轉(zhuǎn)化的集合,必須是Seq集合。Seq表示序列,指的是一類具有一定長度的、可迭代訪問的對(duì)象,其中每個(gè)數(shù)據(jù)元素均帶有一個(gè)從0開始的、固定的索引。分區(qū)數(shù)。若不設(shè)分區(qū)數(shù),則RDD的分區(qū)數(shù)默認(rèn)為該程序分配到的資源的CPU核心數(shù)。從內(nèi)存中讀取數(shù)據(jù)創(chuàng)建RDD1.parallelize()makeRDD()方法有兩種使用方式。第一種方式的使用與parallelize()方法一致;第二種方式是通過接收一個(gè)是Seq[(T,Seq[String])]參數(shù)類型創(chuàng)建RDD。第二種方式生成的RDD中保存的是T的值,Seq[String]部分的數(shù)據(jù)會(huì)按照Seq[(T,Seq[String])]的順序存放到各個(gè)分區(qū)中,一個(gè)Seq[String]對(duì)應(yīng)存放至一個(gè)分區(qū),并為數(shù)據(jù)提供位置信息,通過preferredLocations()方法可以根據(jù)位置信息查看每一個(gè)分區(qū)的值。調(diào)用makeRDD()時(shí)不可以直接指定RDD的分區(qū)個(gè)數(shù),分區(qū)的個(gè)數(shù)與Seq[String]參數(shù)的個(gè)數(shù)是保持一致的。從內(nèi)存中讀取數(shù)據(jù)創(chuàng)建RDD2.makeRDD()從外部存儲(chǔ)系統(tǒng)中讀取數(shù)據(jù)創(chuàng)建RDD是指直接讀取存放在文件系統(tǒng)中的數(shù)據(jù)文件創(chuàng)建RDD。從內(nèi)存中讀取數(shù)據(jù)創(chuàng)建RDD的方法常用于測試,從外部存儲(chǔ)系統(tǒng)中讀取數(shù)據(jù)創(chuàng)建RDD才是用于實(shí)踐操作的常用方法。從外部存儲(chǔ)系統(tǒng)中讀取數(shù)據(jù)創(chuàng)建RDD可以有很多種數(shù)據(jù)來源,可通過SparkContext對(duì)象的textFile()方法讀取數(shù)據(jù)集,該方法支持多種類型的數(shù)據(jù)集,如目錄、文本文件、壓縮文件和通配符匹配的文件等,并且允許設(shè)定分區(qū)個(gè)數(shù)。從外部存儲(chǔ)系統(tǒng)中讀取數(shù)據(jù)創(chuàng)建RDD分別讀取HDFS文件和Linux本地文件的數(shù)據(jù)并創(chuàng)建RDD,具體操作如下。通過HDFS文件創(chuàng)建RDD直接通過textFile()方法讀取HDFS文件的位置即可。通過Linux本地文件創(chuàng)建RDD本地文件的讀取也是通過sc.textFile("路徑")的方法實(shí)現(xiàn)的,在路徑前面加上“file://”表示從Linux本地文件系統(tǒng)讀取。在IntelliJIDEA開發(fā)環(huán)境中可以直接讀取本地文件;但在spark-shell中,要求在所有節(jié)點(diǎn)的相同位置保存該文件才可以讀取它.從外部存儲(chǔ)系統(tǒng)中讀取數(shù)據(jù)創(chuàng)建RDD讀取員工上、下半年薪資數(shù)據(jù),并創(chuàng)建RDD。由于數(shù)據(jù)比較多,因此適合通過讀取HDFS上的數(shù)據(jù)創(chuàng)建。首先需要將數(shù)據(jù)上傳至HDFS的/user/root目錄下。在spark-shell中讀取HDFS上的員工上、下半年薪資數(shù)據(jù),創(chuàng)建RDD。任務(wù)實(shí)現(xiàn)1查詢上半年實(shí)際薪資排名前3的員工信息目錄讀取員工薪資數(shù)據(jù)創(chuàng)建RDD2查詢上半年或下半年實(shí)際薪資大于20萬元的員工姓名3SparkRDD提供了豐富的操作方法用于操作分布式的數(shù)據(jù)集合,包括轉(zhuǎn)換操作和行動(dòng)操作兩部分。轉(zhuǎn)換操作可以將一個(gè)RDD轉(zhuǎn)換為一個(gè)新的RDD,但是轉(zhuǎn)換操作是懶操作,不會(huì)立刻執(zhí)行計(jì)算;行動(dòng)操作是用于觸發(fā)轉(zhuǎn)換操作的操作,這時(shí)才會(huì)真正開始進(jìn)行計(jì)算。本節(jié)的任務(wù)如下。使用RDD的基本操作完成對(duì)員工上半年實(shí)際薪資的排名。找出薪資排名前3的員工信息。任務(wù)描述map()方法是一種基礎(chǔ)的RDD轉(zhuǎn)換操作,可以對(duì)RDD中的每一個(gè)數(shù)據(jù)元素通過某種函數(shù)進(jìn)行轉(zhuǎn)換并返回新的RDD。map()方法是轉(zhuǎn)換操作,不會(huì)立即進(jìn)行計(jì)算。轉(zhuǎn)換操作是創(chuàng)建RDD的第二種方法,通過轉(zhuǎn)換已有RDD生成新的RDD。因?yàn)镽DD是一個(gè)不可變的集合,所以如果對(duì)RDD數(shù)據(jù)進(jìn)行了某種轉(zhuǎn)換,那么會(huì)生成一個(gè)新的RDD。使用map()方法轉(zhuǎn)換數(shù)據(jù)sortBy()方法用于對(duì)標(biāo)準(zhǔn)RDD進(jìn)行排序,有3個(gè)可輸入?yún)?shù),說明如下。第1個(gè)參數(shù)是一個(gè)函數(shù)f:(T)=>K,左邊是要被排序?qū)ο笾械拿恳粋€(gè)元素,右邊返回的值是元素中要進(jìn)行排序的值。第2個(gè)參數(shù)是ascending,決定排序后RDD中的元素是升序的還是降序的,默認(rèn)是true,即升序排序,如果需要降序排序那么需要將參數(shù)的值設(shè)置為false。第3個(gè)參數(shù)是numPartitions,決定排序后的RDD的分區(qū)個(gè)數(shù),默認(rèn)排序后的分區(qū)個(gè)數(shù)和排序之前的分區(qū)個(gè)數(shù)相等,即this.partitions.size。第一個(gè)參數(shù)是必須輸入的,而后面的兩個(gè)參數(shù)可以不輸入。使用sortBy()方法進(jìn)行排序collect()方法是一種行動(dòng)操作,可以將RDD中所有元素轉(zhuǎn)換成數(shù)組并返回到Driver端,適用于返回處理后的少量數(shù)據(jù)。因?yàn)樾枰獜募焊鱾€(gè)節(jié)點(diǎn)收集數(shù)據(jù)到本地,經(jīng)過網(wǎng)絡(luò)傳輸,并且加載到Driver內(nèi)存中,所以如果數(shù)據(jù)量比較大,會(huì)給網(wǎng)絡(luò)傳輸造成很大的壓力。因此,數(shù)據(jù)量較大時(shí),盡量不使用collect()方法,否則可能導(dǎo)致Driver端出現(xiàn)內(nèi)存溢出問題。使用collect()方法查詢數(shù)據(jù)collect()方法有以下兩種操作方式。collect:直接調(diào)用collect返回該RDD中的所有元素,返回類型是一個(gè)Array[T]數(shù)組。collect[U:ClassTag](f:PartialFunction[T,U]):RDD[U]。這種方式需要提供一個(gè)標(biāo)準(zhǔn)的偏函數(shù),將元素保存至一個(gè)RDD中。首先定義一個(gè)函數(shù)one,用于將collect方法得到的數(shù)組中數(shù)值為1的值替換為“one”,將其他值替換為“other”。使用collect()方法查詢數(shù)據(jù)flatMap()方法將函數(shù)參數(shù)應(yīng)用于RDD之中的每一個(gè)元素,將返回的迭代器(如數(shù)組、列表等)中的所有元素構(gòu)成新的RDD。使用flatMap()方法時(shí)先進(jìn)行map(映射)再進(jìn)行flat(扁平化)操作,數(shù)據(jù)會(huì)先經(jīng)過跟map一樣的操作,為每一條輸入返回一個(gè)迭代器(可迭代的數(shù)據(jù)類型),然后將所得到的不同級(jí)別的迭代器中的元素全部當(dāng)成同級(jí)別的元素,返回一個(gè)元素級(jí)別全部相同的RDD。這個(gè)轉(zhuǎn)換操作通常用來切分單詞。使用flatMap()方法轉(zhuǎn)換數(shù)據(jù)take(N)方法用于獲取RDD的前N個(gè)元素,返回?cái)?shù)據(jù)為數(shù)組。take()與collect()方法的原理相似,collect()方法用于獲取全部數(shù)據(jù),take()方法獲取指定個(gè)數(shù)的數(shù)據(jù)。獲取RDD的前5個(gè)元素使用take()方法查詢某幾個(gè)值查詢上半年實(shí)際薪資排名前3的員工信息,需要對(duì)上半年的實(shí)際薪資進(jìn)行排序,而創(chuàng)建RDD時(shí),textFile()方法是將每一行數(shù)據(jù)作為一條記錄存儲(chǔ)的,所以在排序前需要先對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換,實(shí)現(xiàn)步驟如下。讀取CSV文件,將第一行字段名稱刪除。將數(shù)據(jù)按分隔符“,”分隔,取出第2列員工姓名和第7列實(shí)際薪資數(shù)據(jù),并將實(shí)際薪資數(shù)據(jù)轉(zhuǎn)換成Int類型數(shù)據(jù)。通過sortBy()方法根據(jù)實(shí)際薪資進(jìn)行降序排列。通過take()方法獲取上半年實(shí)際薪資排名前3的員工信息。任務(wù)實(shí)現(xiàn)1查詢上半年實(shí)際薪資排名前3的員工信息目錄讀取員工薪資數(shù)據(jù)創(chuàng)建RDD2查詢上半年或下半年實(shí)際薪資大于20萬元的員工姓名3Spark的轉(zhuǎn)換操作和行動(dòng)操作,除了可以針對(duì)一個(gè)RDD進(jìn)行操作,也可以進(jìn)行RDD與RDD之間的操作。本節(jié)的任務(wù)如下。查詢上半年或下半年實(shí)際薪資大于20萬元的員工姓名。將最終結(jié)果合并為一個(gè)RDD。任務(wù)描述union()方法是一種轉(zhuǎn)換操作,用于將兩個(gè)RDD合并成一個(gè),不進(jìn)行去重操作,而且兩個(gè)RDD中每個(gè)元素中的值的個(gè)數(shù)、數(shù)據(jù)類型需要保持一致。使用union()方法合并兩個(gè)RDD使用union()方法合并多個(gè)RDDfilter()方法是一種轉(zhuǎn)換操作,用于過濾RDD中的元素。filter()方法需要一個(gè)參數(shù),這個(gè)參數(shù)是一個(gè)用于過濾的函數(shù),該函數(shù)的返回值為Boolean類型。filter()方法將返回值為true的元素保留,將返回值為false的元素過濾掉,最后返回一個(gè)存儲(chǔ)符合過濾條件的所有元素的新RDD。創(chuàng)建一個(gè)RDD,并且過濾掉每個(gè)元組第二個(gè)值小于等于1的元素。使用filter()方法進(jìn)行過濾distinct()方法是一種轉(zhuǎn)換操作,用于RDD的數(shù)據(jù)去重,去除兩個(gè)完全相同的元素,沒有參數(shù)。創(chuàng)建一個(gè)帶有重復(fù)數(shù)據(jù)的RDD,并使用distinct()方法去重。使用distinct()方法進(jìn)行去重Spark中的集合操作常用方法(轉(zhuǎn)換操作)使用簡單的集合操作方法描述union()參數(shù)是RDD,合并兩個(gè)RDD的所有元素intersection()參數(shù)是RDD,求出兩個(gè)RDD的共同元素subtract()參數(shù)是RDD,將原RDD里和參數(shù)RDD里相同的元素去掉cartesian()參數(shù)是RDD,求兩個(gè)RDD的笛卡兒積intersection()方法用于求出兩個(gè)RDD的共同元素,即找出兩個(gè)RDD的交集,參數(shù)是另一個(gè)RDD,先后順序與結(jié)果無關(guān)。創(chuàng)建兩個(gè)RDD,其中有相同的元素,通過intersection()方法求出兩個(gè)RDD的交集。使用簡單的集合操作intersection()方法subtract()方法用于將前一個(gè)RDD中在后一個(gè)RDD出現(xiàn)的元素刪除,可以認(rèn)為是求補(bǔ)集的操作,返回值為前一個(gè)RDD去除與后一個(gè)RDD相同元素后的剩余值所組成的新的RDD。兩個(gè)RDD的順序會(huì)影響結(jié)果。創(chuàng)建兩個(gè)RDD,分別為rdd1和rdd2,包含相同元素和不同元素,通過subtract()方法求rdd1和rdd2彼此的補(bǔ)集。使用簡單的集合操作subtract()方法cartesian()方法可將兩個(gè)集合的元素兩兩組合成一組,即求笛卡兒積。創(chuàng)建兩個(gè)RDD,分別有4個(gè)元素,通過cartesian()方法求兩個(gè)RDD的笛卡兒積。使用簡單的集合操作cartesian()方法輸出上半年或下半年實(shí)際薪資大于20萬元的員工姓名。首先需要過濾出兩個(gè)RDD中實(shí)際薪資大于20萬元的員工姓名。再將兩個(gè)RDD得到的員工姓名合并到一個(gè)RDD中,對(duì)員工姓名進(jìn)行去重。即可得到上半年或下半年實(shí)際薪資大于20萬元的員工姓名。任務(wù)實(shí)現(xiàn)4查詢每位員工2020年的月均實(shí)際薪資目錄輸出每位員工2020年的總實(shí)際薪資5存儲(chǔ)匯總后的員工薪資為文本文件6鍵值對(duì)RDD存儲(chǔ)二元組,二元組分為鍵和值,RDD的基本轉(zhuǎn)換操作對(duì)于鍵值對(duì)RDD也同樣適用。因?yàn)殒I值對(duì)RDD中包含的是二元組,所以需要傳遞的函數(shù)會(huì)由原來的操作單個(gè)元素的函數(shù)改為操作二元組的函數(shù)。本節(jié)的任務(wù)如下。計(jì)算每位員工2020年的總實(shí)際薪資,要求對(duì)上、下半年員工的實(shí)際薪資進(jìn)行相加。任務(wù)描述Spark的大部分RDD操作都支持所有種類的單值RDD,但是有少部分特殊的操作只能作用于鍵值對(duì)類型的RDD。顧名思義,鍵值對(duì)RDD由一組組的鍵值對(duì)組成,這些RDD被稱為PairRDD。PairRDD提供了并行操作各個(gè)鍵或跨節(jié)點(diǎn)重新進(jìn)行數(shù)據(jù)分組的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分別規(guī)約每個(gè)鍵對(duì)應(yīng)的數(shù)據(jù),還有join()方法,可以把兩個(gè)RDD中鍵相同的元素組合在一起,合并為一個(gè)RDD。了解鍵值對(duì)RDD有很多種創(chuàng)建鍵值對(duì)RDD的方式,很多存儲(chǔ)鍵值對(duì)的數(shù)據(jù)格式會(huì)在讀取時(shí)直接返回由其鍵值對(duì)組成的PairRDD。當(dāng)需要將一個(gè)普通的RDD轉(zhuǎn)化為一個(gè)PairRDD時(shí)可以使用map函數(shù)來進(jìn)行操作,傳遞的函數(shù)需要返回鍵值對(duì)。創(chuàng)建鍵值對(duì)RDD使用鍵值對(duì)RDD的keys和values方法鍵值對(duì)RDD,包含鍵和值兩個(gè)部分。Spark提供了兩種方法,分別獲取鍵值對(duì)RDD的鍵和值。keys方法返回一個(gè)僅包含鍵的RDD。values方法返回一個(gè)僅包含值的RDD。當(dāng)數(shù)據(jù)集以鍵值對(duì)形式展現(xiàn)時(shí),合并統(tǒng)計(jì)鍵相同的值是很常用的操作。reduceByKey()方法用于合并具有相同鍵的值,作用對(duì)象是鍵值對(duì),并且只對(duì)每個(gè)鍵的值進(jìn)行處理,當(dāng)RDD中有多個(gè)鍵相同的鍵值對(duì)時(shí),則會(huì)對(duì)每個(gè)鍵對(duì)應(yīng)的值進(jìn)行處理。reduceByKey()方法需要接收一個(gè)輸入函數(shù),鍵值對(duì)RDD相同鍵的值會(huì)根據(jù)函數(shù)進(jìn)行合并并且創(chuàng)建一個(gè)新的RDD作為返回結(jié)果。使用鍵值對(duì)RDD的reduceByKey()方法在進(jìn)行處理時(shí),reduceByKey()方法將相同鍵的前兩個(gè)值傳給輸入函數(shù),產(chǎn)生一個(gè)新的返回值,新產(chǎn)生的返回值與RDD中相同鍵的下一個(gè)值組成兩個(gè)元素,再傳給輸入函數(shù),直到最后每個(gè)鍵只有一個(gè)對(duì)應(yīng)的值為止。reduceByKey()方法不是一種行動(dòng)操作,而是一種轉(zhuǎn)換操作。使用鍵值對(duì)RDD的reduceByKey()方法groupByKey()方法用于對(duì)具有相同鍵的值進(jìn)行分組,可以對(duì)同一組的數(shù)據(jù)進(jìn)行計(jì)數(shù)、求和等操作。對(duì)于一個(gè)由類型K的鍵和類型V的值組成的RDD,通過groupByKey()方法得到的RDD類型是[K,Iterable[V]]。使用鍵值對(duì)RDD的groupByKey()方法在讀取上、下半年員工薪資數(shù)據(jù)并將其轉(zhuǎn)換為RDD的過程中,已經(jīng)將數(shù)據(jù)轉(zhuǎn)換成鍵值對(duì)RDD。統(tǒng)計(jì)每位員工2020年的總實(shí)際薪資,首先需要將數(shù)據(jù)合并到一個(gè)RDD中,通過相同的鍵對(duì)同一個(gè)員工的上半年實(shí)際薪資和下半年實(shí)際薪資進(jìn)行累加,實(shí)現(xiàn)步驟如下。獲取上、下半年員工薪資數(shù)據(jù)并將其轉(zhuǎn)換為RDD,分別為split_first和split_second。使用union()方法將兩個(gè)RDD合并成一個(gè)新的RDD。通過reduceByKey()方法統(tǒng)計(jì)員工總實(shí)際薪資并輸出結(jié)果。任務(wù)實(shí)現(xiàn)4查詢每位員工2020年的月均實(shí)際薪資目錄輸出每位員工2020年的總實(shí)際薪資5存儲(chǔ)匯總后的員工薪資為文本文件6在Spark中,鍵值對(duì)RDD提供了很多基于多個(gè)RDD的鍵進(jìn)行操作的方法。本節(jié)的任務(wù)是輸出每位員工2020年的每月平均實(shí)際薪資,需要先計(jì)算每位員工2020年的總實(shí)際薪資,再求出每位員工2020年的月均實(shí)際薪資。任務(wù)描述將有鍵的一組數(shù)據(jù)與另一組有鍵的數(shù)據(jù)根據(jù)鍵進(jìn)行連接,是對(duì)鍵值對(duì)數(shù)據(jù)常用的操作之一。與合并不同,連接會(huì)對(duì)鍵相同的值進(jìn)行合并,連接方式多種多樣,包含內(nèi)連接、右外連接、左外連接、全外連接,不同的連接方式需要使用不同的連接方法。連接方法如下表。使用join()方法連接兩個(gè)RDD連接方法描述join()對(duì)兩個(gè)RDD進(jìn)行內(nèi)連接rightOuterJoin()對(duì)兩個(gè)RDD進(jìn)行連接操作,確保第二個(gè)RDD的鍵必須存在(右外連接)leftOuterJoin()對(duì)兩個(gè)RDD進(jìn)行連接操作,確保第一個(gè)RDD的鍵必須存在(左外連接)fullOuterJoin()對(duì)兩個(gè)RDD進(jìn)行全外連接join()方法用于根據(jù)鍵對(duì)兩個(gè)RDD進(jìn)行內(nèi)連接,將兩個(gè)RDD中鍵相同的數(shù)據(jù)的值存放在一個(gè)元組中,最后只返回兩個(gè)RDD中都存在的鍵的連接結(jié)果。例如,在兩個(gè)RDD中分別有鍵值對(duì)(K,V)和(K,W),通過join()方法連接會(huì)返回(K,(V,W))。創(chuàng)建兩個(gè)RDD,含有相同鍵和不同的鍵,通過join()方法進(jìn)行內(nèi)連接。使用join()方法連接兩個(gè)RDD(1)join()方法rightOuterJoin()方法用于根據(jù)鍵對(duì)兩個(gè)RDD進(jìn)行右外連接,連接結(jié)果是右邊RDD的所有鍵的連接結(jié)果,不管這些鍵在左邊RDD中是否存在。在rightOuterJoin()方法中,如果在左邊RDD中有對(duì)應(yīng)的鍵,那么連接結(jié)果中值顯示為Some類型值;如果沒有,那么顯示為None值。使用join()方法連接兩個(gè)RDD(2)rightOuterJoin()方法leftOuterJoin()方法用于根據(jù)鍵對(duì)兩個(gè)RDD進(jìn)行左外連接,與rightOuterJoin()方法相反,返回結(jié)果保留左邊RDD的所有鍵。使用join()方法連接兩個(gè)RDD(3)leftOuterJoin()方法fullOuterJoin()方法用于對(duì)兩個(gè)RDD進(jìn)行全外連接,保留兩個(gè)RDD中所有鍵的連接結(jié)果。使用join()方法連接兩個(gè)RDD(4)fullOuterJoin()方法zip()方法用于將兩個(gè)RDD組合成鍵值對(duì)RDD,要求兩個(gè)RDD的分區(qū)數(shù)量以及元素?cái)?shù)量相同,否則會(huì)拋出異常。將兩個(gè)RDD組合成Key/Value形式的RDD,這里要求兩個(gè)RDD的partition數(shù)量以及元素?cái)?shù)量都相同,否則會(huì)拋出異常。使用zip()方法組合兩個(gè)RDDcombineByKey()方法是Spark中一個(gè)比較核心的高級(jí)方法,鍵值對(duì)的其他一些高級(jí)方法底層均是使用combineByKey()方法實(shí)現(xiàn)的,如groupByKey()方法、reduceByKey()方法等。combineByKey()方法用于將鍵相同的數(shù)據(jù)聚合,并且允許返回類型與輸入數(shù)據(jù)的類型不同的返回值。combineByKey()方法的使用方式如下。combineByKey(createCombiner,mergeValue,mergeCombiners,numPartitions=None)使用combineByKey()方法合并相同鍵的值combineByKey()方法接收3個(gè)重要的參數(shù),具體說明如下。createCombiner:V=>C,V是鍵值對(duì)RDD中的值部分,將該值轉(zhuǎn)換為另一種類型的值C,C會(huì)作為每一個(gè)鍵的累加器的初始值。mergeValue:(C,V)=>C,該函數(shù)將元素V聚合到之前的元素C(createCombiner)上(這個(gè)操作在每個(gè)分區(qū)內(nèi)進(jìn)行)。mergeCombiners:(C,C)=>C,該函數(shù)將兩個(gè)元素C進(jìn)行合并(這個(gè)操作在不同分區(qū)間進(jìn)行)。使用combineByKey()方法合并相同鍵的值由于合并操作會(huì)遍歷分區(qū)中所有的元素,因此每個(gè)元素(這里指的是鍵值對(duì))的鍵只有兩種情況:以前沒出現(xiàn)過或以前出現(xiàn)過。對(duì)于這兩種情況,3個(gè)參數(shù)的執(zhí)行情況描述如下。如果以前沒出現(xiàn)過,則執(zhí)行的是createCombiner()方法,createCombiner()方法會(huì)在新遇到的鍵對(duì)應(yīng)的累加器中賦予初始值,否則執(zhí)行mergeValue()方法。對(duì)于已經(jīng)出現(xiàn)過的鍵,調(diào)用mergeValue()方法進(jìn)行合并操作,對(duì)該鍵的累加器對(duì)應(yīng)的當(dāng)前值(C)與新值(V)進(jìn)行合并。由于每個(gè)分區(qū)都是獨(dú)立處理的,因此對(duì)于同一個(gè)鍵可以有多個(gè)累加器。如果有兩個(gè)或更多的分區(qū)都有對(duì)應(yīng)同一個(gè)鍵的累加器,就需要使用用戶提供的mergeCombiners()方法對(duì)各個(gè)分區(qū)的結(jié)果(全是C)進(jìn)行合并。使用combineByKey()方法合并相同鍵的值lookup(key:K)方法作用于鍵值對(duì)RDD,返回指定鍵的所有值。使用lookup()方法查找指定鍵的值查詢每位員工2020年的月均實(shí)際薪資需要先篩選出上、下半年的員工薪資數(shù)據(jù)中的員工姓名和實(shí)際薪資兩個(gè)字段數(shù)據(jù)并創(chuàng)建RDD,然后將篩選后的兩個(gè)RDD合并,再根據(jù)員工姓名對(duì)實(shí)際薪資求和,最后查詢出2020年的每位員工的月均實(shí)際薪資,具體實(shí)現(xiàn)步驟如下。獲取兩個(gè)RDD,即split_first和split_second,使用union()方法合并兩個(gè)RDD。使用combineByKey()方法計(jì)算每位員工2020年的月均實(shí)際薪資。任務(wù)實(shí)現(xiàn)4查詢每位員工2020年的月均實(shí)際薪資目錄輸出每位員工2020年的總實(shí)際薪資5存儲(chǔ)匯總后的員工薪資為文本文件6在實(shí)際生產(chǎn)環(huán)境中,需要讀取的文本格式不僅包含普通的文本文件,還包含其他格式的文件,如JS
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 臨時(shí)家具租賃協(xié)議書
- 轉(zhuǎn)讓免責(zé)協(xié)議書模板
- 燃?xì)忭?xiàng)目轉(zhuǎn)讓協(xié)議書
- 婆家出錢結(jié)婚協(xié)議書
- 終止合伙關(guān)系協(xié)議書
- 情侶房產(chǎn)分割協(xié)議書
- 朋友合伙購房協(xié)議書
- 施工安全協(xié)議書全部
- 領(lǐng)養(yǎng)寵物責(zé)任協(xié)議書
- 簽訂社保繳費(fèi)協(xié)議書
- JT∕T 780-2010 港口設(shè)施保安計(jì)劃制訂導(dǎo)則
- 義務(wù)教育勞動(dòng)課程標(biāo)準(zhǔn)(2022)測試題帶答案
- 房屋延期交房起訴狀
- 2.2活塞連桿組課件講解
- 飯店定金合同范本
- 2024年廣東省中考生物+地理試卷(含答案)
- CHT 1027-2012 數(shù)字正射影像圖質(zhì)量檢驗(yàn)技術(shù)規(guī)程(正式版)
- 圍擋施工組織設(shè)計(jì)方案
- 第8課第二框課件《化解沖突促進(jìn)和諧》-【中職專用】中職思想政治《心理健康與職業(yè)生涯》(高教版2023·基礎(chǔ)模塊)
- 2024年河南師范大學(xué)附中中招二模英語試卷含答案
- MOOC 以案說法-中南財(cái)經(jīng)政法大學(xué) 中國大學(xué)慕課答案
評(píng)論
0/150
提交評(píng)論