消息隊(duì)列:Pulsar:Pulsar消息模型與消息類型_第1頁(yè)
消息隊(duì)列:Pulsar:Pulsar消息模型與消息類型_第2頁(yè)
消息隊(duì)列:Pulsar:Pulsar消息模型與消息類型_第3頁(yè)
消息隊(duì)列:Pulsar:Pulsar消息模型與消息類型_第4頁(yè)
消息隊(duì)列:Pulsar:Pulsar消息模型與消息類型_第5頁(yè)
已閱讀5頁(yè),還剩22頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

消息隊(duì)列:Pulsar:Pulsar消息模型與消息類型1消息隊(duì)列基礎(chǔ)1.1消息隊(duì)列的定義消息隊(duì)列是一種應(yīng)用程序間通信(IPC)的模式,它允許消息在發(fā)送者和接收者之間異步傳遞。消息隊(duì)列中的消息遵循先進(jìn)先出(FIFO)原則,但同時(shí)也支持更復(fù)雜的路由策略。消息隊(duì)列可以提高系統(tǒng)的解耦性、可擴(kuò)展性和容錯(cuò)能力。1.2消息隊(duì)列的作用消息隊(duì)列在現(xiàn)代分布式系統(tǒng)中扮演著關(guān)鍵角色,主要作用包括:異步處理:允許發(fā)送者和接收者異步操作,提高系統(tǒng)響應(yīng)速度。負(fù)載均衡:通過(guò)消息隊(duì)列,可以將任務(wù)均勻地分配給多個(gè)處理者。削峰填谷:在高負(fù)載時(shí),消息隊(duì)列可以緩存消息,避免系統(tǒng)過(guò)載。系統(tǒng)解耦:發(fā)送者和接收者不需要直接通信,降低了系統(tǒng)的耦合度。數(shù)據(jù)持久化:消息隊(duì)列通常會(huì)將消息持久化到磁盤(pán),確保消息不會(huì)因系統(tǒng)故障而丟失。1.3Pulsar簡(jiǎn)介ApachePulsar是一個(gè)高性能、可擴(kuò)展的分布式消息隊(duì)列系統(tǒng),由Yahoo開(kāi)發(fā)并開(kāi)源,現(xiàn)已成為Apache軟件基金會(huì)的頂級(jí)項(xiàng)目。Pulsar提供了消息隊(duì)列和發(fā)布/訂閱兩種消息模式,支持多種消息類型,包括二進(jìn)制、JSON、Avro等。Pulsar的核心特性包括:持久化和非持久化消息:消息可以被持久化到磁盤(pán),也可以選擇在內(nèi)存中短暫存儲(chǔ)。多租戶和多層安全性:支持多租戶環(huán)境,提供細(xì)粒度的訪問(wèn)控制和身份驗(yàn)證。水平擴(kuò)展:可以輕松地通過(guò)增加更多的Broker來(lái)擴(kuò)展系統(tǒng)的吞吐量和存儲(chǔ)能力。全球分布:支持跨數(shù)據(jù)中心的全球分布,確保數(shù)據(jù)的高可用性和低延遲訪問(wèn)。1.3.1Pulsar消息模型Pulsar的消息模型基于主題(Topic)和訂閱(Subscription)。一個(gè)主題可以有多個(gè)生產(chǎn)者(Producer)和消費(fèi)者(Consumer),生產(chǎn)者向主題發(fā)送消息,消費(fèi)者從主題中消費(fèi)消息。訂閱者可以以獨(dú)占、共享或鍵共享模式訂閱主題,這決定了消息如何在消費(fèi)者之間分發(fā)。1.3.2Pulsar消息類型Pulsar支持多種消息類型,包括:二進(jìn)制消息:最基礎(chǔ)的消息類型,可以是任何二進(jìn)制數(shù)據(jù)。JSON消息:用于傳輸結(jié)構(gòu)化數(shù)據(jù),便于解析和處理。Avro消息:提供數(shù)據(jù)序列化和反序列化,支持模式演進(jìn)。示例:使用Java發(fā)送JSON消息到Pulsarimportorg.apache.pulsar.client.api.Schema;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassPulsarProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

//創(chuàng)建Pulsar客戶端

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

//創(chuàng)建生產(chǎn)者,使用JSON模式

Producer<String>producer=client.newProducer(Schema.JSON(String.class))

.topic("my-topic")

.create();

//發(fā)送消息

for(inti=0;i<10;i++){

Stringmessage="HelloPulsar"+i;

producer.send(message);

}

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

}在上述示例中,我們首先創(chuàng)建了一個(gè)Pulsar客戶端,然后使用Schema.JSON(String.class)指定了消息的類型為JSON。接著,我們創(chuàng)建了一個(gè)生產(chǎn)者并發(fā)送了10條消息到主題my-topic。最后,我們關(guān)閉了生產(chǎn)者和客戶端,確保資源被正確釋放。示例:使用Java從Pulsar消費(fèi)JSON消息importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importorg.apache.pulsar.client.api.SubscriptionType;

publicclassPulsarConsumer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

//創(chuàng)建Pulsar客戶端

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

//創(chuàng)建消費(fèi)者,使用JSON模式

