Kafka消費(fèi)者者詳解_第1頁(yè)
Kafka消費(fèi)者者詳解_第2頁(yè)
Kafka消費(fèi)者者詳解_第3頁(yè)
Kafka消費(fèi)者者詳解_第4頁(yè)
Kafka消費(fèi)者者詳解_第5頁(yè)
已閱讀5頁(yè),還剩24頁(yè)未讀 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

Kfaka消費(fèi)者詳解2知識(shí)目標(biāo)消費(fèi)者流程創(chuàng)建消費(fèi)者、提交偏移量消費(fèi)監(jiān)聽(tīng)、獨(dú)立消費(fèi)01能力目標(biāo)了解消費(fèi)者流程掌握kafka詳情總結(jié)Kafka消費(fèi)者02學(xué)習(xí)目標(biāo)3目錄01Kafka消費(fèi)者02創(chuàng)建、提交03分區(qū)監(jiān)聽(tīng)再均衡04獨(dú)立消費(fèi)、總結(jié)Kafka消費(fèi)者4首先先通過(guò)圖1生產(chǎn)與消費(fèi)的聯(lián)系圖來(lái)了解消費(fèi)者是什么。

圖1生產(chǎn)者與消費(fèi)者Kafka消費(fèi)者5

在實(shí)際生產(chǎn)過(guò)程中,每個(gè)topic都會(huì)有多個(gè)partitions。多個(gè)partitions的好處在于,

一方面能夠?qū)roker上的數(shù)據(jù)進(jìn)行分片有效減少了消息的容量從而提升io性能。

另外一方面,為了提高消費(fèi)端的消費(fèi)能力,一般會(huì)通過(guò)多個(gè)consumer去消費(fèi)同一個(gè)topic,也就是消費(fèi)端的負(fù)載均衡機(jī)制同一個(gè)consumergroup里面的consumer是怎么去分配該消費(fèi)哪個(gè)分區(qū)里的數(shù)據(jù)的呢?

對(duì)于圖來(lái)1說(shuō),這3個(gè)消費(fèi)者會(huì)分別消費(fèi)這個(gè)topic的3個(gè)分區(qū),也就是每個(gè)consumer消費(fèi)一個(gè)partition,如果有三個(gè)分區(qū)四個(gè)消費(fèi)者那么會(huì)有一個(gè)消費(fèi)者消費(fèi)不到消息。為什么會(huì)這樣?在kafka中,存在兩種分區(qū)分配策略,一種是Range(默認(rèn))、另一種是RoundRobin(輪詢)。通過(guò)partition.assignment.strategy這個(gè)參數(shù)來(lái)設(shè)置。Kafka消費(fèi)者和消費(fèi)者群組6同一個(gè)群組里的消費(fèi)者訂閱的同一個(gè)主題,每個(gè)消費(fèi)者接收主題的一部分分區(qū)的消息。

如果群組里的消費(fèi)者數(shù)量超過(guò)主題的分區(qū)數(shù)量,就會(huì)有一部分消費(fèi)者被閑置,不會(huì)接收到任何消息。

