版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
消息隊(duì)列:Pulsar:Pulsar消息模型與消息類型1消息隊(duì)列基礎(chǔ)1.1消息隊(duì)列的定義消息隊(duì)列是一種應(yīng)用程序間通信(IPC)的模式,它允許消息在發(fā)送者和接收者之間異步傳遞。消息隊(duì)列中的消息遵循先進(jìn)先出(FIFO)原則,但同時(shí)也支持更復(fù)雜的路由策略。消息隊(duì)列可以提高系統(tǒng)的解耦性、可擴(kuò)展性和容錯(cuò)能力。1.2消息隊(duì)列的作用消息隊(duì)列在現(xiàn)代分布式系統(tǒng)中扮演著關(guān)鍵角色,主要作用包括:異步處理:允許發(fā)送者和接收者異步操作,提高系統(tǒng)響應(yīng)速度。負(fù)載均衡:通過(guò)消息隊(duì)列,可以將任務(wù)均勻地分配給多個(gè)處理者。削峰填谷:在高負(fù)載時(shí),消息隊(duì)列可以緩存消息,避免系統(tǒng)過(guò)載。系統(tǒng)解耦:發(fā)送者和接收者不需要直接通信,降低了系統(tǒng)的耦合度。數(shù)據(jù)持久化:消息隊(duì)列通常會(huì)將消息持久化到磁盤(pán),確保消息不會(huì)因系統(tǒng)故障而丟失。1.3Pulsar簡(jiǎn)介ApachePulsar是一個(gè)高性能、可擴(kuò)展的分布式消息隊(duì)列系統(tǒng),由Yahoo開(kāi)發(fā)并開(kāi)源,現(xiàn)已成為Apache軟件基金會(huì)的頂級(jí)項(xiàng)目。Pulsar提供了消息隊(duì)列和發(fā)布/訂閱兩種消息模式,支持多種消息類型,包括二進(jìn)制、JSON、Avro等。Pulsar的核心特性包括:持久化和非持久化消息:消息可以被持久化到磁盤(pán),也可以選擇在內(nèi)存中短暫存儲(chǔ)。多租戶和多層安全性:支持多租戶環(huán)境,提供細(xì)粒度的訪問(wèn)控制和身份驗(yàn)證。水平擴(kuò)展:可以輕松地通過(guò)增加更多的Broker來(lái)擴(kuò)展系統(tǒng)的吞吐量和存儲(chǔ)能力。全球分布:支持跨數(shù)據(jù)中心的全球分布,確保數(shù)據(jù)的高可用性和低延遲訪問(wèn)。1.3.1Pulsar消息模型Pulsar的消息模型基于主題(Topic)和訂閱(Subscription)。一個(gè)主題可以有多個(gè)生產(chǎn)者(Producer)和消費(fèi)者(Consumer),生產(chǎn)者向主題發(fā)送消息,消費(fèi)者從主題中消費(fèi)消息。訂閱者可以以獨(dú)占、共享或鍵共享模式訂閱主題,這決定了消息如何在消費(fèi)者之間分發(fā)。1.3.2Pulsar消息類型Pulsar支持多種消息類型,包括:二進(jìn)制消息:最基礎(chǔ)的消息類型,可以是任何二進(jìn)制數(shù)據(jù)。JSON消息:用于傳輸結(jié)構(gòu)化數(shù)據(jù),便于解析和處理。Avro消息:提供數(shù)據(jù)序列化和反序列化,支持模式演進(jìn)。示例:使用Java發(fā)送JSON消息到Pulsarimportorg.apache.pulsar.client.api.Schema;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建生產(chǎn)者,使用JSON模式
Producer<String>producer=client.newProducer(Schema.JSON(String.class))
.topic("my-topic")
.create();
//發(fā)送消息
for(inti=0;i<10;i++){
Stringmessage="HelloPulsar"+i;
producer.send(message);
}
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
}在上述示例中,我們首先創(chuàng)建了一個(gè)Pulsar客戶端,然后使用Schema.JSON(String.class)指定了消息的類型為JSON。接著,我們創(chuàng)建了一個(gè)生產(chǎn)者并發(fā)送了10條消息到主題my-topic。最后,我們關(guān)閉了生產(chǎn)者和客戶端,確保資源被正確釋放。示例:使用Java從Pulsar消費(fèi)JSON消息importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importorg.apache.pulsar.client.api.SubscriptionType;
publicclassPulsarConsumer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建消費(fèi)者,使用JSON模式
Consumer<String>consumer=client.newConsumer(Schema.JSON(String.class))
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消費(fèi)消息
while(true){
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息已被消費(fèi)
consumer.acknowledge(msg);
}
}
}在這個(gè)消費(fèi)示例中,我們創(chuàng)建了一個(gè)消費(fèi)者,同樣使用了Schema.JSON(String.class)來(lái)指定消息類型。我們訂閱了主題my-topic,并使用了獨(dú)占訂閱模式(SubscriptionType.Exclusive)。然后,我們進(jìn)入一個(gè)無(wú)限循環(huán),接收并打印消息,最后確認(rèn)消息已被消費(fèi)。通過(guò)這些示例,我們可以看到Pulsar如何處理不同類型的異步消息,以及如何在生產(chǎn)者和消費(fèi)者之間建立通信。Pulsar的靈活性和強(qiáng)大的功能使其成為構(gòu)建現(xiàn)代分布式系統(tǒng)時(shí)的首選消息隊(duì)列系統(tǒng)。2消息隊(duì)列:Pulsar:深入理解Pulsar消息模型2.1消息的結(jié)構(gòu)在ApachePulsar中,消息的結(jié)構(gòu)設(shè)計(jì)得非常靈活和高效,以適應(yīng)不同的應(yīng)用場(chǎng)景。一個(gè)Pulsar消息主要由以下幾部分組成:Payload:消息的實(shí)際內(nèi)容,可以是任何類型的數(shù)據(jù),如JSON、XML或二進(jìn)制數(shù)據(jù)。Properties:一組鍵值對(duì),用于存儲(chǔ)消息的元數(shù)據(jù),如消息的創(chuàng)建時(shí)間、內(nèi)容類型等。MessageID:每個(gè)消息都有一個(gè)唯一的ID,用于消息的追蹤和管理。Schema:定義了消息的結(jié)構(gòu),使得消息的發(fā)送者和接收者能夠理解消息的內(nèi)容。2.1.1示例代碼importorg.apache.pulsar.client.api.Schema;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassMessageStructureExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.create();
StringmessageContent="{\"name\":\"John\",\"age\":30}";
Message<String>message=Message.builder()
.value(messageContent)
.properties(Map.of("type","user","timestamp",String.valueOf(System.currentTimeMillis())))
.build();
producer.send(message);
producer.close();
client.close();
}
}在上述代碼中,我們創(chuàng)建了一個(gè)Producer對(duì)象,使用Schema.STRING定義了消息的結(jié)構(gòu)。然后,我們構(gòu)建了一個(gè)消息,其中包含了實(shí)際的payload(messageContent)和一些properties(如類型和時(shí)間戳)。2.2消息的生命周期Pulsar消息的生命周期從消息被生產(chǎn)者發(fā)送開(kāi)始,直到被消費(fèi)者消費(fèi)并確認(rèn)為止。在這個(gè)過(guò)程中,消息可能會(huì)經(jīng)歷以下?tīng)顟B(tài):待發(fā)送:消息在生產(chǎn)者緩存中等待發(fā)送。發(fā)送中:消息正在通過(guò)網(wǎng)絡(luò)發(fā)送到PulsarBroker。已發(fā)送:消息成功發(fā)送到Broker,存儲(chǔ)在消息日志中。待消費(fèi):消息存儲(chǔ)在Broker中,等待被消費(fèi)者消費(fèi)。消費(fèi)中:消費(fèi)者正在處理消息。已消費(fèi):消費(fèi)者成功處理消息,并向Broker發(fā)送確認(rèn)。2.2.1示例代碼importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.MessageId;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassMessageLifecycleExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscribe();
Message<String>message=consumer.receive();
System.out.println("Receivedmessage:"+message.getValue());
//消費(fèi)者處理消息
//...
//確認(rèn)消息已被消費(fèi)
consumer.acknowledge(message.getMessageId());
consumer.close();
client.close();
}
}在本例中,我們創(chuàng)建了一個(gè)Consumer對(duì)象,訂閱了特定的主題。當(dāng)消息到達(dá)時(shí),consumer.receive()方法將消息從Broker中取出,進(jìn)入“消費(fèi)中”狀態(tài)。一旦消息被處理,我們通過(guò)調(diào)用consumer.acknowledge()方法來(lái)確認(rèn)消息已被消費(fèi),從而完成其生命周期。2.3消息的發(fā)布與訂閱機(jī)制Pulsar提供了多種發(fā)布與訂閱機(jī)制,以滿足不同的需求。主要的發(fā)布與訂閱模式包括:獨(dú)占訂閱(Exclusive):只有一個(gè)消費(fèi)者可以訂閱一個(gè)主題,所有消息都只能被這一個(gè)消費(fèi)者消費(fèi)。共享訂閱(Shared):多個(gè)消費(fèi)者可以訂閱一個(gè)主題,消息會(huì)被分發(fā)給所有訂閱者中的一個(gè),確保每個(gè)消息只被消費(fèi)一次。故障轉(zhuǎn)移訂閱(Failover):類似于獨(dú)占訂閱,但是當(dāng)當(dāng)前的消費(fèi)者不可用時(shí),消息會(huì)被轉(zhuǎn)發(fā)給下一個(gè)消費(fèi)者。2.3.1示例代碼importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPublishSubscribeExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//嘗試接收消息,由于是獨(dú)占訂閱,consumer2將無(wú)法接收消息
Message<String>message1=consumer1.receive();
Message<String>message2=consumer2.receive();//這里consumer2將阻塞,直到consumer1確認(rèn)或放棄消息
//消費(fèi)者處理消息
//...
//確認(rèn)消息已被消費(fèi)
consumer1.acknowledge(message1.getMessageId());
consumer1.close();
consumer2.close();
client.close();
}
}在上述代碼中,我們創(chuàng)建了兩個(gè)消費(fèi)者consumer1和consumer2,并使用SubscriptionType.Exclusive指定了獨(dú)占訂閱模式。這意味著所有發(fā)送到主題的消息將只被consumer1或consumer2中的一個(gè)消費(fèi),而不會(huì)同時(shí)被兩個(gè)消費(fèi)者接收。通過(guò)這些示例,我們可以看到Pulsar消息模型的靈活性和強(qiáng)大功能,以及如何通過(guò)代碼實(shí)現(xiàn)消息的結(jié)構(gòu)定義、生命周期管理和發(fā)布訂閱機(jī)制。這為構(gòu)建高效、可靠的消息處理系統(tǒng)提供了堅(jiān)實(shí)的基礎(chǔ)。3Pulsar消息類型詳解在ApachePulsar消息隊(duì)列中,消息可以采用多種格式進(jìn)行編碼和傳輸,以適應(yīng)不同的應(yīng)用場(chǎng)景和數(shù)據(jù)處理需求。本教程將深入探討Pulsar支持的四種主要消息類型:二進(jìn)制消息、JSON消息、Avro消息以及其他消息格式。3.1進(jìn)制消息二進(jìn)制消息是最基本的消息類型,它允許消息以原始字節(jié)流的形式發(fā)送。這種類型的消息適用于任何可以序列化為字節(jié)流的數(shù)據(jù),包括但不限于自定義的二進(jìn)制數(shù)據(jù)、圖像、音頻文件等。3.1.1示例代碼importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassBinaryProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<byte[]>producer=client.newProducer().topic("binary-topic").create();
//創(chuàng)建二進(jìn)制消息
byte[]binaryData=newbyte[]{0x01,0x02,0x03,0x04};
producer.send(binaryData);
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
}3.1.2解釋在上述示例中,我們創(chuàng)建了一個(gè)Pulsar客戶端,并使用它來(lái)創(chuàng)建一個(gè)生產(chǎn)者,該生產(chǎn)者將二進(jìn)制數(shù)據(jù)發(fā)送到名為binary-topic的主題中。二進(jìn)制數(shù)據(jù)直接以字節(jié)數(shù)組的形式發(fā)送,無(wú)需額外的編碼或解碼步驟。3.2JSON消息JSON(JavaScriptObjectNotation)是一種輕量級(jí)的數(shù)據(jù)交換格式,易于人閱讀和編寫(xiě),同時(shí)也易于機(jī)器解析和生成。Pulsar支持JSON消息,這使得在消息中傳輸結(jié)構(gòu)化數(shù)據(jù)變得簡(jiǎn)單。3.2.1示例代碼importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importcom.fasterxml.jackson.databind.ObjectMapper;
publicclassJSONProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<String>producer=client.newProducer().topic("json-topic").create();
//創(chuàng)建JSON消息
ObjectMappermapper=newObjectMapper();
Useruser=newUser("JohnDoe",30);
StringjsonMessage=mapper.writeValueAsString(user);
producer.send(jsonMessage.getBytes());
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
staticclassUser{
Stringname;
intage;
publicUser(Stringname,intage){
=name;
this.age=age;
}
}
}3.2.2解釋此示例展示了如何使用Java的ObjectMapper類將一個(gè)簡(jiǎn)單的User對(duì)象轉(zhuǎn)換為JSON字符串,然后將其發(fā)送到Pulsar的主題json-topic中。User對(duì)象包含姓名和年齡兩個(gè)字段,通過(guò)序列化為JSON格式,可以方便地在消息中傳輸結(jié)構(gòu)化數(shù)據(jù)。3.3Avro消息ApacheAvro是一種數(shù)據(jù)序列化系統(tǒng),它不僅提供緊湊、快速的二進(jìn)制數(shù)據(jù)序列化,還支持模式演進(jìn),即在不破壞向后兼容性的情況下,可以修改數(shù)據(jù)模式。Pulsar通過(guò)Avro編碼支持高效地傳輸復(fù)雜數(shù)據(jù)結(jié)構(gòu)。3.3.1示例代碼importorg.apache.avro.Schema;
importorg.apache.avro.generic.GenericData;
importorg.apache.avro.generic.GenericRecord;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importorg.apache.pulsar.client.impl.schema.AvroSchema;
publicclassAvroProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<GenericRecord>producer=client.newProducer(AvroSchema.of(User.class)).topic("avro-topic").create();
//創(chuàng)建Avro消息
Useruser=newUser();
user.setName("JaneDoe");
user.setAge(25);
producer.send(user);
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
staticclassUserextendsGenericData.Record{
publicUser(){
Schemaschema=newSchema.Parser().parse("{"
+"\"type\":\"record\","
+"\"name\":\"User\","
+"\"fields\":["
+"{\"name\":\"name\",\"type\":\"string\"},"
+"{\"name\":\"age\",\"type\":\"int\"}"
+"]"
+"}");
super(schema);
}
publicvoidsetName(Stringname){
put("name",name);
}
publicvoidsetAge(intage){
put("age",age);
}
}
}3.3.2解釋在AvroProducer示例中,我們首先定義了一個(gè)Avro模式,該模式描述了User對(duì)象的結(jié)構(gòu),包括name和age字段。然后,我們使用Pulsar的AvroSchema創(chuàng)建一個(gè)生產(chǎn)者,將User對(duì)象直接發(fā)送到avro-topic主題。Avro的模式演進(jìn)特性使得在數(shù)據(jù)結(jié)構(gòu)發(fā)生變化時(shí),仍然能夠保證消息的正確傳輸和解析。3.4其他消息格式除了上述提到的二進(jìn)制、JSON和Avro消息格式,Pulsar還支持其他消息格式,如Protobuf、XML等。這些格式的選擇取決于具體的應(yīng)用場(chǎng)景和數(shù)據(jù)處理需求。3.4.1Protobuf消息Protobuf(ProtocolBuffers)是Google開(kāi)發(fā)的一種數(shù)據(jù)交換格式,它提供了高效的序列化和反序列化機(jī)制,特別適合于傳輸大量結(jié)構(gòu)化數(shù)據(jù)。3.4.2示例代碼importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importtobuf.GeneratedMessageV3;
publicclassProtobufProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<GeneratedMessageV3>producer=client.newProducer(Schema.PROTOBUF(UserProto.User.getDefaultInstance())).topic("protobuf-topic").create();
//創(chuàng)建Protobuf消息
UserProto.Useruser=UserProto.User.newBuilder()
.setName("Alice")
.setAge(35)
.build();
producer.send(user.toByteArray());
//關(guān)閉生產(chǎn)者和客戶端
producer.close();
client.close();
}
}3.4.3解釋在ProtobufProducer示例中,我們使用了Google的Protobuf庫(kù)來(lái)定義User消息的結(jié)構(gòu),并通過(guò)Pulsar的ProtobufSchema創(chuàng)建了一個(gè)生產(chǎn)者。User對(duì)象通過(guò)toByteArray方法轉(zhuǎn)換為字節(jié)數(shù)組,然后發(fā)送到protobuf-topic主題。Protobuf的高效性和緊湊性使其成為處理大量結(jié)構(gòu)化數(shù)據(jù)的理想選擇。3.5結(jié)論P(yáng)ulsar通過(guò)支持多種消息格式,如二進(jìn)制、JSON、Avro和Protobuf,為開(kāi)發(fā)者提供了靈活的數(shù)據(jù)傳輸和處理選項(xiàng)。選擇合適的消息格式可以顯著提高消息處理的效率和可靠性,同時(shí)簡(jiǎn)化數(shù)據(jù)的序列化和反序列化過(guò)程。在實(shí)際應(yīng)用中,應(yīng)根據(jù)數(shù)據(jù)的特性和處理需求來(lái)選擇最合適的格式。請(qǐng)注意,上述代碼示例假設(shè)你已經(jīng)定義了相應(yīng)的Protobuf消息結(jié)構(gòu)(UserProto),并且在項(xiàng)目中包含了必要的依賴庫(kù)。在實(shí)際應(yīng)用中,你可能需要根據(jù)自己的數(shù)據(jù)模型和需求進(jìn)行相應(yīng)的調(diào)整。4消息發(fā)布4.1生產(chǎn)者角色在ApachePulsar消息隊(duì)列中,生產(chǎn)者是負(fù)責(zé)生成和發(fā)送消息的組件。生產(chǎn)者通過(guò)連接到Pulsar的Broker,將消息發(fā)送到特定的Topic中。每個(gè)Topic可以有多個(gè)生產(chǎn)者,這意味著多個(gè)應(yīng)用程序或服務(wù)可以同時(shí)向同一個(gè)Topic發(fā)送消息,從而實(shí)現(xiàn)高并發(fā)的消息發(fā)布能力。4.1.1發(fā)布消息流程創(chuàng)建Producer對(duì)象:生產(chǎn)者首先需要?jiǎng)?chuàng)建一個(gè)Producer對(duì)象,這通常通過(guò)PulsarClient的createProducer方法完成。發(fā)送消息:使用Producer對(duì)象的send方法將消息發(fā)送到指定的Topic。消息可以是任意類型的數(shù)據(jù),但在發(fā)送前需要被序列化為字節(jié)流。確認(rèn)消息發(fā)送:Pulsar支持消息發(fā)送的確認(rèn)機(jī)制,確保消息被成功發(fā)送到Broker。如果消息發(fā)送失敗,生產(chǎn)者可以重新發(fā)送消息。關(guān)閉Producer:在完成消息發(fā)送后,生產(chǎn)者應(yīng)該調(diào)用close方法來(lái)釋放資源。4.1.2消息持久化策略Pulsar提供了多種消息持久化策略,以確保消息在Broker故障時(shí)不會(huì)丟失。這些策略包括:消息存儲(chǔ):Pulsar使用ApacheBookKeeper作為其后端存儲(chǔ)系統(tǒng),所有消息都會(huì)被持久化到磁盤(pán)上。消息復(fù)制:Pulsar支持消息的跨Broker復(fù)制,以提高系統(tǒng)的可用性和容錯(cuò)性。復(fù)制策略可以是同步或異步的。消息過(guò)期:Pulsar允許設(shè)置消息的過(guò)期時(shí)間,過(guò)期的消息將被自動(dòng)刪除,以節(jié)省存儲(chǔ)空間。4.2示例代碼:創(chuàng)建生產(chǎn)者并發(fā)送消息importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.ProducerConfiguration;
importorg.apache.pulsar.client.api.Schema;
publicclassPulsarProducerExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
//創(chuàng)建PulsarClient實(shí)例
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
//創(chuàng)建Producer配置
ProducerConfiguration<String>producerConfiguration=ProducerConfiguration.builder()
.schema(Schema.STRING)
.build();
//創(chuàng)建Producer
Producer<String>producer=client.newProducer(producerConfiguration)
.topic("persistent://public/default/my-topic")
.create();
//發(fā)送消息
for(inti=0;i<10;i++){
Stringmessage="HelloPulsar"+i;
producer.send(message);
}
//關(guān)閉Producer
producer.close();
//關(guān)閉PulsarClient
client.close();
}
}4.2.1代碼解釋創(chuàng)建PulsarClient:通過(guò)PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();創(chuàng)建一個(gè)PulsarClient實(shí)例,其中serviceUrl指定了Pulsar服務(wù)的URL。創(chuàng)建Producer配置:使用ProducerConfiguration.builder().schema(Schema.STRING).build();創(chuàng)建一個(gè)Producer配置對(duì)象,指定了消息的Schema為字符串類型。創(chuàng)建Producer:通過(guò)client.newProducer(producerConfiguration).topic("persistent://public/default/my-topic").create();創(chuàng)建一個(gè)Producer,其中topic指定了消息將被發(fā)送到的Topic。發(fā)送消息:使用producer.send(message);方法發(fā)送消息到指定的Topic。關(guān)閉Producer和PulsarClient:在完成消息發(fā)送后,調(diào)用producer.close();和client.close();來(lái)釋放資源。通過(guò)以上步驟,我們可以看到Pulsar消息隊(duì)列中消息發(fā)布的完整流程,從創(chuàng)建生產(chǎn)者到發(fā)送消息,再到資源的釋放,每一步都遵循了Pulsar的API規(guī)范,確保了消息的正確發(fā)送和系統(tǒng)的穩(wěn)定運(yùn)行。5消息消費(fèi)5.1消費(fèi)者角色在ApachePulsar消息隊(duì)列中,消費(fèi)者是消息的接收者。它們訂閱主題,接收并處理由生產(chǎn)者發(fā)送的消息。消費(fèi)者可以是任何能夠接收和處理消息的應(yīng)用程序或服務(wù)。在Pulsar中,消費(fèi)者通過(guò)創(chuàng)建一個(gè)Consumer對(duì)象來(lái)訂閱主題,這個(gè)對(duì)象提供了接收消息的方法。5.1.1示例代碼importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerExample{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
//創(chuàng)建Pulsar客戶端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//創(chuàng)建消費(fèi)者,訂閱主題
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscribe();
//接收消息
Message<String>msg=consumer.receive();
//處理消息
System.out.println("Receivedmessage:"+msg.getValue());
//確認(rèn)消息已處理
consumer.acknowledge(msg);
//關(guān)閉消費(fèi)者和客戶端
consumer.close();
client.close();
}
}5.2消息消費(fèi)模式Pulsar支持兩種主要的消息消費(fèi)模式:獨(dú)占(Exclusive)和共享(Shared)。5.2.1獨(dú)占消費(fèi)在獨(dú)占消費(fèi)模式下,一個(gè)訂閱只能被一個(gè)消費(fèi)者使用。這意味著如果多個(gè)消費(fèi)者嘗試訂閱同一個(gè)主題和訂閱名稱,只有第一個(gè)消費(fèi)者能夠成功訂閱,其他消費(fèi)者將收到錯(cuò)誤。這種模式適用于需要確保消息只被一個(gè)消費(fèi)者處理的情況。5.2.2共享消費(fèi)共享消費(fèi)模式允許多個(gè)消費(fèi)者訂閱同一個(gè)主題和訂閱名稱。消息將被分發(fā)給訂閱中的任意一個(gè)消費(fèi)者,確保消息至少被處理一次。這種模式適用于需要高可用性和負(fù)載均衡的場(chǎng)景。5.2.3示例代碼//獨(dú)占消費(fèi)模式
Consumer<String>exclusiveConsumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//共享消費(fèi)模式
Consumer<String>sharedConsumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();5.3消息重試與死信隊(duì)列在處理消息時(shí),如果消費(fèi)者未能成功處理消息,Pulsar提供了消息重試機(jī)制。消息可以被重新發(fā)送給消費(fèi)者,直到成功處理。如果消息在多次重試后仍然無(wú)法處理,它可以被移動(dòng)到死信隊(duì)列(DeadLetterQueue)。5.3.1消息重試消費(fèi)者可以通過(guò)negativeAcknowledge方法來(lái)標(biāo)記消息為未處理,這將觸發(fā)消息重試。Pulsar允許配置重試策略,包括重試次數(shù)和重試間隔。5.3.2死信隊(duì)列死信隊(duì)列用于存儲(chǔ)那些無(wú)法被正常處理的消息。這些消息可以被單獨(dú)處理或分析,以找出處理失敗的原因。在Pulsar中,可以通過(guò)配置主題的策略來(lái)啟用死信隊(duì)列。5.3.3示例代碼//消費(fèi)者處理消息失敗,請(qǐng)求重試
consumer.negativeAcknowledge(msg);
//配置主題策略,啟用死信隊(duì)列
TopicPoliciespolicies=newTopicPolicies();
policies.setDeadLetterTopic("persistent://public/default/my-dead-letter-topic");
client.getAdmin().topics().updateTopicPolicies("persistent://public/default/my-topic",policies);通過(guò)上述代碼和解釋,我們深入了解了Pulsar中消費(fèi)者角色的定義、消息消費(fèi)的不同模式,以及如何處理消息重試和死信隊(duì)列。這些機(jī)制確保了消息的可靠處理和系統(tǒng)的高可用性。6消息路由策略6.1消息分發(fā)原理在消息隊(duì)列系統(tǒng)中,如ApachePulsar,消息的分發(fā)是一個(gè)關(guān)鍵過(guò)程,它決定了消息如何從生產(chǎn)者到達(dá)消費(fèi)者。Pulsar通過(guò)其獨(dú)特的消息模型和靈活的路由策略,確保了消息的高效、有序和可靠傳輸。消息分發(fā)原理基于以下幾點(diǎn):主題(Topic):在Pulsar中,消息被發(fā)布到特定的主題上,這些主題可以是持久的或非持久的,根據(jù)消息的存儲(chǔ)需求而定。分區(qū)(Partition):為了提高吞吐量和可擴(kuò)展性,主題可以被劃分為多個(gè)分區(qū),每個(gè)分區(qū)獨(dú)立存儲(chǔ)和處理消息。消費(fèi)者(Consumer):消費(fèi)者訂閱主題以接收消息。Pulsar支持獨(dú)占、共享和鍵共享三種訂閱類型,以滿足不同的消費(fèi)模式。消息路由:生產(chǎn)者發(fā)送消息時(shí),Pulsar根據(jù)配置的路由策略決定將消息發(fā)送到哪個(gè)分區(qū)。這確保了消息的均衡分布和處理。6.2路由策略類型Pulsar提供了多種消息路由策略,以適應(yīng)不同的應(yīng)用場(chǎng)景和需求:RoundRobin:輪詢策略,將消息均勻地分發(fā)到所有分區(qū),以實(shí)現(xiàn)負(fù)載均衡。KeyBased:基于消息鍵的策略,根據(jù)消息中的鍵將消息路由到特定的分區(qū),確保具有相同鍵的消息被發(fā)送到同一分區(qū),便于實(shí)現(xiàn)消息的有序處理。Custom:自定義策略,允許用戶實(shí)現(xiàn)自己的消息路由邏輯,提供最大的靈活性。6.2.1RoundRobin路由策略原理RoundRobin策略是最簡(jiǎn)單的消息分發(fā)方式,它將消息輪流發(fā)送到不同的分區(qū),確保每個(gè)分區(qū)的負(fù)載大致相同。這種策略適用于消息大小和處理時(shí)間相對(duì)均勻的場(chǎng)景。示例假設(shè)我們有一個(gè)主題my-topic,它被劃分為4個(gè)分區(qū)。使用RoundRobin策略,消息將按照以下順序被發(fā)送:第1條消息發(fā)送到分區(qū)0第2條消息發(fā)送到分區(qū)1第3條消息發(fā)送到分區(qū)2第4條消息發(fā)送到分區(qū)3第5條消息再次發(fā)送到分區(qū)0,以此類推。6.2.2KeyBased路由策略原理KeyBased策略根據(jù)消息中的鍵(key)將消息路由到特定的分區(qū)。如果消息包含相同的鍵,它們將被發(fā)送到同一分區(qū),這有助于實(shí)現(xiàn)消息的有序處理和聚合。鍵可以是消息中的任意字段,也可以是自定義的鍵生成邏輯。示例假設(shè)我們有一個(gè)主題my-topic,它被劃分為3個(gè)分區(qū)。消息包含一個(gè)用戶ID作為鍵,ID為1的消息將始終被發(fā)送到分區(qū)0,ID為2的消息將被發(fā)送到分區(qū)1,ID為3的消息將被發(fā)送到分區(qū)2,以此類推。#Python示例代碼
importpulsar
client=pulsar.Client('pulsar://localhost:6650')
producer=client.create_producer('my-topic',key_type='string')
#發(fā)送消息,鍵為用戶ID
foriinrange(10):
key=str(i%3)#生成鍵,確保消息被均勻分布到3個(gè)分區(qū)
producer.send(('message-'+str(i)).encode('utf-8'),key=key)
client.close()6.2.3Custom路由策略原理Custom策略允許用戶實(shí)現(xiàn)自己的消息路由邏輯。這提供了最大的靈活性,但同時(shí)也要求用戶對(duì)消息隊(duì)列的性能和可靠性有深入的理解。自定義策略可以通過(guò)實(shí)現(xiàn)Pulsar的MessageRouter接口來(lái)實(shí)現(xiàn)。示例假設(shè)我們想要根據(jù)消息的類型(例如,交易、日志、警報(bào))將消息路由到不同的分區(qū),可以實(shí)現(xiàn)一個(gè)自定義的路由策略。//Java示例代碼
importorg.apache.pulsar.client.api.MessageRouter;
importorg.apache.pulsar.client.api.MessageRouterBuilder;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
publicclassCustomMessageRouterimplementsMessageRouter<String>{
@Override
publicintroute(Stringkey,intnumPartitions){
if(key.startsWith("transaction")){
return0;//交易消息發(fā)送到分區(qū)0
}elseif(key.startsWith("log")){
return1;//日志消息發(fā)送到分區(qū)1
}elseif(key.startsWith("alert")){
return2;//警報(bào)消息發(fā)送到分區(qū)2
}
returnnumPartitions/2;//默認(rèn)情況下,將消息發(fā)送到中間分區(qū)
}
}
publicclassCustomProducer{
publicstaticvoidmain(String[]args)throwsException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
MessageRouterBuilder<String>routerBuilder=MessageRouter.custom(newCustomMessageRouter());
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("my-topic")
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.messageRouter(routerBuilder)
.create();
//發(fā)送不同類型的消息
producer.send("transaction-1","transaction");
producer.send("log-2","log");
producer.send("alert-3","alert");
client.close();
}
}6.3策略配置示例在Pulsar中,配置路由策略可以通過(guò)修改生產(chǎn)者創(chuàng)建時(shí)的參數(shù)來(lái)實(shí)現(xiàn)。以下是一個(gè)使用RoundRobin策略的配置示例://Java示例代碼
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.Producer;
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("my-topic")
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();在這個(gè)示例中,messageRoutingMode參數(shù)被設(shè)置為RoundRobinPartition,這告訴Pulsar使用輪詢策略來(lái)分發(fā)消息。對(duì)于KeyBased策略,可以通過(guò)在發(fā)送消息時(shí)指定鍵來(lái)實(shí)現(xiàn):producer.send(("message-"+i).getBytes(),"key-"+(i%3));在這個(gè)示例中,消息的鍵被設(shè)置為"key-"+(i%3),這確保了具有相同鍵的消息被發(fā)送到同一分區(qū)。對(duì)于Custom策略,需要實(shí)現(xiàn)MessageRouter接口,并在創(chuàng)建生產(chǎn)者時(shí)指定自定義的路由策略。MessageRouterBuilder<String>routerBuilder=MessageRouter.custom(newCustomMessageRouter());
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("my-topic")
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.messageRouter(routerBuilder)
.create();在這個(gè)示例中,CustomMessageRouter類實(shí)現(xiàn)了自定義的路由邏輯,通過(guò)MessageRouterBuilder和messageRouter參數(shù),可以將這個(gè)自定義策略應(yīng)用到生產(chǎn)者上。通過(guò)以上示例,我們可以看到Pulsar提供了豐富的消息路由策略,以滿足不同場(chǎng)景的需求。選擇合適的策略對(duì)于優(yōu)化消息隊(duì)列的性能和可靠性至關(guān)重要。7高級(jí)消息處理7.1消息壓縮在消息隊(duì)列系統(tǒng)中,如ApachePulsar,消息壓縮是一種優(yōu)化網(wǎng)絡(luò)傳輸和存儲(chǔ)空間的有效手段。Pulsar支持多種壓縮算法,包括LZ4、ZLIB、ZSTD等,以適應(yīng)不同的性能和壓縮比需求。7.1.1原理消息壓縮在消息發(fā)送前進(jìn)行,將原始消息數(shù)據(jù)轉(zhuǎn)換為更小的二進(jìn)制格式,從而減少在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)量和存儲(chǔ)空間的占用。接收方在消費(fèi)消息時(shí),會(huì)自動(dòng)解壓縮消息,恢復(fù)原始數(shù)據(jù)。7.1.2示例在Pulsar中,可以通過(guò)設(shè)置producer的compressionType屬性來(lái)啟用消息壓縮。以下是一個(gè)使用LZ4壓縮算法的示例:importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassCompressedProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]>producer=client.newProducer()
.topic("persistent://sample/standalone/ns/my-topic")
.compressionType(CompressionType.LZ4)
.create();
Stringmessage="Hello,Pulsar!";
producer.send(message.getBytes());
producer.close();
client.close();
}
}7.1.3解釋在上述代碼中,我們創(chuàng)建了一個(gè)PulsarClient實(shí)例,并指定了服務(wù)URL。然后,我們創(chuàng)建了一個(gè)Producer,設(shè)置了topic和compressionType為L(zhǎng)Z4。發(fā)送消息時(shí),原始字符串被轉(zhuǎn)換為字節(jié)數(shù)組,然后通過(guò)壓縮的Producer發(fā)送。接收方會(huì)自動(dòng)解壓縮消息,無(wú)需額外的解壓縮邏輯。7.2消息時(shí)間戳在Pulsar中,每條消息都有一個(gè)時(shí)間戳,用于記錄消息的創(chuàng)建時(shí)間或發(fā)送時(shí)間。時(shí)間戳可以用于實(shí)現(xiàn)時(shí)間窗口、消息延遲發(fā)送等功能。7.2.1原理Pulsar允許在消息發(fā)送時(shí)指定時(shí)間戳,如果沒(méi)有指定,系統(tǒng)會(huì)自動(dòng)使用消息發(fā)送時(shí)的時(shí)間作為時(shí)間戳。時(shí)間戳以毫秒為單位,存儲(chǔ)在消息的元數(shù)據(jù)中。7.2.2示例以下是一個(gè)在發(fā)送消息時(shí)指定時(shí)間戳的示例:importorg.apache.pulsar.client.api.MessageBuilder;
importorg.apache.pulsar.client.api.Producer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassTimestampProducer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<byte[]>producer=client.newProducer()
.topic("persistent://sample/standalone/ns/my-topic")
.create();
longtimestamp=System.currentTimeMillis()+10000;//發(fā)送10秒后的時(shí)間
Stringmessage="Hello,Pulsar!";
producer.newMessage()
.value(message.getBytes())
.timestamp(timestamp)
.send();
producer.close();
client.close();
}
}7.2.3解釋在本例中,我們創(chuàng)建了一個(gè)Producer,并通過(guò)newMessage()方法創(chuàng)建了一個(gè)消息構(gòu)建器。我們指定了一個(gè)未來(lái)的時(shí)間戳,使得消息在10秒后才被標(biāo)記為發(fā)送。這在實(shí)現(xiàn)延遲消息處理時(shí)非常有用。7.3消息順序保證在分布式系統(tǒng)中,消息的順序處理是一個(gè)常見(jiàn)的需求。Pulsar提供了消息順序保證的機(jī)制,確保消息按照發(fā)送的順序被消費(fèi)。7.3.1原理Pulsar通過(guò)在Producer和Consumer上設(shè)置相應(yīng)的屬性來(lái)實(shí)現(xiàn)消息順序。在Producer端,可以設(shè)置blockIfQueueFull屬性為true,以確保消息在隊(duì)列滿時(shí)被阻塞,直到隊(duì)列有空間。在Consumer端,可以設(shè)置subscriptionType為Exclusive,以確保一個(gè)Consumer獨(dú)占一個(gè)topic,從而保證消息順序。7.3.2示例以下是一個(gè)使用Pulsar保證消息順序的示例:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
importorg.apache.pulsar.client.api.SubscriptionType;
importjava.util.concurrent.TimeUnit;
publicclassOrderedConsumer{
publicstaticvoidmain(String[]args)throwsPulsarClientException,InterruptedException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<byte[]>consumer=client.newConsumer()
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
while(true){
Message<byte[]>msg=consumer.receive(5,TimeUnit.SECONDS);
if(msg!=null){
System.out.println("Receivedmessage:
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 模特形象培訓(xùn)課程
- 泌尿系結(jié)石的運(yùn)動(dòng)治療
- 腫瘤流行病學(xué)的研究?jī)?nèi)容
- 保健品并購(gòu)策略
- 舞蹈室改造合同
- 畜牧業(yè)審查管理辦法
- 印刷包裝木方供應(yīng)合同
- 軟件開(kāi)發(fā)項(xiàng)目招投標(biāo)科研申請(qǐng)表
- 領(lǐng)導(dǎo)個(gè)人年終總結(jié)
- 建筑橋梁分包合同
- 百變扭扭棒 課件
- 復(fù)旦大學(xué)數(shù)學(xué)物理方法講義
- 上海破產(chǎn)管理人擴(kuò)容考試參考題庫(kù)(含答案)
- 繪本課件小兔子的月亮
- 基礎(chǔ)篇1、松下電器歷史簡(jiǎn)介
- 學(xué)生餐飲方面的消費(fèi)者行為分析
- 汽車維修價(jià)格表
- 川氣東送武漢隧道內(nèi)雙管安裝技術(shù)
- 中班音樂(lè)韻律游戲《阿凡提尋寶記》原版有聲動(dòng)態(tài)PPT課件
- 空調(diào)水系統(tǒng)的節(jié)能措施以及水泵調(diào)節(jié)
- 奇妙的黃金數(shù)在生活中廣泛應(yīng)用學(xué)習(xí)資料
評(píng)論
0/150
提交評(píng)論