Hadoop權(quán)威指南-中文版_第1頁
Hadoop權(quán)威指南-中文版_第2頁
Hadoop權(quán)威指南-中文版_第3頁
Hadoop權(quán)威指南-中文版_第4頁
Hadoop權(quán)威指南-中文版_第5頁
已閱讀5頁,還剩66頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領

文檔簡介

目錄目錄

I

初識Hadoop

1

1.1

數(shù)據(jù)!數(shù)據(jù)

1

1.2

數(shù)據(jù)的存儲和分析

3

1.3

相較于其他系統(tǒng)

4

1.4

Hadoop發(fā)展簡史

9

1.5

ApacheHadoop項目

12

MapReduce簡介

15

2.1

一個氣象數(shù)據(jù)集

15

2.2

使用UnixTools來分析數(shù)據(jù)

17

2.3

使用Hadoop進行數(shù)據(jù)分析

19

2.4

分布化

30

2.5

Hadoop流

35

2.6

Hadoop管道

40

Hadoop分布式文件系統(tǒng)

44

3.1

HDFS的設計

44

3.2

HDFS的概念

45

3.3

命令行接口

48

3.4

Hadoop文件系統(tǒng)

50

3.5

Java接口

54

3.6

數(shù)據(jù)流

68

3.7

通過distcp進行并行復制

75

3.8

Hadoop歸檔文件

77

Hadoop的I/O

80

4.1

數(shù)據(jù)完整性

80

4.2

壓縮

83

4.3

序列化

92

4.4

基于文件的數(shù)據(jù)結(jié)構(gòu)

111

MapReduce應用開發(fā)

125

5.1

API的配置

126

5.2

配置開發(fā)環(huán)境

128

5.3

編寫單元測試

134

5.4

本地運行測試數(shù)據(jù)

138

5.5

在集群上運行

144

5.6

作業(yè)調(diào)優(yōu)

159

5.7

MapReduce的工作流

162

MapReduce的工作原理

166

6.1

運行MapReduce作業(yè)

166

6.2

失敗

172

6.3

作業(yè)的調(diào)度

174

6.4

shuffle和排序

175

6.6

任務的執(zhí)行

181

MapReduce的類型與格式

188

7.1

MapReduce類型

188

7.3

輸出格式

217

MapReduce特性

227

8.1

計數(shù)器

227

8.2

排序

235

8.3

聯(lián)接

252

8.4

次要數(shù)據(jù)的分布

258

8.5

MapReduce的類庫

263

Hadoop集群的安裝

264

9.1

集群說明

264

9.2

集群的建立和安裝

268

9.3

SSH配置

270

9.4

Hadoop配置

271

9.5

安裝之后

286

9.6

Hadoop集群基準測試

286

9.7

云計算中的Hadoop

290

Hadoop的管理

293

10.1

HDFS

293

10.2

監(jiān)控

306

10.3

維護

313

Pig簡介

321

11.1

安裝和運行Pig

322

11.2

實例

325

11.3

與數(shù)據(jù)庫比較

329

11.4

PigLatin

330

11.5

用戶定義函數(shù)

343

11.6

數(shù)據(jù)處理操作符

353

11.7

Pig實踐提示與技巧

363

Hbase簡介

366

12.1

HBase基礎

366

12.2

概念

367

12.3

安裝

371

12.4

客戶端

374

12.5

示例

377

12.6

HBase與RDBMS的比較

385

12.7

實踐

390

ZooKeeper簡介

394

13.1

ZooKeeper的安裝和運行

395

13.2

范例

396

13.3

ZooKeeper服務

405

13.4

使用ZooKeeper建立應用程序

417

13.5

工業(yè)界中的ZooKeeper

428

案例研究

431

14.1

Hadoop在Last.fm的應用

431

14.2

Hadoop和Hive在Facebook的應用

441

14.3

Hadoop在Nutch搜索引擎

451

14.4

Hadoop用于Rackspace的日志處理

466

14.5

Cascading項目

474

14.6

ApacheHadoop的1TB排序

488

ApacheHadoop的安裝

491

Cloudera的Hadoop分發(fā)包

497

預備NCDC氣象資料

502第1章初識Hadoop古時候,人們用牛來拉重物,當一頭牛拉不動一根圓木的時候,他們不曾想過培育個頭更大的牛。同樣,我們也不需要嘗試更大的計算機,而是應該開發(fā)更多的計算系統(tǒng)。--格蕾斯·霍珀1.1

數(shù)據(jù)!數(shù)據(jù)我們生活在數(shù)據(jù)時代!很難估計全球存儲的電子數(shù)據(jù)總量是多少,但是據(jù)IDC估計2006年"數(shù)字全球"項目(digitaluniverse)的數(shù)據(jù)總量為0.18ZB,并且預測到2011年這個數(shù)字將達到1.8ZB,為2006年的10倍。1ZB相當于10的21次方字節(jié)的數(shù)據(jù),或者相當于1000EB,1000000PB,或者大家更熟悉的10億TB的數(shù)據(jù)!這相當于世界上每個人一個磁盤驅(qū)動器的數(shù)量級。這一數(shù)據(jù)洪流有許多來源。考慮下文:

紐約證券交易所每天產(chǎn)生1TB的交易數(shù)據(jù)。著名社交網(wǎng)站Facebook的主機存儲著約100億張照片,占據(jù)PB級存儲空間。A,一個家譜網(wǎng)站,存儲著2.5PB數(shù)據(jù)?;ヂ?lián)網(wǎng)檔案館(TheInternetArchive)存儲著約2PB數(shù)據(jù),并以每月至少20TB的速度增長。瑞士日內(nèi)瓦附近的大型強子對撞機每年產(chǎn)生約15PB的數(shù)據(jù)。此外還有大量數(shù)據(jù)。但是你可能會想它對自己有何影響。大部分數(shù)據(jù)被鎖定在最大的網(wǎng)頁內(nèi)容里面(如搜索引擎)或者是金融和科學機構(gòu),對不對?是不是所謂的"大數(shù)據(jù)"的出現(xiàn)會影響到較小的組織或個人?我認為是這樣的。以照片為例,我妻子的祖父是一個狂熱的攝影愛好者,并且他成人之后,幾乎一直都在拍照片。他的所有照片(中等格式、幻燈片和35mm膠片),在掃描成高解析度照片時,占了大約10GB的空間。相比之下,我家去年一年用數(shù)碼相機拍攝的照片就占用了5GB的空間。我家產(chǎn)生照片數(shù)據(jù)的速度是我妻子祖父的35倍!并且,隨著拍攝更多的照片變得越來越容易,這個速度還在增加中。更常見的情況是,個人數(shù)據(jù)的產(chǎn)生量正在快速地增長。微軟研究院的MyLifeBits項目()顯示,在不久的將來,個人信息檔案將可能成為普遍現(xiàn)象。MyLifeBits是這樣的一個實驗:一個人與外界的聯(lián)系(電話、郵件和文件)被抓取和存儲供以后訪問。收集的數(shù)據(jù)包括每分鐘拍攝的照片等,導致整個數(shù)據(jù)量達到每月1GB的大小。當存儲成本下降到使其可以存儲連續(xù)的音頻和視頻時,服務于未來MyLifeBits項目的數(shù)據(jù)量將是現(xiàn)在的許多倍。個人數(shù)據(jù)的增長的確是大勢所趨,但更重要的是,計算機所產(chǎn)生的數(shù)據(jù)可能比人所產(chǎn)生的數(shù)據(jù)更大。機器日志、RFID讀取器、傳感器網(wǎng)絡、車載GPS和零售交易數(shù)據(jù)等,這些都會促使"數(shù)據(jù)之山越來越高"。公開發(fā)布的數(shù)據(jù)量也在逐年增加。作為組織或企業(yè),再也不能只管理自己的數(shù)據(jù),未來的成功在很大程度上取決于它是否能從其他組織的數(shù)據(jù)中提取出價值。這方面的先鋒(如亞馬遜網(wǎng)絡服務器、I或者)的公共數(shù)據(jù)集,它們的存在就在于促進"信息共享",任何人都可以共享并自由(或以AWS平臺的形式,或以適度的價格)下載和分析這些數(shù)據(jù)。不同來源的信息混合處理后會帶來意外的效果和至今難以想像的應用。以A項目為例,這是一個研究Flickr網(wǎng)站上天體愛好者群中新照片的項目。它分析每一張上傳的照片,并確定它是天空的哪一部分,或者是否是有趣的天體,如恒星或者星系。雖然這只是一個帶實驗性質(zhì)的新服務,但是它顯示了數(shù)據(jù)(這里特指攝影照片)的可用性并且被用來進行某些活動(圖像分析),而這些活動很多時候并不是數(shù)據(jù)創(chuàng)建者預先能夠想像到的。有句話是這么說的:"算法再好,通常也難敵更多的數(shù)據(jù)。"意思是說對于某些問題(譬如基于既往偏好生成的電影和音樂推薦),不論你的算法有多么猛,它們總是會在更多的數(shù)據(jù)面前無能為力(更不用說沒有優(yōu)化過的算法了)?,F(xiàn)在,我們有一個好消息和一個壞消息。好消息是有海量數(shù)據(jù)!壞消息是我們正在為存儲和分析這些數(shù)據(jù)而奮斗不息。1.2

