Apache Storm技術(shù)參考手冊(cè)_第1頁(yè)
Apache Storm技術(shù)參考手冊(cè)_第2頁(yè)
Apache Storm技術(shù)參考手冊(cè)_第3頁(yè)
Apache Storm技術(shù)參考手冊(cè)_第4頁(yè)
Apache Storm技術(shù)參考手冊(cè)_第5頁(yè)
已閱讀5頁(yè),還剩107頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

目錄

簡(jiǎn)介.............................................................................4

什么是ApacheStorm?.................................................................................................................4

ApacheStormvsHadoop...............................................................................................................4

使用ApacheStorm的例子.....................................................5

ApacheStorm優(yōu)勢(shì)............................................................5

Storm的核心組件.............................................................5

Topology...........................................................................................................................................8

Storm集群安裝..............................................................9

StormHelloWorld...........................................................................................................................9

環(huán)境準(zhǔn)備................................................................9

具體流程................................................................10

核心概念........................................................................35

拓?fù)?.......................................................................36

任務(wù)........................................................................37

進(jìn)程........................................................................37

流分組......................................................................37

隨機(jī)分組...............................................................37

字段分組...............................................................38

全局分組...............................................................39

所有分組...............................................................40

集群架構(gòu)........................................................................40

工作流程........................................................................42

分布式消息系統(tǒng).................................................................44

什么是分布式消息系統(tǒng)?.....................................................44

Thrift協(xié)議...................................................................45

安裝............................................................................46

步驟1-驗(yàn)證Java安裝.......................................................46

步驟1.1-下載JDK.............................................................................................................46

步驟1.2-解壓文件......................................................47

步驟1.3-移動(dòng)到opt文件夾.............................................47

步驟1.4-設(shè)置路徑......................................................47

步驟1.5-Java替代項(xiàng)....................................................48

步驟1.6-Java安裝驗(yàn)證..................................................48

第2步-ZooKeeper框架安裝.................................................48

步驟2.1-下載ZooKeeper.................................................................................................48

步驟2.2-解壓tar文件...................................................48

步驟2.3-創(chuàng)建配置文件..................................................49

步驟2.4-啟動(dòng)ZooKeeper服務(wù)器..........................................49

步驟2.5?啟動(dòng)CLI..............................................................................................................49

步驟2.6-停止ZooKeeper服務(wù)器..........................................50

第3步-ApacheStorm框架安裝...............................................51

步驟3.1-下載Storm.........................................................................................................51

步驟3.2??解壓tar文件...................................................51

步驟3.3-打開配置文件..................................................51

步驟3.4-啟動(dòng)Nimbus.......................................................................................................52

步驟3.5-啟動(dòng)Supervisor..................................................................................................52

步驟3.6-啟動(dòng)UI...............................................................................................................52

工作實(shí)例........................................................................53

場(chǎng)景-移動(dòng)呼叫日志分析器..................................................53

Spout創(chuàng)建..................................................................53

open.......................................................................................................................................54

nextTuple...............................................................................................................................54

close.......................................................................................................................................55

declareOutputFields.............................................................................................................55

ack..........................................................................................................................................55

fail...........................................................................................................................................55

FakeCallLogReaderSpout......................................................................................................56

編碼-FakeCallLogReaderSpout.java.................................................................................56

Bolt創(chuàng)建....................................................................60

Prepare...................................................................................................................................60

execute...................................................................................................................................60

cleanup...................................................................................................................................61

declareOutputFields.............................................................................................................61

呼叫日志創(chuàng)建者bolt...................................................................................................................61

編碼-CallLogCreatorBoltjava............................................................................................62

呼叫日志計(jì)數(shù)器Bolt...................................................................................................................64

編碼-CallLogCounterBolt.java..........................................................................................64

創(chuàng)建拓?fù)?...................................................................66

本地集群....................................................................67

編碼-LogAnalyserStorm.java.............................................................................................67