同一個(gè)主題可以被多個(gè)消費(fèi)者群組消費(fèi),消費(fèi)者群組之間互不影響,而且正常情況下,同一條消息只能被群組里的一個(gè)消費(fèi)者消費(fèi)一次。如圖1、2.Kafka消費(fèi)者和消費(fèi)者群組7圖1圖2Kafka消費(fèi)者和消費(fèi)者群組8因?yàn)槿航M里的消費(fèi)者共同讀取主題的分區(qū),所以當(dāng)一個(gè)消費(fèi)者被關(guān)閉或發(fā)生崩潰時(shí),它就離開(kāi)了群組,原本由它讀取的分區(qū)將由群組里的其他消費(fèi)者來(lái)讀取。同時(shí)在主題發(fā)生變化時(shí),比如添加了新的分區(qū),也會(huì)發(fā)生分區(qū)與消費(fèi)者的重新分配,分區(qū)的所有權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者,這樣的行為被稱為再均衡。正是因?yàn)樵倬?,所以消費(fèi)費(fèi)者群組才能保證高可用性和伸縮性。消費(fèi)者通過(guò)向群組協(xié)調(diào)器所在的broker發(fā)送心跳來(lái)維持它們和群組的從屬關(guān)系以及它們對(duì)分區(qū)的所有權(quán)。只要消費(fèi)者以正常的時(shí)間間隔發(fā)送心跳,就被認(rèn)為是活躍的,說(shuō)明它還在讀取分區(qū)里的消息。消費(fèi)者會(huì)在輪詢消息或提交偏移量時(shí)發(fā)送心跳。如果消費(fèi)者停止發(fā)送心跳的時(shí)間足夠長(zhǎng),會(huì)話就會(huì)過(guò)期,群組協(xié)調(diào)器認(rèn)為它已經(jīng)死亡,就會(huì)觸發(fā)再均衡。創(chuàng)建Kafka消費(fèi)者9在創(chuàng)建消費(fèi)者的時(shí)候以下以下三個(gè)選項(xiàng)是必選的:bootstrap.servers:指定broker的地址清單,清單里不需要包含所有的broker地址,生產(chǎn)者會(huì)從給定的broker里查找broker的信息。不過(guò)建議至少要提供兩個(gè)broker的信息作為容錯(cuò);key.deserializer:指定鍵的反序列化器;value.deserializer:指定值的反序列化器。除此之外你還需要指明你需要想訂閱的主題,可以使用如下兩個(gè)API:consumer.subscribe(Collection<String>topics):指明需要訂閱的主題的集合;consumer.subscribe(Patternpattern):使用正則來(lái)匹配需要訂閱的集合。最后只需要通過(guò)輪詢API(poll)向服務(wù)器定時(shí)請(qǐng)求數(shù)據(jù)。一旦消費(fèi)者訂閱了主題,輪詢就會(huì)處理所有的細(xì)節(jié),包括群組協(xié)調(diào)、分區(qū)再均衡、發(fā)送心跳和獲取數(shù)據(jù),這使得開(kāi)發(fā)者只需要關(guān)注從分區(qū)返回的數(shù)據(jù),然后進(jìn)行業(yè)務(wù)處理。創(chuàng)建Kafka消費(fèi)者10創(chuàng)建的實(shí)例代碼如下圖3圖3創(chuàng)建實(shí)例代碼自動(dòng)提交偏移量11

消費(fèi)組發(fā)生再平衡時(shí)分區(qū)會(huì)被分配給新的消費(fèi)者,為了保證新消費(fèi)者能夠從分區(qū)的上一次消費(fèi)位置繼續(xù)拉取并處理消息,每個(gè)消費(fèi)者需要將分

區(qū)的消費(fèi)進(jìn)度,定時(shí)地同步給消費(fèi)組對(duì)應(yīng)的協(xié)調(diào)者節(jié)點(diǎn)。新AP

I為客戶端提供了兩種提交偏移鹽的方式:異步模式和同步模式

。

另外,如果消費(fèi)者客戶端設(shè)置了向動(dòng)提交(mit=true,默認(rèn)開(kāi)啟)的選項(xiàng),會(huì)在客戶端的輪詢操作中調(diào)度定時(shí)任務(wù),

定時(shí)任務(wù)也屬于異步模式提交偏移量的一種運(yùn)用場(chǎng)景自動(dòng)提交偏移量—重要性12Kafka的每一條消息都有一個(gè)偏移量屬性,記錄了其在分區(qū)中的位置,偏移量是一個(gè)單調(diào)遞增的整數(shù)。消費(fèi)者通過(guò)往一個(gè)叫作_consumer_offset的特殊主題發(fā)送消息,消息里包含每個(gè)分區(qū)的偏移量。如果消費(fèi)者一直處于運(yùn)行狀態(tài),那么偏移量就沒(méi)有什么用處。不過(guò),如果有消費(fèi)者退出或者新分區(qū)加入,此時(shí)就會(huì)觸發(fā)再均衡。完成再均衡之后,每個(gè)消費(fèi)者可能分配到新的分區(qū),而不是之前處理的那個(gè)。為了能夠繼續(xù)之前的工作,消費(fèi)者需要讀取每個(gè)分區(qū)最后一次提交的偏移量,然后從偏移量指定的地方繼續(xù)處理。因?yàn)檫@個(gè)原因,所以如果不能正確提交偏移量,就可能會(huì)導(dǎo)致數(shù)據(jù)丟失或者重復(fù)出現(xiàn)消費(fèi),比如下面情況:

如果提交的偏移量小于客戶端處理的最后一個(gè)消息的偏移量,那么處于兩個(gè)偏移量之間的消息就會(huì)被重復(fù)消費(fèi);如果提交的偏移量大于客戶端處理的最后一個(gè)消息的偏移量,那么處于兩個(gè)偏移量之間的消息將會(huì)丟失。自動(dòng)提交偏移量13Kafka支持自動(dòng)提交和手動(dòng)提交偏移量?jī)煞N方式。這里先介紹比較簡(jiǎn)單的自動(dòng)提交:只需要將消費(fèi)者的mit屬性配置為true即可完成自動(dòng)提交的配置。此時(shí)每隔固定的時(shí)間,消費(fèi)者就會(huì)把poll()方法接收到的最大偏移量進(jìn)行提交,提交間隔由erval.ms屬性進(jìn)行配置,默認(rèn)值是5s。

使用自動(dòng)提交是存在隱患的,假設(shè)我們使用默認(rèn)的5s提交時(shí)間間隔,在最近一次提交之后的3s發(fā)生了再均衡,再均衡之后,消費(fèi)者從最后一次提交的偏移量位置開(kāi)始讀取消息。這個(gè)時(shí)候偏移量已經(jīng)落后了3s,所以在這3s內(nèi)到達(dá)的消息會(huì)被重復(fù)處理??梢酝ㄟ^(guò)修改提交時(shí)間間隔來(lái)更頻繁地提交偏移量,減小可能出現(xiàn)重復(fù)消息的時(shí)間窗,不過(guò)這種情況是無(wú)法完全避免的?;谶@個(gè)原因,Kafka也提供了手動(dòng)提交偏移量的API,使得用戶可以更為靈活的提交偏移量。手動(dòng)提交偏移量14用戶可以通過(guò)將mit設(shè)為false,然后手動(dòng)提交偏移量?;谟脩粜枨笫謩?dòng)提交偏移量可以分為兩大類:手動(dòng)提交當(dāng)前偏移量:即手動(dòng)提交當(dāng)前輪詢的最大偏移量;手動(dòng)提交固定偏移量:即按照業(yè)務(wù)需求,提交某一個(gè)固定的偏移量。而按照KafkaAPI,手動(dòng)提交偏移量又可以分為同步提交和異步提交。手動(dòng)提交偏移量—同步提交15通過(guò)調(diào)用mitSync()來(lái)進(jìn)行同步提交,不傳遞任何參數(shù)時(shí)提交的是當(dāng)前輪詢的最大偏移量。while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.of(100,ChronoUnit.MILLIS));for(ConsumerRecord<String,String>record:records){System.out.println(record);}/*同步提交*/mitSync();}手動(dòng)提交偏移量—同步提交16

如果某個(gè)提交失敗,同步提交還會(huì)進(jìn)行重試,這可以保證數(shù)據(jù)能夠最大限度提交成功,但是同時(shí)也會(huì)降低程序的吞吐量?;谶@個(gè)原因,Kafka還提供了異步提交的API。手動(dòng)提交偏移量—異步提交17

