




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)
文檔簡介
1、Spark2.1.0入門:DStream轉(zhuǎn)換操作林子雨老師2017年3月4日482【版權(quán)聲明】博客內(nèi)容由廈門大學(xué)數(shù)據(jù)庫實(shí)驗(yàn)室擁有版權(quán),未經(jīng)允許,請勿轉(zhuǎn)載! HYPERLINK /blog/spark/ t /blog/1390-2/_blank 返回Spark教程首頁DStream轉(zhuǎn)換操作包括無狀態(tài)轉(zhuǎn)換和有狀態(tài)轉(zhuǎn)換。無狀態(tài)轉(zhuǎn)換:每個批次的處理不依賴于之前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)換:當(dāng)前批次的處理需要使用之前批次的數(shù)據(jù)或者中間結(jié)果。有狀態(tài)轉(zhuǎn)換包括基于滑動窗口的轉(zhuǎn)換和追蹤狀態(tài)變化的轉(zhuǎn)換(updateStateByKey)。DStream無狀態(tài)轉(zhuǎn)換操作下面給出一些無狀態(tài)轉(zhuǎn)換操作的含義:* map(fun
2、c) :對源DStream的每個元素,采用func函數(shù)進(jìn)行轉(zhuǎn)換,得到一個新的DStream;* flatMap(func): 與map相似,但是每個輸入項(xiàng)可用被映射為0個或者多個輸出項(xiàng);* filter(func): 返回一個新的DStream,僅包含源DStream中滿足函數(shù)func的項(xiàng);* repartition(numPartitions): 通過創(chuàng)建更多或者更少的分區(qū)改變DStream的并行程度;* union(otherStream): 返回一個新的DStream,包含源DStream和其他DStream的元素;* count():統(tǒng)計源DStream中每個RDD的元素數(shù)量;* re
3、duce(func):利用函數(shù)func聚集源DStream中每個RDD的元素,返回一個包含單元素RDDs的新DStream;* countByValue():應(yīng)用于元素類型為K的DStream上,返回一個(K,V)鍵值對類型的新DStream,每個鍵的值是在原DStream的每個RDD中的出現(xiàn)次數(shù);* reduceByKey(func, numTasks):當(dāng)在一個由(K,V)鍵值對組成的DStream上執(zhí)行該操作時,返回一個新的由(K,V)鍵值對組成的DStream,每一個key的值均由給定的recuce函數(shù)(func)聚集起來;* join(otherStream, numTasks):當(dāng)
4、應(yīng)用于兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, (V, W)鍵值對的新DStream;* cogroup(otherStream, numTasks):當(dāng)應(yīng)用于兩個DStream(一個包含(K,V)鍵值對,一個包含(K,W)鍵值對),返回一個包含(K, SeqV, SeqW)的元組;* transform(func):通過對源DStream的每個RDD應(yīng)用RDD-to-RDD函數(shù),創(chuàng)建一個新的DStream。支持在新的DStream中做任何RDD操作。無狀態(tài)轉(zhuǎn)換操作實(shí)例:我們之前“ HYPERLINK /blog/1387-2/ t /blo
5、g/1390-2/_blank 套接字流”部分介紹的詞頻統(tǒng)計,就是采用無狀態(tài)轉(zhuǎn)換,每次統(tǒng)計,都是只統(tǒng)計當(dāng)前批次到達(dá)的單詞的詞頻,和之前批次無關(guān),不會進(jìn)行累計。DStream有狀態(tài)轉(zhuǎn)換操作對于DStream有狀態(tài)轉(zhuǎn)換操作而言,當(dāng)前批次的處理需要使用之前批次的數(shù)據(jù)或者中間結(jié)果。有狀態(tài)轉(zhuǎn)換包括基于滑動窗口的轉(zhuǎn)換和追蹤狀態(tài)變化(updateStateByKey)的轉(zhuǎn)換?;瑒哟翱谵D(zhuǎn)換操作滑動窗口轉(zhuǎn)換操作的計算過程如下圖所示,我們可以事先設(shè)定一個滑動窗口的長度(也就是窗口的持續(xù)時間),并且設(shè)定滑動窗口的時間間隔(每隔多長時間執(zhí)行一次計算),然后,就可以讓窗口按照指定時間間隔在源DStream上滑動,每次窗
6、口停放的位置上,都會有一部分DStream被框入窗口內(nèi),形成一個小段的DStream,這時,就可以啟動對這個小段DStream的計算。圖 滑動窗口的計算過程下面給給出一些窗口轉(zhuǎn)換操作的含義:* window(windowLength, slideInterval) 基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù),計算得到一個新的DStream;* countByWindow(windowLength, slideInterval) 返回流中元素的一個滑動窗口數(shù);* reduceByWindow(func, windowLength, slideInterval) 返回一個單元素流。利用函數(shù)func聚
7、集滑動時間間隔的流的元素創(chuàng)建這個單元素流。函數(shù)func必須滿足結(jié)合律,從而可以支持并行計算;* reduceByKeyAndWindow(func, windowLength, slideInterval, numTasks) 應(yīng)用到一個(K,V)鍵值對組成的DStream上時,會返回一個由(K,V)鍵值對組成的新的DStream。每一個key的值均由給定的reduce函數(shù)(func函數(shù))進(jìn)行聚合計算。注意:在默認(rèn)情況下,這個算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組??梢酝ㄟ^numTasks參數(shù)的設(shè)置來指定不同的任務(wù)數(shù);* reduceByKeyAndWindow(func, invFunc
8、, windowLength, slideInterval, numTasks) 更加高效的reduceByKeyAndWindow,每個窗口的reduce值,是基于先前窗口的reduce值進(jìn)行增量計算得到的;它會對進(jìn)入滑動窗口的新數(shù)據(jù)進(jìn)行reduce操作,并對離開窗口的老數(shù)據(jù)進(jìn)行“逆向reduce”操作。但是,只能用于“可逆reduce函數(shù)”,即那些reduce函數(shù)都有一個對應(yīng)的“逆向reduce函數(shù)”(以InvFunc參數(shù)傳入);* countByValueAndWindow(windowLength, slideInterval, numTasks) 當(dāng)應(yīng)用到一個(K,V)鍵值對組成的D
9、Stream上,返回一個由(K,V)鍵值對組成的新的DStream。每個key的值都是它們在滑動窗口中出現(xiàn)的頻率。窗口轉(zhuǎn)換操作實(shí)例:在上一節(jié)的“ HYPERLINK /blog/1098-2/ t /blog/1390-2/_blank Apache Kafka作為DStream數(shù)據(jù)源”內(nèi)容中,在我們已經(jīng)使用了窗口轉(zhuǎn)換操作,也就是,在KafkaWordCount.scala代碼中,你可以找到下面這一行:val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)這行代碼中就是一個窗口轉(zhuǎn)換操作re
10、duceByKeyAndWindow,其中,Minutes(2)是滑動窗口長度,Seconds(10)是滑動窗口時間間隔(每隔多長時間滑動一次窗口)。reduceByKeyAndWindow中就使用了加法和減法這兩個reduce函數(shù),加法和減法這兩種reduce函數(shù)都是“可逆的reduce函數(shù)”,也就是說,當(dāng)滑動窗口到達(dá)一個新的位置時,原來之前被窗口框住的部分?jǐn)?shù)據(jù)離開了窗口,又有新的數(shù)據(jù)被窗口框住,但是,這時計算窗口內(nèi)單詞的詞頻時,不需要對當(dāng)前窗口內(nèi)的所有單詞全部重新執(zhí)行統(tǒng)計,而是只要把窗口內(nèi)新增進(jìn)來的元素,增量加入到統(tǒng)計結(jié)果中,把離開窗口的元素從統(tǒng)計結(jié)果中減去,這樣,就大大提高了統(tǒng)計的效率。
11、尤其對于窗口長度較大時,這種“逆函數(shù)”帶來的效率的提高是很明顯的。updateStateByKey操作當(dāng)我們需要在跨批次之間維護(hù)狀態(tài)時,就必須使用updateStateByKey操作。下面我們就給出一個具體實(shí)例。我們還是以前面在“ HYPERLINK /blog/1083-2/ t /blog/1390-2/_blank 套接字流”部分講過的NetworkWordCount為例子來介紹,在之前的套接字流的介紹中,我們統(tǒng)計單詞詞頻采用的是無狀態(tài)轉(zhuǎn)換操作,也就是說,每個批次的單詞發(fā)送給NetworkWordCount程序處理時,NetworkWordCount只對本批次內(nèi)的單詞進(jìn)行詞頻統(tǒng)計,不會考
12、慮之前到達(dá)的批次的單詞,所以,不同批次的單詞詞頻都是獨(dú)立統(tǒng)計的。對于有狀態(tài)轉(zhuǎn)換操作而言,本批次的詞頻統(tǒng)計,會在之前批次的詞頻統(tǒng)計結(jié)果的基礎(chǔ)上進(jìn)行不斷累加,所以,最終統(tǒng)計得到的詞頻,是所有批次的單詞的總的詞頻統(tǒng)計結(jié)果。下面,我們來改造一下在套接字流介紹過的NetworkWordCount程序。請登錄Linux系統(tǒng),打開一個終端,然后,執(zhí)行下面命令:cd /usr/local/spark/mycode/streaming /這個streaming目錄是之前已經(jīng)創(chuàng)建好的mkdir statefulcd statefulmkdir -p src/main/scalacd src/main/scalav
13、im NetworkWordCountStateful.scalaShell 命令 HYPERLINK /blog/1390-2/javascript:void(0); o 復(fù)制代碼 HYPERLINK /blog/1390-2/javascript:void(0); o 查看純文本代碼 上面使用vim編輯器新建了一個NetworkWordCountStateful.scala代碼文件,請?jiān)诶锩孑斎胍韵麓a:package org.apache.spark.examples.streamingimport org.apache.spark._import org.apache.spark.st
14、reaming._import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful def main(args: ArrayString) /定義狀態(tài)更新函數(shù) val updateFunc = (values: SeqInt, state: OptionInt) = val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) Stre
15、amingExamples.setStreamingLogLevels() /設(shè)置log4j日志級別 val conf = new SparkConf().setMaster(local2).setAppName(NetworkWordCountStateful) val sc = new StreamingContext(conf, Seconds(5) sc.checkpoint(file:/usr/local/spark/mycode/streaming/stateful/) /設(shè)置檢查點(diǎn),檢查點(diǎn)具有容錯機(jī)制 val lines = sc.socketTextStream(localho
16、st, 9999) val words = lines.flatMap(_.split( ) val wordDstream = words.map(x = (x, 1) val stateDstream = wordDstream.updateStateByKeyInt(updateFunc) stateDstream.print() sc.start() sc.awaitTermination() 保存該文件退出vim編輯器。這里要對這段代碼中新增的updataStateByKey稍微解釋一下。Spark Streaming的updateStateByKey可以把DStream中的數(shù)據(jù)按k
17、ey做reduce操作,然后對各個批次的數(shù)據(jù)進(jìn)行累加。注意,wordDstream.updateStateByKey HYPERLINK /blog/1390-2/updateFunc t /blog/1390-2/_blank Int每次傳遞給updateFunc函數(shù)兩個參數(shù),其中,第一個參數(shù)是某個key(即某個單詞)的當(dāng)前批次的一系列值的列表(SeqInt形式),updateFunc函數(shù)中 val currentCount = values.foldLeft(0)(_ + _)的作用(請參考之前章節(jié)“ HYPERLINK /blog/1122-2/ t /blog/1390-2/_blan
18、k fold操作”的介紹),就是計算這個被傳遞進(jìn)來的與某個key對應(yīng)的當(dāng)前批次的所有值的總和,也就是當(dāng)前批次某個單詞的出現(xiàn)次數(shù),保存在變量currentCount中。傳遞給updateFunc函數(shù)的第二個參數(shù)是某個key的歷史狀態(tài)信息,也就是某個單詞歷史批次的詞頻匯總結(jié)果。實(shí)際上,某個單詞的歷史詞頻應(yīng)該是一個Int類型,這里為什么要采用OptionInt呢?OptionInt是類型 Int的容器(請參考之前章節(jié)“ HYPERLINK /blog/956-2/ t /blog/1390-2/_blank 模式匹配”了解Option類的使用方法),更確切地說,你可以把它看作是某種集合,這個特殊的集
19、合要么只包含一個元素(即單詞的歷史詞頻),要么就什么元素都沒有(這個單詞歷史上沒有出現(xiàn)過,所以沒有歷史詞頻信息)。之所以采用 OptionInt保存歷史詞頻信息,這是因?yàn)椋瑲v史詞頻可能不存在,很多時候,在值不存在時,需要進(jìn)行回退,或者提供一個默認(rèn)值,Scala 為Option類型提供了getOrElse方法,以應(yīng)對這種情況。 state.getOrElse(0)的含義是,如果該單詞沒有歷史詞頻統(tǒng)計匯總結(jié)果,那么,就取值為0,如果有歷史詞頻統(tǒng)計結(jié)果,就取歷史結(jié)果,然后賦值給變量previousCount。最后,當(dāng)前值和歷史值進(jìn)行求和,并包裝在Some中返回。然后,再次使用vim編輯器新建一個St
20、reamingExamples.scala文件,用于設(shè)置log4j日志級別,代碼如下:package org.apache.spark.examples.streamingimport org.apache.spark.Loggingimport org.apache.log4j.Level, Logger/* Utility functions for Spark Streaming examples. */object StreamingExamples extends Logging /* Set reasonable logging levels for streaming if th
21、e user has not configured log4j. */ def setStreamingLogLevels() val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) / We first log something to initialize Sparks default logging, then we override the / logging level. logInfo(Setting log level to WARN fo
22、r streaming example. + To override add a custom perties to the classpath.) Logger.getRootLogger.setLevel(Level.WARN) 退出vim編輯器。下面要對代碼進(jìn)行sbt打包編譯。這里需要一個simple.sbt文件,使用vim編輯器創(chuàng)建一個:cd /usr/local/spark/mycode/streaming/statefulvim simple.sbtShell 命令 HYPERLINK /blog/1390-2/javascript:void(0); o 復(fù)制代碼 HYPERLIN
23、K /blog/1390-2/javascript:void(0); o 查看純文本代碼 在simple.sbt中輸入以下內(nèi)容:name := Simple Projectversion := 1.0scalaVersion := 2.11.8libraryDependencies += org.apache.spark % spark-streaming % 2.1.0創(chuàng)建好simple.sbt文件后,退出vim編輯器。然后,執(zhí)行下面命令:cd /usr/local/spark/mycode/streaming/statefulfind .Shell 命令 HYPERLINK /blog/1
24、390-2/javascript:void(0); o 復(fù)制代碼 HYPERLINK /blog/1390-2/javascript:void(0); o 查看純文本代碼 屏幕上返回的信息,應(yīng)該是類似下面的文件結(jié)構(gòu):./src./src/main./src/main/scala./src/main/scala/NetworkWordCountStateful.scala./src/main/scala/StreamingExamples.scala./simple.sbt然后,就可以執(zhí)行sbt打包編譯了,命令如下:cd /usr/local/spark/mycode/streaming/sta
25、teful/usr/local/sbt/sbt packageShell 命令 HYPERLINK /blog/1390-2/javascript:void(0); o 復(fù)制代碼 HYPERLINK /blog/1390-2/javascript:void(0); o 查看純文本代碼 (備注:根據(jù)筆者實(shí)際測試,在NetworkWordCountStateful.scala代碼中使用 StreamingExamples.setStreamingLogLevels() ,還不能保證屏幕上輸出我們預(yù)期的結(jié)果,同時還需要把/usr/local/spark/conf/perties設(shè)置為log4j.rootCategory=WARN, console,這樣才可以讓屏幕上顯示我們預(yù)期的結(jié)果)下面可以再設(shè)置一下log4j格式,請?jiān)诮K端內(nèi)輸入如下命令:cd /usr/local/spark/confvim perties #如果不存在,就從perties.template拷貝一份得到pertiesShell 命令 HYPERLINK /blog/1390-2/javascript:void(0); o 復(fù)制代碼 HYPERLINK /blog/1390-2/
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年度寵物照料保姆雇傭合同協(xié)議書
- 商鋪轉(zhuǎn)讓服務(wù)合同
- 2025年度撫養(yǎng)權(quán)變更與財產(chǎn)分割調(diào)解合同模板
- 2025年度個人挖機(jī)租賃與施工驗(yàn)收服務(wù)合同
- 2025年度房東轉(zhuǎn)租合同-科技園區(qū)房產(chǎn)租賃
- 2025年度醫(yī)院醫(yī)護(hù)人員崗位調(diào)整與勞動合同
- 2025年度互聯(lián)網(wǎng)企業(yè)期權(quán)投資合作協(xié)議
- 2025年度影視作品宣傳策劃代理合同
- 二零二五年度數(shù)字經(jīng)濟(jì)領(lǐng)域聘用業(yè)務(wù)經(jīng)理專屬合同
- 2025年度原油出口退稅及關(guān)稅優(yōu)惠合同
- 藥品經(jīng)營質(zhì)量管理制度樣本
- 有機(jī)農(nóng)業(yè)概述課件
- 學(xué)校托幼機(jī)構(gòu)腸道傳染病消毒隔離及防控要求
- 生產(chǎn)加工型小微企業(yè)安全管理考試(含答案)
- A類《職業(yè)能力傾向測驗(yàn)》貴州省畢節(jié)地區(qū)2024年事業(yè)單位考試考前沖刺試卷含解析
- 沙子檢測報告
- 2023-2024學(xué)年部編版必修下冊 1-1 《子路、曾皙、冉有、公西華侍坐》教案2
- 無線電測向幻燈教材課件
- 第1課《我們的閑暇時光》課件
- 商務(wù)ktv項(xiàng)目計劃書
- 腦血管造影術(shù)護(hù)理查房課件
評論
0/150
提交評論