第十一講流計算圖計算_第1頁
第十一講流計算圖計算_第2頁
第十一講流計算圖計算_第3頁
第十一講流計算圖計算_第4頁
第十一講流計算圖計算_第5頁
已閱讀5頁,還剩84頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領

文檔簡介

第十一講-流計算、圖計算圖計算流計算流計算什么是流計算流計算處理流程流計算應用實例流計算框架–TwitterStorm流計算框架匯總參考資料流計算產(chǎn)生的背景大數(shù)據(jù)時代數(shù)據(jù)處理及業(yè)務的變化?初期:數(shù)據(jù)量小,業(yè)務簡單

–少量人力、服務器就可以滿足需求?過渡期:數(shù)據(jù)量有所膨脹,業(yè)務較復雜

–需要增加大量服務器以支撐業(yè)務?大數(shù)據(jù)時期:數(shù)據(jù)量急劇膨脹,業(yè)務很復雜

–傳統(tǒng)方案扛不住,簡單的增加服務器已不能滿足需求挑戰(zhàn)?數(shù)據(jù)量膨脹所帶來的質(zhì)變?個性化、實時化的需求?…什么是流計算流計算來自于一個信念:數(shù)據(jù)的價值隨著時間的流逝而降低,所以事件出現(xiàn)后必須盡快地對它們進行處理,最好數(shù)據(jù)出現(xiàn)時便立刻對其進行處理,發(fā)生一個事件進行一次處理,而不是緩存起來成一批再處理。流計算的概念:?流計算是針對流式數(shù)據(jù)的實時計算。?流式數(shù)據(jù)(流數(shù)據(jù)):是指將數(shù)據(jù)看作數(shù)據(jù)流的形式來處理。數(shù)據(jù)流是在時間分布和數(shù)量上無限的一系列動態(tài)數(shù)據(jù)集合體;數(shù)據(jù)記錄是數(shù)據(jù)流的最小組成單元。?流數(shù)據(jù)具有數(shù)據(jù)實時持續(xù)不斷到達、到達次序獨立、數(shù)據(jù)來源眾多格式復雜、數(shù)據(jù)規(guī)模大且不十分關注存儲、注重數(shù)據(jù)的整體價值而不關注個別數(shù)據(jù)等特點。什么是流計算流計算應用場景?流計算是針對流數(shù)據(jù)的實時計算,其主要應用在于產(chǎn)生大量流數(shù)據(jù)、同時對實時性要求高的領域。?流計算一方面可應用于處理金融服務如股票交易、銀行交易等產(chǎn)生的大量實時數(shù)據(jù)。?另一方面流計算主要應用于各種實時Web服務中,如搜索引擎、購物網(wǎng)站的實時廣告推薦,SNS社交類網(wǎng)站的實時個性化內(nèi)容推薦,大型網(wǎng)站、網(wǎng)店的實時用戶訪問情況分析等。什么是流計算流計算:對流數(shù)據(jù)實時分析,從而獲取有價值的實時信息流計算與關系存儲模型的區(qū)別主要區(qū)別有如下幾個方面:?流中的數(shù)據(jù)元素在線到達;?系統(tǒng)無法控制將要處理的新到達的數(shù)據(jù)元素的順序;?數(shù)據(jù)流的潛在大小也許是無窮無盡的;?一旦數(shù)據(jù)流中的某個元素經(jīng)過處理,要么被丟棄,要么被歸檔存儲。因此,除非該數(shù)據(jù)被直接存儲在內(nèi)存中,否則將不容易被檢索。相對于數(shù)據(jù)流的大小,這是一種典型的極小相關。流計算需求對于一個流計算系統(tǒng)來說,它應達到如下需求:?高性能:處理大數(shù)據(jù)的基本要求,如每秒處理幾十萬條數(shù)據(jù)。?海量式:支持TB級甚至是PB級的數(shù)據(jù)規(guī)模。?實時性:必須保證一個較低的延遲時間,達到秒級別,甚至是毫秒級別。?分布式:支持大數(shù)據(jù)的基本架構(gòu),必須能夠平滑擴展。?易用性:能夠快速進行開發(fā)和部署。?可靠性:能可靠地處理流數(shù)據(jù)。?針對不同的應用場景,相應的流計算系統(tǒng)會有不同的需求,但是,針對海量數(shù)據(jù)的流計算,無論在數(shù)據(jù)采集、數(shù)據(jù)處理中都應達到秒級別的要求。流計算與Hadoop?Hadoop的批量化處理是人們喜愛它的地方,但這在某些領域仍顯不足,尤其是在例如移動、Web客戶端或金融、網(wǎng)頁廣告等需要實時計算的領域。這些領域產(chǎn)生的數(shù)據(jù)量極大,沒有足夠的存儲空間來存儲每個業(yè)務收到的數(shù)據(jù)。而流計算則可以實時對數(shù)據(jù)進行分析,并決定是否拋棄無用的數(shù)據(jù),而這無需經(jīng)過Map/Reduce的環(huán)節(jié)。?MapReduce框架為批處理做了高度優(yōu)化,系統(tǒng)典型地通過調(diào)度批量任務來操作靜態(tài)數(shù)據(jù),任務不是常駐服務,數(shù)據(jù)也不是實時流入;而數(shù)據(jù)流計算的典型范式之一是不確定數(shù)據(jù)速率的事件流流入系統(tǒng),系統(tǒng)處理能力必須與事件流量匹配。數(shù)據(jù)流實時處理的模式?jīng)Q定了要和批處理使用非常不同的架構(gòu),試圖搭建一個既適合流式計算又適合批處理的通用平臺,結(jié)果可能會是一個高度復雜的系統(tǒng),并且最終系統(tǒng)可能對兩種計算都不理想。流計算與Hadoop基于MapReduce的業(yè)務不得不面對處理延遲的問題。有一種想法是將基于MapReduce的批量處理轉(zhuǎn)為小批量處理,將輸入數(shù)據(jù)切成小的片段,每隔一個周期就啟動一次MapReduce作業(yè),這種實現(xiàn)需要減少每個片段的延遲,并且需要考慮系統(tǒng)的復雜度:將輸入數(shù)據(jù)分隔成固定大小的片段,再由MapReduce平臺處理,缺點在于處理延遲與數(shù)據(jù)片段的長度、初始化處理任務的開銷成正比。小的分段是會降低延遲,但是,也增加附加開銷,并且分段之間的依賴管理更加復雜(例如一個分段可能會需要前一個分段的信息);反之,大的分段會增加延遲。最優(yōu)化的分段大小取決于具體應用。為了支持流式處理,MapReduce需要被改造成Pipeline的模式,而不是reduce直接輸出;考慮到效率,中間結(jié)果最好只保存在內(nèi)存中等等。這些改動使得原有的MapReduce框架的復雜度大大增加,不利于系統(tǒng)的維護和擴展。用戶被迫使用MapReduce的接口來定義流式作業(yè),這使得用戶程序的可伸縮性降低。傳統(tǒng)數(shù)據(jù)處理流程傳統(tǒng)數(shù)據(jù)處理流程示意圖?傳統(tǒng)的數(shù)據(jù)操作,首先將數(shù)據(jù)采集并存儲在DBMS中,然后通過query和DBMS進行交互,得到用戶想要的結(jié)果。這樣的一個流程隱含了兩個前提:–Dataisold。當對數(shù)據(jù)做查詢的時候,里面數(shù)據(jù)其實是過去某一個時刻數(shù)據(jù)的一個snapshot,數(shù)據(jù)可能已經(jīng)過期了;–這樣的流程需要人們主動發(fā)出query。也就是說用戶是主動的,而DBMS系統(tǒng)是被動的。流計算處理流程?流計算一般有三個處理流程:數(shù)據(jù)實時采集、數(shù)據(jù)實時計算、實時查詢服務。實時計算三個階段流計算處理流程階段一:數(shù)據(jù)實時采集為流計算提供實時數(shù)據(jù),要保證實時性、低延遲、穩(wěn)定可靠。許多開源分布式日志收集系統(tǒng)均可滿足每秒數(shù)百MB的數(shù)據(jù)采集和傳輸需求。–Hadoop的Chukwa–Facebook的Scribe–LinkedIn的Kafka–Cloudera的Flume–淘寶的TimeTunnel流計算的階段階段一:數(shù)據(jù)實時采集?數(shù)據(jù)采集系統(tǒng)基本架構(gòu)一般由三部分組成–Agent:主動采集數(shù)據(jù),并把數(shù)據(jù)推送到collector–Collector:接收多個Agent的數(shù)據(jù),并實現(xiàn)有序、可靠、高性能的轉(zhuǎn)發(fā)–Store:存儲Collector的數(shù)據(jù)(對于流計算來說,這邊接收的數(shù)據(jù)一般直接用于計算)實時采集系統(tǒng)基本架構(gòu)流計算的階段階段二:數(shù)據(jù)實時計算?現(xiàn)在大量存在的實時數(shù)據(jù),人們需要根據(jù)當前的數(shù)據(jù)實時的作出判斷。流計算在流數(shù)據(jù)不斷變化的運動過程中實時地進行分析,捕捉到可能對用戶有用的信息,并把結(jié)果發(fā)送出去,在這種情況下:–能對流數(shù)據(jù)做出實時回應;–用戶是被動的而DBMS是主動的。數(shù)據(jù)實時計算示意圖流計算的階段階段三:實時查詢服務?經(jīng)由流計算框架得出的結(jié)果可供實時查詢、展示或存儲。流計算的應用分析系統(tǒng)?傳統(tǒng)的分析系統(tǒng)都是離線計算,即將數(shù)據(jù)全部保存下來,然后每隔一定時間進行離線分析,再將結(jié)果保存。但這樣會有一定的延時,這取決于離線計算的間隔時間和計算時長。?而通過流計算,能在秒級別內(nèi)得到實時分析結(jié)果,有利于根據(jù)實時分析結(jié)果及時做出決策、調(diào)整?;诜治鱿到y(tǒng)的應用場景?廣告系統(tǒng):如搜索引擎和購物網(wǎng)站,實時分析用戶信息,展示更佳的相關廣告。?個性化推薦:如社交網(wǎng)站,實時統(tǒng)計和分析用戶行為,精確推薦,增加用戶粘性。?…流計算的應用–量子恒道挑戰(zhàn)?實時計算處理數(shù)據(jù)3T/日?離線分布式計算處理數(shù)據(jù)超過20T/日?服務超過百萬的淘寶賣家?…問題?離線計算分析延時太大,對于需要實時分析數(shù)據(jù)的應用場景(如雙11,雙12,一年就一次,需要實時數(shù)據(jù)來幫助調(diào)整決策),如何實現(xiàn)秒級別的實時分析?流計算的應用–量子恒道SuperMario2.0流計算框架?海量數(shù)據(jù)實時計算引擎、實時流傳輸框架?基于Erlang+Zookeeper開發(fā)?低延遲、高可靠性SuperMario2.0(監(jiān)控界面)流計算的應用–量子恒道實時數(shù)據(jù)處理流程?Log數(shù)據(jù)由TimeTunnel在毫秒級別內(nèi)實時送達。?實時數(shù)據(jù)經(jīng)由SuperMario流計算框架進行處理。?HBase輸出、存儲結(jié)果實現(xiàn)效果?可處理每天TB級的實時流數(shù)據(jù)。?從用戶發(fā)起請求到數(shù)據(jù)展示,延時控制在2-3秒內(nèi)。量子恒道實時數(shù)據(jù)處理示意圖流計算的應用–實時交通信息管理?IBM的流計算平臺InfoSphereStreams能夠廣泛應用于制造、零售、交通運輸、金融證券以及監(jiān)管各行各業(yè)的解決方案之中,使得實時快速做出決策的理念得以實現(xiàn)。匯總來自不同源的實時數(shù)據(jù)InfoSphereStream界面流計算的應用–實時交通信息管理?Streams應用于斯德哥爾摩的交通信息管理,通過結(jié)合來自不同源的實時數(shù)據(jù),Streams可以生成動態(tài)的、多方位的看待交通流量的方式,為城市規(guī)劃者和乘客提供實時交通狀況查看。通過InfoSphereStreams分析實時交通信息流計算框架要求流計算框架要求?高性能–處理大數(shù)據(jù)的基本要求,如每秒處理幾十萬條數(shù)據(jù)?海量式–支持TB級數(shù)據(jù),甚至是PB級?實時性–保證較低延遲事件,達到秒級,最好是毫秒級?分布式–支持大數(shù)據(jù)的基本架構(gòu),必須能平滑擴展?易用性?可靠性?…TwitterStorm簡介?免費、開源的分布式實時計算系統(tǒng)?簡單、高效、可靠地處理大量的流數(shù)據(jù)?Storm對于實時計算的意義類似于Hadoop對于批處理的意義?基于Clojure和Java開發(fā)Storm流式計算TwitterStorm簡介Twitter數(shù)據(jù)系統(tǒng)分層處理架構(gòu)?為了處理最近的數(shù)據(jù),需要一個實時系統(tǒng)和批處理系統(tǒng)同時運行。要計算一個查詢函數(shù),需要查詢批處理視圖和實時視圖,并把它們合并起來以得到最終的數(shù)據(jù)。?Twitter中進行實時計算的系統(tǒng)就是Storm,它在數(shù)據(jù)流上進行持續(xù)計算,并且對這種流式數(shù)據(jù)處理提供了有力保障。?Twitter分層的數(shù)據(jù)處理架構(gòu)由Hadoop和ElephantDB組成批處理系統(tǒng),Storm和Cassandra組成實時系統(tǒng),實時系統(tǒng)處理的結(jié)果最終會由批處理系統(tǒng)來修正,正是這個觀點使得Storm的設計與眾不同。Twitter數(shù)據(jù)系統(tǒng)分層處理架構(gòu)Storm應用領域?流計算(Streamprocessing)?實時分析(Real-timeanalytics)?連續(xù)計算(Continuouscomputation)?分布式遠程過程調(diào)用(DistributedRPC)?在線機器學習(Onlinemachinelearning)?更多…Storm主要特點?簡單的編程模型:Storm降低了進行實時處理的復雜性。?支持各種編程語言:默認支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現(xiàn)一個簡單的Storm通信協(xié)議即可。?容錯性:Storm會自動管理工作進程和節(jié)點的故障。?水平擴展:計算是在多個線程、進程和服務器之間并行進行的。?可靠的消息處理:Storm保證每個消息至少能得到一次完整處理。?快速:系統(tǒng)的設計保證了消息能得到快速的處理。?本地模式:Storm有一個“本地模式”,可以在處理過程中完全模擬Storm集群,這樣可以快速進行開發(fā)和單元測試。?容易部署:Storm集群易于部署,只需少量的安裝和配置就可運行。Storm設計思想?Storm對于流Stream的抽象:流是一個不間斷的無界的連續(xù)Tuple(元組,是元素有序列表)。Stream消息流,是一個沒有邊界的Tuple序列,這些Tuples會被以一種分布式的方式并行地創(chuàng)建和處理。Stream消息流,是一個沒有邊界的Tuple序列,這些Tuples會被以一種分布式的方式并行地創(chuàng)建和處理。Storm設計思想?Storm認為每個Stream都有一個源頭,它將這個源頭抽象為Spouts。Spouts流數(shù)據(jù)源,它會從外部讀取流數(shù)據(jù)并發(fā)出Tuple。Spouts流數(shù)據(jù)源,它會從外部讀取流數(shù)據(jù)并發(fā)出Tuple。Storm設計思想?Storm將流的中間狀態(tài)轉(zhuǎn)換抽象為Bolts,Bolts可以處理Tuples,同時它也可以發(fā)送新的流給其他Bolts使用。Bolts消息處理者,所有的消息處理邏輯被封裝在Bolts里面,處理輸入的數(shù)據(jù)流并產(chǎn)生輸出的新數(shù)據(jù)流,可執(zhí)行過濾,聚合,查詢數(shù)據(jù)庫等操作。Bolts消息處理者,所有的消息處理邏輯被封裝在Bolts里面,處理輸入的數(shù)據(jù)流并產(chǎn)生輸出的新數(shù)據(jù)流,可執(zhí)行過濾,聚合,查詢數(shù)據(jù)庫等操作。Storm設計思想?為了提高效率,在Spout源接上多個Bolts處理器。Storm將這樣的無向環(huán)圖抽象為Topology(拓撲)。Topology是Storm中最高層次的抽象概念,它可以被提交到Storm集群執(zhí)行,一個拓撲就是一個流轉(zhuǎn)換圖。圖中的邊表示Bolt訂閱了哪些流。當Spout或者Bolt發(fā)送元組到流時,它就發(fā)送元組到每個訂閱了該流的Bolt上進行處理。Storm設計思想?Topology實現(xiàn):Storm中拓撲定義僅僅是一些Thrift結(jié)構(gòu)體(Thrift是基于二進制的高性能的通訊中間件),這樣一來就可以使用其他語言來創(chuàng)建和提交拓撲。?Tuple實現(xiàn):一個Tuple就是一個值列表。列表中的每個value都有一個name,并且該value可以是基本類型,字符類型,字節(jié)數(shù)組等,也可以是其他可序列化的類型。?拓撲的每個節(jié)點都要說明它所發(fā)射出的元組字段的name,其他節(jié)點只需要訂閱該name就可以接收數(shù)據(jù)。Storm設計思想?Streamgroupings(消息分發(fā)策略):定義一個Stream應該如何分配給Bolts,解決兩個組件(Spout和Bolt)之間發(fā)送tuple元組的問題。?Task(任務):每一個Spout和Bolt會被當作很多task在整個集群里面執(zhí)行,每一個task對應到一個線程。Streamgroupings示意圖Task示意圖Storm設計思想一個Topology的完整示意圖Storm框架設計?Storm集群表面類似Hadoop集群。?在Hadoop上運行的是“MapReducejobs”,在Storm上運行的是“Topologies”。兩者大不相同,一個關鍵不同是一個MapReduce的Job最終會結(jié)束,而一個Topology永遠處理消息(或直到kill它)。?Storm集群有兩種節(jié)點:控制(Master)節(jié)點和工作者(Worker)節(jié)點。?控制節(jié)點運行一個稱之為“Nimbus”的后臺程序,負責在集群范圍內(nèi)分發(fā)代碼、為worker分配任務和故障監(jiān)測。?每個工作者節(jié)點運行一個稱之“Supervisor”的后臺程序,監(jiān)聽分配給它所在機器的工作,基于Nimbus分配給它的事情來決定啟動或停止工作者進程。Storm框架設計?一個Zookeeper集群負責Nimbus和多個Supervisor之間的所有協(xié)調(diào)工作(一個完整的拓撲可能被分為多個子拓撲并由多個supervisor完成)。Storm框架設計?Nimbus后臺程序和Supervisor后臺程序都是快速失?。╢ail-fast)和無狀態(tài)的,所有狀態(tài)維持在Zookeeper或本地磁盤。?這種設計中master并沒有直接和worker通信,而是借助中介Zookeeper,這樣一來可以分離master和worker的依賴,將狀態(tài)信息存放在zookeeper集群內(nèi)以快速回復任何失敗的一方。?這意味著你可以kill殺掉nimbus進程和supervisor進程,然后重啟,它們將恢復狀態(tài)并繼續(xù)工作,這種設計使Storm極其穩(wěn)定。Storm框架設計Storm工作流程示意圖Storm實例單詞統(tǒng)計?編程模型非常簡單,通過Topology定義整個處理邏輯。?Topology中定義了一個Spout和兩個處理消息的Bolt。Bolt通過訂閱Tuple的name值來接收數(shù)據(jù)Storm實例單詞統(tǒng)計?ShuffleGrouping是隨機分組,表示Tuple會被隨機的分發(fā)給Bolt。?FieldsGrouping是按字段分組,保證具有相同field值的Tuple會分發(fā)給同一個Task進行統(tǒng)計,保證統(tǒng)計的準確性。Storm實例SplitSentenceStorm實例WordCountStorm實例?每個從spout發(fā)送出來的消息(英文句子)都會觸發(fā)很多的task被創(chuàng)建。?Bolts將句子分解為獨立的單詞,然后發(fā)射這些單詞。?最后,實時的輸出每個單詞以及它出現(xiàn)過的次數(shù)。一個句子經(jīng)單詞統(tǒng)計后的統(tǒng)計結(jié)果示意圖Storm實例Storm應用使用Storm的公司和項目Storm應用?淘寶、阿里巴巴將流計算廣泛應用于業(yè)務監(jiān)控、廣告推薦、買家實時數(shù)據(jù)分析等場景。淘寶數(shù)據(jù)部新架構(gòu)流計算框架匯總?IBMInfoSphereStreams:商業(yè)級高級計算平臺,幫助用戶開發(fā)的應用程序快速攝取、分析和關聯(lián)來自數(shù)千個實時源的信息。/software/products/cn/zh/infosphere-streams/?IBMStreamBase:IBM開發(fā)的另一款商業(yè)流計算系統(tǒng),在金融部門和政府部門使用。?TwitterStorm:免費、開源的分布式實時計算系統(tǒng),可簡單、高效、可靠地處理大量的流數(shù)據(jù)/?Yahoo!S4(SimpleScalableStreamingSystem):開源流計算平臺,是通用的、分布式的、可擴展的、分區(qū)容錯的、可插拔的流式系統(tǒng)。/s4/?FacebookPuma:Facebook使用Puma和Hbase相結(jié)合來處理實時數(shù)據(jù)。?DStream:百度正在開發(fā)的屬于百度的通用實時數(shù)據(jù)流計算系統(tǒng)。?銀河流數(shù)據(jù)處理平臺:淘寶開發(fā)的通用流數(shù)據(jù)實時計算系統(tǒng)。?SuperMario:基于erlang語言和zookeeper模塊開發(fā)的高性能數(shù)據(jù)流處理框架。?Hstream、Esper、SQLstream等…網(wǎng)上資料?關于流計算的文章–對互聯(lián)網(wǎng)海量數(shù)據(jù)實時計算的理解/panfeng412/archive/2011/10/28/2227195.html–BeyondMapReduce:談2011年風靡的數(shù)據(jù)流計算系統(tǒng)/9642/?TwitterStorm–/(Storm官方網(wǎng)站)–/nathanmarz/storm(Storm的GitHub主頁,有完善的Wiki)–(徐明明,GitHub上Storm的核心貢獻者,博客中提供了不少關于Storm的文章)–(量子恒道提供的Storm入門教程)圖計算圖計算簡介GooglePregel圖計算模型Pregel的C++APIPregel模型的基本體系結(jié)構(gòu)Pregel模型的應用實例改進的圖計算模型參考資料圖計算中的問題大型圖(像社交網(wǎng)絡和網(wǎng)絡圖等)常常作為現(xiàn)在系統(tǒng)計算需要的一部分?,F(xiàn)在存在許多圖計算問題像最短路徑、集群、網(wǎng)頁排名、最小切割、連通分支等等,但還沒有一個可擴展的通用系統(tǒng)來解決這些問題。解決這些問題的算法的特點:它們常常表現(xiàn)為比較差的內(nèi)存訪問局部性、針對單個頂點的處理工作過少、以及計算過程中伴隨著的并行度的改變等問題??赡艿慕鉀Q方法:為特定的圖應用定制相應的分布式實現(xiàn)基于現(xiàn)有的分布式計算平臺使用單機的圖算法庫——如BGL,LEAD,NetworkX,JDSL,Standford,GraphBase,F(xiàn)GL等使用已有的并行圖計算系統(tǒng)——如ParallelBGL,CGMgraph等圖計算的兩種軟件目前通用的圖處理軟件主要包括兩種。一種主要基于遍歷算法、實時的圖數(shù)據(jù)庫,如Neo4j,OrientDB,DEX,和InfiniteGraph.另一種則是以圖頂點為中心的消息傳遞批處理的并行引擎,如Hama,GoldenOrb,Giraph,和Pregel.第一種基本都基于tinkerpop的圖基礎框架,tinkerpop項目關系如圖1所示:BSP模型以圖頂點為中心的消息傳遞批處理的并行引擎主要是基于BSP(BulkSynchronousParallel)模型所實現(xiàn)的并行圖處理包。BSP是由哈佛大學Viliant和牛津大學BillMcColl提出的并行計算模型。一個BSP模型由大量相互關聯(lián)的處理器(processor)所組成,它們之間形成了一個通信網(wǎng)絡。每個處理器都有快速的本地內(nèi)存和不同的計算線程。一次BSP計算過程由一系列全局超步組成,超步就是計算中一次迭代。每個超步主要包括三個組件:并發(fā)計算(Concurrentcomputation):每個參與的處理器都有自身的計算任務,它們只讀取存儲在本地內(nèi)存的值。這些計算都是異步并且獨立的。通訊(Communication):處理器群相互交換數(shù)據(jù),交換的形式:由一方發(fā)起推送(put)和獲取(get)操作。柵欄同步(Barriersynchronisation):當一個處理器遇到路障,會等到其他所有處理器完成它們的計算步驟。每一次同步也是一個超步的完成和下一個超步的開始。SuperstepPregel圖計算框架Pregel是由Google開發(fā)的一個用于分布式圖計算的計算框架,主要用于圖遍歷(BFS)、最短路徑(SSSP)、PageRank計算等等。共享內(nèi)存的運行庫有很多,但是對于Google來說,一臺機器早已經(jīng)放不下需要計算的數(shù)據(jù)了,所以需要分布式的這樣一個計算環(huán)境。沒有Pregel之前,可以選擇用MapReduce來做,但是效率很低。下面簡單介紹一下PageRank算法在Pregel和MapReduce中的實現(xiàn)。PageRank算法作為Google的網(wǎng)頁鏈接排名算法,具體公式如下:

