




版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
致,這部分代碼讀起來晦澀難懂,改動(dòng)起來也重重,因?yàn)槟愀静恢?,變?dòng)了這個(gè)程的數(shù)據(jù),會(huì)不會(huì)影響到其他線程。同時(shí),開發(fā)人員在修復(fù)ControllerBug時(shí),也非鑒于這個(gè)原因,自版本開始,社區(qū)陸續(xù)對(duì)Controller代碼結(jié)構(gòu)進(jìn)行了改造。其中這里的單線程,并非是指Controller只有一個(gè)線程了,而是指對(duì)局部狀態(tài)的限制在一個(gè)專屬線程上,即讓這個(gè)特定線程排他性地操作Controller元數(shù)據(jù)信息。這樣一來,整個(gè)組件代碼就不必?fù)?dān)心多線程的各種線程安全問題了,源碼也可以拋棄各種不必要的鎖機(jī)制,最終大大簡化了Conroller端的代碼結(jié)構(gòu)。這部分源碼非常重要,它能夠幫助你掌握Controller端處理各類的原理,這將極大地提升你在實(shí)際場景中處理Controller各類問題的能力。因此,我建議你多讀幾遍,徹底了解Controller是怎么處理各種的。接下來,我們先宏觀領(lǐng)略一下Controller單線程隊(duì)列處理模型及其基礎(chǔ)組件從圖中可見,Controller端有多個(gè)線程向隊(duì)列寫入不同種類的,比如ZooKeeper端的Watcher線程、KafkaRequestHandler線程、Kafka定時(shí)任務(wù)程,等等。而在隊(duì)列的另一端,只有一個(gè)名為ControllerEventThread的線程專門負(fù)參與實(shí)現(xiàn)這個(gè)模型的源碼類有4個(gè)。ControllerEventProcessor:Controller端的處理器接口。ControllerEvent:Controller,也就是隊(duì)列中被處理的對(duì)象。ControllerEventManager:處理器,用于創(chuàng)建和管理ControllerEventThread。ControllerEventThread:專屬的處理線程,唯一的作用是處理不同種類ControllEvent。這個(gè)類是ControllerEventManager類內(nèi)部定義的線程類今天,我們的重要目標(biāo)就是要搞懂這4個(gè)類。就像我前面說的,它們完整地構(gòu)建出了單線程隊(duì)列模型。下面一個(gè)一個(gè)地學(xué)習(xí)它們的源碼,你要重點(diǎn)掌握隊(duì)列的實(shí)現(xiàn)以這個(gè)接口位于controller下的ControllerEventManager.scala件中。它定義了一個(gè)支持普通處理和搶占處理Controller的接口,代碼如下所示:traitControllerEventProcessordefprocess(event:ControllerEvent):defpreempt(event:ControllerEvent):4該接口定義了兩個(gè)方法,分別是process和cess:接收一個(gè)Controller,并進(jìn)行處理。preempt:接收一個(gè)Controller,并搶占隊(duì)列之前的進(jìn)行優(yōu)先處理目前,在Kafka源碼中,KafkaController類是Controller組件的功能實(shí)現(xiàn)類,它也ControllerEventProcessor接口的唯一實(shí)現(xiàn)類對(duì)于這個(gè)接口,你要重點(diǎn)掌握process方法的作用,因?yàn)樗菍?shí)現(xiàn)Controller處理的主力方法。你要了解process方法處理各類Controller的代碼結(jié)構(gòu)是什么樣的,而且至于preempt方法,你僅需要了解,Kafka使用它實(shí)現(xiàn)某些高優(yōu)先級(jí)的搶占處理即可,畢竟,目前在源碼中只有兩類(ShutdownEventThread和Expire)需要搶占式這就是前面說到的Controller,在源碼中對(duì)應(yīng)的就是ControllerEvent接口。該接口定義在KafkaController.scala文件中,本質(zhì)上是一個(gè)trait類型,如下所示:sealedtraitControllerEventdefstate:3每個(gè)ControllerEvent都定義了一個(gè)狀態(tài)。Controller在處理具體的時(shí),會(huì)對(duì)狀態(tài)進(jìn)行相應(yīng)的變更。這個(gè)狀態(tài)是由源碼文件ControllerState.scala中的抽象類ControllerState123456classControllerStatedefvalue:defame:Option[String]if(hasRateAndTimeMetric)Some(s"${toString}RateAndTimeMs")elseprotecteddefhasRateAndTimeMetric:Boolean=}每類ControllerState都定義一個(gè)value值,表示Controller狀態(tài)的序號(hào),從0開始。另外 ame方法是用于構(gòu)造Controller狀態(tài)速率的指標(biāo)名稱的比如,TopicChange是一類ControllerState,用于表示總數(shù)發(fā)生了變化。為了這類狀態(tài)變更速率,代碼中的r ame方定義一個(gè)名為TopicChangeRateAndTimeMs指標(biāo)。當(dāng)然,并非所有的ControllerState有對(duì)應(yīng)的速率指標(biāo),比如,表示空閑狀態(tài)的Idle就沒有對(duì)應(yīng)的指標(biāo)。目前,Controller總共定義了25類和17種狀態(tài),它們的對(duì)應(yīng)關(guān)系如下表所示內(nèi)容看著好像有很多,那我們應(yīng)該怎樣使用這張表格Controller狀態(tài)變更速率異常的時(shí)候,你可以通過這張表格,快速確定可能造成瓶頸的Controller,并定位處理該的函數(shù)代碼,輔助你進(jìn)一步地調(diào)試問題。另外,你要了解的是,多個(gè)ControllerEvent可能歸屬于相同的ControllerState比如,TopicChange和PartitionModifications都屬于TopicChange狀態(tài),畢竟,它們都與Topic的變更有關(guān)。前者是創(chuàng)建Topic,后者是修改Topic的屬性,比如,分區(qū)再比如,BrokerChange和BrokerModifications都屬于BrokerChange狀態(tài),表征的都是對(duì)Broker屬性的修改。有了這些鋪墊,我們就可以開始學(xué)習(xí)處理器的實(shí)現(xiàn)代碼了在Kafka中,Controller處理器代碼位于controller包下ControllerEventManager.scala文件下。我用一張圖來展示下這個(gè)文件的結(jié)構(gòu)如圖所示,該文件主要由4個(gè)部分組成 ControllerEventProcessor:前面講過的處理器接口,目前只有KafkaController實(shí)現(xiàn)了這個(gè)接QueuedEvent:表征隊(duì)列上的對(duì)象ControllerEventManagerClass:ControllerEventManager伴生類,主要用于創(chuàng)ControllerEventThread線程類,還有一些其他值得我們學(xué)習(xí)的重要方法,一會(huì)兒我們ControllerEventManager對(duì)象僅僅定義了3個(gè)公共變量,沒有任何邏輯,你簡單看下就行。至于ControllerEventProcessor我們剛剛已經(jīng)學(xué)習(xí)過了。接下來,我們重點(diǎn)學(xué)我們先來看QueuedEvent的定義,全部代碼如下1234//每個(gè)QueuedEvent定義了兩個(gè)//event:ControllerEvent類,表示//enqueueTimeMs:表示被放入classQueuedEvent(valevent:556789valenqueueTimeMs:Long)//標(biāo) valprocessingStarted=new//valspent=new//defprocess(processor:ControllerEventProcessor):Unit{if(spent.getAndSet(true))}//defpreempt(processor:ControllerEventProcessor):Unit{if(spent.getAndSet(true))}//defawaitProcessing():Unit{}overridedeftoString:String{可以看到,每個(gè)QueuedEvent對(duì)象實(shí)例都裹挾了一個(gè)ControllerEvent。另外,每個(gè)QueuedEvent定義了process、preemptawaitProcessing法,分別表示處理事其中,process方法和preempt方法的實(shí)現(xiàn)原理,就是調(diào)用給ControllerEventProcessor口的processpreempt法,非常簡單在QueuedEvent對(duì)象中,我們?cè)僖淮慰吹搅薈ountDownLatch的身影,我在第7節(jié)里提到過它。Kafka源碼非常喜歡用CountDownLatch來做各種條件控制,比如用于偵在這里,QueuedEvent使用它的唯一目的,是確保Expire在建立ZooKeeper會(huì)話如果不是在這個(gè)場景下,那么,代碼就用spent來標(biāo)識(shí)該是否已經(jīng)被處理過了,如已經(jīng)被處理過了,再次調(diào)用process方法時(shí)就會(huì)直接返回,什么都不做了解了QueuedEvent,我們來看下消費(fèi)它們的ControllerEventThread。代代classControllerEventThread(name:String)extendsShutdownableThread(name=logIdent=s"[ControllerEventThreadcontrollerId=$controllerId]4這個(gè)類就是一個(gè)普通的線程類,繼承了ShutdownableThread基類,而后者是Kafka為很多線程類定義的公共父類。該父類是JavaThread類的子類,其線程邏輯方法run的主defdoWork():overridedefrun():Unit=trywhile}catch 11可見,這個(gè)父類會(huì)循環(huán)地執(zhí)行doWork方法的邏輯,而該方法的具體實(shí)現(xiàn)則交由子類來完作為Controller唯一的處理線程,我們要時(shí)刻關(guān)注這個(gè)線程的運(yùn)行狀態(tài)。因此,我們必須要知道這個(gè)線程在JVM上的名字,這樣后續(xù)我們就能有針對(duì)性地對(duì)其展開。這個(gè)線程的名字是由ControllerEventManagerObject中ControllerEventThreadName變量34objectControllerEventManagervalControllerEventThreadName="controller-event-現(xiàn)在我們看看ControllerEventThread類的doWork是如何實(shí)現(xiàn)的。代碼如 //如果是關(guān)閉線程,什么都不用做。關(guān)閉線程由外部來執(zhí)6caseShutdownEventThread7casecontrollerEvent8_state=9//更新對(duì)應(yīng)在隊(duì)列中保存的時(shí)eventQueueTimeHist.update(liseconds()-trydefprocess():Unit=//處理,同時(shí)計(jì)算處理速rateAndTimeMetrics.get(state)matchcaseSome(timer)=>timer.time{process()caseNone=>}}catchcasee:Throwable=>error(s"Uncaughterrorprocessingevent}_state=}23我用一張圖來展示下具體的執(zhí)行流大體上看,執(zhí)行邏輯很簡單首先是調(diào)用LineBlocingQee的take方法,去獲取待處理的QeeEet對(duì)象實(shí)例。注意,這里用的是tae方法,這說明,如果隊(duì)列中沒有QeeEent,那么,ConrollerEenThread線程將一直處于阻塞狀態(tài),直到隊(duì)列上插入了新的待處理事件。一旦拿到QueuedEvent后,線程會(huì)判斷是否是ShutdownEventThread。當(dāng)ControllerEventManager關(guān)閉時(shí),會(huì)顯式地向隊(duì)列中塞入ShutdownEventThread,表明要關(guān)閉ControllerEventThread線程。如果是該,那么ControllerEventThread什么都不用做,畢竟要關(guān)閉這個(gè)線程了。相反地,如果是其他的,就調(diào)QueuedEvent的process方法執(zhí)行對(duì)應(yīng)的處理邏輯,同時(shí)計(jì)算被處理的速率該process方法底層調(diào)用的是ControllerEventProcessor的process方法,如下所defprocess(processor:ControllerEventProcessor):Unit=//若已經(jīng)被處理過,直接if//調(diào)用ControllerEventProcessor的process方法處8方法首先會(huì)判斷該是否已經(jīng)被處理過,如果是,就直接返回;如果不是,就調(diào)ControllerEventProcessor的process方法處理你可能很關(guān)心,每個(gè)ControllerEventProcessor的process方法是在哪里實(shí)現(xiàn)的?實(shí)際上,它們都封裝在KafkaController.scala文件中。還記得我之前,KafkaController類是目前源碼中ControllerEventProcessor接口的唯一實(shí)現(xiàn)類嗎?實(shí)際上,就是KafkaController實(shí)現(xiàn)了ControllerEventProcessorprocess法。由overridedefprocess(event:ControllerEvent):Unit=try//依次匹配eventmatchcaseevent:MockEventcaseShutdownEventThreaderror("ReceivedaShutdownEventThreadevent.ThistypeofeventiscaseAutoPreferredReplicaLeaderElection }catch//如果Controller換成了別的casee:ControllerMovedExceptioninfo(s"Controllermovedtoanotherbrokerwhenprocessing$event.",//執(zhí)行Controller卸任casee:Throwableerror(s"Errorprocessingevent$event",}finally 24這個(gè)process方法接收一個(gè)ControllerEvent實(shí)例,接著會(huì)判斷它是哪類Controller事件,并調(diào)用相應(yīng)的處理方法。比如,如果是AutoPreferredReplicaLeaderElection,則調(diào)用processAutoPreferredReplicaLeaderElection方法;如果是其他類型的,則調(diào)用process***方法。其他方除了QueuedEventControllerEventThread外,put法clearAndPut很重要。如果說ControllerEventThread是隊(duì)列的,那么,這兩個(gè)方法就是向列生產(chǎn)元素的在這兩個(gè)方法中,put是把指定ControllerEvent插入到隊(duì)列,而clearAndPut則是下面這兩段源碼分別對(duì)應(yīng)于這兩個(gè)方123456789//putdefput(event:ControllerEvent):QueuedEvent=inLock(putLock)//構(gòu)建QueuedEventvalqueuedEvent=newQueuedEvent(event,////返回新建QueuedEvent實(shí)}//clearAndPutdefclearAndPut(event:ControllerEvent):QueuedEvent=inLock(putLock)//優(yōu)先處理搶占//隊(duì)//調(diào)用上面的put}你注意到,源碼中的put方法使用putLock對(duì)代碼進(jìn)行保護(hù)了嗎?就我個(gè)人而言,我覺得這個(gè)putLock是不需要的,因?yàn)長inkedBlockingQueue數(shù)據(jù)結(jié)構(gòu)本身就已經(jīng)是線程安全的了。put方法只會(huì)與全局共享變量queue打交道,因此,它們的線程安全性完全可以委托LinkedBlockingQueue實(shí)現(xiàn)。更何況,LinkedBlockingQueue內(nèi)部已經(jīng)了一個(gè)putLock和一個(gè)takeLock,專門保護(hù)讀寫操作。當(dāng)然,我同意在clearAndPut中使用鎖的做法,畢竟,我們要保證,搶占式和清今天,我們重點(diǎn)學(xué)習(xí)了Controller端的單線程隊(duì)列實(shí)現(xiàn)方式,即ControllerEventManager通過構(gòu)建Controlle
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 廣東藥科大學(xué)《朝鮮語會(huì)話(Ⅲ)》2023-2024學(xué)年第一學(xué)期期末試卷
- 富寧縣2025年四年級(jí)數(shù)學(xué)第二學(xué)期期末質(zhì)量跟蹤監(jiān)視試題含解析
- 揚(yáng)州大學(xué)《房地產(chǎn)營銷》2023-2024學(xué)年第二學(xué)期期末試卷
- 北京市一零一中學(xué)2024-2025學(xué)年高三第一次調(diào)研考試化學(xué)試題理試題含解析
- 合肥經(jīng)濟(jì)學(xué)院《校園音樂活動(dòng)策劃》2023-2024學(xué)年第二學(xué)期期末試卷
- DB1411T 74-2024長山藥種植技術(shù)規(guī)程
- DB15T 3521-2024奶山羊?qū)︼暳宵S曲霉毒素代謝評(píng)價(jià)技術(shù)規(guī)程
- ?;穬?chǔ)存的事故案例分析與應(yīng)用考核試卷
- 國際稅務(wù)規(guī)劃與企業(yè)稅負(fù)優(yōu)化策略培訓(xùn)考核試卷
- 保溫顆粒施工方案
- 工業(yè)互聯(lián)網(wǎng)平臺(tái)的商業(yè)模式與盈利策略
- 2024年09月2024渤海銀行上海分行校園招聘筆試歷年參考題庫附帶答案詳解
- 2025年遼寧省遼漁集團(tuán)招聘筆試參考題庫含答案解析
- 《員工招聘與選拔》課件
- 南昌起義模板
- 【MOOC】體育舞蹈與文化-大連理工大學(xué) 中國大學(xué)慕課MOOC答案
- 接處警流程培訓(xùn)
- 2024年商丘職業(yè)技術(shù)學(xué)院單招職業(yè)技能測(cè)試題庫附答案
- 《園林植物病蟲害》課件
- 小紅書食用農(nóng)產(chǎn)品承諾書示例
- 空調(diào)維保服務(wù)投標(biāo)方案 (技術(shù)方案)
評(píng)論
0/150
提交評(píng)論