Consumer<String>consumer=client.newConsumer(Schema.JSON(String.class))

.topic("my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

//消費(fèi)消息

while(true){

Message<String>msg=consumer.receive();

System.out.println("Receivedmessage:"+msg.getValue());

//確認(rèn)消息已被消費(fèi)

consumer.acknowledge(msg);

}

}

}在這個(gè)消費(fèi)示例中,我們創(chuàng)建了一個(gè)消費(fèi)者,同樣使用了Schema.JSON(String.class)來(lái)指定消息類型。我們訂閱了主題my-topic,并使用了獨(dú)占訂閱模式(SubscriptionType.Exclusive)。然后,我們進(jìn)入一個(gè)無(wú)限循環(huán),接收并打印消息,最后確認(rèn)消息已被消費(fèi)。通過(guò)這些示例,我們可以看到Pulsar如何處理不同類型的異步消息,以及如何在生產(chǎn)者和消費(fèi)者之間建立通信。Pulsar的靈活性和強(qiáng)大的功能使其成為構(gòu)建現(xiàn)代分布式系統(tǒng)時(shí)的首選消息隊(duì)列系統(tǒng)。2消息隊(duì)列:Pulsar:深入理解Pulsar消息模型2.1消息的結(jié)構(gòu)在ApachePulsar中,消息的結(jié)構(gòu)設(shè)計(jì)得非常靈活和高效,以適應(yīng)不同的應(yīng)用場(chǎng)景。一個(gè)Pulsar消息主要由以下幾部分組成:Payload:消息的實(shí)際內(nèi)容,可以是任何類型的數(shù)據(jù),如JSON、XML或二進(jìn)制數(shù)據(jù)。Properties:一組鍵值對(duì),用于存儲(chǔ)消息的元數(shù)據(jù),如消息的創(chuàng)建時(shí)間、內(nèi)容類型等。MessageID:每個(gè)消息都有一個(gè)唯一的ID,用于消息的追蹤和管理。Schema:定義了消息的結(jié)構(gòu),使得消息的發(fā)送者和接收者能夠理解消息的內(nèi)容。2.1.1示例代碼importorg.apache.pulsar.client.api.Schema;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassMessageStructureExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<String>producer=client.newProducer(Schema.STRING)

.topic("persistent://sample/standalone/ns/my-topic")

.create();

StringmessageContent="{\"name\":\"John\",\"age\":30}";

Message<String>message=Message.builder()

.value(messageContent)

.properties(Map.of("type","user","timestamp",String.valueOf(System.currentTimeMillis())))

.build();

producer.send(message);

producer.close();

client.close();

}

}在上述代碼中,我們創(chuàng)建了一個(gè)Producer對(duì)象,使用Schema.STRING定義了消息的結(jié)構(gòu)。然后,我們構(gòu)建了一個(gè)消息,其中包含了實(shí)際的payload(messageContent)和一些properties(如類型和時(shí)間戳)。2.2消息的生命周期Pulsar消息的生命周期從消息被生產(chǎn)者發(fā)送開(kāi)始,直到被消費(fèi)者消費(fèi)并確認(rèn)為止。在這個(gè)過(guò)程中,消息可能會(huì)經(jīng)歷以下?tīng)顟B(tài):待發(fā)送:消息在生產(chǎn)者緩存中等待發(fā)送。發(fā)送中:消息正在通過(guò)網(wǎng)絡(luò)發(fā)送到PulsarBroker。已發(fā)送:消息成功發(fā)送到Broker,存儲(chǔ)在消息日志中。待消費(fèi):消息存儲(chǔ)在Broker中,等待被消費(fèi)者消費(fèi)。消費(fèi)中:消費(fèi)者正在處理消息。已消費(fèi):消費(fèi)者成功處理消息,并向Broker發(fā)送確認(rèn)。2.2.1示例代碼importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.MessageId;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassMessageLifecycleExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Consumer<String>consumer=client.newConsumer(Schema.STRING)

.topic("persistent://sample/standalone/ns/my-topic")

.subscriptionName("my-subscription")

.subscribe();

Message<String>message=consumer.receive();

System.out.println("Receivedmessage:"+message.getValue());

//消費(fèi)者處理消息

//...

//確認(rèn)消息已被消費(fèi)

consumer.acknowledge(message.getMessageId());

consumer.close();

client.close();

}

}在本例中,我們創(chuàng)建了一個(gè)Consumer對(duì)象,訂閱了特定的主題。當(dāng)消息到達(dá)時(shí),consumer.receive()方法將消息從Broker中取出,進(jìn)入“消費(fèi)中”狀態(tài)。一旦消息被處理,我們通過(guò)調(diào)用consumer.acknowledge()方法來(lái)確認(rèn)消息已被消費(fèi),從而完成其生命周期。2.3消息的發(fā)布與訂閱機(jī)制Pulsar提供了多種發(fā)布與訂閱機(jī)制,以滿足不同的需求。主要的發(fā)布與訂閱模式包括:獨(dú)占訂閱(Exclusive):只有一個(gè)消費(fèi)者可以訂閱一個(gè)主題,所有消息都只能被這一個(gè)消費(fèi)者消費(fèi)。共享訂閱(Shared):多個(gè)消費(fèi)者可以訂閱一個(gè)主題,消息會(huì)被分發(fā)給所有訂閱者中的一個(gè),確保每個(gè)消息只被消費(fèi)一次。故障轉(zhuǎn)移訂閱(Failover):類似于獨(dú)占訂閱,但是當(dāng)當(dāng)前的消費(fèi)者不可用時(shí),消息會(huì)被轉(zhuǎn)發(fā)給下一個(gè)消費(fèi)者。2.3.1示例代碼importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassPublishSubscribeExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Consumer<String>consumer1=client.newConsumer(Schema.STRING)

