




Introduction to Big Data Technology — Lab Report, Assignment 3

Name: Guo Liqiang
Major: Engineering Management
Student ID: 2015E8009064028

Contents

1. Experiment Requirements
2. Environment
  2.1 Hardware
  2.2 Software
  2.3 Cluster Configuration
3. Experiment Design
  3.1 Part 1 Design
  3.2 Part 2 Design
4. Program Code
  4.1 Part 1 Code
  4.2 Part 2 Code
5. Experiment Input and Results

1. Experiment Requirements

Part 1: Using the secondary-sort design method, take N IP network traffic files as input and compute, for each source IP address in the files, the number of distinct destination IP addresses it connects to; that is, deduplicate and then count each source IP's destination addresses. For example, given the records

<10.0.0.1,10.0.0.2>
<10.0.0.1,10.0.0.2>
<10.0.0.1,10.0.0.3>
<10.0.0.2,10.0.0.1>

the expected output is

10.0.0.1    2
10.0.0.2    1

(the addresses are illustrative; the record format is the one parsed by RemoveMapper in section 4.1).

Part 2: Take N input files and build an inverted index with detailed information. For example, with four input files

d1.txt: cat dog cat fox
d2.txt: cat bear cat cat fox
d3.txt: fox wolf dog
d4.txt: wolf hen rabbit cat sheep

the index must have the format

cat->3:4:(d1.txt,2,4),(d2.txt,3,5),(d4.txt,1,5)

that is, word->number of files containing the word:total number of files:(name of a file containing the word, occurrences of the word in that file, total words in that file),...
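To make the required format concrete, the following small plain-Java program builds exactly such an index in memory for the four sample files. It is an illustrative cross-check only (the class is mine, not part of the report's MapReduce solution, which follows in sections 3.2 and 4.2):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: builds the required index format in memory for the
// four sample files. The report's actual solution uses two MapReduce jobs.
public class IndexFormatDemo {
    public static void main(String[] args) {
        Map<String, String> files = new LinkedHashMap<String, String>();
        files.put("d1.txt", "cat dog cat fox");
        files.put("d2.txt", "cat bear cat cat fox");
        files.put("d3.txt", "fox wolf dog");
        files.put("d4.txt", "wolf hen rabbit cat sheep");

        Map<String, Integer> totals = new LinkedHashMap<String, Integer>();   // file -> word total
        Map<String, Map<String, Integer>> postings =
                new TreeMap<String, Map<String, Integer>>();                  // word -> (file -> count)
        for (Map.Entry<String, String> f : files.entrySet()) {
            String[] words = f.getValue().split("\\s+");
            totals.put(f.getKey(), words.length);
            for (String w : words) {
                Map<String, Integer> post = postings.get(w);
                if (post == null) {
                    post = new LinkedHashMap<String, Integer>();
                    postings.put(w, post);
                }
                Integer c = post.get(f.getKey());
                post.put(f.getKey(), c == null ? 1 : c + 1);
            }
        }
        for (Map.Entry<String, Map<String, Integer>> p : postings.entrySet()) {
            StringBuilder sb = new StringBuilder();
            sb.append(p.getKey()).append("->").append(p.getValue().size())
              .append(":").append(files.size()).append(":");
            boolean first = true;
            for (Map.Entry<String, Integer> e : p.getValue().entrySet()) {
                if (!first) sb.append(",");
                sb.append("(").append(e.getKey()).append(",").append(e.getValue())
                  .append(",").append(totals.get(e.getKey())).append(")");
                first = false;
            }
            // Prints, among others: cat->3:4:(d1.txt,2,4),(d2.txt,3,5),(d4.txt,1,5)
            System.out.println(sb);
        }
    }
}

Running it prints eight index lines, including the cat line quoted above.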
2. Environment

2.1 Hardware

Processor: Intel Core i3-2350M CPU @ 2.30GHz × 4
Memory: 2GB
Disk: 60GB

2.2 Software

Operating system: Ubuntu 14.04 LTS (32-bit)
Java: 1.7.0_85
Eclipse: 3.8
Hadoop plugin: hadoop-eclipse-plugin-2.6.0.jar

2.3 Cluster Configuration

The cluster runs in pseudo-distributed mode on a single node.

3. Experiment Design

3.1 Part 1 Design

Two MapReduce jobs are used. The first job reads the records and removes duplicates; the second follows the secondary-sort design: it groups records by source address and counts the destination addresses in each group.

Design of the first MR job: define a custom StringPair type holding (source address, destination address) and implementing WritableComparable.
In the map phase the job reads the input files and emits <StringPair, NullWritable>; in the reduce phase it drops duplicate records and emits <StringPair.toString(), NullWritable>.

Design of the second MR job:

1. The map phase reads the first job's output, splits each value on the tab separator, builds a StringPair from the resulting source and destination addresses as the output key, and emits the value 1.

public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] records = value.toString().split("\t");
    String sourceip = records[0];
    String desip = records[1];
    context.write(new StringPair(sourceip, desip), one);
}

2. Define a GroupComparator class that extends WritableComparator and overrides compare so that map output is compared on StringPair.first only, which groups the records by source address.

public static class GroupComparator extends WritableComparator {
    protected GroupComparator() {
        super(StringPair.class, true);
    }
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        StringPair ip1 = (StringPair) w1;
        StringPair ip2 = (StringPair) w2;
        return ip1.getFirst().compareTo(ip2.getFirst());
    }
}

3. The reduce phase sums all values in a group, giving the number of distinct destination addresses the source address connects to.

public void reduce(StringPair key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    statistic.set(sum);
    context.write(key.getFirst(), statistic);
}
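One caveat the report does not discuss: a grouping comparator only controls how keys are grouped inside a single reducer. If the statistics job ran with more than one reducer, pairs sharing a source address would also have to be partitioned on the first field to reach the same reducer. With the single-node, single-reducer setup used here this cannot go wrong, but the standard secondary-sort pattern adds a partitioner along these lines (a sketch, not in the report's code):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes every pair with the same source address to the same reducer,
// regardless of the destination field.
public class FirstPartitioner extends Partitioner<StringPair, IntWritable> {
    @Override
    public int getPartition(StringPair key, IntWritable value, int numPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be wired into the statistics job with job.setPartitionerClass(FirstPartitioner.class).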
3.2 Part 2 Design

Two MapReduce jobs are used: the first counts each word's occurrences in every file as well as each file's total word count; the second processes these statistics into the inverted index.

Design of the first MR job:

1. In the map phase, override map to split the text held in value into individual words with StringTokenizer, obtain the current file name, and emit records in two forms: <fileName+word, 1> and <fileName, 1>.

public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    // Get the file name.
    FileSplit fileSplit = (FileSplit) context.getInputSplit();
    String fileName = fileSplit.getPath().getName();
    // Count each word's occurrences in this file, and the file's word total.
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
        String word = removeNonLetters(itr.nextToken().toLowerCase());
        String fileWord = fileName + "\001" + word;
        if (!word.equals("")) {
            context.write(new Text(fileWord), new IntWritable(1));
            context.write(new Text(fileName), new IntWritable(1));
        }
    }
}

2. In the reduce phase, sum the counts to obtain each word's occurrences in each file and each file's total word count, emitting <key, count>.

public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    context.write(key, new IntWritable(sum));
}
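For example, for d1.txt from section 1 (cat dog cat fox), the first job's reduced output would contain the following records (the \001 separator is written out symbolically here; key and value are tab-separated by the default TextOutputFormat):

d1.txt\001cat    2
d1.txt\001dog    1
d1.txt\001fox    1
d1.txt           4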
Design of the second MR job:

1. The map phase reads the first job's output, splits each value on the tab separator, and recombines it, emitting the fixed Text key "index" with the value fileName+word+count or fileName+count.

public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
    String valStr = value.toString();
    String[] records = valStr.split("\t");
    context.write(new Text("index"), new Text(records[0] + "\001" + records[1]));
}

2. The reduce phase defines four HashMaps: Map<String,Integer> wordinfilescount, keyed by the word+file-name record, whose value is the word's occurrences in that file; Map<String,Integer> filescount, keyed by file name, whose value is that file's total word count; Map<String,Integer> wordinfiles, keyed by word, whose value is the number of files the word appears in; and Map<String,String> indexes, keyed by word, whose value is the word's inverted-index line.

Each incoming value is split on the separator. If the split yields two fields, the value is a fileName+total-word-count record, and the pair goes into filescount. If it yields three fields, the value is a fileName+word+count record: it is put into wordinfilescount, and wordinfiles is updated to track how many files the word appears in.

The reducer then iterates over wordinfilescount and, for each word, puts into indexes the line "word->number of files containing the word:total number of files:(file name, occurrences in that file, total words in that file)...". Finally it iterates over indexes and writes out every index line. Note that because every record is mapped to the single key "index", one reduce call receives the entire data set; this is workable only because the inputs are small.

public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
    // Split each input value: this yields each word's occurrences per file,
    // each file's total word count, and (via filescount) the total file count.
    for (Text val : values) {
        String valStr = val.toString();
        String[] records = valStr.split("\001");
        switch (records.length) {
        case 2:
            filescount.put(records[0], Integer.parseInt(records[1]));
            break;
        case 3:
            wordinfilescount.put(valStr, Integer.parseInt(records[2]));
            if (!wordinfiles.containsKey(records[1])) {
                wordinfiles.put(records[1], 1);
            } else {
                wordinfiles.put(records[1], wordinfiles.get(records[1]) + 1);
            }
            break;
        }
    }
    // Build the inverted-index line for every word.
    for (Entry<String, Integer> entry : wordinfilescount.entrySet()) {
        String valStr = entry.getKey();
        String[] records = valStr.split("\001");
        String word = records[1];
        if (!indexes.containsKey(word)) {
            StringBuilder sb = new StringBuilder();
            sb.append(word).append("->").append(wordinfiles.get(word))
              .append(":").append(filescount.size()).append(":")
              .append("(").append(records[0]).append(",")
              .append(entry.getValue()).append(",")
              .append(filescount.get(records[0])).append(")");
            indexes.put(word, sb.toString());
        } else {
            StringBuilder sb = new StringBuilder();
            sb.append(",(").append(records[0]).append(",")
              .append(entry.getValue()).append(",")
              .append(filescount.get(records[0])).append(")");
            indexes.put(word, indexes.get(word) + sb.toString());
        }
    }
    for (Entry<String, String> entry : indexes.entrySet()) {
        context.write(new Text(entry.getValue()), NullWritable.get());
    }
}
4. Program Code

4.1 Part 1 Code

1. IpStatistics.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class IpStatistics {

    // Mapper of the first MR job: parses "<source,destination>" records and
    // emits each pair for deduplication.
    public static class RemoveMapper
            extends Mapper<Object, Text, StringPair, NullWritable> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String nextToken = itr.nextToken();
                String[] records = nextToken.split(",");
                String sourceip = records[0].replace("<", "");
                String destinationip = records[1].replace(">", "");
                context.write(new StringPair(sourceip, destinationip), NullWritable.get());
            }
        }
    }

    // Mapper of the second MR job: reads the deduplicated pairs and emits
    // <pair, 1> for counting.
    public static class StatisticsMapper
            extends Mapper<Object, Text, StringPair, IntWritable> {
        IntWritable one = new IntWritable(1);
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] records = value.toString().split("\t");
            String sourceip = records[0];
            String desip = records[1];
            context.write(new StringPair(sourceip, desip), one);
        }
    }

    // Groups map output by source address only.
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(StringPair.class, true);
        }
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            StringPair ip1 = (StringPair) w1;
            StringPair ip2 = (StringPair) w2;
            return ip1.getFirst().compareTo(ip2.getFirst());
        }
    }

    // Reducer of the first MR job: one call per distinct pair, so writing
    // the key once removes duplicates.
    public static class RemoveReducer
            extends Reducer<StringPair, NullWritable, Text, NullWritable> {
        public void reduce(StringPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(key.toString()), NullWritable.get());
        }
    }
    // Reducer of the second MR job: counts distinct destinations per source.
    public static class StatisticsReducer
            extends Reducer<StringPair, IntWritable, Text, IntWritable> {
        private IntWritable statistic = new IntWritable();
        public void reduce(StringPair key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            statistic.set(sum);
            context.write(key.getFirst(), statistic);
        }
    }

    // Job 1: deduplication.
    public static void RemoveTask(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String master = "";  // cluster master host name (fill in for a real cluster)
        conf.set("fs.defaultFS", "hdfs://" + master + ":9000");
        conf.set("hadoop.job.user", "hadoop");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.address", master + ":8032");
        conf.set("yarn.resourcemanager.scheduler.address", master + ":8030");
        conf.set("mapred.jar", "ipstatistics.jar");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Usage: ipstatistics <in> <intermediate> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "ipstatisticsRemoving");
        job.setMapperClass(RemoveMapper.class);
        job.setReducerClass(RemoveReducer.class);
        job.setOutputKeyClass(StringPair.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        job.waitForCompletion(true);
    }

    // Job 2: grouping by source address and counting.
    public static void StatisticsTask(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String master = "";  // cluster master host name (fill in for a real cluster)
        conf.set("fs.defaultFS", "hdfs://" + master + ":9000");
        conf.set("hadoop.job.user", "hadoop");
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.address", master + ":8032");
        conf.set("yarn.resourcemanager.scheduler.address", master + ":8030");
        conf.set("mapred.jar", "ipstatistics.jar");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Usage: ipstatistics <in> <intermediate> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "ipstatisticsStatistics");
        job.setMapperClass(StatisticsMapper.class);
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setReducerClass(StatisticsReducer.class);
        job.setOutputKeyClass(StringPair.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        RemoveTask(args);
        StatisticsTask(args);
    }
}

2. StringPair.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class StringPair implements WritableComparable<StringPair> {
    private Text first;
    private Text second;

    public StringPair() {
        this.first = new Text();
        this.second = new Text();
    }

    public StringPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public StringPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof StringPair) {
            StringPair ip = (StringPair) obj;
            return first.toString().equals(ip.first.toString())
                    && second.toString().equals(ip.second.toString());
        }
        return false;
    }

    // Order by the first (source) field, then the second (destination);
    // WritableComparable requires this, and the jobs above rely on it to
    // bring equal pairs together.
    @Override
    public int compareTo(StringPair other) {
        int cmp = first.compareTo(other.first);
        return cmp != 0 ? cmp : second.compareTo(other.second);
    }

    // Tab-separated form, matching the split("\t") in StatisticsMapper.
    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
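To see how StringPair and GroupComparator divide the work between sorting and grouping, consider this small check (my snippet, not from the report; it must run in the same package as IpStatistics, since GroupComparator's constructor is protected):

StringPair a = new StringPair("10.0.0.1", "10.0.0.2");
StringPair b = new StringPair("10.0.0.1", "10.0.0.3");

// compareTo orders by both fields, so the pairs stay distinct when sorting...
System.out.println(a.compareTo(b));                                    // negative

// ...but the grouping comparator looks only at the source address, so both
// pairs reach the same reduce() call and their destinations are counted together.
System.out.println(new IpStatistics.GroupComparator().compare(a, b));  // 0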
4.2 Part 2 Code

InverseV3.java

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class InverseV3 {

    /*
     * Mapper of the first MR job: counts each word's occurrences in a single
     * file. Input values are file lines; output is <fileName\001word, 1> per
     * word plus <fileName, 1> toward the file's word total.
     */
    public static class statisticsMap extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get the file name.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            // Count word occurrences in this file, and the file's word total.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String word = removeNonLetters(itr.nextToken().toLowerCase());
                String fileWord = fileName + "\001" + word;
                if (!word.equals("")) {
                    context.write(new Text(fileWord), new IntWritable(1));
                    context.write(new Text(fileName), new IntWritable(1));
                }
            }
        }
    }

    // Strips non-letter characters from a token.
    public static String removeNonLetters(String original) {
        StringBuffer aBuffer = new StringBuffer(original.length());
        for (int i = 0; i < original.length(); i++) {
            char aCharacter = original.charAt(i);
            if (Character.isLetter(aCharacter)) {
                aBuffer.append(aCharacter);
            }
        }
        return new String(aBuffer);
    }

    // Reducer of the first MR job: sums the counts, yielding each word's
    // occurrences per file and each file's total word count.
    public static class statisticsReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
    // Mapper of the second MR job: rewraps each line of the first job's
    // output under the single key "index" so one reduce call sees all of
    // the statistics.
    public static class InverseMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String valStr = value.toString();
            String[] records = valStr.split("\t");
            context.write(new Text("index"), new Text(records[0] + "\001" + records[1]));
        }
    }

    // Reducer of the second MR job: assembles the inverted index (the same
    // code shown in section 3.2).
    public static class InverseReducer extends Reducer<Text, Text, Text, NullWritable> {
        // key: word + file-name record, value: the word's occurrences in that file
        private Map<String, Integer> wordinfilescount = new HashMap<String, Integer>();
        // key: file name, value: total words in that file
        private Map<String, Integer> filescount = new HashMap<String, Integer>();
        // key: word, value: number of files the word appears in
        private Map<String, Integer> wordinfiles = new HashMap<String, Integer>();
        // key: word, value: the word's inverted-index line
        private Map<String, String> indexes = new HashMap<String, String>();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                String valStr = val.toString();
                String[] records = valStr.split("\001");
                switch (records.length) {
                case 2:
                    filescount.put(records[0], Integer.parseInt(records[1]));
                    break;
                case 3:
                    wordinfilescount.put(valStr, Integer.parseInt(records[2]));
                    if (!wordinfiles.containsKey(records[1])) {
                        wordinfiles.put(records[1], 1);
                    } else {
                        wordinfiles.put(records[1], wordinfiles.get(records[1]) + 1);
                    }
                    break;
                }
            }
            for (Entry<String, Integer> entry : wordinfilescount.entrySet()) {
                String valStr = entry.getKey();
                String[] records = valStr.split("\001");
                String word = records[1];
                if (!indexes.containsKey(word)) {
                    StringBuilder sb = new StringBuilder();
                    sb.append(word).append("->").append(wordinfiles.get(word))
                      .append(":").append(filescount.size()).append(":")
                      .append("(").append(records[0]).append(",")
                      .append(entry.getValue()).append(",")
                      .append(filescount.get(records[0])).append(")");
                    indexes.put(word, sb.toString());
                } else {
                    StringBuilder sb = new StringBuilder();
                    sb.append(",(").append(records[0]).append(",")
                      .append(entry.getValue()).append(",")
                      .append(filescount.get(records[0])).append(")");
                    indexes.put(word, indexes.get(word) + sb.toString());
                }
            }
            for (Entry<String, String> entry : indexes.entrySet()) {
                context.write(new Text(entry.getValue()), NullWritable.get());
            }
        }
    }
}
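The listing in the report stops before InverseV3's driver. Below is a minimal sketch of a main method that chains the two jobs, mirroring the structure of the part 1 drivers; the job names and the <in> <intermediate> <out> argument convention are my assumptions, not the report's code:

    // Sketch only, not from the report: chains the two jobs. Would sit
    // inside InverseV3; job names and the argument layout are illustrative.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Usage: inversev3 <in> <intermediate> <out>");
            System.exit(2);
        }
        // Job 1: per-file word counts and per-file totals.
        Job statJob = new Job(conf, "wordStatistics");
        statJob.setJarByClass(InverseV3.class);
        statJob.setMapperClass(statisticsMap.class);
        statJob.setReducerClass(statisticsReduce.class);
        statJob.setOutputKeyClass(Text.class);
        statJob.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(statJob, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(statJob, new Path(otherArgs[1]));
        if (!statJob.waitForCompletion(true)) {
            System.exit(1);
        }
        // Job 2: builds the inverted index from job 1's output.
        Job indexJob = new Job(conf, "invertedIndex");
        indexJob.setJarByClass(InverseV3.class);
        indexJob.setMapperClass(InverseMapper.class);
        indexJob.setReducerClass(InverseReducer.class);
        indexJob.setMapOutputKeyClass(Text.class);
        indexJob.setMapOutputValueClass(Text.class);
        indexJob.setOutputKeyClass(Text.class);
        indexJob.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(indexJob, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(indexJob, new Path(otherArgs[2]));
        System.exit(indexJob.waitForCompletion(true) ? 0 : 1);
    }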
5. Experiment Input and Results

The experiment inputs and outputs can be found in the corresponding directories of the submitted archive.