構(gòu)建和運(yùn)行應(yīng)用程序.........................................................69

輸出....................................................................69

非JVM語(yǔ)言.................................................................70

Python綁定.............................................................70

Trident....................................................................................................................................................71

Trident拓?fù)?................................................................72

TridentTuples................................................................................................................................72

TridentSpout.................................................................................................................................73

Trident操作.................................................................73

過(guò)濾....................................................................74

函數(shù)....................................................................乃

聚合....................................................................76

分組....................................................................78

合并和連接.............................................................79

狀態(tài)維護(hù)....................................................................79

分布式RPC...................................................................................................................................80

什么時(shí)候使用Trident?.............................................................................................................80

Trident的工作實(shí)例...........................................................80

格式化呼叫信息.........................................................81

編碼:FormatCall.java.........................................................................................................81

CSVSplit.................................................................................................................................81

編碼:CSVSplit.java.............................................................................................................82

日志分析器.............................................................82

編碼:LogAnalyserTrident.java..........................................................................................83

構(gòu)建和運(yùn)行應(yīng)用程序.........................................................86

輸出....................................................................87

在Twitter上的應(yīng)用..............................................................88

Twitter............................................................................................................................................88

Spout創(chuàng)建..............................................................88

編碼:TwitterSampleSpout.java........................................................................................89

Hashtag閱讀器spout..................................................................................................................94

編碼:HashtagReaderBok.java..........................................................................................94

Hashtag計(jì)數(shù)器spout.................................................................................................................96

編碼:HashtagCounterBolt.java.........................................................................................96

提交拓?fù)?...................................................................99

編碼:TwitterHashtagStorm.java.......................................................................................99

構(gòu)建和運(yùn)行應(yīng)用程序........................................................101

輸出...................................................................101

在雅虎財(cái)經(jīng)上的應(yīng)用............................................................102

Spout創(chuàng)建.................................................................103

編碼:YahooFinanceSpout.java........................................................................................103

Bolt倉(cāng)ij建...................................................................106

編碼:PriceCutOffBolt.java...............................................................................................106

提交拓?fù)?..................................................................109

編碼:YahooFinanceStorm.java........................................................................................109

構(gòu)建和運(yùn)行應(yīng)用程序........................................................110

輸出...................................................................111

應(yīng)用程序.......................................................................111

Klout.............................................................................................................................................Ill

天氣頻道...................................................................111

電信業(yè).....................................................................112

簡(jiǎn)介

什么是ApacheStorm?

ApacheStorm是Twitter開源的分布式實(shí)時(shí)大數(shù)據(jù)處理框架,最早開源于

github,從0.9.1版本之后,歸于Apache社區(qū),被業(yè)界稱為實(shí)時(shí)版Hadoop。

Storm設(shè)計(jì)用于在容錯(cuò)和水平可擴(kuò)展方法中處理大量數(shù)據(jù)。它是一個(gè)流數(shù)據(jù)框

架,具有最高的攝取率。雖然Storm是無(wú)狀態(tài)的,它通過(guò)ApacheZooKeeper管

理分布式環(huán)境和集群狀態(tài)。它很簡(jiǎn)單,您可以并行地對(duì)實(shí)時(shí)數(shù)據(jù)執(zhí)行各種操

作。

隨著越來(lái)越多的場(chǎng)景對(duì)Hadoop的MapReduce高延遲無(wú)法容忍,比如網(wǎng)站統(tǒng)

計(jì)、推薦系統(tǒng)、預(yù)警系統(tǒng)、金融系統(tǒng)(高頻交易、股票)等等,大數(shù)據(jù)實(shí)時(shí)處理

解決方案(流計(jì)算)的應(yīng)用E趨廣泛,目前已是分布式技術(shù)領(lǐng)域最新爆發(fā)點(diǎn),

而Storm更是流計(jì)算技術(shù)中的佼佼者和主流。