.topic("persistent://sample/standalone/ns/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

Consumer<String>consumer2=client.newConsumer(Schema.STRING)

.topic("persistent://sample/standalone/ns/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

//嘗試接收消息,由于是獨(dú)占訂閱,consumer2將無(wú)法接收消息

Message<String>message1=consumer1.receive();

Message<String>message2=consumer2.receive();//這里consumer2將阻塞,直到consumer1確認(rèn)或放棄消息

//消費(fèi)者處理消息

//...

//確認(rèn)消息已被消費(fèi)

consumer1.acknowledge(message1.getMessageId());

consumer1.close();

consumer2.close();

client.close();

}

}在上述代碼中,我們創(chuàng)建了兩個(gè)消費(fèi)者consumer1和consumer2,并使用SubscriptionType.Exclusive指定了獨(dú)占訂閱模式。這意味著所有發(fā)送到主題的消息將只被consumer1或consumer2中的一個(gè)消費(fèi),而不會(huì)同時(shí)被兩個(gè)消費(fèi)者接收。通過(guò)這些示例,我們可以看到Pulsar消息模型的靈活性和強(qiáng)大功能,以及如何通過(guò)代碼實(shí)現(xiàn)消息的結(jié)構(gòu)定義、生命周期管理和發(fā)布訂閱機(jī)制。這為構(gòu)建高效、可靠的消息處理系統(tǒng)提供了堅(jiān)實(shí)的基礎(chǔ)。3Pulsar消息類型詳解在ApachePulsar消息隊(duì)列中,消息可以采用多種格式進(jìn)行編碼和傳輸,以適應(yīng)不同的應(yīng)用場(chǎng)景和數(shù)據(jù)處理需求。本教程將深入探討Pulsar支持的四種主要消息類型:二進(jìn)制消息、JSON消息、Avro消息以及其他消息格式。3.1進(jìn)制消息二進(jìn)制消息是最基本的消息類型,它允許消息以原始字節(jié)流的形式發(fā)送。這種類型的消息適用于任何可以序列化為字節(jié)流的數(shù)據(jù),包括但不限于自定義的二進(jìn)制數(shù)據(jù)、圖像、音頻文件等。3.1.1示例代碼importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassBinaryProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<byte[]>producer=client.newProducer().topic("binary-topic").create();

//創(chuàng)建二進(jìn)制消息

byte[]binaryData=newbyte[]{0x01,0x02,0x03,0x04};

producer.send(binaryData);

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

}3.1.2解釋在上述示例中,我們創(chuàng)建了一個(gè)Pulsar客戶端,并使用它來(lái)創(chuàng)建一個(gè)生產(chǎn)者,該生產(chǎn)者將二進(jìn)制數(shù)據(jù)發(fā)送到名為binary-topic的主題中。二進(jìn)制數(shù)據(jù)直接以字節(jié)數(shù)組的形式發(fā)送,無(wú)需額外的編碼或解碼步驟。3.2JSON消息JSON(JavaScriptObjectNotation)是一種輕量級(jí)的數(shù)據(jù)交換格式,易于人閱讀和編寫(xiě),同時(shí)也易于機(jī)器解析和生成。Pulsar支持JSON消息,這使得在消息中傳輸結(jié)構(gòu)化數(shù)據(jù)變得簡(jiǎn)單。3.2.1示例代碼importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importcom.fasterxml.jackson.databind.ObjectMapper;

publicclassJSONProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<String>producer=client.newProducer().topic("json-topic").create();

//創(chuàng)建JSON消息

ObjectMappermapper=newObjectMapper();

Useruser=newUser("JohnDoe",30);

StringjsonMessage=mapper.writeValueAsString(user);

producer.send(jsonMessage.getBytes());

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

staticclassUser{

Stringname;

intage;

publicUser(Stringname,intage){

=name;

this.age=age;

}

}

}3.2.2解釋此示例展示了如何使用Java的ObjectMapper類將一個(gè)簡(jiǎn)單的User對(duì)象轉(zhuǎn)換為JSON字符串,然后將其發(fā)送到Pulsar的主題json-topic中。User對(duì)象包含姓名和年齡兩個(gè)字段,通過(guò)序列化為JSON格式,可以方便地在消息中傳輸結(jié)構(gòu)化數(shù)據(jù)。3.3Avro消息ApacheAvro是一種數(shù)據(jù)序列化系統(tǒng),它不僅提供緊湊、快速的二進(jìn)制數(shù)據(jù)序列化,還支持模式演進(jìn),即在不破壞向后兼容性的情況下,可以修改數(shù)據(jù)模式。Pulsar通過(guò)Avro編碼支持高效地傳輸復(fù)雜數(shù)據(jù)結(jié)構(gòu)。3.3.1示例代碼importorg.apache.avro.Schema;