異步提交可以提高程序的吞吐量,因?yàn)榇藭r(shí)你可以盡管請(qǐng)求數(shù)據(jù),而不用等待Broker的響應(yīng)。代碼如下:while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.of(100,ChronoUnit.MILLIS));for(ConsumerRecord<String,String>record:records){System.out.println(record);}手動(dòng)提交偏移量—異步提交18/*異步提交并定義回調(diào)*/mitAsync(newOffsetCommitCallback(){@OverridepublicvoidonComplete(Map<TopicPartition,OffsetAndMetadata>offsets,Exceptionexception){if(exception!=null){System.out.println("錯(cuò)誤處理");offsets.forEach((x,y)->System.out.printf("topic=%s,partition=%d,offset=%s\n",

x.topic(),x.partition(),y.offset()));}}});}手動(dòng)提交偏移量—異步提交19異步提交存在的問(wèn)題是,在提交失敗的時(shí)候不會(huì)進(jìn)行自動(dòng)重試,實(shí)際上也不能進(jìn)行自動(dòng)重試。假設(shè)程序同時(shí)提交了200和300的偏移量,此時(shí)200的偏移量失敗的,但是緊隨其后的300的偏移量成功了,此時(shí)如果重試就會(huì)存在200覆蓋300偏移量的可能。同步提交就不存在這個(gè)問(wèn)題,因?yàn)樵谕教峤坏那闆r下,300的提交請(qǐng)求必須等待服務(wù)器返回200提交請(qǐng)求的成功反饋后才會(huì)發(fā)出。基于這個(gè)原因,某些情況下,需要同時(shí)組合同步和異步兩種提交方式。注:雖然程序不能在失敗時(shí)候進(jìn)行自動(dòng)重試,但是我們是可以手動(dòng)進(jìn)行重試的,你可以通過(guò)一個(gè)Map<TopicPartition,Integer>offsets來(lái)維護(hù)你提交的每個(gè)分區(qū)的偏移量,然后當(dāng)失敗時(shí)候,你可以判斷失敗的偏移量是否小于你維護(hù)的同主題同分區(qū)的最后提交的偏移量,如果小于則代表你已經(jīng)提交了更大的偏移量請(qǐng)求,此時(shí)不需要重試,否則就可以進(jìn)行手動(dòng)重試。手動(dòng)提交偏移量—同步加異步提交20下面這種情況,在正常的輪詢中使用異步提交來(lái)保證吞吐量,但是因?yàn)樵谧詈蠹磳⒁P(guān)閉消費(fèi)者了,所以此時(shí)需要用同步提交來(lái)保證最大限度的提交成功。try{while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.of(100,ChronoUnit.MILLIS));for(ConsumerRecord<String,String>record:records){System.out.println(record);}手動(dòng)提交偏移量—同步加異步提交21//異步提交mitAsync();}}catch(Exceptione){e.printStackTrace();}finally{try{//因?yàn)榧磳⒁P(guān)閉消費(fèi)者,所以要用同步提交保證提交成功mitSync();}finally{consumer.close();}}提交特定偏移量22在上面同步和異步提交的API中,實(shí)際上我們都沒(méi)有對(duì)commit方法傳遞參數(shù),此時(shí)默認(rèn)提交的是當(dāng)前輪詢的最大偏移量,如果你需要提交特定的偏移量,可以調(diào)用它們的重載方法。同步提交特定偏移量:commitSync(Map<TopicPartition,OffsetAndMetadata>offsets)異步提交特定偏移量:

commitAsync(Map<TopicPartition,OffsetAndMetadata>offsets,OffsetCommitCallbackcallback)需要注意的是,因?yàn)槟憧梢杂嗛喍鄠€(gè)主題,所以offsets中必須要包含所有主題的每個(gè)分區(qū)的偏移量,示例代碼如下:提交特定偏移量23try{while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.of(100,ChronoUnit.MILLIS));for(ConsumerRecord<String,String>record:records){System.out.println(record);/*記錄每個(gè)主題的每個(gè)分區(qū)的偏移量*/TopicPartitiontopicPartition=newTopicPartition(record.topic(),record.partition());提交特定偏移量24OffsetAndMetadataoffsetAndMetadata=newOffsetAndMetadata(record.offset()+1,"nometaData");/*TopicPartition重寫(xiě)過(guò)hashCode和equals方法,所以能夠保證同一主題和分區(qū)的實(shí)例不會(huì)被重復(fù)添加*/offsets.put(topicPartition,offsetAndMetadata);}/*提交特定偏移量*/mitAsync(offsets,null);}}finally{consumer.close();}監(jiān)聽(tīng)分區(qū)再均衡25

因?yàn)榉謪^(qū)再均衡會(huì)導(dǎo)致分區(qū)與消費(fèi)者的重新劃分,有時(shí)候你可能希望在再均衡前執(zhí)行一些操作:比如提交已經(jīng)處理但是尚未提交的偏移量,關(guān)閉數(shù)據(jù)庫(kù)連接等。此時(shí)可以在訂閱主題時(shí)候,調(diào)用subscribe的重載方法傳入自定義的分區(qū)再均衡監(jiān)聽(tīng)器。Map<TopicPartition,OffsetAndMetadata>offsets=newHashMap<>();consumer.subscribe(Collections.singletonList(topic),newConsumerRebalanceListener(){/*該方法會(huì)在消費(fèi)者停止讀取消息之后,再均衡開(kāi)始之前就調(diào)用*/@OverridepublicvoidonPartitionsRevoked(Collection<TopicPartition>partitions){System.out.println("再均衡即將觸發(fā)");//提交已經(jīng)處理的偏移量mitSync(offsets);}監(jiān)聽(tīng)分區(qū)再均衡26/*該方法會(huì)在重新分配分區(qū)之后,消費(fèi)者開(kāi)始讀取消息之前被調(diào)用*/@OverridepublicvoidonPartitionsAssigned(Collection<TopicPartition>partitions){}});

try{while(true){ConsumerRecords<String,String>records=consumer.poll(Duration.of(100,ChronoUnit.MILLIS));for(ConsumerRecord<String,String>record:records)

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論