尚硅谷大數(shù)據(jù)技術(shù)之_第1頁
尚硅谷大數(shù)據(jù)技術(shù)之_第2頁
尚硅谷大數(shù)據(jù)技術(shù)之_第3頁
尚硅谷大數(shù)據(jù)技術(shù)之_第4頁
尚硅谷大數(shù)據(jù)技術(shù)之_第5頁
已閱讀5頁,還剩97頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

1、尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)尚硅谷大數(shù)據(jù)技術(shù)之Hadoop(MapReduce)(作者:大海哥)官網(wǎng):版本:V1.1 MapReduce 概念Mapreduce 是一個分布式運(yùn)算程序的編程框架,是用戶開發(fā)“基于 hadoop 的數(shù)據(jù)分析應(yīng)用”的框架;Mapreduce功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整一個完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在一個 hadoop 集群上。1.1 為什么要 MapReduce1)海量數(shù)據(jù)在單機(jī)上處理因為硬件限制,無法勝任2)而一旦將單機(jī)版程序擴(kuò)展到集群來分布式運(yùn)行,將極大增加程序的復(fù)雜度和開發(fā)難度3)引入 mapreduce 框架后

2、,開發(fā)可以將絕大部分工作集中在業(yè)務(wù)邏輯的開發(fā)上,而將分布式計算中的復(fù)雜由框架來處理。4)mapreduce 分布式方案考慮的(1)運(yùn)算邏輯要不要先分后合?(2)程序如何分配運(yùn)算任務(wù)(切片)?(3)兩階段的程序如何啟動?如何協(xié)調(diào)?(4)整個程序運(yùn)行過程中的?容錯?重試?分布式方案需要考慮很多,但是我們可以將分布式的公共功能封裝成框架,讓開發(fā)將精力集中于業(yè)務(wù)邏輯上。而 mapreduce 就是這樣一個分布式程序的通用框架。Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)1.2 MapReduce思想上

3、圖簡單的闡明了 map 和 reduce 的兩個過程或者作用,雖然不夠嚴(yán)謹(jǐn),但是足以提供一個大概的認(rèn)知,map 過程是一個蔬菜到制成前的準(zhǔn)備工作,reduce 將準(zhǔn)備好的材料合并進(jìn)而制作出的過程MapReduceMap和Reduce階段編程思想需求:統(tǒng)計其中每一個單詞出現(xiàn)的總1) 分布式的運(yùn)算程序往往需要分成至少2個階段2) 第一個階段的maptask并發(fā)實例,完全并行運(yùn)行,互不相干3) 第二個階段的reduce task并發(fā)實例互不相干,但是他們的數(shù)據(jù)依賴于上一個階段的所有maptask并發(fā)實例的輸出4) MapReduce編程模型只能包含一個map階段和一個reduce階段,如果用戶的業(yè)務(wù)

4、邏輯非常復(fù)雜,那就只能多個mapreduce程序,串行運(yùn)行次數(shù)(結(jié)果:a-p一個文件,q-z一個文件)輸出數(shù)據(jù)若干細(xì)節(jié)1)maptask如何進(jìn)行任務(wù)分配2)Reduce task 如何進(jìn)行任務(wù)分配3)Maptask和Reduce task之間如何銜接4)如果某maptask運(yùn)行失敗,如何處理5)Maptask如果都要據(jù)的分區(qū),很麻煩負(fù)責(zé)輸出數(shù)輸入數(shù)據(jù)MrAppMaster負(fù)責(zé)整個程序的過程調(diào)度及狀態(tài)協(xié)調(diào)1)分布式的運(yùn)算程序往往需要分成至少 2 個階段2)第一個階段的 maptask 并發(fā)實例,完全并行運(yùn)行,互不相干3)第二個階段的 reduce task 并發(fā)實例互不相干,但是他們的數(shù)據(jù)依賴于

