版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
修訂記錄課程編碼適用產(chǎn)品產(chǎn)品版本課程版本ISSUEModule14Streaming應用開發(fā)FusionInsightFusionInsightHDV100R002C60V1.0開發(fā)/優(yōu)化者時間審核人開發(fā)類型(新開發(fā)/優(yōu)化)本頁不打印FusionInsightHD
Streaming應用開發(fā)目標學完本課程后,您將能夠:了解Storm基本業(yè)務開發(fā)流程熟悉Storm常用API接口使用熟悉Storm業(yè)務設計基本原則了解Streaming二次開發(fā)環(huán)境了解CQL開發(fā)流程及使用目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例Streaming的定義Streaming基于開源Storm,是一個分布式、實時計算框架。事件驅動連續(xù)查詢數(shù)據(jù)不存儲,先計算實時響應,低延遲EventDataQueriesAlertsActionsNowaiting;Resultsdeliveredin-flightQueriesMemoryDisk傳統(tǒng)數(shù)據(jù)庫技術
數(shù)據(jù)先存儲,再查詢處理
Hadoop技術
數(shù)據(jù)先寫入文件系統(tǒng)進行批處理Streaming架構回顧Client提交拓撲SupervisorWorkerSupervisorNimbusWorkerWorkerExecutorExecutor下載Jar包啟動WorkerZookeeperZookeeperZookeeper監(jiān)控心跳,分配任務獲取分配任務上報心跳Streaming的適用場景Streaming主要應用于以下幾種對響應時延有嚴格要求的場景:
實時分析:如實時日志處理、交通流量分析等實時統(tǒng)計:如網(wǎng)站的實時訪問統(tǒng)計、排序等實時推薦:如實時廣告定位、事件營銷等目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例2Streaming應用開發(fā)流程準備開發(fā)環(huán)境下載并導入樣例工程根據(jù)場景開發(fā)拓撲打包Eclipse代碼打包完整業(yè)務提交拓撲查看運行結果制定業(yè)務目標2Streaming應用開發(fā)流程制定業(yè)務目標數(shù)據(jù)源?Spout從哪里獲取數(shù)據(jù),例如Kafka、TCP或者MQ等。結果輸出到哪里?寫入Kafka、HDFS或者Redis等。拓撲結構設計。Spout、Bolt如何組織??煽啃砸螅渴欠駧cker?2Streaming應用開發(fā)流程準備開發(fā)環(huán)境準備項說明操作系統(tǒng)Windows系統(tǒng),推薦Windows7以上版本。安裝JDK開發(fā)環(huán)境的基本配置。版本要求:1.7或者1.8。安裝和配置Eclipse用于開發(fā)Streaming拓撲的工具。網(wǎng)絡確保客戶端與Streaming服務主機在網(wǎng)絡上互通。2Streaming應用開發(fā)流程下載并導入Streaming樣例工程1、下載并解壓Streaming客戶端壓縮包2、在FusionInsightManager頁面新建用戶,用于登陸與操作3、下載用戶的認證憑據(jù)文件4、導入客戶端中“storm-examples”樣例工程到Eclipse開發(fā)環(huán)境5、配置認證憑據(jù)文件到樣例工程的“src/main/resources”下2Streaming應用開發(fā)流程2Streaming應用開發(fā)流程根據(jù)場景開發(fā)拓撲梳理業(yè)務場景根據(jù)功能實現(xiàn)Spout/Bolt熟悉Storm的API,調用相應的API構造拓撲業(yè)務代碼開發(fā)完成后,參考樣例代碼WordCountTopology.java選擇提交方式,如Local、Remote和CMD等。2Streaming應用開發(fā)流程打包Eclipse代碼業(yè)務開發(fā)完成后,在開發(fā)環(huán)境Eclipse工程上,右擊選擇“Export”,在“Export”面板中選擇“JarFile”,將工程中的“src/main/java”源碼打包到指定路徑,如“D:\\tmp\\example.jar”。2Streaming應用開發(fā)流程打包完整業(yè)務將Eclipse打出的example.jar和業(yè)務中引入的外部jar包和相關配置文件整合到同一目錄下,并打出最終的業(yè)務jar包在Eclipse工程的tools目錄下找到打包工具:“streaming-jartool.bat”來完成打包2Streaming應用開發(fā)流程提交拓撲當前streaming支持三種方式提交拓撲Linux命令行提交—CMD模式:將打出的業(yè)務jar包上傳至已安裝streaming客戶端的linux環(huán)境上,使用stormjar<jarFile><className><topoName>命令提交拓撲。在提交之前請使用已申請的安全用戶執(zhí)行kinit<userName>進行安全認證。Eclipse遠程提交——Remote模式:在提交前需要在本地進行安全準備,適配代碼中的業(yè)務jar包路徑和安全參數(shù),然后右鍵->單擊“Runas>JavaApplication”提交拓撲。本地模式提交——Local模式:該模式下需要拷貝業(yè)務所需的外部jar包及配置問價到本地工程并且加入到classpath中,然后右鍵->單擊“Runas>JavaApplication”提交拓撲。本地模式一般用來測試。2Streaming應用開發(fā)流程查看運行結果登錄FusionInsightManager系統(tǒng),選擇“服務管理>Streaming”,點擊進入StreamingWebUI,在StormUI中點擊應用名稱,查看應用程序運行情況。目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例3CQL應用開發(fā)流程Shell提交下載并安裝Streaming客戶端到Linux客戶端主機在FusionInsightManager頁面新建用戶,用于登陸與操作使用新建的用戶執(zhí)行kinit進行安全登錄進入安裝好的Streaming客戶端“streaming-cql-1.0/bin”目錄,執(zhí)行“cql”進入CQLShell在Shell中使用CQL語句定義并提交拓撲3CQL應用開發(fā)流程3CQL應用開發(fā)流程Eclipse遠程提交下載并安裝Streaming客戶端在FusionInsightManager頁面新建用戶,用于登陸與操作下載用戶的憑據(jù)文件導入客戶端中“cql-examples”樣例工程到Eclipse開發(fā)環(huán)境配置認證憑據(jù)文件到樣例工程的“src/main/resources”下在本地開發(fā)CQL應用并保存到“src/main/resources”下,如“example.cql”適配CQLExample.java中的安全參數(shù)和待執(zhí)行CQL文件地址右鍵->RunAs->JavaApplication執(zhí)行CQLExample.java目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-業(yè)務目標datadata…TCPClientStormClusterTop1張三10次Top2李四9次Top3王五7次...Top10XX0次統(tǒng)計最近10分鐘內訪問游客的Top10,每10秒輸出一次統(tǒng)計結果4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-Spout設計功能要求收據(jù)接收啟動TCPServer數(shù)據(jù)反序列化需要對接收的數(shù)據(jù)進行反序列化處理數(shù)據(jù)拆分將反序列化后的數(shù)據(jù)根據(jù)預先設計好的schema進行拆分數(shù)據(jù)篩選對拆分后的數(shù)據(jù)進行篩選,篩選出關鍵信息數(shù)據(jù)緩存將提取的關鍵數(shù)(username)據(jù)緩存到固定大小的隊列中數(shù)據(jù)發(fā)送從緩存中取出數(shù)據(jù)發(fā)送4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-TimerSpout設計功能要求發(fā)送時間戳每10秒發(fā)送一次時間戳4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-CountingBolt設計功能要求窗口定義維護一個10min的窗口,窗口中緩存<username,count>,且只保留count值Top10數(shù)據(jù)刷新窗口判斷當前接收到的數(shù)據(jù)是不是時間戳,如果不是則刷新窗口中的數(shù)據(jù)并重新計算排序發(fā)送數(shù)據(jù)判斷當前接收到的數(shù)據(jù)是不是時間戳,如果是則將當前窗口內容發(fā)送給下一跳4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-GatherBolt設計功能要求數(shù)據(jù)匯聚將接收到的同一批次的數(shù)據(jù)匯總,根據(jù)時間戳頻率,每10秒完成一次匯聚結果處理將匯聚后的數(shù)據(jù)重新排序并保留Top10數(shù)據(jù)存儲將處理后的最終Top10結果存入Kakfa/Redis/DB等4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-拓撲設計SpoutTimerSpoutCountBoltCountBoltCountBoltGatherBolt4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-分組方式設計分組方式功能介紹fieldsGrouping(字段分組)按照消息的哈希值分組發(fā)送給目標Bolt的TaskglobalGrouping(全局分組)所有消息都發(fā)送給目標Bolt的固定一個TaskshuffleGrouping(隨機分組)消息發(fā)送給目標Bolt的隨機一個tasklocalOrShuffleGrouping(本地或者隨機分組)如果目標Bolt在同一工作進程存在一個或多個Task,數(shù)據(jù)會隨機分配給這些Task。否則,該分組方式與隨機分組方式相同allGrouping(廣播分組)消息群發(fā)給目標Bolt的所有TaskdirectGrouping(直接分組)由數(shù)據(jù)生產(chǎn)者決定數(shù)據(jù)發(fā)送給目標Bolt的哪一個Task。需在發(fā)送時使用emitDirect(taskID,tuple)接口指定TaskIDpartialKeyGrouping(局部字段分組)更均衡的字段分組noneGrouping(不分組)當前和隨機分組相同4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-分組方式設計SpoutCountBoltCountBoltCountBolt張三,張三,王五張三,李四張三,王五GatherBolt張三2王五1張三1李四1張三1王五1張三2張三1張三1李四1王五1王五1Spout和CountBolt之間采用隨機分組方式結果錯誤!4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-分組方式設計SpoutCountBoltCountBoltCountBolt張三,張三,張三李四,李四王五,王五GatherBolt張三3李四2王五2張三3李四2王五2Spout和CountBolt之間采用字段分組方式,以username為關鍵字結果正確!4應用開發(fā)案例分析固定時間窗口內TopN統(tǒng)計-分組方式設計TimerSpoutCountBoltCountBoltCountBoltGatherBoltallGroupingglobalGrouping需要根據(jù)場景選擇合適的分組方式,才能獲得預期的結果目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例5常用接口示例Storm提供接口:REST接口REST(RepresentationalStateTransfer)表述性狀態(tài)轉移接口Thrift接口由Nimbus提供。Thrift是一個基于靜態(tài)代碼生成的跨語言的RPC協(xié)議棧實現(xiàn),它可以生成包括C++,Java,Python,Ruby,PHP等主流語言的代碼,這些代碼實現(xiàn)了RPC的協(xié)議層和傳輸層功能,從而讓用戶可以集中精力于服務的調用和實現(xiàn)5常用接口示例-Spout接口5常用接口示例-Spout接口publicclassRandomSentenceSpoutextendsBaseRichSpout{SpoutOutputCollector_collector;Random_rand;
publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){_collector=collector;_rand=newRandom();}
publicvoidnextTuple(){String[]sentences=newString[]{"thecowjumpedoverthemoon","anappleadaykeepsthedoctoraway"};Stringsentence=sentences[_rand.nextInt(sentences.length)];_collector.emit(newValues(sentence));}publicvoidack(Objectid){}
publicvoidfail(Objectid){}
publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("word"));}}open方法(Spout中第一個被調用的方法)中提供一個SpoutOutputcollector用來發(fā)射tuplenextTuple方法中從指定字符串列表中隨機選擇一個字符串,并發(fā)送出去,該方法被周期性調用declareOutputFields定義一個叫做”word”的字段的tupleStorm在檢測到一個Tuple被整個Topology成功處理的時候調用ack,否則調用fail5常用接口示例-Spout接口Spout的Ack開關不啟用Ack:
在nextTuple()方法中使用不帶ack的接口發(fā)送消息: collector.emit(newValues(sentence))啟用Ack:構造一個全局唯一的meaasgeId如:StringmeaasgeId=UUID.randomUUID().toString();在nextTuple()方法中使用帶ack的接口發(fā)送消息:collector.emit(newValues(sentence),messageID);5常用接口示例-Bolt接口5常用接口示例-Bolt接口classSplitSentenceimplementsIRichBolt{privateOutputCollectorcollector;publicvoidprepare(Mapconf,TopologyContextcontext,OutputCollectorcollector){this.collector=collector;}publicvoidexecute(Tupletuple){Stringsentence=tuple.getString(0);for(Stringword:sentence.split("")){collector.emit(newValues(word));}}publicvoidcleanup(){}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("word"));}}prepare方法在bolt開始處理消息之前調用,提供一個Outputcollector用來發(fā)射tupleexecute方法中從bolt的一個輸入接收tuple.并獲取tuple的第一個字段,即字符串內容,以空格拆分,并發(fā)送出去declareOutputFields定義一個tuple,包含有”word”字段cleanup在bolt被銷毀的時候調用5常用接口示例-Bolt接口Bolt的Ack開關不啟用Ack:
在execute(Tupletuple)方法中使用如下的接口發(fā)送消息: collector.emit(newValues(sentence))啟用Ack:方法1:在execute(Tupletuple)方法中使用如下接口發(fā)送消息,錨定上一條tuple: collector.emit(tuple,newValues(sentence));在emit執(zhí)行成功后執(zhí)行collector.ack()方法,或者emit執(zhí)行失敗后執(zhí)行collector.fail()方法,向Acker應答此消息的發(fā)送結果。方法2:Bolt繼承BaseBasicBolt,該父類會自動錨定tuple并且自動應答,客戶端只需要調用collector.emit(newValues(sentence))接口發(fā)送消息就可以了。5常用接口示例-創(chuàng)建拓撲publicstaticvoidmain(String[]args)throwsException{TopologyBuilderbuilder=newTopologyBuilder();builder.setSpout("spout",newRandomSentenceSpout(),5);builder.setBolt("split",newSplitSentence(),8).shuffleGrouping("spout");builder.setBolt("count",newWordCount(),12).fieldsGrouping("split",newFields("word"));Configconf=newConfig();conf.setDebug(true);conf.setNumWorkers(3);conf.setNumAckers(1);StormSubmitter.submitTopologyWithProgressBar(args[0],conf,builder.createTopology());}使用TopologyBuilder容器來定義拓撲。setSpout()方法設置Spout和其并發(fā)度。setBolt()方法設置Bolt,并在setBolt()方法的返回值上設置分組方式。初始化Config對象,并且設置客戶端參數(shù)。storm提供了大量客戶端參數(shù)供用戶設置,比如:TOPOLOGY_WORKERS(setNumWorkers)用來設置拓撲的worker數(shù)量。 TOPOLOGY_ACKER_EXECUTORS(setNumAckers)用來設置Acker的數(shù)量,設置為0表示不開啟Acker。StormSubmitter.submitTopology(topology-name,conf,topology)接口來提交拓撲。目錄Streaming應用場景Streaming應用開發(fā)流程CQL應用開發(fā)流程應用開發(fā)案例分析常用開發(fā)接口示例CQL語法示例6CQL語法示例-創(chuàng)建輸入流CREATEINPUTSTREAMexample(eventIdINT,eventDescSTRING)COMMENT"thisisaexampleofcreateinputstream."SERDESimpleSerDePROPERTIES(separator="|")SOURCETCPClientInputPROPERTIES(server="",port="9999")PARALLEL2;6CQL語法示例-JoinINSERT
INTOSTREAMrsSELECT*FROMS1[RANGE20SECONDSBATCH]JOINS2[RANGEUNBOUNDED]ONs1.id=s2.idWHEREs1.id>5;INSERT
INTOSTREAMrsSELECT*FROMS1[ROWS10SLIDE]LEFT
JOINS2[rangetodayts]ONs1.id=s2.id;6CQL語法示例-窗口語法名稱說明S[ROWSN1BATCH]長度跳動窗窗口內最大保存N1個事件,當有新事件產(chǎn)生的時候,窗口內每攢滿N1個事件,就過期依次。同時過期的所有事件處于同一批次。S[RANGET1SLIDE]時間滑動窗窗口內保存最近T1時間范圍內的數(shù)據(jù),T1是一個時間單位,可以加入Seconds等時間單位。窗口內的事件依次過期。每個過期事件的批次都不同。S[ROWSN1SLIDEPARTITIONBYEXP1]分組長度滑動窗同長度滑動窗,但是加入了分組的概念,事件歸屬于不同的分組,每個分組的長度為N1,逐個過期。S[RANGET1SLIDETRIGGERBYEXP1]事件驅動時間滑動窗窗口內保存最近T1時間單位的數(shù)據(jù),exp1是一個返回值為時間類型的表達式,每次產(chǎn)生數(shù)據(jù)之后,都會和窗口內的數(shù)據(jù)做對比,然后將大于T1時間單位的數(shù)據(jù)吐出,每次只吐出一個數(shù)據(jù)。……6CQL語法示例-窗口--按照type對窗口內數(shù)據(jù)進行分組,每組容量為10SELECT*FROMtransformEvent[ROWS10SLIDEPARTITIONBY
TYPE];--時間排序窗,一般用來解決數(shù)據(jù)亂序問題SELECT*FROMtransformEvent[RANGE1000MILLISECONDSSORTBYdte];--時間驅動滑動窗INSERT
INTOSTREAMrssum(OrderPrice),avg(OrderPrice),count(OrderPrice)
FROMtransformEvent[RANGE10SECONDSSLIDETRIGGERbyTSEXCLUDEnow];--保存周期為一個自然天的分組窗INSERT
INTO
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年企業(yè)間共同投資合作合同范本
- 2025年度高科技項目投資擔保合同范本(含知識產(chǎn)權)
- 2025年度年度會議場地租賃合同樣本二零二五年度版
- 二零二四年度新能源充電樁充電設施投資與運營管理合同3篇
- 2025年度知識產(chǎn)權轉讓與股權合作合同集
- 2025年度插畫藝術培訓課程合作合同
- 2025年度多功能儲藏室購置及維護合同4篇
- 2025年度金融衍生品交易合同范本學習指南
- 2025年度新型股權質押融資合同
- 2025年度廣告效果評估與優(yōu)化居間服務合同
- 2025-2030年中國電動高爾夫球車市場運行狀況及未來發(fā)展趨勢分析報告
- 河南省濮陽市2024-2025學年高一上學期1月期末考試語文試題(含答案)
- 長沙市2025屆中考生物押題試卷含解析
- 2024年08月北京中信銀行北京分行社會招考(826)筆試歷年參考題庫附帶答案詳解
- 蘇教版二年級數(shù)學下冊全冊教學設計
- 職業(yè)技術學院教學質量監(jiān)控與評估處2025年教學質量監(jiān)控督導工作計劃
- 金字塔原理與結構化思維考核試題及答案
- 基礎護理學導尿操作
- 標牌加工風險防范方案
- 2015-2024北京中考真題英語匯編:閱讀單選CD篇
- 臨床放射性皮膚損傷的護理
評論
0/150
提交評論