2023學(xué)年完整公開課版Flink實時APITransformations_第1頁
2023學(xué)年完整公開課版Flink實時APITransformations_第2頁
2023學(xué)年完整公開課版Flink實時APITransformations_第3頁
2023學(xué)年完整公開課版Flink實時APITransformations_第4頁
2023學(xué)年完整公開課版Flink實時APITransformations_第5頁
已閱讀5頁,還剩17頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

Flink實時API-TransformationsDataStream——TransformationsDataStreamAPISourceTransformationsSinkDataStream——TransformationsDataStreamAPISourceTransformationsSink定義了一系列算子,這些算子能夠?qū)⒁粋€或多個DataStream轉(zhuǎn)換為新的DataStreamDataStreamAPI–map,flatmap,filter常用的算子1)map:輸入一個元素,然后返回一個元素,中間可以做一些清洗轉(zhuǎn)換等操作valres=stream.map{x=>(x,1)}2)flatmap:輸入一個元素,可以返回零個,一個或者多個元素valres=stream.flatMap{x=>x.split("")}3)filter:過濾函數(shù),對傳入的數(shù)據(jù)進行判斷,符合條件的數(shù)據(jù)會被留下valres=stream.filter{x=>x%2==0}//1獲取flink執(zhí)行環(huán)境valenv=StreamExecutionEnvironment.getExecutionEnvironment//2創(chuàng)建初始的數(shù)據(jù)流vallineStrDS=env.socketTextStream(hostname="localhost",port=8989)//3詞頻統(tǒng)計vald1=lineStrDS.flatMap(line=>line.split("")).map(word=>(word,1))valwordCount=d1.keyBy(_._1).sum(position=1)wordCount.print()//5觸發(fā)真正執(zhí)行env.execute()DataStreamAPI–map,flatmap,filterkeyby算子邏輯地將一個流拆分成不相交的分區(qū),每個分區(qū)包含具有相同key的元素。需要的注意的是

keyby算子輸入必須是Tuple類型。dataStream.keyBy(0)

//指定Tuple中的第一個元素作為分組keyDataStreamAPI–KeyBy

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalds=env.fromElements(("Grace","chinese",10),("Grace","math",20),("Grace","English",30),("Alice","chinese",10),("Alice","math",20),("Alice","English",30))ds.keyBy(0).print()env.execute()DataStreamAPI–KeyBy

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalds=env.fromElements(("Grace","chinese",10),("Grace","math",20),("Grace","English",30),("Alice","chinese",10),("Alice","math",20),("Alice","English",30))ds.keyBy(0).print()env.execute()DataStreamAPI–KeyBy6>(Alice,chinese,10)5>(Grace,chinese,10)6>(Alice,math,20)5>(Grace,math,20)6>(Alice,English,30)5>(Grace,English,30)

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalds=env.fromElements(("Grace","chinese",10),("Grace","math",20),("Grace","English",30),("Alice","chinese",10),("Alice","math",20),("Alice","English",30))ds.keyBy(2).print()env.execute()DataStreamAPI–KeyBy

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalds=env.fromElements(("Grace","chinese",10),("Grace","math",20),("Grace","English",30),("Alice","chinese",10),("Alice","math",20),("Alice","English",30))ds.keyBy(2).print()env.execute()DataStreamAPI–KeyBy2>(Grace,English,30)5>(Grace,chinese,10)5>(Grace,math,20)5>(Alice,chinese,10)5>(Alice,math,20)2>(Alice,English,30)DataStream→DataStream:對兩個或者兩個以上的DataStream進行union操作,產(chǎn)生一個新DataStream,這新的DataStream包含所有流中的數(shù)據(jù)。注意——Union是一個限制,就是所有合并的流類型必須是一致的Steam1Steam2SteamUnionDataStreamAPI–Union

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvards1=env.fromElements(1,2)vards2=env.fromElements(2,4)vards3=env.fromElements(1,5)

vards4=ds1.union(ds2,ds3)ds4.print()env.execute()DataStreamAPI–Union

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvards1=env.fromElements(1,2)vards2=env.fromElements(2,4)vards3=env.fromElements(1,5)vards4=ds1.union(ds2,ds3)ds4.print()env.execute()DataStreamAPI–Union5>42>14>27>18>53>2DataStreamAPI–ConnectConnect算子:和union類似,但是只能連接兩個流,兩個流的數(shù)據(jù)類型可以不同。兩個數(shù)據(jù)流被Connect之后,只是被放在了一個同一個流中,內(nèi)部依然保持各自的數(shù)據(jù)和形式不發(fā)生任何變化,兩個流相互獨立。Steam1Steam2Steam1ConnectSteam2DataStreamAPI–ConnectConnect與

Union

區(qū)別:——Union之前兩個流的類型必須是一樣,Connect可以不一樣,在之后的Map中再去調(diào)整成為一樣的。——Connect只能操作兩個流,Union可以操作多個。

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalds1=env.fromElements(1,2)valds2=env.fromElements("Hello","Spark")valds3=ds1.connect(ds2)//第一個函數(shù)是映射第一個數(shù)據(jù)流,第二個函數(shù)映射第二個ds2//處理后的數(shù)據(jù)類型保持一致

valres=ds3.map((num1:Int)=>{num1.toString},(num2:String)=>{num2})res.print()env.execute()DataStreamAPI–Connect

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvalds1=env.fromElements(1,2)valds2=env.fromElements("Hello","Spark")valds3=ds1.connect(ds2)//第一個函數(shù)是映射第一個數(shù)據(jù)流,第二個函數(shù)映射第二個ds2//處理后的數(shù)據(jù)類型保持一致

valres=ds3.map((num1:Int)=>{num1.toString},(num2:String)=>{num2})res.print()env.execute()DataStreamAPI–Connect4>Hello3>25>Spark2>1KeyedStream→DataStreamReduce算子,基于keyBy算子,是一個分組數(shù)據(jù)流的聚合操作,合并當(dāng)前的元素和上次聚合的結(jié)果,產(chǎn)生一個新的值,返回的流中包含每一次聚合的結(jié)果,而不是只返回最后一次聚合的最終結(jié)果。DataStream——

Reduce算子DataStream——

Reduce算子

//1獲取flink執(zhí)行環(huán)境valenv=StreamExecutionEnvironment.getExecutionEnvironment//2創(chuàng)建初始的數(shù)據(jù)流valds=env.fromElements(("hello",1),("hello",1),("hello",1))//3Reduce算子

valds2=ds.keyBy(0)valds3=ds2.reduce((a,b)=>(a._1,a._2+b._2))//4打印輸出到控制臺ds3.print()//5觸發(fā)真正執(zhí)行env.execute()DataStream——

Reduce算子

//1獲取flink執(zhí)行環(huán)境valenv=StreamExecutionEnvironment.getExecutionEnvironment//2創(chuàng)建初始的數(shù)據(jù)流valds=env.fromElements(("hello",1),("hello",1),("hello",1))//3Reduce算子

valds2=ds.keyBy(0)valds3=ds2.reduce((a,b)=>(a._1,a._2+b._2))//4打印輸出到控制臺ds3.print()//5觸發(fā)真正執(zhí)行env.execute()3>(hello,1)3>(hello,2)3>(hello,3)綜合小案例

vallineStrDS=e

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論