5、上一個階段的所有Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官輸出結(jié)果到文件maptaskHashmap(a-p) Hashmap(q-z)統(tǒng)計q-z開頭的單詞3reducetask輸出結(jié)果到文件統(tǒng)計a-p開頭的單詞reducetaskmaptaskHashmap(a-p) Hashmap(q-z)2Hadoop spark hive Hbase Hadoop sparkJava php Android Html5 Bigdata python1) 讀數(shù)據(jù)2) 按行處理maptask 3)按空格切分行內(nèi)單詞4)Hashmap(單詞,value+1)

6、等到分給的數(shù)據(jù)片全部讀完之后5)將hashmap按照首個字母范圍分成2個hashmap1尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)maptask 并發(fā)實例的輸出4)MapReduce 編程模型只能包含一個 map 階段和一個 reduce 階段,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只能多個 mapreduce 程序,串行運(yùn)行1.3 MapReduce 進(jìn)程一個完整的 mapreduce 程序在分布式運(yùn)行時有三類實例進(jìn)程:1)MrAppMaster:負(fù)責(zé)整個程序的過程調(diào)度及狀態(tài)協(xié)調(diào)2)MapTask:負(fù)責(zé) map 階段的整個數(shù)據(jù)處理流程3)ReduceTask:負(fù)責(zé) reduce 階段的

7、整個數(shù)據(jù)處理流程1.4 MapReduce 編程規(guī)范(八股文)成三個部分:Mapper,Reducer,Driver(提交運(yùn)行 mr 程序的客戶端)用戶編寫的1)Mapper 階段(1)用戶自定義的 Mapper 要繼承的父類(2)Mapper 的輸入數(shù)據(jù)是 KV 對的形式(KV 的類型可自定義)(3)Mapper 中的業(yè)務(wù)邏輯寫在 map()中(4)Mapper 的輸出數(shù)據(jù)是 KV 對的形式(KV 的類型可自定義)(5)map()(maptask 進(jìn)程)對每一個調(diào)用一次2)Reducer 階段(1)用戶自定義的 Reducer 要繼承的父類(2)Reducer 的輸入數(shù)據(jù)類型對應(yīng) Mappe

8、r 的輸出數(shù)據(jù)類型,也是 KV(3)Reducer 的業(yè)務(wù)邏輯寫在 reduce()中(4)Reducetask 進(jìn)程對每一組相同 k 的組調(diào)用一次 reduce()3)Driver 階段整個程序需要一個 Drvier 來進(jìn)行提交,提交的是一個描述了各種必要信息的 job 對象4)案例詳見 3.1.1 統(tǒng)計一堆文件中單詞出現(xiàn)的個數(shù)(WordCount 案例)。Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)1.5 MapReduce 程序運(yùn)行流程分析MapReduce程序運(yùn)行流程分析數(shù)據(jù)5Map

9、taskss.txt 0-1287 收集kv到緩存1 待處理文本/user/atguigu/inputK,vInputFormatoutputCollector8 按照k分區(qū)排序后寫入磁盤txtwordcountm處理a-gMapper6 邏輯運(yùn)算11 輸出結(jié)果到文件 map(K,v)Context.write(k,v) tbb.txt100m128-200處理h-r2 客戶端submit()前,獲取待處理數(shù)據(jù)的信息,然后根據(jù)參數(shù)配置,形成一個任務(wù)分配的 。ss.txt 0-128ss.txt 128-200 3 提交切片信息reduce taskOutputFormatwordcountRe

10、duce reduce(k,v) Context.write(k,v);0-10010 reduce task獲取數(shù)據(jù),并運(yùn)算Part-r-00001bb.txt處理s-z4 計算出maptask數(shù)量9 所有maptask任務(wù)完成后,啟動相應(yīng)數(shù)量的reducetask,并告知reducetask處理數(shù)據(jù)范圍(數(shù)據(jù)分區(qū))客戶端YarnRM1)在 MapReduce 程序文件的輸入目錄上存放相應(yīng)的文件。2)客戶端程序在 submit()執(zhí)行前,獲取待處理的數(shù)據(jù)信息,然后根據(jù)集群中參數(shù)的配置形成一個任務(wù)分配。3)客戶端提交 job.split、jar 包、job.xml 等文件給 yarn,yarn