importorg.apache.avro.generic.GenericData;

importorg.apache.avro.generic.GenericRecord;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importorg.apache.pulsar.client.impl.schema.AvroSchema;

publicclassAvroProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<GenericRecord>producer=client.newProducer(AvroSchema.of(User.class)).topic("avro-topic").create();

//創(chuàng)建Avro消息

Useruser=newUser();

user.setName("JaneDoe");

user.setAge(25);

producer.send(user);

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

staticclassUserextendsGenericData.Record{

publicUser(){

Schemaschema=newSchema.Parser().parse("{"

+"\"type\":\"record\","

+"\"name\":\"User\","

+"\"fields\":["

+"{\"name\":\"name\",\"type\":\"string\"},"

+"{\"name\":\"age\",\"type\":\"int\"}"

+"]"

+"}");

super(schema);

}

publicvoidsetName(Stringname){

put("name",name);

}

publicvoidsetAge(intage){

put("age",age);

}

}

}3.3.2解釋在AvroProducer示例中,我們首先定義了一個(gè)Avro模式,該模式描述了User對(duì)象的結(jié)構(gòu),包括name和age字段。然后,我們使用Pulsar的AvroSchema創(chuàng)建一個(gè)生產(chǎn)者,將User對(duì)象直接發(fā)送到avro-topic主題。Avro的模式演進(jìn)特性使得在數(shù)據(jù)結(jié)構(gòu)發(fā)生變化時(shí),仍然能夠保證消息的正確傳輸和解析。3.4其他消息格式除了上述提到的二進(jìn)制、JSON和Avro消息格式,Pulsar還支持其他消息格式,如Protobuf、XML等。這些格式的選擇取決于具體的應(yīng)用場(chǎng)景和數(shù)據(jù)處理需求。3.4.1Protobuf消息Protobuf(ProtocolBuffers)是Google開(kāi)發(fā)的一種數(shù)據(jù)交換格式,它提供了高效的序列化和反序列化機(jī)制,特別適合于傳輸大量結(jié)構(gòu)化數(shù)據(jù)。3.4.2示例代碼importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importtobuf.GeneratedMessageV3;

publicclassProtobufProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<GeneratedMessageV3>producer=client.newProducer(Schema.PROTOBUF(UserProto.User.getDefaultInstance())).topic("protobuf-topic").create();

//創(chuàng)建Protobuf消息

UserProto.Useruser=UserProto.User.newBuilder()

.setName("Alice")

.setAge(35)

.build();

producer.send(user.toByteArray());

//關(guān)閉生產(chǎn)者和客戶端

producer.close();

client.close();

}

}3.4.3解釋在ProtobufProducer示例中,我們使用了Google的Protobuf庫(kù)來(lái)定義User消息的結(jié)構(gòu),并通過(guò)Pulsar的ProtobufSchema創(chuàng)建了一個(gè)生產(chǎn)者。User對(duì)象通過(guò)toByteArray方法轉(zhuǎn)換為字節(jié)數(shù)組,然后發(fā)送到protobuf-topic主題。Protobuf的高效性和緊湊性使其成為處理大量結(jié)構(gòu)化數(shù)據(jù)的理想選擇。3.5結(jié)論P(yáng)ulsar通過(guò)支持多種消息格式,如二進(jìn)制、JSON、Avro和Protobuf,為開(kāi)發(fā)者提供了靈活的數(shù)據(jù)傳輸和處理選項(xiàng)。選擇合適的消息格式可以顯著提高消息處理的效率和可靠性,同時(shí)簡(jiǎn)化數(shù)據(jù)的序列化和反序列化過(guò)程。在實(shí)際應(yīng)用中,應(yīng)根據(jù)數(shù)據(jù)的特性和處理需求來(lái)選擇最合適的格式。請(qǐng)注意,上述代碼示例假設(shè)你已經(jīng)定義了相應(yīng)的Protobuf消息結(jié)構(gòu)(UserProto),并且在項(xiàng)目中包含了必要的依賴庫(kù)。在實(shí)際應(yīng)用中,你可能需要根據(jù)自己的數(shù)據(jù)模型和需求進(jìn)行相應(yīng)的調(diào)整。4消息發(fā)布4.1生產(chǎn)者角色在ApachePulsar消息隊(duì)列中,生產(chǎn)者是負(fù)責(zé)生成和發(fā)送消息的組件。生產(chǎn)者通過(guò)連接到Pulsar的Broker,將消息發(fā)送到特定的Topic中。每個(gè)Topic可以有多個(gè)生產(chǎn)者,這意味著多個(gè)應(yīng)用程序或服務(wù)可以同時(shí)向同一個(gè)Topic發(fā)送消息,從而實(shí)現(xiàn)高并發(fā)的消息發(fā)布能力。4.1.1發(fā)布消息流程創(chuàng)建Producer對(duì)象:生產(chǎn)者首先需要?jiǎng)?chuàng)建一個(gè)Producer對(duì)象,這通常通過(guò)PulsarClient的createProducer方法完成。發(fā)送消息:使用Producer對(duì)象的send方法將消息發(fā)送到指定的Topic。消息可以是任意類型的數(shù)據(jù),但在發(fā)送前需要被序列化為字節(jié)流。確認(rèn)消息發(fā)送:Pulsar支持消息發(fā)送的確認(rèn)機(jī)制,確保消息被成功發(fā)送到Broker。如果消息發(fā)送失敗,生產(chǎn)者可以重新發(fā)送消息。關(guān)閉Producer:在完成消息發(fā)送后,生產(chǎn)者應(yīng)該調(diào)用close方法來(lái)釋放資源。4.1.2消息持久化策略Pulsar提供了多種消息持久化策略,以確保消息在Broker故障時(shí)不會(huì)丟失。這些策略包括:消息存儲(chǔ):Pulsar使用ApacheBookKeeper作為其后端存儲(chǔ)系統(tǒng),所有消息都會(huì)被持久化到磁盤(pán)上。消息復(fù)制:Pulsar支持消息的跨Broker復(fù)制,以提高系統(tǒng)的可用性和容錯(cuò)性。復(fù)制策略可以是同步或異步的。消息過(guò)期:Pulsar允許設(shè)置消息的過(guò)期時(shí)間,過(guò)期的消息將被自動(dòng)刪除,以節(jié)省存儲(chǔ)空間。4.2示例代碼:創(chuàng)建生產(chǎn)者并發(fā)送消息importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.ProducerConfiguration;