數(shù)據(jù)的存儲和分析問題很簡單:多年來硬盤存儲容量快速增加的同時,訪問速度--數(shù)據(jù)從硬盤讀取的速度--卻未能與時俱進。1990年,一個普通的硬盤驅(qū)動器可存儲1370MB的數(shù)據(jù)并擁有4.4MB/s的傳輸速度,所以,只需五分鐘的時間就可以讀取整個磁盤的數(shù)據(jù)。20年過去了,1TB級別的磁盤驅(qū)動器是很正常的,但是數(shù)據(jù)傳輸?shù)乃俣葏s在100MB/s左右。所以它需要花兩個半小時以上的時間讀取整個驅(qū)動器的數(shù)據(jù)。從一個驅(qū)動器上讀取所有的數(shù)據(jù)需要很長的時間,寫甚至更慢。一個很簡單的減少讀取時間的辦法是同時從多個磁盤上讀取數(shù)據(jù)。試想一下,我們擁有100個磁盤,每個存儲百分之一的數(shù)據(jù)。如果它們并行運行,那么不到兩分鐘我們就可以讀完所有的數(shù)據(jù)。只使用一個磁盤的百分之一似乎很浪費。但是我們可以存儲100個數(shù)據(jù)集,每個1TB,并讓它們共享磁盤的訪問。我們可以想像,此類系統(tǒng)的用戶會很高興看到共享訪問可以縮短分析時間,并且,從統(tǒng)計角度來看,他們的分析工作會分散到不同的時間點,所以互相之間不會有太多干擾。盡管如此,現(xiàn)在更可行的是從多個磁盤并行讀寫數(shù)據(jù)。第一個需要解決的問題是硬件故障。一旦開始使用多個硬件設施,其中一個會出故障的概率是非常高的。避免數(shù)據(jù)丟失的常見做法是復制:通過系統(tǒng)保存數(shù)據(jù)的冗余副本,在故障發(fā)生時,可以使用數(shù)據(jù)的另一份副本。這就是冗余磁盤陣列的工作方式。Hadoop的文件系統(tǒng)HDFS(HadoopDistributedFilesystem)也是一個例子,雖然它采取的是另一種稍有不同的方法,詳見后文描述。第二個問題是大部分分析任務需要通過某種方式把數(shù)據(jù)合并起來,即從一個磁盤讀取的數(shù)據(jù)可能需要和另外99個磁盤中讀取的數(shù)據(jù)合并起來才能使用。各種不同的分布式系統(tǒng)能夠組合多個來源的數(shù)據(jù),但是如何保證正確性是一個非常難的挑戰(zhàn)。MapReduce提供了一個編程模型,其抽象出上述磁盤讀寫的問題,將其轉(zhuǎn)換為計算一個由成對鍵/值組成的數(shù)據(jù)集。這種模型的具體細節(jié)將在后面的章節(jié)討論。但是目前討論的重點是,這個計算由兩部分組成:Map和Reduce。這兩者的接口就是"整合"之地。就像HDFS一樣,MapReduce是內(nèi)建可靠性這個功能的。簡而言之,Hadoop提供了一個穩(wěn)定的共享存儲和分析系統(tǒng)。存儲由HDFS實現(xiàn),分析由MapReduce實現(xiàn)??v然Hadoop還有其他功能,但這些功能是它的核心所在。1.3

相較于其他系統(tǒng)MapReduce似乎采用的是一種蠻力方法。即,針對每個查詢,每一個數(shù)據(jù)集--至少是很大一部分--都會被處理。但這正是它的能力。MapReduce可以處理一批查詢,并且它針對整個數(shù)據(jù)集處理即席查詢并在合理時間內(nèi)獲得結(jié)果的能力也是具有突破性的。它改變了我們對數(shù)據(jù)的看法,并且解放了以前存儲在磁帶和磁盤上的數(shù)據(jù)。它賦予我們對數(shù)據(jù)進行創(chuàng)新的機會。那些以前需要很長時間才能獲得答案的問題現(xiàn)在已經(jīng)迎刃而解,但反過來,這又帶來了新的問題和見解。例如,Rackspace的郵件部門Mailtrust,用Hadoop處理郵件的日志。他們寫的一個查詢是找到其用戶的地理分布。他們是這樣說的:"隨著我們的壯大,這些數(shù)據(jù)非常有用,我們每月運行一次MapReduce任務來幫助我們決定哪些Rackspace數(shù)據(jù)中心需要添加新的郵件服務器。"通過將數(shù)百GB的數(shù)據(jù)整合,借助于分析工具,Rackspace的工程師得以了解這些數(shù)據(jù),否則他們永遠都不會了解,并且他們可以運用這些信息去改善他們?yōu)橛脩籼峁┑姆铡5?4章將詳細介紹Rackspace公司是如何運用Hadoop的。1.3.1

