![Apache Storm技術(shù)參考手冊(cè)_第1頁(yè)](http://file4.renrendoc.com/view14/M08/02/0B/wKhkGWd_NAqALbTOAAIsjk00z1M457.jpg)
![Apache Storm技術(shù)參考手冊(cè)_第2頁(yè)](http://file4.renrendoc.com/view14/M08/02/0B/wKhkGWd_NAqALbTOAAIsjk00z1M4572.jpg)
![Apache Storm技術(shù)參考手冊(cè)_第3頁(yè)](http://file4.renrendoc.com/view14/M08/02/0B/wKhkGWd_NAqALbTOAAIsjk00z1M4573.jpg)
![Apache Storm技術(shù)參考手冊(cè)_第4頁(yè)](http://file4.renrendoc.com/view14/M08/02/0B/wKhkGWd_NAqALbTOAAIsjk00z1M4574.jpg)
![Apache Storm技術(shù)參考手冊(cè)_第5頁(yè)](http://file4.renrendoc.com/view14/M08/02/0B/wKhkGWd_NAqALbTOAAIsjk00z1M4575.jpg)
版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
目錄
簡(jiǎn)介.............................................................................4
什么是ApacheStorm?.................................................................................................................4
ApacheStormvsHadoop...............................................................................................................4
使用ApacheStorm的例子.....................................................5
ApacheStorm優(yōu)勢(shì)............................................................5
Storm的核心組件.............................................................5
Topology...........................................................................................................................................8
Storm集群安裝..............................................................9
StormHelloWorld...........................................................................................................................9
環(huán)境準(zhǔn)備................................................................9
具體流程................................................................10
核心概念........................................................................35
拓?fù)?.......................................................................36
任務(wù)........................................................................37
進(jìn)程........................................................................37
流分組......................................................................37
隨機(jī)分組...............................................................37
字段分組...............................................................38
全局分組...............................................................39
所有分組...............................................................40
集群架構(gòu)........................................................................40
工作流程........................................................................42
分布式消息系統(tǒng).................................................................44
什么是分布式消息系統(tǒng)?.....................................................44
Thrift協(xié)議...................................................................45
安裝............................................................................46
步驟1-驗(yàn)證Java安裝.......................................................46
步驟1.1-下載JDK.............................................................................................................46
步驟1.2-解壓文件......................................................47
步驟1.3-移動(dòng)到opt文件夾.............................................47
步驟1.4-設(shè)置路徑......................................................47
步驟1.5-Java替代項(xiàng)....................................................48
步驟1.6-Java安裝驗(yàn)證..................................................48
第2步-ZooKeeper框架安裝.................................................48
步驟2.1-下載ZooKeeper.................................................................................................48
步驟2.2-解壓tar文件...................................................48
步驟2.3-創(chuàng)建配置文件..................................................49
步驟2.4-啟動(dòng)ZooKeeper服務(wù)器..........................................49
步驟2.5?啟動(dòng)CLI..............................................................................................................49
步驟2.6-停止ZooKeeper服務(wù)器..........................................50
第3步-ApacheStorm框架安裝...............................................51
步驟3.1-下載Storm.........................................................................................................51
步驟3.2??解壓tar文件...................................................51
步驟3.3-打開配置文件..................................................51
步驟3.4-啟動(dòng)Nimbus.......................................................................................................52
步驟3.5-啟動(dòng)Supervisor..................................................................................................52
步驟3.6-啟動(dòng)UI...............................................................................................................52
工作實(shí)例........................................................................53
場(chǎng)景-移動(dòng)呼叫日志分析器..................................................53
Spout創(chuàng)建..................................................................53
open.......................................................................................................................................54
nextTuple...............................................................................................................................54
close.......................................................................................................................................55
declareOutputFields.............................................................................................................55
ack..........................................................................................................................................55
fail...........................................................................................................................................55
FakeCallLogReaderSpout......................................................................................................56
編碼-FakeCallLogReaderSpout.java.................................................................................56
Bolt創(chuàng)建....................................................................60
Prepare...................................................................................................................................60
execute...................................................................................................................................60
cleanup...................................................................................................................................61
declareOutputFields.............................................................................................................61
呼叫日志創(chuàng)建者bolt...................................................................................................................61
編碼-CallLogCreatorBoltjava............................................................................................62
呼叫日志計(jì)數(shù)器Bolt...................................................................................................................64
編碼-CallLogCounterBolt.java..........................................................................................64
創(chuàng)建拓?fù)?...................................................................66
本地集群....................................................................67
編碼-LogAnalyserStorm.java.............................................................................................67
構(gòu)建和運(yùn)行應(yīng)用程序.........................................................69
輸出....................................................................69
非JVM語(yǔ)言.................................................................70
Python綁定.............................................................70
Trident....................................................................................................................................................71
Trident拓?fù)?................................................................72
TridentTuples................................................................................................................................72
TridentSpout.................................................................................................................................73
Trident操作.................................................................73
過(guò)濾....................................................................74
函數(shù)....................................................................乃
聚合....................................................................76
分組....................................................................78
合并和連接.............................................................79
狀態(tài)維護(hù)....................................................................79
分布式RPC...................................................................................................................................80
什么時(shí)候使用Trident?.............................................................................................................80
Trident的工作實(shí)例...........................................................80
格式化呼叫信息.........................................................81
編碼:FormatCall.java.........................................................................................................81
CSVSplit.................................................................................................................................81
編碼:CSVSplit.java.............................................................................................................82
日志分析器.............................................................82
編碼:LogAnalyserTrident.java..........................................................................................83
構(gòu)建和運(yùn)行應(yīng)用程序.........................................................86
輸出....................................................................87
在Twitter上的應(yīng)用..............................................................88
Twitter............................................................................................................................................88
Spout創(chuàng)建..............................................................88
編碼:TwitterSampleSpout.java........................................................................................89
Hashtag閱讀器spout..................................................................................................................94
編碼:HashtagReaderBok.java..........................................................................................94
Hashtag計(jì)數(shù)器spout.................................................................................................................96
編碼:HashtagCounterBolt.java.........................................................................................96
提交拓?fù)?...................................................................99
編碼:TwitterHashtagStorm.java.......................................................................................99
構(gòu)建和運(yùn)行應(yīng)用程序........................................................101
輸出...................................................................101
在雅虎財(cái)經(jīng)上的應(yīng)用............................................................102
Spout創(chuàng)建.................................................................103
編碼:YahooFinanceSpout.java........................................................................................103
Bolt倉(cāng)ij建...................................................................106
編碼:PriceCutOffBolt.java...............................................................................................106
提交拓?fù)?..................................................................109
編碼:YahooFinanceStorm.java........................................................................................109
構(gòu)建和運(yùn)行應(yīng)用程序........................................................110
輸出...................................................................111
應(yīng)用程序.......................................................................111
Klout.............................................................................................................................................Ill
天氣頻道...................................................................111
電信業(yè).....................................................................112
簡(jiǎn)介
什么是ApacheStorm?
ApacheStorm是Twitter開源的分布式實(shí)時(shí)大數(shù)據(jù)處理框架,最早開源于
github,從0.9.1版本之后,歸于Apache社區(qū),被業(yè)界稱為實(shí)時(shí)版Hadoop。
Storm設(shè)計(jì)用于在容錯(cuò)和水平可擴(kuò)展方法中處理大量數(shù)據(jù)。它是一個(gè)流數(shù)據(jù)框
架,具有最高的攝取率。雖然Storm是無(wú)狀態(tài)的,它通過(guò)ApacheZooKeeper管
理分布式環(huán)境和集群狀態(tài)。它很簡(jiǎn)單,您可以并行地對(duì)實(shí)時(shí)數(shù)據(jù)執(zhí)行各種操
作。
隨著越來(lái)越多的場(chǎng)景對(duì)Hadoop的MapReduce高延遲無(wú)法容忍,比如網(wǎng)站統(tǒng)
計(jì)、推薦系統(tǒng)、預(yù)警系統(tǒng)、金融系統(tǒng)(高頻交易、股票)等等,大數(shù)據(jù)實(shí)時(shí)處理
解決方案(流計(jì)算)的應(yīng)用E趨廣泛,目前已是分布式技術(shù)領(lǐng)域最新爆發(fā)點(diǎn),
而Storm更是流計(jì)算技術(shù)中的佼佼者和主流。
ApacheStorm繼續(xù)成為實(shí)時(shí)數(shù)據(jù)分析的領(lǐng)導(dǎo)者。Storm易于設(shè)置和操作,并且
它保證每個(gè)消息將通過(guò)拓?fù)渲辽偬幚硪淮巍?/p>
ApacheStormvsHadoop
基本上Hadoop和Storm框架用于分析大數(shù)據(jù)。兩者互補(bǔ),在某些方面有所不
同。ApacheSiorm執(zhí)行除持久性之外的所有操作,而Hadoop在所有方面都很
好,但滯后于實(shí)時(shí)計(jì)算。下表比較了Stoi*m和Hadoop的屬在。
StormHadoop
實(shí)時(shí)流處理批量處理
無(wú)狀態(tài)有狀態(tài)
主/從架構(gòu)與基于ZooKeeper的協(xié)調(diào)。主節(jié)點(diǎn)稱為具有/不具有基于ZooKeeper的協(xié)調(diào)
nimbus,從屬節(jié)點(diǎn)是主管。的主-從結(jié)構(gòu)。主節(jié)點(diǎn)是作業(yè)跟蹤
器,從節(jié)點(diǎn)是任務(wù)跟蹤器,
Storm流過(guò)程在集群上每秒可以訪問(wèn)數(shù)萬(wàn)條消息。Hadoop分布式文件系統(tǒng)(HDFS)使
用MapReduce框架來(lái)處理大量的數(shù)
據(jù),需要幾分鐘或幾小時(shí)。
Storm拓?fù)溥\(yùn)行直到用戶關(guān)閉或意外的不可恢復(fù)故MapReduce作業(yè)按順序執(zhí)行并最終
障。完成。
兩者都是分布式和容錯(cuò)的
如果nimbus/supervisor死機(jī),重新啟動(dòng)使它從它停如果JobTracker死機(jī),所有正在運(yùn)行
止的地方繼續(xù),因此沒有什么受到影響。的作業(yè)都會(huì)丟失。
使用ApacheStorm的例子
ApacheStorm對(duì)于實(shí)時(shí)大數(shù)據(jù)流處理非常有名。因此,大多數(shù)公司都將Storm
用作其系統(tǒng)的一個(gè)組成部分。一些值得注意的例子如下?
Twitter-Twitter正在使用ApacheStorm作為其“發(fā)布商分析產(chǎn)品”。“發(fā)布
商分析產(chǎn)品”處理Twiiter平臺(tái)中的每個(gè)tweets和點(diǎn)擊。ApacheStorm與
Twitter基礎(chǔ)架構(gòu)深度集成。
NaviSite-NaviSite正在使用Storm進(jìn)行事件日志監(jiān)控/審計(jì)系統(tǒng)。系統(tǒng)中生成
的每個(gè)日志都將通過(guò)Slorm。Storm將根據(jù)配置的正則表達(dá)式集檢查消息,如果
存在匹配,那么該特定消息將保存到數(shù)據(jù)庫(kù)。
Wego-Wego是位于新加坡的旅行元搜索引擎。旅行相關(guān)數(shù)據(jù)來(lái)自世界各地的
許多來(lái)源,時(shí)間不同。Storm幫助Wego搜索實(shí)時(shí)數(shù)據(jù),解決并發(fā)問(wèn)題,并為最
終用戶找到最佳匹配。
ApacheStorm優(yōu)勢(shì)
下面是ApacheStorm提供的好處列表:
?Storm是開源的,強(qiáng)大的,用戶友好的。它可以用于小公司和大公司。
?Storm是容錯(cuò)的,靈活的,可靠的,并且支持任何編程語(yǔ)言。
?允許實(shí)時(shí)流處理。
?Storm是令人難以置信的快,因?yàn)樗哂芯薮蟮奶幚頂?shù)據(jù)的力量。
?Slorm可以通過(guò)線性增加資源來(lái)保持性能,即使在負(fù)載增加的情況下。它是高
度可擴(kuò)展的。
?Storm在幾秒鐘或幾分鐘內(nèi)執(zhí)行數(shù)據(jù)刷新和端到端傳送響應(yīng)取決于問(wèn)題。它具
有非常低的延遲。
?Storm有操作智能。
?Storm提供保證的數(shù)據(jù)處理,即使群集中的任何連接的節(jié)點(diǎn)死或消息丟失。
Storm的核心組件
?Nimbus:即Storm的Master,負(fù)責(zé)資源分配和任務(wù)調(diào)度。一個(gè)Storm集群只有一
個(gè)
Nimbuso
?Supervisor:即Storm的Slave,負(fù)責(zé)接收Nimbus分配的任務(wù),管理所有Worke
r,一個(gè)Supervisor節(jié)點(diǎn)中包含多個(gè)Worker進(jìn)程。
?Worker:工作進(jìn)程,每個(gè)工作進(jìn)程中都有多個(gè)Task.
■Task:任務(wù),在Storm集群中每個(gè)Spout和Bolt都由若干個(gè)任務(wù)(tasks)來(lái)執(zhí)
行。每個(gè)任務(wù)都與一個(gè)執(zhí)行線程相對(duì)應(yīng)。
?Topology:計(jì)算拓?fù)?,Storm的拓?fù)涫菍?duì)實(shí)時(shí)計(jì)算應(yīng)用邏輯的封裝,它的作用與
MapReduce的任務(wù)(Job)很相似,區(qū)別在于MapReduce的一個(gè)Job在得到結(jié)
果之后總會(huì)結(jié)束,而拓?fù)鋾?huì)一直在集群中運(yùn)行,直到你手動(dòng)去終止它。拓?fù)溥€可以理
解成由一系列通過(guò)數(shù)據(jù)流(StreamGrouping)相互關(guān)聯(lián)的Spout和Bolt組成的
的拓?fù)浣Y(jié)構(gòu)。
?Stream:數(shù)據(jù)流(Streams)是Storm中最核心的抽象概念。一個(gè)數(shù)據(jù)流指的是在
分布式環(huán)境中并行創(chuàng)建、處理的一組元組(tuple)的無(wú)界序列。數(shù)據(jù)流可以由一種
能夠表述數(shù)據(jù)流中元組的域(fields)的模式來(lái)定義。
?Spout:數(shù)據(jù)源(Spout)是拓?fù)渲袛?shù)據(jù)流的來(lái)源。一般Spout會(huì)從一個(gè)外部的數(shù)
據(jù)源讀取元組然后將他們發(fā)送到拓?fù)渲?。根?jù)需求的不同,Spout既可以定義為可靠
的數(shù)據(jù)源,也可以定義為不可靠的數(shù)據(jù)源。一個(gè)可靠的Spout能夠在它發(fā)送的元組
處理失敗時(shí)重新發(fā)送該元組,以確保所有的元組都能得到正確的處理;相對(duì)應(yīng)的,不
可靠的Spout就不會(huì)在元組發(fā)送之后對(duì)元組進(jìn)行任何其他的處理。一個(gè)Spout可
以發(fā)送多個(gè)數(shù)據(jù)流。
?Bolt:拓?fù)渲兴械臄?shù)據(jù)處理均是由Bolt完成的。通過(guò)數(shù)據(jù)過(guò)濾(filtering\函數(shù)
處理(functions\聚合(aggregations\聯(lián)結(jié)(joins\數(shù)據(jù)庫(kù)交互等功能,Bolt
幾乎能夠完成任]可一種數(shù)據(jù)處理需求。一個(gè)Bolt可以實(shí)現(xiàn)簡(jiǎn)鑿的數(shù)據(jù)流轉(zhuǎn)換,而更
復(fù)雜的數(shù)據(jù)流變換通常需要使用多個(gè)Bolt并通過(guò)多個(gè)步驟完成。
?Streamgrouping:為拓?fù)渲械拿總€(gè)Bolt的確定輸入數(shù)據(jù)流是定義一個(gè)拓?fù)涞闹?/p>
要環(huán)節(jié)。數(shù)據(jù)流分組定義了在Bolt的不同任務(wù)(tasks)中劃分?jǐn)?shù)據(jù)流的方式。在S
torm中有八種內(nèi)置的數(shù)據(jù)流分組方式。
?Reliability:可靠性。Storm可以通過(guò)拓?fù)鋪?lái)確保每個(gè)發(fā)送的元組都能得到正確處
理。通過(guò)跟蹤由Spout發(fā)出的每個(gè)元組構(gòu)成的元組樹可以確定元組是否已經(jīng)完成處
理。每個(gè)拓?fù)涠加幸粋€(gè)"消息延時(shí)"參數(shù),如果Storm在延時(shí)時(shí)間內(nèi)沒有檢測(cè)到元
組是否處理完成,就會(huì)將該元組標(biāo)記為處理失敗,并會(huì)在稍后重新發(fā)送該元組。
Storm程序再Storm集群中運(yùn)行的示例圖如下:
Topology
為什么把Topology單獨(dú)提出來(lái)呢,因?yàn)門opology是我們開發(fā)程序主要的用的組件。
Topology和MapReduce很相像。
M叩Reduce是M叩進(jìn)行獲取數(shù)據(jù),Reduce進(jìn)行處理數(shù)據(jù)。
而Topology則是使用Spout獲取數(shù)據(jù),Bolt來(lái)進(jìn)行計(jì)算。
總的來(lái)說(shuō)就是一個(gè)Topology由一個(gè)或者多個(gè)的Spout和Bolt組成
具體流程是怎么走,可以通過(guò)查看下面這張圖來(lái)進(jìn)行了解。
示例圖:
Stream
^Stream2
.----Stream!
Stream"
0一-一
'、~Stream------Stream
Stream
g
stream—
注:圖片來(lái)源/api/tutorials/storm/52o
圖片有三種模式,解釋如下:
第一種比較簡(jiǎn)單,就是由一個(gè)Spout獲取物g,然后交給一個(gè)Bolt進(jìn)行處理;
第二種稍微復(fù)雜點(diǎn),由一個(gè)Spout獲取數(shù)據(jù),然后交給一個(gè)Bolt進(jìn)行處理一部分,然后
在交給下一個(gè)Bolt進(jìn)行處理其他部分。
第三種則上瞰復(fù)雜,一個(gè)Spout可以同時(shí)發(fā)送數(shù)據(jù)到多個(gè)Bolt,而一個(gè)Bolt也可以接受
多個(gè)Spout或多個(gè)Bolt,最終形成多個(gè)數(shù)據(jù)流。但是這種數(shù)據(jù)流必須是有方向的,有起點(diǎn)
和終點(diǎn),不然會(huì)造成死循環(huán),數(shù)據(jù)永遠(yuǎn)也處理不完。就是Spout發(fā)給Boltl,Boltl發(fā)給
Bolt2,Bolt2又發(fā)給了Boltl,最終形成了一環(huán)狀。
Storm集群安裝
之前已經(jīng)寫過(guò)了,這里就不在說(shuō)明了。
博客itetlhhttp:〃/2018/01/26/pancm70/
StormHelloWorld
前面講了一些Storm概念,可能在理解上不太清楚,那么這里我們就用一個(gè)HelloWorld
代碼示例來(lái)體驗(yàn)下Storm運(yùn)作的流程吧。
環(huán)境準(zhǔn)備
在進(jìn)行代碼開發(fā)之前,首先得做好相關(guān)的準(zhǔn)備。
本項(xiàng)目是使用Maven構(gòu)建的,使用Storm的版本為1.1.1。
Maven的相關(guān)依賴如下:
<!一-storm相關(guān)jar—>
<dependency>
<groupld>org.apache.storm</groupld>
<artifactId>storm-core</artifactId>
<version>l.1.l</version>
<scope>provided</scope>
</dependency>
具體流程
在寫代碼的時(shí)候,我們先來(lái)明確要用Storm做什么。
那么第一個(gè)程序,就簡(jiǎn)單的輸出下信息。
具體步驟如下:
1.啟動(dòng)topology,設(shè)置好Spout和Bolt.
2.將Spout獲取的數(shù)據(jù)傳遞給Bolt,
3.Bolt接受Spout的數(shù)據(jù)進(jìn)行打印。
Spout
那么首先開始編寫Spout類。一股是實(shí)現(xiàn)IRichSpout或繼承BaseRichSpout該類,然
后實(shí)現(xiàn)該方法。
這里我們繼承BaseRichSpout這個(gè)類,該類需要實(shí)現(xiàn)這幾個(gè)主要的方法:
一、open
open()方法中是在ISpout接口中定義,在Spout組件初始化時(shí)被調(diào)用。
有三個(gè)參數(shù),它們的作用分別是:
1.Storm配置的Map;
2.topology中組件的信息;
3.發(fā)射tuple的方法;
代碼示例:
?Override
publicvoidopen(Mapmap,TopologyContextargl,
SpoutOutputCollectorcollector){
System,out.printlnC^open:imap.get("test"));
this,collector=collector;
)
二、nextTuple
nextTuple。方法是Spout實(shí)現(xiàn)的核心。
也就是主要執(zhí)行方法,用于輸出信息,通過(guò)collector,emit方法發(fā)射。
這里我們的數(shù)據(jù)信息已經(jīng)寫死了,所以這里我們就直接將數(shù)據(jù)進(jìn)行發(fā)送。
這里設(shè)置只發(fā)送兩次。
代碼示例:
@Override
publicvoidnextTuple(){
if(count<=2){
System.out.printin(〃第〃+count+”次開始發(fā)送數(shù)
據(jù)???〃);
this,collector,emit(newValues(message));
count++;
三、declareOutputFields
declareOutputFields是在IComponent接口中定義,用于聲明數(shù)據(jù)格式。
即輸出的一個(gè)Tuple中,包含幾個(gè)字段。
因?yàn)檫@里我們只發(fā)射一個(gè),所以就指定一個(gè)。如果是多個(gè),則用逗號(hào)隔開。
代碼示例:
?Override
publicvoiddeclareOutputFields(OutputFieldsDeclarer
declarer){
System.out.printin("定義格式...;
declarer,declare(newFields(field));
)
四、ack
ack是在ISpout接口中定義,用于表示Tuple處理成功。
代碼示例:
?Override
publicvoidack(Objectobj){
System.out.println("ack:"+obj);
五、fail
fail是在ISpout接口中定義,用于表示Tuple處理失敗。
代碼示例:
@Override
publicvoidfail(Objectobj){
System,out.printin("失敗:〃+obj);
)
六、close
dose是在ISpout接口中定義,用于表示Topology停止。
代碼示例:
?Override
publicvoidclose(){
System,out.printin("關(guān)閉...;
)
至于還有其他的,這里就不在一列舉了。
Bolt
Bolt是用于處理數(shù)據(jù)的組件,主要是由execute方法來(lái)進(jìn)行實(shí)現(xiàn)。一般來(lái)說(shuō)需要實(shí)
現(xiàn)IRichBolt或繼承BaseRichBolt該類,然后實(shí)現(xiàn)其方法。
需要實(shí)現(xiàn)方法如下:
一、prepare
在Bolt啟動(dòng)前執(zhí)行,提供Bolt啟動(dòng)環(huán)境配置的入口。
參數(shù)基本和Sqout-Wo
一般對(duì)于不可序列化的對(duì)象進(jìn)行實(shí)例化。
這里的我們就簡(jiǎn)單的打印下
?Override
publicvoidprepare(Mapmap,TopologyContextargl,
OutputCollectorcollector){
Systpm.out.print1n(^prpparp:zz+map.gpt(,,tpst,/));
this,collector=collector;
)
注:如果是可以序列化的對(duì)象,那么最好是使用構(gòu)造函數(shù)。
二、execute
execute。方法是Bolt實(shí)現(xiàn)的核心。
也就是執(zhí)行方法,每次Bolt從流凌收f(shuō)丁閱的tuple,都會(huì)調(diào)用這個(gè)方法。
從tuple中獲取消息可以使用tuple.getStringO和tuple.getStringByFieldO;這
兩個(gè)方法。個(gè)人推薦第二種,可以通過(guò)field來(lái)指定接收的消息。
注:如果繼承的是IRichBolt,則需要手動(dòng)ack。這里就不用了,BaseRichBolt會(huì)自動(dòng)幫我
們應(yīng)答。
代碼示例:
?Override
publicvoidexecute(Tupletuple){
//Stringmsg=tuple.getString(O);
Stringmsg=tuple.getStringByField("test");
〃這里我們就不做消息的處理,只打印
System.out.printin(,zBolt第〃+count+”接受的消息:"+msg);
count++;
/**
*
*沒次調(diào)用處理一個(gè)輸入的tuple,所有的tuple都必須在一定時(shí)間
內(nèi)應(yīng)答。
*可以是ack或者fail。否則,spout就會(huì)重發(fā)tuple。
*/
//collector,ack(tuple);
)
三、declareOutputFields
和Spout的一樣。
因?yàn)榈搅诉@里就不再輸出了,所以就什么都沒寫。
?Override
publicvoiddeclareOutputFields(OutputFieldsDeclarerargO)
(
)
cleanup
cleanup是IBolt接口中定義,用于釋放bolt占用的資源。
Storm在終止一個(gè)bolt之前會(huì)調(diào)用這個(gè)方法。
因?yàn)檫@里沒有什么資源需要釋放,所以就簡(jiǎn)單的打印一句就行了。
?Override
publicvoidcleanup(){
System.out.printin("資源釋放〃);
)
Topology
這里我們就是用main方法進(jìn)行提交topology。
不過(guò)在提交topology之前,需要進(jìn)行相應(yīng)的設(shè)置。
這里我就不一細(xì)說(shuō)了,代碼的注釋已經(jīng)很詳細(xì)了。
代碼示例:
importorg.apache.storm.Config;
importorg.apache,storm.LocalCluster;
importorg.apache,storm.StormSubmitter;
importorg.apache,storm,topology.TopologyBuilder;
/**
*
*Title:App
*Description:
*storm測(cè)試
*Version:1.0.0
*@authorpancm
*@date2018年3月6日
*/
publicclassApp{
privatestaticfinalStringstrl="testl”;
privatestaticfinalStringstr2="test2〃;
publicstaticvoidmain(String[]args)(
//TODOAuto-generatedmethodstub
〃定義一個(gè)拓?fù)?/p>
TopologyBuilderbuilder=newTopologyBuilder();
〃設(shè)置一個(gè)Executeor(線程),默認(rèn)一個(gè)
builder.setSpout(strl,newTestSpout());
〃設(shè)置一個(gè)Executeor(線程),和一個(gè)task
builder.setBolt(str2,new
TestBolt(),1).setNumTasks(l).shuffleGrouping(strl);
Configconf=newConfigO;
conf,put("test","test");
try(
〃運(yùn)行拓?fù)?/p>
if(args!=null&&args.length>0){//有參數(shù)時(shí),表示向集群提交
作業(yè),并把第一個(gè)參數(shù)當(dāng)做topology名稱
System.out.printin(〃遠(yuǎn)程模式〃);
StormSubmitter.submitTopology(args[0],conf,
builder.createTopology());
}else{〃沒有參數(shù)時(shí),本地提交
〃啟動(dòng)本地模式
System,out.printIn(〃本地模式〃);
LocalClustercluster=newLocalCluster();
cluster,submitTopology11177,conf,
builder.createTopology());
Thread,sleep(10000);
//關(guān)閉本地集群
cluster,shutdown();
)
}catch(Exceptione)(
e.printStackTraceO;
)
)
}
運(yùn)行該方法,輸出結(jié)果如下:
本地模式
定義格式.??
open:test
第1次開始發(fā)送數(shù)據(jù)...
第2次開始發(fā)送數(shù)據(jù)...
prepare:test
Bolt第1接受的消息:這是個(gè)測(cè)試消息!
Bolt第2接受的消息:這是個(gè)測(cè)試消息!
資源釋放
關(guān)閉...
到這里,是不是基本上對(duì)Storm的運(yùn)作有些了解了呢。
這個(gè)demo達(dá)到了上述的三種模式圖中的第一種,一個(gè)Spout傳輸數(shù)據(jù),一個(gè)Bolt處理
雌。
那么如果我們想達(dá)到第二種模式呢,那又該如何做呢?
假如我們想統(tǒng)計(jì)下在一段文本中的單詞出現(xiàn)頻率的話,我們只需執(zhí)行一下步驟就可以了。
.首先將中的消息進(jìn)行更改為數(shù)組,并依次將消息發(fā)送到
1SpoutmessageTestBolto
2.然后TestBolt將獲取的數(shù)據(jù)進(jìn)行分割,將分割的數(shù)據(jù)發(fā)送到TestBolt2。
3,TestBolt2對(duì)數(shù)據(jù)進(jìn)行統(tǒng)計(jì),在程序關(guān)閉的時(shí)候進(jìn)行打印。
4.Topology成功配置并且啟動(dòng)之后,等待20秒左右,關(guān)閉程序,然后得到輸出的結(jié)果。
代碼示例如下:
Spout
用于發(fā)送消息。
importjava.util.Map;
importorg.apache,storm,spout.SpoutOutputCollector;
importorg.apache,storm,task.TopologyContext;
importorg.apache,storm,topology.OutputFie1dsDec1arer;
importorg.apache,storm,topology,base.BaseRichSpout;
importorg.apache,storm,tuple.Fields;
importorg.apache,storm,tuple.Values;
/**
*
*Title:TestSpout
*Description:
*發(fā)送信息
*Version:1.0.0
*?authorpancm
*?date2018年3月6日
*/
publicclassTestSpoutextendsBaseRichSpout{
privatestaticfinallongserialVersionUID=
225243592780939490L;
privateSpoutOutputCollectorcollector;
privatestaticfinalStringfield二〃word”;
privateintcount=l;
privateStringf]message={
“Mynicknameisxuwujing",
“Myblogaddressishttp://www.panchengming.com//7,
“Myinterestisplayinggames”
};
/**
*open。方法中是在ISpout接口中定義,在Spout組件初始化時(shí)被調(diào)
用。
*有三個(gè)參數(shù):
*1.Storm配置的Map;
*2.topology中組件的信息;
*3.發(fā)射tuple的方法;
*/
?Override
publicvoidopen(Mapmap,TopologyContextargl,
SpoutOutputCollectorcollector){
System,out.printIn(^open:z,+map.get("test"));
this,collector=collector;
)
/**
*ncxtTupleO方法是Spout實(shí)現(xiàn)的核心。
*也就是主要執(zhí)行方法,用于輸出信息,通過(guò)collector.emit方法發(fā)射。
*/
?Override
publicvoidnextTupleO{
if(count<=message.length){
System,out.printin("第〃+count+”次開始發(fā)送數(shù)
據(jù)…〃);
this,collector,emit(newValues(message[count-
1]));
count++;
/**
*declarcOutputFields是在[Component接口中定義,用于聲明數(shù)據(jù)格
式。
*即輸出的一個(gè)Tuple中,包含幾個(gè)字段。
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 二零二五年度環(huán)保污水處理設(shè)備采購(gòu)專項(xiàng)合同
- 2025年度文化創(chuàng)意產(chǎn)業(yè)人才聘用合同模板
- 2025年度教育工作者校園文化活動(dòng)組織合同
- 2025年度智能家居系統(tǒng)集成服務(wù)合同范本
- 2025年度建筑鋼材期貨交易及風(fēng)險(xiǎn)管理合同
- 2025年度股權(quán)激勵(lì)與員工福利結(jié)合合同
- 2025年度節(jié)能型鍋爐技術(shù)改造咨詢合同
- 2025年度航空航天產(chǎn)業(yè)入股協(xié)議范本合同
- 2025年度海洋運(yùn)輸危險(xiǎn)品運(yùn)輸合同及應(yīng)急處理方案
- 2025年度國(guó)際旅游度假村合作開發(fā)合同
- 不銹鋼欄桿施工工藝
- 陜西演藝集團(tuán)有限公司招聘筆試題庫(kù)2023
- vc約起來(lái)史上最全180個(gè)知名投資人聯(lián)系方式
- 中國(guó)酒文化英文介紹
- 部編版五年級(jí)語(yǔ)文下冊(cè)課文四字詞總結(jié)
- 社會(huì)穩(wěn)定風(fēng)險(xiǎn)評(píng)估報(bào)告風(fēng)險(xiǎn)評(píng)估參考
- 制冷操作證培訓(xùn)教材-制冷與空調(diào)設(shè)備運(yùn)行操作作業(yè)培課件
- 勞動(dòng)感悟800字作文30篇
- 上下樓梯安全我知道安全教育課件
- 市級(jí)臨床重點(diǎn)專科申報(bào)書
- 《醫(yī)院重點(diǎn)??平ㄔO(shè)專項(xiàng)資金管理辦法》
評(píng)論
0/150
提交評(píng)論