




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、自定義Hadoop Map/Reduce輸入文件切割I(lǐng)nputFormatHadoop會(huì)對(duì)原始輸入文件進(jìn)行文件切割,然后把每個(gè)split傳入mapper程序中進(jìn)行處理,F(xiàn)ileInputFormat是所有以文件作 為數(shù)據(jù)源的InputFormat實(shí)現(xiàn)的基類,F(xiàn)ileInputFormat保存作為job輸入的所有文件,并實(shí)現(xiàn)了對(duì)輸入文件計(jì)算splits的方 法。至于獲得記錄的方法是有不同的子類進(jìn)行實(shí)現(xiàn)的。那么,F(xiàn)ileInputFormat是怎樣將他們劃分成splits的呢?FileInputFormat只劃分比HDFS block大的文件,所以如果一個(gè)文件的大小比block小,將不會(huì)被劃分,這
2、也是Hadoop處理大文件的效率要比處理很多小文件的效率高的原因。hadoop默認(rèn)的InputFormat是TextInputFormat,重寫了FileInputFormat中的createRecordReader和isSplitable方法。該類使用的reader是LineRecordReader,即以回車鍵(CR = 13)或換行符(LF = 10)為行分隔符。但大多數(shù)情況下,回車鍵或換行符作為輸入文件的行分隔符并不能滿足我們的需求,通常用戶很有可能會(huì)輸入回車鍵、換行符,所以通常我們會(huì)定義不可見字符(即用戶無法輸入的字符)為行分隔符,這種情況下,就需要新寫一個(gè)InputFormat。又或
3、者,一條記錄的分隔符不是字符,而是字符串,這種情況相對(duì)麻煩;還有一種情況,輸入文件的主鍵key已經(jīng)是排好序的了,需要hadoop做的只是把相 同的key作為一個(gè)數(shù)據(jù)塊進(jìn)行邏輯處理,這種情況更麻煩,相當(dāng)于免去了mapper的過程,直接進(jìn)去reduce,那么InputFormat的邏輯就相 對(duì)較為復(fù)雜了,但并不是不能實(shí)現(xiàn)。1、改變一條記錄的分隔符,不用默認(rèn)的回車或換行符作為記錄分隔符,甚至可以采用字符串作為記錄分隔符。1)自定義一個(gè)InputFormat,繼承FileInputFormat,重寫createRecordReader方法,如果不需要分片或者需要改變分片的方式,則重寫isSplitab
4、le方法,具體代碼如下:public class FileInputFormatB extends FileInputFormat<LongWritable, Text> Overridepublic RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context) return new SearchRecordReader("b");Overrideprotected boolean isSplitable(FileS
5、ystem fs, Path filename) / 輸入文件不分片return false;2)關(guān)鍵在于定義一個(gè)新的SearchRecordReader繼承RecordReader,支持自定義的行分隔符,即一條記錄的分隔符。標(biāo)紅的地方為與hadoop默認(rèn)的LineRecordReader不同的地方。public class IsearchRecordReader extends RecordReader<LongWritable, Text> private static final Log LOG = LogFactory.getLog(IsearchRecordReader.
6、class);private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private LineReader in;private int maxLineLength;private LongWritable key = null;private Text value = null;/行分隔符,即一條記錄的分隔符private byte separator = 'b'private int sepLength = 1;
7、public IsearchRecordReader()public IsearchRecordReader(String seps)this.separator = seps.getBytes(); sepLength = separator.length;public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException FileSplit split = (FileSplit) genericSplit;Configuration job = context.getC
8、onfiguration();this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);this.start = split.getStart();this.end = (this.start + split.getLength();Path file = split.getPath();pressionCodecs = new CompressionCodecFactory(job);CompressionCodec codec = pressionCod
9、ecs.getCodec(file);/ open the file and seek to the start of the splitFileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath();boolean skipFirstLine = false;if (codec != null) this.in = new LineReader(codec.createInputStream(fileIn), job);this.end = Long.MAX_VALUE; e
10、lse if (this.start != 0L) skipFirstLine = true;this.start -= sepLength;fileIn.seek(this.start);this.in = new LineReader(fileIn, job);if (skipFirstLine) / skip first line and re-establish "start".int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start)
11、;if(newSize > 0)start += newSize;this.pos = this.start;public boolean nextKeyValue() throws IOException if (this.key = null) this.key = new LongWritable();this.key.set(this.pos);if (this.value = null) this.value = new Text();int newSize = 0;while (this.pos < this.end) newSize = this.in.readLin
12、e(this.value, this.maxLineLength, Math.max(int) Math.min(Integer.MAX_VALUE, this.end - this.pos), this.maxLineLength);if (newSize = 0) break;this.pos += newSize;if (newSize < this.maxLineLength) break;LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSiz
13、e);if (newSize = 0) /讀下一個(gè)bufferthis.key = null;this.value = null;return false;/讀同一個(gè)buffer的下一個(gè)記錄return true;public LongWritable getCurrentKey() return this.key;public Text getCurrentValue() return this.value;public float getProgress() if (this.start = this.end) return 0.0F;return Math.min(1.0F, (floa
14、t) (this.pos - this.start) / (float) (this.end - this.start);public synchronized void close() throws IOException if (this.in != null)this.in.close();3)重寫SearchRecordReader需要的LineReader,可作為SearchRecordReader內(nèi)部類。特別需要注意的地方就 是,讀取文件的方式是按指定大小的buffer來讀,必定就會(huì)遇到一條完整的記錄被切成兩半,甚至如果分隔符大于1個(gè)字符時(shí)分隔符也會(huì)被切成兩半的情況, 這種情況一定
15、要加以拼接處理。public class LineReader /回車鍵(hadoop默認(rèn))/private static final byte CR = 13;/換行符(hadoop默認(rèn))/private static final byte LF = 10;/按buffer進(jìn)行文件讀取private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;private int bufferSize = DEFAULT_BUFFER_SIZE;private InputStream in;private byte buffer;priv
16、ate int bufferLength = 0;private int bufferPosn = 0;LineReader(InputStream in, int bufferSize) this.bufferLength = 0;this.bufferPosn = 0;this.in = in;this.bufferSize = bufferSize;this.buffer = new bytethis.bufferSize;public LineReader(InputStream in, Configuration conf) throws IOException this(in, c
17、onf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);public void close() throws IOException in.close();public int readLine(Text str, int maxLineLength) throws IOException return readLine(str, maxLineLength, Integer.MAX_VALUE);public int readLine(Text str) throws IOException return readLi
18、ne(str, Integer.MAX_VALUE, Integer.MAX_VALUE);/以下是需要改寫的部分_start,核心代碼public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOExceptionstr.clear();Text record = new Text();int txtLength = 0;long bytesConsumed = 0L;boolean newline = false;int sepPosn = 0;do /已經(jīng)讀到buffer的末尾了,讀下一個(gè)
19、bufferif (this.bufferPosn >= this.bufferLength) bufferPosn = 0;bufferLength = in.read(buffer);/讀到文件末尾了,則跳出,進(jìn)行下一個(gè)文件的讀取if (bufferLength <= 0) break;int startPosn = this.bufferPosn;for (; bufferPosn < bufferLength; bufferPosn +) /處理上一個(gè)buffer的尾巴被切成了兩半的分隔符(如果分隔符中重復(fù)字符過多在這里會(huì)有問題)if(sepPosn > 0 &
20、amp;& bufferbufferPosn != separatorsepPosn)sepPosn = 0;/遇到行分隔符的第一個(gè)字符if (bufferbufferPosn = separatorsepPosn) bufferPosn +;int i = 0;/判斷接下來的字符是否也是行分隔符中的字符for(+ sepPosn; sepPosn < sepLength; i +, sepPosn +)/buffer的最后剛好是分隔符,且分隔符被不幸地切成了兩半if(bufferPosn + i >= bufferLength)bufferPosn += i - 1;br
21、eak;/一旦其中有一個(gè)字符不相同,就判定為不是分隔符if(this.bufferthis.bufferPosn + i != separatorsepPosn)sepPosn = 0;break;/的確遇到了行分隔符if(sepPosn = sepLength)bufferPosn += i;newline = true;sepPosn = 0;break;int readLength = this.bufferPosn - startPosn;bytesConsumed += readLength;/行分隔符不放入塊中/int appendLength = readLength - new
22、lineLength;if (readLength > maxLineLength - txtLength) readLength = maxLineLength - txtLength;if (readLength > 0) record.append(this.buffer, startPosn, readLength);txtLength += readLength;/去掉記錄的分隔符if(newline)str.set(record.getBytes(), 0, record.getLength() - sepLength); while (!newline &&a
23、mp; (bytesConsumed < maxBytesToConsume);if (bytesConsumed > (long)Integer.MAX_VALUE) throw new IOException("Too many bytes before newline: " + bytesConsumed);return (int) bytesConsumed;/以下是需要改寫的部分_end/以下是hadoop-core中LineReader的源碼_startpublic int readLine(Text str, int maxLineLength,
24、int maxBytesToConsume) throws IOExceptionstr.clear();int txtLength = 0;int newlineLength = 0;boolean prevCharCR = false;long bytesConsumed = 0L;do int startPosn = this.bufferPosn;if (this.bufferPosn >= this.bufferLength) startPosn = this.bufferPosn = 0;if (prevCharCR) bytesConsumed +;this.bufferL
25、ength = this.in.read(this.buffer);if (this.bufferLength <= 0) break;for (; this.bufferPosn < this.bufferLength; this.bufferPosn +) if (this.bufferthis.bufferPosn = LF) newlineLength = (prevCharCR) ? 2 : 1;this.bufferPosn +;break;if (prevCharCR) newlineLength = 1;break;prevCharCR = this.bufferthis.bufferPosn = CR;int readLength = this.bufferPosn - startPosn;if (prevCharCR) &&a
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 重慶能源職業(yè)學(xué)院《機(jī)電系統(tǒng)建模與仿真》2023-2024學(xué)年第二學(xué)期期末試卷
- 甘孜職業(yè)學(xué)院《大跨度空間結(jié)構(gòu)》2023-2024學(xué)年第二學(xué)期期末試卷
- 2025屆寧夏吳忠市高三上學(xué)期適應(yīng)性考試(一模)歷史試卷
- 2024-2025學(xué)年浙江省六校聯(lián)盟高一上學(xué)期期中聯(lián)考?xì)v史試卷
- 做賬實(shí)操-代理記賬行業(yè)的賬務(wù)處理分錄
- 長春大學(xué)旅游學(xué)院《幼兒舞蹈創(chuàng)編二》2023-2024學(xué)年第二學(xué)期期末試卷
- 2024-2025學(xué)年湖北省新高考聯(lián)考協(xié)作體高一上學(xué)期期中考試歷史試卷
- 濟(jì)南工程職業(yè)技術(shù)學(xué)院《信息安全基礎(chǔ)》2023-2024學(xué)年第二學(xué)期期末試卷
- 聊城大學(xué)東昌學(xué)院《病理學(xué)與病理生理學(xué)》2023-2024學(xué)年第二學(xué)期期末試卷
- 亳州職業(yè)技術(shù)學(xué)院《數(shù)據(jù)分析與可視化實(shí)驗(yàn)》2023-2024學(xué)年第二學(xué)期期末試卷
- GB 1886.375-2024食品安全國家標(biāo)準(zhǔn)食品添加劑氫氧化鈣
- 物業(yè)員工晉升述職報(bào)告
- 建設(shè)工程施工專業(yè)分包合同(GF-2003-0213)
- 耳鼻喉科各項(xiàng)規(guī)章制度
- 玻璃分化板制作工藝
- 虹吸現(xiàn)象講解
- 設(shè)備采購計(jì)劃書
- 長興縣合溪水庫清淤工程(一期)環(huán)境影響報(bào)告
- 粒籽源永久性植入治療放射防護(hù)要求
- 新聞選題申報(bào)單
- 醫(yī)學(xué)倫理審查申請(qǐng)表
評(píng)論
0/150
提交評(píng)論