關系型數(shù)據(jù)庫管理系統(tǒng)為什么我們不能使用數(shù)據(jù)庫加上更多磁盤來做大規(guī)模的批量分析?為什么我們需要MapReduce?這個問題的答案來自于磁盤驅(qū)動器的另一個發(fā)展趨勢:尋址時間的提高速度遠遠慢于傳輸速率的提高速度。尋址就是將磁頭移動到特定位置進行讀寫操作的工序。它的特點是磁盤操作有延遲,而傳輸速率對應于磁盤的帶寬。如果數(shù)據(jù)的訪問模式受限于磁盤的尋址,勢必會導致它花更長時間(相較于流)來讀或?qū)懘蟛糠謹?shù)據(jù)。另一方面,在更新一小部分數(shù)據(jù)庫記錄的時候,傳統(tǒng)的B樹(關系型數(shù)據(jù)庫中使用的一種數(shù)據(jù)結(jié)構(gòu),受限于執(zhí)行查找的速度)效果很好。但在更新大部分數(shù)據(jù)庫數(shù)據(jù)的時候,B樹的效率就沒有MapReduce的效率高,因為它需要使用排序/合并來重建數(shù)據(jù)庫。在許多情況下,MapReduce能夠被視為一種RDBMS(關系型數(shù)據(jù)庫管理系統(tǒng))的補充。(兩個系統(tǒng)之間的差異見表1-1)。MapReduce很適合處理那些需要分析整個數(shù)據(jù)集的問題,以批處理的方式,尤其是AdHoc(自主或即時)分析。RDBMS適用于點查詢和更新(其中,數(shù)據(jù)集已經(jīng)被索引以提供低延遲的檢索和短時間的少量數(shù)據(jù)更新。MapReduce適合數(shù)據(jù)被一次寫入和多次讀取的應用,而關系型數(shù)據(jù)庫更適合持續(xù)更新的數(shù)據(jù)集。表1-1:關系型數(shù)據(jù)庫和MapReduce的比較傳統(tǒng)關系型數(shù)據(jù)庫MapReduce數(shù)據(jù)大小GBPB訪問交互型和批處理批處理更新多次讀寫一次寫入多次讀取結(jié)構(gòu)靜態(tài)模式動態(tài)模式集成度高低伸縮性非線性線性

MapReduce和關系型數(shù)據(jù)庫之間的另一個區(qū)別是它們操作的數(shù)據(jù)集中的結(jié)構(gòu)化數(shù)據(jù)的數(shù)量。結(jié)構(gòu)化數(shù)據(jù)是擁有準確定義的實體化數(shù)據(jù),具有諸如XML文檔或數(shù)據(jù)庫表定義的格式,符合特定的預定義模式。這就是RDBMS包括的內(nèi)容。另一方面,半結(jié)構(gòu)化數(shù)據(jù)比較寬松,雖然可能有模式,但經(jīng)常被忽略,所以它只能用作數(shù)據(jù)結(jié)構(gòu)指南。例如,一張電子表格,其中的結(jié)構(gòu)便是單元格組成的網(wǎng)格,盡管其本身可能保存任何形式的數(shù)據(jù)。非結(jié)構(gòu)化數(shù)據(jù)沒有什么特別的內(nèi)部結(jié)構(gòu),例如純文本或圖像數(shù)據(jù)。MapReduce對于非結(jié)構(gòu)化或半結(jié)構(gòu)化數(shù)據(jù)非常有效,因為它被設計為在處理時間內(nèi)解釋數(shù)據(jù)。換句話說:MapReduce輸入的鍵和值并不是數(shù)據(jù)固有的屬性,它們是由分析數(shù)據(jù)的人來選擇的。關系型數(shù)據(jù)往往是規(guī)范的,以保持其完整性和刪除冗余。規(guī)范化為MapReduce帶來問題,因為它使讀取記錄成為一個非本地操作,并且MapReduce的核心假設之一就是,它可以進行(高速)流的讀寫。Web服務器日志是記錄集的一個很好的非規(guī)范化例子(例如,客戶端主機名每次都以全名來指定,即使同一客戶端可能會出現(xiàn)很多次),這也是MapReduce非常適合用于分析各種日志文件的原因之一。MapReduce是一種線性的可伸縮的編程模型。程序員編寫兩個函數(shù)--

map函數(shù)和Reduce函數(shù)--每一個都定義一個鍵/值對集映射到另一個。這些函數(shù)無視數(shù)據(jù)的大小或者它們正在使用的集群的特性,這樣它們就可以原封不動地應用到小規(guī)模數(shù)據(jù)集或者大的數(shù)據(jù)集上。更重要的是,如果放入兩倍的數(shù)據(jù)量,運行的時間會少于兩倍。但是如果是兩倍大小的集群,一個任務任然只是和原來的一樣快。這不是一般的SQL查詢的效果。隨著時間的推移,關系型數(shù)據(jù)庫和MapReduce之間的差異很可能變得模糊。關系型數(shù)據(jù)庫都開始吸收MapReduce的一些思路(如ASTERDATA的和GreenPlum的數(shù)據(jù)庫),另一方面,基于MapReduce的高級查詢語言(如Pig和Hive)使MapReduce的系統(tǒng)更接近傳統(tǒng)的數(shù)據(jù)庫編程人員。1.3.2

網(wǎng)格計算高性能計算(HighPerformanceComputing,HPC)和網(wǎng)格計算社區(qū)多年來一直在做大規(guī)模的數(shù)據(jù)處理,它們使用的是消息傳遞接口(MessagePassingInterface,MPI)這樣的API。從廣義上講,高性能計算的方法是將作業(yè)分配給一個機器集群,這些機器訪問共享文件系統(tǒng),由一個存儲區(qū)域網(wǎng)絡(StorageAreaNetwork,SAN)進行管理。這非常適用于以主計算密集型為主的作業(yè),但當節(jié)點需要訪問的大數(shù)據(jù)量(數(shù)百GB的數(shù)據(jù),這是MapReduce實際開始"發(fā)光"的起點)時,這會成為一個問題,因為網(wǎng)絡帶寬成為"瓶頸",所以計算節(jié)點閑置下來了。MapReduce嘗試在計算節(jié)點本地存儲數(shù)據(jù),因此數(shù)據(jù)訪問速度會因為它是本地數(shù)據(jù)而比較快。這項"數(shù)據(jù)本地化"功能,成為MapReduce的核心功能并且也是它擁有良好性能的原因之一。意識到網(wǎng)絡帶寬在數(shù)據(jù)中心環(huán)境是最有價值的資源(到處復制數(shù)據(jù)會很容易的把網(wǎng)絡帶寬飽和)之后,MapReduce便通過顯式網(wǎng)絡拓撲結(jié)構(gòu)不遺余力地加以保護。請注意,這種安排不會排除MapReduce中的高CPU使用分析。MPI賦予程序員很大的控制,但也要求顯式控制數(shù)據(jù)流機制,需要使用傳統(tǒng)的C語言的功能模塊完成(例如socket),以及更高級的算法來進行分析。而MapReduce卻是在更高層面上完成任務,即程序員從鍵/值對函數(shù)的角度來考慮,同時數(shù)據(jù)流是隱含的。在一個大規(guī)模分布式計算平臺上協(xié)調(diào)進程是一個很大的挑戰(zhàn)。最困難的部分是恰當?shù)奶幚硎c錯誤--在不知道一個遠程進程是否已經(jīng)失敗的時候--仍然需要繼續(xù)整個計算。MapReduce將程序員從必須考慮失敗任務的情況中解放出來,它檢測失敗的map或者reduce任務,在健康的機器上重新安排任務。MapReduce能夠做到這一點,因為它是一個無共享的架構(gòu),這意味著各個任務之間彼此并不依賴。(這里講得稍微簡單了一些,因為mapper的輸出是反饋給reducer的,但這由MapReduce系統(tǒng)控制。在這種情況下,相對于返回失敗的map,應該對返回reducer給予更多關注,因為它必須確保它可以檢索到必要的map輸出,如果不行,必須重新運行相關的map從而生成必要的這些輸出。)因此,從程序員的角度來看,執(zhí)行任務的順序是無關緊要的。相比之下,MPI程序必須顯式地管理自己的檢查點和恢復機制,從而把更多控制權(quán)交給程序員,但這樣會加大編程的難度。MapReduce聽起來似乎是一個相當嚴格的編程模型,而且在某種意義上看的確如此:我們被限定于鍵/值對的類型(它們按照指定的方式關聯(lián)在一起),mapper和reducer彼此間的協(xié)作有限,一個接一個地運行(mapper傳輸鍵/值對給reducer)。對此,一個很自然的問題是:你是否能用它做點兒有用或普通的事情?答案是肯定的。MapReduce作為一個建立搜索索引產(chǎn)品系統(tǒng),是由Google的工程師們開發(fā)出來的,因為他們發(fā)現(xiàn)自己一遍又一遍地解決相同的問題(MapReduce的靈感來自傳統(tǒng)的函數(shù)式編程、分布式計算和數(shù)據(jù)庫社區(qū)),但它后來被應用于其他行業(yè)的其他許多應用。我們驚喜地看到許多算法的變體在MapReduce中得以表示,從圖像圖形分析,到基于圖表的問題,再到機器學習算法。它當然不能解決所有問題,但它是一個很普遍的數(shù)據(jù)處理工具。第14章將介紹一些Hadoop應用范例。1.3.3

