




版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
./分布式流處理是對(duì)無(wú)邊界數(shù)據(jù)集進(jìn)行連續(xù)不斷的處理、聚合和分析。它跟MapReduce一樣是一種通用計(jì)算,但我們期望延遲在毫秒或者秒級(jí)別。這類系統(tǒng)一般采用有向無(wú)環(huán)圖〔DAG。DAG是任務(wù)鏈的圖形化表示,我們用它來(lái)描述流處理作業(yè)的拓?fù)?。如下圖,數(shù)據(jù)從sources流經(jīng)處理任務(wù)鏈到sinks。單機(jī)可以運(yùn)行DAG,但本篇文章主要聚焦在多臺(tái)機(jī)器上運(yùn)行DAG的情況。關(guān)注點(diǎn)當(dāng)選擇不同的流處理系統(tǒng)時(shí),有以下幾點(diǎn)需要注意的:運(yùn)行時(shí)和編程模型:平臺(tái)框架提供的編程模型決定了許多特色功能,編程模型要足夠處理各種應(yīng)用場(chǎng)景。這是一個(gè)相當(dāng)重要的點(diǎn),后續(xù)會(huì)繼續(xù)。函數(shù)式原語(yǔ):流處理平臺(tái)應(yīng)該能提供豐富的功能函數(shù),比如,map或者filter這類易擴(kuò)展、處理單條信息的函數(shù);處理多條信息的函數(shù)aggregation;跨數(shù)據(jù)流、不易擴(kuò)展的操作join。狀態(tài)管理:大部分應(yīng)用都需要保持狀態(tài)處理的邏輯。流處理平臺(tái)應(yīng)該提供存儲(chǔ)、訪問(wèn)和更新?tīng)顟B(tài)信息。消息傳輸保障:消息傳輸保障一般有三種:atmostonce,atleastonce和exactlyonce。Atmostonce的消息傳輸機(jī)制是每條消息傳輸零次或者一次,即消息可能會(huì)丟失;Atleastonce意味著每條消息會(huì)進(jìn)行多次傳輸嘗試,至少一次成功,即消息傳輸可能重復(fù)但不會(huì)丟失;Exactlyonce的消息傳輸機(jī)制是每條消息有且只有一次,即消息傳輸既不會(huì)丟失也不會(huì)重復(fù)。容錯(cuò):流處理框架中的失敗會(huì)發(fā)生在各個(gè)層次,比如,網(wǎng)絡(luò)部分,磁盤崩潰或者節(jié)點(diǎn)宕機(jī)等。流處理框架應(yīng)該具備從所有這種失敗中恢復(fù),并從上一個(gè)成功的狀態(tài)〔無(wú)臟數(shù)據(jù)重新消費(fèi)。性能:延遲時(shí)間〔Latency,吞吐量〔Throughput和擴(kuò)展性〔Scalability是流處理應(yīng)用中極其重要的指標(biāo)。平臺(tái)的成熟度和接受度:成熟的流處理框架可以提供潛在的支持,可用的庫(kù),甚至開(kāi)發(fā)問(wèn)答幫助。選擇正確的平臺(tái)會(huì)在這方面提供很大的幫助。運(yùn)行時(shí)和編程模型運(yùn)行時(shí)和編程模型是一個(gè)系統(tǒng)最重要的特質(zhì),因?yàn)樗鼈兌x了表達(dá)方式、可能的操作和將來(lái)的局限性。因此,運(yùn)行時(shí)和編程模型決定了系統(tǒng)的能力和適用場(chǎng)景。實(shí)現(xiàn)流處理系統(tǒng)有兩種完全不同的方式:一種是稱作原生流處理,意味著所有輸入的記錄一旦到達(dá)即會(huì)一個(gè)接著一個(gè)進(jìn)行處理。第二種稱為微批處理。把輸入的數(shù)據(jù)按照某種預(yù)先定義的時(shí)間間隔〔典型的是幾秒鐘分成短小的批量數(shù)據(jù),流經(jīng)流處理系統(tǒng)。兩種方法都有其先天的優(yōu)勢(shì)和不足。首先以原生流處理開(kāi)始,原生流處理的優(yōu)勢(shì)在于它的表達(dá)方式。數(shù)據(jù)一旦到達(dá)立即處理,這些系統(tǒng)的延遲性遠(yuǎn)比其它微批處理要好。除了延遲性外,原生流處理的狀態(tài)操作也容易實(shí)現(xiàn),后續(xù)將詳細(xì)講解。一般原生流處理系統(tǒng)為了達(dá)到低延遲和容錯(cuò)性會(huì)花費(fèi)比較大的成本,因?yàn)樗枰紤]每條記錄。原生流處理的負(fù)載均衡也是個(gè)問(wèn)題。比如,我們處理的數(shù)據(jù)按key分區(qū),如果分區(qū)的某個(gè)key是資源密集型,那這個(gè)分區(qū)很容易成為作業(yè)的瓶頸。接下來(lái)看下微批處理。將流式計(jì)算分解成一系列短小的批處理作業(yè),也不可避免的減弱系統(tǒng)的表達(dá)力。像狀態(tài)管理或者join等操作的實(shí)現(xiàn)會(huì)變的困難,因?yàn)槲⑴幚硐到y(tǒng)必須操作整個(gè)批量數(shù)據(jù)。并且,batchinterval會(huì)連接兩個(gè)不易連接的事情:基礎(chǔ)屬性和業(yè)務(wù)邏輯。相反地,微批處理系統(tǒng)的容錯(cuò)性和負(fù)載均衡實(shí)現(xiàn)起來(lái)非常簡(jiǎn)單,因?yàn)槲⑴幚硐到y(tǒng)僅發(fā)送每批數(shù)據(jù)到一個(gè)worker節(jié)點(diǎn)上,如果一些數(shù)據(jù)出錯(cuò)那就使用其它副本。微批處理系統(tǒng)很容易建立在原生流處理系統(tǒng)之上。編程模型一般分為組合式和聲明式。組合式編程提供基本的構(gòu)建模塊,它們必須緊密結(jié)合來(lái)創(chuàng)建拓?fù)洹P碌慕M件經(jīng)常以接口的方式完成。相對(duì)應(yīng)地,聲明式API操作是定義的高階函數(shù)。它允許我們用抽象類型和方法來(lái)寫函數(shù)代碼,并且系統(tǒng)創(chuàng)建拓?fù)浜蛢?yōu)化拓?fù)洹B暶魇紸PI經(jīng)常也提供更多高級(jí)的操作〔比如,窗口函數(shù)或者狀態(tài)管理。后面很快會(huì)給出樣例代碼。主流流處理系統(tǒng)有一系列各種實(shí)現(xiàn)的流處理框架,不能一一列舉,這里僅選出主流的流處理解決方案,并且支持ScalaAPI。因此,我們將詳細(xì)介紹ApacheStorm,Trident,SparkStreaming,Samza和ApacheFlink。前面選擇講述的雖然都是流處理系統(tǒng),但它們實(shí)現(xiàn)的方法包含了各種不同的挑戰(zhàn)。這里暫時(shí)不講商業(yè)的系統(tǒng),比如GoogleMillWheel或者AmazonKinesis,也不會(huì)涉及很少使用的IntelGearPump或者ApacheApex。ApacheStorm最開(kāi)始是由NathanMarz和他的團(tuán)隊(duì)于20XX在數(shù)據(jù)分析公司BackType開(kāi)發(fā)的,后來(lái)BackType公司被Twitter收購(gòu),接著Twitter開(kāi)源Storm并在20XX成為Apache頂級(jí)項(xiàng)目。毋庸置疑,Storm成為大規(guī)模流數(shù)據(jù)處理的先鋒,并逐漸成為工業(yè)標(biāo)準(zhǔn)。Storm是原生的流處理系統(tǒng),提供low-level的API。Storm使用Thrift來(lái)定義topology和支持多語(yǔ)言協(xié)議,使得我們可以使用大部分編程語(yǔ)言開(kāi)發(fā),Scala自然包括在。Trident是對(duì)Storm的一個(gè)更高層次的抽象,Trident最大的特點(diǎn)以batch的形式進(jìn)行流處理。Trident簡(jiǎn)化topology構(gòu)建過(guò)程,增加了窗口操作、聚合操作或者狀態(tài)管理等高級(jí)操作,這些在Storm中并不支持。相對(duì)應(yīng)于Storm的Atmostonce流傳輸機(jī)制,Trident提供了Exactlyonce傳輸機(jī)制。Trident支持Java,Clojure和Scala。當(dāng)前Spark是非常受歡迎的批處理框架,包含SparkSQL,MLlib和SparkStreaming。Spark的運(yùn)行時(shí)是建立在批處理之上,因此后續(xù)加入的SparkStreaming也依賴于批處理,實(shí)現(xiàn)了微批處理。接收器把輸入數(shù)據(jù)流分成短小批處理,并以類似Spark作業(yè)的方式處理微批處理。SparkStreaming提供高級(jí)聲明式API〔支持Scala,Java和Python。Samza最開(kāi)始是專為L(zhǎng)inkedIn公司開(kāi)發(fā)的流處理解決方案,并和LinkedIn的Kafka一起貢獻(xiàn)給社區(qū),現(xiàn)已成為基礎(chǔ)設(shè)施的關(guān)鍵部分。Samza的構(gòu)建嚴(yán)重依賴于基于log的Kafka,兩者緊密耦合。Samza提供組合式API,當(dāng)然也支持Scala。最后來(lái)介紹ApacheFlink。Flink是個(gè)相當(dāng)早的項(xiàng)目,開(kāi)始于20XX,但只在最近才得到注意。Flink是原生的流處理系統(tǒng),提供highlevel的API。Flink也提供API來(lái)像Spark一樣進(jìn)行批處理,但兩者處理的基礎(chǔ)是完全不同的。Flink把批處理當(dāng)作流處理中的一種特殊情況。在Flink中,所有的數(shù)據(jù)都看作流,是一種很好的抽象,因?yàn)檫@更接近于現(xiàn)實(shí)世界??焖俚慕榻B流處理系統(tǒng)之后,讓我們以下面的表格來(lái)更好清晰的展示它們之間的不同:WordCountWordcount之于流處理框架學(xué)習(xí),就好比helloworld之于編程語(yǔ)言學(xué)習(xí)。它能很好的展示各流處理框架的不同之處,讓我們從Storm開(kāi)始看看如何實(shí)現(xiàn)Wordcount:TopologyBuilderbuilder=newTopologyBuilder<>;builder.setSpout<"spout",newRandomSentenceSpout<>,5>;builder.setBolt<"split",newSplit<>,8>.shuffleGrouping<"spout">;builder.setBolt<"count",newWordCount<>,12>.fieldsGrouping<"split",newFields<"word">>;...Map<String,Integer>counts=newHashMap<String,Integer><>;publicvoidexecute<Tupletuple,BasicOutputCollectorcollector>{Stringword=tuple.getString<0>;Integercount=counts.containsKey<word>?counts.get<word>+1:1;counts.put<word,count>;collector.emit<newValues<word,count>>;}首先,定義topology。第二行代碼定義一個(gè)spout,作為數(shù)據(jù)源。然后是一個(gè)處理組件bolt,分割文本為單詞。接著,定義另一個(gè)bolt來(lái)計(jì)算單詞數(shù)〔第四行代碼。也可以看到魔數(shù)5,8和12,這些是并行度,定義集群每個(gè)組件執(zhí)行的獨(dú)立線程數(shù)。第八行到十五行是實(shí)際的WordCountbolt實(shí)現(xiàn)。因?yàn)镾torm不支持建的狀態(tài)管理,所有這里定義了一個(gè)局部狀態(tài)。按之前描述,Trident是對(duì)Storm的一個(gè)更高層次的抽象,Trident最大的特點(diǎn)以batch的形式進(jìn)行流處理。除了其它優(yōu)勢(shì),Trident提供了狀態(tài)管理,這對(duì)wordcount實(shí)現(xiàn)非常有用。publicstaticStormTopologybuildTopology<LocalDRPCdrpc>{FixedBatchSpoutspout=...TridentTopologytopology=newTridentTopology<>;TridentStatewordCounts=topology.newStream<"spout1",spout>.each<newFields<"sentence">,newSplit<>,newFields<"word">>.groupBy<newFields<"word">>.persistentAggregate<newMemoryMapState.Factory<>,newCount<>,newFields<"count">>;...}如你所見(jiàn),上面代碼使用higherlevel操作,比如each〔第七行代碼和groupby〔第八行代碼。并且使用Trident管理狀態(tài)來(lái)存儲(chǔ)單詞數(shù)〔第九行代碼。下面是時(shí)候祭出提供聲明式API的ApacheSpark。記住,相對(duì)于前面的例子,這些代碼相當(dāng)簡(jiǎn)單,幾乎沒(méi)有冗余代碼。下面是簡(jiǎn)單的流式計(jì)算單詞數(shù):valconf=newSparkConf<>.setAppName<"wordcount">valssc=newStreamingContext<conf,Seconds<1>>valtext=...valcounts=text.flatMap<line=>line.split<"">>.map<word=><word,1>>.reduceByKey<_+_>counts.print<>ssc.start<>ssc.awaitTermination<>每個(gè)SparkStreaming的作業(yè)都要有StreamingContext,它是流式函數(shù)的入口。StreamingContext加載第一行代碼定義的配置conf,但更重要地,第二行代碼定義batchinterval〔這里設(shè)置為1秒。第六行到八行代碼是整個(gè)單詞數(shù)計(jì)算。這些是標(biāo)準(zhǔn)的函數(shù)式代碼,Spark定義topology并且分布式執(zhí)行。第十二行代碼是每個(gè)SparkStreaming作業(yè)最后的部分:?jiǎn)?dòng)計(jì)算。記住,SparkStreaming作業(yè)一旦啟動(dòng)即不可修改。接下來(lái)看下ApacheSamza,另外一個(gè)組合式API例子:classWordCountTaskextendsStreamTask{overridedefprocess<envelope:IncomingMessageEnvelope,collector:MessageCollector,coordinator:TaskCoordinator>{valtext=envelope.getMessage.asInstanceOf[String]valcounts=text.split<"">.foldLeft<Map.empty[String,Int]>{<count,word>=>count+<word-><count.getOrElse<word,0>+1>>}collector.send<newOutgoingMessageEnvelope<newSystemStream<"kafka","wordcount">,counts>>}Samza的屬性配置文件定義topology,為了簡(jiǎn)明這里并沒(méi)把配置文件放上來(lái)。定義任務(wù)的輸入和輸出,并通過(guò)Kafkatopic通信。在單詞數(shù)計(jì)算整個(gè)topology是WordCountTask。在Samza中,實(shí)現(xiàn)特殊接口定義組件StreamTask,在第三行代碼重寫方法process。它的參數(shù)列表包含所有連接其它系統(tǒng)的需要。第八行到十行簡(jiǎn)單的Scala代碼是計(jì)算本身。
Flink的API跟SparkStreaming是驚人的相似,但注意到代碼里并未設(shè)置batchinterval。valenv=ExecutionEnvironment.getExecutionEnvironmentvaltext=env.fromElements<...>valcounts=text.flatMap<_.split<"">>.map<<_,1>>.groupBy<0>.sum<1>counts.print<>env.execute<"wordcount">上面的代碼是相當(dāng)?shù)闹卑?僅僅只是幾個(gè)函數(shù)式調(diào)用,Flink支持分布式計(jì)算。結(jié)論上面給出了基本的理論和主流流處理框架介紹,下篇文章將會(huì)更深入的探討其它關(guān)注點(diǎn)。希望你能對(duì)前面的文章感興趣,如果有任何問(wèn)題,請(qǐng)聯(lián)系我討論這些主題。在上篇文章中,我們過(guò)了下基本的理論,也介紹了主流的流處理框架:Storm,Trident,SparkStreaming,Samza和Flink。今天咱們來(lái)點(diǎn)有深度的topic,比如,容錯(cuò),狀態(tài)管理或者性能。除此之外,我們也將討論開(kāi)發(fā)分布式流處理應(yīng)用的指南,并給出推薦的流處理框架。容錯(cuò)性流處理系統(tǒng)的容錯(cuò)性與生俱來(lái)的比批處理系統(tǒng)難實(shí)現(xiàn)。當(dāng)批處理系統(tǒng)中出現(xiàn)錯(cuò)誤時(shí),我們只需要把失敗的部分簡(jiǎn)單重啟即可;但對(duì)于流處理系統(tǒng),出現(xiàn)錯(cuò)誤就很難恢復(fù)。因?yàn)榫€上許多作業(yè)都是7x24小時(shí)運(yùn)行,不斷有輸入的數(shù)據(jù)。流處理系統(tǒng)面臨的另外一個(gè)挑戰(zhàn)是狀態(tài)一致性,因?yàn)橹貑⒑髸?huì)出現(xiàn)重復(fù)數(shù)據(jù),并且不是所有的狀態(tài)操作是冪等的。容錯(cuò)性這么難實(shí)現(xiàn),那下面我們看看各大主流流處理框架是如何處理這一問(wèn)題。ApacheStorm:Storm使用上游數(shù)據(jù)備份和消息確認(rèn)的機(jī)制來(lái)保障消息在失敗之后會(huì)重新處理。消息確認(rèn)原理:每個(gè)操作都會(huì)把前一次的操作處理消息的確認(rèn)信息返回。Topology的數(shù)據(jù)源備份它生成的所有數(shù)據(jù)記錄。當(dāng)所有數(shù)據(jù)記錄的處理確認(rèn)信息收到,備份即會(huì)被安全拆除。失敗后,如果不是所有的消息處理確認(rèn)信息收到,那數(shù)據(jù)記錄會(huì)被數(shù)據(jù)源數(shù)據(jù)替換。這保障了沒(méi)有數(shù)據(jù)丟失,但數(shù)據(jù)結(jié)果會(huì)有重復(fù),這就是at-leastonce傳輸機(jī)制。Storm采用取巧的辦法完成了容錯(cuò)性,對(duì)每個(gè)源數(shù)據(jù)記錄僅僅要求幾個(gè)字節(jié)存儲(chǔ)空間來(lái)跟蹤確認(rèn)消息。純數(shù)據(jù)記錄消息確認(rèn)架構(gòu),盡管性能不錯(cuò),但不能保證exactlyonce消息傳輸機(jī)制,所有應(yīng)用開(kāi)發(fā)者需要處理重復(fù)數(shù)據(jù)。Storm存在低吞吐量和流控問(wèn)題,因?yàn)橄⒋_認(rèn)機(jī)制在反壓下經(jīng)常誤認(rèn)為失敗。SparkStreaming:SparkStreaming實(shí)現(xiàn)微批處理,容錯(cuò)機(jī)制的實(shí)現(xiàn)跟Storm不一樣的方法。微批處理的想法相當(dāng)簡(jiǎn)單。Spark在集群各worker節(jié)點(diǎn)上處理micro-batches。每個(gè)micro-batches一旦失敗,重新計(jì)算就行。因?yàn)閙icro-batches本身的不可變性,并且每個(gè)micro-batches也會(huì)持久化,所以exactlyonce傳輸機(jī)制很容易實(shí)現(xiàn)。Samza:Samza的實(shí)現(xiàn)方法跟前面兩種流處理框架完全不一樣。Samza利用消息系統(tǒng)Kafka的持久化和偏移量。Samza監(jiān)控任務(wù)的偏移量,當(dāng)任務(wù)處理完消息,相應(yīng)的偏移量被移除。消息的偏移量會(huì)被checkpoint到持久化存儲(chǔ)中,并在失敗時(shí)恢復(fù)。但是問(wèn)題在于:從上次checkpoint中修復(fù)偏移量時(shí)并不知道上游消息已經(jīng)被處理過(guò),這就會(huì)造成重復(fù)。這就是atleastonce傳輸機(jī)制。ApacheFlink:Flink的容錯(cuò)機(jī)制是基于分布式快照實(shí)現(xiàn)的,這些快照會(huì)保存流處理作業(yè)的狀態(tài)〔本文對(duì)Flink的檢查點(diǎn)和快照不進(jìn)行區(qū)分,因?yàn)閮烧邔?shí)際是同一個(gè)事物的兩種不同叫法。Flink構(gòu)建這些快照的機(jī)制可以被描述成分布式數(shù)據(jù)流的輕量級(jí)異步快照,它采用Chandy-Lamport算法實(shí)現(xiàn)。。如果發(fā)生失敗的情況,系統(tǒng)可以從這些檢查點(diǎn)進(jìn)行恢復(fù)。Flink發(fā)送checkpoint的柵欄〔barrier到數(shù)據(jù)流中〔柵欄是Flink的分布式快照機(jī)制中一個(gè)核心的元素,當(dāng)checkpoint的柵欄到達(dá)其中一個(gè)operator,operator會(huì)接所有收輸入流中對(duì)應(yīng)的柵欄〔比如,圖中checkpointn對(duì)應(yīng)柵欄n到n-1的所有輸入流,其僅僅是整個(gè)輸入流的一部分。所以相對(duì)于Storm,Flink的容錯(cuò)機(jī)制更高效,因?yàn)镕link的操作是對(duì)小批量數(shù)據(jù)而不是每條數(shù)據(jù)記錄。但也不要讓自己糊涂了,Flink仍然是原生流處理框架,它與SparkStreaming在概念上就完全不同。Flink也提供exactlyonce消息傳輸機(jī)制。狀態(tài)管理大部分大型流處理應(yīng)用都涉及到狀態(tài)。相對(duì)于無(wú)狀態(tài)的操作〔其只有一個(gè)輸入數(shù)據(jù),處理過(guò)程和輸出結(jié)果,有狀態(tài)的應(yīng)用會(huì)有一個(gè)輸入數(shù)據(jù)和一個(gè)狀態(tài)信息,然后處理過(guò)程,接著輸出結(jié)果和修改狀態(tài)信息。因此,我們不得不管理狀態(tài)信息,并持久化。我們期望一旦因某種原因失敗,狀態(tài)能夠修復(fù)。狀態(tài)修復(fù)有可能會(huì)出現(xiàn)小問(wèn)題,它并不總是保證exactlyonce,有時(shí)也會(huì)出現(xiàn)消費(fèi)多次,但這并不是我們想要的。據(jù)我們所知,Storm提供at-leastonce的消息傳輸保障。那我們又該如何使用Trident做到exactlyonce的語(yǔ)義。概念上貌似挺簡(jiǎn)單,你只需要提交每條數(shù)據(jù)記錄,但這顯然不是那么高效。所以你會(huì)想到小批量的數(shù)據(jù)記錄一起提交會(huì)優(yōu)化。Trident定義了幾個(gè)抽象來(lái)達(dá)到exactlyonce的語(yǔ)義,見(jiàn)下圖,其中也會(huì)有些局限。SparkStreaming是微批處理系統(tǒng),它把狀態(tài)信息也看做是一種微批量數(shù)據(jù)流。在處理每個(gè)微批量數(shù)據(jù)時(shí),Spark加載當(dāng)前的狀態(tài)信息,接著通過(guò)函數(shù)操作獲得處理后的微批量數(shù)據(jù)結(jié)果并修改加載過(guò)的狀態(tài)信息。1Samza實(shí)現(xiàn)狀態(tài)管理是通過(guò)Kafka來(lái)處理的。Samza有真實(shí)的狀態(tài)操作,所以其任務(wù)會(huì)持有一個(gè)狀態(tài)信息,并把狀態(tài)改變的日志推送到Kafka。如果需要狀態(tài)重建,可以很容易的從Kafka的topic重建。為了達(dá)到更快的狀態(tài)管理,Samza也支持把狀態(tài)信息放入本地key-value存儲(chǔ)中,所以狀態(tài)信息不必一直在Kafka中管理,見(jiàn)下圖。不幸的是,Samza只提供at-leastonce語(yǔ)義,exactlyonce的支持也在計(jì)劃中。Flink提供狀態(tài)操作,和Samza類似。Flink提供兩種類型的狀態(tài):一種是用戶自定義狀態(tài);另外一種是窗口狀態(tài)。如圖,第一個(gè)狀態(tài)是自定義狀態(tài),它和其它的的狀態(tài)不相互作用。這些狀態(tài)可以分區(qū)或者使用嵌入式Key-Value存儲(chǔ)狀態(tài)[文檔一和二]。當(dāng)然Flink提供exactly-once語(yǔ)義。下圖展示Flink長(zhǎng)期運(yùn)行的三個(gè)狀態(tài)。單詞計(jì)數(shù)例子中的狀態(tài)管理單詞計(jì)數(shù)的詳細(xì)代碼見(jiàn)上篇文章,這里僅關(guān)注狀態(tài)管理部分。讓我們先看Trident:publicstaticStormTopologybuildTopology<LocalDRPCdrpc>{FixedBatchSpoutspout=...TridentTopologytopology=newTridentTopology<>;TridentStatewordCounts=topology.newStream<"spout1",spout>.each<newFields<"sentence">,newSplit<>,newFields<"word">>.groupBy<newFields<"word">>.persistentAggregate<newMemoryMapState.Factory<>,newCount<>,newFields<"count">>;...}在第九行代碼中,我們通過(guò)調(diào)用persistentAggregate創(chuàng)建一個(gè)狀態(tài)。其中參數(shù)Count存儲(chǔ)單詞數(shù),如果你想從狀態(tài)中處理數(shù)據(jù),你必須創(chuàng)建一個(gè)數(shù)據(jù)流。從代碼中也可以看出實(shí)現(xiàn)起來(lái)不方便。SparkStreaming聲明式的方法稍微好點(diǎn)://InitialRDDinputtoupdateStateByKeyvalinitialRDD=ssc.sparkContext.parallelize<List.empty[<String,Int>]>vallines=...valwords=lines.flatMap<_.split<"">>valwordDstream=words.map<x=><x,1>>valtrackStateFunc=<batchTime:Time,word:String,one:Option[Int],state:State[Int]>=>{valsum=one.getOrElse<0>+state.getOption.getOrElse<0>valoutput=<word,sum>state.update<sum>Some<output>}valstateDstream=wordDstream.trackStateByKey<StateSpec.function<trackStateFunc>.initialState<initialRDD>>首先我們需要?jiǎng)?chuàng)建一個(gè)RDD來(lái)初始化狀態(tài)〔第二行代碼,然后進(jìn)行transformations〔第五行和六行代碼。接著在第八行到十四行代碼,我們定義函數(shù)來(lái)處理單詞數(shù)狀態(tài)。函數(shù)計(jì)算并更新?tīng)顟B(tài),最后返回結(jié)果。第十六行和十七行代碼,我們得到一個(gè)狀態(tài)信息流,其中包含單詞數(shù)。接著我們看下Samza,classWordCountTaskextendsStreamTaskwithInitableTask{privatevarstore:CountStore=_definit<config:Config,context:TaskContext>{this.store=context.getStore<"wordcount-store">.asInstanceOf[KeyValueStore[String,Integer]]}overridedefprocess<envelope:IncomingMessageEnvelope,collector:MessageCollector,coordinator:TaskCoordinator>{valwords=envelope.getMessage.asInstanceOf[String].split<"">words.foreach{key=>valcount:Integer=Option<store.get<key>>.getOrElse<0>store.put<key,count+1>collector.send<newOutgoingMessageEnvelope<newSystemStream<"kafka","wordcount">,<key,count>>>}}首先在第三行代碼定義狀態(tài),進(jìn)行Key-Value存儲(chǔ),在第五行到八行代碼初始化狀態(tài)。接著在計(jì)算中使用,上面的代碼已經(jīng)很直白。最后,講下Flink使用簡(jiǎn)潔的API實(shí)現(xiàn)狀態(tài)管理:valenv=ExecutionEnvironment.getExecutionEnvironmentvaltext=env.fromElements<...>valwords=text.flatMap<_.split<"">>words.keyBy<x=>x>.mapWithState{<word,count:Option[Int]>=>{valnewCount=count.getOrElse<0>+1valoutput=<word,newCount><output,Some<newCount>>}}我們僅僅需要在第六行代碼中調(diào)用mapwithstate函數(shù),它有一個(gè)函數(shù)參數(shù)〔函數(shù)有兩個(gè)變量,第一個(gè)是單詞,第二個(gè)是狀態(tài)。然后返回處理的結(jié)果和新的狀態(tài)。流處理框架性能這里所講的性能主要涉及到的是延遲性和吞吐量。對(duì)于延遲性來(lái)說(shuō),微批處理一般在秒級(jí)別,大部分原生流處理在百毫秒以下,調(diào)優(yōu)的情況下Storm可以很輕松的達(dá)到十毫秒。同時(shí)也要記住,消息傳輸機(jī)制保障,容錯(cuò)性和狀態(tài)恢復(fù)都會(huì)占用機(jī)器資源。例如,打開(kāi)容錯(cuò)恢復(fù)可能會(huì)降低10%到15%的性能,Storm可能降低70%的吞吐量??傊?天下沒(méi)有免費(fèi)的午餐。對(duì)于有狀態(tài)管理,Flink會(huì)降低25%的性能,SparkStreaming降低50%的性能。也要記住,各大流處理框架的所有操作都是分布式的,通過(guò)網(wǎng)絡(luò)發(fā)送數(shù)據(jù)是相當(dāng)耗時(shí)的,所以進(jìn)了利用數(shù)據(jù)本地性,也盡量?jī)?yōu)化你的應(yīng)用的序列化。項(xiàng)目成熟度當(dāng)你為應(yīng)用選型時(shí)一定會(huì)考慮項(xiàng)目的成熟度。下面來(lái)快速瀏覽一下:
Storm是第一個(gè)主流的流處理框架,后期已經(jīng)成為長(zhǎng)期的工業(yè)級(jí)的標(biāo)準(zhǔn),并在像Twitter,Yahoo,Spotify等大公司使用。SparkStreaming是最近最流行的Scala代碼實(shí)現(xiàn)的流處理框架。現(xiàn)在SparkStreaming被公司〔Netflix,Cisco,DataStax,Intel,IBM等日漸接受。Samza主要在LinkedIn公司使用。Flink是一個(gè)新興的項(xiàng)目,很有前景。你可能對(duì)項(xiàng)目的貢獻(xiàn)者數(shù)量也感興趣。Storm和Trident大概有180個(gè)代碼貢獻(xiàn)者;整個(gè)Spark有720多個(gè);根據(jù)github顯示,Samza有40個(gè);Flink有超過(guò)130個(gè)代碼貢獻(xiàn)者。小結(jié)在進(jìn)行流處理框架推薦之前,先來(lái)整體看下總結(jié)表:流處理框架推薦應(yīng)用選型是大家都會(huì)遇到的問(wèn)題,一般是根據(jù)應(yīng)用具體的場(chǎng)景來(lái)選擇特定的流處理框架。下面給出幾個(gè)作者認(rèn)為優(yōu)先考慮的點(diǎn):HighlevelAPI:具有highlevelAPI的流處理框架會(huì)更簡(jiǎn)潔和高效;狀態(tài)管理:大部分流處理應(yīng)用都涉及到狀態(tài)管理,因此你得
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 跨境電商市場(chǎng)機(jī)遇分析-深度研究
- 課題申報(bào)書職業(yè)學(xué)生自信
- 蘇州代寫課題申報(bào)書
- 課題申報(bào)書框架
- 高中課題研究申報(bào)書范文
- 發(fā)廊勞動(dòng)合同范本
- 借用資質(zhì)包工合同范本
- 制作合同范本定金
- 勞動(dòng)課題省級(jí)課題申報(bào)書
- 保安公司疫情合同范本
- 租金評(píng)估技術(shù)報(bào)告范文模版
- 中風(fēng)患者的護(hù)理及康復(fù)指南培訓(xùn)
- 數(shù)據(jù)中心運(yùn)維解決方案
- 滁州城市職業(yè)學(xué)院?jiǎn)握小堵殬I(yè)技能測(cè)試》參考試題庫(kù)(含答案)
- 基于單片機(jī)控制的充電樁設(shè)計(jì)
- SB-T 11238-2023 報(bào)廢電動(dòng)汽車回收拆解技術(shù)要求
- 開(kāi)題報(bào)告-基于單片機(jī)的溫度控制系統(tǒng)設(shè)計(jì)
- 鋰電池正極材料行業(yè)分析
- 國(guó)家級(jí)省級(jí)化工園區(qū)列表
- 肩關(guān)節(jié)脫位手法復(fù)位課件
- 汽車懸架概述
評(píng)論
0/150
提交評(píng)論