對任意一個鏈接,其PR值為鏈入到該鏈接的源鏈接的PR值對該鏈接的貢獻和(分母Ni為第i個源鏈接的鏈出度)。Pregel的計算模型主要來源于BSP并行計算模型的啟發(fā)。要用Pregel計算模型實現(xiàn)PageRank算法,也就是將網(wǎng)頁排名算法映射到圖計算中,這其實是很自然的,網(wǎng)絡鏈接是一個連通圖。PageRank在Pregel中的實現(xiàn)上圖就是四個網(wǎng)頁(A,B,C,D)互相鏈入鏈出組成的聯(lián)通圖。根據(jù)Pregel的計算模型,將計算定義到頂點(vertex)即A,B,C,D上來,對應一個對象,即一個計算單元。每一個計算單元包含三個成員變量:?Vertexvalue:頂點對應的PR值?Outedge:只需要表示一條邊,可以不取值?Message:傳遞的消息,因為需要將本vertex對其它vertex的PR貢獻傳遞給目標vertex每一個計算單元包含一個成員函數(shù):?Compute:該函數(shù)定義了vertex上的運算,包括該vertex的PR值計算,以及從該vertex發(fā)送消息到其鏈出vertexPageRank在Pregel中的實現(xiàn)classPageRankVertex

