Atguigu Big Data Technology - Spark Streaming
Chapter 1  Spark Streaming Overview

What is Spark Streaming
Spark Streaming is the stream-processing module built on top of Spark Core. The input stream is cut into small batches (data => micro-batches => batch results), and each batch is processed by the Spark engine.

Spark Streaming uses a discretized stream, called a DStream, as its abstraction. Inside a DStream the data received in each time interval is stored as an RDD, and the DStream is simply the sequence of these RDDs over time: at every batch interval the newly received records are wrapped into one RDD.

Backpressure
Before Spark 1.5 the only way to limit a Receiver's ingest rate was the static setting spark.streaming.receiver.maxRate. Capping the rate protects the cluster from running out of memory when its processing capacity is lower than the ingest rate, but it creates a new problem: when the producer outpaces maxRate while the cluster could actually process more than maxRate, resources sit idle and utilization drops.

To better match the ingest rate to the processing capacity, Spark 1.5 introduced backpressure: Spark Streaming can dynamically adjust the receive rate to fit the cluster's processing capability. The backpressure mechanism (Spark Streaming Backpressure) adjusts the Receiver's ingest rate based on feedback from the JobScheduler about how fast batches are being processed. It is switched on with spark.streaming.backpressure.enabled (default: false).
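A minimal sketch of how these rate-control settings can be supplied through SparkConf. The property names are standard Spark settings; the numeric values are placeholders, not recommendations:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("BackpressureDemo")
  // let Spark Streaming adapt the ingest rate to the observed processing rate (default: false)
  .set("spark.streaming.backpressure.enabled", "true")
  // static upper bound for receiver-based sources, in records/second per receiver
  .set("spark.streaming.receiver.maxRate", "1000")
  // static upper bound for the Kafka direct API, in records/second per partition
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")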
Chapter 2  DStream Quick Start

Requirement: use the netcat tool to keep sending data to port 9999; Spark Streaming reads the port, counts the words, and prints the result.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming01_WordCount {

  def main(args: Array[String]): Unit = {

    //1. Initialize the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

    //2. Initialize the StreamingContext (3-second batch interval)
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3. Create a DStream from the monitored port; each record is one line of text
    val lineDStream = ssc.socketTextStream("hadoop102", 9999)

    //3.1 Split every line into words
    val wordDStream = lineDStream.flatMap(_.split(" "))

    //3.2 Map every word to the tuple (word, 1)
    val wordToOneDStream = wordDStream.map((_, 1))

    //3.3 Count the occurrences of each word
    val wordToSumDStream = wordToOneDStream.reduceByKey(_ + _)

    //3.4 Print the result
    wordToSumDStream.print()

    //4. Start the StreamingContext; block the main thread so it does not exit
    ssc.start()
    ssc.awaitTermination()
  }
}

Put a log4j.properties file under resources to raise the log level to error:

log4j.rootLogger=error, stdout, R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n
(the definition of the R file appender did not survive in this copy)

Start netcat and send data:

[atguigu@hadoop102 ~]$ nc -lk 9999
hello spark

Sample output:

Time: 1602731772000 ms
(hello,1)
(spark,1)

Chapter 3  Creating DStreams

RDD queue
During testing a DStream can be created with ssc.queueStream(queueOfRDDs): every RDD pushed into the queue is processed as a batch of the DStream.
Requirement: create several RDDs in a loop, push them into a queue, and process them through Spark Streaming.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SparkStreaming02_RDDStream {

  def main(args: Array[String]): Unit = {

    //1. Initialize the Spark configuration
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

    //2. Initialize the StreamingContext (4-second batch interval, matching the output below)
    val ssc = new StreamingContext(conf, Seconds(4))

    //3. Create the RDD queue
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //4. Create the queue DStream
    //   oneAtATime = true (default): consume one RDD from the queue per batch
    //   oneAtATime = false: consume all RDDs currently in the queue at each batch interval
    val inputDStream = ssc.queueStream(rddQueue, oneAtATime = false)

    //5. Process the RDDs in the queue
    val sumDStream = inputDStream.reduce(_ + _)

    //6. Print the result
    sumDStream.print()

    //7. Start the job
    ssc.start()

    //8. Repeatedly create RDDs and push them into the queue
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 5)
      Thread.sleep(2000) // pause between pushes (interval assumed; lost in this copy)
    }

    ssc.awaitTermination()
  }
}

Results (only the batch headers are preserved here):

Time: 1603347444000 ms
Time: 1603347448000 ms
Time: 1603347452000 ms

Custom receiver (custom data source)
A custom data source is defined by extending Receiver and implementing onStart() and onStop().
Requirement: a custom receiver that reads lines from a socket port and feeds them into a WordCount.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming03_CustomerReceiver {

  def main(args: Array[String]): Unit = {

    //1. Initialize the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

    //2. Initialize the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3. Create a DStream from the custom receiver
    val lineDStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))

    //4. WordCount over the received lines
    val wordDStream = lineDStream.flatMap(_.split(" "))
    val wordToOneDStream = wordDStream.map((_, 1))
    val wordToSumDStream = wordToOneDStream.reduceByKey(_ + _)
    wordToSumDStream.print()

    //5. Start the StreamingContext and block the main thread
    ssc.start()
    ssc.awaitTermination()
  }
}

/**
 * @param host the host to read data from
 * @param port the port to read data from
 * StorageLevel.MEMORY_ONLY: keep the received blocks in memory only
 */
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  // Called when the receiver starts: read data and hand it to Spark
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run() {
        receive()
      }
    }.start()
  }

  // Read data from the socket and send it to Spark
  def receive(): Unit = {
    // Open a socket
    var socket: Socket = new Socket(host, port)

    // A BufferedReader used to read one line at a time
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))

    // Read the first line
    var input: String = reader.readLine()

    // While the receiver is not stopped and the input is not null, keep sending data to Spark
    while (!isStopped() && input != null) {
      store(input)
      input = reader.readLine()
    }

    // When the loop ends, release the resources and restart the receiver
    reader.close()
    socket.close()
    restart("restart")
  }

  override def onStop(): Unit = {}
}

Test:
[atguigu@hadoop102 ~]$ nc -lk 9999
hello spark

Kafka data source (interview and hands-on focus)

Receiver API: a dedicated Executor receives the data and then passes it to the Executors that do the computation. Problem: the receiving Executor and the computing Executors may run at different speeds; when the computing side is slower, data piles up in memory and can cause an OOM.
Direct API: the computing Executors consume the Kafka data themselves, so the consumption speed is controlled by the consumer. (From spark 3.0.0 on, only the Direct API is available.)

Where offsets are kept, by connector version:
- 0-8 Receiver API: offsets are stored in Zookeeper by default.
- 0-8 Direct API: offsets are stored in checkpoints by default; they can also be maintained manually in a transactional store such as MySQL.
- 0-10 Direct API: offsets are stored in the __consumer_offsets system topic by default; they can also be maintained manually in a transactional store such as MySQL.

Kafka 0-10 Direct mode
Requirement: read data from Kafka with Spark Streaming, do a simple computation on it, and print the result to the console.

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming04_DirectAuto {

  def main(args: Array[String]): Unit = {

    //1. Create the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

    //2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3. Kafka parameters: cluster address, consumer group, key/value deserializers
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    //4. Consume Kafka data and create the DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent, // preferred location strategy
        ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara)
      )

    //5. Keep only the value of each (K, V) record
    val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

    //6. WordCount
    valueDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    //7. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}

Start Zookeeper and Kafka:
[atguigu@hadoop102 ~]$ zk.sh start
[atguigu@hadoop102 ~]$ kf.sh start

Create a Kafka topic named testTopic with two partitions:
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --create --replication-factor 1 --partitions 2 --topic testTopic

List the topics:
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --list

Describe the topic:
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181/kafka --describe --topic testTopic

Create a Kafka producer:
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic testTopic

Create a Kafka consumer:
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic testTopic

Check the consumption progress of the consumer group:
[atguigu@hadoop102 kafka]$ bin/kafka-consumer-groups.sh --bootstrap-server hadoop102:9092 --describe --group atguiguGroup
(the output lists TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET and LAG per partition)
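The offset summary above notes that with the 0-10 Direct API offsets can also be committed manually instead of relying on automatic commits. A minimal sketch built on the kafkaDStream from SparkStreaming04_DirectAuto (an illustration, not part of the original example; it assumes enable.auto.commit has been set to false in kafkaPara):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

kafkaDStream.foreachRDD { rdd =>
  // the Kafka offset range of every partition contained in this batch
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here, e.g. write the results to an external store ...

  // commit the offsets back to Kafka (__consumer_offsets) once the batch has succeeded
  kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Storing offsetRanges in a transactional database together with the results, instead of calling commitAsync, is what the "manual: MySQL" option in the summary refers to.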
Chapter 4  DStream Transformations

Operations on a DStream are similar to those on an RDD and fall into transformations and output operations. Among the transformations there are a few special primitives, such as updateStateByKey(), transform(), and the Window-related operations.

Stateless transformations
A stateless transformation simply applies an ordinary RDD transformation to every batch of the DStream; the batches are independent of one another. Although these functions look as if they act on the whole stream, each DStream is internally a sequence of RDDs (batches) and the transformation is applied to each batch separately. Note that key-value operations such as reduceByKey() require import StreamingContext._ to be usable from Scala.

Commonly used stateless transformations:
- map(func): apply func to every element of the source DStream; example: ds.map(x => x + 1); signature: f: (T) -> U
- flatMap(func): apply func to every element and flatten the returned iterators into one DStream; example: ds.flatMap(x => x.split(" ")); signature: f: T -> Iterable[U]
- filter(func): keep only the elements for which func returns true; example: ds.filter(x => x != 1); signature: f: T -> Boolean
- repartition(numPartitions): change the number of partitions of the DStream; signature: N/A
- reduceByKey(func, [numTasks]): aggregate the values of each key within every batch; example: ds.reduceByKey((x, y) => x + y); signature: f: (T, T) -> T

transform
transform allows an arbitrary RDD-to-RDD function to be applied to a DStream, which makes it possible to use RDD operations that are not exposed in the DStream API.
Requirement: use transform to split the incoming lines into words and count them, and observe where each piece of code runs (Driver or Executor).

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming05_Transform {

  def main(args: Array[String]): Unit = {

    //1. Create the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

    //2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3. Create a DStream from the socket
    val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)

    // Runs on the Driver side, once when the program is assembled
    println("111111: " + Thread.currentThread().getName)

    //4. Convert each batch to an RDD and operate on it
    val wordToSumDStream: DStream[(String, Int)] = lineDStream.transform(
      rdd => {
        // Runs on the Driver side (ctrl+n JobGenerator), once per batch
        println("222222: " + Thread.currentThread().getName)

        val words: RDD[String] = rdd.flatMap(_.split(" "))
        val wordToOne: RDD[(String, Int)] = words.map(x => {
          // Runs on the Executor side, once per element
          println("333333: " + Thread.currentThread().getName)
          (x, 1)
        })
        val value: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
        value
      }
    )

    //5. Print
    wordToSumDStream.print()

    //6. Start
    ssc.start()
    ssc.awaitTermination()
  }
}

Test:
[atguigu@hadoop102 ~]$ nc -lk 9999
hello spark

Stateful transformations - updateStateByKey
updateStateByKey() produces a "state" DStream that remembers history across batches, which makes, for example, a running WordCount possible: the state of every key is updated by applying the given function to the previous state and the new values of that key. Using it requires two steps:
1. Define the state (it can be any data type).
2. Define the state update function, which combines the previous state with the new values from the input stream.
A checkpoint directory must be configured, because the state is saved there.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sparkStreaming06_updateStateByKey {

  // State update function: seq holds the counts of a word in the current batch,
  // state holds the accumulated count from previous batches
  val updateFunc = (seq: Seq[Int], state: Option[Int]) => {
    val currentCount = seq.sum
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  def createSCC(): StreamingContext = {

    //1. Create the Spark configuration
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

    //2. Create the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(3))

    //3. Set the checkpoint directory that stores the state (path assumed; lost in this copy)
    ssc.checkpoint("./ck")

    //4. Read lines from the socket and split them into words
    val lines = ssc.socketTextStream("hadoop102", 9999)
    val words = lines.flatMap(_.split(" "))

    //5. Map every word to (word, 1)
    val wordToOne = words.map(word => (word, 1))

    //6. Use updateStateByKey to accumulate the counts across batches
    val stateDstream = wordToOne.updateStateByKey[Int](updateFunc)
    stateDstream.print()

    ssc
  }

  def main(args: Array[String]): Unit = {

    // Recover from the checkpoint if one exists, otherwise create a new context
    // (the exact call was garbled in this copy)
    val ssc = StreamingContext.getActiveOrCreate("./ck", () => createSCC())

    //7. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}

Test:
[atguigu@hadoop102 ~]$ nc -lk 9999

Output (only the batch headers are preserved here; the per-key totals keep growing across batches):

Time: 1603441344000 ms
Time: 1603441347000 ms

[Figure: DStream timeline - the data received in each batch interval forms an RDD, and the accumulated state is carried forward from one interval to the next.]
Window Operations
Window Operations let you set a window duration and a slide interval to compute over a sliding range of the stream. Every window operation needs two parameters:
- window duration: the time range covered by one computation;
- slide duration: how often the computation is triggered.
Both must be integer multiples of the batch interval.

[Figure: a sliding window covering several of the per-interval RDDs of a DStream.]

Basic syntax - window(windowLength, slideInterval): returns a new DStream computed from windowed batches of the source DStream.
Requirement: WordCount over a window - 3-second batches, a 12-second window, sliding every 6 seconds.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming07_window {

  def main(args: Array[String]): Unit = {

    //1. Initialize the configuration and the StreamingContext (3-second batches)
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))

    //2. Create a DStream from the monitored port; each record is one line
    val lines = ssc.socketTextStream("hadoop102", 9999)

    //3. Split the lines into words and map to (word, 1)
    val wordToOneDStream = lines.flatMap(_.split(" "))
      .map((_, 1))

    //4. Open a 12-second window that slides every 6 seconds
    val wordToOneByWindow = wordToOneDStream.window(Seconds(12), Seconds(6))

    //5. Aggregate and print
    val wordToCountDStream = wordToOneByWindow.reduceByKey(_ + _)
    wordToCountDStream.print()

    //6. Start => block
    ssc.start()
    ssc.awaitTermination()
  }
}

Test:
[atguigu@hadoop102 ~]$ nc -lk 9999
hello spark

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): when called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the values of each key are aggregated over the batches of a sliding window using the given reduce function.
Requirement: WordCount - 3-second batches, a 12-second window, sliding every 6 seconds.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming08_reduceByKeyAndWindow {

  def main(args: Array[String]): Unit = {

    //1. Initialize the configuration and the StreamingContext (3-second batches)
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))

    //2. Create a DStream from the monitored port; each record is one line
    val lines = ssc.socketTextStream("hadoop102", 9999)

    //3. Split into words and map to (word, 1)
    val wordToOne = lines.flatMap(_.split(" "))
      .map((_, 1))

    //4. Aggregate over a 12-second window that slides every 6 seconds
    val wordCounts = wordToOne.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))

    //5. Print
    wordCounts.print()

    //6. Start => block
    ssc.start()
    ssc.awaitTermination()
  }
}

Test:
[atguigu@hadoop102 ~]$ nc -lk 9999
hello atguigu

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): a more efficient variant of the function above. The reduce value of each window is computed incrementally from the previous window: the data entering the window is reduced with func, and the data leaving the window is "subtracted" with the inverse function invFunc. As with the previous function, the number of reduce tasks can be configured through an optional argument.
Requirement: WordCount - 3-second batches, a 12-second window, sliding every 6 seconds.

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming09_reduceByKeyAndWindow_reduce {

  def main(args: Array[String]): Unit = {

    //1. Initialize the configuration and the StreamingContext
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))

    // A checkpoint directory is required when the inverse function is used (path assumed)
    ssc.checkpoint("./ck")

    //2. Create a DStream from the monitored port; each record is one line
    val lines = ssc.socketTextStream("hadoop102", 9999)

    //3. Split into words and map to (word, 1)
    val wordToOne = lines.flatMap(_.split(" "))
      .map((_, 1))

    //4. Incremental window aggregation:
    //   (a + b) reduces the data that enters the window,
    //   (x - y) "inverse-reduces" the data that leaves the window,
    //   the final filter keeps only keys whose count is still positive
    val wordToSumDStream = wordToOne.reduceByKeyAndWindow(
      (a: Int, b: Int) => (a + b),
      (x: Int, y: Int) => (x - y),
      Seconds(12),
      Seconds(6),
      new HashPartitioner(2),
      (x: (String, Int)) => x._2 > 0
    )

    //5. Print
    wordToSumDStream.print()

    //6. Start => block
    ssc.start()
    ssc.awaitTermination()
  }
}

Other Window operations:
- reduceByWindow(func, windowLength, slideInterval): creates a new single-element stream by aggregating the elements of the sliding window with the given function.

Chapter 5  DStream Output

Output operations specify what is done with the data produced by the transformations, such as printing to the console or writing to an external database. As with the lazy evaluation of RDDs, if neither a DStream nor any DStream derived from it has an output operation applied, none of them will be evaluated; and if the StreamingContext has no output operation at all, it will not even start.

1) Output operation APIs:
- saveAsTextFiles(prefix, [suffix]): save the contents of this DStream as text files; the name of each batch is generated from prefix and suffix as "prefix-Time_IN_MS[.suffix]".
- saveAsObjectFiles(prefix, [suffix]): save the DStream as SequenceFiles of serialized Java objects; the name of each batch is "prefix-Time_IN_MS[.suffix]".
- saveAsHadoopFiles(prefix, [suffix]): save the DStream as Hadoop files; the name of each batch is "prefix-Time_IN_MS[.suffix]".
- print(): print the first 10 elements of every batch of the DStream on the driver node of the running application; used for development and debugging.
- foreachRDD(func): the most general output operation - apply func to every RDD produced by the stream. func should push the data of each RDD to an external system, for example save it to a file or write it to a database over the network (see the connection sketch right below and the notes after the example).
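foreachRDD() is normally combined with foreachPartition(), so that one database connection is opened per partition on the Executor side rather than per record, and is never created on the Driver (connections are not serializable). A minimal sketch of that pattern, assuming a DStream of (word, count) pairs such as wordToSumDStream from the earlier examples; the JDBC URL, the credentials and the word_count table are placeholders, not part of the original material:

import java.sql.DriverManager

wordToSumDStream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // runs on the Executor, once per partition
    val connection = DriverManager.getConnection(
      "jdbc:mysql://hadoop102:3306/test", "user", "password") // placeholder URL and credentials
    val statement = connection.prepareStatement(
      "insert into word_count(word, cnt) values(?, ?)") // hypothetical table
    iter.foreach { case (word, count) =>
      statement.setString(1, word)
      statement.setInt(2, count)
      statement.executeUpdate()
    }
    statement.close()
    connection.close()
  }
}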
A complete output example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming10_output {

  def main(args: Array[String]): Unit = {

    //1. Initialize the configuration and the StreamingContext
    val conf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(conf, Seconds(3))

    //2. Create a DStream from the monitored port; each record is one line
    val lineDStream = ssc.socketTextStream("hadoop102", 9999)

    //3. Split into words and map to (word, 1)
    val wordToOneDStream = lineDStream.flatMap(_.split(" "))
      .map((_, 1))

    //4. foreachRDD runs on the Driver side (ctrl+n JobScheduler;
    //   look for the streaming-job-executor thread pool inside JobScheduler)
    wordToOneDStream.foreachRDD(rdd => {
      println("222222: " + Thread.currentThread().getName)
    })

    //5. Start => block
    ssc.start()
    ssc.awaitTermination()
  }
}

Test:
[atguigu@hadoop102 ~]$ nc -lk 9999
hello spark

Notes on writing to external systems from foreachRDD:
1) The connection cannot be created at the Driver level: it cannot be serialized and shipped to the Executors.
2) Creating the connection inside foreach would open one connection per record, which costs far more than it is worth.
3) Use foreachPartition and create (or fetch from a pool) one connection per partition, as sketched above.

Chapter 6  Graceful Shutdown

A streaming job runs 24/7, but the program still has to be stopped deliberately when its code is upgraded, and because it is distributed you cannot simply kill the processes one by one, so configuring a graceful shutdown matters. Approach: use an external file system (HDFS here) to signal the running program to stop.

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object SparkStreaming11_stop {

  def main(args: Array[String]): Unit = {

    //1. Initialize the Spark configuration
    val sparkconf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

    //2. Initialize the StreamingContext
    val ssc: StreamingContext = new StreamingContext(sparkconf, Seconds(3))

    //3. A simple WordCount job to keep the context busy
    val lineDStream = ssc.socketTextStream("hadoop102", 9999)
    lineDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    // Start the monitoring thread that watches for the stop flag
    new Thread(new MonitorStop(ssc)).start()

    //4. Start the StreamingContext and block the main thread
    ssc.start()
    ssc.awaitTermination()
  }
}

class MonitorStop(ssc: StreamingContext) extends Runnable {

  override def run(): Unit = {

    // Get an HDFS client
    val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:8020"), new Configuration(), "atguigu")

    while (true) {
      Thread.sleep(5000)

      // Stop when the marker directory exists on HDFS (directory name assumed; lost in this copy)
      val result: Boolean = fs.exists(new Path("hdfs://hadoop102:8020/stopSpark"))

      if (result) {
        val state: StreamingContextState = ssc.getState()
        if (state == StreamingContextState.ACTIVE) {
          // Stop gracefully: finish the data already received, then exit
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}

Test:
[atguigu@hadoop102 ~]$ nc -lk 9999

Start the Hadoop cluster and create the stop marker (the directory name matches the path checked in MonitorStop):
[atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[atguigu@hadoop102 hadoop-3.1.3]$ hadoop fs -mkdir /stopSpark

Chapter 7  Spark Streaming Case Study

Environment preparation:
1) pom.xml: add the project dependencies (the dependency list is truncated in this copy; it references com.alibaba, i.e. the Druid connection pool used by JDBCUtil below).
2) Put the same log4j.properties under resources as in Chapter 2 to raise the log level to error.
3) PropertiesUtil, a small helper that loads a properties file from the classpath:

package com.atguigu.util

import java.io.InputStreamReader
import java.util.Properties

object PropertiesUtil {

  def load(propertiesName: String): Properties = {
    val prop = new Properties()
    // load the file from the classpath, reading it as UTF-8
    prop.load(new InputStreamReader(
      Thread.currentThread().getContextClassLoader.getResourceAsStream(propertiesName), "UTF-8"))
    prop
  }
}

4) Create a config.properties file under resources holding the JDBC settings (# JDBC configuration) and the Kafka settings (# Kafka configuration) read by the classes below.
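Only the two comment headers of config.properties survive in this copy. A minimal sketch of the keys that MockerRealTime, MyKafkaUtil and JDBCUtil below actually read; every value here is a placeholder:

# JDBC configuration
jdbc.url=jdbc:mysql://hadoop102:3306/spark2020?useUnicode=true&characterEncoding=utf8
jdbc.user=root
jdbc.password=xxxxxx

# Kafka configuration
kafka.broker.list=hadoop102:9092
kafka.topic=testTopic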
5) RandomOptions, a weighted random chooser used by the mock data generator:

package com.atguigu.util

import scala.collection.mutable.ListBuffer
import scala.util.Random

// a value and the weight with which it should appear, e.g. (male, 8), (female, 2)
case class RanOpt[T](value: T, weight: Int)

object RandomOptions {

  def apply[T](opts: RanOpt[T]*): RandomOptions[T] = {
    val randomOptions = new RandomOptions[T]()
    for (opt <- opts) {
      // accumulate the total weight: 8 + 2
      randomOptions.totalWeight += opt.weight
      // push each element into the buffer as many times as its weight
      for (i <- 1 to opt.weight) {
        randomOptions.optsBuffer += opt.value
      }
    }
    randomOptions
  }

  def main(args: Array[String]): Unit = {
    for (i <- 1 to 10) {
      println(RandomOptions(RanOpt("male", 8), RanOpt("female", 2)).getRandomOpt)
    }
  }
}

class RandomOptions[T](opts: RanOpt[T]*) {

  var totalWeight = 0
  var optsBuffer = new ListBuffer[T]

  def getRandomOpt: T = {
    // pick a random index in [0, totalWeight)
    val randomNum: Int = new Random().nextInt(totalWeight)
    optsBuffer(randomNum)
  }
}

6) MockerRealTime, the generator that produces mock ad-click events and sends them to Kafka:

import java.util.Properties

import com.atguigu.util.{PropertiesUtil, RanOpt, RandomOptions}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// City information table: city_id, city_name, area
case class CityInfo(city_id: Long, city_name: String, area: String)

object MockerRealTime {

  /**
   * Mock data format:
   * timestamp area city userid adid
   */
  def generateMockData(): Array[String] = {

    val array: ArrayBuffer[String] = ArrayBuffer[String]()

    // weights below are placeholders; the original values were lost in this copy
    val CityRandomOpt = RandomOptions(
      RanOpt(CityInfo(1, "Beijing", "North China"), 30),
      RanOpt(CityInfo(2, "Shanghai", "East China"), 30),
      RanOpt(CityInfo(3, "Guangzhou", "South China"), 10),
      RanOpt(CityInfo(4, "Shenzhen", "South China"), 20),
      RanOpt(CityInfo(5, "Tianjin", "North China"), 10)
    )

    val random = new Random()

    // Mock real-time data: timestamp area city userid adid
    for (i <- 0 to 50) {
      val timestamp: Long = System.currentTimeMillis()
      val cityInfo: CityInfo = CityRandomOpt.getRandomOpt
      val city: String = cityInfo.city_name
      val area: String = cityInfo.area
      val adid: Int = 1 + random.nextInt(6)
      val userid: Int = 1 + random.nextInt(6)

      // assemble one line of mock data
      array += timestamp + " " + area + " " + city + " " + userid + " " + adid
    }

    array.toArray
  }

  def main(args: Array[String]): Unit = {

    // Read the Kafka settings from the properties file (file name assumed; garbled in this copy)
    val config: Properties = PropertiesUtil.load("config.properties")
    val brokers: String = config.getProperty("kafka.broker.list")
    val topic: String = config.getProperty("kafka.topic")

    // Producer properties
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    // Create the Kafka producer from the properties
    val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](prop)

    while (true) {
      // Generate mock data and send it to Kafka
      for (line <- generateMockData()) {
        kafkaProducer.send(new ProducerRecord[String, String](topic, line))
      }
      Thread.sleep(2000) // pause between rounds (interval assumed)
    }
  }
}

Start Zookeeper and Kafka:
[atguigu@hadoop102 ~]$ zk.sh start
[atguigu@hadoop102 ~]$ kf.sh start

Consume the testTopic topic to check the mock data:
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic testTopic

Requirement one: dynamic blacklist

Sample data (simplified):
2020-10-10, …, …, zhangsan, A
2020-10-10, …, …, lisi, B

Implement a real-time dynamic blacklist: any user who clicks one ad more than 30 times within a day is blacklisted. Note: the blacklist is stored in MySQL.

Implementation outline:
1. Read the click events from Kafka.
2. Filter out the clicks of users who are already on the blacklist.
3. Aggregate the remaining clicks per day, user and ad, and accumulate the counts into MySQL (user_ad_count).
4. Whenever the accumulated count of a user for an ad reaches 30, insert the user into the black_list table.

MySQL tables:

Table that stores the blacklisted users:
CREATE TABLE black_list (
  userid CHAR(1) PRIMARY KEY   -- user id
);

Table that stores the accumulated click counts:
CREATE TABLE user_ad_count (
  dt VARCHAR(255),   -- date
  userid CHAR(1),    -- user id
  adid CHAR(1),      -- ad id
  count BIGINT,      -- click count
  PRIMARY KEY (dt, userid, adid)
);

A quick check of the upsert used later - against user_ad_count (which has a primary key) the statement accumulates:
insert into user_ad_count(dt, userid, adid, count) values(…) on duplicate key update count = count + 5;
Against a test table without a primary key the same statement only ever inserts new rows:
CREATE TABLE user_ad_count_test (
  dt VARCHAR(255),   -- date
  userid CHAR(1),    -- user id
  adid CHAR(1),      -- ad id
  count BIGINT       -- click count
);
insert into user_ad_count_test(dt, userid, adid, count) values(…) on duplicate key update count = count + 5;

MyKafkaUtil, the Kafka consumer helper:

package com.atguigu.util

import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyKafkaUtil {

  // Kafka broker list from the properties file (file name assumed; garbled in this copy)
  private val properties: Properties = PropertiesUtil.load("config.properties")
  private val brokers: String = properties.getProperty("kafka.broker.list")

  // Create a DStream reading from the given topic
  // LocationStrategies: how consumers are placed relative to partitions;
  //   PreferConsistent distributes the partitions consistently across all Executors
  // ConsumerStrategies: how the Kafka consumer is created and configured on Driver and Executors
  def getKafkaStream(topic: String, ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {

    val kafkaParam = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "commerce-consumer-group" // consumer group
    )

    val dStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam)
      )

    dStream
  }
}
JDBCUtil, the MySQL helper built on the Druid connection pool:

package com.atguigu.util

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

object JDBCUtil {

  // Initialize the Druid connection pool
  var dataSource: DataSource = init()

  def init(): DataSource = {
    val properties = new Properties()
    val config: Properties = PropertiesUtil.load("config.properties") // file name assumed (garbled in this copy)
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", config.getProperty("jdbc.url"))
    properties.setProperty("username", config.getProperty("jdbc.user"))
    properties.setProperty("password", config.getProperty("jdbc.password"))
    DruidDataSourceFactory.createDataSource(properties)
  }

  // Get a MySQL connection from the pool
  def getConnection: Connection = {
    dataSource.getConnection
  }

  // Execute an insert/update statement and return the number of affected rows
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
    var rtn = 0
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- params.indices) {
          pstmt.setObject(i + 1, params(i))
        }
      }
      rtn = pstmt.executeUpdate()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    rtn
  }

  // Check whether a record matching the query exists
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean = {
    var flag: Boolean = false
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      flag = pstmt.executeQuery().next()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    flag
  }

  // Run a query that returns a single Long value (e.g. a count)
  def getDataFromMysql(connection: Connection, sql: String, params: Array[Any]): Long = {
    var result: Long = 0L
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      val resultSet: ResultSet = pstmt.executeQuery()
      while (resultSet.next()) {
        result = resultSet.getLong(1)
      }
      resultSet.close()
      pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    }
    result
  }

  // Quick test of the utility
  def main(args: Array[String]): Unit = {
    //1. Get a connection
    val connection: Connection = getConnection

    //2. Prepare the statement
    val statement: PreparedStatement =
      connection.prepareStatement("select * from user_ad_count where userid = ?")

    //3. Bind the parameter (sample userid; the original value was lost)
    statement.setObject(1, "a")

    //4. Execute the query
    val resultSet: ResultSet = statement.executeQuery()

    //5. Print the result
    while (resultSet.next()) {
      println("111: " + resultSet.getString(1))
    }

    //6. Release the resources
    resultSet.close()
    statement.close()
    connection.close()
  }
}

BlackListHandler, the business logic for requirement one:

package com.atguigu.handler

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.app.Ads_log
import com.atguigu.util.JDBCUtil
import org.apache.spark.streaming.dstream.DStream

object BlackListHandler {

  private val sdf = new SimpleDateFormat("yyyy-MM-dd")

  // Aggregate clicks per (day, user, ad), update user_ad_count and maintain the blacklist
  def addBlackList(adsLogDStream: DStream[Ads_log]): Unit = {

    //1. Count the clicks per day, user and ad within the current batch => ((dt, user, ad), count)
    val dateUserAdToCount: DStream[((String, String, String), Long)] = adsLogDStream.map(adsLog => {
      val date: String = sdf.format(new Date(adsLog.timestamp))
      ((date, adsLog.userid, adsLog.adid), 1L)
    }).reduceByKey(_ + _)

    //2. Write the counts to MySQL, one connection per partition
    dateUserAdToCount.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        val connection: Connection = JDBCUtil.getConnection
        iter.foreach { case ((dt, user, ad), count) =>

          // Accumulate the batch count into user_ad_count
          JDBCUtil.executeUpdate(
            connection,
            """
              |INSERT INTO user_ad_count (dt, userid, adid, count)
              |VALUES (?, ?, ?, ?)
              |ON DUPLICATE KEY
              |UPDATE count = count + ?
            """.stripMargin,
            Array(dt, user, ad, count, count)
          )

          // Read the accumulated count of this day/user/ad from user_ad_count
          val ct: Long = JDBCUtil.getDataFromMysql(
            connection,
            """
              |select count from user_ad_count
              |where dt = ? and userid = ? and adid = ?
            """.stripMargin,
            Array(dt, user, ad)
          )

          // If the user has clicked this ad 30 times or more, add the user to the blacklist
          if (ct >= 30) {
            JDBCUtil.executeUpdate(
              connection,
              """
                |INSERT INTO black_list (userid) VALUES (?)
                |ON DUPLICATE KEY UPDATE userid = ?
              """.stripMargin,
              Array(user, user)
            )
          }
        }
        connection.close()
      })
    })
  }

  // Drop the clicks of users who are already on the blacklist
  def filterByBlackList(adsLogDStream: DStream[Ads_log]): DStream[Ads_log] = {
    adsLogDStream.filter(adsLog => {
      val connection: Connection = JDBCUtil.getConnection
      val bool: Boolean = JDBCUtil.isExist(
        connection,
        """
          |select * from black_list where userid = ?
        """.stripMargin,
        Array(adsLog.userid)
      )
      connection.close()
      !bool
    })
  }
}
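filterByBlackList above opens one MySQL connection per record. Following the same per-partition advice given for foreachRDD in Chapter 5, a method like the following could be added to BlackListHandler as a drop-in alternative (a sketch of mine, not part of the original handler):

  // Sketch: the same blacklist filtering, but with one connection per partition
  def filterByBlackListPerPartition(adsLogDStream: DStream[Ads_log]): DStream[Ads_log] = {
    adsLogDStream.transform(rdd =>
      rdd.mapPartitions(iter => {
        val connection: Connection = JDBCUtil.getConnection
        // materialize the partition before closing the connection (iterators are lazy)
        val kept = iter.filter(adsLog =>
          !JDBCUtil.isExist(
            connection,
            "select * from black_list where userid = ?",
            Array(adsLog.userid)
          )
        ).toList
        connection.close()
        kept.iterator
      })
    )
  }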

RealTimeApp, the main application for requirement one:

package com.atguigu.app

import java.util.{Date, Properties}

import com.atguigu.handler.BlackListHandler
import com.atguigu.util.{MyKafkaUtil, PropertiesUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RealTimeApp {

  def main(args: Array[String]): Unit = {

    //1. Create the Spark configuration
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RealTimeApp")

    //2. Create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //3. Read the Kafka topic from the properties file and create the Kafka DStream
    val properties: Properties = PropertiesUtil.load("config.properties") // file name assumed (garbled in this copy)
    val topic: String = properties.getProperty("kafka.topic")
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(topic, ssc)

    //4. Convert every Kafka record into an Ads_log object
    val adsLogDStream: DStream[Ads_log] = kafkaDStream.map(record => {
      val value: String = record.value()
      val arr: Array[String] = value.split(" ")
      Ads_log(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))
    })

    //5. Requirement one: filter out users that are already on the MySQL blacklist
    val filterAdsLogDStream: DStream[Ads_log] = BlackListHandler.filterByBlackList(adsLogDStream)

    //6. Requirement one: count the remaining clicks and maintain the blacklist
    BlackListHandler.addBlackList(filterAdsLogDStream)

    // Test print
    filterAdsLogDStream.cache()
    filterAdsLogDStream.count().print()

    //7. Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}

// timestamp, area, city, user id, ad id
case class Ads_log(timestamp: Long, area: String, city: String, userid: String, adid: String)

Test:
1) Start the Kafka cluster: zk.sh start, kf.sh start.
2) Start the main program RealTimeApp.
3) Start the mock data generator MockerRealTime.
4) Watch the user_ad_count and black_list tables in the spark2020 database change.
Observation: because the mock data only uses six user ids, the blacklist eventually contains all of them, after which their clicks are filtered out and the rows in user_ad_count stop being updated.

Requirement two: click totals per area, city and ad

Description: compute in real time the total click count per day, area, city and ad, and store it in MySQL.

Sample data (simplified):
2020-10-10, …, …, zhangsan, A
2020-10-10, …, …, lisi, B

Implementation outline:
1. Consume the click events from Kafka (reusing the stream of requirement one).
2. Count the clicks of each (day, area, city, ad) combination within every batch.
3. Accumulate the per-batch counts into MySQL with an upsert.

MySQL table:
CREATE TABLE area_city_ad_count (
  dt VARCHAR(255),
  area VARCHAR(255),
  city VARCHAR(255),
  adid VARCHAR(255),
  count BIGINT,
  PRIMARY KEY (dt, area, city, adid)
);

DateAreaCityAdCountHandler, the business logic for requirement two:

package com.atguigu.handler

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.app.Ads_log
import com.atguigu.util.JDBCUtil
import org.apache.spark.streaming.dstream.DStream

object DateAreaCityAdCountHandler {

  private val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")

  // Count the clicks per day, area, city and ad and save the totals to MySQL
  def saveDateAreaCityAdCountToMysql(filterAdsLogDStream: DStream[Ads_log]): Unit = {

    //1. Count per (dt, area, city, adid) within the batch
    val dateAreaCityAdToCount: DStream[((String, String, String, String), Long)] =
      filterAdsLogDStream.map(ads_log => {
        val dt: String = sdf.format(new Date(ads_log.timestamp))
        ((dt, ads_log.area, ads_log.city, ads_log.adid), 1L)
      }).reduceByKey(_ + _)

    //2. Accumulate the per-batch counts into MySQL
    dateAreaCityAdToCount.foreachRDD(rdd => {
      rdd.foreachPartition(iter => {
        val connection: Connection = JDBCUtil.getConnection
        iter.foreach { case ((dt, area, city, adid), ct) =>
          JDBCUtil.executeUpdate(
            connection,
            """
              |INSERT INTO area_city_ad_count (dt, area, city, adid, count)
              |VALUES (?, ?, ?, ?, ?)
              |ON DUPLICATE KEY
              |UPDATE count = count + ?
            """.stripMargin,
            Array(dt, area, city, adid, ct, ct)
          )
        }
        connection.close()
      })
    })
  }
}

RealTimeApp for requirement two repeats the application above (its listing is cut off at this point in this copy); the only changes are the additional import of com.atguigu.handler.DateAreaCityAdCountHandler and one extra call on the filtered stream after the two BlackListHandler calls:

    // Requirement two: total clicks per day, area, city and ad
    DateAreaCityAdCountHandler.saveDateAreaCityAdCountToMysql(filterAdsLogDStream)
