《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)3MapReduce編程_第1頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)3MapReduce編程_第2頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)3MapReduce編程_第3頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)3MapReduce編程_第4頁
《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》實(shí)驗(yàn)指導(dǎo)書-實(shí)驗(yàn)3MapReduce編程_第5頁
已閱讀5頁,還剩35頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

《Hadoop大數(shù)據(jù)原理與應(yīng)用實(shí)驗(yàn)教程》配套實(shí)驗(yàn)指導(dǎo)書實(shí)驗(yàn)3MapReduce編程編寫者:國(guó)信藍(lán)橋-顏群實(shí)驗(yàn)3MapReduce編程本實(shí)驗(yàn)的知識(shí)地圖如圖3-1所示(表示重點(diǎn)表示難點(diǎn))。圖3-1實(shí)驗(yàn)3MapReduce編程知識(shí)地圖一、實(shí)驗(yàn)?zāi)康?.理解MapReduce編程思想。2.理解MapReduce作業(yè)執(zhí)行流程。3.理解MR-App編寫步驟,掌握使用MapReduceJavaAPI進(jìn)行MapReduce基本編程,熟練掌握如何在Hadoop集群上運(yùn)行MR-App并查看運(yùn)行結(jié)果。4.熟練掌握MapReduceWeb界面的使用。5.掌握MapReduceShell常用命令的使用。二、實(shí)驗(yàn)環(huán)境本實(shí)驗(yàn)所需的軟件環(huán)境包括全分布模式Hadoop集群、Eclipse。三、實(shí)驗(yàn)內(nèi)容1.啟動(dòng)全分布模式Hadoop集群,守護(hù)進(jìn)程包括NameNode、DataNode、SecondaryNameNode、ResourceManager、NodeManager和JobHistoryServer。2.在Hadoop集群主節(jié)點(diǎn)上搭建MapReduce開發(fā)環(huán)境Eclipse。3.查看Hadoop自帶的MR-App單詞計(jì)數(shù)源代碼WordCount.java,在Eclipse項(xiàng)目MapReduceExample下建立新包c(diǎn)om.xijing.mapreduce,模仿內(nèi)置的WordCount示例,自己編寫一個(gè)WordCount程序,最后打包成JAR形式并在Hadoop集群上運(yùn)行該MR-App,查看運(yùn)行結(jié)果。4分別在自編MapReduce程序WordCount運(yùn)行過程中和運(yùn)行結(jié)束后查看MapReduceWeb界面。5.分別在自編MapReduce程序WordCount運(yùn)行過程中和運(yùn)行結(jié)束后練習(xí)MapReduceShell常用命令。6.關(guān)閉Hadoop集群。四、實(shí)驗(yàn)原理(一)MapReduce編程思想MapReduce是Hadoop生態(tài)中的一款分布式計(jì)算框架,它可以讓不熟悉分布式計(jì)算的人員也能編寫出優(yōu)秀的分布式系統(tǒng),因此可以讓開發(fā)人員將精力專注到業(yè)務(wù)邏輯本身。MapReduce采用“分而治之”的核心思想,可以先將一個(gè)大型任務(wù)拆分成若干個(gè)簡(jiǎn)單的子任務(wù),然后將每個(gè)子任務(wù)交給一個(gè)獨(dú)立的節(jié)點(diǎn)去處理。當(dāng)所有節(jié)點(diǎn)的子任務(wù)都處理完畢后,再匯總所有子任務(wù)的處理結(jié)果,從而形成最終的結(jié)果。以“單詞統(tǒng)計(jì)”為例,如果要統(tǒng)計(jì)一個(gè)擁有海量單詞的詞庫,就可以先將整個(gè)詞庫拆分成若干個(gè)小詞庫,然后將各個(gè)小詞庫發(fā)送給不同的節(jié)點(diǎn)去計(jì)算,當(dāng)所有節(jié)點(diǎn)將分配給自己的小詞庫中的單詞統(tǒng)計(jì)完畢后,再將各個(gè)節(jié)點(diǎn)的統(tǒng)計(jì)結(jié)果進(jìn)行匯總,形成最終的統(tǒng)計(jì)結(jié)果。以上,“拆分”任務(wù)的過程稱為Map階段,“匯總”任務(wù)的過程稱為Reduce階段,如圖3-2所示。節(jié)點(diǎn)節(jié)點(diǎn)3海量詞庫小詞庫小詞庫小詞庫統(tǒng)計(jì)部分單詞統(tǒng)計(jì)全部單詞Map階段Reduce階段節(jié)點(diǎn)1節(jié)點(diǎn)2統(tǒng)計(jì)部分單詞節(jié)點(diǎn)4節(jié)點(diǎn)5圖3-2MapReduce執(zhí)行流程MapReduce在發(fā)展史上經(jīng)過一次重大改變,舊版MapReduce(MapReduce1.0)采用的是典型的Master/Slave結(jié)構(gòu),Master表現(xiàn)為JobTracker進(jìn)程,而Slave表現(xiàn)為TaskTracker,MapReduce1.0體系架構(gòu)如圖3-3所示。但是這種架構(gòu)過于簡(jiǎn)單,例如Master的任務(wù)過于集中,并且存在單點(diǎn)故障等問題。因此,MapReduce進(jìn)行了一次重要的升級(jí),舍棄JobTracker和TaskTracker,而改用了ResourceManager進(jìn)程負(fù)責(zé)處理資源,并且使用ApplicationMaster進(jìn)程管理各個(gè)具體的應(yīng)用,用NodeManager進(jìn)程對(duì)各個(gè)節(jié)點(diǎn)的工作情況進(jìn)行監(jiān)聽。升級(jí)后的MapReduce稱為MapReduce2.0,MapReduce2.0體系架構(gòu)如圖3-4所示。JobTrackerJobTrackerTaskTrackerClientClientTaskSchedulerMapTaskMapTaskReduceTaskTaskTrackerMapTaskMapTaskReduceTaskTaskTrackerMapTaskMapTaskReduceTask圖3-3MapReduce1.0體系架構(gòu)ResourceManagerResourceManagerNameNodeNodeManagerApplicationMasterDataNodeNodeManagerApplicationMasterDataNodeNodeManagerContainerDataNodeContainerNodeManagerContainerDataNodeNodeManagerContainerDataNodeNodeManagerContainerDataNodeClientClient圖3-4MapReduce2.0執(zhí)行作業(yè)時(shí)體系架構(gòu)(二)MapReduce作業(yè)執(zhí)行流程MapReduce作業(yè)的執(zhí)行流程主要包括InputFormat、Map、Shuffle、Reduce、OutputFormat五個(gè)階段,MapReduce作業(yè)執(zhí)行流程如圖3-5所示。最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>輸入<key,value>加載文件最終結(jié)果<key,value>中間結(jié)果<key,List(value)>中間結(jié)果<key,value>輸入<key,value>寫入文件分布式文件系統(tǒng)(如HDFS)InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat節(jié)點(diǎn)1加載文件寫入文件InputFormatSplitSplitSplitRRRRRRMapMapMapShuffleReduceOutputFormat節(jié)點(diǎn)2分布式文件系統(tǒng)(如HDFS)關(guān)于MapReduce作業(yè)各個(gè)執(zhí)行階段的詳細(xì)說明,具體如下所示。(1)InputFormatInputFormat模塊首先對(duì)輸入數(shù)據(jù)做預(yù)處理,比如驗(yàn)證輸入格式是否符合輸入定義;然后將輸入文件切分為邏輯上的多個(gè)InputSplit,InputSplit是MapReduce對(duì)文件進(jìn)行處理和運(yùn)算的輸入單位,并沒有對(duì)文件進(jìn)行實(shí)際切割;由于InputSplit是邏輯切分而非物理切分,所以還需要通過RecordReader(圖4-4中的RR)根據(jù)InputSplit中的信息來處理InputSplit中的具體記錄,加載數(shù)據(jù)并轉(zhuǎn)換為適合Map任務(wù)讀取的鍵值對(duì)<key,valule>,輸入給Map任務(wù)。(2)MapMap模塊會(huì)根據(jù)用戶自定義的映射規(guī)則,輸出一系列的<key,value>作為中間結(jié)果。(3)Shuffle為了讓Reduce可以并行處理Map的結(jié)果,需要對(duì)Map的輸出進(jìn)行一定的排序、分區(qū)、合并、歸并等操作,得到<key,List(value)>形式的中間結(jié)果,再交給對(duì)應(yīng)的Reduce進(jìn)行處理,這個(gè)過程叫做Shuffle。(4)ReduceReduce以一系列的<key,List(value)>中間結(jié)果作為輸入,執(zhí)行用戶定義的邏輯,輸出<key,valule>形式的結(jié)果給OutputFormat。(5)OutputFormatOutputFormat模塊會(huì)驗(yàn)證輸出目錄是否已經(jīng)存在以及輸出結(jié)果類型是否符合配置文件中的配置類型,如果都滿足,就輸出Reduce的結(jié)果到分布式文件系統(tǒng)。(三)MapReduceWebUIMapReduceWebUI接口面向管理員??梢栽陧撁嫔峡吹揭呀?jīng)完成的所有MR-App執(zhí)行過程中的統(tǒng)計(jì)信息,該頁面只支持讀,不支持寫。MapReduceWebUI的默認(rèn)地址為http://JobHistoryServerIP:19888,可以查看MapReduce的歷史運(yùn)行情況,如圖3-6所示。圖3-6MapReduce歷史情況(四)MapReduceShellMapReduceShell接口面向MapReduce程序員。程序員通過Shell接口能夠向YARN集群提交MR-App,查看正在運(yùn)行的MR-App,甚至可以終止正在運(yùn)行的MR-App。MapReduceShell命令統(tǒng)一入口為:mapred,語法格式如下:mapred[--configconfdir][--loglevelloglevel]COMMAND讀者需要注意的是,若$HADOOP_HOME/bin未加入到系統(tǒng)環(huán)境變量PATH中,則需要切換到Hadoop安裝目錄下,輸入“bin/mapred”。讀者可以使用“mapred-help”查看其幫助,命令“mapred”的具體用法和參數(shù)說明如圖3-7所示。圖3-7命令“mapred”用法MapReduceShell命令分為用戶命令和管理員命令。本章僅介紹部分命令,關(guān)于MapReduceShell命令的完整說明,讀者請(qǐng)參考官方網(wǎng)站/docs/r2.9.2/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapredCommands.html。1.用戶命令MapReduceShell用戶命令如表3-1所示。表3-1MapReduceShell用戶命令命令選項(xiàng)功能描述archive創(chuàng)建一個(gè)Hadoop檔案文件archive-logs將聚合日志合并到Hadoop檔案文件中classpath打印運(yùn)行MapReduce子命令所需的包路徑distcp遞歸拷貝文件或目錄job管理MapReduce作業(yè)pipes運(yùn)行Pipes任務(wù),此功能允許用戶使用C++語言編寫MapReduce程序queue查看JobQueue信息2.管理員命令MapReduceShell管理員命令如表3-2所示。表3-2MapReduceShell用戶命令命令選項(xiàng)功能描述historyserver啟動(dòng)JobHistoryServer服務(wù)hsadminJobHistoryServer管理命令接口其中,命令“mapredhistoryserver”與啟動(dòng)MapReduce的命令“mr-jobhistory-daemon.shstarthistoryserver”效果相同。讀者請(qǐng)注意,一般不建議使用命令start-all.sh啟動(dòng)HDFS和YARN,而是建議使用start-dfs.sh和start-yarn.sh命令來分別啟動(dòng)。另外,對(duì)于一般計(jì)算機(jī)而言,在執(zhí)行start-dfs.sh和start-yarn.sh命令之后最好等待一會(huì)兒再操作各種MapReduce命令,防止因?yàn)榫€程未加載完畢而導(dǎo)致的各種初始化問題。在MapReduce程序運(yùn)行一段時(shí)間后,可能由于各種故障造成HDFS的數(shù)據(jù)在各個(gè)DataNode中的分布不均勻的情況,此時(shí)也只需要通過以下shell命令即可重新分布HDFS集群上的各個(gè)DataNode。$HADOOP_HOME/bin/start-balancer.sh此外,在啟動(dòng)時(shí)可以通過日志看到“Namenodeinsafemode”提示,這表示系統(tǒng)正在處于安全模式,此時(shí)只需要等待一會(huì)即可(通常是十幾秒)。如果硬件資源較差,也可以通過執(zhí)行以下命令直接退出安全模式。$HADOOP_HOME/bin/hadoopdfsadmin-safemodeleave(五)MapReduceJavaAPIMapReduceJavaAPI接口面向Java開發(fā)工程師。程序員可以通過該接口編寫MR-App用戶層代碼MRApplicationBusinessLogic?;赮ARN編寫的MR-App和基于MapReduce1.0編寫的MR-App編程步驟相同。MR-App稱為MapReduce應(yīng)用程序,標(biāo)準(zhǔn)YARN-App包含3部分:MRv2框架中的MRAppMaster、MRClient,加上用戶編寫的MRApplicationBusinessLogic(Mapper類和Reduce類),合稱為MR-App。MR-App編寫步驟如下所示:(1)編寫MRApplicationBusinessLogic。自行編寫。(2)編寫MRApplicationMaster。無需編寫,Hadoop開發(fā)人員已編寫好MRAppMaster.java。(3)編寫MRApplicationClient。無需編寫,Hadoop開發(fā)人員已編寫好YARNRunner.java。其中,MRApplicationBusinessLogic編寫步驟如下:(1)確定<key,value>對(duì)。(2)定制輸入格式。(3)Mapper階段,其業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Mapper類。(4)Reducer階段,其業(yè)務(wù)代碼需要繼承自org.apache.hadoop.mapreduce.Reducer類。(5)定制輸出格式。編寫類后,在main方法里,按下述過程依次指向各類即可:(1)實(shí)例化配置文件類。(2)實(shí)例化Job類。(3)指向InputFormat類。(4)指向Mapper類。(5)指向Partitioner類。(6)指向Reducer類。(7)指向OutputFormat類。(8)提交任務(wù)。實(shí)際開發(fā)中,MapReduceJavaAPI最常用的類是org.apache.hadoop.mapreduce.Mapper和org.apache.hadoop.mapreduce.Reducer。常用的MapReduceJava類如表3-3所示。表3-3MapReduceJavaAPI常用類類名說明org.apache.hadoop.mapreduce.JobMapReduce作業(yè)類org.apache.hadoop.mapreduce.MapperMapper類,泛型類,帶有4個(gè)參數(shù),分別表示Map階段輸入數(shù)據(jù)的key類型、輸入數(shù)據(jù)的value類型、輸出數(shù)據(jù)的key類型、輸出數(shù)據(jù)的value類型。其中,輸入的key為Object(默認(rèn)是行),輸入的值為Text(Hadoop中的String類型),輸出的key為Text(關(guān)鍵字),輸出的值為IntWritable(Hadoop中的int類型)org.apache.hadoop.mapreduce.ReducerReducer類,泛型類,帶有4個(gè)參數(shù),分別表示Reduce階段輸入數(shù)據(jù)的key類型、value類型,輸出數(shù)據(jù)的key類型、value類型org.apache.hadoop.mapreduce.InputFormatMapReduce接收輸入數(shù)據(jù)的頂級(jí)類org.apache.hadoop.mapreduce.OutputFormatMapReduce接收輸出數(shù)據(jù)的頂級(jí)類關(guān)于MapReduceAPI的完整說明,讀者請(qǐng)參考官方網(wǎng)站/docs/r2.9.2/api/index.html。五、實(shí)驗(yàn)步驟(一)啟動(dòng)Hadoop集群在主節(jié)點(diǎn)上依次執(zhí)行以下3條命令啟動(dòng)全分布模式Hadoop集群。start-dfs.shstart-yarn.shmr-jobhistory-daemon.shstarthistoryserverstart-dfs.sh命令會(huì)在主節(jié)點(diǎn)上啟動(dòng)NameNode和SecondaryNameNode服務(wù),會(huì)在從節(jié)點(diǎn)上啟動(dòng)DataNode服務(wù);start-yarn.sh命令會(huì)在主節(jié)點(diǎn)上啟動(dòng)ResourceManager服務(wù),會(huì)在從節(jié)點(diǎn)上啟動(dòng)NodeManager服務(wù);mr-jobhistory-daemon.sh命令會(huì)在主節(jié)點(diǎn)上啟動(dòng)JobHistoryServer服務(wù)。(二)搭建MapReduce開發(fā)環(huán)境Eclipse在Hadoop集群主節(jié)點(diǎn)上搭建MapReduce開發(fā)環(huán)境Eclipse,具體過程請(qǐng)讀者參考實(shí)驗(yàn)項(xiàng)目2,此處不再贅述。(三)編寫并運(yùn)行MapReduce程序WordCount查看Hadoop自帶的MR-App單詞計(jì)數(shù)源代碼WordCount.java,在Eclipse項(xiàng)目MapReduceExample下建立新包c(diǎn)om.xijing.mapreduce,模仿內(nèi)置的WordCount示例,自己編寫一個(gè)WordCount程序,最后打包成JAR形式并在Hadoop集群上運(yùn)行該MR-App,查看運(yùn)行結(jié)果。具體過程如下所示。1.查看示例WordCount從$HADOOP_HOME/share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-2.9.2-sources.jar中找到單詞計(jì)數(shù)源代碼文件WordCount.java,打開并查看源代碼,完整的源代碼如下所示。packageorg.apache.hadoop.examples;importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;packageorg.apache.hadoop.examples;importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclassWordCount{ publicstaticclassTokenizerMapperextendsMapper<Object,Text,Text,IntWritable>{ privatefinalstaticIntWritableone=newIntWritable(1); privateTextword=newText(); publicvoidmap(Objectkey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ StringTokenizeritr=newStringTokenizer(value.toString()); while(itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word,one); } } } publicstaticclassIntSumReducerextendsReducer<Text,IntWritable,Text,IntWritable>{ privateIntWritableresult=newIntWritable(); publicvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ intsum=0; for(IntWritableval:values){ sum+=val.get(); } result.set(sum); context.write(key,result); } } publicstaticvoidmain(String[]args)throwsException{ Configurationconf=newConfiguration(); String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs(); if(otherArgs.length<2){ System.err.println("Usage:wordcount<in>[<in>...]<out>"); System.exit(2); } Jobjob=Job.getInstance(conf,"wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for(inti=0;i<otherArgs.length-1;++i){ FileInputFormat.addInputPath(job,newPath(otherArgs[i])); } FileOutputFormat.setOutputPath(job,newPath(otherArgs[otherArgs.length-1])); System.exit(job.waitForCompletion(true)?0:1); }}2.在Eclipse中創(chuàng)建Java項(xiàng)目進(jìn)入/usr/local/eclipse中通過可視化桌面打開EclipseIDE,默認(rèn)的工作空間為“/home/xuluhui/eclipse-workspace”。選擇菜單『File』→『New』→『JavaProject』,創(chuàng)建Java項(xiàng)目“MapReduceExample”,如圖3-8所示。本書中關(guān)于MapReduce編程實(shí)例均存放在此項(xiàng)目下。圖3-8創(chuàng)建Java項(xiàng)目“MapReduceExample”3.在項(xiàng)目中導(dǎo)入所需JAR包為了編寫關(guān)于MapReduce應(yīng)用程序,需要向Java工程中添加MapReduce核心包hadoop-mapreduce-client-core-2.9.2.jar,該包中包含了可以訪問MapReduce的JavaAPI,位于$HADOOP_HOME/share/hadoop/mapreduce下。另外,由于還需要對(duì)HDFS文件進(jìn)行操作,所以還需要導(dǎo)入JAR包hadoop-common-2.9.2.jar,該包位于$HADOOP_HOME/share/hadoop/common下。若不導(dǎo)入這兩個(gè)JAR包,代碼將會(huì)出現(xiàn)錯(cuò)誤。讀者可以按以下步驟添加該應(yīng)用程序編寫時(shí)所需的JAR包。(1)右鍵單擊Java項(xiàng)目“MapReduceExample”,從彈出的菜單中選擇『BuildPath』→『ConfigureBuildPath…』,如圖3-9所示。圖3-9進(jìn)入“MapReduceExample”項(xiàng)目“JavaBuildPath”(2)進(jìn)入窗口【PropertiesforMapReduceExample】,可以看到添加JAR包的主界面,如圖3-10所示。圖3-10添加JAR包主界面(3)單擊圖中的按鈕AddExternalJARS,依次添加jar文件$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.9.2.jar和$HADOOP_HOME/share/hadoop/common/hadoop-common-2.9.2.jar。其中添加JAR包hadoop-mapreduce-client-core-2.9.2.jar的過程如圖3-11所示,找到此JAR包后選中并單擊右上角的OK按鈕,這樣就成功把mapreduce-client-core-2.9.2.jar增加到了當(dāng)前Java項(xiàng)目中。添加hadoop-common-2.9.2.jar的過程同此,不再贅述。圖3-11添加hadoop-mapreduce-client-core-2.9.2.jar到Java項(xiàng)目中(4)完成JAR包添加后的界面如圖3-12所示,單擊按鈕ApplyandClose。圖3-12完成JAR包添加后的界面(5)自動(dòng)返回到Eclipse界面,如圖3-13所示,從圖中可以看到,項(xiàng)目“MapReduceExample”目錄樹下多了“ReferencedLibraries”,內(nèi)部有以上步驟添加進(jìn)來的兩個(gè)JAR包。圖3-13添加JAR包后“MapReduceExample”項(xiàng)目目錄樹變化4.在項(xiàng)目中新建包右鍵單擊項(xiàng)目“MapReduceExample”,從彈出的快捷菜單中選擇『New』→『Package』,創(chuàng)建包“com.xijing.mapreduce”,如圖3-14所示。圖3-14創(chuàng)建包“com.xijing.mapreduce”5.自編MapReduce程序WordCount下面模仿示例WordCount自編一個(gè)WordCount應(yīng)用程序,借助MapReduceAPI,實(shí)現(xiàn)對(duì)輸入文件單詞頻次的統(tǒng)計(jì)。1)編寫Mapper類(1)右鍵單擊Java項(xiàng)目“MapReduceExample”中目錄“src”下的包“com.xijing.mapreduce”,從彈出的菜單中選擇『New』→『Class』,如圖3-15所示。圖3-15進(jìn)入“com.xijing.mapreduce”包的新建類窗口(2)進(jìn)入窗口【NewJavaClass】??梢钥闯?,由于上步在包“com.xijing.mapreduce”下新建類,故此處不需要選擇該類所屬包;輸入新建類的名字,例如“WordCountMapper”,之所以這樣命名,是本類要實(shí)現(xiàn)Map階段業(yè)務(wù)邏輯,建議讀者命名時(shí)也要做到見名知意;讀者還可以選擇是否創(chuàng)建main函數(shù)。本實(shí)驗(yàn)中新建類“WordCountMapper”的具體輸入和選擇如圖3-16所示。完成后單擊Finish按鈕。圖3-16新建類“WordCountMapper”(3)編寫Mapper類,自編WordCount程序Mapper類的源碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;publicclassWordCountMapperextendsMapper<LongWritable,Text,Text,IntWritable>{ //自定義map方法 @Override protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ Stringline=value.toString(); String[]words=line.split(""); for(Stringword:words){ //context.write()將數(shù)據(jù)交給下一階段處理shuffle context.write(newText(word),newIntWritable(1)); } }}2)編寫Reducer類在包“com.xijing.mapreduce”下新建類“WordCountReducer”,方法同上文“WordCountMapper”類。自編WordCount程序Reducer類的源碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;publicclassWordCountReducerextendsReducer<Text,IntWritable,Text,IntWritable>{ //自定義reduce方法 @Override protectedvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ intsum=0; for(IntWritablevalue:values) sum+=value.get(); context.write(key,newIntWritable(sum)); }}3)編寫入口Driver類Mapper類和Reducer類編寫完畢后,再通過Driver類將本次Job進(jìn)行設(shè)置。在包“com.xijing.mapreduce”下新建類“WordCountDriver”,方法同上文“WordCountMapper”,入口Driver類的源碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importpress.BZip2Codec;importpress.CompressionCodec;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;publicclassWordCountDriver{ //args:輸入文件路徑和輸出文件路徑 publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{ Configurationconf=newConfiguration(); //開啟map階段的壓縮 conf.setBoolean("press",true); //指定壓縮類型 conf.setClass("press.codec",BZip2Codec.class,CompressionCodec.class); Jobjob=Job.getInstance(conf,"wordcountdiy"); job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); //使用了自定義Combine job.setCombinerClass(WordCountReducer.class); job.setReducerClass(WordCountReducer.class); //指定map輸出數(shù)據(jù)的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定reduce輸出數(shù)據(jù)的類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //設(shè)置輸入文件路徑 FileInputFormat.setInputPaths(job,newPath(args[0])); //設(shè)置輸出文件路徑 FileOutputFormat.setOutputPath(job,newPath(args[1])); //開啟reduce階段的解壓縮 FileOutputFormat.setCompressOutput(job,true); //指定解壓縮類型(需要與壓縮類型保持一致) FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class); booleanresult=job.waitForCompletion(true); System.exit(result?0:1); }}6.將MapReduce程序打包成JAR包為了運(yùn)行寫好的MapReduce程序,需要首先將程序打包成JAR包??梢允褂肕aven或者Eclipse打JAR包,下面以Eclipse為例進(jìn)行介紹。(1)右鍵單擊項(xiàng)目“MapReduceExample”,從彈出的快捷菜單中選擇『Export...』,如圖3-17所示。圖3-17進(jìn)入“MapReduceExample”項(xiàng)目Export窗口(2)進(jìn)入窗口【Export】,選擇Java→JARfile,單擊按鈕Next>,如圖3-18所示。圖3-18在窗口【Export】中選擇Java→JARfile(3)進(jìn)入窗口【JARExport】,單擊按鈕Browse…選擇JAR包的導(dǎo)出位置和文件名,此處編者將其保存在/home/xuluhui/eclipse-workspace/MapReduceExample下,命名為WordCountDIY.jar,效果如圖3-19所示。圖3-19選擇打包文件及JAR包存放路徑及名字7.提交JAR包到Hadoop中運(yùn)行與運(yùn)行hadoop-mapreduce-examples-2.9.2.jar中的wordcount程序一樣,只需要執(zhí)行以下命令,就能在Hadoop集群中成功運(yùn)行自己編寫的MapReduce程序了,命令如下所示。hadoopjar/home/xuluhui/eclipse-workspace/MapReduceExample/WordCountDIY.jarcom.xijing.mapreduce.WordCountDriver/InputDataTest/OutputDataTest4上述命令中,/InputDataTest表示輸入目錄,/OutputDataTest4表示輸出目錄。執(zhí)行該命令前,假設(shè)HDFS的目錄/InputDataTest下已存在待分析詞頻的3個(gè)文件,而輸出目錄/OutputDataTest4不存在,在執(zhí)行過程中會(huì)自動(dòng)創(chuàng)建。部分執(zhí)行過程如圖3-20所示。圖3-20向Hadoop集群提交并運(yùn)行自編WordCount的執(zhí)行過程(部分)8.查看運(yùn)行結(jié)果如圖3-21所示,上述程序執(zhí)行完畢后,會(huì)將結(jié)果輸出到/OutputDataTest4目錄中,可以使用命令“hdfsdfs-ls/OutputDataTest4”來查看。圖3-20中/OutputDataTest4目錄下有2個(gè)文件,其中/OutputDataTest4/_SUCCESS表示Hadoop程序已執(zhí)行成功,這個(gè)文件大小為0,文件名就告知了Hadoop程序的執(zhí)行狀態(tài);第二個(gè)文件/OutputDataTest4/part-r-00000.bz2才是Hadoop程序的運(yùn)行結(jié)果。由于輸出結(jié)果進(jìn)行了壓縮,所以無法使用命令“hdfsdfs-cat/OutputDataTest4/part-r-00000.bz2”直接查看Hadoop程序的運(yùn)行結(jié)果,查看效果如圖3-21所示。圖3-21無法使用-cat選項(xiàng)直接查看輸出文件為.bz2的結(jié)果若想查看輸出文件擴(kuò)展名為.bz2的文件,讀者可以首先使用命令“hdfsdfs-get”將HDFS上的文件/OutputDataTest4/part-r-00000.bz2下載到本地操作系統(tǒng),然后使用命令“bzcat”查看.bz2文件的結(jié)果,使用命令及運(yùn)行結(jié)果如圖3-22所示。圖3-22下載.bz2文件到本地并使用bzcat查看運(yùn)行結(jié)果(四)練習(xí)使用MapReduceShell命令分別在自編MapReduce程序WordCount運(yùn)行過程中和運(yùn)行結(jié)束后練習(xí)MapReduceShell常用命令。例如,使用如下命令查看MapReduce作業(yè)的狀態(tài)信息。mapredjob-status<job-id>如圖3-23所示,當(dāng)前MapReduce作業(yè)“job_1568702465801_0002”正處于運(yùn)行(RUNNING)狀態(tài)。圖3-23通過命令“mapredjob-status”查看該MapReduce作業(yè)狀態(tài)(五)練習(xí)使用MapReduceWeb界面分別在自編MapReduce程序WordCount運(yùn)行過程中和運(yùn)行結(jié)束后查看MapReduceWeb界面。例如,如圖3-24所示,當(dāng)前MapReduce作業(yè)“job_1568702465801_0002”已運(yùn)行結(jié)束,其State為成功(SUCCEEDED)狀態(tài)。圖3-24通過MapReduceWeb查看該MapReduce作業(yè)信息(六)關(guān)閉Hadoop集群關(guān)閉全分布模式Hadoop集群的命令與啟動(dòng)命令次序相反,只需在主節(jié)點(diǎn)master上依次執(zhí)行以下3條命令即可關(guān)閉Hadoop。mr-jobhistory-daemon.shstophistoryserverstop-yarn.shstop-dfs.sh執(zhí)行mr-jobhistory-daemon.shstophistoryserver時(shí),其*historyserver.pid文件消失;執(zhí)行stop-yarn.sh時(shí),*resourcemanager.pid和*nodemanager.pid文件依次消失;stop-dfs.sh,*namenode.pid、*datanode.pid、*secondarynamenode.pid文件依次消失。六、實(shí)驗(yàn)報(bào)告要求實(shí)驗(yàn)報(bào)告以電子版和打印版雙重形式提交。實(shí)驗(yàn)報(bào)告主要內(nèi)容包括實(shí)驗(yàn)名稱、實(shí)驗(yàn)類型、實(shí)驗(yàn)地點(diǎn)、學(xué)時(shí)、實(shí)驗(yàn)環(huán)境、實(shí)驗(yàn)原理、實(shí)驗(yàn)步驟、實(shí)驗(yàn)結(jié)果、總結(jié)與思考等。實(shí)驗(yàn)報(bào)告格式如表1-9所示。七、拓展訓(xùn)練(一)在Windows平臺(tái)上開發(fā)MapReduce程序在學(xué)習(xí)階段,我們也可以直接在Windows平臺(tái)上開發(fā)并運(yùn)行MapReduce程序。【案例3-1】在Windows平臺(tái)上開發(fā)并運(yùn)行MapReduce程序。具體實(shí)現(xiàn)過程如下所示。(1)將編譯后的Windows版本的Hadoop解壓到本地,并將解壓后的路徑設(shè)置為環(huán)境變量,如圖3-25所示。圖3-25配置HADOOP_HOME系統(tǒng)變量(2)將Hadoop中可執(zhí)行命令的目錄\bin和\sbin添加到環(huán)境變量PATH中,如圖3-26所示。圖3-26配置Hadoop環(huán)境變量(3)將剛剛解壓后的MapReduce中的相關(guān)jar文件引入工程,或者使用Maven引入需要的JAR包,pom.xml如下所示。<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="/POM/4.0.0"xmlns:xsi="/2001/XMLSchema-instance"xsi:schemaLocation="/POM/4.0.0/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>test</groupId><artifactId>mp</artifactId><version>1.0-SNAPSHOT</version><!--統(tǒng)一Hadoop版本號(hào)--><properties><hadoop.version>2.9.2</hadoop.version></properties><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.3.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency></dependencies></project>(4)為了在運(yùn)行時(shí)可以在Eclipse控制臺(tái)觀察到MapReduce的運(yùn)行時(shí)日志,可以在項(xiàng)目中引入Log4j,并將perties存放在項(xiàng)目的CLASSPATH下,perties的內(nèi)容如下所示。log4j.rootLogger=DEBUG,stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=%5p[%t]-%m%n(5)在運(yùn)行時(shí),由于權(quán)限限制,還需要通過運(yùn)行參數(shù)設(shè)置訪問Hadoop的用戶是master。具體方法是首先在Eclipse中單擊右鍵,從彈出快捷菜單中選擇『RunAs』→『RunConfigurations...』,如圖3-27所示。圖3-27進(jìn)入配置運(yùn)行參數(shù)窗口然后,在虛擬機(jī)參數(shù)中,通過語句“-DHADOOP_USER_NAME=master”指定執(zhí)行的用戶是master,如圖3-28所示。圖3-28設(shè)置VM參數(shù)(6)此時(shí),便可以在本地通過main()方法直接運(yùn)行MapReduce程序了。(二)MapReduce編程實(shí)踐:使用MapReduce統(tǒng)計(jì)對(duì)象中的某些屬性之前使用MapReduce統(tǒng)計(jì)的是單詞數(shù)量,而單詞本身屬于字面值,是比較容易計(jì)算的。本案例將會(huì)講解如何使用MapReduce統(tǒng)計(jì)對(duì)象中的某些屬性?!景咐?-2】以下是某個(gè)超市的結(jié)算記錄,從左往右各字段的含義依次是會(huì)員編號(hào)、結(jié)算時(shí)間、消費(fèi)金額和用戶身份,請(qǐng)計(jì)算會(huì)員和非會(huì)員的平均消費(fèi)金額。242315 2019-10-15.18:20:10 32 會(huì)員984518 2019-10-15.18:21:02 167 會(huì)員226335 2019-10-15.18:21:54 233 非會(huì)員341665 2019-10-15.18:22:11 5 非會(huì)員273367 2019-10-15.18:23:07 361 非會(huì)員296223 2019-10-15.18:25:12 19 會(huì)員193363 2019-10-15.18:25:55 268 會(huì)員671512 2019-10-15.18:26:04 76 非會(huì)員596233 2019-10-15.18:27:42 82 非會(huì)員323444 2019-10-15.18:28:02 219 會(huì)員345672 2019-10-15.18:28:48 482 會(huì)員...本案例的實(shí)現(xiàn)思路是:先計(jì)算會(huì)員和非會(huì)員的總消費(fèi)金額,然后除以會(huì)員或非會(huì)員的數(shù)量。具體實(shí)現(xiàn)過程如下所示。(1)編寫實(shí)體類編寫封裝每個(gè)消費(fèi)者記錄的實(shí)體類,每個(gè)消費(fèi)者至少包含了編號(hào)、消費(fèi)金額和是否為會(huì)員等屬性,源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.Writable;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;publicclassCustomerimplementsWritable{ //會(huì)員編號(hào) privateStringid; //消費(fèi)金額 privateintmoney; //0:非會(huì)員1:會(huì)員 privateintvip; publicCustomer(){ } publicCustomer(Stringid,intmoney,intvip){ this.id=id; this.money=money; this.vip=vip; } publicintgetMoney(){ returnmoney; } publicvoidsetMoney(intmoney){ this.money=money; } publicStringgetId(){ returnid; } publicvoidsetId(Stringid){ this.id=id; } publicintgetVip(){ returnvip; } publicvoidsetVip(intvip){ this.vip=vip; } //序列化 publicvoidwrite(DataOutputdataOutput)throwsIOException{ dataOutput.writeUTF(id); dataOutput.writeInt(money); dataOutput.writeInt(vip); } //反序列化(注意:各屬性的順序要和序列化保持一致) publicvoidreadFields(DataInputdataInput)throwsIOException{ this.id=dataInput.readUTF(); this.money=dataInput.readInt(); this.vip=dataInput.readInt(); } @Override publicStringtoString(){ returnthis.id+"\t"+this.money+"\t"+this.vip; }}由于本次統(tǒng)計(jì)的Customer對(duì)象需要在Hadoop集群中的多個(gè)節(jié)點(diǎn)之間傳遞數(shù)據(jù),因此需要將Customer對(duì)象通過write(DataOutputdataOutput)方法進(jìn)行序列化操作,并通過readFields(DataInputdataInput)進(jìn)行反序列化操作。(2)編寫Mapper類在Map階段讀取文本中的消費(fèi)者記錄信息,并將消費(fèi)者的各個(gè)屬性字段拆分讀取,然后根據(jù)會(huì)員情況,將消費(fèi)者的消費(fèi)金額輸出到MapReduce的下一個(gè)處理階段(即Shuffle),源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;publicclassCustomerMapperextendsMapper<LongWritable,Text,Text,IntWritable>{ @Override protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{ //將一行內(nèi)容轉(zhuǎn)成string Stringline=value.toString(); //獲取各個(gè)顧客的消費(fèi)數(shù)據(jù) String[]fields=line.split("\t"); //獲取消費(fèi)金額 intmoney=Integer.parseInt(fields[2]); //獲取會(huì)員情況 Stringvip=fields[3]; /* 輸出 Key:會(huì)員情況,value:消費(fèi)金額 例如: 會(huì)員32 會(huì)員167 非會(huì)員233 非會(huì)員5 */ context.write(newText(vip),newIntWritable(money)); }}(3)編寫Reducer類Map階段的輸出數(shù)據(jù)在經(jīng)過shuffle階段混洗以后,就會(huì)傳遞給Reduce階段。Reduce拿到的數(shù)據(jù)形式是“會(huì)員(或非會(huì)員),[消費(fèi)金額1,消費(fèi)金額2,消費(fèi)金額3,...]”。因此,與WordCount類似,只需要在Reduce階段累加會(huì)員或非會(huì)員的總消費(fèi)金額就能完成本次任務(wù),源代碼如下所示。packagecom.xijing.mapreduce;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;publicclassCustomerReducerextendsReducer<Text,IntWritable,Text,LongWritable>{ @Override protectedvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{ //統(tǒng)計(jì)會(huì)員(或非會(huì)員)的個(gè)數(shù) intvipCount=0; //總消費(fèi)金額 longsumMoney=0; for(IntWritablemoney:values){ vipCount++; sumMoney+=money.get(); } //會(huì)員(或非會(huì)員)的平均消費(fèi)金額 longavgMoney=sumMoney/vipCount; context.write(key,newLongWritable(avgMoney)); }}(4)編寫MapReduce程序的驅(qū)動(dòng)類在編寫Ma

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論