:publicVertex<double,void,double>{public:

virtualvoidCompute(MessageIterator*msgs){

if(superstep()>=1){

doublesum=0;

for(;!msgs->Done();

msgs->Next())sum+=msgs->Value();

*MutableValue()=0.15/NumVertices()+0.85*sum;

}

if(superstep()<30){

constint64n=GetOutEdgeIterator().size();

SendMessageToAllNeighbors(GetValue()/n);

}else{

VoteToHalt();

}

}};PageRank在Pregel中的實現(xiàn)Pregel的執(zhí)行包含PageRankVertex類,它繼承了Vertex類。該類頂點值的類型是double,用來存儲暫定的PageRank,消息類型也是double,用來傳遞PageRank的部分。圖在第0個超步中被初始化,所以它的每個頂點值為1.0。在每個超步中,每個頂點都會沿著它的出射邊發(fā)送它的PageRank值除以出射邊數(shù)后的結(jié)果值。從第1個超步開始,每個頂點會將到達的消息中的值加到sum值中,同時將它的PageRank值設為0.15/NumVertices()+0.85*sum。為了收斂,可以設置一個超步數(shù)量的限制或用aggregators來檢查是否滿足收斂條件PageRank在MapReduce中的實現(xiàn)階段1:解析網(wǎng)頁Maptask把(URL,pagecontent)對映射為(URL,(PRinit,list-of-urls))PRinit是URL的“seed”PageRank。list-of-urls包含通過URL指向的所有頁。Reducetask只是恒等函數(shù)。階段2:PageRank分配Maptask得到(URL,(cur_rank,url_list))對于每一個url_list中的u,輸出(u,cur_rank/|url_list|)。輸出(URL,url_list)通過迭代器來獲取列表的指向。Reducetask獲得(URL,url_list)和很多(URL,var)值對匯總vals,乘上d(0.85)。輸出(URL,(new_rank,url_list))。最后階段:一個非并行組件決定是否達到收斂。如果達到收斂,寫出PageRank生成的列表。否則,回退到第2階段的輸出,進行另一個第2階段的迭代。MapReduce也是Google提出的一種計算模型,它是為全量計算而設計。它實現(xiàn)MapReduce需要以下三個階段:PageRank在MapReduce中的實現(xiàn)?下面是第二階段,把網(wǎng)頁鏈接映射到key-value對的偽代碼:Mapper函數(shù)的偽碼:input<PageN,RankN>->PageA,PageB,PageC...//鏈接關系begin