importorg.apache.pulsar.client.api.Schema;

publicclassPulsarProducerExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

//創(chuàng)建PulsarClient實(shí)例

PulsarClientclient=PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

//創(chuàng)建Producer配置

ProducerConfiguration<String>producerConfiguration=ProducerConfiguration.builder()

.schema(Schema.STRING)

.build();

//創(chuàng)建Producer

Producer<String>producer=client.newProducer(producerConfiguration)

.topic("persistent://public/default/my-topic")

.create();

//發(fā)送消息

for(inti=0;i<10;i++){

Stringmessage="HelloPulsar"+i;

producer.send(message);

}

//關(guān)閉Producer

producer.close();

//關(guān)閉PulsarClient

client.close();

}

}4.2.1代碼解釋創(chuàng)建PulsarClient:通過(guò)PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();創(chuàng)建一個(gè)PulsarClient實(shí)例,其中serviceUrl指定了Pulsar服務(wù)的URL。創(chuàng)建Producer配置:使用ProducerConfiguration.builder().schema(Schema.STRING).build();創(chuàng)建一個(gè)Producer配置對(duì)象,指定了消息的Schema為字符串類型。創(chuàng)建Producer:通過(guò)client.newProducer(producerConfiguration).topic("persistent://public/default/my-topic").create();創(chuàng)建一個(gè)Producer,其中topic指定了消息將被發(fā)送到的Topic。發(fā)送消息:使用producer.send(message);方法發(fā)送消息到指定的Topic。關(guān)閉Producer和PulsarClient:在完成消息發(fā)送后,調(diào)用producer.close();和client.close();來(lái)釋放資源。通過(guò)以上步驟,我們可以看到Pulsar消息隊(duì)列中消息發(fā)布的完整流程,從創(chuàng)建生產(chǎn)者到發(fā)送消息,再到資源的釋放,每一步都遵循了Pulsar的API規(guī)范,確保了消息的正確發(fā)送和系統(tǒng)的穩(wěn)定運(yùn)行。5消息消費(fèi)5.1消費(fèi)者角色在ApachePulsar消息隊(duì)列中,消費(fèi)者是消息的接收者。它們訂閱主題,接收并處理由生產(chǎn)者發(fā)送的消息。消費(fèi)者可以是任何能夠接收和處理消息的應(yīng)用程序或服務(wù)。在Pulsar中,消費(fèi)者通過(guò)創(chuàng)建一個(gè)Consumer對(duì)象來(lái)訂閱主題,這個(gè)對(duì)象提供了接收消息的方法。5.1.1示例代碼importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassPulsarConsumerExample{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

//創(chuàng)建Pulsar客戶端

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

//創(chuàng)建消費(fèi)者,訂閱主題

Consumer<String>consumer=client.newConsumer()

.topic("persistent://public/default/my-topic")

.subscriptionName("my-subscription")

.subscribe();

//接收消息

Message<String>msg=consumer.receive();

//處理消息

System.out.println("Receivedmessage:"+msg.getValue());

//確認(rèn)消息已處理

consumer.acknowledge(msg);

//關(guān)閉消費(fèi)者和客戶端

consumer.close();

client.close();

}

}5.2消息消費(fèi)模式Pulsar支持兩種主要的消息消費(fèi)模式:獨(dú)占(Exclusive)和共享(Shared)。5.2.1獨(dú)占消費(fèi)在獨(dú)占消費(fèi)模式下,一個(gè)訂閱只能被一個(gè)消費(fèi)者使用。這意味著如果多個(gè)消費(fèi)者嘗試訂閱同一個(gè)主題和訂閱名稱,只有第一個(gè)消費(fèi)者能夠成功訂閱,其他消費(fèi)者將收到錯(cuò)誤。這種模式適用于需要確保消息只被一個(gè)消費(fèi)者處理的情況。5.2.2共享消費(fèi)共享消費(fèi)模式允許多個(gè)消費(fèi)者訂閱同一個(gè)主題和訂閱名稱。消息將被分發(fā)給訂閱中的任意一個(gè)消費(fèi)者,確保消息至少被處理一次。這種模式適用于需要高可用性和負(fù)載均衡的場(chǎng)景。5.2.3示例代碼//獨(dú)占消費(fèi)模式

