單元5 任務(wù)5.3-MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)_第1頁(yè)
單元5 任務(wù)5.3-MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)_第2頁(yè)
單元5 任務(wù)5.3-MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)_第3頁(yè)
單元5 任務(wù)5.3-MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)_第4頁(yè)
單元5 任務(wù)5.3-MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)_第5頁(yè)
已閱讀5頁(yè),還剩15頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

《大數(shù)據(jù)平臺(tái)部署與運(yùn)維》單元5MapReduce實(shí)現(xiàn)電商銷售數(shù)據(jù)統(tǒng)計(jì)任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)01掌握MapReduce自定義分區(qū)的概念與用法02掌握MapReduce自定義數(shù)據(jù)類型的概念與用法學(xué)習(xí)目標(biāo)任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)【任務(wù)場(chǎng)景】經(jīng)理:小張,接下來(lái)我們要使用MapReduce對(duì)電商銷售數(shù)據(jù)進(jìn)行統(tǒng)計(jì),可能要用到自定義分區(qū)和自定義數(shù)據(jù)類型,你了解MapReduce中的自定義分區(qū)和自定義數(shù)據(jù)類型嗎?小張:日常業(yè)務(wù)中Hadoop提供的數(shù)據(jù)類型有時(shí)不滿足使用,我們需要根據(jù)業(yè)務(wù)創(chuàng)建合適的自定義數(shù)據(jù)類型,也可以通過(guò)自定義分區(qū)對(duì)數(shù)據(jù)進(jìn)行分區(qū)。經(jīng)理:是的,自定義數(shù)據(jù)類型根據(jù)場(chǎng)景不同需要實(shí)現(xiàn)不同的接口,自定義分區(qū)可以把數(shù)據(jù)分到不同的reducer中。我給你一份后臺(tái)導(dǎo)出的數(shù)據(jù),你用MapReduce統(tǒng)計(jì)一下每個(gè)買(mǎi)家收藏商品的數(shù)量,根據(jù)收藏日期做一下自定義分區(qū)。小張:好的。任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)【任務(wù)布置】MapReduce的運(yùn)行依賴與JDK和Hadoop,因此必須將Hadoop的基礎(chǔ)環(huán)境提前安裝好,才能進(jìn)行MapReduce的運(yùn)行和操作。本任務(wù)要求在前面已經(jīng)完成安裝部署Hadoop平臺(tái)的node1節(jié)點(diǎn)上完成。要求掌握MapReduce自定義分區(qū)與自定義數(shù)據(jù)類型的創(chuàng)建;基于IDEA進(jìn)行開(kāi)發(fā),每個(gè)賣家收藏商品數(shù)量的統(tǒng)計(jì),將收藏統(tǒng)計(jì)結(jié)果分為兩個(gè)分區(qū),2020-04-14日之前及14日當(dāng)天的數(shù)據(jù)為一個(gè)分區(qū),2020-04-14日之后的數(shù)據(jù)為一個(gè)分區(qū)。任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)5.3.1MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)的流程數(shù)據(jù)介紹現(xiàn)有某電商網(wǎng)站用戶對(duì)商品的收藏?cái)?shù)據(jù),記錄了用戶收藏的商品id以及收藏日期,名為buyer_favorite1。buyer_favorite1包含:買(mǎi)家id,商品id,收藏日期這三個(gè)字段,數(shù)據(jù)以空格分割,樣本數(shù)據(jù)及格式如下:1018110004812020-04-0416:54:312000110015972020-04-0715:07:522000110015602020-04-0715:08:272004210013682020-04-0808:20:302006710020612020-04-0816:45:332005610032892020-04-1210:50:552005610032902020-04-1211:57:352005610032922020-04-1212:05:292005410024202020-04-1415:24:12……任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)5.3.2自定義分區(qū)1.

MapReduce

Partitioner類通過(guò)前面的學(xué)習(xí)我們知道Mapper最終處理的鍵值對(duì)<key,value>,是需要送到Reducer去合并的,合并的時(shí)候,有相同key的鍵/值對(duì)會(huì)送到同一個(gè)Reducer節(jié)點(diǎn)中進(jìn)行歸并。哪個(gè)key到哪個(gè)Reducer的分配過(guò)程,是由Partitioner規(guī)定的。分區(qū)的目的是把具有相同key的值集合在一起,確保key相同的值都會(huì)在同一個(gè)reducer里面。這樣才能保證map的輸出數(shù)據(jù)被均勻的分發(fā)到reducer。HadoopMapReduce默認(rèn)的HadoopPartitioner是哈希