11、 中的 resourcemanager 啟動MRAppMaster。4)MRAppMaster 啟動后根據(jù)本次 job 的描述信息,計算出需要的 maptask 實例數(shù)量,然后向集群申請啟動相應(yīng)數(shù)量的 maptask 進(jìn)程。5)maptask 利用客戶指定的 inputformat 來數(shù)據(jù),形成輸入 KV 對。6)maptask 將輸入 KV 對傳遞給客戶定義的 map(),做邏輯運(yùn)算7)map()運(yùn)算完畢后將 KV 對收集到 maptask 緩存。8)maptask 緩存中的 KV 對按照 K 分區(qū)排序后不斷寫到磁盤文件9)MRAppMaster到所有 maptask 進(jìn)程任務(wù)完成之后,會根

12、據(jù)客戶指定的參數(shù)啟動相應(yīng)數(shù)量的 reducetask 進(jìn)程,并告知 reducetask 進(jìn)程要處理的數(shù)據(jù)分區(qū)。10)Reducetask 進(jìn)程啟動之后,根據(jù) MRAppMaster 告知的待處理數(shù)據(jù)所在位置,從若干臺maptask 運(yùn)行所在上獲取到若干個 maptask 輸出結(jié)果文件,并在本地進(jìn)行重新歸并排序,然后按照相同 key 的 KV 為一個組,調(diào)用客戶定義的 reduce()進(jìn)行邏輯運(yùn)算。11)Reducetask 運(yùn)算完畢后,調(diào)用客戶指定的 outputformat 將結(jié)果數(shù)據(jù)輸出到外部。Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官P

13、art-r-00002Mr appmasterNodeManagerreduce taskwordcountReduce reduce(k,v) Context.write(k,v);OutputFormatJob.split wc.jar Job.xmlMap task bb.txtMap task ss.txtPart-r-00000reduce taskwordcountReduce reduce(k,v) Context.write(k,v);OutputFormatatguigubigdatass.200HbahivspaJava An Hphp droid ml5Bigdatapy

14、thon尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)二 MapReduce 理論篇2.1 Writable 序列化序列化就是把內(nèi)存中的對象,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于(持久化)和傳輸。反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是硬盤的持久化數(shù)據(jù),轉(zhuǎn)換成內(nèi)存中的對象。Java 的序列化是一個重量級序列化框架(Serializable),一個對象被序列化后,會附帶很多額外的信息(各種校驗信息,header,繼承體系等),不便于在中高效傳輸。所以,hadoop開發(fā)了一套序列化機(jī)制(Writable),精簡、高效。2.1.1 常用數(shù)據(jù)序列化類型常用的數(shù)據(jù)類型對應(yīng)的

15、hadoop 數(shù)據(jù)序列化類型2.1.2 自定義 bean 對象實現(xiàn)序列化接口1)自定義 bean 對象要想序列化傳輸,必須實現(xiàn)序列化接口,需要注意以下 7 項。(1)必須實現(xiàn) Writable 接口(2)反序列化時,需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有空參構(gòu)造(3)重寫序列化(4)重寫反序列化(5)注意反序列化的順序和序列化的順序完全一致(6)要想把結(jié)果顯示在文件中,需要重寫 toString(),且用”t”,方便后續(xù)用(7)如果需要將自定義的 bean 放在key 中傳輸,則還需要實現(xiàn) comparable 接口,因為Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),

16、可尚硅谷(中國)官Java 類型Hadoop Writable 類型booleanBooleanWritablebyteByteWritableintIntWritablefloatFloatWritablelongLongWritabledoubleDoubleWritablestringTextmapMapWritablearrayArrayWritable尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)mapreduce 框中的 shuffle 過程一定會對 key 進(jìn)行排序Java、HTML5、Android、python、大數(shù)據(jù)區(qū)】【網(wǎng)資料,可尚硅谷(中國)官/ 1 必須實現(xiàn)

17、Writable 接口public class FlowBean implements Writable private long upFlow; private long downFlow; private long sumFlow;/2 反序列化時,需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有public FlowBean() super();/* 3 重寫序列化* param out* throws IOException*/ Overridepublic void write(DataOutput out) throws IOException out.writeLong(upFlow);ou

