Hadoop大數(shù)據(jù)處理技術(shù)基礎(chǔ)與實(shí)踐(第3版)課件 第6章-Hadoop I-O 操作_第1頁(yè)
Hadoop大數(shù)據(jù)處理技術(shù)基礎(chǔ)與實(shí)踐(第3版)課件 第6章-Hadoop I-O 操作_第2頁(yè)
Hadoop大數(shù)據(jù)處理技術(shù)基礎(chǔ)與實(shí)踐(第3版)課件 第6章-Hadoop I-O 操作_第3頁(yè)
Hadoop大數(shù)據(jù)處理技術(shù)基礎(chǔ)與實(shí)踐(第3版)課件 第6章-Hadoop I-O 操作_第4頁(yè)
Hadoop大數(shù)據(jù)處理技術(shù)基礎(chǔ)與實(shí)踐(第3版)課件 第6章-Hadoop I-O 操作_第5頁(yè)
已閱讀5頁(yè),還剩28頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

第6章HadoopIO操作1了解什么是數(shù)據(jù)完整性掌握基本的基于文件的數(shù)據(jù)結(jié)構(gòu)了解常用的壓縮算法理解序列化基本原理學(xué)習(xí)目標(biāo)28/21/2024參考書(shū)38/21/2024

由于每個(gè)磁盤(pán)或者網(wǎng)絡(luò)上的I/O操作可能會(huì)對(duì)正在讀寫(xiě)的數(shù)據(jù)不慎引入錯(cuò)誤,如果通過(guò)的數(shù)據(jù)流量非常大,數(shù)據(jù)發(fā)生損壞的幾率很高。

檢查損壞數(shù)據(jù)的常用方法是在第一次進(jìn)入系統(tǒng)時(shí)計(jì)算數(shù)據(jù)的校驗(yàn)和,然后只要數(shù)據(jù)不是在一個(gè)可靠的通道上傳輸,就可能會(huì)發(fā)生損壞。如果新生成的校驗(yàn)和不完全匹配原始的校驗(yàn)和,那么數(shù)據(jù)就會(huì)被認(rèn)為是損壞的。

一個(gè)常用的錯(cuò)誤檢測(cè)代碼是CRC-32(cyclicredundancycheck,循環(huán)冗余檢查),計(jì)算一個(gè)32位的任何大小輸入的整數(shù)校驗(yàn)和。HDFS數(shù)據(jù)完整性48/21/2024HDFS的數(shù)據(jù)完整性58/21/2024HDFS以透明方式校驗(yàn)所有寫(xiě)入它的數(shù)據(jù),并在默認(rèn)設(shè)置下,會(huì)在讀取數(shù)據(jù)時(shí)驗(yàn)證校驗(yàn)和。針對(duì)數(shù)據(jù)的每個(gè)io.bytes.per.checksum(默認(rèn)512字節(jié))字節(jié),都會(huì)創(chuàng)建一個(gè)單獨(dú)的校驗(yàn)和。數(shù)據(jù)節(jié)點(diǎn)負(fù)責(zé)在存儲(chǔ)數(shù)據(jù)及其校驗(yàn)和之前驗(yàn)證它們收到的數(shù)據(jù)。從客戶端和其它數(shù)據(jù)節(jié)點(diǎn)復(fù)制過(guò)來(lái)的數(shù)據(jù)??蛻舳藢?xiě)入數(shù)據(jù)并且將它發(fā)送到一個(gè)數(shù)據(jù)節(jié)點(diǎn)管線中,在管線的最后一個(gè)數(shù)據(jù)節(jié)點(diǎn)驗(yàn)證校驗(yàn)和。HDFS的數(shù)據(jù)完整性68/21/2024客戶端讀取數(shù)據(jù)節(jié)點(diǎn)上的數(shù)據(jù)時(shí),會(huì)驗(yàn)證校驗(yàn)和,將其與數(shù)據(jù)節(jié)點(diǎn)上存儲(chǔ)的校驗(yàn)和進(jìn)行對(duì)比。每個(gè)數(shù)據(jù)節(jié)點(diǎn)維護(hù)一個(gè)連續(xù)的校驗(yàn)和驗(yàn)證日志,因此它知道每個(gè)數(shù)據(jù)塊最后驗(yàn)證的時(shí)間。每個(gè)數(shù)據(jù)節(jié)點(diǎn)還會(huì)在后臺(tái)線程運(yùn)行一個(gè)DataBlockScanner(數(shù)據(jù)塊檢測(cè)程序),定期驗(yàn)證存儲(chǔ)在數(shù)據(jù)節(jié)點(diǎn)上的所有塊,為了防止物理存儲(chǔ)介質(zhì)中位衰減鎖造成的數(shù)據(jù)損壞HDFS的數(shù)據(jù)完整性78/21/2024HDFS的數(shù)據(jù)完整性88/21/2024HDFS通過(guò)復(fù)制完整的副本來(lái)產(chǎn)生一個(gè)新的,無(wú)錯(cuò)的副本來(lái)“治愈”哪些出錯(cuò)的數(shù)據(jù)塊。工作方式:如果客戶端讀取數(shù)據(jù)塊時(shí)檢測(cè)到錯(cuò)誤,拋出ChecksumException前報(bào)告該壞塊以及它試圖從名稱節(jié)點(diǎn)中藥讀取的數(shù)據(jù)節(jié)點(diǎn)。名稱節(jié)點(diǎn)將這個(gè)塊標(biāo)記為損壞的,不會(huì)直接復(fù)制給客戶端或復(fù)制該副本到另一個(gè)數(shù)據(jù)節(jié)點(diǎn)。它會(huì)從其他副本復(fù)制一個(gè)新的副本。本地文件系統(tǒng)98/21/2024也可能禁用校驗(yàn)和:底層文件系統(tǒng)原生支持校驗(yàn)和。這里通過(guò)RawLocalFileSystem來(lái)替代LocalFileSystem完成。要在一個(gè)應(yīng)用中全局使用,只需要設(shè)置fs.file.impl值為org.apache.hadoop.fs.RawLocalFileSystem來(lái)重新map執(zhí)行文件的URL?;蛘咧幌雽?duì)某些讀取禁用校驗(yàn)和校驗(yàn)。