ApacheStorm繼續(xù)成為實(shí)時(shí)數(shù)據(jù)分析的領(lǐng)導(dǎo)者。Storm易于設(shè)置和操作,并且

它保證每個(gè)消息將通過(guò)拓?fù)渲辽偬幚硪淮巍?/p>

ApacheStormvsHadoop

基本上Hadoop和Storm框架用于分析大數(shù)據(jù)。兩者互補(bǔ),在某些方面有所不

同。ApacheSiorm執(zhí)行除持久性之外的所有操作,而Hadoop在所有方面都很

好,但滯后于實(shí)時(shí)計(jì)算。下表比較了Stoi*m和Hadoop的屬在。

StormHadoop

實(shí)時(shí)流處理批量處理

無(wú)狀態(tài)有狀態(tài)

主/從架構(gòu)與基于ZooKeeper的協(xié)調(diào)。主節(jié)點(diǎn)稱為具有/不具有基于ZooKeeper的協(xié)調(diào)

nimbus,從屬節(jié)點(diǎn)是主管。的主-從結(jié)構(gòu)。主節(jié)點(diǎn)是作業(yè)跟蹤

器,從節(jié)點(diǎn)是任務(wù)跟蹤器,

Storm流過(guò)程在集群上每秒可以訪問(wèn)數(shù)萬(wàn)條消息。Hadoop分布式文件系統(tǒng)(HDFS)使

用MapReduce框架來(lái)處理大量的數(shù)

據(jù),需要幾分鐘或幾小時(shí)。

Storm拓?fù)溥\(yùn)行直到用戶關(guān)閉或意外的不可恢復(fù)故MapReduce作業(yè)按順序執(zhí)行并最終

障。完成。

兩者都是分布式和容錯(cuò)的

如果nimbus/supervisor死機(jī),重新啟動(dòng)使它從它停如果JobTracker死機(jī),所有正在運(yùn)行

止的地方繼續(xù),因此沒有什么受到影響。的作業(yè)都會(huì)丟失。

使用ApacheStorm的例子

ApacheStorm對(duì)于實(shí)時(shí)大數(shù)據(jù)流處理非常有名。因此,大多數(shù)公司都將Storm

用作其系統(tǒng)的一個(gè)組成部分。一些值得注意的例子如下?

Twitter-Twitter正在使用ApacheStorm作為其“發(fā)布商分析產(chǎn)品”。“發(fā)布

商分析產(chǎn)品”處理Twiiter平臺(tái)中的每個(gè)tweets和點(diǎn)擊。ApacheStorm與

Twitter基礎(chǔ)架構(gòu)深度集成。

NaviSite-NaviSite正在使用Storm進(jìn)行事件日志監(jiān)控/審計(jì)系統(tǒng)。系統(tǒng)中生成

的每個(gè)日志都將通過(guò)Slorm。Storm將根據(jù)配置的正則表達(dá)式集檢查消息,如果

存在匹配,那么該特定消息將保存到數(shù)據(jù)庫(kù)。

Wego-Wego是位于新加坡的旅行元搜索引擎。旅行相關(guān)數(shù)據(jù)來(lái)自世界各地的

許多來(lái)源,時(shí)間不同。Storm幫助Wego搜索實(shí)時(shí)數(shù)據(jù),解決并發(fā)問(wèn)題,并為最

終用戶找到最佳匹配。

ApacheStorm優(yōu)勢(shì)

下面是ApacheStorm提供的好處列表:

?Storm是開源的,強(qiáng)大的,用戶友好的。它可以用于小公司和大公司。

?Storm是容錯(cuò)的,靈活的,可靠的,并且支持任何編程語(yǔ)言。

?允許實(shí)時(shí)流處理。

?Storm是令人難以置信的快,因?yàn)樗哂芯薮蟮奶幚頂?shù)據(jù)的力量。

?Slorm可以通過(guò)線性增加資源來(lái)保持性能,即使在負(fù)載增加的情況下。它是高

