版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
代代////設(shè)置/deftransitionTo(groupState:GroupState):Unit=assertValidTransition(groupState)//state=groupState//currentStateTimestamp=Some(liseconds()////defcurrentState=//defis(groupState:GroupState)=state==//defnot(groupState:GroupState)=state!=//defcanRebalance=transitionTotransitionTo將消費者組狀態(tài)變更成給定狀態(tài)。在變更前,代碼需要確保這次變更必須是合法的狀態(tài)轉(zhuǎn)換。這是依靠每個GroupState實現(xiàn)類定義的validPreviousStates來完成的。只有在這個集合中的狀態(tài),才是合法的前置狀態(tài)。同時,該方法還會更新狀態(tài)變更的時間戳字段。KafkacanRebalance方法它用于判斷消費者組是否能夠開啟Rebalance操作。判斷依據(jù)是,當前狀態(tài)是否是PreparingRebalance狀態(tài)的合法前置狀態(tài)。只有 pletingRebalance和Empty這3類狀態(tài)的消費者組,才有資格開啟Rebalance。is和not至于is和not方法,它們分別判斷消費者組的狀態(tài)與給定狀態(tài)吻合還是不吻合,主要被用于執(zhí)行狀態(tài)校驗。特別是is方法,被大量用于上層調(diào)用代碼中,執(zhí)行各類消費者組管理任在介紹管理消費者組成員的方法之前,我先幫你回憶下GroupMetadata中保存成員的字段。GroupMetadata中使用members字段保存所有的成員信息。該字段是一個HashMap,Key是成員的memberID字段,Value是MemberMetadata類型,該類型保存了成員的所謂的管理成員,也就是添加成員(add方法)、移除成員(remove方法)(has、get、size方法等)先說添加成員的方法:add。addmembersLeader下面是add方法的源碼:代代123456789defadd(member:MemberMetadata,callback:JoinCallback=null):Unit=//if//就把該成員的procotolType設(shè)置為消費者組的protocolTtocolType=Some(tocolType)//確保成員元數(shù)據(jù)中的groupId和組Idassert(groupId==//確保成員元數(shù)據(jù)中的protoclType和組protocolTypeassert(tocolType.orNull==assert(supportsProtocols(tocolType,//如果尚未選出Leaderif//把該成員設(shè)定為LeaderleaderId=//將該成員添加進members.put(member.memberId,member.supportedProtocols.foreach{case(protocol,_)=>member.awaitingJoinCallback=ifnumMembersAwaitingJoin+=}第一步,add方法要判斷members字段是否包含已有成員。如果沒有,就說明要添加的protocolType。我在上節(jié)課中講過,對于普通的消費者而言,protocolType就是字符,add方進行校驗別確待添加成員的組ID、proTpe和組配置一致,以及該成員選定的分區(qū)分配策略與組選定的分區(qū)分配策略相匹配。如果這些校驗有任何一個未通過,就會立即拋出異常。第三步,判斷消費者組的Leader成員是否已經(jīng)選出了。如果還沒有選出,就將該成員設(shè)置成Leader成員。當然了,如果Leader已經(jīng)選出了,自然就不需要做這一步了。需要注意的是,這里的Leader和我們在學習副本管理器時學到的Leader副本是不同的概念。這里Leader消費者組下的一個成員。該成員負責為所有成員制定分區(qū)分配方案,第四步,更新消費者組分區(qū)分配策略支持票數(shù)。關(guān)于suppottocols了,可以再復(fù)下。作為關(guān)鍵的成員管理方法之一,add方法是實現(xiàn)消費者組Rebalance流程至關(guān)重要的一環(huán)。每當Rebalance開啟第一大步——加入組的操作時,本質(zhì)上就是在利用這個add方法實有addremoveremove代代defremove(memberId:String):Unit=//從membersmembers.remove(memberId).foreach{member//member.supportedProtocols.foreach{case(protocol,_)=>//ifnumMembersAwaitingJoin-= //如果該成員是Leader,選擇剩下成員列表中的第一個作為新的LeaderifleaderId=13removeadd首先members之后,更新LeaderLeader查詢members3代代defhas(memberId:String)=defget(memberId:String)=defsize=size其它的查詢方法邏輯也都很簡單,比如allMemberMetadata、rebalanceTimeoutMs,等等,我就不多講了。課后你可以自行閱讀下,重點是體會這些方法利用members都做除了組狀態(tài)和成員管理之外,GroupMetadata管理消費者組的提交位移(CommittedOffsets),主要包括添加和移除位移值。offsets代代1privatevaloffsets=newmutable.HashMap[TopicPartition,它是HashMap類型,Key是TopicPartition類型,表示一個分區(qū),而Value消費者組需要向Coordinator提交已消費消息的進度,在Kaa使用它來定位消費者組要消費的下一條消息。那么,提交位在Coordinator是如何保存的呢?它實際上是保存在內(nèi)部位移中。提交的方式是,消費者組成員向內(nèi)部寫入符合特定格式的消息,這類消息就是所謂的位移提CommitRd)。關(guān)于位移提交消息的格式,我會在第0,這里你可以暫時不用理會。而這里所說的CommitRecordMetadataffset代代caseclassCommitRecordMetadataAndOffset(appendedBatchOffset:Option[Long],defolderThan(that:CommitRecordMetadataAndOffset):Boolean=3在GroupMetadata中,有3個向offsets中添加訂閱分區(qū)的已消費位移值的方法,分別是initialize mitAppend和 代代123456defoffsets:collection.Map[TopicPartition,CommitRecordMetadataAndOffset],pendingTxnOffsets:Map[Long,mutable.Map[TopicPartition,CommitRecordMetadatthis.offsets++=offsetsmits++=} mitsKafka 1123456789ififmitAppend(topicPartition:mits.contains(topicPartition)){代thrownewIllegalStateException("Cannotcompleteoffsetcommitwritewit"inthelog.")//offsets//offsetsif(!offsets.contains(topicPartition)||//將該分區(qū)對應(yīng)的提交位移消息添加到offsets}caseSome(stagedOffset)matchcase_}}該方法在提交位移消息被成功寫入后調(diào)用。主要判斷的依據(jù),是offsets中是否已包含該主題分區(qū)對應(yīng)的消息值,或者說,offsets字段中該分區(qū)對應(yīng)的提交位移消息在位移中的位移值是否小于待寫入的位移值。如果是的話,就把該已提交的位移值添加到offsets第三個方法P mit的作用完成一個待決事務(wù)Pendisaction)的位移提交。所謂的待決事務(wù),就是指正在進行中、還沒有完成的事務(wù)。在處理待決事務(wù)的過程中,可能會出現(xiàn)將待決事務(wù)中涉及到的分區(qū)的位移值添加到offsets的情況。不過,由于該方法是與Kaa事務(wù)關(guān)的,你不需要重點掌握,這里我就不展開說了。offsets中訂閱分區(qū)的已消費位移值也是能夠被移除的。你還記得,Kafka中的消息有默認的留存時間設(shè)置嗎?位移是普通的Kafka,所以也要遵守相應(yīng)的規(guī)定。如果當前時間與已提交位移消息時間戳的差值,超過了Broker端參數(shù)offsets.retention.minutes值,Kafka就會將這條記錄從offsets字段中移除。這就是方法removeExpiredOffsets要做的事情。這個方法的代碼有點長,為了方便你掌握,我分塊給你介紹下。我先帶你了解下它的內(nèi)部嵌套類方法getxpirOffsta移除位移值的代碼原理了。首先,該方法定義了一個內(nèi)部嵌套方法getExpiredOffsets,專門用于獲取訂閱分區(qū)過期代代123456789defbaseTimestamp:CommitRecordMetadataAndOffset=>subscribedTopics:Set[String]=Set.empty):Map[TopicPartition,//遍歷offsets中的所有分區(qū),過濾出同時滿足以下3//條件1//條件2//條件3 offsets.filtercase(topicPartition,commitRecordMetadataAndOffset)!subscribedTopics.contains(topicPartition.topic()){{caseNonecurrentTimestamp-baseTimestamp(commitRecordMetadataAndOffset)>=caseSome(expireTimestamp)=>currentTimestamp>=}}}.map//為滿足以上3個條件的分區(qū)提取出commitRecordMetadataAndOffsetcase(topicPartition,commitRecordOffsetAndMetadata)=>(topicPartition,commitRecordOffsetAndMetadata.offsetAndMetadata)}baseTimestamp:CommitRecordMetadataAndOffset方法開始時,代碼從offsets3條件1:分區(qū)所屬不在訂閱列表之內(nèi)。當方法傳入了不為空的集合時,就說條件3:該分區(qū)在位移中對應(yīng)消息的存在時間超過了閾值。老版本的Kaa直接指定了過期時間戳,因此,只需要判斷當前時間是否越過了這個過期時間。但是,目Kaa基于消費者組狀態(tài)。如果是mpty狀態(tài),過期的判斷依據(jù)就是當前時間與組變?yōu)閙pty狀態(tài)時間的差值,是否超過Brokroffsts.rtttsmpty狀態(tài),就看當前時間與提交位移消息中的時間戳差值是否超過了offsets.retentts當過濾出同時滿足這3getExpiredOffsetsremoveExpiredOffsets代代123456789defcurrentTimestamp:Long,offsetRetentionMs:Long):Map[TopicPartition,//getExpiredOffsets方法代碼//調(diào)用getExpiredOffsetsvalexpiredOffsets:Map[TopicPartition,OffsetAndMetadata]=protocolTypemacaseSome(_)ifis(Empty)=>commitRecordMetadataAndOffset=>)caseSome(ConsumerProtocol.PROTOCOL_TYPE)ifsubscribedTopics.isDefined=>)caseNone=>case_=>}ifdebug(s"Expiredoffsetsfromgroup'$groupId'://offsets--=//}protocolTypegetExpiredOffsetsEmptyEmptyprotocolTypeNone,Standaloneoffsets最后,我們討論下消費者組分區(qū)分配策略的管理,也就是字段supportedProtocols的管理。supportedProtocols消費者組每次RebalanceRbalcecandidateProtocols找出組內(nèi)所有成員都支持的分區(qū)分配策略代代privatedefcandidateProtocols:Set[String]=valnumMembers=members.size////找出支持票數(shù)=supportedProtocols.filter(_._2==5該方法首先會獲取組內(nèi)的總成員數(shù),然后,找出suppottocols于總成員數(shù)的分配策略,并返回它們的名稱。成員都支持該策略。接下來,我們看下selectProtocol方法,它的作用是選出消費者組的分區(qū)消費分配策略代代defselectProtocol:String=//ifthrownewIllegalStateException("Cannotselectprotocolforempty//valcandidates=//val(protocol,_)=.maxBy{case(_,votes)=>votes.size13candidateProtocols方vote比如,candidates字段的值是[“策略A”,“策略B”],成員1支持[“策略B”,“策略A”],成員2支持[“策略A”,“策略B”,“策略C”],成員3支持[“策略D”,“策略B”,“策略A”],那么,vote方將candidates與每個成員的支持列表進行比對,找出成員支持列表中第一個包含在candidates中的策略。因此,對于這個例子來說,成員1投票策略B,成員2投票策略A,成員3投票策略B。可以看到,投票的結(jié)果是,策略B是兩票,策略A是1票。所以,selectProtocol方法返回策略B作為新的策略。有一點你需要注意,成員支持列表中的策略是有順序的B”,“策略A”]和[“策略A”,“策略B”]是不同的,成員會傾向于選擇靠前的策略。今天,我們結(jié)合GrouptaKaa括組狀態(tài)、成員、位移和分區(qū)分配策略四個維度。我建議你在課下再仔細地閱讀一下這些管理數(shù)據(jù)的方法,對照著源碼和注釋走一遍完整的操作流程。 PendingOfset
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 貴州財經(jīng)職業(yè)學院《教師職業(yè)道德規(guī)范和政策法規(guī)》2023-2024學年第一學期期末試卷
- 2025陜西省建筑安全員C證考試題庫
- 貴陽學院《數(shù)據(jù)庫課程設(shè)計》2023-2024學年第一學期期末試卷
- 2025年河北省建筑安全員B證(項目經(jīng)理)考試題庫
- 廣州幼兒師范高等??茖W?!墩c非營利組織會計》2023-2024學年第一學期期末試卷
- 2025年-山西省安全員《C證》考試題庫
- 廣州醫(yī)科大學《大學生職業(yè)生涯規(guī)劃與就業(yè)指導(二)》2023-2024學年第一學期期末試卷
- 2025年福建省安全員B證考試題庫附答案
- 2025陜西建筑安全員A證考試題庫附答案
- 2025年上海市安全員-C證考試(專職安全員)題庫及答案
- 中華傳統(tǒng)文化之文學瑰寶學習通超星期末考試答案章節(jié)答案2024年
- 2023年外交學院招聘筆試備考試題及答案解析
- 機械制圖-三視圖
- GB/T 17516.1-1998V帶和多楔帶傳動測定節(jié)面位置的動態(tài)試驗方法第1部分:V帶
- 供熱公司熱量管理辦法
- 致客戶通知函
- 各種預(yù)混料配方設(shè)計技術(shù)
- 12千伏環(huán)網(wǎng)柜(箱)標準化設(shè)計定制方案(2019版)
- 思想品德鑒定表(學生模板)
- 滿堂支架計算
- MA5680T開局配置
評論
0/150
提交評論