18、t.writeLong(downFlow); out.writeLong(sumFlow);/* 4 重寫反序列化5 注意反序列化的順序和序列化的順序完全一致* param in* throws IOException*/ Overridepublic void readFields(DataInput in) throws IOException upFlow = in.readLong();downFlow = in.readLong(); sumFlow = in.readLong();/ 6 要想把結(jié)果顯示在文件中,需要重寫 toString(),且用”t”,方便后續(xù)用Override

19、public String toString() 尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)2)案例詳見 3.2.1 統(tǒng)計每一個號耗費(fèi)的總上行流量、下行流量、總流量(序列化)。2.2 InputFormat 數(shù)據(jù)切片機(jī)制2.2.1 FileInputFormat 切片機(jī)制1)job 提交流程源碼詳解waitForCompletion()submit();/ 1 建立連接connect();/ 1)創(chuàng)建提交 job 的new Cluster(getConfiguration();/ (1)是本地 yarn 還是initialize(jobTrackAddr, conf);/ 2 提交

20、jobsubmitter.submitJobInternal(Job.this, cluster)/ 1)創(chuàng)建給集群提交數(shù)據(jù)的 Stag 路徑Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);/ 2)獲取 jobid ,并創(chuàng)建 job 路徑JobID jobId = submitClient.getNewJobID();/ 3)拷貝 jar 包到集群copyAndConfigureFiles(job, submitJobDir);rUploader.uploadFiles(job, jobSubmitDi

21、r);Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官return upFlow + t + downFlow + t + sumFlow;/7 如果需要將自定義的 bean 放在 key 中傳輸,則還需要實現(xiàn) comparable 接口,因為 mapreduce 框中的 shuffle 過程一定會對 key 進(jìn)行排序Overridepublic int compareTo(FlowBean o) / 倒序排列,從大到小return this.sumFlow o.getSumFlow() ? -1 : 1;尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapRe

22、duce)/ 4)計算切片,生成切片文件writeSplits(job, submitJobDir);maps = writeNewSplits(job, jobSubmitDir);input.getSplits(job);/ 5)向 Stag 路徑寫 xml 配置文件writeConf(conf, submitJobFile);conf.writeXml(out);/ 6)提交 job,返回提交狀態(tài)status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials();FileInputFor

23、mat源碼stagingDirJobSubmiterYarnRunnerLocalJobRunneryarn2)FileInputFormat 源碼(input.getSplits(job)(1)找到你數(shù)據(jù)的目錄。(2)開始遍歷處理(切片)目錄下的每一個文件(3)遍歷第一個文件 ss.txta)獲取文件大小 fs.sizeOf(ss.txt);b)計算切片大小computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)=blocksize=128Mc)默認(rèn)情況下,切片大小=blocksized)開始切,形成第 1 個切片:ss.tx

24、t0:128M 第 2 個切片 ss.txt128:256M第 3個切片 ss.txt256M:300M(每次切片時,都要切完剩下的部分是否大于塊的 1.1倍,Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官File:/./.staging/ .jar hfds:/.staging/job.jarmr程序運(yùn)行在本地模擬器獲取job的jar包jarFile:/.staging/job.xmlhfds:/.staging/job.xml將job相關(guān)參數(shù)寫到文件Job.xmlCluster成員proxyFile:/.staging/job.splithfd

25、s:/.staging/job.split調(diào)用FileInputFormat.get Splits()獲取切片規(guī)劃List, 并序列化成文件Job.splitJob.submit();jobidFile:/.staging/jobidhfds:/.staging/jobidFile:/.staginghfds:/.stagingConfiguration conf=new Configuration(); Job=job.getInstance(conf); Job.waitForCompletion(true)尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)不大于 1.1 倍就劃分一塊