Configurationconf=...

FileSystemfs=newRawLocalFileSystem();

fs.initialize(null,conf);ChecksumFileSystem108/21/2024LocalFileSystem使用ChecksumFileSystem(校驗(yàn)和文件系統(tǒng))為自己工作,這個(gè)類可以很容易添加校驗(yàn)和功能到其他文件系統(tǒng)中。因?yàn)镃hecksumFileSystem也包含于文件系統(tǒng)中。

FileSystemrawFs=...

FileSystemchecksummedFs=newChecksumFileSystem(rawFs);Hadoop的HDFS和MapReduce子框架主要是針對(duì)大數(shù)據(jù)文件來(lái)設(shè)計(jì)的,在小文件的處理上不但效率低下,而且十分消耗內(nèi)存資源。解決辦法通常是選擇一個(gè)容器,將這些小文件包裝起來(lái),將整個(gè)文件作為一條記錄,可以獲得更高效率的存儲(chǔ)和處理,避免了多次打開(kāi)關(guān)閉流耗費(fèi)計(jì)算資源。HDFS提供了兩種類型的容器,分別是SequenceFile和MapFile?;谖募臄?shù)據(jù)結(jié)構(gòu)118/21/2024SequenceFile存儲(chǔ)SequenceFile的存儲(chǔ)類似于日志文件,所不同的是日志文件的每條記錄都是純文本數(shù)據(jù),而SequenceFile的每條記錄是可序列化、可持久化的鍵值數(shù)據(jù)結(jié)構(gòu)。SequenceFile提供相應(yīng)的讀寫(xiě)器和排序器,寫(xiě)操作根據(jù)壓縮的類型分為3種。(1)Writer:無(wú)壓縮寫(xiě)數(shù)據(jù)。(2)RecordCompressWriter:記錄級(jí)壓縮文件,只壓縮值。(3)BlockCompressWrite:塊級(jí)壓縮文件,鍵值采用獨(dú)立壓縮方式。讀取操作實(shí)際上可以讀取上述3種類型。128/21/2024SequenceFile存儲(chǔ)在存儲(chǔ)結(jié)構(gòu)上,SequenceFile主要由一個(gè)Header后跟多條Record組成,如圖所示。138/21/2024SequenceFile存儲(chǔ)當(dāng)保存的記錄有很多的時(shí)候,可以把一連串的記錄組織到一起,統(tǒng)一壓縮成一個(gè)塊。148/21/2024用命令行接口顯示序列文件

