
版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
1、kafka producer consumerproducer api ducer.kafkaproducer 假如想學(xué)習(xí)java工程化、高性能及分布式、深化淺出。微服務(wù)、spring,mybatis,netty源碼分析的伴侶可以加我的java高級(jí)溝通:854630135,群里有阿里大牛直播講解技術(shù),以及java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)共享給大家。 1 props.put("bootstrap.servers", "28:9092"); 2 pr
2、ops.put("acks", "all"); 3 props.put("retries", 0); 4 props.put("batch.size", 16384); 5 props.put("linger.ms", 1); 6 props.put("buffer.memory", 33554432); 7 props.put("key.serializer&a
3、mp;quot;, "mon.serialization.stringserializer"); 8 props.put("value.serializer", "mon.serialization.stringserializer"); 9 10 producer producer = new kafkaproducer(props); 11 for (int i = 0; i ("foo", integer.tostring(i), integer
4、.tostring(i), new callback() 13 override 14 public void oncompletion(recordmetadata recordmetadata, exception e) 15 if (null != e) 16 e.printstacktrace(); 17 else 18 system.out.println("callback: " + recordmetadata.topic() + " " + recordmetadata.offset(); 19 20 21
5、 ); 22 23 producer.close(); producer由一個(gè)緩沖池組成,這個(gè)緩沖池中維護(hù)著那些還沒有被傳送到服務(wù)器上的記錄,而且有一個(gè)后臺(tái)的i/o線程負(fù)責(zé)將這些記錄轉(zhuǎn)換為哀求并將其傳送到集群上去。 send()辦法是異步的。當(dāng)調(diào)用它以后就把記錄放到buffer中并立刻返回。這就允許生產(chǎn)者批量的發(fā)送記錄。 acks配置項(xiàng)控制的是完成的標(biāo)準(zhǔn),即什么樣的哀求被認(rèn)為是完成了的。本例中其值設(shè)置的是"all"表示客戶端會(huì)等待直到全部記錄徹低被提交,這是最慢的一種方式也是持久化最好的一種方式。 假如哀求失敗了,生產(chǎn)者可以自動(dòng)重試。由于這里我們?cè)O(shè)置retr
6、ies為0,所以它不重試。 生產(chǎn)者對(duì)每個(gè)分區(qū)都維護(hù)了一個(gè)buffers,其中放的是未被發(fā)送的記錄。這些buffers的大小是通過batch.size配置項(xiàng)來控制的。 默認(rèn)狀況下,即使一個(gè)buffer還有未用法的空間(ps:buffer沒滿)也會(huì)立刻發(fā)送。假如你想要削減哀求的次數(shù),你可以設(shè)置linger.ms為一個(gè)大于0的數(shù)。這個(gè)命令將告知生產(chǎn)者在發(fā)送哀求之前先等待多少毫秒,以希翼能有更多的記錄到達(dá)好填滿buffer。在本例中,我們?cè)O(shè)置的是1毫秒,表示我們的哀求將會(huì)延遲1毫秒發(fā)送,這樣做是為了等待更多的記錄到達(dá),1毫秒之后即使buffer沒有被填滿,哀求也會(huì)發(fā)送。(ps:略微說明一下這段話,pr
7、oducer調(diào)用send()辦法只是將記錄放到buffer中,然后由一個(gè)后臺(tái)線程將buffer中的記錄傳送到服務(wù)器上。這里所說的哀求指的是從buffer到服務(wù)器。默認(rèn)狀況下記錄被放到buffer以后立刻被發(fā)送到服務(wù)器,為了削減哀求服務(wù)器的次數(shù),可以通過設(shè)置linger.ms,這個(gè)配置項(xiàng)表示等多少毫秒以后再發(fā)送,這樣做是希翼每次哀求可以發(fā)送更多的記錄,以此削減哀求次數(shù)) 假如想學(xué)習(xí)java工程化、高性能及分布式、深化淺出。微服務(wù)、spring,mybatis,netty源碼分析的伴侶可以加我的java高級(jí)溝通:854630135,群里有阿里大牛直播講解技術(shù),以及java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)共
8、享給大家。 buffer.memory控制的是總的buffer內(nèi)存數(shù)量 key.serializer 和 value.serializer表示怎樣將key和value對(duì)象轉(zhuǎn)成字節(jié) 從kafka 0.11開頭,kafkaproducer支持兩種模型:the idempotent producer and the transactional producer(冪等producer和事務(wù)producer)。冪等producer強(qiáng)調(diào)的是起碼一次精確的投遞。事務(wù)producer允許應(yīng)用程序原子的發(fā)送消息到多個(gè)分區(qū)或者主題。 為了啟用冪等性,必需將enable.idempotence這個(gè)配置的值設(shè)為tru
9、e。假如你這樣設(shè)置了,那么retries默認(rèn)是integer.max_value,并且acks默認(rèn)是all。為了利用冪等producer的優(yōu)勢(shì),請(qǐng)避開應(yīng)用程序級(jí)別的重新發(fā)送。 為了用法事務(wù)producer,你必需配置transactional.id。假如transactional.id被設(shè)置,冪等性自動(dòng)被啟用。 1 properties props = new properties(); 2 props.put("bootstrap.servers", "28:9092"); 3 props.put(
10、"transactional.id", "my-transactional-id"); 4 5 producer producer = new kafkaproducer(props, new stringserializer(), new stringserializer(); 6 7 producer.inittransactions(); 8 9 try 10 producer.begintransaction(); 11 12 for (int i = 11; i ("bar",
11、 integer.tostring(i), integer.tostring(i); 14 15 / this method will flush any unsent records before actually committing the transaction 16 mittransaction(); 17 catch (producerfencedexception | outofordersequenceexception | authorizationexception e) 18 producer.close(); 19 catch (kafkaexception e) 20
12、 / by calling producer.aborttransaction() upon receiving a kafkaexception we can ensure 21 / that any successful writes are marked as aborted, hence keeping the transactional guarantees. 22 producer.aborttransaction(); 23 24 25 producer.close(); 假如想學(xué)習(xí)java工程化、高性能及分布式、深化淺出。微服務(wù)、spring,mybatis,netty源碼分析
13、的伴侶可以加我的java高級(jí)溝通:854630135,群里有阿里大牛直播講解技術(shù),以及java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)共享給大家。 consumer api org.apache.kafka.clients.consumer.kafkaconsumer offsets and consumer position 對(duì)于分區(qū)中的每條記錄,kafka維護(hù)一個(gè)數(shù)值偏移量。這個(gè)偏移量是分區(qū)中一條記錄的唯一標(biāo)識(shí),同時(shí)也是消費(fèi)者在分區(qū)中的位置。例如,一個(gè)消費(fèi)者在分區(qū)中的位置是5,表示它已經(jīng)消費(fèi)了偏移量從0到4的記錄,并且接下來它將消費(fèi)偏移量為5的記錄。相對(duì)于消費(fèi)者用戶來說,這里事實(shí)上有兩個(gè)位置的概念。 消費(fèi)
14、者的position表示下一條將要消費(fèi)的記錄的offset。每次消費(fèi)者通過調(diào)用poll(long)接收消息的時(shí)候這個(gè)position會(huì)自動(dòng)增強(qiáng)。 committed position表示已經(jīng)被存儲(chǔ)的最后一個(gè)偏移量。消費(fèi)者可以自動(dòng)的周期性提交offsets,也可以通過調(diào)用提交api(e.g. commitsync and commitasync)手動(dòng)的提交position。 consumer groups and topic subscriptions kafka用"consumer groups"(消費(fèi)者組)的概念來允許一組進(jìn)程分開處理和消費(fèi)記錄。這些處理在
15、同一個(gè)機(jī)器上舉行,也可以在不同的機(jī)器上。同一個(gè)消費(fèi)者組中的消費(fèi)者實(shí)例有相同的group.id 組中的每個(gè)消費(fèi)者可以動(dòng)態(tài)設(shè)置它們想要訂閱的主題列表。kafka給每個(gè)訂閱的消費(fèi)者組都投遞一份消息。這歸功于消費(fèi)者組中全部成員之間的均衡分區(qū),以至于每個(gè)分區(qū)都可以被指定到組中精確的一個(gè)消費(fèi)者。假設(shè)一個(gè)主題有4個(gè)分區(qū),一個(gè)組中有2個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者將處理2個(gè)分區(qū)。 消費(fèi)者組中的成員是動(dòng)態(tài)維護(hù)的:假如一個(gè)消費(fèi)者處理失敗了,那么分配給它的分區(qū)將會(huì)被重新分給組中其它消費(fèi)者。 在概念上,你可以把一個(gè)消費(fèi)者組想象成一個(gè)單個(gè)的規(guī)律訂閱者,并且每個(gè)規(guī)律訂閱者由多個(gè)進(jìn)程組成。作為一個(gè)多訂閱系統(tǒng),kafka天生就支持
16、對(duì)于給定的主題可以有隨意數(shù)量的消費(fèi)者組。 automatic offset committing 1 properties props = new properties(); 2 props.put("bootstrap.servers", "28:9092"); 3 props.put("group.id", "test"); 4 props.put("mit", "true
17、"); 5 props.put("erval.ms", "1000"); 6 props.put("key.deserializer", "mon.serialization.stringdeserializer"); 7 props.put("value.deserializer", "mon.serialization.stringdeserializer&
18、;quot;); 8 kafkaconsumer consumer = new kafkaconsumer(props); 9 consumer.subscribe(arrays.aslist("foo", "bar"); 10 while (true) 11 consumerrecords records = consumer.poll(100); 12 for (consumerrecord record : records) 13 system.out.printf("offset = %d, ke
19、y = %s, value = %s%n", record.offset(), record.key(), record.value(); 14 15 設(shè)置mit意味著自動(dòng)提交已消費(fèi)的記錄的offset manual offset control 代替消費(fèi)者周期性的提交已消費(fèi)的offsets,用戶可以控制什么時(shí)候記錄被認(rèn)為是已經(jīng)消費(fèi)并提交它們的offsets。 1 properties props = new properties(); 2 props.put("bootstrap.servers", "localhost:9092"); 3 props.put("group.id", "test"); 4 props.put("mit", "false"); 5 props.put("key.
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 沉箱碼頭施工方案
- 室內(nèi)電纜敷設(shè)施工方案
- 2025年東師復(fù)試化學(xué)試題及答案
- 2025年高職數(shù)據(jù)庫試題及答案
- 5年級(jí)下冊(cè)英語第1第2單元單詞跟讀
- 5年級(jí)上冊(cè)第5單元摘抄
- 燈泡溫度與電阻的關(guān)系式
- 地下車庫 行政復(fù)議申請(qǐng)
- 機(jī)械合同的施工方案
- 2025年合肥信息技術(shù)職業(yè)學(xué)院?jiǎn)握新殬I(yè)適應(yīng)性測(cè)試題庫學(xué)生專用
- 拼音瘋狂背古詩(6個(gè)單元120首)
- 閱讀讓我們更聰明
- 牙周病科普講座課件
- 實(shí)驗(yàn)室安全專項(xiàng)培訓(xùn)
- 工業(yè)地產(chǎn)營(yíng)銷推廣方案
- 2024年貴州能源集團(tuán)電力投資有限公司招聘筆試參考題庫附帶答案詳解
- 電子產(chǎn)品設(shè)計(jì)案例教程(微課版)-基于嘉立創(chuàng)EDA(專業(yè)版) 課件 第3章 多諧振蕩器的PCB設(shè)計(jì)
- 鐵路軌道與修理
- 紡織行業(yè)清潔生產(chǎn)評(píng)價(jià)指標(biāo)體系色紗
- 管理能力測(cè)試題大全
- 《風(fēng)景談》新教學(xué)課件
評(píng)論
0/150
提交評(píng)論