Nn:=thenumberofoutlinksforPageN;

foreachoutlinkPageK

outputPageK-><PageN,RankN/Nn>//同時輸出鏈接關系,用于迭代

outputPageN->PageA,PageB,PageC...EndMapper的輸出如下(已經(jīng)排序,所以PageK的數(shù)據(jù)排在一起,最后一列則是鏈接關系對):PageK-><PageN1,RankN1/Nn1>PageK-><PageN2,RankN2/Nn2>...PageK-><PageAk,PageBk,PageCk>Reduce函數(shù)的偽碼:inputmapper‘s

outputbegin

RankK:=0;

foreachinlinkPageNi

RankK+=RankNi/Nni*beta

//outputthePageKanditsnewRankforthenextiteration

output<PageK,RankK>-><PageAk,PageBk,PageCk...>endPageRank在兩種模型中實現(xiàn)的總結(jié)總結(jié):簡單地來講,Pregel將PageRank處理對象看成是連通圖,而MapReduce則將其看成是Key-Value對。Pregel將計算細化到頂點vertex,同時在vertex內(nèi)控制循環(huán)迭代次數(shù),而MapReduce則將計算批量化處理,按任務進行循環(huán)迭代控制。PageRank算法如果用MapReduce實現(xiàn),需要一系列的MapReduce的調(diào)用。從一個階段到下一個階段,它需要傳遞整個圖的狀態(tài),這樣就需要許多的通信和隨之而來的序列化和反序列化的開銷。另外,這一連串的MapReduce作業(yè)各執(zhí)行階段需要的協(xié)同工作也增加了編程復雜度,而Pregel使用超步簡化了這個過程。Pregel的計算一個典型的Pregel計算過程如下:讀取輸入初始化該圖,當圖被初始化好后,運行一系列的超步直到整個計算結(jié)束,這些超步之間通過一些全局的同步點分隔開,輸出結(jié)果結(jié)束計算。在每一個超步中,計算框架都會針對每個頂點調(diào)用用戶自定義的函數(shù),這個過程是并行的。該函數(shù)描述的是一個頂點V在一個超步S中需要執(zhí)行的操作。函數(shù)可以:讀取前一個超步(S-1)中發(fā)送給V的消息發(fā)送消息給其他頂點,這些消息將會在下一個超步(S+1)中被接收修改頂點V及其出射邊的狀態(tài)發(fā)生拓撲變化整個Pregel程序的輸出是所有頂點輸出的集合。頂點:每一個頂點都有一個相應的由String描述的頂點標識符。每一個頂點都有一個與之對應的可修改的用戶自定義值。邊:每一條有向邊都和其源頂點關聯(lián),還記錄了其目標頂點的標識符。每一條有向邊擁有一個可修改的用戶自定義值。Pregel計算模型的進程在第0個超步,所有頂點都處于active狀態(tài)只有active頂點參與對應超步中的計算頂點通過將其自身的status設置成“halt”來進入inactive狀態(tài)inactive頂點收到其它頂點傳送的消息被喚醒進入active狀態(tài)整個計算在所有頂點都達到“inactive”狀態(tài),并且沒有message在傳送的時候宣告結(jié)束。Pregel的消息傳遞模型計算模型是一種純消息傳遞模型,忽略遠程數(shù)據(jù)讀取和其他共享內(nèi)存的方式,有兩個原因。第一,消息傳遞模型足夠表達所有圖算法。第二,出于性能的考慮。在一個集群環(huán)境中,從遠程機器上讀取一個值是會有很高的延遲的。而我們的消息傳遞模式通過異步的方式傳輸批量消息,可以減少遠程讀取的延遲。Pregel的一個消息傳遞的例子?通過一個簡單的例子來說明這些基本概念:給定一個強連通圖,圖中每個頂點都包含一個值,它會將最大值傳播到每個頂點。在每個超步中,頂點會從接收到的消息中選出一個最大值,并將這個值傳送給其所有的相鄰頂點。當某個超步中已經(jīng)沒有頂點更新其值,那么算法就宣告結(jié)束。PregelC++API編寫一個Pregel程序需要繼承Pregel中已預定義好的一個基類——Vertex類template<typenameVertexValue,typenameEdgeValue,typenameMessageValue>