使用-text選項(xiàng)顯示文本格式的序列文件。

%hadoopfs-textnumber.seq158/21/2024MapFile是排序后的SequenceFile,并且它會(huì)額外生成一個(gè)索引文件提供按鍵的查找。讀寫(xiě)MapFile與讀寫(xiě)SequenceFile非常類似,只需要換成MapFie.Reader和MapFile.Writer就可以了。在命令行顯示MapFile的文件內(nèi)容同樣要用-text。MapFile168/21/2024MapFile是排序后的SequenceFile,并且它會(huì)額外生成一個(gè)索引文件提供按鍵的查找。讀寫(xiě)MapFile與讀寫(xiě)SequenceFile非常類似,只需要換成MapFie.Reader和MapFile.Writer就可以了。在命令行顯示MapFile的文件內(nèi)容同樣要用-text。壓縮178/21/2024文件壓縮188/21/2024文件壓縮兩大好處:減少存儲(chǔ)文件所需要的空間且加快了數(shù)據(jù)在網(wǎng)絡(luò)上或從磁盤(pán)上或到磁盤(pán)上的傳輸速度。各種壓縮算法的壓縮比:

編碼和解碼198/21/2024Codec是coder與decoder的縮略詞,實(shí)現(xiàn)了一種壓縮-解壓算法。Hadoop中的壓縮與解壓的類實(shí)現(xiàn)CompressionCodec接口

編碼和解碼208/21/2024CompressionCodec有兩個(gè)方法輕松地壓縮和解壓數(shù)據(jù)。使用usethe

createOutputStream(OutputStreamout)創(chuàng)建一個(gè)CompressionOutputStream,將其以壓縮格式寫(xiě)入底層的流。使用createInputStream(InputStreamin)獲取一個(gè)CompressionInputStream,從底層的流讀取未壓縮的數(shù)據(jù)。

編碼和解碼218/21/202401packagecom.laos.hadoop;0203importorg.apache.hadoop.conf.Configuration;04importorg.apache.hadoop.io.IOUtils;05importpress.CompressionCodec;06importpress.CompressionOutputStream;07importorg.apache.hadoop.util.ReflectionUtils;0809publicclassStreamCompressor{10

publicstaticvoidmain(String[]args)throwsException{11

StringcodecClassname="press.GzipCodec";12

Class<?>codecClass=Class.forName(codecClassname);13

Configurationconf=newConfiguration();14

CompressionCodeccodec=(CompressionCodec)ReflectionUtils15

.newInstance(codecClass,conf);16

//將讀入數(shù)據(jù)壓縮至System.out17

CompressionOutputStreamout=codec.createOutputStream(System.out);18

IOUtils.copyBytes(System.in,out,4096,false);19

out.finish();20

}2122}

在unix窗口輸入命令:$echo"Test"|hadoopjarhadoop-itest.jarcom.laos.hadoop.StreamCompressor|gunzip編碼和解碼228/21/2024使用CompressionCodecFactory方法來(lái)推斷CompressionCodec

在閱讀一個(gè)壓縮文件時(shí),我們可以從擴(kuò)展名來(lái)推斷出它的編碼/解碼器。以.gz結(jié)尾的文件可以用GzipCodec來(lái)閱讀。CompressionCodecFactory提供了getCodec()方法,從而將文件擴(kuò)展名映射到相應(yīng)的CompressionCodec。編碼和解碼238/21/2024