志愿計算人們第一次聽說Hadoop和MapReduce的時候,經(jīng)常會問:"和SETI@home有什么區(qū)別?"SETI,全稱為SearchforExtra-TerrestrialIntelligence(搜尋外星人),運行著一個稱為SETI@home的項目()。在此項目中,志愿者把自己計算機CPU的空閑時間貢獻出來分析無線天文望遠鏡的數(shù)據(jù)借此尋外星智慧生命信號。SETI@home是最有名的擁有許多志愿者的項目,其他的還有GreatInternetMersennePrimeSearch(搜索大素數(shù))與Folding@home項目(了解蛋白質(zhì)構(gòu)成及其與疾病之間的關系)。志愿計算項目通過將他們試圖解決的問題分為幾個他們成為工作單元的塊來工作,并將它們送到世界各地的電腦上進行分析。例如,SETI@home的工作單元大約是0.35MB的無線電望遠鏡數(shù)據(jù),并且一個典型的計算機需要數(shù)小時或數(shù)天來分析。完成分析后,結(jié)果發(fā)送回服務器,客戶端獲得的另一項工作單元。作為防止欺騙的預防措施,每個工作單元必須送到三臺機器上并且需要有至少兩個結(jié)果相同才會被接受。雖然SETI@home在表面上可能類似于MapReduce(將問題分為獨立的塊,然后進行并行計算),但差異還是顯著的。SETI@home問題是CPU高度密集型的,使其適合運行于世界各地成千上萬臺計算機上,因為相對于其計算時間而言,傳輸工作單元的時間微不足道。志愿者捐獻的是CPU周期,而不是帶寬。MapReduce被設計為用來運行那些需要數(shù)分鐘或數(shù)小時的作業(yè),這些作業(yè)在一個聚集帶寬很高的數(shù)據(jù)中心中可信任的專用硬件設備上運行。相比之下,SETI@home項目是在接入互聯(lián)網(wǎng)的不可信的計算機上運行,這些計算機的網(wǎng)速不同,而且數(shù)據(jù)也不在本地。1.4

Hadoop發(fā)展簡史Hadoop是DougCutting--ApacheLucene創(chuàng)始人--開發(fā)的使用廣泛的文本搜索庫。Hadoop起源于ApacheNutch,后者是一個開源的網(wǎng)絡搜索引擎,本身也是由Lucene項目的一部分。Hadoop名字的起源Hadoop這個名字不是一個縮寫,它是一個虛構(gòu)的名字。該項目的創(chuàng)建者,DougCutting如此解釋Hadoop的得名:"這個名字是我孩子給一頭吃飽了的棕黃色大象命名的。我的命名標準就是簡短,容易發(fā)音和拼寫,沒有太多的意義,并且不會被用于別處。小孩子是這方面的高手。Googol就是由小孩命名的。"Hadoop及其子項目和后繼模塊所使用的名字往往也與其功能不相關,經(jīng)常用一頭大象或其他動物主題(例如:"Pig")。較小的各個組成部分給與更多描述性(因此也更俗)的名稱。這是一個很好的原則,因為它意味著可以大致從其名字猜測其功能,例如,jobtracker的任務就是跟蹤MapReduce作業(yè)。從頭開始構(gòu)建一個網(wǎng)絡搜索引擎是一個雄心勃勃的目標,不只是要編寫一個復雜的、能夠抓取和索引網(wǎng)站的軟件,還需要面臨著沒有專有運行團隊支持運行它的挑戰(zhàn),因為它有那么多獨立部件。同樣昂貴的還有:據(jù)MikeCafarella和DougCutting估計,一個支持此10億頁的索引需要價值約50萬美元的硬件投入,每月運行費用還需要3萬美元。不過,他們相信這是一個有價值的目標,因為這會開放并最終使搜索引擎算法普及化。Nutch項目開始于2002年,一個可工作的抓取工具和搜索系統(tǒng)很快浮出水面。但他們意識到,他們的架構(gòu)將無法擴展到擁有數(shù)十億網(wǎng)頁的網(wǎng)絡。在2003年發(fā)表的一篇描述Google分布式文件系統(tǒng)(簡稱GFS)的論文為他們提供了及時的幫助,文中稱Google正在使用此文件系統(tǒng)。GFS或類似的東西,可以解決他們在網(wǎng)絡抓取和索引過程中產(chǎn)生的大量的文件的存儲需求。具體而言,GFS會省掉管理所花的時間,如管理存儲節(jié)點。在2004年,他們開始寫一個開放源碼的應用,即Nutch的分布式文件系統(tǒng)(NDFS)。2004年,Google發(fā)表了論文,向全世界介紹了MapReduce。2005年初,Nutch的開發(fā)者在Nutch上有了一個可工作的MapReduce應用,到當年年中,所有主要的Nutch算法被移植到使用MapReduce和NDFS來運行。Nutch中的NDFS和MapReduce實現(xiàn)的應用遠不只是搜索領域,在2006年2月,他們從Nutch轉(zhuǎn)移出來成為一個獨立的Lucene子項目,稱為Hadoop。大約在同一時間,DougCutting加入雅虎,Yahoo提供一個專門的團隊和資源將Hadoop發(fā)展成一個可在網(wǎng)絡上運行的系統(tǒng)(見后文的補充材料)。在2008年2月,雅虎宣布其搜索引擎產(chǎn)品部署在一個擁有1萬個內(nèi)核的Hadoop集群上。2008年1月,Hadoop已成為Apache頂級項目,證明它是成功的,是一個多樣化、活躍的社區(qū)。通過這次機會,Hadoop成功地被雅虎之外的很多公司應用,如Last.fm、Facebook和《紐約時報》。(一些應用在第14章的案例研究和Hadoop維基有介紹,Hadoop維基的網(wǎng)址為。)有一個良好的宣傳范例,《紐約時報》使用亞馬遜的EC2云計算將4TB的報紙掃描文檔壓縮,轉(zhuǎn)換為用于Web的PDF文件。這個過程歷時不到24小時,使用100臺機器運行,如果不結(jié)合亞馬遜的按小時付費的模式(即允許《紐約時報》在很短的一段時間內(nèi)訪問大量機器)和Hadoop易于使用的并行程序設計模型,該項目很可能不會這么快開始啟動。2008年4月,Hadoop打破世界紀錄,成為最快排序1TB數(shù)據(jù)的系統(tǒng)。運行在一個910節(jié)點的群集,Hadoop在209秒內(nèi)排序了1TB的數(shù)據(jù)(還不到三分半鐘),擊敗了前一年的297秒冠軍。同年11月,谷歌在報告中聲稱,它的MapReduce實現(xiàn)執(zhí)行1TB數(shù)據(jù)的排序只用了68秒。在2009年5月,有報道宣稱Yahoo的團隊使用Hadoop對1TB的數(shù)據(jù)進行排序只花了62秒時間。Hadoop@Yahoo!構(gòu)建互聯(lián)網(wǎng)規(guī)模的搜索引擎需要大量的數(shù)據(jù),因此需要大量的機器來進行處理。Yahoo!Search包括四個主要組成部分:Crawler,從因特網(wǎng)下載網(wǎng)頁;WebMap,構(gòu)建一個網(wǎng)絡地圖;Indexer,為最佳頁面構(gòu)建一個反向索引;Runtime(運行時),回答用戶的查詢。WebMap是一幅圖,大約包括一萬億條邊(每條代表一個網(wǎng)絡鏈接)和一千億個節(jié)點(每個節(jié)點代表不同的網(wǎng)址)。創(chuàng)建和分析此類大圖需要大量計算機運行若干天。在2005年初,WebMap所用的基礎設施名為Dreadnaught,需要重新設計以適應更多節(jié)點的需求。Dreadnaught成功地從20個節(jié)點擴展到600個,但需要一個完全重新的設計,以進一步擴大。Dreadnaught與MapReduce有許多相似的地方,但靈活性更強,結(jié)構(gòu)更少。具體說來,每一個分段(fragment),Dreadnaught作業(yè)可以將輸出發(fā)送到此作業(yè)下一階段中的每一個分段,但排序是在庫函數(shù)中完成的。在實際情形中,大多數(shù)WebMap階段都是成對存在的,對應于MapReduce。因此,WebMap應用并不需要為了適應MapReduce而進行大量重構(gòu)。EricBaldeschwieler(Eric14)組建了一個小團隊,我們開始設計并原型化一個新的框架(原型為GFS和MapReduce,用C++語言編寫),打算用它來替換Dreadnaught。盡管當務之急是我們需要一個WebMap新框架,但顯然,標準化對于整個Yahoo!Search平臺至關重要,并且通過使這個框架泛化,足以支持其他用戶,我們才能夠充分運用對整個平臺的投資。與此同時,我們在關注Hadoop(當時還是Nutch的一部分)及其進展情況。2006年1月,雅虎聘請了DougCutting,一個月后,我們決定放棄我們的原型,轉(zhuǎn)而使用Hadoop。相較于我們的原型和設計,Hadoop的優(yōu)勢在于它已經(jīng)在20個節(jié)點上實際應用過。這樣一來,我們便能在兩個月內(nèi)搭建一個研究集群,并著手幫助真正的客戶使用這個新的框架,速度比原來預計的快許多。另一個明顯的優(yōu)點是Hadoop已經(jīng)開源,較容易(雖然遠沒有那么容易!)從雅虎法務部門獲得許可在開源方面進行工作。因此,我們在2006年初設立了一個200個節(jié)點的研究集群,我們將WebMap的計劃暫時擱置,轉(zhuǎn)而為研究用戶支持和發(fā)展Hadoop。Hadoop大事記2004年--最初的版本(現(xiàn)在稱為HDFS和MapReduce)由DougCutting和MikeCafarella開始實施。2005年12月--Nutch移植到新的框架,Hadoop在20個節(jié)點上穩(wěn)定運行。2006年1月--DougCutting加入雅虎。2006年2月--ApacheHadoop項目正式啟動以支持MapReduce和HDFS的獨立發(fā)展。2006年2月--雅虎的網(wǎng)格計算團隊采用Hadoop。2006年4月--標準排序(10GB每個節(jié)點)在188個節(jié)點上運行47.9個小時。2006年5月--雅虎建立了一個300個節(jié)點的Hadoop研究集群。2006年5月--標準排序在500個節(jié)點上運行42個小時(硬件配置比4月的更好)。06年11月--研究集群增加到600個節(jié)點。06年12月--標準排序在20個節(jié)點上運行1.8個小時,100個節(jié)點3.3小時,500個節(jié)點5.2小時,900個節(jié)點7.8個小時。07年1月--研究集群到達900個節(jié)點。07年4月--研究集群達到兩個1000個節(jié)點的集群。08年4月--贏得世界最快1TB數(shù)據(jù)排序在900個節(jié)點上用時209秒。08年10月--研究集群每天裝載10TB的數(shù)據(jù)。09年3月--17個集群總共24000臺機器。09年4月--贏得每分鐘排序,59秒內(nèi)排序500GB(在1400個節(jié)點上)和173分鐘內(nèi)排序100TB數(shù)據(jù)(在3400個節(jié)點上)。1.5

