itridentspout firstn取top實(shí)現(xiàn)流合并關(guān)聯(lián)_第1頁(yè)
itridentspout firstn取top實(shí)現(xiàn)流合并關(guān)聯(lián)_第2頁(yè)
itridentspout firstn取top實(shí)現(xiàn)流合并關(guān)聯(lián)_第3頁(yè)
itridentspout firstn取top實(shí)現(xiàn)流合并關(guān)聯(lián)_第4頁(yè)
itridentspout firstn取top實(shí)現(xiàn)流合并關(guān)聯(lián)_第5頁(yè)
已閱讀5頁(yè),還剩5頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

Storm流計(jì)算從入門到精通—技術(shù)篇講師:Cloudy(北風(fēng)網(wǎng)版權(quán)所有)23、ITridentSpout、FirstN(取TopN)實(shí)現(xiàn)、流合并和joinITridentSpout基于事務(wù)staticinterfaceITridentSpout.BatchCoordinator<X>

staticinterfaceITridentSpout.Emitter<X>接口類的實(shí)現(xiàn)和之前事務(wù)ITransactionalSpout非常類似。Topo例子topology.newDRPCStream("top",drpc)

.each(newFields("args"),newSplit(“”),newFields("time"))

.parallelismHint(5)

.stateQuery(myStates,newFields("time"),newQueryPacketDB(),newFields("srcip","byt","pkt"))

.groupBy(newFields("srcip"))

.chainedAgg()

.aggregate(newFields("byt"),newSum(),newFields("yt"))

.aggregate(newFields("pkt"),newSum(),newFields("kt"))

.chainEnd()

.applyAssembly(newFirstN(10,"yt",true));調(diào)用鏈用于執(zhí)行多個(gè)聚合如果想同事執(zhí)行多個(gè)聚合,可以使用如下的調(diào)用鏈mystream.chainedAgg().partitionAggregate(newCount(),newFields("count")).partitionAggregate(newFields("b"),newSum(),newFields("sum")).chainEnd()這個(gè)代碼將會(huì)在每個(gè)分區(qū)上執(zhí)行count和sum聚合。輸出將包含【“count”,“sum”】字段。投影(projection)投影操作是對(duì)數(shù)據(jù)上進(jìn)行列裁剪。如果你有一個(gè)流有【“a”,“b”,“c”,“d”】四個(gè)字段,執(zhí)行下面的代碼:ject(newFields("b","d"));輸出流將只有【“b”,“d”】?jī)蓚€(gè)字段。重分區(qū)(repartition)操作

重分區(qū)操作是通過(guò)一個(gè)函數(shù)改變?cè)M(tuple)在task之間的分布,重分區(qū)(repatition)需要網(wǎng)絡(luò)傳輸,目的是方便聚合或查詢。如下是重分區(qū)函數(shù):1.

Shuffle:2.

Broadcast:每個(gè)元組重復(fù)的發(fā)送到所有的目標(biāo)分區(qū)。這個(gè)在DRPC中很有用。

如果你想做在每個(gè)分區(qū)上做一個(gè)statequery。3.

paritionBy:根據(jù)一系列分發(fā)字段(fields)做一個(gè)語(yǔ)義的分區(qū)。通過(guò)對(duì)這些字

段取hash值并對(duì)目標(biāo)分區(qū)數(shù)取模獲取目標(biāo)分區(qū)。paritionBy保證相同的分發(fā)

字段(fields)分發(fā)到相同的目標(biāo)分區(qū)。4.

global:所有的tuple分發(fā)到相同的分區(qū)。5.

batchGobal:本批次的所有tuple發(fā)送到相同的分區(qū),不通批次可以在不通的分

區(qū)。6.

patition:這個(gè)函數(shù)接受用戶自定義的分區(qū)函數(shù)。用戶自定義函數(shù)事項(xiàng)

backtype.storm.grouping.CustomStreamGrouping接口。

合并和關(guān)聯(lián)合并(merge)多個(gè)流成為一個(gè)流,可以如下:topology.merge(stream1,stream2,stream3);Trident合并的流字段會(huì)以第一個(gè)流的字段命名。另一個(gè)合并流的方法是join。類似SQL的join都是對(duì)固定輸入的。而流的輸入是不固定的,所以不能按照sql的方法做join。Trident中的join只會(huì)在spout發(fā)出的每個(gè)批次間進(jìn)行。如一個(gè)流包含字段【“key”,“val1”,“val2”】,另一個(gè)流包含字段【“x”,“val1”】:topology.join(stream1,newFields("key"),stream2,newFields("x"),newFields("key","a","b","c"));Stream1的“key”和stream2的“x”關(guān)聯(lián),Trident要求所有的字段要改名字。1.首先是join字段。例子中stream1中的“key”對(duì)應(yīng)stream2中的“x”。2.接下來(lái),會(huì)把非join字段依次列出來(lái),排列順序按照傳給join的順序。例子中“a”,“b”對(duì)應(yīng)stream1中的“val1”和“wal2”,“c”對(duì)應(yīng)stream2中的“val1”

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論