Partitioner(HashPartitioner),它會(huì)對(duì)key計(jì)算哈希值,并基于該哈希值對(duì)鍵值對(duì)數(shù)據(jù)進(jìn)行分區(qū)。任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)Partitioner的數(shù)量等于reducer的數(shù)量,Partitioner會(huì)根據(jù)reducer的數(shù)量來(lái)劃分?jǐn)?shù)據(jù),reducer數(shù)量可以通過(guò)下面的方法進(jìn)行設(shè)置:JobConf.setNumReduceTasks()因此,來(lái)自同一個(gè)分區(qū)的數(shù)據(jù)會(huì)被一個(gè)reducer任務(wù)處理。需要注意的是,只有作業(yè)具有多個(gè)reducer任務(wù)時(shí),分區(qū)才會(huì)被創(chuàng)建。也就是說(shuō),如果作業(yè)只有1個(gè)reducer任務(wù),分區(qū)階段是不會(huì)發(fā)生的。低效的分區(qū)意味著,某些reducer將比其他reducer任務(wù)處理更多的數(shù)據(jù)。那么,整個(gè)作業(yè)的運(yùn)行時(shí)間將取決于這些需要處理更多數(shù)據(jù)的reducer,也就是說(shuō),作業(yè)的運(yùn)行時(shí)間會(huì)更長(zhǎng)。為了克服低效分區(qū)的問(wèn)題,我們可以自定義分區(qū)器(partitioner),這樣我們就可以根據(jù)具體業(yè)務(wù)修改分區(qū)邏輯,把數(shù)據(jù)均分的分發(fā)到不同的reducer任務(wù)里。任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)2.

Partitioner實(shí)現(xiàn)過(guò)程(1)先分析一下具體的業(yè)務(wù)邏輯,確定大概有多少個(gè)分區(qū)(2)首先書(shū)寫(xiě)一個(gè)類,它要繼承

org.apache.hadoop.mapreduce.Partitioner這個(gè)抽象類(3)重寫(xiě)publicintgetPartition這個(gè)方法,根據(jù)具體邏輯,讀數(shù)據(jù)庫(kù)或者配置返回相同的數(shù)字(4)在main方法中設(shè)置Partioner的類,job.setPartitionerClass(DataPartitioner.class);(5)設(shè)置Reducer的數(shù)量,job.setNumReduceTasks(2);任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)3.總結(jié)分區(qū)Partitioner主要作用在于以下兩點(diǎn)(1)根據(jù)業(yè)務(wù)需要,產(chǎn)生多個(gè)輸出文件;(2)多個(gè)reduce任務(wù)并發(fā)運(yùn)行,提高整體job的運(yùn)行效率任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)5.3.3自定義數(shù)據(jù)類型Hadoop使用了自己寫(xiě)的序列化格式

Writable,它格式緊湊,速度快,但是它很難用Java以外的語(yǔ)言進(jìn)行拓展或使用,因?yàn)閃ritable是Hadoop的核心,大多數(shù)MapReduce程序都會(huì)為鍵和值使用它,Hadoop中的數(shù)據(jù)類型都要實(shí)現(xiàn)Writable接口,以便用這些類型定義的數(shù)據(jù)可以被網(wǎng)絡(luò)傳輸和文件存儲(chǔ)。任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)自定義數(shù)據(jù)類型實(shí)現(xiàn)過(guò)程(1)繼承接口Writable,實(shí)現(xiàn)其方法write()和readFields(),以便該數(shù)據(jù)能被序列化后完成網(wǎng)絡(luò)傳輸或文件輸入/輸出。(2)如果該數(shù)據(jù)需要作為主鍵key使用,或需要比較數(shù)值大小時(shí),則需要實(shí)現(xiàn)WritalbeComparable接口,實(shí)現(xiàn)其方法write(),readFields(),CompareTo()。(3)數(shù)據(jù)類型,為了方便反射,必須要有一個(gè)無(wú)參的構(gòu)造方法用來(lái)創(chuàng)建對(duì)象。(4)在自定義數(shù)據(jù)類型中,建議使用java的原生數(shù)據(jù)類型,最好不要使用Hadoop對(duì)原生類型進(jìn)行封裝的數(shù)據(jù)類型。比如intx;//IntWritable和Strings;//Text等等。任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)【工作流程】創(chuàng)建新的項(xiàng)目;編寫(xiě)自定義數(shù)據(jù)類型類,繼承Writable接口并重寫(xiě)write和readFields方法;編寫(xiě)Mapper類,繼承父類并重寫(xiě)map方法;編寫(xiě)自定義分區(qū)類,繼承父類并重寫(xiě)getPartition方法;編寫(xiě)Reducer類,繼承父類并重寫(xiě)reduce方法;編寫(xiě)驅(qū)動(dòng)類,程序?qū)С鰹閖ar包運(yùn)行,查看結(jié)果。任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)【操作步驟】創(chuàng)建新的項(xiàng)目,項(xiàng)目類型選擇Maven,項(xiàng)目名稱為MRCollectionCount。1.首先編寫(xiě)自定義數(shù)據(jù)類型類:CollectionWritable,實(shí)現(xiàn)Writable接口并重寫(xiě)write和readFields方法。importorg.apache.hadoop.io.Writable;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;publicclassCollectionWritableimplementsWritable{privateintcount;//收藏次數(shù)privateStringcollectTime;//收藏時(shí)間publicintgetCount(){returncount;}publicvoidsetCount(intcount){this.count=count;}publicStringgetCollectTime(){returncollectTime;}publicvoidsetCollectTime(StringcollectTime){this.collectTime=collectTime;}publicCollectionWritable(intcount,StringcollectTime){super();this.collectTime=collectTime;this.count=count;}