ApacheHadoop項目今天,Hadoop是一個分布式計算基礎架構(gòu)這把"大傘"下的相關子項目的集合。這些項目屬于Apache軟件基金會(),后者為開源軟件項目社區(qū)提供支持。雖然Hadoop最出名的是MapReduce及其分布式文件系統(tǒng)(HDFS,從NDFS改名而來),但還有其他子項目提供配套服務,其他子項目提供補充性服務。這些子項目的簡要描述如下,其技術(shù)棧如圖1-1所示。

圖1-1:Hadoop的子項目Core一系列分布式文件系統(tǒng)和通用I/O的組件和接口(序列化、JavaRPC和持久化數(shù)據(jù)結(jié)構(gòu))。Avro一種提供高效、跨語言RPC的數(shù)據(jù)序列系統(tǒng),持久化數(shù)據(jù)存儲。(在本書寫作期間,Avro只是被當作一個新的子項目創(chuàng)建,而且尚未有其他Hadoop子項目在使用它。)MapReduce分布式數(shù)據(jù)處理模式和執(zhí)行環(huán)境,運行于大型商用機集群。HDFS分布式文件系統(tǒng),運行于大型商用機集群。Pig一種數(shù)據(jù)流語言和運行環(huán)境,用以檢索非常大的數(shù)據(jù)集。Pig運行在MapReduce和HDFS的集群上。Hbase一個分布式的、列存儲數(shù)據(jù)庫。HBase使用HDFS作為底層存儲,同時支持MapReduce的批量式計算和點查詢(隨機讀取)。ZooKeeper一個分布式的、高可用性的協(xié)調(diào)服務。ZooKeeper提供分布式鎖之類的基本服務用于構(gòu)建分布式應用。

Hive分布式數(shù)據(jù)倉庫。Hive管理HDFS中存儲的數(shù)據(jù),并提供基于SQL的查詢語言(由運行時引擎翻譯成MapReduce作業(yè))用以查詢數(shù)據(jù)。Chukwa分布式數(shù)據(jù)收集和分析系統(tǒng)。Chukwa運行HDFS中存儲數(shù)據(jù)的收集器,它使用MapReduce來生成報告。(在寫作本書期間,Chukwa剛剛從Core中的"contrib"模塊分離出來獨立成為一個獨立的子項目。)第2章MapReduce簡介MapReduce是一種用于數(shù)據(jù)處理的編程模型。該模型非常簡單。同一個程序Hadoop可以運行用各種語言編寫的MapReduce程序。在本章中,我們將看到用Java,Ruby,Python和C++這些不同語言編寫的不同版本。最重要的是,MapReduce程序本質(zhì)上是并行的,因此可以將大規(guī)模的數(shù)據(jù)分析交給任何一個擁有足夠多機器的運營商。MapReduce的優(yōu)勢在于處理大型數(shù)據(jù)集,所以下面首先來看一個例子。2.1

一個氣象數(shù)據(jù)集在我們這個例子里,要編寫一個挖掘氣象數(shù)據(jù)的程序。分布在全球各地的氣象傳感器每隔一小時便收集當?shù)氐臍庀髷?shù)據(jù),從而積累了大量的日志數(shù)據(jù)。它們是適合用MapReduce進行分析的最佳候選,因為它們是半結(jié)構(gòu)化且面向記錄的數(shù)據(jù)。數(shù)據(jù)的格式我們將使用NationalClimaticDataCenter(國家氣候數(shù)據(jù)中心,NCDC,網(wǎng)址為)提供的數(shù)據(jù)。數(shù)據(jù)是以面向行的ASCII格式存儲的,每一行便是一個記錄。該格式支持許多氣象元素,其中許多數(shù)據(jù)是可選的或長度可變的。為簡單起見,我們將重點討論基本元素(如氣溫),這些數(shù)據(jù)是始終都有且有固定寬度的。例2-1顯示了一個簡單的示例行,其中一些重要字段加粗顯示。該行已被分成多行以顯示出每個字段,在實際文件中,字段被整合成一行且沒有任何分隔符。例2-1:國家氣候數(shù)據(jù)中心數(shù)據(jù)記錄的格式0057

332130

#

USAF

weather

station

identifier

99999

#

WBAN

weather

station

identifier

19500101

#

observation

date

0300

#

observation

time

4

+51317

#

latitude

(degrees

×

1000)

+028783

#

longitude

(degrees

×

1000)

FM-12

+0171

#

elevation

(meters)

99999

V020

320

#

wind

direction

(degrees)

1

#

quality

code

N

0072

1

00450

#

sky

ceiling

height

(meters)

1

#

quality

code

C

N

010000

#

visibility

distance

(meters)

1

#

quality

code

N

9

-0128

#

air

temperature

(degrees

Celsius

×

10)

1

#

quality

code

-0139

#

dew

point

temperature

(degrees

Celsius

×

10)

1

#

quality

code

10268

#

atmospheric

pressure

(hectopascals

×

10)

1

#

quality

code

數(shù)據(jù)文件按照日期和氣象站進行組織。從1901年到2001年,每一年都有一個目錄,每一個目錄都包含一個打包文件,文件中的每一個氣象站都帶有當年的數(shù)據(jù)。例如,1990年的前面的數(shù)據(jù)項如下:%

ls

raw/1990

|

head

010010-99999-1990.gz

010014-99999-1990.gz

010015-99999-1990.gz

010016-99999-1990.gz

010017-99999-1990.gz

010030-99999-1990.gz

010040-99999-1990.gz

010080-99999-1990.gz

010100-99999-1990.gz

010150-99999-1990.gz