26、切片)e)將切片信息寫到一個切片文件中f)整個切片的過程在 getSplit()中完成。g)數(shù)據(jù)切片只是在邏輯上對輸入數(shù)據(jù)進(jìn)行分片,并再磁盤上將其切分成分片進(jìn)行。InputSplit 只了分片的元數(shù)據(jù)信息,比如起始位置、長度以及所在的節(jié)點列表等。h)注意:block 是 HDFS 上物理上的的數(shù)據(jù),切片是對數(shù)據(jù)邏輯上的劃分。(4)提交切片文件到 yarn 上,yarn 上的 MrAppMaster 就可以根據(jù)切片文件計算開啟 maptask 個數(shù)。3)FileInputFormat 中默認(rèn)的切片機(jī)制:(1)簡單地按照文件的內(nèi)容長度進(jìn)行切片(2)切片大小,默認(rèn)等于 block 大?。?)切片時不

27、考慮數(shù)據(jù)集整體,而是逐個每一個文件單獨(dú)切片比如待處理數(shù)據(jù)有兩個文件:經(jīng)過 FileInputFormat 的切片機(jī)制運(yùn)算后,形成的切片信息如下:4)FileInputFormat 切片大小的參數(shù)配置(1)通過分析源碼,在 FileInputFormat 中,計算切片大小的邏輯:Math.max(minSize,Math.min(maxSize, blockSize);切片主要由這幾個值來運(yùn)算決定mapreduce.input.fileinputformat.split.minsize=1 默認(rèn)值為 1mapreduce.input.fileinputformat.split.maxsize=

28、Long.MAXValue 默認(rèn)值 Long.MAXValue因此,默認(rèn)情況下,切片大小=blocksize。maxsize(切片最大值):參數(shù)如果調(diào)得比 blocksize 小,則會讓切片變小,而且就等于配置的這個參數(shù)的值。minsize (切片最小值):參數(shù)調(diào)的比 blockSize 大,則可以讓切片變得比 blocksize 還Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官file1.txt.split1-0128 file1.txt.split2-128256 file1.txt.split3-256320 file2.txt.split1

29、-010Mfile1.txt320Mfile2.txt10M尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)大。5)獲取切片信息 API2.2.2 CombineTextInputFormat 切片機(jī)制關(guān)于大量件的優(yōu)化策略1)默認(rèn)情況下 TextInputformat 對任務(wù)的切片機(jī)制是按文件切片,不管文件多小,都會是一個單獨(dú)的切片,都會交給一個 maptask,這樣如果有大量件,就會產(chǎn)生大量的maptask,處理效率極其低下。2)優(yōu)化策略(1)最好的辦法,在數(shù)據(jù)處理系統(tǒng)的最前端(預(yù)處理/),將件先合并成大文件,再上傳到 HDFS 做后續(xù)分析。(2)補(bǔ)救措施:如果已經(jīng)是大量件在 HDFS

30、 中了,可以使用另一種 InputFormat來做切片(CombineTextInputFormat),它的切片邏輯跟 TextFileInputFormat 不同:它可以將多個件從邏輯上到一個切片中,這樣,多個件就可以交給一個 maptask。(3)優(yōu)先滿足最小切片大小,不超過最大切片大小CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);/ 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);/ 2m舉例:0.5m+1m+0.3m+5m=2m + 4.8m=

31、2m + 4m + 0.8m3)具體實現(xiàn)步驟4)案例詳見 3.1.4 需求 4:大量件的切片優(yōu)化(CombineTextInputFormat)。2.2.3 自定義 InputFormat1)概述(1)自定義一個 InputFormatJava、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官/ 9 如果不設(shè)置 InputFormat,它默認(rèn)用的是 TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxIn

32、putSplitSize(job, 4194304);/ 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);/ 2m/ 根據(jù)文件類型獲取切片信息FileSplit inputSplit = (FileSplit) context.getInputSplit();/ 獲取切片的文件名稱String name = inputSplit.getPath().getName();尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)(2)改寫 RecordReader,實現(xiàn)一次一個完整文件封裝為 KV(3)在輸出時使用 Seque