度可擴(kuò)展的。

?Storm在幾秒鐘或幾分鐘內(nèi)執(zhí)行數(shù)據(jù)刷新和端到端傳送響應(yīng)取決于問(wèn)題。它具

有非常低的延遲。

?Storm有操作智能。

?Storm提供保證的數(shù)據(jù)處理,即使群集中的任何連接的節(jié)點(diǎn)死或消息丟失。

Storm的核心組件

?Nimbus:即Storm的Master,負(fù)責(zé)資源分配和任務(wù)調(diào)度。一個(gè)Storm集群只有一

個(gè)

Nimbuso

?Supervisor:即Storm的Slave,負(fù)責(zé)接收Nimbus分配的任務(wù),管理所有Worke

r,一個(gè)Supervisor節(jié)點(diǎn)中包含多個(gè)Worker進(jìn)程。

?Worker:工作進(jìn)程,每個(gè)工作進(jìn)程中都有多個(gè)Task.

■Task:任務(wù),在Storm集群中每個(gè)Spout和Bolt都由若干個(gè)任務(wù)(tasks)來(lái)執(zhí)

行。每個(gè)任務(wù)都與一個(gè)執(zhí)行線程相對(duì)應(yīng)。

?Topology:計(jì)算拓?fù)?,Storm的拓?fù)涫菍?duì)實(shí)時(shí)計(jì)算應(yīng)用邏輯的封裝,它的作用與

MapReduce的任務(wù)(Job)很相似,區(qū)別在于MapReduce的一個(gè)Job在得到結(jié)

果之后總會(huì)結(jié)束,而拓?fù)鋾?huì)一直在集群中運(yùn)行,直到你手動(dòng)去終止它。拓?fù)溥€可以理

解成由一系列通過(guò)數(shù)據(jù)流(StreamGrouping)相互關(guān)聯(lián)的Spout和Bolt組成的

的拓?fù)浣Y(jié)構(gòu)。

?Stream:數(shù)據(jù)流(Streams)是Storm中最核心的抽象概念。一個(gè)數(shù)據(jù)流指的是在

分布式環(huán)境中并行創(chuàng)建、處理的一組元組(tuple)的無(wú)界序列。數(shù)據(jù)流可以由一種

能夠表述數(shù)據(jù)流中元組的域(fields)的模式來(lái)定義。

?Spout:數(shù)據(jù)源(Spout)是拓?fù)渲袛?shù)據(jù)流的來(lái)源。一般Spout會(huì)從一個(gè)外部的數(shù)

據(jù)源讀取元組然后將他們發(fā)送到拓?fù)渲?。根?jù)需求的不同,Spout既可以定義為可靠

的數(shù)據(jù)源,也可以定義為不可靠的數(shù)據(jù)源。一個(gè)可靠的Spout能夠在它發(fā)送的元組

處理失敗時(shí)重新發(fā)送該元組,以確保所有的元組都能得到正確的處理;相對(duì)應(yīng)的,不

可靠的Spout就不會(huì)在元組發(fā)送之后對(duì)元組進(jìn)行任何其他的處理。一個(gè)Spout可

以發(fā)送多個(gè)數(shù)據(jù)流。

?Bolt:拓?fù)渲兴械臄?shù)據(jù)處理均是由Bolt完成的。通過(guò)數(shù)據(jù)過(guò)濾(filtering\函數(shù)

處理(functions\聚合(aggregations\聯(lián)結(jié)(joins\數(shù)據(jù)庫(kù)交互等功能,Bolt

幾乎能夠完成任]可一種數(shù)據(jù)處理需求。一個(gè)Bolt可以實(shí)現(xiàn)簡(jiǎn)鑿的數(shù)據(jù)流轉(zhuǎn)換,而更

復(fù)雜的數(shù)據(jù)流變換通常需要使用多個(gè)Bolt并通過(guò)多個(gè)步驟完成。

?Streamgrouping:為拓?fù)渲械拿總€(gè)Bolt的確定輸入數(shù)據(jù)流是定義一個(gè)拓?fù)涞闹?/p>

要環(huán)節(jié)。數(shù)據(jù)流分組定義了在Bolt的不同任務(wù)(tasks)中劃分?jǐn)?shù)據(jù)流的方式。在S

