




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
1、Hadoop:The Definitive Guid學(xué)習(xí)筆記mingyuan Email:cn.mingyuan1. Meet Hadoop1.1. Data 數(shù)據(jù)本節(jié)陳述了這樣的事實:數(shù)據(jù)量越來越大,并且來源也越來越多,我們面臨的問題是如何有效的存儲和分析它們。1.2. Data Storage and Analysis 數(shù)據(jù)存儲和分析現(xiàn)在面臨這這樣的一個問題:當(dāng)磁盤的存儲量隨著時間的推移越來越大的時候,對磁盤上的數(shù)據(jù)的讀取速度卻沒有多大的增長。讀取硬盤上的全部數(shù)據(jù)會花費比較長的時間,如果是寫操作的話則會更慢。一個解決的辦法是同時讀取多個硬盤上的數(shù)據(jù)。例如我們有100塊硬盤,而他們分別存儲整
2、個數(shù)據(jù)的1%的話,若是用并行讀取的方法操作,可以在兩分鐘之內(nèi)搞定。只使用每塊硬盤的1%當(dāng)然是浪費的,但是我們可以存儲100個數(shù)據(jù)集(dataset),每個1T,并且提供共享訪問。有了更短的分析時間之后,這樣的系統(tǒng)的用戶應(yīng)該是樂于接受共享訪問模式的。他們的工作是按時間劃分的,這樣相互之間就不會有較大的影響。從多個磁盤上進(jìn)行并行讀寫操作是可行的,但是存在以下幾個方面的問題:1) 第一個問題是硬件錯誤。使用的硬件越多出錯的幾率就越大。一種常用的解決方式是數(shù)據(jù)冗余,保留多分拷貝,即使一份數(shù)據(jù)處理出錯,還有另外的數(shù)據(jù)。HDFS使用的也是類似的方式,但稍有不同。2) 第二個問題是數(shù)據(jù)處理的相關(guān)性問題。例如
3、很多分析工作在一快磁盤上處理出來的結(jié)果需要與其他磁盤上處理處理出來的結(jié)果合并才能完成任務(wù)。各種分布式系統(tǒng)也都給出了合并的策略,但是做好這方面確實是一個挑戰(zhàn)。MapReduce提供了一種編程模型,他將從硬盤上讀寫數(shù)據(jù)的問題抽象出來,轉(zhuǎn)化成對一系列鍵值對的計算。簡而言之,Hadoop提供了一個可靠的存儲和分析系統(tǒng)。存儲又HDFS提供,分析由MapReduce提供。1.3. 與其他系統(tǒng)比較1.3.1. RDBMS為什么我們不能使用大量的磁盤數(shù)據(jù)庫做大規(guī)模的批量分析?為什么需要MapReduce?1) 磁盤的尋道時間提高的速度低于數(shù)據(jù)的傳輸速度,如果數(shù)據(jù)訪問模式由尋道時間支配的話,在讀寫數(shù)據(jù)集的一大部
4、分的時候速度就會較流式讀取慢很多,這樣就出現(xiàn)了瓶頸。2) 另一方面在更新數(shù)據(jù)集的少量數(shù)據(jù)的時候,傳統(tǒng)的B-樹工作的比較好,但是在更新數(shù)據(jù)集的大部分?jǐn)?shù)據(jù)的時候B-樹就顯得比MapReduce方式慢了。MapReduce使用排序/合并操作去重建數(shù)據(jù)庫(完成數(shù)據(jù)更新)。在很多方面MapReduce可以看作是對傳統(tǒng)關(guān)系數(shù)據(jù)庫的補(bǔ)充。MapReduce比較適合于需要分析整個數(shù)據(jù)集,并且要使用批處理方式,特別是特定的分析的情況;RDBMS點查詢方面占優(yōu)勢,或在已編制索引的數(shù)據(jù)集提供低延遲的檢索和更新的數(shù)據(jù),但是數(shù)據(jù)量不能太大。MapReduce適合一次寫入,多次讀取的操作,但是關(guān)系數(shù)據(jù)庫就比較適合對數(shù)據(jù)集
5、的持續(xù)更新。還有一方面,MapReduce比較適合處理半結(jié)構(gòu)化,非結(jié)構(gòu)化的數(shù)據(jù)。關(guān)系數(shù)據(jù)往往進(jìn)行規(guī)則化以保證數(shù)據(jù)完整性,并刪除冗余。這樣做給MapReduce提出了新的問題:它使得讀數(shù)據(jù)變成了非本地執(zhí)行,而MapReduce的一個重要前提(假設(shè))就是數(shù)據(jù)可以進(jìn)行高速的流式讀寫。MapReduce是可以進(jìn)行線性擴(kuò)展的編程模型。一個對集群級別的數(shù)據(jù)量而寫的MapReduce可以不加修改的應(yīng)用于小數(shù)據(jù)量或者更大數(shù)據(jù)量的處理上。更重要的是當(dāng)你的輸入數(shù)據(jù)增長一倍的時候,相應(yīng)的處理時間也會增加一倍。但是如果你把集群也增長一倍的話,處理的速度則會和沒有增加數(shù)據(jù)量時候的速度一樣快,這方面對SQL查詢來說不見得
6、是正確的。隨著時間的推移,關(guān)系數(shù)據(jù)庫和MapReduce之間的差別變得越來越不明顯,很多數(shù)據(jù)庫(例如Aster Data和Greenplum的數(shù)據(jù))已經(jīng)借用了一些MapReduce的思想。另一個方面,基于MapReduce的高層次查詢語言使得MapReduce系統(tǒng)較傳統(tǒng)的關(guān)系數(shù)據(jù)庫來說,使程序員們更容易接受。1.3.2. Grid Compuing 網(wǎng)格計算數(shù)據(jù)量大的時候網(wǎng)絡(luò)帶寬會成為網(wǎng)格計算的瓶頸。但是MapReduce使數(shù)據(jù)和計算在一個節(jié)點上完成,這樣就變成了本地的讀取。這是MapReduce高性能的核心。MPI將控制權(quán)大大的交給了程序員,但是這就要求程序員明確的處理數(shù)據(jù)流等情況,而Map
7、Reduce只提供高層次的操作:程序員只需考慮處理鍵值對的函數(shù),而對數(shù)據(jù)流則是比較隱晦的。在分布式計算中,如何協(xié)調(diào)各個處理器是一項很大的挑戰(zhàn)。最大的挑戰(zhàn)莫過于如何很好的處理部分計算的失誤。當(dāng)你不知道是不是出現(xiàn)錯誤的時候,程序還在繼續(xù)運行,這就比較麻煩了。由于MapReduce是一種非共享(Shared-nothing)的架構(gòu),當(dāng)MapReduce實現(xiàn)檢測到map或者reduce過程出錯的時候,他可以將錯誤的部分再執(zhí)行一次。MPI程序員則需要明確的考慮檢查點和恢復(fù),這雖然給程序員很大自由,但是也使得程序變得難寫。也許你會覺得mapreduce模式過于嚴(yán)格,程序員面對的都是些鍵值對,并且mappe
8、r和reducer之間很少來往,這樣的模式能做一些有用的或者是非凡的事情嗎?答案是肯定的,Google已經(jīng)把Mapreduce使用在了很多方面從圖像分析到基于圖的問題,再到機(jī)器學(xué)習(xí),MapReduce工作的很好。雖然他不是萬能的,但是他確是一種通用的數(shù)據(jù)處理工具。1.3.3. Volunteer Computing志愿計算志愿計算主要是讓志愿者貢獻(xiàn)CPU時間來完成計算。MapReduce是針對在一個高聚合網(wǎng)絡(luò)連接的數(shù)據(jù)中心中進(jìn)行的可信的、使用專用的硬件工作持續(xù)數(shù)分鐘或者數(shù)個小時而設(shè)計的。相比之下,志愿計算則是在不可信的、鏈接速度有很大差異的、沒有數(shù)據(jù)本地化特性的,互聯(lián)網(wǎng)上的計算機(jī)上運行永久的(
9、超長時間的)計算,1.4. Hadoop簡史(略)1.5. The Apache Hadoop Project(略)2. MapReduce2.1. A Weather Dataset 一個天氣數(shù)據(jù)集數(shù)據(jù)是NCDC的數(shù)據(jù),我們關(guān)注以下特點:1) 數(shù)據(jù)是半格式化的2) 目錄里面存放的是從1901-2001年一個世紀(jì)的記錄,是gzip壓縮過的文件。3) 以行為單位,使用ASCII格式存儲,每行就是一條記錄4) 每條記錄我們關(guān)注一些基本的元素,比如溫度,這些數(shù)據(jù)在每條數(shù)據(jù)中都會出現(xiàn),并且寬度也是固定的。下面是一條記錄的格式,為了便于顯示,做了一部分調(diào)整。2.2. Analyzing the Data
10、 with Unix Tools 使用Unix工具分析數(shù)據(jù)以分析某年份的最高溫度為例,下面是一段Unix的腳本程序:這段腳本的執(zhí)行過程如下:腳本循環(huán)處理每一個壓縮的年份文件,首先打印出年份,然后對每一個文件使用awk處理。Awk腳本從數(shù)據(jù)中解析出兩個字段:一個air temperature,一個quality code。air temperature值加0被轉(zhuǎn)換成整形。接下來查看溫度數(shù)據(jù)是否有效(9999表示在NCDC數(shù)據(jù)集中丟失的值),并且檢查quality code是不是可信并沒有錯誤的。如果讀取一切正常,temp將與目前的最大值比較,如果出現(xiàn)新的最大值,則更新當(dāng)前max的值。當(dāng)文件中所有
11、行的數(shù)據(jù)都被處理之后,開始執(zhí)行End程序塊,并且打印出最大值。程序執(zhí)行之后將產(chǎn)生如下樣式的輸出:處理結(jié)果之中,溫度的值被放大了10倍。所以,1901年的溫度應(yīng)該是31.7度,1902年的溫度應(yīng)該是24.4度所有的,一個世紀(jì)的氣象記錄在一臺EC2 High-CPU Extra Large Instance上耗時42分鐘。為了加速處理速度,我們將程序的某些部分進(jìn)行并行執(zhí)行。這在理論上是比較簡單的,我們可以按照年份來在不同的處理器上執(zhí)行,使用所有可用的硬件線程,但是還是有些問題:1) 把任務(wù)切分成相同大小的塊不總是那么容易的。這這種情況下,不同年份的文件大小有很大不同,這樣就會導(dǎo)致一些過程較早的完成
12、,盡管這些他們可以進(jìn)行下一步的工作,但是總的運行時間是由耗費時間最長的文件所決定的。一種可供選擇的嘗試是將輸入分成固定大小的塊,并把它們分配給處理進(jìn)程。2) 合并單獨處理出來的結(jié)果還需要進(jìn)一步的處理。在這種情況下,一個年份的結(jié)果對于其他年份來說是獨立的,并且可能經(jīng)過聯(lián)接所有的結(jié)果,并按照年份進(jìn)行排序之后被合并。如果使用固定大小塊的方式,合并是很脆弱的。例如,某一年份的數(shù)據(jù)可能被分到不同的塊中,并且被單獨處理。我們最終也會得每塊數(shù)據(jù)的最高溫度,但是這時候我們最后一步變成了在這些最大值中,為每一個年份找出最大值。3) 人們?nèi)耘f被單機(jī)的處理能力所束縛。如果在一臺擁有確定數(shù)量處理器的計算機(jī)上面執(zhí)行程序
13、的的開銷是20分鐘的話,你也不能可能再有所提高了。并且有些數(shù)據(jù)集的數(shù)據(jù)量已經(jīng)超出了單臺計算機(jī)的處理能力。當(dāng)我們開始使用多臺機(jī)器的時候,其它一大堆因素就跳了出來,主要是協(xié)調(diào)和可靠性的問題。誰掌控全局?怎么進(jìn)行處理器的失效處理?所以,盡管在理論上并行處理是可行的,但是實踐上卻是麻煩的。使用一個類似于Hadoop的框架將會有很大的幫助。2.3. Analyzing the Data with Hadoop 使用Hadoop分析數(shù)據(jù)為了使用Hadoop并行處理的長處,我們需要將程序做成MapReduce格式。經(jīng)過一些本地的、小數(shù)據(jù)量的測試之后,我們將可以把程序放在集群上進(jìn)行運行。2.3.1. Map
14、and ReduceMapReduce將工作分為map階段和reduce階段,每個階段都將鍵值對作為輸入輸入,鍵值對的類型可以由程序員選擇。程序員還指定兩個函數(shù):map和reduce函數(shù)。Map階段的輸入數(shù)據(jù)是NCDC的原始數(shù)據(jù),我們選擇文本格式輸入,這樣可以把記錄中的每一行作為文本value。Key是當(dāng)前行離開始行的偏移量,但是我們并不需要這個key,所以省去。我們的Map函數(shù)比較簡單,僅僅從輸入中析取出temperature。從這個意義上來說,map函數(shù)僅僅是完成了數(shù)據(jù)的準(zhǔn)備階段,這樣使得reducer函數(shù)可以基于它查找歷年的最高溫度。Map函數(shù)也是一個很好的過濾階段,這里可以過濾掉丟失、
15、置疑、錯誤的temperature數(shù)據(jù)。形象化一點:下面是輸入數(shù)據(jù) 下面的鍵值對給map函數(shù)處理,其中加粗的是有用的數(shù)據(jù)處理之后的結(jié)果如下:經(jīng)過以上的處理之后還需要在mapreduce框架中進(jìn)行進(jìn)一步的處理,主要有排序和按照key給鍵值對給key-value排序。經(jīng)過這一番處理之后的結(jié)果如下:上面的數(shù)據(jù)將傳遞給reduce之后,reduce所需要做的工作僅僅是遍歷這些數(shù)據(jù),找出最大值,產(chǎn)生最終的輸出結(jié)果:以上過程可以用下圖描述:2.3.2. Java MapReduceMap函數(shù)實現(xiàn)了mapper接口,此接口聲明了一個map()函數(shù)。下面是map實現(xiàn)import java.io.IOExcep
16、tion;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reporter;public
17、class MaxTemperatureMapper extends MapReduceBase implementsMapper<LongWritable, Text, Text, IntWritable> private static final int MISSING = 9999;public void map(LongWritable key, Text value,OutputCollector<Text, IntWritable> output, Reporter reporter)throws IOException String line = valu
18、e.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(87) = '+') / parseInt doesn't like leading plus/ signsairTemperature = Integer.parseInt(line.substring(88, 92); else airTemperature = Integer.parseInt(line.substring(87, 92);String quality = line.substri
19、ng(92, 93);if (airTemperature != MISSING && quality.matches("01459") output.collect(new Text(year), new IntWritable(airTemperature);Mapper是一個泛型類型,有四個參數(shù)分別代表Map函數(shù)的input key, input value, output key, output value 的類型。對于本例來說,input key是一個long integer的偏移量,input value是一行文本,output key是年份,o
20、utput value是氣溫(整形)。除了Java的數(shù)據(jù)類型之外,Hadoop也提供了他自己的基本類型,這些類型為網(wǎng)絡(luò)序列化做了專門的優(yōu)化??梢栽趏rg.apache.hadoop.io包中找到他們。比如LongWritable相當(dāng)于Java中的Long,Text相當(dāng)于String而IntWritable在相當(dāng)于Integer。map()方法傳入一個key和一個value。我們將Text類型的value轉(zhuǎn)化成Java的String,然后用String的substring方法取出我偶們需要的部分。map()方法也提供了OutputCollector的一個實例,用來寫輸出數(shù)據(jù)。在次情況下,我們將y
21、ear封裝成Text,而將temperature包裝成IntWritable類型。只要在temperature值出現(xiàn)并且quality code表示temperature讀取正常的情況下我們才進(jìn)行數(shù)據(jù)的寫入。下面是Reduce函數(shù)的類似實現(xiàn),僅用了一個Reducer接:import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.MapRedu
22、ceBase;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;public class MaxTemperatureReducer extends MapReduceBase implementsReducer<Text, IntWritable, Text, IntWritable> public void reduce(Text key, Iterator<IntW
23、ritable> values,OutputCollector<Text, IntWritable> output, Reporter reporter)throws IOException int maxValue = Integer.MIN_VALUE;while (values.hasNext() maxValue = Math.max(maxValue, values.next().get();output.collect(key, new IntWritable(maxValue);類似的,Reducer也有四個參數(shù)來分別標(biāo)記輸入輸出。Reduce函數(shù)的輸入類型必須
24、對應(yīng)于Map函數(shù)的輸出,拿本例來說,輸入必須是:Text,IntWritable類型。Reduce在本例輸出結(jié)果是Text和IntWritbale類型,year和與其對應(yīng)的maxValue是經(jīng)過遍歷、比較之后得到的。下面的一段代碼執(zhí)行了MapReduce工作:import java.io.IOException;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileI
25、nputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;public class MaxTemperature public static void main(String args) throws IOException if (args.length != 2) System.err.println("Usage: MaxTemperature <i
26、nput path> <output path>");System.exit(-1);JobConf conf = new JobConf(MaxTemperature.class);conf.setJobName("Max temperature");FileInputFormat.addInputPath(conf, new Path(args0);FileOutputFormat.setOutputPath(conf, new Path(args1);conf.setMapperClass(MaxTemperatureMapper.clas
27、s);conf.setReducerClass(MaxTemperatureReducer.class);conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);JobClient.runJob(conf);JobConf對象構(gòu)成了mapReduce job的說明,給出了job的總體控制。當(dāng)執(zhí)行MapReduce工作的時候我們需要將代碼打包成一個jar文件(這個文件將被Hadoop在集群中分發(fā))。我們并沒有指定jar文件,但是我們傳遞了一個class給JobConf的構(gòu)造函數(shù),Hadoop將利
28、用它通過查找包含這個類的jar文件而去定位相關(guān)的jar件。之后我們指定input、output路徑。FileInputFormat.addInputPath(conf, new Path(args0);FileInputFormat的靜態(tài)方法addInputPath來添加input path,input path可以是文件名或者目錄,如果是目錄的話,在目錄下面的文件都會作為輸入。addInputPath可以調(diào)用多次。FileOutputFormat.setOutputPath(conf, new Path(args1);FileOutputFormat的setOutput Path()方法指定
29、output path。這個目錄在運行job之前是不應(yīng)該存在的,這樣可以阻止數(shù)據(jù)丟失。conf.setMapperClass(MaxTemperatureMapper.class);conf.setReducerClass(MaxTemperatureReducer.class);指定了mapper和reducer類。conf.setOutputKeyClass(Text.class);設(shè)置output key類型conf.setOutputValueClass(IntWritable.class);設(shè)置output value類型一般map和reduce的key、value類型都是一樣的,如
30、果不一樣的話可以調(diào)用setMapOutputKeyClass() 和 setMapOutputValueClass()來設(shè)置。輸入類型由input format控制,本例使用的是默認(rèn)的Text格式,所以沒有顯式指定。JobClient.runJob(conf);提交工作,等待工作完成。2.4. The New Java MapReduce API0.20.0版本的Hadoop新增了一個Context Object,為API將來進(jìn)化做準(zhǔn)備。新舊api不兼容,要想使用新api的特性,程序需要重寫。主要有以下幾處重大改進(jìn):1)The new API favors abstract classes o
31、ver interfaces, since these are easier to evolve.For example, you can add a method (with a default implementation) to an abstractclass without breaking old implementations of the class. In the new API, theMapper and Reducer interfaces are now abstract classes.2) The new API is in the org.apache.hado
32、op.mapreduce package (and subpackages).The old API is found in org.apache.hadoop.mapred.3) The new API makes extensive use of context objects that allow the user code tocommunicate with the MapReduce system. The MapContext, for example, essentiallyunifies the role of the JobConf, the OutputCollector
33、, and the Reporter.4)The new API supports both a “push” and a “pull” style of iteration. In both APIs,key-value record pairs are pushed to the mapper, but in addition, the new APIallows a mapper to pull records from within the map() method. The same goes forthe reducer. An example of how the “pull”
34、style can be useful is processing recordsin batches, rather than one by one.5)Configuration has been unified. The old API has a special JobConf object for jobconfiguration, which is an extension of Hadoops vanilla Configuration object(used for configuring daemons; see “The Configuration API” on page
35、 116). In thenew API, this distinction is dropped, so job configuration is done through aConfiguration.6)Job control is performed through the Job class, rather than JobClient, which nolonger exists in the new API.使用新api重寫的程序如下:public class NewMaxTemperature static class NewMaxTemperatureMapper exten
36、dsMapper<LongWritable, Text, Text, IntWritable> private static final int MISSING = 9999;public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException String line = value.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(
37、87) = '+') / parseInt doesn't like leading plus/ signsairTemperature = Integer.parseInt(line.substring(88, 92); else airTemperature = Integer.parseInt(line.substring(87, 92);String quality = line.substring(92, 93);if (airTemperature != MISSING && quality.matches("01459"
38、) context.write(new Text(year), new IntWritable(airTemperature);static class NewMaxTemperatureReducer extendsReducer<Text, IntWritable, Text, IntWritable> public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException int maxValue = In
39、teger.MIN_VALUE;for (IntWritable value : values) maxValue = Math.max(maxValue, value.get();context.write(key, new IntWritable(maxValue);public static void main(String args) throws Exception if (args.length != 2) System.err.println("Usage: NewMaxTemperature <input path> <output path>
40、");System.exit(-1);Job job = new Job();job.setJarByClass(NewMaxTemperature.class);FileInputFormat.addInputPath(job, new Path(args0);FileOutputFormat.setOutputPath(job, new Path(args1);job.setMapperClass(NewMaxTemperatureMapper.class);job.setReducerClass(NewMaxTemperatureReducer.class);job.setOu
41、tputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);System.exit(job.waitForCompletion(true) ? 0 : 1);2.5. Scaling Out2.5.1. DataFlow數(shù)據(jù)流1)MapReduce Job:客戶端要處理的一批工作,包括input data、mapreduce程序、配置信息。2)Hadoop工作分為map task和reduce task兩種。3)有兩種節(jié)點控制job運行,一種是jobtracker,一種是tasktracker。Jobtracker通過
42、調(diào)度tasktracker協(xié)調(diào)所有工作的執(zhí)行。Tasktracker運行任務(wù)并將報告發(fā)送給jobtracker,jobtracker所有工作的進(jìn)度。如果一個任務(wù)失敗,jobtracker再重新調(diào)度一個不同的tasktracker進(jìn)行工作。4)splits,input splits。Hadoop將輸入劃分成固定大小的塊,這些塊就叫splits。分塊不能太大,也不能太小,一般是64MB,也就是HDFS默認(rèn)的塊大小。5)data locality map處理本機(jī)HDFS的數(shù)據(jù),不經(jīng)過網(wǎng)絡(luò)。6)Map將輸出寫到本地磁盤,沒有寫到HDFS中。7)reduce task沒有data locality優(yōu)勢下
43、面是mapreduce的幾種執(zhí)行方式:MapReduce data flow with a single reduce taskMapReduce data flow with multiple reduce tasksMapReduce data flow with no reduce task2.5.2. Combiner FunctionsCombiner將map出來的中間數(shù)據(jù)進(jìn)行處理,減少網(wǎng)絡(luò)傳輸量。指定combiner方法:conf.setCombinerClass(MaxTemperatureReducer.class);2.6. Hadoop streaming(略)2.7. H
44、adoop pipes(略)3. The Hadoop Distributed File System3.1. The Design of HDFSHDFS設(shè)計的針對對象:適合流式訪問的超大文件、在使用便宜的硬件搭建的集群上運行。HDFS不足:低延遲數(shù)據(jù)訪問(Hbase是個好選擇)、小文件多的時候出現(xiàn)問題(HDFS將文件Meta信息存儲在內(nèi)存中,內(nèi)存限制了可以控制的文件數(shù)量)、對文件的多個wirter進(jìn)行寫入或者任意位置的修改。3.2. HDFS Concept3.2.1. BlocksHDFS中Block的大小默認(rèn)是64M,小于塊大小的的文件并不占據(jù)整個塊的全部空間(而是將文件大小作為塊的大
45、小.比如要存放的文件是1k,但是系統(tǒng)的Block默認(rèn)是64MB,存放之后塊的大小是1k,不是64MB.文件若是大于64MB,則分多快進(jìn)行存儲.)使用Blocks的好處:1) 可以存儲大文件,一個文件的大小可以大于任何一個單塊硬盤的容量2) 把存儲單元抽象成塊而不是文件,簡化了存儲子系統(tǒng):簡化了數(shù)據(jù)管理、取消元數(shù)據(jù)關(guān)注3) 能很好適應(yīng)數(shù)據(jù)復(fù)制,數(shù)據(jù)復(fù)制保證系統(tǒng)的容錯和可用性。3.2.2. Namenodes and DatanodesNamenode:masterDatanode:workerNamenode管理文件系統(tǒng)名字空間(filesystem namespace),它維持了一個files
46、ystem tree,所有文件的metadata和目錄都在里面。信息被以兩種文件的形式持久化在硬盤上,namespace image,edit log.Hdfs提供了兩種namenode的容錯機(jī)制:1) 備份存儲持久化狀態(tài)的文件系統(tǒng)元數(shù)據(jù)的文件2) 提供secondary namenode。Secondary的主要角色是合并namespace image和edit log,防止edit log過大。但是secondary namenode的數(shù)據(jù)較master namenode的數(shù)據(jù)有所延遲,所有數(shù)據(jù)恢復(fù)以后肯定會有數(shù)據(jù)丟失3.3. The Command-line Interface以偽分布式
47、為例基本的文件系統(tǒng)操作:1) 將本地數(shù)據(jù)拷貝到hdfs上% hadoop fs -copyFromLocal input/docs/quangle.txt hdfs:/localhost/user/tom/quangle.txthdfs:/可省去,這樣變成% hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt也可以使用相對路徑:% hadoop fs -copyFromLocal input/docs/quangle.txt quangle.txt2) 將數(shù)據(jù)從hdfs上拷貝到本地硬盤并檢查文件時候一致%
48、hadoop fs -copyToLocal quangle.txt quangle.copy.txt% md5 input/docs/quangle.txt quangle.copy.txtMD5 (input/docs/quangle.txt) = a16f231da6b05e2ba7a339320e7dacd9MD5 (quangle.copy.txt) = a16f231da6b05e2ba7a339320e7dacd93) Hdfs文件列表% hadoop fs -mkdir books% hadoop fs -ls .Found 2 itemsdrwxr-xr-x - tom su
49、pergroup 0 2009-04-02 22:41 /user/tom/books-rw-r-r- 1 tom supergroup 118 2009-04-02 22:29 /user/tom/quangle.txt第一列:文件模式(類似posix)第二列:文件被復(fù)制的份數(shù)第三列:文件擁有者第四列:文件擁有者的group第五列:文件大小,目錄顯示為0第六列:文件最后修改日期第七列:文件最后修改時間第八列:文件的絕對路徑3.4. Hadoop FilesystemsHadoop有一個對文件系統(tǒng)的抽象,HDFS只是其中的一個實現(xiàn)。Java的抽象類org.apache.hadoop.fs.Fi
50、leSystem代表了Hadoop中的文件系統(tǒng),還有其他的幾種實現(xiàn)(48頁):3.4.1. InterfacesHadoop用Java寫成,所有Hadoop文件的交互都通過Java api來完成。還有另外的與Hadoop文件系統(tǒng)交互的庫:Thrift、C、FUSE、WebDAV等3.5. The Java Interface3.5.1. Reading Data from a Hadoop URL最簡單的方式是用.URL對象打開一個流來讀取。如下:InputStream in = null;try in = new URL("hdfs:/host/path").openSt
51、ream();/ process in finally IOUtils.closeStream(in);這里需要進(jìn)行一點額外的工作才能使得URL識別hdfs的uri。我們要使用.URL的setURLStreamHandlerFactory()方法設(shè)置URLStreamHandlerFactory,這里需要傳遞一個FsUrlStreamHandlerFactory。這個操作對一個jvm只能使用一次,我們可以在靜態(tài)塊中調(diào)用。public class URLCat static URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(
52、);public static void main(String args) throws Exception InputStream in = null;try in = new URL(args0).openStream();IOUtils.copyBytes(in, System.out, 4096, false); finally IOUtils.closeStream(in);IOUtils是一個工具類,用來在finally從句中關(guān)閉流,也可以用來拷貝數(shù)據(jù)到輸出流中。copyBytes方法的四個參數(shù)代表的含義分別是:拷貝的來源,去處,拷貝的字節(jié)數(shù)已經(jīng)在拷貝完成之后是否關(guān)閉流。本例會有
53、如下結(jié)果呈現(xiàn):% hadoop URLCat hdfs:/localhost/user/tom/quangle.txtOn the top of the Crumpetty TreeThe Quangle Wangle sat,But his face you could not see,On account of his Beaver Hat.3.5.2. Reading Data Using the FileSystem API在某些情況下設(shè)置URLStreamHandlerFactory的方式并不一定回生效。在這種情況下,需要用FileSystem API來打開一個文件的輸入流。文件的位
54、置是使用Hadoop Path呈現(xiàn)在Hadoop中的,與java.io中的不一樣。有兩種方式獲取FileSystem的實例:public static FileSystem get(Configuration conf) throws IOExceptionpublic static FileSystem get(URI uri, Configuration conf) throws IOExceptionConfiguration封裝了client或者server的配置,這些配置從classpath中讀取,比如被classpath指向的conf/core-site.xml文件.第一個方法從默
55、認(rèn)位置(conf/core-site.xml)讀取配置,第二個方法根據(jù)傳入的uri查找適合的配置文件,若找不到則返回使用第一個方法,即從默認(rèn)位置讀取。在獲得FileSystem實例之后,我們可以調(diào)用open()方法來打開輸入流:public FSDataInputStream open(Path f) throws IOExceptionpublic abstract FSDataInputStream open(Path f, int bufferSize) throws IOException第一個方法的參數(shù)f是文件位置,第二個方法的bufferSize就是輸入流的緩沖大小。下面的代碼是使用FileSystem打開輸入流的示例:public class FileSystemCat public static void main(String args) throws Exception String uri = args0;Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);InputStream in = null;try in = fs.open(new Path(uri);IOUtils.copyBytes(i
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 企業(yè)運營效率與策略研究
- 三農(nóng)村危房改造工作指南
- 綜合農(nóng)業(yè)可研報告
- 三農(nóng)產(chǎn)品品牌打造作業(yè)指導(dǎo)書
- 軟件行業(yè)項目可行性分析報告
- 裝配式建筑設(shè)計規(guī)范
- 農(nóng)業(yè)產(chǎn)業(yè)鏈延伸發(fā)展策略手冊
- 光伏發(fā)電太陽能工程
- 環(huán)保產(chǎn)業(yè)園區(qū)可行性研究報告
- 項目籌備及執(zhí)行計劃書
- 《電氣基礎(chǔ)知識培訓(xùn)》課件
- 2024年河南省新鄉(xiāng)市中考數(shù)學(xué)一模試卷
- 2024春蘇教版《亮點給力大試卷》 數(shù)學(xué)四年級下冊(全冊有答案)
- 數(shù)電課件康華光電子技術(shù)基礎(chǔ)-數(shù)字部分第五版完全
- DB21-T 2041-2022寒區(qū)溫拌瀝青路面工程技術(shù)規(guī)程
- 語文主題學(xué)習(xí)整本書閱讀指導(dǎo)課件
- 職業(yè)教育課堂教學(xué)設(shè)計(全)課件
- 工程項目造價控制措施
- 心電監(jiān)護(hù)操作評分標(biāo)準(zhǔn)
- 電子印鑒卡講解
- 二方審核計劃
評論
0/150
提交評論