33、nceFileOutPutFormat 輸出合并文件2)案例詳見 3.5件處理(自定義 InputFormat)。2.3 MapTask 工作機(jī)制1)引出maptask 的并行度決定 map 階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個 job 的處理速度。那么,mapTask 并行任務(wù)是否越多越好呢?2)MapTask 并行度決定機(jī)制一個 job 的 map 階段MapTask 并行度(個數(shù)),由客戶端提交 job 時的切片個數(shù)決定。3)MapTask 工作機(jī)制(1)Read 階段:Map Task 通過用戶編寫的 RecordReader,從輸入 InputSplit 中出一個個 key/valu

34、e。(2)Map 階段:該節(jié)點主要是將出的 key/value 交給用戶編寫 map()函數(shù)處理,并產(chǎn)生一系列新的 key/value。( 3 ) Collect 階段:在用戶編寫 map() 函數(shù)中, 當(dāng)數(shù)據(jù)處理完成后, 一般會調(diào)用OutputCollector.collect()輸出結(jié)果。在該函數(shù)內(nèi)部,它會將生成的 key/value 分區(qū)(調(diào)用Partitioner),并寫入一個環(huán)形內(nèi)存緩沖區(qū)中。(4)Spill 階段:即“溢寫”,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce 會將數(shù)據(jù)寫到本地磁盤上,個臨時文件。需要注意的是,將數(shù)據(jù)寫入本地磁盤之前,先要對數(shù)據(jù)進(jìn)行一次本地排序,并在必要時對數(shù)據(jù)進(jìn)

35、行合并、壓縮等操作。溢寫階段詳情:步驟 1:利用快速排序算法對緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序,排序方式是,先按照分區(qū)編號partition 進(jìn)行排序,然后按照 key 進(jìn)行排序。這樣,經(jīng)過排序后,數(shù)據(jù)以分區(qū)為在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照 key 有序。步驟 2:按照分區(qū)編號由小到大依次將每個分區(qū)中的數(shù)據(jù)寫入任務(wù)工作目錄下的臨時文件 output/spillN.out(N 表示當(dāng)前溢寫次數(shù))中。如果用戶設(shè)置了 Combiner,則寫入文件之前,對每個分區(qū)中的數(shù)據(jù)進(jìn)行一次操作。步驟 3:將分區(qū)數(shù)據(jù)的元信息寫到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu) SpillRecord 中,其中每個分區(qū)的元Java、HTML5、Andro

36、id、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)信息在臨時文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當(dāng)期內(nèi)存索引大小超過 1MB,則將內(nèi)存索引寫到文件 output/spillN.out.index 中。(5)Combine 階段:當(dāng)所有數(shù)據(jù)處理完成后,MapTask 對所有臨時文件進(jìn)行一次合并,以確保最終只會個數(shù)據(jù)文件。當(dāng)所有數(shù)據(jù)處理完后,MapTask 會將所有臨時文件合并成一個大文件,并保存到文件output/file.out 中,同時生成相應(yīng)的索引文件 output/file.out.index。在進(jìn)行文件合并

37、過程中,MapTask 以分區(qū)為進(jìn)行合并。對于某個分區(qū),它將采用多輪遞歸合并的方式。每輪合并 io.sort.factor(默認(rèn) 100)個文件,并將產(chǎn)生的文件重新加入待合并列表中,對文件排序后,重復(fù)以上過程,直到最終得到一個大文件。讓每個 MapTask 最終只個數(shù)據(jù)文件,可避免同時打開大量文件和同時大量件產(chǎn)生的隨機(jī)帶來的開銷。2.4 Shuffle 機(jī)制2.4.1 Shuffle 機(jī)制Mapreduce 確保每個 reducer 的輸入都是按鍵排序的。系統(tǒng)執(zhí)行排序的過程(即將 map輸出作為輸入傳給 reducer)稱為 shuffle。2.4.2 MapReduce 工作流程1)流程示意

