Apache Flink Training
DataSet API Advanced
June 15th, 2015

Agenda
- Data types and keys
- More transformations
- Further API concepts

Type System and Keys
What kind of data can Flink handle?

Apache Flink's Type System
- Flink aims to support all data types
  - Ease of programming
  - Seamless integration with existing code
- Programs are analyzed before execution
  - Used data types are identified
  - Serializers and comparators are configured

Apache Flink's Type System
- Data types are either
  - Atomic types (like Java primitives)
  - Composite types (like Flink tuples)
- Composite types nest other types
- Not all data types can be used as keys!
  - Flink groups, joins and sorts DataSets on keys
  - Key types must be comparable

Atomic Types
Flink type | Java type | Can be used as key?
BasicType | Java primitives and basic types (Integer, String, …) | Yes
ArrayType | Arrays of Java primitives or objects | No
WritableType | Implements Hadoop's Writable interface | Yes, if it implements WritableComparable
GenericType | Any other type | Yes, if it implements Comparable

Composite Types
- Are composed of fields with other types
  - Field types can be atomic or composite
- Fields can be addressed as keys
  - The field type must be a key type!
- A composite type can be a key type
  - All field types must be key types!

TupleType
- Java: org.apache.flink.api.java.tuple.Tuple1 to Tuple25
- Scala: use default Scala tuples (1 to 22 fields)
- Tuple fields are typed
    Tuple3<Integer, String, Double> t3 = new Tuple3<>(1, "2", 3.0);
    val t3: (Int, String, Double) = (1, "2", 3.0)
- Tuples give the best performance

TupleType
- Define keys by field position
    DataSet<Tuple3<…>> d = …;
    // group on String field
    d.groupBy(1).reduceGroup(…);
- Or by field name
    // group on Double field
    d.groupBy("f2").reduceGroup(…);

PojoType
- Any Java class that
  - Has an empty default constructor
  - Has publicly accessible fields (public, or via getters and setters)
    public class Person {
        public int id;
        public String name;
        public Person() {}
        public Person(int id, String name) { this.id = id; this.name = name; }
    }

    DataSet<Person> p = env.fromElements(new Person(1, "Bob"));

PojoType
- Define keys by field name
    DataSet<Person> p = …;
    // group on "name" field
    p.groupBy("name").reduceGroup(…);

Scala Case Classes
- Scala case classes are natively supported
    case class Person(id: Int, name: String)
    val d: DataSet[Person] = env.fromElements(new Person(1, "Bob"))
- Define keys by field name
    // use field "name" as key
    d.groupBy("name").reduceGroup(…)

Composite & Nested Keys
    DataSet<Tuple3<…>> d = …;
- Composite keys are supported
    // group on both long fields
    d.groupBy(0, 1).reduceGroup(…);
- Nested fields can be used as keys
    // group on nested "name" field
    d.groupBy("…").reduceGroup(…);
- Full types can be used as keys with the "*" wildcard
    // group on complete nested POJO field
    d.groupBy("f1.*").reduceGroup(…);
- The "*" wildcard can also be used for atomic types

Join & CoGroup Keys
- Key types must match for binary operations!
    DataSet<Tuple2<…>> d1 = …;
    DataSet<Tuple2<…>> d2 = …;
    // works
    d1.join(d2).where(0).equalTo(1).with(…);
    // works
    d1.join(d2).where("f0").equalTo(0).with(…);
    // does not work!
    d1.join(d2).where(1).equalTo(0).with(…);
- In this example, field 0 of d1 and both fields of d2 share one type, while field 1 of d1 has a different type

KeySelectors
- Keys can be computed using KeySelectors
    public class SumKeySelector implements KeySelector<Tuple2<Long, Long>, Long> {
        public Long getKey(Tuple2<Long, Long> t) {
            return t.f0 + t.f1;
        }
    }

    DataSet<Tuple2<Long, Long>> d = …;
    d.groupBy(new SumKeySelector()).reduceGroup(…);

Advanced Sources and Sinks
Getting data in and out

Supported File Systems
- Flink built-in file systems:
  - LocalFileSystem (file://)
  - Hadoop Distributed File System (hdfs://)
  - Amazon S3 (s3://)
  - MapR FS (maprfs://)
- Support for all Hadoop file systems
  - NFS, Tachyon, FTP, har (Hadoop Archive), …

Input/Output Formats
- FileInputFormat (recursive directory scans supported)
- DelimitedInputFormat
- TextInputFormat (reads text files line-wise)
- CsvInputFormat (reads field-delimited files)
- BinaryInputFormat
- AvroInputFormat (reads Avro POJOs)
- JDBCInputFormat (reads the result of an SQL query)
- HadoopInputFormat (wraps any Hadoop InputFormat)

Hadoop Input/Output Formats
- Support for all Hadoop I/O formats
- Read from and write to
  - MongoDB
  - Apache Parquet
  - Apache ORC
  - Apache Kafka (for batch)
  - Compressed file formats (.gz, .zip, …)
  - and more

Using InputFormats
    ExecutionEnvironment env = …;
    // read text file line-wise
    env.readTextFile(…);
    // read CSV file
    env.readCsvFile(…);
    // read file with Hadoop FileInputFormat
    env.readHadoopFile(…);
    // use regular Hadoop InputFormat
    env.createHadoopInput(…);
    // use regular Flink InputFormat
    env.createInput(…);

Transformations & Functions

Transformations
- Presented in DataSet Basics: Map, FlatMap, GroupBy, GroupReduce, Join
- Covered in this training: Reduce, CoGroup, Combine, GroupSort, AllReduce & AllGroupReduce, Union
- See the documentation for more transformations

GroupReduce (Hadoop-style)
- GroupReduceFunction gives an iterator over the elements of a group
- Elements are streamed (possibly from disk), not materialized in memory

16、Group size can exceed available JVM heapInput type and output type may be different23Reduce (FP-style)Reduce like in functional programmingLess generic compared to GroupReduceFunction must be commutative and associativeInput type = Output typeSystem can apply more optimizationsAlways combinableMay u

17、se a hash strategy for execution (future)24Reduce (FP-style)DataSetTuple2 sum = data .groupBy(0) .reduce(new SumReducer();public static class SumReducer implements ReduceFunctionTuple2 Override public Tuple2 reduce( Tuple2 v1, Tuple2 v2) v1.f1 += v2.f1; return v1; 25CoGroupBinary operation (two inpu

18、ts)Groups both inputs on a keyProcesses groups with matching keys of both inputsSimilar to GroupReduce on two inputs26CoGroupDataSetTuple2 d1 = ;DataSet d2 = ;DataSet d3 = d1.coGroup(d2).where(0).equalTo(1).with(new CoGrouper();public static class CoGrouper implements CoGroupFunctionTuple2,Long,Stri

19、ng Override public void coGroup(IterableTuple2 vs1, Iterable vs2, Collector out) if(!vs2.iterator.hasNext() for(Tuple2 v1 : vs1) out.collect(v1.f1); 27CombinerLocal pre-aggregation of dataBefore data is sent to GroupReduce or CoGroup(functional) Reduce injects combiner automaticallySimilar to Hadoop

20、 CombinerOptional for semantics, crucial for performance!Reduces data before it is sent over the network28Combiner WordCount Example29MapMapMapCombineCombineCombineReduceReduceA B B A C AC A CB A BD A AA B C(A, 1)(B, 1)(B, 1)(A, 1)(C, 1)(A, 1)(C, 1)(A, 1)(C, 1)(B, 1)(A, 1)(B, 1)(D, 1)(A, 1)(A, 1)(A,

21、 1)(B, 1)(C, 1)(A, 3)(C, 1)(A, 2)(C, 2)(A, 3)(C, 1)(B, 2)(B, 2)(B, 1)(D, 1)(A, 8)(C, 4)(B, 5)(D, 1)Use a combinerImplement RichGroupReduceFunctionOverride combine(Iterable in, Collector);Same interface as reduce() methodAnnotate your GroupReduceFunction with binableCombiner will be automatically inj

22、ected into Flink programImplement a bineFunctionSame interface as GroupReduceFunction bineGroup()30GroupSortSort groups before they are handed to GroupReduce or CoGroup functionsMore (resource-)efficient user codeEasier user code implementationComes (almost) for freeAka secondary sort (Hadoop)31Data

23、SetTuple3 data = ;data.groupBy(0) .sortGroup(1, Order.ASCENDING) .groupReduce(new MyReducer();AllReduce / AllGroupReduceReduce / GroupReduce without GroupByOperates on a single group - Full DataSetFull DataSet is sent to one machine Will automatically run with parallelism of 1Careful with large Data

24、Sets!Make sure you have a Combiner32UnionUnion two data setBinary operation, same data type requiredNo duplicate elimination (SQL UNION ALL)Very cheap operation33DataSetTuple2 d1 = ;DataSetTuple2 d2 = ;DataSetTuple2 d3 = d1.union(d2);RichFunctionsFunction interfaces have only one methodSingle abstra

25、ct method (SAM)Support for Java8 Lambda functionsThere is a “Rich” variant for each function.RichFlatMapFunction, Additional methodsopen(Configuration c)close()getRuntimeContext()34RichFunctions & RuntimeContextRuntimeContext has useful methods:getIndexOfThisSubtask ()getNumberOfParallelSubtasks()ge

26、tExecutionConfig() Gives access to:AccumulatorsDistributedCache 35Further API Concepts36Broadcast Variables37mapmapmapExample: Tag words with IDs in text corpusDictionaryText data setbroadcast (small) dictionary to all mappers Broadcast variablesregister any DataSet as a broadcast variableavailable

27、on all parallel instances38/ 1. The DataSet to be broadcasted DataSet toBroadcast = env.fromElements(1, 2, 3); / 2. Broadcast the DataSet map().withBroadcastSet(toBroadcast, broadcastSetName); / 3. inside user defined functiongetRuntimeContext().getBroadcastVariable(broadcastSetName);AccumulatorsLightweight tool to compute stats on data
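The Reduce (FP-style) slides say that a reduce function must be commutative and associative. The reason is that the system may apply the function to partial groups, for example in an automatically injected combiner, and then reduce those partial results again. The plain-Java sketch below does not use the Flink API. It folds one hypothetical group (1, 2, 3, 4) in two ways: once sequentially, and once as two locally pre-reduced halves. The class and method names (ReduceOrderDemo, fold, foldWithCombiner) are illustrative assumptions.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical demo class: not part of Flink.
final class ReduceOrderDemo {

    // Reduce a whole group left to right, as a single task would.
    static int fold(List<Integer> values, BinaryOperator<Integer> f) {
        int acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = f.apply(acc, values.get(i));
        }
        return acc;
    }

    // Simulate a combiner: pre-reduce two partitions locally, then reduce the partial results.
    static int foldWithCombiner(List<Integer> left, List<Integer> right, BinaryOperator<Integer> f) {
        return f.apply(fold(left, f), fold(right, f));
    }

    public static void main(String[] args) {
        List<Integer> group = List.of(1, 2, 3, 4);
        BinaryOperator<Integer> sum = Integer::sum;       // commutative and associative
        BinaryOperator<Integer> minus = (a, b) -> a - b;  // neither commutative nor associative

        System.out.println(fold(group, sum) + " vs " + foldWithCombiner(List.of(1, 2), List.of(3, 4), sum));
        System.out.println(fold(group, minus) + " vs " + foldWithCombiner(List.of(1, 2), List.of(3, 4), minus));
    }
}
```

With Integer::sum, both strategies print 10. With subtraction, the sequential fold gives -8 but the combined fold gives 0. Subtraction is therefore not a safe reduce function once partial aggregation is allowed.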

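The Combiner WordCount example can be replayed in plain Java to show what local pre-aggregation saves. This sketch simulates the three Map/Combine tasks and the final reduce with ordinary maps. It is not Flink code, and the names CombinerWordCount, combine and reduce are hypothetical. The three input partitions are the lines from the figure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation of map + combine + reduce: not part of Flink.
final class CombinerWordCount {

    // Map + Combine: count the words of one partition locally, so only one
    // (word, partialCount) pair per distinct word leaves the partition.
    static Map<String, Integer> combine(String partition) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String word : partition.split(" ")) {
            partial.merge(word, 1, Integer::sum);
        }
        return partial;
    }

    // Reduce: sum the partial counts produced by all combiners.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, count) -> total.merge(word, count, Integer::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        // The three input partitions from the WordCount figure.
        List<String> partitions = List.of("A B B A C A", "C A C B A B", "D A A A B C");

        List<Map<String, Integer>> partials = new ArrayList<>();
        int withoutCombiner = 0; // (word, 1) pairs that would be shuffled without a combiner
        int withCombiner = 0;    // (word, count) pairs shuffled after combining
        for (String partition : partitions) {
            Map<String, Integer> partial = combine(partition);
            partials.add(partial);
            withoutCombiner += partition.split(" ").length;
            withCombiner += partial.size();
            System.out.println("combine \"" + partition + "\" -> " + partial);
        }
        System.out.println("shuffled pairs: " + withCombiner + " with combiner, " + withoutCombiner + " without");
        System.out.println("reduce -> " + reduce(partials));
    }
}
```

Running main prints the same per-combiner counts as the figure. It reports 10 shuffled pairs with the combiner versus 18 without, and ends with {A=8, B=5, C=4, D=1}.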