torm中有八種內(nèi)置的數(shù)據(jù)流分組方式。

?Reliability:可靠性。Storm可以通過(guò)拓?fù)鋪?lái)確保每個(gè)發(fā)送的元組都能得到正確處

理。通過(guò)跟蹤由Spout發(fā)出的每個(gè)元組構(gòu)成的元組樹可以確定元組是否已經(jīng)完成處

理。每個(gè)拓?fù)涠加幸粋€(gè)"消息延時(shí)"參數(shù),如果Storm在延時(shí)時(shí)間內(nèi)沒有檢測(cè)到元

組是否處理完成,就會(huì)將該元組標(biāo)記為處理失敗,并會(huì)在稍后重新發(fā)送該元組。

Storm程序再Storm集群中運(yùn)行的示例圖如下:

Topology

為什么把Topology單獨(dú)提出來(lái)呢,因?yàn)門opology是我們開發(fā)程序主要的用的組件。

Topology和MapReduce很相像。

M叩Reduce是M叩進(jìn)行獲取數(shù)據(jù),Reduce進(jìn)行處理數(shù)據(jù)。

而Topology則是使用Spout獲取數(shù)據(jù),Bolt來(lái)進(jìn)行計(jì)算。

總的來(lái)說(shuō)就是一個(gè)Topology由一個(gè)或者多個(gè)的Spout和Bolt組成

具體流程是怎么走,可以通過(guò)查看下面這張圖來(lái)進(jìn)行了解。

示例圖:

Stream

^Stream2

.----Stream!

Stream"

0一-一

'、~Stream------Stream

Stream

g

stream—

注:圖片來(lái)源/api/tutorials/storm/52o

圖片有三種模式,解釋如下:

第一種比較簡(jiǎn)單,就是由一個(gè)Spout獲取物g,然后交給一個(gè)Bolt進(jìn)行處理;

第二種稍微復(fù)雜點(diǎn),由一個(gè)Spout獲取數(shù)據(jù),然后交給一個(gè)Bolt進(jìn)行處理一部分,然后

在交給下一個(gè)Bolt進(jìn)行處理其他部分。

第三種則上瞰復(fù)雜,一個(gè)Spout可以同時(shí)發(fā)送數(shù)據(jù)到多個(gè)Bolt,而一個(gè)Bolt也可以接受

多個(gè)Spout或多個(gè)Bolt,最終形成多個(gè)數(shù)據(jù)流。但是這種數(shù)據(jù)流必須是有方向的,有起點(diǎn)

和終點(diǎn),不然會(huì)造成死循環(huán),數(shù)據(jù)永遠(yuǎn)也處理不完。就是Spout發(fā)給Boltl,Boltl發(fā)給

Bolt2,Bolt2又發(fā)給了Boltl,最終形成了一環(huán)狀。

Storm集群安裝

之前已經(jīng)寫過(guò)了,這里就不在說(shuō)明了。

博客itetlhhttp:〃/2018/01/26/pancm70/

StormHelloWorld

前面講了一些Storm概念,可能在理解上不太清楚,那么這里我們就用一個(gè)HelloWorld

代碼示例來(lái)體驗(yàn)下Storm運(yùn)作的流程吧。

環(huán)境準(zhǔn)備

在進(jìn)行代碼開發(fā)之前,首先得做好相關(guān)的準(zhǔn)備。

本項(xiàng)目是使用Maven構(gòu)建的,使用Storm的版本為1.1.1。

Maven的相關(guān)依賴如下:

<!一-storm相關(guān)jar—>

<dependency>

<groupld>org.apache.storm</groupld>

