版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
接下來,咱們借助一個(gè)小例子,來講一講廣播變量的含義與作用。這個(gè)例子和WordCout有關(guān),它可以說是分布式編程里的 oworld了,WordCout就是用來統(tǒng)計(jì)文件中全部單詞的,你肯定已經(jīng)非常熟悉了,所以,我們例子中的需求增加了一點(diǎn)難度,我們要對(duì)指定列表中給定的單詞計(jì)數(shù)。1234valdict=List(“spark”,valwords==words.filter(word=>.map((_,1)).reduceByKey(_+按照這個(gè)需求,同學(xué)小A實(shí)現(xiàn)了如上的代碼,一共有4行,我們逐一來看。第1行在Driver端給定待查單詞列表dict;第2行以textFileAPI分布式文件,內(nèi)容包含一列,的是常見的單詞;第3行用列表dict中的單詞過濾分布式文件內(nèi)容,只保留dict中給定的單詞;第4行調(diào)用reduceByKey對(duì)單詞進(jìn)行累加計(jì)數(shù)。學(xué)習(xí)過調(diào)度系統(tǒng)之后,我們知道,第一行代碼定義的dict列表連帶后面的3行代碼會(huì)一同打包到Task里面去。這個(gè)時(shí)候,Task就像是一架架小飛機(jī),攜帶著這些“行李”,飛往集群中不同的Executors。對(duì)于這些“行李”來說,代碼的“負(fù)重”較輕,可以忽略不你可能會(huì)說:“也還好吧,dict個(gè)例子中的并行度是10000,那么,Driver端需要通過網(wǎng)絡(luò)分發(fā)總共10000份dict拷貝。這個(gè)時(shí)候,集群內(nèi)所有的Eecutors需要消耗大量?jī)?nèi)存來這10000份的拷貝,對(duì)寶貴的網(wǎng)絡(luò)和內(nèi)存資源來說,這已經(jīng)是一筆不小的浪費(fèi)了。更何況,如果換做一個(gè)更大的數(shù)據(jù)結(jié)構(gòu),Task分發(fā)所引入的網(wǎng)絡(luò)與內(nèi)存開銷會(huì)更可怕。換句話說,統(tǒng)計(jì)計(jì)數(shù)的業(yè)務(wù)邏輯還沒有開始執(zhí)行,Spark但是,在著手優(yōu)化之前,我們不妨先來想,現(xiàn)有的問題是什么,我們要達(dá)到的目的是什么。結(jié)合剛剛的分析,我們不難發(fā)現(xiàn),WrdCunt的痛點(diǎn)在于,數(shù)據(jù)結(jié)構(gòu)的分發(fā)和受制于并行,并且是以Task為粒度的,因此往往頻次過高。痛點(diǎn)明確了,調(diào)優(yōu)的目的也就清晰了,我們需要降低數(shù)據(jù)結(jié)構(gòu)分發(fā)的頻次。要達(dá)到這個(gè)目的,我們首先想到的就是降低并行度。不過,牽一發(fā)而動(dòng)全身,并行度一旦調(diào)整,其他與CPU、內(nèi)存有關(guān)的配置項(xiàng)都要跟著適配,這難免把調(diào)優(yōu)變復(fù)雜了。實(shí)際上,要降低數(shù)據(jù)結(jié)構(gòu)的分發(fā)頻次,我們還可以考慮廣播變量。廣播變量是一種分發(fā)機(jī)制,它封裝目標(biāo)數(shù)據(jù)結(jié)構(gòu),以Executors為粒度去做數(shù)據(jù)分發(fā)。換句話說,在廣播變量的工作機(jī)制下,數(shù)據(jù)分發(fā)的頻次等同于集群中的Executors個(gè)數(shù)。通常來說,集群中的Executors數(shù)量都遠(yuǎn)遠(yuǎn)小于Task數(shù)量,相差兩到三個(gè)數(shù)量級(jí)是常有的事。那么,對(duì)于第一版的WordCount實(shí)現(xiàn),如果我們使用廣播變量的話,會(huì)有哪些代碼的改動(dòng)很簡(jiǎn)單,主要有兩個(gè)改動(dòng):第一個(gè)改動(dòng)是用broadcast封裝dict列表,第二個(gè)改動(dòng)是在dict列表的地方改用broadcast.value替代。12345valdict=List(“spark”,valbc=valwords==words.filter(word=>.map((_,1)).reduceByKey(_+在廣播變量的運(yùn)行機(jī)制下,封裝成廣播變量的數(shù)據(jù),由Driver端以Executors為粒度分發(fā),每一個(gè)Executors接收到廣播變量之后,將其交給BlockManager管理。由于廣播變量攜帶的數(shù)據(jù)已經(jīng)通過專門的途徑到BlockManager中,因此分發(fā)到Executors的Task不需要再攜帶同樣的數(shù)據(jù)。這個(gè)時(shí)候,你可以把廣播變量想象成一架架貨機(jī),專門為Task這些小飛機(jī)運(yùn)送“大件行李”。Driver與每一個(gè)Executors之間都開通一條這樣的貨機(jī)航線,統(tǒng)一運(yùn)載負(fù)重較大的“數(shù)據(jù)行李”。有了貨機(jī)來幫忙,Task小飛機(jī)只需要攜帶那些負(fù)重較輕的代碼就好了。等這些Task小飛機(jī)在Executors著陸,它們就可以到Executors的公用倉(cāng)庫(kù)BlockManager里去提取它們的“大件行李”??傊?,在廣播變量的機(jī)制下,dit列表數(shù)據(jù)需要分發(fā)和的次數(shù)銳減。我們假設(shè)集群中有20個(gè)Eecuors,不過任務(wù)并行度還是10000,那么,Drier需要通過網(wǎng)絡(luò)分發(fā)的it列表拷貝就會(huì)由原來的10000份減少到20份。同理,集群范圍內(nèi)所有Eeutors需要的it拷貝,也由原來的10000份,減少至20份。這個(gè)時(shí)候,引入廣播變量后的開銷只是原來Task分發(fā)的1/500!Driver我們來看這樣一個(gè)例子。在電子商務(wù)領(lǐng)域中,開發(fā)者往往用事實(shí)表來類數(shù)據(jù),用維度表來像物品、用戶這樣的描述性數(shù)據(jù)。事實(shí)表的特點(diǎn)是規(guī)模龐大,數(shù)據(jù)體量隨著業(yè)務(wù)的發(fā)展不斷地快速增長(zhǎng)。維度表的規(guī)模要比事實(shí)表小很多,數(shù)據(jù)體量的變化也相對(duì)穩(wěn)定。假設(shè)用戶維度數(shù)據(jù)以Parquet文件格式在HDFS文件系統(tǒng)中,業(yè)務(wù)部門需要我們valuserFile:String=valdf:DataFrame=valbc_df:Broadcast[DataFrame]=首先,我們用ParquetAPIHDFS分布式數(shù)據(jù)文件生成DataFrame,然后用broadcastDataFrame。從代碼上來看,這種實(shí)現(xiàn)方式和封裝普通變量沒有太大差別,它們都調(diào)用了broadcastAPI,只是傳入的參數(shù)不同。DriverDriver到各個(gè)Executors,再讓Executors把數(shù)據(jù)緩存到BlockManager就好了。與普通變量相比,分布式數(shù)據(jù)集的數(shù)據(jù)源不在Driver端,而是來自所有的Executors。Executors中的每個(gè)分布式任務(wù)負(fù)責(zé)生產(chǎn)全量數(shù)據(jù)集的一部分,也就是圖中不同的數(shù)據(jù)分區(qū)。因此,步驟1就是Driver從所有的Executors拉取這些數(shù)據(jù)分區(qū),然后在本地構(gòu)建全量數(shù)據(jù)。步驟2與從普通變量創(chuàng)建廣播變量的過程類似。Driver把匯總好的全量數(shù)據(jù)分發(fā)給各個(gè)Executors,Executors將接收到的全量數(shù)據(jù)緩存到系統(tǒng)的BlockManager中。不難發(fā)現(xiàn),相比從普通變量創(chuàng)建廣播變量,從分布式數(shù)據(jù)集創(chuàng)建廣播變量的網(wǎng)絡(luò)開銷更大。原因主要有二:一是,前者比后者多了一步網(wǎng)絡(luò)通信;二是,前者的數(shù)據(jù)體量通常比后者大很多。你可能會(huì)問:“Driver從Executors拉取DataFrame的數(shù)據(jù)分片,揉成一份全量數(shù)據(jù),然后再?gòu)V去,拋開網(wǎng)絡(luò)開銷不說,來來回回得費(fèi)這么大勁,圖啥呢?”這是一個(gè)好問題,因?yàn)橐詮V播變量的形式緩存分布式數(shù)據(jù)集,正是克制Shuffle锏。Shuffle為什么這么說呢?我還是拿電子商務(wù)場(chǎng)景舉例。有了用戶的數(shù)據(jù)之后,為了分析不同用戶的購(gòu)物習(xí)慣,業(yè)務(wù)部門要求我們對(duì)表和用戶表進(jìn)行數(shù)據(jù)關(guān)聯(lián)。這樣的數(shù)據(jù)關(guān)聯(lián)需求在數(shù)據(jù)分析領(lǐng)域還是相當(dāng)普遍的。valtransactionsDF:DataFrame=valuserDF:DataFrame=transactionsDF.join(userDF,Seq(“userID”),因?yàn)樾枨蠓浅C鞔_,同學(xué)小A立即調(diào)用Parquet數(shù)據(jù)源API,分布式文件,創(chuàng)表和用戶表的DataFrame,然后調(diào)用DataFrame的Join方法,以u(píng)serID作為Joinkeys,用內(nèi)關(guān)聯(lián)(InnerJoin)的方式完成了兩表的數(shù)據(jù)關(guān)聯(lián)。在分布式環(huán)境中,表和用戶表想要以u(píng)serID為Joinkeys進(jìn)行關(guān)聯(lián),就必須要確保一個(gè)前提:記錄和與之對(duì)應(yīng)的用戶信息在同一個(gè)Executors內(nèi)。也就是說,如果用戶黃小乙的購(gòu)物信息都在Executor0,而個(gè)人屬性信息緩存在Executor2,那么,在分布在不進(jìn)行任何調(diào)優(yōu)的情況下,Spark默認(rèn)采用ShuffleJoin的方式來做到這一點(diǎn)。ShuffleJoin的過程主要有兩步。第一步就是對(duì)參與關(guān)聯(lián)的左右表分別進(jìn)行Shuffle,Shuffle的分區(qū)規(guī)則是先對(duì)Joinkeys計(jì)算哈希值,再把哈希值對(duì)分區(qū)數(shù)取模。由于左右表的分區(qū)數(shù)是一致的,因此Shuffle過后,一定能夠保證userID相同的記錄和用戶數(shù)據(jù)坐落在同一個(gè)Executors內(nèi)。ShuffleJoinhffle完成之后,第二步就是在同一個(gè)Excutrs內(nèi),Rducetask就可以對(duì)userID一致的記錄進(jìn)行關(guān)聯(lián)操作。但是,由于表是事實(shí)表,數(shù)據(jù)體量異常龐大,對(duì)TB級(jí)別的數(shù)據(jù)進(jìn)行hffle,想想都覺得可怕!因此,上面對(duì)兩個(gè)DaaFrame直接關(guān)聯(lián)的代碼,還有很大的調(diào)優(yōu)空間。我們?cè)撛趺醋瞿兀吭捑湓捳f,對(duì)于分布式環(huán)境中的數(shù)據(jù)關(guān)聯(lián)來說,要想確保記錄和與之對(duì)應(yīng)的用戶信息在同一個(gè)Eeutors中,我們有沒有其他辦法呢?克制Shuffle代代123456importvaltransactionsDF:DataFrame=_valuserDF:DataFrame=_valbcUserDF=77transactionsDF.join(bcUserDF,Seq(“userID”),8Drier從所有Eecutors收集erDF所屬的所有數(shù)據(jù)分片,在本地匯總用戶數(shù)據(jù),然后給每一個(gè)Eecutors都發(fā)送一份全量數(shù)據(jù)的拷貝。既然每個(gè)Eeutors都有userDF的全量數(shù)據(jù),這個(gè)時(shí)候,表的數(shù)據(jù)分區(qū)待在原地、保持不動(dòng),就可以輕松地關(guān)聯(lián)到一致的用戶數(shù)據(jù)。如此一來,我們不需要對(duì)數(shù)據(jù)體量巨大的表進(jìn)行Shuffle,同樣可以在分布式環(huán)境中,完成兩張表的數(shù)據(jù)關(guān)聯(lián)。BroadcastJoin將小表廣播,避免大表利用廣播變量,我們成功地避免了海量數(shù)據(jù)在集群內(nèi)的、分發(fā),節(jié)省了原本由ufle引入的磁盤和網(wǎng)絡(luò)開銷,大幅提升運(yùn)行時(shí)執(zhí)行性能。當(dāng)然,采用廣播變量?jī)?yōu)化也是有成本的,畢竟廣播變量的創(chuàng)建和分發(fā),也是會(huì)帶來網(wǎng)絡(luò)開銷的。但是,相比大表的全網(wǎng)分發(fā),小表的網(wǎng)絡(luò)開銷幾乎可以忽略不計(jì)。這種小投入、大產(chǎn)出,用極小的成本去博取高額的性能收益,真可以說是“四兩撥千斤”!在數(shù)據(jù)關(guān)聯(lián)場(chǎng)景中,廣播變量是克制Shuffle的锏。掌握了它,我們就能以極小的成第一種,從普通變量創(chuàng)建廣播變量。在廣播變量的運(yùn)行機(jī)制下,普通變量的數(shù)據(jù)封裝成廣播變量,由Driver端以Executors為粒度進(jìn)行分發(fā),每一個(gè)Executors接收到廣播變量之后,將其交由BlockManager管理。Driver需要從所有的ExecutorsDriverExecutors,Executors存到系統(tǒng)的BlockManager中。ShuffleJoinsBroadcastJoins,就可以用小表廣播來代替大表的全網(wǎng)分發(fā),真正做到克制Shuffle。Spark廣播機(jī)制現(xiàn)有的實(shí)現(xiàn)方式是存在隱患的,在數(shù)據(jù)量較大的情況下,Driver可能會(huì)成為瓶頸,你能想到更好的方式來重新實(shí)現(xiàn)Spark的廣播機(jī)制嗎?(提示:在什么情況下,不適合把ShuffleJoins轉(zhuǎn)換為BroadcastJoins? 不得售賣。頁(yè)面已增加防盜追蹤,將依法其上一 11|Shuffle的工作原理:為什么說Shuffle是一時(shí)無兩的性能下一 13|廣播變量(二):有哪些途徑讓SparkSQL選擇Broadcast絡(luò)分發(fā)多次,已經(jīng)遠(yuǎn)超出了shufflejoin需要傳輸?shù)臄?shù)據(jù)作者回復(fù):9??哈~但是,據(jù)我觀察,這部分代碼尚未生效。細(xì)節(jié)可以參考這個(gè)ticket:【Executorsidebroadcastforbroadcastjoins】,看上去還是進(jìn)先是調(diào)用 其中 newOutOfMemoryError("Notenoughmemorytobuildandbroadcastthetabletoall"+"workernodes.Asaworkaround,youcaneitherdisablebroadcastbysetting"+s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}to-1orincreasethespark"+s"drivermemorybysetting${SparkLauncher.DRIVER_MEMORY}toahighervalue.").initCause(oe.ge2 3廣播join不是默認(rèn)開啟的嗎,好像小表默認(rèn)10M;還需像文中代碼valbcUserDFbroahlerD相的錄和用數(shù)據(jù)坐在同一個(gè)ors內(nèi)?!眔pD的分?jǐn)?shù)、或者說并行度,確實(shí)是由D文件系統(tǒng)決定的;但是,he過后,每個(gè)分布式數(shù)據(jù)集的并行度,就由數(shù)pr..hl.priions來決,這個(gè)咱在配置哪一講喲~果你沒有手工用riion或是olehle過后(Re階段)默認(rèn)確實(shí)是開啟的,默認(rèn)值確實(shí)也是10MB,但是,這個(gè)10MB太太太太太太(xN)小了!12021-05-orpr.o.rs給or指定的rororor理能力其實(shí)的地方于,不的者、不同上下文并發(fā)和行這兩詞,所以,回你的問,其實(shí)什么不康的~100并行度意味著000個(gè)區(qū)的分式數(shù)據(jù)集,這應(yīng)該不見到。外10個(gè)的集群其實(shí)也不小了~不過你的200有到,不知是200行度,是 集群發(fā)。如果是 集群發(fā)的話這個(gè)和100r不上。意味著的每個(gè)ore要20線程,哈,目前沒有這給力的P
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 全新電腦出租租賃合同(2025版)3篇
- 二零二五年度新能源汽車車輛掛靠運(yùn)營(yíng)合作協(xié)議4篇
- 二零二五年度智能配送員勞動(dòng)合同模板4篇
- 二零二五年度體育賽事贊助商權(quán)益合作協(xié)議
- 《樓蓋樓面層》課件
- 二零二五年酒店拆除工程拆除現(xiàn)場(chǎng)監(jiān)控與應(yīng)急處置合同3篇
- 2025年度水利工程承包管理合同范本(二零二五版)4篇
- 二零二五年度窗簾布料阻燃性能檢測(cè)合同3篇
- 回順超前單軌吊的新型應(yīng)用
- 煤礦掘進(jìn)作業(yè)
- 獅子王影視鑒賞
- DB13(J)T 8434-2021 民用建筑節(jié)能門窗工程技術(shù)標(biāo)準(zhǔn)(京津冀)
- 2024年在職申碩同等學(xué)力英語(yǔ)真題試卷題后含答案及解析4
- 預(yù)防溺水六不準(zhǔn)中小學(xué)生防溺水安全教育宣傳課件可編輯課件
- 學(xué)校廚房設(shè)備投標(biāo)方案(技術(shù)方案)
- 一年級(jí)數(shù)學(xué)加減法口算題每日一練(25套打印版)
- 電力系統(tǒng)中的虛擬電廠運(yùn)營(yíng)與管理考核試卷
- Starter Unit 3 同步練習(xí)人教版2024七年級(jí)英語(yǔ)上冊(cè)
- 風(fēng)力發(fā)電收購(gòu)協(xié)議書
- 大學(xué)生無人機(jī)創(chuàng)業(yè)計(jì)劃書
- 2024年甘肅省武威市、嘉峪關(guān)市、臨夏州中考英語(yǔ)真題
評(píng)論
0/150
提交評(píng)論