01packagecom.laos.hadoop;0203importjava.io.InputStream;04importjava.io.OutputStream;05import.URI;0607importorg.apache.hadoop.conf.Configuration;08importorg.apache.hadoop.fs.FileSystem;09importorg.apache.hadoop.fs.Path;10importorg.apache.hadoop.io.IOUtils;11importpress.CompressionCodec;12importpress.CompressionCodecFactory;1314publicclassFileDecompressor{15

publicstaticvoidmain(String[]args)throwsException{16

Stringuri=args[0];17

Configurationconf=newConfiguration();18

FileSystemfs=FileSystem.get(URI.create(uri),conf);1920

PathinputPath=newPath(uri);21

CompressionCodecFactoryfactory=newCompressionCodecFactory(conf);22

CompressionCodeccodec=factory.getCodec(inputPath);23

if(codec==null){24

System.err.println("Nocodecfoundfor"+uri);25

System.exit(1);26

}27

StringoutputUri=CompressionCodecFactory.removeSuffix(uri,codec28

.getDefaultExtension());29

InputStreamin=null;30

OutputStreamout=null;31

try{32

in=codec.createInputStream(fs.open(inputPath));33

out=fs.create(newPath(outputUri));34

IOUtils.copyBytes(in,out,conf);35

}finally{36

IOUtils.closeStream(in);37

IOUtils.closeStream(out);38

}39

}40}壓縮和輸入分隔248/21/2024考慮如何壓縮哪些將由MapReduce處理的數(shù)據(jù)時(shí),考慮壓縮格式是否支持分隔很重要。

例如,gzip格式使用default來(lái)存儲(chǔ)壓縮過(guò)的數(shù)據(jù),default將數(shù)據(jù)作為一系列壓縮過(guò)的塊存儲(chǔ),但是每塊的開(kāi)始沒(méi)有指定用戶在數(shù)據(jù)流中的任意點(diǎn)定位到下一個(gè)塊的起始位置,而是自身與數(shù)據(jù)同步,所以gzip不支持分隔機(jī)制。在MapReduce中使用壓縮258/21/2024如果要壓縮MapReduce作業(yè)的輸出,設(shè)置press為true,pression.codec屬性指定編碼解碼器。

如果輸入的文件時(shí)壓縮過(guò)的,MapReduce讀取時(shí),它們會(huì)自動(dòng)解壓,根據(jù)文件擴(kuò)展名來(lái)決定使用那一個(gè)壓縮解碼器。publicclassMaxTemperatureWithCompression{17

publicstaticvoidmain(String[]args)throwsIOException{18

if(args.length!=2){19

System.err.println("Usage:MaxTemperatureWithCompression<inputpath>"+20

"<outputpath>");21

System.exit(-1);22

}23

24

JobConfconf=newJobConf(MaxTemperatureWithCompression.class);conf.setJobName("Maxtemperaturewithoutputcompression");25

FileInputFormat.addInputPath(conf,newPath(args[0]));26

FileOutputFormat.setOutputPath(conf,newPath(args[1]));27

28

conf.setOutputKeyClass(Text.class);29

conf.setOutputValueClass(IntWritable.class);30

31

conf.setBoolean("press",true);32

conf.setClass("pression.codec",GzipCodec.class,33

CompressionCodec.class);

34

JobClient.runJob(conf);35

}36

}序列化:將結(jié)構(gòu)化對(duì)象轉(zhuǎn)換為字節(jié)流以便于通過(guò)網(wǎng)絡(luò)進(jìn)行傳輸或?qū)懭氪鎯?chǔ)的過(guò)程。反序列化:將字節(jié)流轉(zhuǎn)為一系列結(jié)構(gòu)化對(duì)象的過(guò)程。

序列化用在兩個(gè)地方:進(jìn)程間通信和持久存儲(chǔ)。在Hadoop中,節(jié)點(diǎn)之間的進(jìn)程間通信是用遠(yuǎn)程過(guò)程調(diào)用(RPC)。RPC協(xié)議將使用序列化將消息編碼為二進(jìn)制流(發(fā)送到遠(yuǎn)程節(jié)點(diǎn)),此后在接收端二進(jìn)制流被反序列化為消息。Hadoop使用自己的序列化格式Writables。序列化268/21/2024Writable接口packageorg.apache.hadoop.io;importjava.io.DataOutput;importjava.io.DataInput;importjava.io.IOException;

publicinterfaceWritable{

voidwrite(DataOutputout)throwsIOException;

//將狀態(tài)寫(xiě)入二進(jìn)制格式的流

voidreadFields(DataInputin)throwsIOException;//從二進(jìn)制格式的流讀出其狀態(tài)}WritableComparable和Comparator

IntWritable實(shí)現(xiàn)了WritableComparable接口。而WritableComparable繼承了Writable和Comparable。278/21/2024Writabl

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

最新文檔

評(píng)論

0/150

提交評(píng)論