<artifactId>storm-core</artifactId>

<version>l.1.l</version>

<scope>provided</scope>

</dependency>

具體流程

在寫代碼的時(shí)候,我們先來(lái)明確要用Storm做什么。

那么第一個(gè)程序,就簡(jiǎn)單的輸出下信息。

具體步驟如下:

1.啟動(dòng)topology,設(shè)置好Spout和Bolt.

2.將Spout獲取的數(shù)據(jù)傳遞給Bolt,

3.Bolt接受Spout的數(shù)據(jù)進(jìn)行打印。

Spout

那么首先開始編寫Spout類。一股是實(shí)現(xiàn)IRichSpout或繼承BaseRichSpout該類,然

后實(shí)現(xiàn)該方法。

這里我們繼承BaseRichSpout這個(gè)類,該類需要實(shí)現(xiàn)這幾個(gè)主要的方法:

一、open

open()方法中是在ISpout接口中定義,在Spout組件初始化時(shí)被調(diào)用。

有三個(gè)參數(shù),它們的作用分別是:

1.Storm配置的Map;

2.topology中組件的信息;

3.發(fā)射tuple的方法;

代碼示例:

?Override

publicvoidopen(Mapmap,TopologyContextargl,

SpoutOutputCollectorcollector){

System,out.printlnC^open:imap.get("test"));

this,collector=collector;

)

二、nextTuple

nextTuple。方法是Spout實(shí)現(xiàn)的核心。

也就是主要執(zhí)行方法,用于輸出信息,通過(guò)collector,emit方法發(fā)射。

這里我們的數(shù)據(jù)信息已經(jīng)寫死了,所以這里我們就直接將數(shù)據(jù)進(jìn)行發(fā)送。

這里設(shè)置只發(fā)送兩次。

代碼示例:

@Override

