![Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用(工作手冊(cè)式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實(shí)時(shí)傳輸_第1頁(yè)](http://file4.renrendoc.com/view11/M01/3A/16/wKhkGWV8TcyAPQSrAAGZBQAwxCc446.jpg)
![Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用(工作手冊(cè)式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實(shí)時(shí)傳輸_第2頁(yè)](http://file4.renrendoc.com/view11/M01/3A/16/wKhkGWV8TcyAPQSrAAGZBQAwxCc4462.jpg)
![Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用(工作手冊(cè)式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實(shí)時(shí)傳輸_第3頁(yè)](http://file4.renrendoc.com/view11/M01/3A/16/wKhkGWV8TcyAPQSrAAGZBQAwxCc4463.jpg)
![Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用(工作手冊(cè)式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實(shí)時(shí)傳輸_第4頁(yè)](http://file4.renrendoc.com/view11/M01/3A/16/wKhkGWV8TcyAPQSrAAGZBQAwxCc4464.jpg)
![Hadoop大數(shù)據(jù)平臺(tái)構(gòu)建與應(yīng)用(工作手冊(cè)式)(微課版) 案例 9.2 廣告日志數(shù)據(jù)實(shí)時(shí)傳輸_第5頁(yè)](http://file4.renrendoc.com/view11/M01/3A/16/wKhkGWV8TcyAPQSrAAGZBQAwxCc4465.jpg)
版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
廣告日志數(shù)據(jù)實(shí)時(shí)傳輸【任務(wù)描述】Flume作為日志收集系統(tǒng),可以從不同的數(shù)據(jù)源將數(shù)據(jù)源源不斷收集,但Flume不會(huì)持久地保存數(shù)據(jù),需要使用Sink將數(shù)據(jù)存儲(chǔ)到外部存儲(chǔ)系統(tǒng),如HDFS、HBase、Kafka等。Flume與HDFS、HBase的結(jié)合一般用于離線批處理。而Flume與Kafka的整合一般用于數(shù)據(jù)實(shí)時(shí)流處理,通過(guò)Flume的Agent代理收集日志數(shù)據(jù),再由Flume的Sink將數(shù)據(jù)傳送到Kafka集群,完成數(shù)據(jù)的生產(chǎn)流程,最后交給Storm、Flink、SparkStreaming等進(jìn)行實(shí)時(shí)消費(fèi)計(jì)算。本案例將基于項(xiàng)目8中的case_data_new.csv廣告日志數(shù)據(jù),使用Flume和Kafka整合實(shí)現(xiàn)廣告日志數(shù)據(jù)的實(shí)時(shí)傳輸。首先用腳本模擬實(shí)時(shí)生成的日志數(shù)據(jù)并存入MySQL,再使用Flume實(shí)時(shí)監(jiān)視MySQL中增加的數(shù)據(jù),采集到Kafka集群的主題中,并啟動(dòng)消費(fèi)者消費(fèi)主題數(shù)據(jù),最終實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)傳輸?!救蝿?wù)分析】廣告日志數(shù)據(jù)的實(shí)時(shí)傳輸?shù)膶?shí)現(xiàn)步驟如下。腳本定時(shí)抽取數(shù)據(jù)到指定目錄,模擬日志文件產(chǎn)生,并將其存入MySQL表中。創(chuàng)建Kafka主題,開啟消費(fèi)者以消費(fèi)數(shù)據(jù)。編寫conf采集配置文件,將存入MySQL中的數(shù)據(jù)傳入Kafka主題?!救蝿?wù)實(shí)施】創(chuàng)建腳本文件在master節(jié)點(diǎn)下運(yùn)行“mysql-uroot-pPassword123$”進(jìn)入MySQL數(shù)據(jù)庫(kù)中,創(chuàng)建數(shù)據(jù)庫(kù)kafka,并在kafka下創(chuàng)建表用于存儲(chǔ)數(shù)據(jù)。如REF_Ref100155267\h代碼91所示。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s11創(chuàng)建數(shù)據(jù)表createdatabasekafka;usekafka;createtablecase_data(`rank`int,dtint,cookievarchar(200),ipvarchar(200),idfavarchar(200),imeivarchar(200),androidvarchar(200),openudidvarchar(200),macvarchar(200),timestampsint,campint,creativeidint,mobile_osint,mobile_typevarchar(200),app_key_md5varchar(200),app_name_md5varchar(200),placementidvarchar(200),useragentvarchar(200),mediaidvarchar(200),os_typevarchar(200),born_timeint);//開啟MySQL的local_infile服務(wù)setgloballocal_infile=1;打開一個(gè)新的master終端,運(yùn)行“vi/data/datamysql.sh”創(chuàng)建一個(gè)腳本文件,如REF_Ref100155277\h代碼92所示。腳本內(nèi)容為一個(gè)whiletrue循環(huán),每分鐘在case_data_new.csv隨機(jī)提取100條數(shù)據(jù),存入“/data/datamysql/mysqltmp.txt”文件中,再將文件中的數(shù)據(jù)存入MySQL數(shù)據(jù)庫(kù)中。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s12腳本datamysql.sh#!/bin/bashwhiletruedotime=$(date"+%Y%m%d_%H%M%S")shuf-n100/opt/case_data_new.csv>/data/datamysql/mysqltmp.txtmysql-uroot-pPassword123$--local-infile-e"useKafka;loaddatalocalinfile'/data/datamysql/mysqltmp.txt'intotablecase_datafieldsterminatedby','OPTIONALLYENCLOSEDBY'\"';"sleep60done腳本創(chuàng)建完成后,賦予腳本權(quán)限,然后將其啟動(dòng),如REF_Ref100155288\h代碼93所示。啟動(dòng)成功后可能會(huì)發(fā)出如REF_Ref100155569\h圖91所示的警報(bào)信息,表示在命令行中直接輸入密碼賬戶信息是不安全的,該警報(bào)信息是在MySQL5.6版本后有的,并不影響運(yùn)行結(jié)果,可以選擇忽視。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s13關(guān)于腳本的命令//腳本權(quán)限chmod777/data/data2mysql.sh//腳本啟動(dòng)命令sh/data/datamysql.sh&//腳本中斷命令psaux|grep"datamysql.sh"|grep-vgrep|cut-c9-15|xargskill-9圖STYLEREF1\s9SEQ圖\*ARABIC\s11執(zhí)行腳本文件在MySQL數(shù)據(jù)庫(kù)中查看數(shù)據(jù)是否成功存入表中,如REF_Ref100155298\h代碼94所示。結(jié)果如REF_Ref100155537\h圖92所示??梢钥闯鲆呀?jīng)有數(shù)據(jù)存入表中,并正在實(shí)時(shí)更新中。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s14查看數(shù)據(jù)是否存入//進(jìn)入數(shù)據(jù)庫(kù)kafkausekafka;//查看表中有幾行selectcount(*)fromcase_data;圖STYLEREF1\s9SEQ圖\*ARABIC\s12數(shù)據(jù)已存入創(chuàng)建Kafka主題分別在slave1、slave2中開啟ZooKeeper、Kafka集群。在slave1節(jié)點(diǎn)創(chuàng)建一個(gè)Kafka主題RealTime,設(shè)置2個(gè)副本,2個(gè)分區(qū)。創(chuàng)建成功后,開啟消費(fèi)者消費(fèi),如REF_Ref100155306\h代碼95所示。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s15創(chuàng)建Kafka主題并開啟消費(fèi)//創(chuàng)建RealTime主題kafka-topics.sh-create--topicRealTime--bootstrap-serverslave1:9092,slave2:9092--partitions2--replication-factor2//開啟消費(fèi)者kafka-console-consumer.sh--topicRealTime--bootstrap-serverslave1:9092,slave2:9092目前,該消費(fèi)者并沒(méi)有在指定主題中消費(fèi)到數(shù)據(jù)。Flume采集日志在Flume的conf目錄下創(chuàng)建一個(gè)“datamysql.conf”文件,實(shí)現(xiàn)從MySQL中采集數(shù)據(jù),并傳入RealTime主題中,如REF_Ref100155318\h代碼96所示。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s16Flume腳本datamysql.confagent.sources=sql-sourceagent.sinks=k1agent.channels=chagent.sources.sql-source.type=org.keedio.flume.source.SQLSourceagent.sources.sql-source.hibernate.connection.url=jdbc:mysql://81:3306/kafka?&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMTagent.sources.sql-source.hibernate.connection.user=rootagent.sources.sql-source.hibernate.connection.password=Password123$agent.sources.sql-source.hibernate.dialect=org.hibernate.dialect.MySQLDialectagent.sources.sql-source.hibernate.driver_class=com.mysql.cj.jdbc.Driveragent.sources.sql-source.hibernate.connection.autocommit=trueagent.sources.sql-source.table=case_dataagent.sources.sql-source.columns.to.select=*agent.sources.sql-source.run.query.delay=10000agent.sources.sql-source.status.file.path=/var/lib/=sql-source.statusagent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSinkagent.sinks.k1.topic=RealTimeagent.sinks.k1.brokerList=slave1:9092,slave2:9092agent.sinks.k1.batchsize=200agent.sinks.kafkaSink.requiredAcks=1agent.sinks.k1.serializer.class=kafka.serializer.StringEncoderagent.sinks.kafkaSink.zookeeperConnect=slave1:2181,slave2:2181agent.channels.ch.type=memoryagent.channels.ch.capacity=10000agent.channels.ch.transactionCapacity=10000agent.channels.hbaseC.keep-alive=20agent.sources.sql-source.channels=chagent.sinks.k1.channel=ch啟動(dòng)FlumeAgent命令開始采集MySQL中的數(shù)據(jù),如REF_Ref100155329\h代碼97所示。切換到Kafka消費(fèi)者的終端,可以看到主題上已經(jīng)有數(shù)據(jù)被消費(fèi)者消費(fèi),如REF_Ref100155585\h圖93所示。觀察消費(fèi)者終端,可以看到消費(fèi)者每過(guò)一分鐘,就會(huì)有新數(shù)據(jù)消費(fèi),因?yàn)槟_本文件一直在模擬用戶產(chǎn)生數(shù)據(jù),而Flume在實(shí)時(shí)采集并傳入到Kafka主題上。代碼STYLEREF1\s9SEQ代碼\*ARABIC\s17執(zhí)行Flume腳本flume-ngagent-nagent-f/usr/local/src/flume/conf/datamysql.conf-c/usr/local/src/flume/conf/-Dflume.root.logger=INFO,console圖STYLEREF1\s9
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 人教版道德與法治七年級(jí)上冊(cè)8.1《生命可以永恒嗎》聽課評(píng)課記錄
- 環(huán)境衛(wèi)生教育在學(xué)科教學(xué)中的融合
- 現(xiàn)代診斷技術(shù)在老年腎臟疾病中的應(yīng)用研究
- 粵人版地理七年級(jí)上冊(cè)《第一節(jié) 世界的人口》聽課評(píng)課記錄2
- 電商巨頭如何通過(guò)網(wǎng)路直打造全新的購(gòu)物體驗(yàn)研究報(bào)告
- 現(xiàn)代企業(yè)匯報(bào)中的信息整合與呈現(xiàn)
- 現(xiàn)代平面設(shè)計(jì)在醫(yī)療領(lǐng)域的應(yīng)用前景
- 環(huán)境科學(xué)教育在城市青少年中的推廣
- 電信行業(yè)中基于大數(shù)據(jù)的用戶行為分析與服務(wù)優(yōu)化決策工具
- 【基礎(chǔ)卷】同步分層練習(xí):五年級(jí)下冊(cè)語(yǔ)文第14課《刷子李》(含答案)
- 2025至2030年中國(guó)減肥肽數(shù)據(jù)監(jiān)測(cè)研究報(bào)告
- 2024內(nèi)蒙古公務(wù)員省直行測(cè)、行政執(zhí)法、省考行測(cè)考試真題(5套)
- 2025年安徽馬鞍山市兩山綠色生態(tài)環(huán)境建設(shè)有限公司招聘筆試參考題庫(kù)附帶答案詳解
- 山東省濱州市濱城區(qū)2024-2025學(xué)年九年級(jí)上學(xué)期期末考試化學(xué)試題
- 國(guó)有企業(yè)內(nèi)部審計(jì)工作制度(2篇)
- 期末試卷:安徽省宣城市2021-2022學(xué)年七年級(jí)上學(xué)期期末歷史試題(解析版)
- 食品抽檢核查處置重點(diǎn)安全性指標(biāo)不合格原因分析排查手冊(cè)
- 幼兒教師新年規(guī)劃
- 2024年湖南省公務(wù)員錄用考試《行測(cè)》真題及答案解析
- 分布式光伏培訓(xùn)
- 2024新版(北京版)三年級(jí)英語(yǔ)上冊(cè)單詞帶音標(biāo)
評(píng)論
0/150
提交評(píng)論