因為實際生活中有成千上萬個氣象臺,所以整個數(shù)據(jù)集由大量較小的文件組成。通常情況下,我們更容易、更有效地處理數(shù)量少的大型文件,因此,數(shù)據(jù)會被預先處理而使每年記錄的讀數(shù)連接到一個單獨的文件中。(具體做法請參見附錄C)2.2

使用UnixTools來分析數(shù)據(jù)在全球氣溫數(shù)據(jù)中每年記錄的最高氣溫是多少?我們先不用Hadoop來回答這一問題,因為答案中需要提供一個性能標準(baseline)和一種檢查結(jié)果的有效工具。對于面向行的數(shù)據(jù),傳統(tǒng)的處理工具是awk。例2-2是一個小的程序腳本,用于計算每年的最高氣溫。例2-2:一個用于從NCDC氣象記錄中找出每年最高氣溫的程序#!/usr/bin/env

bash

for

year

in

all/*

do

echo

-ne

'basename

$year

.gz'"\t"

gunzip

-c

$year

|

\

awk

'{

temp

=

substr($0,

88,

5)

+

0;

q

=

substr($0,

93,

1);

if

(temp

!=9999

&&

q

~

/[01459]/

&&

temp

>

max)

max

=

temp

}

END

{

print

max

}'

done

該腳本循環(huán)遍歷壓縮文件,首先顯示年份,然后使用awk處理每個文件。awk腳本從數(shù)據(jù)中提取兩個字段:氣溫和質(zhì)量代碼。氣溫值通過加上一個0變成一個整數(shù)。接下來,執(zhí)行測試,從而判斷氣溫值是否有效(值9999代表在NCDC數(shù)據(jù)集缺少值),質(zhì)量代碼顯示的讀數(shù)是有疑問還是根本就是錯誤的。如果讀數(shù)是正確的,那么該值將與目前看到的最大值進行比較,如果該值比原先的最大值大,就替換掉目前的最大值。當文件中所有的行都已處理完并打印出最大值后,END塊中的代碼才會被執(zhí)行。下面是某次運行結(jié)果的開始部分:%./max_temperature.sh

1901

317

1902

244

1903

289

1904

256

1905

283

...

由于源文件中的氣溫值按比例增加到10倍,所以結(jié)果1901年的最高氣溫是31.7°C(在本世紀初幾乎沒有多少氣溫讀數(shù)會被記錄下來,所以這是可能的)。為完成對跨越一世紀這么長時間的查找,程序在EC2High-CPUExtraLargeInstance機器上一共運行了42分鐘。為加快處理,我們需要并行運行部分程序。從理論上講,這很簡單:我們可以通過使用計算機上所有可用的硬件線程來處理在不同線程中的各個年份的數(shù)據(jù)。但是這之中存在一些問題。首先,劃分成大小相同的作業(yè)塊通常并不容易或明顯。在這種情況下,不同年份的文件,大小差異很大,所以一些線程會比其他線程更早完成。即使它們繼續(xù)下一步的工作,但是整個運行中占主導地位的還是那些運行時間很長的文件。另一種方法是將輸入數(shù)據(jù)分成固定大小的塊,然后把每塊分配到各個進程。其次,獨立線程運行結(jié)果在合并后,可能還需要進一步的處理。在這種情況下,每年的結(jié)果是獨立于其他年份,并可能通過連接所有結(jié)果和按年份排序這兩種方式來合并它們。如果使用固定大小的塊這種方法,則此類合并會更緊湊。對于這個例子,某年的數(shù)據(jù)通常被分割成幾個塊,每個進行獨立處理。我們將最終獲得每個數(shù)據(jù)塊的最高氣溫,所以最后一步是尋找這些每年氣溫值中的最大值。最后,我們?nèi)匀皇芟抻谝慌_計算機的處理能力。如果手中所有的處理器都使用上都至少需要20分鐘,那就只能這樣了。我們不能使它更快。另外,一些數(shù)據(jù)集的增長會超出一臺計算機的處理能力。當我們開始使用多臺計算機時,整個大環(huán)境中的許多其他因素將發(fā)揮作用,可能由于協(xié)調(diào)性和可靠性的問題而出現(xiàn)當機等錯誤。誰運行整個作業(yè)?我們?nèi)绾翁幚硎〉倪M程?因此,盡管并行處理可行,但實際上它非常復雜。使用Hadoop之類的框架非常有助于處理這些問題。2.3

使用Hadoop進行數(shù)據(jù)分析為了更好地發(fā)揮Hadoop提供的并行處理機制的優(yōu)勢,我們必須把查詢表示成MapReduce作業(yè)。經(jīng)過一些本地的小規(guī)模測試,我們將能夠在機器集群上運行它。2.3.1

map和reduceMapReduce的工作過程分為兩個階段:map階段和reduce階段。每個階段都有鍵/值對作為輸入和輸出,并且它們的類型可由程序員選擇。程序員還具體定義了兩個函數(shù):map函數(shù)和reduce函數(shù)。我們在map階段輸入的是原始的NCDC數(shù)據(jù)。我們選擇的是一種文本輸入格式,以便數(shù)據(jù)集的每一行都會是一個文本值。鍵是在文件開頭部分文本行起始處的偏移量,但我們沒有這方面的需要,所以將其忽略。map函數(shù)很簡單。我們使用map函數(shù)來找出年份和氣溫,因為我們只對它們有興趣。在本例中,map函數(shù)只是一個數(shù)據(jù)準備階段,通過這種方式來建立數(shù)據(jù),使得reducer函數(shù)能在此基礎上進行工作:找出每年的最高氣溫。map函數(shù)也是很適合去除已損記錄的地方:在這里,我們將篩選掉缺失的、不可靠的或錯誤的氣溫

數(shù)據(jù)。為了全面了解map的工作方式,我們思考下面幾行示例的輸入數(shù)據(jù)(考慮到頁面篇幅,一些未使用的列已被去除,用省略號表示):這些行以鍵/值對的方式來表示map函數(shù):鍵是文件中的行偏移量,而這往往是我們在map函數(shù)中所忽視的。map函數(shù)的功能僅僅提取年份和氣溫(以粗體顯示),并將其作為輸出被發(fā)送。(氣溫值已被解釋為整數(shù))(1950,

0)

(1950,

22)

(1950,

?11)

(1949,

111)

(1949,

78)

map函數(shù)的輸出先由MapReduce框架處理,然后再被發(fā)送到reduce函數(shù)。這一處理過程根據(jù)鍵來對鍵/值對進行排序和分組。因此,繼續(xù)我們的示例,reduce函數(shù)會看到如下輸入:(1949,

[111,

78])

(1950,

[0,

22,

?11])

每年的年份后都有一系列氣溫讀數(shù)。所有reduce函數(shù)現(xiàn)在必須重復這個列表并從中找出最大的讀數(shù):(1949,

111)

(1950,

22)

這是最后的輸出:全球氣溫記錄中每年的最高氣溫。

整個數(shù)據(jù)流如圖2-1所示。在圖的底部是Unix的管道,模擬整個MapReduce的流程,其中的內(nèi)容我們將在以后討論Hadoop數(shù)據(jù)流時再次提到。

(點擊查看大圖)圖2-1:MapReduce的邏輯數(shù)據(jù)流2.3.2

JavaMapReduce(1)在明白MapReduce程序的工作原理之后,下一步就是要用代碼來實現(xiàn)它。我們需要三樣東西:一個map函數(shù)、一個reduce函數(shù)和一些來運行作業(yè)的代碼。map函數(shù)是由一個Mapper接口來實現(xiàn)的,其中聲明了一個map()方法。例2-3顯示了我們的map函數(shù)的實現(xiàn)。例2-3:最高氣溫示例的Mapper接口import

java.io.IOException;

import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.io.LongWritable;

import

org.apache.hadoop.io.Text;

import

org.apache.hadoop.mapred.MapReduceBase;

import

org.apache.hadoop.mapred.Mapper;

import

org.apache.hadoop.mapred.OutputCollector;

import

org.apache.hadoop.mapred.Reporter;

public

class

MaxTemperatureMapper

extends

MapReduceBase

implements

Mapper<LongWritable,

Text,

Text,

IntWritable>

{

private

static

final

int

MISSING

=

9999;

public

void

map(LongWritable

key,

Text

value,

OutputCollector<Text,

IntWritable>

output,

Reporter

reporter)

throws

IOException

{

String

line

=

value.toString();

String

year

=

line.substring(15,

19);

int

airTemperature;

if

(line.charAt(87)

==

'+')

{

//

parseInt

doesn't

like

leading

plus

signs

airTemperature

=

Integer.parseInt(line.substring(88,

92));

}

else

{

airTemperature

=

Integer.parseInt(line.substring(87,

92));

}

String

quality

=

line.substring(92,

93);

if

(airTemperature

!=

MISSING

&&

quality.matches("[01459]"))

{

output.collect(new

Text(year),

new

IntWritable(airTemperature));

}

}

}

該Mapper接口是一個泛型類型,它有4個形式參數(shù)類型,由它們來指定map函數(shù)的輸入鍵、輸入值、輸出鍵和輸出值的類型。就目前的示例來說,輸入鍵是一個長整數(shù)偏移量,輸入的值是一行文本,輸出的鍵是年份,輸出的值是氣溫(整數(shù))。Hadoop規(guī)定了自己的一套可用于網(wǎng)絡序列優(yōu)化的基本類型,而不是使用內(nèi)置的Java類型。這些都可以在org.apache.hadoop.io包中找到?,F(xiàn)在我們使用的是LongWritable類型(相當于Java的Long類型)、Text類型(相當于Java的String類型)和IntWritable類型(相當于Java的Integer類型)。map()方法需要傳入一個鍵和一個值。我們將一個包含Java字符串輸入行的Text值轉(zhuǎn)換成Java的String類型,然后利用其substring()方法提取我們感興趣的列。map()方法還提供了一個OutputCollector實例來寫入輸出內(nèi)容。在這種情況下,我們寫入年份作為一個Text對象(因為我們只使用一個鍵),用IntWritable類型包裝氣溫值。我們只有在氣溫顯示出來后并且它的質(zhì)量代碼表示的是正確的氣溫讀數(shù)時才寫入輸出記錄。reduce函數(shù)同樣在使用Reducer時被定義,如例2-4所示。例2-4:最高氣溫示例的Reducerimport

java.io.IOException;

import

java.util.Iterator;

import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.io.Text;

import

org.apache.hadoop.mapred.MapReduceBase;

import

org.apache.hadoop.mapred.OutputCollector;

import

org.apache.hadoop.mapred.Reducer;

import

org.apache.hadoop.mapred.Reporter;

public

class

MaxTemperatureReducer

extends

MapReduceBase

implements

Reducer<Text,

IntWritable,

Text,

IntWritable>

{

public

void

reduce(Text

key,

Iterator<IntWritable>

values,

OutputCollector<Text,

IntWritable>

output,

Reporter

reporter)

throws

IOException

{

int

maxValue

=

Integer.MIN_VALUE;

while

(values.hasNext())

{

maxValue

=

Math.max(maxValue,

values.next().get());

}

output.collect(key,

new

IntWritable(maxValue));

}

}

同樣,四個形式參數(shù)類型用于指定reduce函數(shù)的輸入和輸出類型。reduce函數(shù)的輸入類型必須與map函數(shù)的輸出類型相匹配:Text類型和IntWritable類型。在這種情況下,reduce函數(shù)的輸出類型是Text和IntWritable這兩種類型,前者是年份的類型而后者是最高氣溫的類型,在這些輸入類型之中,我們遍歷所有氣溫,并把每個記錄進行比較直到找到一個最高的為止。第三部分代碼運行的是MapReduce作業(yè)(請參見例2-5)。2.3.2

JavaMapReduce(2)例2-5:在氣象數(shù)據(jù)集中找出最高氣溫的應用程序import

java.io.IOException;

import

org.apache.hadoop.fs.Path;

import

org.apache.hadoop.io.IntWritable;

import

org.apache.hadoop.io.Text;

import

org.apache.hadoop.mapred.FileInputFormat;

import

org.apache.hadoop.mapred.FileOutputFormat;

import

org.apache.hadoop.mapred.JobClient;

import

org.apache.hadoop.mapred.JobConf;

public

class

MaxTemperature

{

public

static

void

main(String[]

args)

throws

IOException

{

if

(args.length

!=

2)

{

System.err.println("Usage:

MaxTemperature

<input

path>

<output

path>");

System.exit(-1);

}

JobConf

conf

=

new

JobConf(MaxTemperature.class);

conf.setJobName("Max

temperature");

FileInputFormat.addInputPath(conf,

new

Path(args[0]));

FileOutputFormat.setOutputPath(conf,

new

Path(args[1]));

conf.setMapperClass(MaxTemperatureMapper.class);

conf.setReducerClass(MaxTemperatureReducer.class);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

JobClient.runJob(conf);

}

}

JobConf對象指定了作業(yè)的各種參數(shù)。它授予你對整個作業(yè)如何運行的控制權(quán)。當我們在Hadoop集群上運行這個作業(yè)時,我們把代碼打包成一個JAR文件(Hadoop會在集群分發(fā)這個包)。我們沒有明確指定JAR文件的名稱,而是在JobConf構(gòu)造函數(shù)中傳送一個類,Hadoop會找到這個包含此類的JAR文件。在創(chuàng)建JobConf對象后,我們將指定輸入和輸出的路徑。通過調(diào)用FileInputFormat內(nèi)的靜態(tài)方法addInputPath()來定義輸入的路徑,它可以是單個文件、目錄(本例中,輸入的內(nèi)容組成此目錄下所有文件)或文件模式的路徑。同時,addInputPath()可被調(diào)用多次從而實現(xiàn)使用多路徑輸入。輸出路徑(其中只有一個)是在FileOutputFormat內(nèi)的靜態(tài)方法setOutputPath()來指定的。它指定了reduce函數(shù)輸出文件寫入的目錄。在運行作業(yè)前該目錄不應該存在,否則Hadoop會報錯并且拒絕運行任務。這種預防措施是為了防止數(shù)據(jù)丟失(一個長時間的任務可能非常惱人地被另一個意外覆蓋)。接下來,通過setMapperClass()和setReducerClass()這兩個方法來指定要使用的map和reduce類型。setOutputKeyClass()和setOutputValueClass()方法控制map和reduce函數(shù)的輸出類型,正如本例所示,這兩個方法往往是相同的。如果它們不同,那么map的輸出類型可設置成使用setMapOutputKeyClass()和setMapOutputValue-Class()方法。輸入的類型通過輸入格式來控制,我們沒有設置,因為我們使用的是默認的TextInputFormat(文本輸入格式)。在設置了定義map和reduce函數(shù)的類之后,運行作業(yè)的準備工作就算完成了。JobClient內(nèi)的靜態(tài)方法runJob()會提交作業(yè)并等待它完成,把進展情況寫入控制臺。運行測試寫完MapReduce作業(yè)之后,拿一個小型的數(shù)據(jù)集進行測試以排除與代碼直接有關的問題,這是常規(guī)做法。首先,以獨立模式安裝Hadoop(詳細說明請參見附錄A)。在這種模式下,Hadoop運行中使用本地帶jobrunner(作業(yè)運行程序)的文件系統(tǒng)。讓我們用前面討論過的五行代碼的例子來測試它(考慮到頁面,這里已經(jīng)對輸出稍做修改和重新排版):%

export

HADOOP_CLASSPATH=build/classes

%

hadoop

MaxTemperature

input/ncdc/sample.txt

output

09/04/07

12:34:35

INFO

jvm.JvmMetrics:

Initializing

JVM

Metrics

with

processName=Job

Tracker,

sessionId=

09/04/07

12:34:35

WARN

mapred.JobClient:

Use

GenericOptionsParser

for

parsing

the

arguments.

Applications

should

implement

Tool

for

the

same.

09/04/07

12:34:35

WARN

mapred.JobClient:

No

job

jar

file

set.

User

classes

may

not

be

found.

See

JobConf(Class)

or

JobConf#setJar(String).

Format:

Total

input

paths

to

process

:

1

09/04/07

12:34:35

INFO

mapred.JobClient:

Running

job:

job_local_0001

Format:

Total

input

paths

to

process

:

1

09/04/07

12:34:35

INFO

mapred.MapTask:

numReduceTasks:

1

09/04/07

12:34:35

INFO

mapred.MapTask:

=

100

09/04/07

12:34:35

INFO

mapred.MapTask:

data

buffer

=

79691776/99614720

09/04/07

12:34:35

INFO

mapred.MapTask:

record

buffer

=

262144/327680

09/04/07

12:34:35

INFO

mapred.MapTask:

Starting

flush

of

map

output

09/04/07

12:34:36

INFO

mapred.MapTask:

Finished

spill

0

09/04/07

12:34:36

INFO

mapred.TaskRunner:

Task:attempt_local_0001_m_000000_0

is

done.

And

is

in

the

process

of

commiting

09/04/07

12:34:36

INFO

mapred.LocalJobRunner:

file:/Users/tom/workspace/htdg/input/n

cdc/sample.txt:0+529

09/04/07

12:34:36

INFO

mapred.TaskRunner:

Task

'

attempt_local_0001_m_000000_0'

done.

09/04/07

12:34:36

INFO

mapred.LocalJobRunner:

09/04/07

12:34:36

INFO

mapred.Merger:

Merging

1

sorted

segments

09/04/07

12:34:36

INFO

mapred.Merger:

Down

to

the

last

merge-pass,

with

1

segments

left

of

total

size:

57

bytes

09/04/07

12:34:36

INFO

mapred.LocalJobRunner:

09/04/07

12:34:36

INFO

mapred.TaskRunner:

Task:attempt_local_0001_r_000000_0

is

done

.

And

is

in

the

process

of

commiting

09/04/07

12:34:36

INFO

mapred.LocalJobRunner:

09/04/07

12:34:36

INFO

mapred.TaskRunner:

Task

attempt_local_0001_r_000000_0

is

allowed

to

commit

now

09/04/07

12:34:36

INFO

mapred.

FileOutputCommitter:

Saved

output

of

task

'attempt_local_0001_r_000000_0'

to

file:/

Users/tom/workspace/htdg/output

09/04/07

12:34:36

INFO

mapred.

LocalJobRunner:

reduce

>

reduce

09/04/07

12:34:36

INFO

mapred.TaskRunner:

Task

'attempt_local_0001_r_000000_0'

done.

09/04/07

12:34:36

INFO

mapred.JobClient:

map

100%

reduce

100%

09/04/07

12:34:36

INFO

mapred.JobClient:

Job

complete:

job_local_0001

09/04/07

12:34:36

INFO

mapred.JobClient:

Counters:

13

09/04/07

12:34:36

INFO

mapred.JobClient:

FileSystemCounters

09/04/07

12:34:36

INFO

mapred.JobClient:

FILE_BYTES_READ=27571

09/04/07

12:34:36

INFO

mapred.JobClient:

FILE_BYTES_WRITTEN=53907

09/04/07

12:34:36

INFO

mapred.JobClient:

Map-Reduce

Framework

09/04/07

12:34:36

INFO

mapred.JobClient:

Reduce

input

groups=2

09/04/07

12:34:36

INFO

mapred.JobClient:

Combine

output

records=0

09/04/07

12:34:36

INFO

mapred.JobClient:

Map

input

records=5

09/04/07

12:34:36

INFO

mapred.JobClient:

Reduce

shuffle

bytes=0

09/04/07

12:34:36

INFO

mapred.JobClient:

Reduce

output

records=2

09/04/07

12:34:36

INFO

mapred.JobClient:

Spilled

Records=10

09/04/07

12:34:36

INFO

mapred.JobClient:

Map

output

bytes=45

09/04/07

12:34:36

INFO

mapred.JobClient:

Map

input

bytes=529

09/04/07

12:34:36

INFO

mapred.JobClient:

Combine

input

records=0

09/04/07

12:34:36

INFO

mapred.JobClient:

Map

output

records=5

09/04/07

12:34:36

INFO

mapred.JobClient:

Reduce

input

records=5

2.3.2

JavaMapReduce(3)如果Hadoop命令是以類名作為第一個參數(shù),它就會啟動一個JVM來運行這個類。使用命令比直接使用Java更方便,因為前者把類的路徑(及其依賴關系)加入Hadoop的庫中,并獲得Hadoop的配置。要添加應用程序類的路徑,我們需要定義一個HADOOP_CLASSPATH環(huán)境變量,Hadoop腳本會來執(zhí)行相關操作。注意:以本地(獨立)模式運行時,本書所有程序希望都以這種方式來設置HADOOP_CLA-SSPATH。命令必須在示例代碼所在的文件夾下被運行。運行作業(yè)所得到的輸出提供了一些有用的信息。(無法找到作業(yè)JAR文件的相關信息是意料之中的,因為我們是在本地模式下沒有JAR的情況下運行的。在集群上運行時,不會看到此警告。)例如,我們可以看到,這個作業(yè)被給予了一個IDjob_local_0001,并且它運行了一個map任務和一個reduce任務(使用attempt_local_0001_m_000000_0和attempt_local_0001_r_000000_0兩個ID)。在調(diào)試MapReduce作業(yè)時,知道作業(yè)和任務的ID是非常有用的。輸出的最后一部分叫"計數(shù)器"(Counter),顯示了在Hadoop上運行的每個作業(yè)產(chǎn)生的統(tǒng)計信息。這些對檢查處理的數(shù)據(jù)量是否符合預期非常有用。例如,我們可以遵循整個系統(tǒng)中記錄的數(shù)目:5個map輸入產(chǎn)生了5個map的輸出,然后5個reduce輸入產(chǎn)生兩個reduce輸出。輸出被寫入output目錄,其中每個reducer包括一個輸出文件。作業(yè)包含一個reducer,所以我們只能找到一個文件,名為part-00000:%cat

output/part-00000

1949

111

1950

22

℃℃。新的JavaMapreduceAPIHadoop最新版JavaMapReduceRelease0.20.0的API包括一個全新的MapReduceJavaAPI,有時也稱為"contextobject"(上下文對象),旨在使API在未來更容易擴展。新的API類型上不兼容以前的API,所以,以前的應用程序需要重寫才能使新的API發(fā)揮其作用。新的API和舊的API之間有下面幾個明顯的區(qū)別。新的API傾向于使用抽象類,而不是接口,因為這更容易擴展。例如,你可以添加一個方法(用默認的實現(xiàn))到一個抽象類而不需修改類之前的實現(xiàn)方法。在新的API中,Mapper和Reducer是抽象類。新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API則是放在org.apache.hadoop.mapred中的。新的API廣泛使用contextobject(上下文對象),并允許用戶代碼與MapReduce系統(tǒng)進行通信。例如,MapContext基本上充當著JobConf的OutputCollector和Reporter的角色。新的API同時支持"推"和"拉"式的迭代。在這兩個新老API中,鍵/值記錄對被推mapper中,但除此之外,新的API允許把記錄從map()方法中拉出,這也適用于reducer。"拉"式的一個有用的例子是分批處理記錄,而不是一個接一個。新的API統(tǒng)一了配置。舊的API有一個特殊的JobConf對象用于作業(yè)配置,這是一個對于Hadoop通常的Configuration對象的擴展(用于配置守護進程,請參見5.1節(jié))。在新的API中,這種區(qū)別沒有了,所以作業(yè)配置通過Configuration來完成。作業(yè)控制的執(zhí)行由Job類來負責,而不是JobClient,它在新的API中已經(jīng)蕩然無存。例2-6使用新API重寫了MaxTemperature的代碼,不同之處用黑體字突出顯示。例2-6:使用新的contextobject(上下文對象)MapReduceAPI在氣象數(shù)據(jù)集中查找最高氣溫public

class

NewMaxTemperature

{

static

class

NewMaxTemperatureMapper

extends

Mapper<LongWritable,

Text,

Text,

IntWritab

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論