publicvoidnextTuple(){

if(count<=2){

System.out.printin(〃第〃+count+”次開始發(fā)送數(shù)

據(jù)???〃);

this,collector,emit(newValues(message));

count++;

三、declareOutputFields

declareOutputFields是在IComponent接口中定義,用于聲明數(shù)據(jù)格式。

即輸出的一個(gè)Tuple中,包含幾個(gè)字段。

因?yàn)檫@里我們只發(fā)射一個(gè),所以就指定一個(gè)。如果是多個(gè),則用逗號(hào)隔開。

代碼示例:

?Override

publicvoiddeclareOutputFields(OutputFieldsDeclarer

declarer){

System.out.printin("定義格式...;

declarer,declare(newFields(field));

)

四、ack

ack是在ISpout接口中定義,用于表示Tuple處理成功。

代碼示例:

?Override

publicvoidack(Objectobj){

System.out.println("ack:"+obj);

五、fail

fail是在ISpout接口中定義,用于表示Tuple處理失敗。

代碼示例:

@Override

publicvoidfail(Objectobj){

System,out.printin("失敗:〃+obj);

)

六、close

dose是在ISpout接口中定義,用于表示Topology停止。

代碼示例:

?Override

publicvoidclose(){

System,out.printin("關(guān)閉...;

)

至于還有其他的,這里就不在一列舉了。

Bolt

Bolt是用于處理數(shù)據(jù)的組件,主要是由execute方法來(lái)進(jìn)行實(shí)現(xiàn)。一般來(lái)說(shuō)需要實(shí)

現(xiàn)IRichBolt或繼承BaseRichBolt該類,然后實(shí)現(xiàn)其方法。

需要實(shí)現(xiàn)方法如下:

一、prepare

在Bolt啟動(dòng)前執(zhí)行,提供Bolt啟動(dòng)環(huán)境配置的入口。

參數(shù)基本和Sqout-Wo

一般對(duì)于不可序列化的對(duì)象進(jìn)行實(shí)例化。

這里的我們就簡(jiǎn)單的打印下

?Override

publicvoidprepare(Mapmap,TopologyContextargl,

OutputCollectorcollector){

Systpm.out.print1n(^prpparp:zz+map.gpt(,,tpst,/));

this,collector=collector;

)

注:如果是可以序列化的對(duì)象,那么最好是使用構(gòu)造函數(shù)。

二、execute

execute。方法是Bolt實(shí)現(xiàn)的核心。

也就是執(zhí)行方法,每次Bolt從流凌收f(shuō)丁閱的tuple,都會(huì)調(diào)用這個(gè)方法。

從tuple中獲取消息可以使用tuple.getStringO和tuple.getStringByFieldO;這

兩個(gè)方法。個(gè)人推薦第二種,可以通過(guò)field來(lái)指定接收的消息。

注:如果繼承的是IRichBolt,則需要手動(dòng)ack。這里就不用了,BaseRichBolt會(huì)自動(dòng)幫我

們應(yīng)答。

代碼示例:

?Override

publicvoidexecute(Tupletuple){

//Stringmsg=tuple.getString(O);

Stringmsg=tuple.getStringByField("test");

〃這里我們就不做消息的處理,只打印

System.out.printin(,zBolt第〃+count+”接受的消息:"+msg);

count++;

/**

*

*沒次調(diào)用處理一個(gè)輸入的tuple,所有的tuple都必須在一定時(shí)間

內(nèi)應(yīng)答。

*可以是ack或者fail。否則,spout就會(huì)重發(fā)tuple。

*/

//collector,ack(tuple);

)

三、declareOutputFields

和Spout的一樣。

因?yàn)榈搅诉@里就不再輸出了,所以就什么都沒寫。

?Override

publicvoiddeclareOutputFields(OutputFieldsDeclarerargO)

(

)

cleanup

cleanup是IBolt接口中定義,用于釋放bolt占用的資源。

Storm在終止一個(gè)bolt之前會(huì)調(diào)用這個(gè)方法。

因?yàn)檫@里沒有什么資源需要釋放,所以就簡(jiǎn)單的打印一句就行了。

?Override

publicvoidcleanup(){

System.out.printin("資源釋放〃);

)

Topology

這里我們就是用main方法進(jìn)行提交topology。

不過(guò)在提交topology之前,需要進(jìn)行相應(yīng)的設(shè)置。

這里我就不一細(xì)說(shuō)了,代碼的注釋已經(jīng)很詳細(xì)了。

代碼示例:

importorg.apache.storm.Config;

importorg.apache,storm.LocalCluster;

importorg.apache,storm.StormSubmitter;

importorg.apache,storm,topology.TopologyBuilder;

/**

*

*Title:App

*Description:

*storm測(cè)試

*Version:1.0.0

*@authorpancm

*@date2018年3月6日

*/

publicclassApp{

privatestaticfinalStringstrl="testl”;

privatestaticfinalStringstr2="test2〃;

publicstaticvoidmain(String[]args)(

//TODOAuto-generatedmethodstub

〃定義一個(gè)拓?fù)?/p>

TopologyBuilderbuilder=newTopologyBuilder();

〃設(shè)置一個(gè)Executeor(線程),默認(rèn)一個(gè)

builder.setSpout(strl,newTestSpout());

〃設(shè)置一個(gè)Executeor(線程),和一個(gè)task

builder.setBolt(str2,new

TestBolt(),1).setNumTasks(l).shuffleGrouping(strl);

Configconf=newConfigO;

conf,put("test","test");

try(

〃運(yùn)行拓?fù)?/p>

if(args!=null&&args.length>0){//有參數(shù)時(shí),表示向集群提交

作業(yè),并把第一個(gè)參數(shù)當(dāng)做topology名稱

System.out.printin(〃遠(yuǎn)程模式〃);

StormSubmitter.submitTopology(args[0],conf,

builder.createTopology());

}else{〃沒有參數(shù)時(shí),本地提交

〃啟動(dòng)本地模式

System,out.printIn(〃本地模式〃);

LocalClustercluster=newLocalCluster();

cluster,submitTopology11177,conf,

builder.createTopology());

Thread,sleep(10000);

//關(guān)閉本地集群

cluster,shutdown();

)

}catch(Exceptione)(

e.printStackTraceO;

)

)

}

運(yùn)行該方法,輸出結(jié)果如下:

本地模式

定義格式.??

open:test

第1次開始發(fā)送數(shù)據(jù)...

第2次開始發(fā)送數(shù)據(jù)...

prepare:test

Bolt第1接受的消息:這是個(gè)測(cè)試消息!

Bolt第2接受的消息:這是個(gè)測(cè)試消息!

資源釋放

關(guān)閉...

到這里,是不是基本上對(duì)Storm的運(yùn)作有些了解了呢。

這個(gè)demo達(dá)到了上述的三種模式圖中的第一種,一個(gè)Spout傳輸數(shù)據(jù),一個(gè)Bolt處理

雌。

那么如果我們想達(dá)到第二種模式呢,那又該如何做呢?

假如我們想統(tǒng)計(jì)下在一段文本中的單詞出現(xiàn)頻率的話,我們只需執(zhí)行一下步驟就可以了。

.首先將中的消息進(jìn)行更改為數(shù)組,并依次將消息發(fā)送到

1SpoutmessageTestBolto

2.然后TestBolt將獲取的數(shù)據(jù)進(jìn)行分割,將分割的數(shù)據(jù)發(fā)送到TestBolt2。

3,TestBolt2對(duì)數(shù)據(jù)進(jìn)行統(tǒng)計(jì),在程序關(guān)閉的時(shí)候進(jìn)行打印。

4.Topology成功配置并且啟動(dòng)之后,等待20秒左右,關(guān)閉程序,然后得到輸出的結(jié)果。

代碼示例如下:

Spout

用于發(fā)送消息。

importjava.util.Map;

importorg.apache,storm,spout.SpoutOutputCollector;

importorg.apache,storm,task.TopologyContext;

importorg.apache,storm,topology.OutputFie1dsDec1arer;

importorg.apache,storm,topology,base.BaseRichSpout;

importorg.apache,storm,tuple.Fields;

importorg.apache,storm,tuple.Values;

/**

*

*Title:TestSpout

*Description:

*發(fā)送信息

*Version:1.0.0

*?authorpancm

*?date2018年3月6日

*/

publicclassTestSpoutextendsBaseRichSpout{

privatestaticfinallongserialVersionUID=

225243592780939490L;

privateSpoutOutputCollectorcollector;

privatestaticfinalStringfield二〃word”;

privateintcount=l;

privateStringf]message={

“Mynicknameisxuwujing",

“Myblogaddressishttp://www.panchengming.com//7,

“Myinterestisplayinggames”

};

/**

*open。方法中是在ISpout接口中定義,在Spout組件初始化時(shí)被調(diào)

用。

*有三個(gè)參數(shù):

*1.Storm配置的Map;

*2.topology中組件的信息;

*3.發(fā)射tuple的方法;

*/

?Override

publicvoidopen(Mapmap,TopologyContextargl,

SpoutOutputCollectorcollector){

System,out.printIn(^open:z,+map.get("test"));

this,collector=collector;

)

/**

*ncxtTupleO方法是Spout實(shí)現(xiàn)的核心。

*也就是主要執(zhí)行方法,用于輸出信息,通過(guò)collector.emit方法發(fā)射。

*/

?Override

publicvoidnextTupleO{

if(count<=message.length){

System,out.printin("第〃+count+”次開始發(fā)送數(shù)

據(jù)…〃);

this,collector,emit(newValues(message[count-

1]));

count++;

/**

*declarcOutputFields是在[Component接口中定義,用于聲明數(shù)據(jù)格

式。

*即輸出的一個(gè)Tuple中,包含幾個(gè)字段。

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論