38、圖Java、HTML5、Android、python、大數(shù)據(jù)區(qū)】【網(wǎng)資料,可尚硅谷(中國)官尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)MapReduce詳細(xì)工作流程(一)ss.txt 0-1285 默認(rèn)TextInputFormat7 環(huán)形緩沖區(qū)默認(rèn)100MMap taskK,vInputFormatK,vreader()Mapper80%6 邏輯運(yùn)算map(K,v)Context.write(k,v)9 溢出到文件(分區(qū)且區(qū)內(nèi)有序)1 待處理文本/user/input ss.txt200m10 Merge 歸并排序outputCollector2 客戶端submit()前,獲取待

39、處理數(shù)據(jù)的信息,然后根據(jù)參數(shù)配置,形成一個任務(wù)分配的 。ss.txt 0-128ss.txt 128-200 3 提交切片信息partition0partition18 分區(qū)、排序11 合并ss.txt 128-200Merge 歸并排序 partition0partition1bb.txt4 計算出maptask數(shù)量客戶端YarnRMMapReduce詳細(xì)工作流程(二)Reducetask1reducer到reducetask本地磁盤1314 一次一組13 合并文件歸并排序Reduce(k,v)Context.write(kv)16 默認(rèn)TextOutputFormatOutPutForma

40、tPart-r-00000015 分組Write(k,v)默認(rèn)TextOutputFormatPart-r-00000112 所有maptask任務(wù)完成后,啟動相應(yīng)數(shù)量的reducetask,并告知reducetask處理數(shù)據(jù)范圍(數(shù)據(jù)分區(qū))Write(k,v)2)流程詳解上面的流程是整個 mapreduce 最全工作流程,但是 shuffle 過程只是從第 7 步開始到第16 步結(jié)束,具體 shuffle 過程詳解,如下:1)maptask 收集我們的 map()輸出的 kv 對,放到內(nèi)存緩沖區(qū)中2)從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會溢出多個文件3)多個溢出文件會被合并成大的溢出文件4

41、)在溢出過程中,及合并的過程中,都要調(diào)用 partitoner 進(jìn)行分組和key 進(jìn)行排序5)reducetask 根據(jù)的分區(qū)號,去各個 maptask上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)6)reducetask 會取到同一個分區(qū)的來自不同 maptask 的結(jié)果文件,reducetask 會將這些Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官M(fèi)r appmasterRecordWritera 2b 1c 1d 1Reducetask2reducerReduce(k,v) Context.wr te(kv)OutPutFormatmaptask210 Merge

42、 歸并排序partition0partition1RecordWritera 2b 1c 1d 1GroupingComparator(k,knext)maptask110 Merge 歸并排序partition0partition1Mr appmasterNodeManagerJob.split wc.jar Job.xmlMap taskabcabspillerKpareTo排序Combiner合并HashPartitioner分區(qū) RecorderReader尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)文件再進(jìn)行合并(歸并排序)7)合并成大文件后,shuffle 的過程也就結(jié)束

43、了,后面進(jìn)入 reducetask 的邏輯運(yùn)算過程(從文件中取出一個一個的鍵值對 group,調(diào)用用戶自定義的 reduce())3)注意Shuffle 中的緩沖區(qū)大小會影響到 mapreduce 程序的執(zhí)行效率,原則上說,緩沖區(qū)越大,磁盤 io 的次數(shù)越少,執(zhí)行速度就越快。緩沖區(qū)的大小可以通過參數(shù)調(diào)整,參數(shù):io.sort.mb默認(rèn) 100M2.4.3 partition 分區(qū)0)引出:要求將統(tǒng)計結(jié)果按照條件輸出到不同文件中(分區(qū))。比如:將統(tǒng)計結(jié)果按照歸屬地不同省份輸出到不同文件中(分區(qū))1)默認(rèn) partition 分區(qū)默認(rèn)分區(qū)是根據(jù) key 的 hashCode 對 reduceTas