Consumer<String>exclusiveConsumer=client.newConsumer()

.topic("persistent://public/default/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

//共享消費(fèi)模式

Consumer<String>sharedConsumer=client.newConsumer()

.topic("persistent://public/default/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Shared)

.subscribe();5.3消息重試與死信隊(duì)列在處理消息時(shí),如果消費(fèi)者未能成功處理消息,Pulsar提供了消息重試機(jī)制。消息可以被重新發(fā)送給消費(fèi)者,直到成功處理。如果消息在多次重試后仍然無(wú)法處理,它可以被移動(dòng)到死信隊(duì)列(DeadLetterQueue)。5.3.1消息重試消費(fèi)者可以通過(guò)negativeAcknowledge方法來(lái)標(biāo)記消息為未處理,這將觸發(fā)消息重試。Pulsar允許配置重試策略,包括重試次數(shù)和重試間隔。5.3.2死信隊(duì)列死信隊(duì)列用于存儲(chǔ)那些無(wú)法被正常處理的消息。這些消息可以被單獨(dú)處理或分析,以找出處理失敗的原因。在Pulsar中,可以通過(guò)配置主題的策略來(lái)啟用死信隊(duì)列。5.3.3示例代碼//消費(fèi)者處理消息失敗,請(qǐng)求重試

consumer.negativeAcknowledge(msg);

//配置主題策略,啟用死信隊(duì)列

TopicPoliciespolicies=newTopicPolicies();

policies.setDeadLetterTopic("persistent://public/default/my-dead-letter-topic");

client.getAdmin().topics().updateTopicPolicies("persistent://public/default/my-topic",policies);通過(guò)上述代碼和解釋,我們深入了解了Pulsar中消費(fèi)者角色的定義、消息消費(fèi)的不同模式,以及如何處理消息重試和死信隊(duì)列。這些機(jī)制確保了消息的可靠處理和系統(tǒng)的高可用性。6消息路由策略6.1消息分發(fā)原理在消息隊(duì)列系統(tǒng)中,如ApachePulsar,消息的分發(fā)是一個(gè)關(guān)鍵過(guò)程,它決定了消息如何從生產(chǎn)者到達(dá)消費(fèi)者。Pulsar通過(guò)其獨(dú)特的消息模型和靈活的路由策略,確保了消息的高效、有序和可靠傳輸。消息分發(fā)原理基于以下幾點(diǎn):主題(Topic):在Pulsar中,消息被發(fā)布到特定的主題上,這些主題可以是持久的或非持久的,根據(jù)消息的存儲(chǔ)需求而定。分區(qū)(Partition):為了提高吞吐量和可擴(kuò)展性,主題可以被劃分為多個(gè)分區(qū),每個(gè)分區(qū)獨(dú)立存儲(chǔ)和處理消息。消費(fèi)者(Consumer):消費(fèi)者訂閱主題以接收消息。Pulsar支持獨(dú)占、共享和鍵共享三種訂閱類型,以滿足不同的消費(fèi)模式。消息路由:生產(chǎn)者發(fā)送消息時(shí),Pulsar根據(jù)配置的路由策略決定將消息發(fā)送到哪個(gè)分區(qū)。這確保了消息的均衡分布和處理。6.2路由策略類型Pulsar提供了多種消息路由策略,以適應(yīng)不同的應(yīng)用場(chǎng)景和需求:RoundRobin:輪詢策略,將消息均勻地分發(fā)到所有分區(qū),以實(shí)現(xiàn)負(fù)載均衡。KeyBased:基于消息鍵的策略,根據(jù)消息中的鍵將消息路由到特定的分區(qū),確保具有相同鍵的消息被發(fā)送到同一分區(qū),便于實(shí)現(xiàn)消息的有序處理。Custom:自定義策略,允許用戶實(shí)現(xiàn)自己的消息路由邏輯,提供最大的靈活性。6.2.1RoundRobin路由策略原理RoundRobin策略是最簡(jiǎn)單的消息分發(fā)方式,它將消息輪流發(fā)送到不同的分區(qū),確保每個(gè)分區(qū)的負(fù)載大致相同。這種策略適用于消息大小和處理時(shí)間相對(duì)均勻的場(chǎng)景。示例假設(shè)我們有一個(gè)主題my-topic,它被劃分為4個(gè)分區(qū)。使用RoundRobin策略,消息將按照以下順序被發(fā)送:第1條消息發(fā)送到分區(qū)0第2條消息發(fā)送到分區(qū)1第3條消息發(fā)送到分區(qū)2第4條消息發(fā)送到分區(qū)3第5條消息再次發(fā)送到分區(qū)0,以此類推。6.2.2KeyBased路由策略原理KeyBased策略根據(jù)消息中的鍵(key)將消息路由到特定的分區(qū)。如果消息包含相同的鍵,它們將被發(fā)送到同一分區(qū),這有助于實(shí)現(xiàn)消息的有序處理和聚合。鍵可以是消息中的任意字段,也可以是自定義的鍵生成邏輯。示例假設(shè)我們有一個(gè)主題my-topic,它被劃分為3個(gè)分區(qū)。消息包含一個(gè)用戶ID作為鍵,ID為1的消息將始終被發(fā)送到分區(qū)0,ID為2的消息將被發(fā)送到分區(qū)1,ID為3的消息將被發(fā)送到分區(qū)2,以此類推。#Python示例代碼

importpulsar

client=pulsar.Client('pulsar://localhost:6650')

producer=client.create_producer('my-topic',key_type='string')

#發(fā)送消息,鍵為用戶ID

foriinrange(10):

key=str(i%3)#生成鍵,確保消息被均勻分布到3個(gè)分區(qū)

producer.send(('message-'+str(i)).encode('utf-8'),key=key)

client.close()6.2.3Custom路由策略原理Custom策略允許用戶實(shí)現(xiàn)自己的消息路由邏輯。這提供了最大的靈活性,但同時(shí)也要求用戶對(duì)消息隊(duì)列的性能和可靠性有深入的理解。自定義策略可以通過(guò)實(shí)現(xiàn)Pulsar的MessageRouter接口來(lái)實(shí)現(xiàn)。示例假設(shè)我們想要根據(jù)消息的類型(例如,交易、日志、警報(bào))將消息路由到不同的分區(qū),可以實(shí)現(xiàn)一個(gè)自定義的路由策略。//Java示例代碼

importorg.apache.pulsar.client.api.MessageRouter;

importorg.apache.pulsar.client.api.MessageRouterBuilder;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

publicclassCustomMessageRouterimplementsMessageRouter<String>{

@Override

publicintroute(Stringkey,intnumPartitions){

if(key.startsWith("transaction")){

return0;//交易消息發(fā)送到分區(qū)0

}elseif(key.startsWith("log")){

return1;//日志消息發(fā)送到分區(qū)1

}elseif(key.startsWith("alert")){

return2;//警報(bào)消息發(fā)送到分區(qū)2

}

returnnumPartitions/2;//默認(rèn)情況下,將消息發(fā)送到中間分區(qū)

}

}

publicclassCustomProducer{

publicstaticvoidmain(String[]args)throwsException{

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

MessageRouterBuilder<String>routerBuilder=MessageRouter.custom(newCustomMessageRouter());

Producer<String>producer=client.newProducer(Schema.STRING)

.topic("my-topic")

.messageRoutingMode(MessageRoutingMode.SinglePartition)

.messageRouter(routerBuilder)

.create();

//發(fā)送不同類型的消息

producer.send("transaction-1","transaction");

producer.send("log-2","log");

producer.send("alert-3","alert");

client.close();

}

}6.3策略配置示例在Pulsar中,配置路由策略可以通過(guò)修改生產(chǎn)者創(chuàng)建時(shí)的參數(shù)來(lái)實(shí)現(xiàn)。以下是一個(gè)使用RoundRobin策略的配置示例://Java示例代碼

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.Producer;

PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

Producer<String>producer=client.newProducer(Schema.STRING)

.topic("my-topic")

.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)

.create();在這個(gè)示例中,messageRoutingMode參數(shù)被設(shè)置為RoundRobinPartition,這告訴Pulsar使用輪詢策略來(lái)分發(fā)消息。對(duì)于KeyBased策略,可以通過(guò)在發(fā)送消息時(shí)指定鍵來(lái)實(shí)現(xiàn):producer.send(("message-"+i).getBytes(),"key-"+(i%3));在這個(gè)示例中,消息的鍵被設(shè)置為"key-"+(i%3),這確保了具有相同鍵的消息被發(fā)送到同一分區(qū)。對(duì)于Custom策略,需要實(shí)現(xiàn)MessageRouter接口,并在創(chuàng)建生產(chǎn)者時(shí)指定自定義的路由策略。MessageRouterBuilder<String>routerBuilder=MessageRouter.custom(newCustomMessageRouter());

Producer<String>producer=client.newProducer(Schema.STRING)

.topic("my-topic")

.messageRoutingMode(MessageRoutingMode.SinglePartition)

.messageRouter(routerBuilder)

.create();在這個(gè)示例中,CustomMessageRouter類實(shí)現(xiàn)了自定義的路由邏輯,通過(guò)MessageRouterBuilder和messageRouter參數(shù),可以將這個(gè)自定義策略應(yīng)用到生產(chǎn)者上。通過(guò)以上示例,我們可以看到Pulsar提供了豐富的消息路由策略,以滿足不同場(chǎng)景的需求。選擇合適的策略對(duì)于優(yōu)化消息隊(duì)列的性能和可靠性至關(guān)重要。7高級(jí)消息處理7.1消息壓縮在消息隊(duì)列系統(tǒng)中,如ApachePulsar,消息壓縮是一種優(yōu)化網(wǎng)絡(luò)傳輸和存儲(chǔ)空間的有效手段。Pulsar支持多種壓縮算法,包括LZ4、ZLIB、ZSTD等,以適應(yīng)不同的性能和壓縮比需求。7.1.1原理消息壓縮在消息發(fā)送前進(jìn)行,將原始消息數(shù)據(jù)轉(zhuǎn)換為更小的二進(jìn)制格式,從而減少在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)量和存儲(chǔ)空間的占用。接收方在消費(fèi)消息時(shí),會(huì)自動(dòng)解壓縮消息,恢復(fù)原始數(shù)據(jù)。7.1.2示例在Pulsar中,可以通過(guò)設(shè)置producer的compressionType屬性來(lái)啟用消息壓縮。以下是一個(gè)使用LZ4壓縮算法的示例:importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassCompressedProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

