筆記-大數據技術之亦博_第1頁
筆記-大數據技術之亦博_第2頁
筆記-大數據技術之亦博_第3頁
筆記-大數據技術之亦博_第4頁
筆記-大數據技術之亦博_第5頁
已閱讀5頁,還剩42頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

KafkaKafkaKafka是什在流式計算中,Kafka一般用來緩存數據,StormKafkaApacheKafka是一個開源消息系統(tǒng),由Scala寫成。是由Apache開發(fā)的KafkaLinkedIn2011年初開源。201210ApacheIncubator畢業(yè)。該項目的目標是為處理實時數據提供一個、高通量、待的平臺。Kafka是一個分布式消息隊列。KafkaTopic進行歸類,發(fā)送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)broker。kafkaproducerconsumerzookeepermeta消息隊列實現原收者接收處理,即使有多個消息者也是如此。發(fā)布/訂閱模式(類似,一對多,數據生產后,推送給所有訂閱者為什么需要消息隊要你的處理系統(tǒng)明確的該消息已經被處理完畢從而確保你的數據被安全的保存直到你&在量劇增的情況下,應用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值為標準來投入資源隨時待命無疑是巨大的浪費使用消息隊列能夠使關鍵組件頂住突發(fā)的壓力,而不會因為突發(fā)的超負荷的請求而完全。能保證數據會按照特定的順序來處理。(KafkaPartition內的消息的有序性)Kafka架Producerkafkabroker發(fā)消息的客戶端。2)Consumerkafkabroker取消息的客戶端3)Topic:可以理解為一個隊列。4)ConsumerGroup(CG):kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內必然可以有多個消費者或消費者實例(consumerinstance),它們共個有分區(qū)(partition)consumer來消費。5)Brokerkafkabrokerbrokerbrokertopic。topicpartitionpartition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將消息發(fā)給consumertopic的整體(partition間)的順序。Offset:kafka的文件都是按照offset.kafka來命名,用offset做名字的好處是方便查20492048.kafkatheoffset就00000000000.kafkaKafka每個的多個分區(qū)日志分布式地在Kafka集群上,同時為了故障容錯,每個分區(qū)都會以副本的方式到多個消息節(jié)點上。其中一個節(jié)點會作為主副本Kafka端負載均衡;消息有鍵時,根據分區(qū)語義(例如hash)確保相同鍵的消息總是發(fā)送到同一Kafka的消費者通過訂閱來消費消息,并且每個消費者都會設置一個消費組名稱。(133個分區(qū)。在下圖(2)中,333300MB(100B(2900MB處理性能。同一個消費組下多個消費者互相協調消費工作,Kafka會將所有的分區(qū)平均地分配給所有的消費者實例,這樣每個消費者都可以分配到數量均等的分區(qū)。Kafka的消費組管理協議會動態(tài)地消費組的成員列表當一個新消費者加入消費者組或者有消費者離開消費組,Kafka的消費者消費消息時,只保證在一個分區(qū)內的消息的完全有序性,并不保證同一個匯中多個分區(qū)的消息順序。而且,消費者一個分區(qū)消息的順序和生產者寫入到這 o”和“Kafka”兩條消息到分區(qū)P1,則消Key可以是userid等)Kafka集群部2.1環(huán)境準2.1.1集群規(guī)2.1.2jar包虛擬機準3ip4)3臺主機分別關閉[root@bigdata11itstar]#chkconfigiptablesoff[root@bigdata12itstar]#chkconfigiptablesoff[root@bigdata13itstar]#chkconfigiptables0)bigdata11、bigdata12bigdata13Zookeeper。解壓zookeeper安裝包到 [itstar@bigdata11software]$tar-zxvfzookeeper-3.4.10.tar.gz-C mkdir-pzkData重命名/opt/module/zookeeper-3.4.10/conf這個 下的zoo_sample.cfg為zoo.cfgmvzoo_sample.cfgzoo.cfgzoo.cfgABipCLeaderD是萬一集群中的Leader服務器掛了,需要一個端口來重新進行,選出一個新的Leader,而這個端口就是用來執(zhí)行時服務器相互通信的端口。myiddataDir下,這個文件里面有一個數據就是A的值,Zookeeper啟動時此文件,拿到里面的數據與zoo.cfg里面的配server。在/opt/module/zookeeper-3.4.10/zkDatamyid myidlinuxnotepad++myidvi在文件中添加與server對應的:如zookeeperscp-rzookeeper-3.4.10/ scp-rzookeeper-3.4.10/ :/opt/modulemyid3、[root@bigdata11zookeeper-3.4.10]#bin/zkServer.shstart[root@bigdata12zookeeper-3.4.10]#bin/zkServer.shstart[root@bigdata13zookeeper-3.4.10]#bin/zkServer.sh[root@bigdata11zookeeper-3.4.10]#bin/zkServer.shstatusJMXenabledbydefaultUsingconfig:/opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfgMode:follower[root@bigdata12zookeeper-3.4.10]#bin/zkServer.shstatusJMXenabledbydefaultUsingconfig:/opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfgMode:leader[root@bigdata13zookeeper-3.4.5]#bin/zkServer.shstatusJMXenabledbydefaultUsingconfig:/opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfgMode:followerKafka集群部[itstar@bigdata11software]$tarzxvfkafka_2.11-.tgzCopt/module/[itstar@bigdata11module]$mvkafka_2.11-/在 下創(chuàng)建logs文件[itstar@bigdata11kafka]$mkdirlogs[itstar@bigdata11kafka]$cd[itstar@bigdata11config]$vi#broker#broker的全局唯 ,不能重#topic#IO #kafka運行日志存放的路徑#topicbroker上的分區(qū)個數[root@bigdata11module]#viexportKAFKA_HOME=/opt/module/kafka[root@bigdata11module]#sourceetc/profile[root@bigdata11etc]#xsync[itstar@bigdata11module]$xsyncbigdata12bigdata13上修改配置文件/opt/module/kafka/config/pertiesbroker.id=1、broker.id=2注:broker.id依次在bigdata11、bigdata12、bigdata13節(jié)點上啟動kafka(加上&,是在啟動)[itstar@bigdata11kafka]$bin/kafka-server-start.shconfig/perties&[itstar@bigdata12kafka]$bin/kafka-server-start.shconfig/perties&[itstar@bigdata13kafka]$bin/kafka-server-start.shconfig/perties&[itstar@bigdata11kafka]$bin/kafka-server-stop.shstop[itstar@bigdata12kafka]$bin/kafka-server-stop.shstop[itstar@bigdata13kafka]$bin/kafka-server-stop.shKafka命令行操[itstar@bigdata11kafka]$bin/kafka-topics.sh--zookeeperbigdata13:2181-- bin/kafka- --replication-factor3--partitions1----topictopic -- [itstar@bigdata11kafka]$bin/kafka-topics.sh--zookeeperbigdata11:2181--delete--[itstar@bigdata11kafka]$bin/kafka-console-producer.sh--broker-listbigdata11:9092-- o [itstar@bigdata12kafka]$bin/kafka-console-consumer.sh--bootstrap-server--from-beginning-- 查看某個Topic[itstar@bigdata11kafka]$bin/kafka-topics.sh--zookeeperbigdata11:2181--describe--Kafka配置信Broker配置信屬默認描fka-Kafka數據存放的 中間用逗號分隔,當新partition被創(chuàng)建的會被存放到當前存放partition最少 的連接串,格式為:zookeeperkafka建議在此配置中指定本集群存放,格式 ame3:port3/chroot/path。需要注意的是, um.message.size因為生產者生產的消息太大導致消費者無法8IO線程數,此參I/O prefersforsocket prefersforsocket 服務器允許請求的最大值,用來防止內存溢出,其值應該小于Javaheapsize.1partitiontopic在創(chuàng)建時沒partition Segment segment7過此時間日志就會被刪除。此參數可以被topic級別參數覆蓋。數據量大時,建議減小-partition的最大容量,若數據量超過此partitiontopiclog5topic12在此窗口時間內沒有收到follower的fetch如果replica節(jié)點leader節(jié)點此值大小leaders Thesocketreceivebufferforrequeststotheleaderforreplicating Thenumberofbyesofmessagestoattempttofetchforeachpartitioninthefetchrequeststhereplicassendto umamountoftimetowaittimefordatatoarriveontheleaderinthefetchrequestssentbythereplicasto1Numberofthreadsusedtomessagesfromleaders.Increasingvaluecanincreasethedegreeofparallelisminthefollower requests)ofthefetchrequestZooKeepersession超時時間。如果在此時serverzookeeper發(fā)送心跳,zookeeper太遲發(fā)現節(jié)點HZKfollowerZKleader的時間brokershutdownIfthisisenabledthecontrollerwillautomaticallytrytobalanceleadershipforpartitionsamongthebrokersbyperiodicallyreturningleadershiptothe“preferred”replicaforeachpartitionitis allowedperbroker.ThecontrollerwillrebalanceleadershipifthisratioabovetheconfiguredvalueperThefrequencywithwhichtocheckleader umamountofmetadataallowclientstosavewiththeir connectionsthatidlemorethan1Thenumberofthreadsperdatatobeusedforlogrecoveryatstartupandflushingatshutdown.IndicateswhethertoenablereplicasnotintheISRsettobeelectedasleaderasalastresort,eventhoughngsoresultindataThenumberofpartitionsfortheoffsetcommittopic.Sincechangingthisafterdeploymentiscurrentlyunsupported,mendusingahighersettingproduction(e.g.,100-Offsetsthatareolderthanthisagewillbemarkedfordeletion.Theactualpurgewilloccurwhenthelogcleanercompactsoffsets managerchecksforstale3Thereplicationfactorfortheoffsetcommittopic.Ahighersetting(e.g.,threeorfour)is mendedinordertoensurehigheravailability.IftheoffsetstopiciscreatedwhenfewerbrokersthanthereplicationfactorthenthetopicwillbecreatedwithfewerSegmentsizefortheoffsetstopic.Sinceitusesacompactedtopic,thisshouldbekeptrelativelylowinordertofasterlogcompactionandAnoffsetloadoccurswhenabrokerestheoffsetmanagerforasetofconsumergroups(i.e.,when esleaderforanoffsetstopicpartition).settingcorrespondstothebatchsize(inbytes)tousewhenreadingfromtheoffsetssegmentswhenloadingintotheoffsetmanager’s-Thenumberofacknowledgementsthatarerequiredbeforetheoffsetcommitcanbeaccepted.Thisissimilartotheproducer’sacknowledgementsetting.Ingeneral,thedefaultshouldnotTheoffsetcommitwillbedelayeduntilthistimeoutortherequirednumberofreplicashavereceivedtheoffsetcommit.ThisissimilartotheproducerProducer配置信屬默認描啟動時producer查詢brokers的列表,可以個參數只是用來獲取topic的元信息用,producer會從元信息中挑選合適的并與之建立socket連接。格式是:host1:port1,host2:port203.2Brokerack的超時時間,若等待時間超batchpushbroker性能,設置為異步序列號類,.byte[]producer“nonegzipandsnappy”。關于4.1節(jié)啟用壓縮的topic名稱。若上面參數選擇了一個壓縮格式,那么壓縮僅對本參數指定的topic有效,若本參數為空,則對所有s3Producer發(fā)送失敗時重試次數。若網絡出現Beforeeachretry,theproducerrefreshesthemetadataofrelevanttopicstoseeifanewleaderhasbeenelected.Sinceleaderelectiontakesabitoftime,thispropertyspecifiestheamountoftimethattheproducerwaitsbeforerefreshing Theproducergenerallyrefreshesthetopicmetadatafrombrokerswhenthereisafailure(partitionmissing,leadernotavailable…).Itwillalsopollregularly(default:every10minso600000ms).Ifyousetthistoanegativevalue,metadatawillonlygetrefreshedonfailure.Ifyousetthistozero,themetadatawillgetrefreshedaftereachmessagesent(not refreshhappenonlyAFTERthemessageissent,soiftheproducerneversendsmessagethemetadataisnever比如設置成1000時,它會緩存1秒的數據再一次發(fā)送出去,這樣可以極大的增加producerbuffer隊列里最大緩存的消息數量,如果超過這個數值,producer-producer阻塞等待的時間。如果值設置為0buffer隊列滿時producer不會阻塞,消息直接被丟掉。若值采用異步模式時,一個batchproducer才會發(fā)送消 SocketwritebufferTheclientidisauser-specifiedstringsentineachrequesttohelptracecalls.ItshouldlogicallyidentifythemakingtheConsumer配置信屬默認描Consumer的組ID,相同goup.idConsumer的zookeeper連接串,要和 socketmax.fetch.wait+socket.timeout.ms確 Thesocketreceivebufferfor 查詢topic-partitionconsumerpartition緩存此大小的消息到內存,因此,這個參數可以控制server1Thenumberfetcherthreadsusedtooffsetzookeeper當consumer Consumeroffsetzookeeper的周2consumermessage數量 每個 可以緩 Consumeroffsetzookeeper的周2consumermessage數量 每個 可以緩 1Theminimumamountofdatatheservershouldreturnforafetchrequest.Ifinsufficientdataisavailabletherequestwillwaitforthatmuchdatatobeforeansweringthe umamountoftimetheserverwillblockbeforeansweringthefetchrequestifthereisn’tsufficientdataimmediaysatisfyfet BackofftimetowaitbeforetryingtodeterminetheleaderofapartitionhasjustlostitsWhattodowhenthereisnoinitialinZooKeeperorifanoffsetisoutofrange;smallest:automaticallyresettheoffsettothesmallestoffset;largest:automaticallyresettheoffsettothelargestoffset;anythingelse:exceptiontothe-Whethermessagesfrominternaltopics(suchasoffsets)shouldbeexposedtoZooKeepersessiontimeout.IftheconsumerfailstoheartbeattoZooKeeperforthisperiodoftimeitisdeadandarebalancewillThemaxtimethattheclientwaitsestablishingaconnectiontoHowfaraZKfollowercanbebehindaKafka工作流程分Kafka生產過程分寫入方producer采用推(push)broker,每條消息都被追加(append)到分分區(qū)這個的消費者都可以接收到生成者寫入的新消息。Kafka集群為每個了分布式的分區(qū)(partition)日志文件,物理意義上可以。配到一個單調遞增的順序,叫做偏移量(offset),這個偏移量能夠唯一地定位當前分。消息發(fā)送時都被發(fā)送到一個topic,其本質就是一個 ,而topic是由一些Partitiontopic30開始,不同分區(qū)之間的偏移量都logoffsetPartitionPartitionKafka比傳統(tǒng)消息系統(tǒng)有更強的順序性保證,它使用的分區(qū)作為消息處理的并行單元。Kafka以分區(qū)作為最小的粒度,將每個分區(qū)分配給消費者組中不同的而且是唯一的消費只要分區(qū)的消息是有序的,消費者處理的消息順序就有保證。每個有多個分區(qū),不同的Kafka指定了patition未指定patitionkeykey的value進行hashpatitionkeypatitionDefaultPartitionerDefaultPartitionerpublicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){List<PartitionInfo>partitions=intnumPartitions=if(keyBytes==null)intnextValue=if(availablePartitions.size()>0)=intpart=Utils.toPositive(nextValue)%return}else//nopartitionsareavailable,giveanon-available}}else//hashthekeyBytestochooseareturnUtils.toPositive(Utils.murmur2(keyBytes))%}}副本同一個partitionreplication(對應pertiesproducerpatitionreplicationpartitionreplicationreplication之間選出一個leader,producerconsumer只與leader其它replication作為follower從leader寫入流producer寫入消息流程如下1)producerzookeeper"/brokers/.../state"partitionleader2)producerleaderleaderfollowersleaderpulllogleaderoffset)producerBroker保存消方置),每個patition物理上對應一個文件夾(該文件夾該patition的所有消息和索引文[itstar@bigdata11logs]$drwxrwxr-x.2itstaritstar40968月614:37-drwxrwxr-x.2itstar 40968 6 drwxrwxr-x.2itstaritstar 40968月 614:37 [itstar@bigdata11logs]$cd -0]$-rw-rw-r--.1itstaritstar104857608 614:33-rw-rw-r--.1itstar 2198 615:07-rw-rw-r--.1itstaritstar104857568 614:33-rw-rw-r--.1itstar 88 614:37leader-epoch-策無論消息是否被消費,kafka需要注意的是,因為Kafka特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除過期文件與提高Kafka性能無關。Zookeeper結注意:producer不在zk中,消費者在zk中Kafka消費過程分kafkaconsumerAPIConsumerAPIAPI消費模者A的消費進度是3,消費者B的消費進度是6(watermark)控制,生產者寫入的消息如果還沒有達到備份數量,對消費者是不可見APIAPIoffsetzookeeper自行管理。zookeeperoffset去接著獲取數據(1zookeeper可以使用group來區(qū)分對同一個topic的不同程序分離開來(不同的group記錄不同的offset,這樣不同程序同一個topic才不會因為offset互相影響)API不能細化控制如分區(qū)、副本、zk等API能夠讓開發(fā)者自己控制offset,想從哪里就從哪里。對zookeeper的依賴性降低(如:offset不一定非要靠zk,自行offset即可,APIoffsetleader消費者consumergroup消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic每個分區(qū)在同一時間只能由group中的一個消費者但是多個group可以同時消費這個partition。在圖中,有一個由三個消費者組成的group,有一個消費者讀取中的兩個分區(qū),另外兩個分別一個分區(qū)。某個消費者某個分區(qū),也可以叫做消費者失敗了,那么其他的group成員會自動負載均衡之前失敗的消費者的分區(qū)。消費方consumer采用pull(拉)模式從broker中數據它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現就是服務以及網絡擁塞。而pull模式則可以根據consumer的消費能力以適consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時,pullkafka沒有數據,消費者可能會陷入循環(huán)中,一直等待數據到達。為了避免這種情況在的拉請求中有參數,允許消費者請求在等待數據到達,消費者案例group.id屬性為任意組名。[itstar@bigdata12config]$pertiesbigdata11、bigdata12 node3:9092--topicsecond--consumer.configconfig/perties node3:9092--topicsecond--consumer.configconfig/pertiesbigdata13[itstar@bigdata13kafka]$bin/kafka-console-producer.sh--broker-list obigdata11和bigdata12的接收者。KafkaAPI實環(huán)境準eclipsejava 創(chuàng)建一個lib文件解壓kafka安裝包,將安裝包libs 下的jar包拷貝到工程的lib 4)啟動zk和kafka集群,在kafka集群中打開一個消費者 Kafka生產Java創(chuàng)建生產者(過時的ducer.KeyedMessage;ducer.ProducerConfig;publicclassOldProducer{publicstaticvoidmain(String[]args)Propertiesproperties=newProperties();properties.put("metadata.broker.list","bigdata11:9092");properties.put("request.required.acks","1"); "," oworld");}}創(chuàng)建生產者(新packagepackageimport publicclassNewProducerpublicstaticvoidmain(String[]args){Propertiesprops=newKafka服務端的主機名和端props.put("bootstrap.servers",//props.put("acks",//props.put("retries",//props.put("batch.size",//props.put("linger.ms",//keyvalueKafkaProducer<String,String>producer=newfor(inti=0;i<50;i++)producer.send(newProducerRecord<String,String>("oworld-"+i));}","}}創(chuàng)建生產者帶回調函數(新packagepackageimport publicclassCallBackProducerpublicstaticvoidmain(String[]args){Propertiesprops=newKafka服務端的主機名和端props.put("bootstrap.servers",//props.put("acks",////props.put("retries",//props.put("batch.size",////keyvalueprops.put("value.serializer",KafkaProducer<String,String>kafkaProducer=newfor(inti=0;i<50;i++)kafkaProducer.send(newProducerRecord<String, ", o"+i),Callback()publicpletion(RecordMetadatametadata,Exceptionexception)if(metadata!=null)System.out.println(metadata.partition()+"---"+}}}}}自定義分區(qū)生產需求:將所有數據到topic的第0號分區(qū)Partitioner接口,重寫里面的方法(packagepackageimportimportpublicclassCustomPartitionerimplementsPartitionerpublicCustomPartitioner()publicCustomPartitioner()}publicintpartition(Objectkey,intnumPartitions)//return}}packagepackageimport import publicclassCustomPartitionerimplementsPartitioner{publicvoidconfigure(Map<String,?>configs)}publicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){//return}publicvoidclose()}}packagepackageimport publicpublicclassPartitionerProducerpublicstaticvoidmain(String[]args)Propertiesprops=newKafkaprops.put("bootstrap.servers",//props.put("acks",//props.put("retries",//props.put("batch.size",//props.put("linger.ms",//keyvalue//Producer<String,String>producer=newKafkaProducer<>(props);producer.send(newProducerRecord<String,String>(" ","1","itstar"));}}在bigdata11上/opt/module/kafka/logs/下3個分區(qū)的log日志動[itstar@bigdata11-0]$tail-f00000000000000000000.log[itstar@bigdata11-1]$tail-f00000000000000000000.log[itstar@bigdata11-2]$tail-f發(fā)現數據都到指定的分區(qū)了Kafka消費Java[itstar@bigdata13kafka]$bin/kafka-console-producer.sh--broker-listbigdata11:9092-- opackagepackageimportjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.Properties;importkafka.consumer.Consumer;importkafka.consumer.ConsumerConfig;importkafka.consumer.KafkaStream;importpublicclassCustomConsumer{publicstaticvoidmain(String[]args){Propertiesproperties=newProperties();properties.put("group.id","g1");properties.put("zookeeper.session.timeout.ms","500");properties.put("zookeeper.sync.time.ms","250"); ","1000");//ConsumerConnectorconsumer=Consumer.createJavaConsumerConnector(newHashMap<String,Integer>topicCount=newHashMap<>(); ",1);=KafkaStream<byte[],byte[]>stream= ConsumerIterator<ConsumerIterator<byte[],byte[]>it=while(it.hasNext())}}}提供案例(自動消費情況)(新packagepackageimportimport publicclassCustomNewConsumerpublicstaticvoidmain(String[]args){Propertiesprops=newProperties();//kakfabrokerprops.put("bootstrap.servers",//consumergroupprops.put("group.id"test");// mit",//offsetkey", value //KafkaConsumer<String,String>consumer=new//topic, ",while(true) 超時時間為100msConsumerRecords<String,String>records=consumer.poll(100);forfor(ConsumerRecord<String,String>record:records)System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),}}}五Kafkaproducer器5.1器原Producer器(interceptor)是在Kafka0.10版本被引入的主要用于實現clients端的定對消息做一些定制化需求,比如修改消息等。同時,producerinterceptor按序作用于同一條消息從而形成一個鏈(interceptorchain)。Intercetpor的實現接口是KafkaProducer.send方法中,即它運行在用戶主線程中。Producer確保在topic和分區(qū),否則會影響目標分區(qū)的計算onAcknowledgement(RecordMetadata,該方在消息被應答或消息發(fā)送失敗時調用,并且通常都是在producer回調邏輯觸發(fā)之前。onAcknowledgementproducerIO線程中,因此不要在該方法中放入很重producer的消息發(fā)送效率如前所述,interceptor可能被運行在多個線程中,因此在具體實現時用戶需要自行確保線程安全。另外倘若指定了多個interceptor,則producer將按照指定順序調用它們,并僅僅5.2器案實現一個簡單的雙interceptor組成的鏈。第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數或案例增加時間戳packagepackageimport publicclassTimeInterceptorimplementsProducerInterceptor<String,String>{publicvoidconfigure(Map<String,?>configs)}publicProducerRecord<String,String>onSend(ProducerRecord<String,String>{//recordreturnnewProducerRecord(record.topic(),record.partition(),record.timestamp(),System.currentTimeMillis()+","+}publicvoidonAcknowledgement(RecordMetadatametadata,Exceptionexception)}publicvoidclose()}}producerimportjava.util.Map;importjava.util.Map; publicclassCounterInterceptorimplementsProducerInterceptor<String,String>{privateinterrorCounter=0;privateintsuccessCounter=publicvoidconfigure(Map<String,?>configs)}publicProducerRecord<String,String>onSend(ProducerRecord<String,String>{return}publicvoidonAcknowledgement(RecordMetadatametadata,Exceptionexception)//if(exception==null){}else}}publicvoidclose()//System.out.println("Successfulsent:"+successCounter);System.out.println("Failedsent:"+errorCounter);}}producerpackagepackageimportjava.util.ArrayList;importjava.util.List;importjava.util.Properties; publicclassInterceptorProducerpublicstaticvoidmain(String[]args)t

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論