classVertex{

public:virtualvoidCompute(MessageIterator*msgs)=0;

conststring&vertex_id()const;

int64superstep()const;

constVertexValue&GetValue();

VertexValue*MutableValue();

OutEdgeIteratorGetOutEdgeIterator();

voidSendMessageTo(conststring&dest_vertex,constMessageValue&message);

voidVoteToHalt();};用戶覆寫Vertex類的虛函數(shù)Compute(),該函數(shù)會在每一個超步中對每一個頂點進行調(diào)用。Compute()方法可以通過調(diào)用GetValue()方法來得到當前頂點的值,或者通過調(diào)用MutableValue()方法來修改當前頂點的值。還可以通過由出射邊的迭代器提供的方法來查看修改出射邊對應的值。消息傳遞機制頂點之間的通信是直接通過發(fā)送消息,每條消息都包含了消息值和目標頂點的名稱。消息值的數(shù)據(jù)類型是由用戶通過Vertex類的模版參數(shù)來指定。在一個超步中,一個頂點可以發(fā)送任意多的消息。在該迭代器中并不保證消息的順序,但是可以保證消息一定會被傳送并且不會重復。消息可以傳給任意標識符已知的頂點combiner發(fā)送消息時,尤其是當目標頂點在另外一臺機器時,會產(chǎn)生一些開銷。某些情況可以用combiner降低這種開銷。比方說,假如Compute()收到許多的int值消息,而它僅僅關心的是這些值的和,而不是每一個int的值,這種情況下,系統(tǒng)可以將發(fā)往同一個頂點的多個消息combine成一個消息,該消息中僅包含它們的和值,這樣就可以減少傳輸和緩存的開銷。Combiners在默認情況下并沒有被開啟,而用戶如果想要開啟Combiner的功能,可以通過重載Combine()方法實現(xiàn)??蚣懿⒉粫_保哪些消息會被Combine而哪些不會,也不會確保傳送給Combine()的值和Combining操作的執(zhí)行順序。所以Combiner只應該對那些滿足交換律和結(jié)合律的操作打開。combiner例子:假設我們想統(tǒng)計在一組相關聯(lián)的頁面中所有頁面的鏈接數(shù)。?在第一個迭代中,對從每一個頂點(頁面)的鏈接,我們會向目標頁面發(fā)送一個消息。?這里輸入消息隊列上的count函數(shù)可以通過一個combiner來優(yōu)化性能。在這個求最大值的例子中,一個Maxcombiner可以減少通信負荷。aggregatorPregel的aggregators是一種提供全局通信,監(jiān)控和數(shù)據(jù)查看的機制。在一個超步S中,每一個頂點都可以向一個aggregator提供一個數(shù)據(jù),系統(tǒng)會使用一種reduce操作來負責聚合這些值,而產(chǎn)生的值將會對所有的頂點在超步S+1中可見。Aggregators可以用來做統(tǒng)計和全局協(xié)同。Aggregators可以通過把Aggregator類子類化來實現(xiàn)。應該滿足交換律和結(jié)合律默認情況下,一個aggregator僅僅會對來自同一個超步的輸入進行聚合。?例子:Sum運算符應用于每個頂點的出射邊數(shù)可以用來生成圖中邊的總數(shù)并使它能與所有的頂點相通信。更復雜的Reduce運算符甚至可以產(chǎn)生直方圖。在求最大值得例子中,我們我們可以通過運用一個Maxaggregator在一個超步中完成整個程序。topologymutationCompute()算法也可以用來修改圖的拓撲結(jié)構(gòu)。在請求發(fā)出后在該超步中發(fā)生拓撲變化。拓撲變化的順序:刪除操作在添加操作之前刪除邊操作在刪除頂點操作之前添加頂點操作在添加邊操作之前這種局部有序性解決了很多沖突,其余的沖突由用戶自定義的handlers解決。同一種handler機制將被用于解決由于多個頂點刪除請求或多個邊增加請求或刪除請求而造成的沖突。Pregel的協(xié)同機制是惰性的,全局的拓撲改變在被apply之前不需要進行協(xié)調(diào)這種設計的選擇是為了優(yōu)化流式處理。直觀來講就是對頂點V的修改引發(fā)的沖突由V自己來處理。Pregel同樣也支持純local的拓撲改變,Local的拓撲改變不會引發(fā)沖突,并且頂點或邊的本地增減能夠立即生效,很大程度上簡化了分布式的編程。InputandOutput可以采用多種格式進行圖的保存,比如可以用text文件,關系數(shù)據(jù)庫,或者Bigtable中的行。類似的,結(jié)果可以以任何一種格式輸出并根據(jù)應用程序選擇最適合的存儲方式。用戶可以通過繼承Reader和Writer類來定義他們自己的讀寫方式。ImplementationPregel是為Google的集群架構(gòu)而設計的。應用程序通常通過一個集群管理系統(tǒng)執(zhí)行,該管理系統(tǒng)會通過調(diào)度作業(yè)來優(yōu)化集群資源的使用率,有時候會殺掉一些任務或?qū)⑷蝿者w移到其他機器上去。持久化的數(shù)據(jù)被存儲在GFS或Bigtable中,而臨時文件比如緩存的消息則存儲在本地磁盤中。Pregellibrary將一張圖劃分成許多的partitions,每一個partition包含了一些頂點和以這些頂點為起點的邊。將一個頂點分配到某個partition上去取決于該頂點的ID。默認的partition函數(shù)為hash(ID)modN,N為所有partition總數(shù)。接下來描述一個Pregel程序執(zhí)行的幾個階段。執(zhí)行過程1.用戶程序的多個copy開始在集群中的機器上執(zhí)行。其中一個copy充當masterMaster不被分配圖的任意部分,它負責協(xié)調(diào)worker的活動2.master將圖進行分區(qū),然后將一個或多個partition分給worker;每一個worker會在內(nèi)存中維護分配到其之上的graphpartition的狀態(tài)。執(zhí)行它的頂點上的用戶定義的Compute()方法并管理來自或發(fā)給其他頂點的消息。執(zhí)行過程執(zhí)行過程3.Master為每個worker分配用戶輸入的一部分。輸入被看做一系列的記錄,每個記錄包含任意數(shù)量的頂點和邊。在輸入完成加載后,所有的頂點被標記為active。4.在一個超步中,master通知每一個worker去執(zhí)行,只要存在active頂點worker一直執(zhí)行,并為每一個active狀態(tài)的頂點調(diào)用compute()方法。它也會傳送以前的超步發(fā)送的消息。當worker完成后,它會向master作出響應,告訴master在下一個超步中active頂點的數(shù)量。5.計算結(jié)束后,master會通知所有的worker保存它那部分的計算結(jié)果。執(zhí)行過程執(zhí)行過程容錯性容錯是通過checkpointing來實現(xiàn)的。在每個超步的開始階段,master命令worker讓它保存它上面的partitions的狀態(tài)到持久存儲設備,包括頂點值,邊值,以及接收到的消息。Master通過ping消息檢測worker是否故障當一個或多個worker出現(xiàn)故障時,和它們關聯(lián)的分區(qū)的當前狀態(tài)就會丟失。Master重新分配圖的partition到當前可用的worker集合上。所有的partition會從最近的某超步S開始時寫出的checkpoint中重新加載狀態(tài)信息。該超步可能比在出故障的worker上最后運行的超步S’早好幾個階段整個系統(tǒng)從該超步重新開始Confinedrecovery可以改進恢復執(zhí)行的開銷和延遲。除了基本的checkpoint,worker同時還會將其在加載圖的過程中和超步中發(fā)送出去的消息寫入日志。這樣恢復就會被限制在丟掉的那些partitions上。Worker?一個worker機器會在內(nèi)存中維護分配到其之上的graphpartition的狀態(tài)。?當Compute()請求發(fā)送一個消息到其他頂點時,worker首先確認目標頂點是屬于遠程的worker機器,還是當前worker。如果是在遠程的worker機器上,那么消息就會被緩存,當緩存大小達到一個閾值,最大的那些緩存數(shù)據(jù)將會被異步地flush出去,作為單獨的一個網(wǎng)絡消息傳輸?shù)侥繕藈orker。如果是在當前worker,那么就可以做相應的優(yōu)化:消息就會直接被放到目標頂點的輸入消息隊列中。?如果用戶提供了Combiner,那么在消息被加入到輸出隊列或者到達輸入隊列時,會執(zhí)行combiner函數(shù)。后一種情況并不會節(jié)省網(wǎng)絡開銷,但是會節(jié)省用于消息存儲的空間。Master?Master主要負責的worker之間的工作協(xié)調(diào),每一個worker在其注冊到master的時候會被分配一個唯一的ID。Master內(nèi)部維護著一個當前活動的worker列表,master中保存這些信息的數(shù)據(jù)結(jié)構(gòu)大小與partitions的個數(shù)相關,與圖中的頂點和邊的數(shù)目無關。?絕大部分的master的工作,包括輸入,輸出,計算,保存以及從checkpoint中恢復,都將會在一個叫做barriers的地方終止:?Master同時還保存著整個計算過程以及整個graph的狀態(tài)的統(tǒng)計數(shù)據(jù)。為方便用戶監(jiān)控,Master在內(nèi)部運行了一個HTTP服務器來顯示這些信息。Aggregators?每個Aggregator會通過對一組value值集合應用aggregation函數(shù)計算出一個全局值。每一個worker都保存了一個aggregators的實例集,由typename和實例名稱來標識。當一個worker對graph的某一個partition執(zhí)行一個超級步時,worker會combine所有的提供給本地的那個aggregator實例的值到一個localvalue:即利用一個aggregator對當前partition中包含的所有頂點值進行局部規(guī)約。在超級步結(jié)束時,所有workers會將所有包含局部規(guī)約值的aggregators的值進行最后的匯總,并匯報給master。這個過程是由所有worker構(gòu)造出一棵規(guī)約樹而不是順序的通過流水線的方式來規(guī)約,這樣做的原因是為了并行化規(guī)約時cpu的使用。在下一個超級步開始時,master就會將aggregators的全局值發(fā)送給每一個worker。應用實例最短路徑二分匹配最短路徑最短路徑問題是圖論中最有名的問題之一了,同時具有廣泛的應用,該問題有幾個形式:單源最短路徑、s-t最短路徑、全局最短路徑。單源最短路徑在算法中我們假設每個頂點的關聯(lián)值被初始化為INF。在每個超步中,每個頂點會首先接收到來自鄰居傳送過來的消息,該消息包含更新過的從源頂點到該頂點的潛在的最短距離。如果這些更新里的最小值小于該頂點當前關聯(lián)值,那么頂點就會更新這個值,并發(fā)送消息給它的鄰居。在第一個超步中,只有源頂點會更新它的關聯(lián)值,然后發(fā)送消息給它的直接鄰居。然后這些鄰居會更新它們的關聯(lián)值,然后繼續(xù)發(fā)送消息給它們的鄰居,如此循環(huán)往復。當沒有更新再發(fā)生的時候,算法就結(jié)束,之后所有頂點的關聯(lián)值就是從源頂點到它的最短距離,若值為INF表示該頂點不可達。如果所有的邊權(quán)重都是非負的,就可以保證該過程肯定會結(jié)束。二分匹配?在循環(huán)的階段0,左邊集合中那些還未被匹配的頂點會發(fā)送消息給它的每個鄰居請求匹配,然后會無條件的VoteToHalt。如果它沒有發(fā)送消息,或者是所有的消息接收者都已經(jīng)被匹配,該頂點就不會再變?yōu)閍ctive狀態(tài)。?在循環(huán)的階段1,

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論