Producer<byte[]>producer=client.newProducer()

.topic("persistent://sample/standalone/ns/my-topic")

.compressionType(CompressionType.LZ4)

.create();

Stringmessage="Hello,Pulsar!";

producer.send(message.getBytes());

producer.close();

client.close();

}

}7.1.3解釋在上述代碼中,我們創(chuàng)建了一個(gè)PulsarClient實(shí)例,并指定了服務(wù)URL。然后,我們創(chuàng)建了一個(gè)Producer,設(shè)置了topic和compressionType為L(zhǎng)Z4。發(fā)送消息時(shí),原始字符串被轉(zhuǎn)換為字節(jié)數(shù)組,然后通過(guò)壓縮的Producer發(fā)送。接收方會(huì)自動(dòng)解壓縮消息,無(wú)需額外的解壓縮邏輯。7.2消息時(shí)間戳在Pulsar中,每條消息都有一個(gè)時(shí)間戳,用于記錄消息的創(chuàng)建時(shí)間或發(fā)送時(shí)間。時(shí)間戳可以用于實(shí)現(xiàn)時(shí)間窗口、消息延遲發(fā)送等功能。7.2.1原理Pulsar允許在消息發(fā)送時(shí)指定時(shí)間戳,如果沒(méi)有指定,系統(tǒng)會(huì)自動(dòng)使用消息發(fā)送時(shí)的時(shí)間作為時(shí)間戳。時(shí)間戳以毫秒為單位,存儲(chǔ)在消息的元數(shù)據(jù)中。7.2.2示例以下是一個(gè)在發(fā)送消息時(shí)指定時(shí)間戳的示例:importorg.apache.pulsar.client.api.MessageBuilder;

