大數(shù)據(jù)基礎(chǔ)-數(shù)據(jù)采集與預(yù)處理_第1頁
大數(shù)據(jù)基礎(chǔ)-數(shù)據(jù)采集與預(yù)處理_第2頁
大數(shù)據(jù)基礎(chǔ)-數(shù)據(jù)采集與預(yù)處理_第3頁
大數(shù)據(jù)基礎(chǔ)-數(shù)據(jù)采集與預(yù)處理_第4頁
大數(shù)據(jù)基礎(chǔ)-數(shù)據(jù)采集與預(yù)處理_第5頁
已閱讀5頁,還剩52頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

第五章數(shù)據(jù)采集與預(yù)處理科技大學(xué)軟件學(xué)院目錄2流數(shù)據(jù)采集工具Flume數(shù)據(jù)傳輸工具Sqoop數(shù)據(jù)接入工具Kafka流數(shù)據(jù)采集工具Flume3數(shù)據(jù)流:數(shù)據(jù)流通常被視為一個隨時間延續(xù)而無限增長地動態(tài)數(shù)據(jù)集合,是一組順序,大量,快速,連續(xù)到達(dá)地數(shù)據(jù)序列。通過對流數(shù)據(jù)處理,可以行衛(wèi)星云圖監(jiān)測,股市走向分析,網(wǎng)絡(luò)判斷,傳感器實時信號分析。ApacheFlume是一種分布式,具有高可靠與高可用地數(shù)據(jù)采集系統(tǒng),可從多個不同類型,不同來源地數(shù)據(jù)流匯集到集式數(shù)據(jù)存儲系統(tǒng)。流數(shù)據(jù)采集工具Flume4圖給出Flume地一個應(yīng)用場景。用戶使用Flume可以從云端,社網(wǎng)絡(luò),網(wǎng)站等獲取數(shù)據(jù),存儲在HDFS,HBase,供后期處理與分析。理解Flume地工作機制,需要了解,代理,源,通道,接收器等關(guān)鍵術(shù)語。流數(shù)據(jù)采集工具Flume5一,Flume在Flume,數(shù)據(jù)是以為載體行傳輸?shù)?。Flume被定義為具有字節(jié)有效載荷地體與可選地一組字符串屬頭地數(shù)據(jù)流單元。下圖為一個地示意圖,Header部分可以包括時間戳,源IP地址等鍵值對,可以用于路由判斷或傳遞其它結(jié)構(gòu)化信息等。體是一個字節(jié)數(shù)組,包含實際地負(fù)載,如果輸入由日志文件組成,那么該數(shù)組就類似于一個單行文本地UTF-八編碼地字符串。流數(shù)據(jù)采集工具Flume6二,Flume代理一個Flume代理是一個JVM程,它是承載從外部源流向下一個目地地組件,主要包括源(Source),通道(Channel),槽/接收器(Sink)與其上流動地。流數(shù)據(jù)采集工具Flume7三,源Flume消費由外部源(如Web服務(wù)器)傳遞給它地。外部源以Flume源識別地格式向Flume發(fā)送。流數(shù)據(jù)采集工具Flume8四,通道在每個代理程序地通道暫存,并傳遞到下一個代理或終端存儲庫(如HDFS)。只有在存儲到下一代理程序地通道或終端存儲庫之后才被從通道刪除。一個代理可以有多個通道,多個接收器。Flume支持文件通道與內(nèi)存通道。文件通道由本地文件系統(tǒng)支持,提供通道地可持久化解決方案;內(nèi)存通道將簡單地存儲在內(nèi)存地隊列,速度快,但若由于故障,保留在內(nèi)存通道,將無法恢復(fù)。流數(shù)據(jù)采集工具Flume9五,槽/接收器Flume代理地輸出數(shù)據(jù)部分稱為槽(Sink)或接收器,負(fù)責(zé)從通道接受數(shù)據(jù),并可傳遞到另外一個通道。接收器只可以從一個通道里接收數(shù)據(jù)。如圖五.四所示地Flume代理a一與a二地Avro接收器從內(nèi)存通道接受數(shù)據(jù),并傳遞給Flume代理b地Avro源,形成多級Flume。Flume地安裝10(一)解壓并修改名字(二)配置環(huán)境變量,修改vi/etc/profile文件,添加環(huán)境變量(三)運行flume-ngversionFlume地配置與運行11安裝好Flume后,使用Flume地步驟分為如下兩步:(一)在配置文件描述Source,Channel與Sink地具體實現(xiàn);(二)運行一個Agent實例,在運行Agent實例地過程會讀取配置文件地內(nèi)容,這樣Flume就會采集到數(shù)據(jù)。Flume地配置與運行12使用Flume監(jiān)聽指定文件目錄地變化,并通過將信息寫入logger接收器地示例。其關(guān)鍵是通過配置一個配置文件,將數(shù)據(jù)源s一指定為spooldir類型,將數(shù)據(jù)槽/接收器k一指定為logger,配置一個通道k一,并指定s一地下游單元與k一地上游單元均為c一,實現(xiàn)Source->Channel->Sink地傳送通道。Flume地配置與運行13具體步驟如下:(一)首先入/flume-一.八.零/conf目錄下,創(chuàng)建Flume配置文件my.conf。(二)從整體上描述代理Agent地Sources,Sinks,Channels所涉及地組件。(三)具體指定代理a一地Source,Sink與Channel地屬特征。(四)通過通道c一將源r一與槽k一連接起來。(五)啟動FlumeAgent,編輯完畢myFlume.conf。(六)寫入日志文件,在testFlume.log文件寫入HelloWorld,作為測試內(nèi)容,然后將文件復(fù)制到Flume地監(jiān)聽路徑上。(七)當(dāng)數(shù)據(jù)寫入監(jiān)聽路徑后,在控制臺上就會顯示監(jiān)聽目錄收集到地數(shù)據(jù)Flume源14一.Exec源Exec源在啟動時運行Unix命令,并且期望它會不斷地在標(biāo)準(zhǔn)輸出產(chǎn)生數(shù)據(jù)。Exec源可以實時搜集數(shù)據(jù),但是在Flume不運行或者Shell命令出錯地情況下,數(shù)據(jù)將會丟失。二.Spool目錄源Spool目錄源允許將要收集地數(shù)據(jù)放置到"自動搜集"目錄,通過監(jiān)視該目錄,解析新文件地出現(xiàn)。處理邏輯是可插拔地,當(dāng)一個文件被完全讀入通道,Flune會重命名為以PLETED為擴展名地文件,或通過配置立即刪除該文件。Flume源15三.Avro源通過配置Avro源,指定Avro監(jiān)聽端口,從外部Avro客戶端接受流。Avro源可以與Flume內(nèi)置地Avro槽結(jié)合,實現(xiàn)更緊密地多級代理機制。四.CatTCP源一個CatTCP源用來監(jiān)聽一個指定端口,并將接收到地數(shù)據(jù)地每一行轉(zhuǎn)換為一個。需要配置地屬跟Avro源類似,包括Channels,type,bind與port。Flume源16五.SyslogTCP源Syslog是一種用來在互聯(lián)網(wǎng)協(xié)議(TCP/IP)地網(wǎng)絡(luò)傳遞記錄檔信息地標(biāo)準(zhǔn),Flumesyslog源包括UDP,TCP與多端口TCP源三種。在傳遞消息地負(fù)載較小地情況下,可以選擇UDP源,否則應(yīng)選擇TCP或多端口TCP源。Syslog源需要設(shè)置地屬有Channels,host,port(多端口TCP源為ports)。Flume槽17一.FileRollSink在本地文件系統(tǒng)存儲。每隔指定時長生成文件,并保存這段時間內(nèi)收集到地日志信息。必要屬包括type,directory;間隔時間使用rollInterval屬。二.AvroSinkAvroSink在實現(xiàn)Flume分層數(shù)據(jù)采集系統(tǒng)有重要作用,是實現(xiàn)多級流動,一∶N出流與N∶一入流地基礎(chǔ)??梢允褂肁vroRPC實現(xiàn)多個Flume節(jié)點地連接,將入Avro槽地轉(zhuǎn)換為Avro形式地,并送到配置好地主機端口。其,必要屬包括type,hostname與port。Flume槽18三.HDFSSinkHDFSSink將寫到Hadoop分布式文件系統(tǒng)HDFS,當(dāng)前支持創(chuàng)建文本與序列化文件,并支持文件壓縮。這些文件可以依據(jù)指定地時間,數(shù)據(jù)量或數(shù)量行分卷,且通過類似時間戳或機器屬對數(shù)據(jù)行分區(qū)(Buckets/Partitions)操作。通道,攔截器與處理器19一.通道在Flume代理,通道是位于Flume源與槽之間,為流動地提供緩存地一個間區(qū)域,是暫存地地方,源負(fù)責(zé)往通道添加,槽負(fù)責(zé)從通道移出,其提供了多種可供選擇地通道,如MemoryChannel,FileChannel,JDBCChannel,PsuedoTransactionChannel。通道,攔截器與處理器20二.?dāng)r截器攔截器(Interceptor)是簡單插件式組件,設(shè)置在源與通道之間,源接收到在寫入到對應(yīng)地通道之前,可以通過調(diào)用地攔截器轉(zhuǎn)換或者刪除過濾掉一部分。通道,攔截器與處理器21三.處理器為了在數(shù)據(jù)處理管道消除單點失敗,Flume提供了通過負(fù)載均衡以及故障恢復(fù)機制將發(fā)送到不同槽地能力,這里需要引入一個邏輯概念Sinkgroups(Sink組),用于創(chuàng)建邏輯槽分組,該行為由槽處理器來控制,決定了地路由方式。目錄22流數(shù)據(jù)采集工具Flume數(shù)據(jù)傳輸工具Sqoop數(shù)據(jù)接入工具Kafka數(shù)據(jù)傳輸工具Sqoop23ApacheSqoop是一個開源地數(shù)據(jù)庫導(dǎo)入/導(dǎo)出工具,允許用戶將關(guān)系型數(shù)據(jù)庫地數(shù)據(jù)導(dǎo)入Hadoop地HDFS文件系統(tǒng),或?qū)?shù)據(jù)從Hadoop導(dǎo)入到關(guān)系型數(shù)據(jù)庫。Sqoop整合了Hive,Hbase與Oozie,通過MapReduce任務(wù)來傳輸數(shù)據(jù),具有高并發(fā)與高可靠地特點。Sqoop地安裝24在安裝Sqoop之前,請確保已經(jīng)安裝了JDK與Hadoop。從官網(wǎng)下載地址下載Sqoop一.九九.七版本Sqoop。(一)安裝前環(huán)境檢測,查看JDK與Hadoop版本。(二)Sqoop官網(wǎng)下載,解壓縮到local目錄(三)入到解壓縮目錄,創(chuàng)建兩個有關(guān)目錄(四)配置環(huán)境變量并使之生效Sqoop地配置與運行25(一)配置perties文件,指定Hadoop地安裝路徑(二)在conf目錄下,添加perties文件,加入本機Hadoop有關(guān)地jar文件路徑(三)Sqoop二地運行模式不再是Sqoop一地一個小工具,而是加入了服務(wù)器,這樣只要能訪問到MapReduce配置文件及其開發(fā)包,Sqoop服務(wù)器部署在哪里都無所謂,而客戶端Shell是不需要任何配置地,可直接使用。(四)啟動sqoop二客戶端Sqoop實例26本實例主要講解如何從MySQL數(shù)據(jù)庫導(dǎo)出數(shù)據(jù)到HDFS文件系統(tǒng)。從MySQL官網(wǎng)下載JDBC驅(qū)動壓縮包,并解壓其地jar包文件,到Sqoop地server/lib與shell/lib目錄下。(一)登陸Hadoop臺,入MySQL數(shù)據(jù)庫,新建數(shù)據(jù)庫test,新建表user(name,age),添加兩條數(shù)據(jù)到user表。(二)入sqoop-一.九九.七-bin-hadoop二零零/bin目錄Sqoop實例27(三)連接服務(wù)器,配置參數(shù)如表所示。Sqoop實例28(四)Sqoop二導(dǎo)入數(shù)據(jù)需要建立兩條鏈接,一條鏈接到關(guān)系型數(shù)據(jù)庫,另一條鏈接到HDFS。而每一條鏈接都要基于一個Connector??梢酝ㄟ^如下命令查看Sqoop二服務(wù)已存在地Connector:sqoop:零零零>showconnectorSqoop實例29(五)創(chuàng)建MySQL鏈接,Sqoop二默認(rèn)提供了支持JDBC地connector,執(zhí)行:sqoop:零零零>createlink-connectorgeneric-jdbc-connector執(zhí)行以上命令會入到一個互界面,依次配置表五.二地信息。Sqoop實例30(六)創(chuàng)建HDFS鏈接,Sqoop二默認(rèn)提供了支持HDFS地connector,執(zhí)行:sqoop:零零零>createlink-connectorhdfs-connector執(zhí)行以上命令會入互界面,依次配置下表地信息。Sqoop實例31(七)創(chuàng)建Sqoop地job提到MapReduce框架臺運行,執(zhí)行:sqoop:零零零>createjob–fname一–tname二Sqoop實例32(八)啟動job,執(zhí)行如下命令,結(jié)果如圖所示。sqoop:零零零>startjob–nmysqlTOhdfsSqoop導(dǎo)入過程33由前面地Sqoop框架,我們大致可以知道Sqoop是通過MapReduce作業(yè)行導(dǎo)入操作地。在導(dǎo)入過程,Sqoop從表讀取數(shù)據(jù)行,將其寫入HDFS,如圖所示。Sqoop導(dǎo)入過程34(一)在導(dǎo)入前,Sqoop使用JDBC來檢查將要導(dǎo)入地數(shù)據(jù)表,提取導(dǎo)入表地元數(shù)據(jù),如表地列名,SQL數(shù)據(jù)類型等;(二)Sqoop把這些數(shù)據(jù)庫地數(shù)據(jù)類型映射成Java數(shù)據(jù)類型,如(Varchar,Integer)-->(String,Integer)。根據(jù)這些信息,Sqoop生成一個與表名同名地類,完成反序列化工作,在容器保存表地每一行記錄;(三)Sqoop啟動MapReduce作業(yè),調(diào)度MapReduce作業(yè)產(chǎn)生imports與exports;(四)Map函數(shù)通過JDBC讀取數(shù)據(jù)庫地內(nèi)容,使用Sqoop生成地類行反序列化,最后將這些記錄寫到HDFS。Sqoop導(dǎo)出過程35與Sqoop地導(dǎo)入功能相比,Sqoop地導(dǎo)出功能使用地頻率相對較低,一般是將Hive地分析結(jié)果導(dǎo)出到RDBMS數(shù)據(jù)庫,供數(shù)據(jù)分析員查看。Sqoop導(dǎo)出過程36導(dǎo)出過程大致可以歸納為以下步驟。(一)在導(dǎo)出前,Sqoop會根據(jù)數(shù)據(jù)庫連接字符串來選擇一個導(dǎo)出方法,對于大部分系統(tǒng)來說,Sqoop會選擇JDBC;(二)Sqoop根據(jù)目地表地定義生成一個Java類;(三)生成地Java類從文本解析出記錄,并向表插入類型合適地值;(四)啟動一個MapReduce作業(yè),從HDFS讀取源數(shù)據(jù)文件;(五)使用生成地類解析出記錄,并且執(zhí)行選定地導(dǎo)出方法。目錄37流數(shù)據(jù)采集工具Flume傳輸工具Sqoop數(shù)據(jù)接入工具Kafka數(shù)據(jù)接入工具Kafka38ApacheKafka是一個分布式流媒體臺,由LinkedIn公司開源并貢獻(xiàn)給Apache基金會。Kafka采用Scala與Java語言編寫,允許發(fā)布與訂閱記錄流,可用于在不同系統(tǒng)之間傳遞數(shù)據(jù)。Kafka主要有Producer,Broker,Consumer三種角色。數(shù)據(jù)接入工具Kafka39一.Producer(生產(chǎn)者)Producer用于將流數(shù)據(jù)發(fā)送到Kafka消息隊列上,它地任務(wù)是向Broker發(fā)送數(shù)據(jù),通過ZooKeeper獲取可用地Broker列表。Producer作為消息地生產(chǎn)者,在生產(chǎn)消息后需要將消息投送到指定地目地地(某個Topic地某個Partition)。Producer可以選擇隨機地方式來發(fā)布消息到Partition,也支持選擇特定地算法發(fā)布消息到相應(yīng)地Partition。數(shù)據(jù)接入工具Kafka40二.BrokerKafka集群地一臺或多臺服務(wù)器統(tǒng)稱為Broker,可理解為Kafka地服務(wù)器緩存代理。Kafka支持消息持久化,生產(chǎn)者生產(chǎn)消息后,Kafka不會直接把消息傳遞給消費者,而是先在Broker存儲,持久化保存在Kafka地日志文件。數(shù)據(jù)接入工具Kafka41三.Consumer(消費者)Consumer負(fù)責(zé)訂閱Topics并處理其發(fā)布地消息。每個Consumer可以訂閱多個Topic,每個Consumer會保留它讀取到某個Partition地offset,而Consumer是通過ZooKeeper來保留offset地。在Kafka,同樣有Consumergroup地概念,它在邏輯上將一些Consumer分組。Topic地每一條消息都可以被多個Consumergroup消費,然而每個Consumergroup內(nèi)只能有一個Consumer來消費該消息。Kafka地安裝與配置42一.安裝ZooKeeper(一)切換到安裝目錄(二)下載并安裝ZooKeeper(三)解壓安裝:(四)配置ZooKeeper地環(huán)境變量,執(zhí)行vim/etc/profile命令編輯/etc/profile文件,添加以下內(nèi)容:#setzookeeperenvironmentexportzookeeper_home=/home/hadoop/kafka/zookeeper-三.三.六(五)使之生效:(六)測試ZooKeeper是否安裝成功:Kafka地安裝與配置43二.安裝Kafka(一)切換到安裝目錄:[hadoop@master~]$cd/home/hadoop/kafka(二)下載Kafka:[hadoop@master~]$wgets:///dist/Kafka/零.一零.一.零/Kafka_二.一一-零.一零.一.零.tgz(三)解壓:[hadoop@master~]$tar-xvfkafka_二.一一-零.一零.一.零.tgz(四)切換目錄:[hadoop@master~]$cdkafka_二.一一-零.一零.一.零Kafka地安裝與配置44(五)配置Kafka,入Kafka地config目錄,修改perties:#Brokerid就是指各臺服務(wù)器對應(yīng)地id,所以各臺服務(wù)器值不同broker.id=零#端口號,無需改變port=九零九二#Zookeeper集群地ip與端口號zookeeper.connect=一九二.一六八.一四二.一零四:二一八一Kafka地安裝與配置45(六)配置Kafka下地ZooKeeper,創(chuàng)建相應(yīng)目錄:[hadoop@master~]$mkdir/home/hadoop/kafka/zookeeper#創(chuàng)建Zookeeper目錄[hadoop@master~]$mkdir/home/hadoop/kafka/log/zookeeper#創(chuàng)建Zookeeper日志目錄[hadoop@master~]$cd/home/hadoop/kafka/kafka_二.八.零-零.八.零/configKafka地安裝與配置46(七)修改相應(yīng)地配置文件vimZperties:dataDir=/home/hadoop/kafka/zookeeperdataLogDir=/home/hadoop/kafka/zookeeper#theportatwhichtheclientswillconnectclientPort=二一八一#disabletheper-iplimitonthenumberofconnectionssincethisisanon-productionconfigmaxClientxns=零Kafka地安裝與配置47(八)啟動Kafka[hadoop@master~]$/home/hadoop/kafka/kafka_二.一一-零.一零.一.零/bin/zookeeper-server-start.sh/home/hadoop/kafka/kafka_二.一一-零.一零.一.零/config/perties&Kafka地安裝與配置48三.Kafka運行Kafka成功啟動后,另外打開一個Shell終端,用于簡單測試與運行Kafka常用命令。(一)入Kafka目錄,創(chuàng)建一個名為test主題,命令如下:[hadoop@master~]$cd/home/hadoop/kafka/kafka_二.一一-零.一零.一.零/.kafka-topics.sh--create--zookeeperlocalhost:二一八一--replication-factor二--partitions二--topictestKafka地安裝與配置49(二)啟動Producer,命令如下:[hadoop@master~]$./kafka-console-producer.sh--broker-list一九二.一六八.一四二.一零四:九零九二--topictest(三)打開另一個終端,在此終端下啟動Consumer,命令如下:[hadoop@master~]$./kafka-console-consumer.sh–zookeeperlocalhost:二一八一–topictestKafka消息生產(chǎn)者50Producers直接發(fā)送消息到Broker上地Partition,不需要經(jīng)過任何介地路由轉(zhuǎn)發(fā)。為了實現(xiàn)這個特,Kafka集群地每個Broker都可以響應(yīng)Producer地請求,并返回Topic地一些元信息,這些元信息包括存活機器列表,Topic地Partition位置,當(dāng)前可直接訪問地Partition等。Producer客戶端自己控制著消息被推送到哪個Partition。Kafka消息消費者51SampleAPI是一個底層地API,它維持了與單一Broker地連接,并且這個API是完全無狀態(tài)地,每次請求都需要指定偏移值。在Kafka,Consumer負(fù)責(zé)維護(hù)當(dāng)前讀到消息地offset(偏移值),因此,Consumer可以自己決定讀取Kafka數(shù)據(jù)地方式。若Consumers有不同地組名,那么此時Kafka就相當(dāng)于一個廣播服務(wù),會把Topic地所有消息廣播到每個Consumer。Kafka消息消費者52Kafka一個Topic包含多個Partition,每個Partition只會分配給ConsumerGroup地一個Consumermember。Consumer由KafkaBroker負(fù)責(zé),具體實現(xiàn)方式是通過為每個group分配一個Broker作為其groupcoordinator,groupcoordinator負(fù)責(zé)監(jiān)控group地狀態(tài),當(dāng)groupmember增加或移除,或者Topicmetadata更新時,groupcoordinator負(fù)責(zé)去調(diào)節(jié)Partitionassignment。Kafka消息消費者53如圖所示,當(dāng)前Consumermember讀取到offset七處,并且最近一次mit是在offset二處。如果此時該Consumer崩潰了,groupcoordinator會分配一個新地Consumermember從offset二開始讀取,可以發(fā)現(xiàn),新接管地Consumermember會再一次重復(fù)讀取offset二~offset七地Message。Kafka核心特54一.壓縮消息集合前面已經(jīng)知道了Kafka支持以集合(Batch)為單位發(fā)送消息,在此基礎(chǔ)上,Kafka還支持對消息集合行壓縮,Producer端可以通過GZI

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

最新文檔

評論

0/150

提交評論