44、ks 個數(shù)取模得到的。用戶沒法哪個key到哪個分區(qū)2)自定義 Partitioner 步驟(1)自定義類繼承 Partitioner,重新getPartition()Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官public class ProvincePartitioner extends Partitioner Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) / 1 獲取號碼的前三位String preNum = key.toString(

45、).substring(0, 3);int partition = 4;/ 2是哪個省if (136.equals(preNum) partition = 0;else if (137.equals(preNum) partition = 1;else if (138.equals(preNum) public class HashPartitioner extends Partitioner /* Use link Object#hashCode() to partition. */public int getPartition(K key, V value, int numReduceTa

46、sks) return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)(2)在 job 驅(qū)動中,設(shè)置自定義 partitioner:(3)自定義partition 后,要根據(jù)自定義 partitioner 的邏輯設(shè)置相應(yīng)數(shù)量的 reduce task3)注意:如果 reduceTask 的數(shù)量 getPartition 的結(jié)果數(shù), 則會多產(chǎn)生幾個空的輸出文件part-r-000xx;如果 1reduceTask 的數(shù)量getPartition 的結(jié)果數(shù),則有一部分分區(qū)數(shù)據(jù)無處安放

47、,會Exception;如果 reduceTask 的數(shù)量=1,則不管 mapTask 端輸出多少個分區(qū)文件,最終結(jié)果都交給這一個 reduceTask,最終也就只會產(chǎn)生一個結(jié)果文件 part-r-00000;例如:假設(shè)自定義分區(qū)數(shù)為 5,則(1)job.setNumReduceTasks(1);會正常運(yùn)行,只不過會產(chǎn)生一個輸出文件(2)job.setNumReduceTasks(2);會報錯(3)job.setNumReduceTasks(6);大于 5,正常運(yùn)行,會產(chǎn)生空文件4)案例詳見 3.2.2 需求 2:將統(tǒng)計結(jié)果按照歸屬地不同省份輸出到不同文件中(Partitioner)詳見 3.

48、1.2 需求 2:把單詞按照 ASCII 碼奇偶分區(qū)(Partitioner)2.4.4 排序排序是 MapReduce 框架中最重要的操作之一。Map Task 和 Reduce Task 均會對數(shù)據(jù)(按照 key)進(jìn)行排序。該操作屬于 Hadoop 的默認(rèn)行為。任何應(yīng)用的數(shù)據(jù)均會被排序,而不管邏輯上是否需要。對于 Map Task,它會將處理的結(jié)果暫時放到一個緩沖區(qū)中,當(dāng)緩沖區(qū)使用率達(dá)到一定閾值后,再對緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次排序,并將這些有序數(shù)據(jù)寫到磁盤上,而當(dāng)數(shù)據(jù)處理Java、HTML5、Android、python、大數(shù)據(jù) 資料區(qū)】【網(wǎng),可尚硅谷(中國)官job.setNumReduc

49、eTasks(5);job.setPartitionerClass(CustomPartitioner.class)partition = 2;else if (139.equals(preNum) partition = 3;return partition;尚硅谷大數(shù)據(jù)技術(shù)之 Hadoop(MapReduce)完畢后,它會對磁盤上所有文件進(jìn)行一次合并,以將這些文件合并成一個大的有序文件。對于 Reduce Task,它從每個 Map Task 上拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過一定閾值,則放到磁盤上,否則放到內(nèi)存中。如果磁盤上文件數(shù)目達(dá)到一定閾值,則進(jìn)行一次合并以個更大文件;如果內(nèi)存中

50、文件大小或者數(shù)目超過一定閾值,則進(jìn)行一次合并后將數(shù)據(jù)寫到磁盤上。當(dāng)所有數(shù)據(jù)拷貝完畢后,Reduce Task 統(tǒng)一對內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次合并。每個階段的默認(rèn)排序1)排序的分類:(1)部分排序:MapReduce 根據(jù)輸入的鍵對數(shù)據(jù)集排序。保證輸出的每個文件內(nèi)部排序。(2)全排序:如何用 Hadoop 產(chǎn)生一個全局排序的文件?最簡單的是使用一個分區(qū)。但該在處理大型文件時效率極低, 因為一臺必須處理所有輸出文件, 從而完全喪失了MapReduce 所提供的并行架構(gòu)。替代方案:首先創(chuàng)建一系列排好序的文件;其次,串聯(lián)這些文件;最后,個全局排序的文件。主要思路是使用一個分區(qū)來描述輸出的全局排序。例如:可以為上述

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論