![丨變身單線程后的controller如何處理_第1頁](http://file4.renrendoc.com/view/0b8cd72ddc98b52a43bf077b647cff42/0b8cd72ddc98b52a43bf077b647cff421.gif)
![丨變身單線程后的controller如何處理_第2頁](http://file4.renrendoc.com/view/0b8cd72ddc98b52a43bf077b647cff42/0b8cd72ddc98b52a43bf077b647cff422.gif)
![丨變身單線程后的controller如何處理_第3頁](http://file4.renrendoc.com/view/0b8cd72ddc98b52a43bf077b647cff42/0b8cd72ddc98b52a43bf077b647cff423.gif)
![丨變身單線程后的controller如何處理_第4頁](http://file4.renrendoc.com/view/0b8cd72ddc98b52a43bf077b647cff42/0b8cd72ddc98b52a43bf077b647cff424.gif)
![丨變身單線程后的controller如何處理_第5頁](http://file4.renrendoc.com/view/0b8cd72ddc98b52a43bf077b647cff42/0b8cd72ddc98b52a43bf077b647cff425.gif)
版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
致,這部分代碼讀起來晦澀難懂,改動起來也重重,因為你根本不知道,變動了這個程的數(shù)據(jù),會不會影響到其他線程。同時,開發(fā)人員在修復(fù)ControllerBug時,也非鑒于這個原因,自版本開始,社區(qū)陸續(xù)對Controller代碼結(jié)構(gòu)進行了改造。其中這里的單線程,并非是指Controller只有一個線程了,而是指對局部狀態(tài)的限制在一個專屬線程上,即讓這個特定線程排他性地操作Controller元數(shù)據(jù)信息。這樣一來,整個組件代碼就不必擔(dān)心多線程的各種線程安全問題了,源碼也可以拋棄各種不必要的鎖機制,最終大大簡化了Conroller端的代碼結(jié)構(gòu)。這部分源碼非常重要,它能夠幫助你掌握Controller端處理各類的原理,這將極大地提升你在實際場景中處理Controller各類問題的能力。因此,我建議你多讀幾遍,徹底了解Controller是怎么處理各種的。接下來,我們先宏觀領(lǐng)略一下Controller單線程隊列處理模型及其基礎(chǔ)組件從圖中可見,Controller端有多個線程向隊列寫入不同種類的,比如ZooKeeper端的Watcher線程、KafkaRequestHandler線程、Kafka定時任務(wù)程,等等。而在隊列的另一端,只有一個名為ControllerEventThread的線程專門負參與實現(xiàn)這個模型的源碼類有4個。ControllerEventProcessor:Controller端的處理器接口。ControllerEvent:Controller,也就是隊列中被處理的對象。ControllerEventManager:處理器,用于創(chuàng)建和管理ControllerEventThread。ControllerEventThread:專屬的處理線程,唯一的作用是處理不同種類ControllEvent。這個類是ControllerEventManager類內(nèi)部定義的線程類今天,我們的重要目標(biāo)就是要搞懂這4個類。就像我前面說的,它們完整地構(gòu)建出了單線程隊列模型。下面一個一個地學(xué)習(xí)它們的源碼,你要重點掌握隊列的實現(xiàn)以這個接口位于controller下的ControllerEventManager.scala件中。它定義了一個支持普通處理和搶占處理Controller的接口,代碼如下所示:traitControllerEventProcessordefprocess(event:ControllerEvent):defpreempt(event:ControllerEvent):4該接口定義了兩個方法,分別是process和cess:接收一個Controller,并進行處理。preempt:接收一個Controller,并搶占隊列之前的進行優(yōu)先處理目前,在Kafka源碼中,KafkaController類是Controller組件的功能實現(xiàn)類,它也ControllerEventProcessor接口的唯一實現(xiàn)類對于這個接口,你要重點掌握process方法的作用,因為它是實現(xiàn)Controller處理的主力方法。你要了解process方法處理各類Controller的代碼結(jié)構(gòu)是什么樣的,而且至于preempt方法,你僅需要了解,Kafka使用它實現(xiàn)某些高優(yōu)先級的搶占處理即可,畢竟,目前在源碼中只有兩類(ShutdownEventThread和Expire)需要搶占式這就是前面說到的Controller,在源碼中對應(yīng)的就是ControllerEvent接口。該接口定義在KafkaController.scala文件中,本質(zhì)上是一個trait類型,如下所示:sealedtraitControllerEventdefstate:3每個ControllerEvent都定義了一個狀態(tài)。Controller在處理具體的時,會對狀態(tài)進行相應(yīng)的變更。這個狀態(tài)是由源碼文件ControllerState.scala中的抽象類ControllerState123456classControllerStatedefvalue:defame:Option[String]if(hasRateAndTimeMetric)Some(s"${toString}RateAndTimeMs")elseprotecteddefhasRateAndTimeMetric:Boolean=}每類ControllerState都定義一個value值,表示Controller狀態(tài)的序號,從0開始。另外 ame方法是用于構(gòu)造Controller狀態(tài)速率的指標(biāo)名稱的比如,TopicChange是一類ControllerState,用于表示總數(shù)發(fā)生了變化。為了這類狀態(tài)變更速率,代碼中的r ame方定義一個名為TopicChangeRateAndTimeMs指標(biāo)。當(dāng)然,并非所有的ControllerState有對應(yīng)的速率指標(biāo),比如,表示空閑狀態(tài)的Idle就沒有對應(yīng)的指標(biāo)。目前,Controller總共定義了25類和17種狀態(tài),它們的對應(yīng)關(guān)系如下表所示內(nèi)容看著好像有很多,那我們應(yīng)該怎樣使用這張表格Controller狀態(tài)變更速率異常的時候,你可以通過這張表格,快速確定可能造成瓶頸的Controller,并定位處理該的函數(shù)代碼,輔助你進一步地調(diào)試問題。另外,你要了解的是,多個ControllerEvent可能歸屬于相同的ControllerState比如,TopicChange和PartitionModifications都屬于TopicChange狀態(tài),畢竟,它們都與Topic的變更有關(guān)。前者是創(chuàng)建Topic,后者是修改Topic的屬性,比如,分區(qū)再比如,BrokerChange和BrokerModifications都屬于BrokerChange狀態(tài),表征的都是對Broker屬性的修改。有了這些鋪墊,我們就可以開始學(xué)習(xí)處理器的實現(xiàn)代碼了在Kafka中,Controller處理器代碼位于controller包下ControllerEventManager.scala文件下。我用一張圖來展示下這個文件的結(jié)構(gòu)如圖所示,該文件主要由4個部分組成 ControllerEventProcessor:前面講過的處理器接口,目前只有KafkaController實現(xiàn)了這個接QueuedEvent:表征隊列上的對象ControllerEventManagerClass:ControllerEventManager伴生類,主要用于創(chuàng)ControllerEventThread線程類,還有一些其他值得我們學(xué)習(xí)的重要方法,一會兒我們ControllerEventManager對象僅僅定義了3個公共變量,沒有任何邏輯,你簡單看下就行。至于ControllerEventProcessor我們剛剛已經(jīng)學(xué)習(xí)過了。接下來,我們重點學(xué)我們先來看QueuedEvent的定義,全部代碼如下1234//每個QueuedEvent定義了兩個//event:ControllerEvent類,表示//enqueueTimeMs:表示被放入classQueuedEvent(valevent:556789valenqueueTimeMs:Long)//標(biāo) valprocessingStarted=new//valspent=new//defprocess(processor:ControllerEventProcessor):Unit{if(spent.getAndSet(true))}//defpreempt(processor:ControllerEventProcessor):Unit{if(spent.getAndSet(true))}//defawaitProcessing():Unit{}overridedeftoString:String{可以看到,每個QueuedEvent對象實例都裹挾了一個ControllerEvent。另外,每個QueuedEvent定義了process、preemptawaitProcessing法,分別表示處理事其中,process方法和preempt方法的實現(xiàn)原理,就是調(diào)用給ControllerEventProcessor口的processpreempt法,非常簡單在QueuedEvent對象中,我們再一次看到了CountDownLatch的身影,我在第7節(jié)里提到過它。Kafka源碼非常喜歡用CountDownLatch來做各種條件控制,比如用于偵在這里,QueuedEvent使用它的唯一目的,是確保Expire在建立ZooKeeper會話如果不是在這個場景下,那么,代碼就用spent來標(biāo)識該是否已經(jīng)被處理過了,如已經(jīng)被處理過了,再次調(diào)用process方法時就會直接返回,什么都不做了解了QueuedEvent,我們來看下消費它們的ControllerEventThread。代代classControllerEventThread(name:String)extendsShutdownableThread(name=logIdent=s"[ControllerEventThreadcontrollerId=$controllerId]4這個類就是一個普通的線程類,繼承了ShutdownableThread基類,而后者是Kafka為很多線程類定義的公共父類。該父類是JavaThread類的子類,其線程邏輯方法run的主defdoWork():overridedefrun():Unit=trywhile}catch 11可見,這個父類會循環(huán)地執(zhí)行doWork方法的邏輯,而該方法的具體實現(xiàn)則交由子類來完作為Controller唯一的處理線程,我們要時刻關(guān)注這個線程的運行狀態(tài)。因此,我們必須要知道這個線程在JVM上的名字,這樣后續(xù)我們就能有針對性地對其展開。這個線程的名字是由ControllerEventManagerObject中ControllerEventThreadName變量34objectControllerEventManagervalControllerEventThreadName="controller-event-現(xiàn)在我們看看ControllerEventThread類的doWork是如何實現(xiàn)的。代碼如 //如果是關(guān)閉線程,什么都不用做。關(guān)閉線程由外部來執(zhí)6caseShutdownEventThread7casecontrollerEvent8_state=9//更新對應(yīng)在隊列中保存的時eventQueueTimeHist.update(liseconds()-trydefprocess():Unit=//處理,同時計算處理速rateAndTimeMetrics.get(state)matchcaseSome(timer)=>timer.time{process()caseNone=>}}catchcasee:Throwable=>error(s"Uncaughterrorprocessingevent}_state=}23我用一張圖來展示下具體的執(zhí)行流大體上看,執(zhí)行邏輯很簡單首先是調(diào)用LineBlocingQee的take方法,去獲取待處理的QeeEet對象實例。注意,這里用的是tae方法,這說明,如果隊列中沒有QeeEent,那么,ConrollerEenThread線程將一直處于阻塞狀態(tài),直到隊列上插入了新的待處理事件。一旦拿到QueuedEvent后,線程會判斷是否是ShutdownEventThread。當(dāng)ControllerEventManager關(guān)閉時,會顯式地向隊列中塞入ShutdownEventThread,表明要關(guān)閉ControllerEventThread線程。如果是該,那么ControllerEventThread什么都不用做,畢竟要關(guān)閉這個線程了。相反地,如果是其他的,就調(diào)QueuedEvent的process方法執(zhí)行對應(yīng)的處理邏輯,同時計算被處理的速率該process方法底層調(diào)用的是ControllerEventProcessor的process方法,如下所defprocess(processor:ControllerEventProcessor):Unit=//若已經(jīng)被處理過,直接if//調(diào)用ControllerEventProcessor的process方法處8方法首先會判斷該是否已經(jīng)被處理過,如果是,就直接返回;如果不是,就調(diào)ControllerEventProcessor的process方法處理你可能很關(guān)心,每個ControllerEventProcessor的process方法是在哪里實現(xiàn)的?實際上,它們都封裝在KafkaController.scala文件中。還記得我之前,KafkaController類是目前源碼中ControllerEventProcessor接口的唯一實現(xiàn)類嗎?實際上,就是KafkaController實現(xiàn)了ControllerEventProcessorprocess法。由overridedefprocess(event:ControllerEvent):Unit=try//依次匹配eventmatchcaseevent:MockEventcaseShutdownEventThreaderror("ReceivedaShutdownEventThreadevent.ThistypeofeventiscaseAutoPreferredReplicaLeaderElection }catch//如果Controller換成了別的casee:ControllerMovedExceptioninfo(s"Controllermovedtoanotherbrokerwhenprocessing$event.",//執(zhí)行Controller卸任casee:Throwableerror(s"Errorprocessingevent$event",}finally 24這個process方法接收一個ControllerEvent實例,接著會判斷它是哪類Controller事件,并調(diào)用相應(yīng)的處理方法。比如,如果是AutoPreferredReplicaLeaderElection,則調(diào)用processAutoPreferredReplicaLeaderElection方法;如果是其他類型的,則調(diào)用process***方法。其他方除了QueuedEventControllerEventThread外,put法clearAndPut很重要。如果說ControllerEventThread是隊列的,那么,這兩個方法就是向列生產(chǎn)元素的在這兩個方法中,put是把指定ControllerEvent插入到隊列,而clearAndPut則是下面這兩段源碼分別對應(yīng)于這兩個方123456789//putdefput(event:ControllerEvent):QueuedEvent=inLock(putLock)//構(gòu)建QueuedEventvalqueuedEvent=newQueuedEvent(event,////返回新建QueuedEvent實}//clearAndPutdefclearAndPut(event:ControllerEvent):QueuedEvent=inLock(putLock)//優(yōu)先處理搶占//隊//調(diào)用上面的put}你注意到,源碼中的put方法使用putLock對代碼進行保護了嗎?就我個人而言,我覺得這個putLock是不需要的,因為LinkedBlockingQueue數(shù)據(jù)結(jié)構(gòu)本身就已經(jīng)是線程安全的了。put方法只會與全局共享變量queue打交道,因此,它們的線程安全性完全可以委托LinkedBlockingQueue實現(xiàn)。更何況,LinkedBlockingQueue內(nèi)部已經(jīng)了一個putLock和一個takeLock,專門保護讀寫操作。當(dāng)然,我同意在clearAndPut中使用鎖的做法,畢竟,我們要保證,搶占式和清今天,我們重點學(xué)習(xí)了Controller端的單線程隊列實現(xiàn)方式,即ControllerEventManager通過構(gòu)建Controlle
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 小學(xué)三年級口算題500道
- 2025年和田道路運輸從業(yè)資格證考哪些項目
- 企業(yè)成長與融資選擇
- 2024-2025學(xué)年高中英語閱讀理解五練習(xí)含解析新人教版必修2
- 2024年高中化學(xué)第三章有機化合物第二節(jié)第1課時乙烯精練含解析新人教版必修2
- 中藥與醫(yī)院合作協(xié)議
- 上學(xué)期學(xué)校工作計劃
- 公司出納人員個人工作計劃
- 村民糾紛協(xié)議書
- 騰訊廣告合作協(xié)議
- 西安經(jīng)濟技術(shù)開發(fā)區(qū)管委會招聘筆試真題2024
- 2025屆浙江省高三歷史選考總復(fù)習(xí)模擬測試(八)歷史試題(含答案)
- 廣州2025年第一次廣東廣州市白云區(qū)政務(wù)服務(wù)和數(shù)據(jù)管理局政府雇員招聘筆試歷年參考題庫附帶答案詳解
- 【市質(zhì)檢】泉州市2025屆高中畢業(yè)班質(zhì)量監(jiān)測(二) 生物試卷(含答案解析)
- 六年級2025寒假特色作業(yè)
- DCS-應(yīng)急預(yù)案演練方案
- 2025年江蘇轄區(qū)農(nóng)村商業(yè)銀行招聘筆試參考題庫含答案解析
- 2025年中華財險湖南分公司招聘筆試參考題庫含答案解析
- 人教版六年級數(shù)學(xué)下冊完整版教案及反思
- 少兒財商教育講座課件
- 2025年中國科協(xié)所屬單位招聘15名社會在職人員歷年高頻重點提升(共500題)附帶答案詳解
評論
0/150
提交評論