publicCollectionWritable(){

}@Overridepublicvoidwrite(DataOutputdataOutput)throwsIOException{dataOutput.writeInt(this.count);dataOutput.writeUTF(this.collectTime);}@OverridepublicvoidreadFields(DataInputdataInput)throwsIOException{this.count=dataInput.readInt();this.collectTime=dataInput.readUTF();}@OverridepublicStringtoString(){return"CollectionWritable{"+"count="+count+",collectTime='"+collectTime+'\''+'}';}}

任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)2.編寫(xiě)Mapper類,繼承父類并重寫(xiě)map方法importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;

importjava.io.IOException;

publicclassMyMapperextendsMapper<LongWritable,Text,Text,CollectionWritable>{@Overrideprotectedvoidmap(LongWritablekey,Textvalue,Mapper<LongWritable,Text,Text,CollectionWritable>.Contextcontext)throwsIOException,InterruptedException{//通過(guò)空格分割finalString[]splited=value.toString().split("");//第一列為用戶IDfinalStringuserId=splited[0];finalTextk2=newText(userId);//收藏次數(shù)記為1finalintcount=1;//第三列為收藏時(shí)間finalStringcollectTime=splited[2];finalCollectionWritablev2=newCollectionWritable(count,collectTime);context.write(k2,v2);}}

任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)3.編寫(xiě)自定義分區(qū)類,繼承父類并重寫(xiě)getPartition方法importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

publicclassMyPartitionextendsHashPartitioner<Text,CollectionWritable>{@OverridepublicintgetPartition(Textk2,CollectionWritablev2,intnumReduceTasks){Stringdate=v2.getCollectTime();//通過(guò)比對(duì)收藏時(shí)間進(jìn)行分區(qū)

if(pareTo("2020-04-14")>0){return1;}else{return0;}}}任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)4.編寫(xiě)Reducer類,繼承父類并重寫(xiě)reduce方法importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;

importjava.io.IOException;

publicclassMyReducerextendsReducer<Text,CollectionWritable,Text,LongWritable>{@Overrideprotectedvoidreduce(Textk2,Iterable<CollectionWritable>values,Reducer<Text,CollectionWritable,Text,LongWritable>.Contextcontext)throwsIOException,InterruptedException{Textk3=k2;longsum=0;//收藏次數(shù)進(jìn)行累加for(CollectionWritablecollect:values){sum+=collect.getCount();}

LongWritablev3=newLongWritable(sum);

context.write(k3,v3);}}

任務(wù)5.3MapReduce完成電商銷售數(shù)據(jù)統(tǒng)計(jì)5.編寫(xiě)驅(qū)動(dòng)類importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;publicclassCollectionApp{publicstaticvoidmain(String[]args)throwsIOException,ClassNotFoundException,InterruptedException{Configurationconf=newConfiguration();Jobjob=Job.getInstance(conf,"collectionJob");job.setJarByClass(CollectionApp.class);PathfileIn=newPath(args[0]);PathfileOut=newPath(args[1]);job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(CollectionWritable.class);FileInputFormat.setInputPaths(job,fileIn);//設(shè)置自定義分區(qū)類job.setPartitionerClass(MyPartition.class);//將reduce任務(wù)數(shù)量設(shè)置為2job.setNumReduceTasks(2);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);FileOutputFormat.setOutputPath(job,fileOut);Booleanr

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論