importorg.apache.pulsar.client.api.Producer;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

publicclassTimestampProducer{

publicstaticvoidmain(String[]args)throwsPulsarClientException{

PulsarClientclient=PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

Producer<byte[]>producer=client.newProducer()

.topic("persistent://sample/standalone/ns/my-topic")

.create();

longtimestamp=System.currentTimeMillis()+10000;//發(fā)送10秒后的時(shí)間

Stringmessage="Hello,Pulsar!";

producer.newMessage()

.value(message.getBytes())

.timestamp(timestamp)

.send();

producer.close();

client.close();

}

}7.2.3解釋在本例中,我們創(chuàng)建了一個(gè)Producer,并通過(guò)newMessage()方法創(chuàng)建了一個(gè)消息構(gòu)建器。我們指定了一個(gè)未來(lái)的時(shí)間戳,使得消息在10秒后才被標(biāo)記為發(fā)送。這在實(shí)現(xiàn)延遲消息處理時(shí)非常有用。7.3消息順序保證在分布式系統(tǒng)中,消息的順序處理是一個(gè)常見(jiàn)的需求。Pulsar提供了消息順序保證的機(jī)制,確保消息按照發(fā)送的順序被消費(fèi)。7.3.1原理Pulsar通過(guò)在Producer和Consumer上設(shè)置相應(yīng)的屬性來(lái)實(shí)現(xiàn)消息順序。在Producer端,可以設(shè)置blockIfQueueFull屬性為true,以確保消息在隊(duì)列滿時(shí)被阻塞,直到隊(duì)列有空間。在Consumer端,可以設(shè)置subscriptionType為Exclusive,以確保一個(gè)Consumer獨(dú)占一個(gè)topic,從而保證消息順序。7.3.2示例以下是一個(gè)使用Pulsar保證消息順序的示例:importorg.apache.pulsar.client.api.Consumer;

importorg.apache.pulsar.client.api.Message;

importorg.apache.pulsar.client.api.PulsarClient;

importorg.apache.pulsar.client.api.PulsarClientException;

importorg.apache.pulsar.client.api.SubscriptionType;

importjava.util.concurrent.TimeUnit;

publicclassOrderedConsumer{

publicstaticvoidmain(String[]args)throwsPulsarClientException,InterruptedException{

PulsarClientclient=PulsarClient.builder()

.serviceUrl("pulsar://localhost:6650")

.build();

Consumer<byte[]>consumer=client.newConsumer()

.topic("persistent://sample/standalone/ns/my-topic")

.subscriptionName("my-subscription")

.subscriptionType(SubscriptionType.Exclusive)

.subscribe();

while(true){

Message<byte[]>msg=consumer.receive(5,TimeUnit.SECONDS);

if(msg!=null){

System.out.println("Receivedmessage:

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論