實(shí)時(shí)計(jì)算:Kafka Streams:Kafka Streams架構(gòu)與原理_第1頁(yè)
實(shí)時(shí)計(jì)算:Kafka Streams:Kafka Streams架構(gòu)與原理_第2頁(yè)
實(shí)時(shí)計(jì)算:Kafka Streams:Kafka Streams架構(gòu)與原理_第3頁(yè)
實(shí)時(shí)計(jì)算:Kafka Streams:Kafka Streams架構(gòu)與原理_第4頁(yè)
實(shí)時(shí)計(jì)算:Kafka Streams:Kafka Streams架構(gòu)與原理_第5頁(yè)
已閱讀5頁(yè),還剩15頁(yè)未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡(jiǎn)介

實(shí)時(shí)計(jì)算:KafkaStreams:KafkaStreams架構(gòu)與原理1實(shí)時(shí)計(jì)算的重要性實(shí)時(shí)計(jì)算在現(xiàn)代數(shù)據(jù)處理領(lǐng)域扮演著至關(guān)重要的角色,尤其是在需要即時(shí)響應(yīng)和處理大量流數(shù)據(jù)的場(chǎng)景中。例如,金融交易、社交媒體分析、物聯(lián)網(wǎng)(IoT)數(shù)據(jù)處理、實(shí)時(shí)日志分析和網(wǎng)絡(luò)監(jiān)控等,都離不開實(shí)時(shí)計(jì)算技術(shù)的支持。實(shí)時(shí)計(jì)算能夠幫助系統(tǒng)在數(shù)據(jù)產(chǎn)生的瞬間進(jìn)行處理和分析,從而實(shí)現(xiàn)即時(shí)決策和響應(yīng),這對(duì)于提高業(yè)務(wù)效率和用戶體驗(yàn)至關(guān)重要。1.1金融交易在金融領(lǐng)域,實(shí)時(shí)計(jì)算可以用于高頻交易、欺詐檢測(cè)和市場(chǎng)趨勢(shì)分析。例如,通過實(shí)時(shí)分析股票交易數(shù)據(jù),系統(tǒng)可以立即識(shí)別出異常交易模式,觸發(fā)警報(bào)或自動(dòng)執(zhí)行交易策略,以減少損失或抓住投資機(jī)會(huì)。1.2社交媒體分析社交媒體平臺(tái)每天產(chǎn)生海量的數(shù)據(jù),實(shí)時(shí)計(jì)算技術(shù)可以即時(shí)分析這些數(shù)據(jù),幫助平臺(tái)快速識(shí)別熱點(diǎn)話題、用戶情緒和趨勢(shì),從而優(yōu)化內(nèi)容推薦,增強(qiáng)用戶參與度。1.3物聯(lián)網(wǎng)(IoT)數(shù)據(jù)處理物聯(lián)網(wǎng)設(shè)備產(chǎn)生的數(shù)據(jù)量巨大且持續(xù)不斷,實(shí)時(shí)計(jì)算可以處理這些數(shù)據(jù),實(shí)現(xiàn)設(shè)備狀態(tài)的實(shí)時(shí)監(jiān)控、預(yù)測(cè)性維護(hù)和智能控制,提高設(shè)備的運(yùn)行效率和安全性。1.4實(shí)時(shí)日志分析在IT運(yùn)維中,實(shí)時(shí)分析系統(tǒng)日志可以幫助快速定位問題,減少故障恢復(fù)時(shí)間。例如,通過實(shí)時(shí)監(jiān)控和分析網(wǎng)絡(luò)流量日志,可以即時(shí)檢測(cè)到DDoS攻擊,啟動(dòng)防御措施。1.5網(wǎng)絡(luò)監(jiān)控實(shí)時(shí)計(jì)算技術(shù)在網(wǎng)絡(luò)安全中也發(fā)揮著重要作用,通過實(shí)時(shí)分析網(wǎng)絡(luò)數(shù)據(jù)包,可以即時(shí)檢測(cè)到網(wǎng)絡(luò)入侵和異常行為,保護(hù)網(wǎng)絡(luò)免受攻擊。2KafkaStreams的簡(jiǎn)介KafkaStreams是ApacheKafka的一個(gè)核心組件,它提供了一種用于處理流數(shù)據(jù)的輕量級(jí)、易于使用的Java庫(kù)。KafkaStreams允許開發(fā)者在Kafka主題上執(zhí)行復(fù)雜的流處理操作,如過濾、映射、聚合、連接和窗口操作,而無(wú)需編寫復(fù)雜的分布式系統(tǒng)代碼。它將流處理任務(wù)抽象為簡(jiǎn)單的數(shù)據(jù)流操作,使得開發(fā)者可以專注于業(yè)務(wù)邏輯,而不是底層的分布式處理細(xì)節(jié)。2.1KafkaStreams的核心概念2.1.1Streams在KafkaStreams中,數(shù)據(jù)流被視為無(wú)限的、連續(xù)的數(shù)據(jù)記錄序列。這些數(shù)據(jù)流可以從一個(gè)或多個(gè)Kafka主題讀取數(shù)據(jù),經(jīng)過處理后,將結(jié)果寫入到一個(gè)或多個(gè)輸出主題。2.1.2StateStoresKafkaStreams支持狀態(tài)存儲(chǔ),允許在流處理過程中維護(hù)和更新狀態(tài)信息。這使得KafkaStreams能夠執(zhí)行復(fù)雜的操作,如聚合和窗口計(jì)算,而無(wú)需將所有數(shù)據(jù)加載到內(nèi)存中。2.1.3ProcessingTopologiesKafkaStreams的處理邏輯被定義為一個(gè)處理拓?fù)?,它描述了?shù)據(jù)流的處理流程,包括數(shù)據(jù)源、處理操作和數(shù)據(jù)目標(biāo)。處理拓?fù)淇梢员豢醋魇且粋€(gè)有向無(wú)環(huán)圖(DAG),其中節(jié)點(diǎn)代表處理操作,邊代表數(shù)據(jù)流。2.2KafkaStreams的架構(gòu)KafkaStreams的架構(gòu)設(shè)計(jì)為開發(fā)者提供了高度的靈活性和可擴(kuò)展性。它基于Kafka的分布式消息系統(tǒng),利用Kafka的高吞吐量和低延遲特性,實(shí)現(xiàn)高效的數(shù)據(jù)流處理。KafkaStreams的架構(gòu)主要包括以下組件:2.2.1StreamProcessingTask這是KafkaStreams的核心處理單元,每個(gè)任務(wù)負(fù)責(zé)處理一個(gè)或多個(gè)數(shù)據(jù)流。任務(wù)可以并行執(zhí)行,以提高處理效率。2.2.2StateStoresKafkaStreams使用狀態(tài)存儲(chǔ)來(lái)保存中間處理結(jié)果,這使得流處理可以持續(xù)進(jìn)行,即使在系統(tǒng)重啟或故障恢復(fù)后,也能從上次停止的地方繼續(xù)處理。2.2.3StreamThread每個(gè)KafkaStreams應(yīng)用程序可以包含一個(gè)或多個(gè)流線程,每個(gè)流線程可以并行執(zhí)行多個(gè)任務(wù),從而實(shí)現(xiàn)應(yīng)用程序級(jí)別的并行處理。2.2.4StandaloneandDistributedModesKafkaStreams支持兩種運(yùn)行模式:獨(dú)立模式和分布式模式。在獨(dú)立模式下,應(yīng)用程序在單個(gè)進(jìn)程中運(yùn)行,適用于小型數(shù)據(jù)流處理任務(wù)。在分布式模式下,應(yīng)用程序可以在多個(gè)進(jìn)程中并行運(yùn)行,適用于處理大規(guī)模數(shù)據(jù)流。2.3KafkaStreams的原理KafkaStreams使用了流處理的基本原理,包括事件驅(qū)動(dòng)、無(wú)狀態(tài)和有狀態(tài)處理、窗口操作和連接操作。它將這些原理與Kafka的分布式消息系統(tǒng)相結(jié)合,實(shí)現(xiàn)了高效、可靠的數(shù)據(jù)流處理。2.3.1事件驅(qū)動(dòng)KafkaStreams基于事件驅(qū)動(dòng)模型,數(shù)據(jù)流中的每個(gè)事件都會(huì)觸發(fā)處理邏輯的執(zhí)行。這使得KafkaStreams能夠?qū)崟r(shí)響應(yīng)數(shù)據(jù)流中的變化,實(shí)現(xiàn)即時(shí)處理和分析。2.3.2無(wú)狀態(tài)和有狀態(tài)處理KafkaStreams支持無(wú)狀態(tài)處理和有狀態(tài)處理。無(wú)狀態(tài)處理是指每個(gè)事件的處理不依賴于前一個(gè)事件的狀態(tài),而有狀態(tài)處理則需要維護(hù)和更新狀態(tài)信息,以執(zhí)行復(fù)雜的流處理操作。2.3.3窗口操作窗口操作允許KafkaStreams在指定的時(shí)間窗口內(nèi)對(duì)數(shù)據(jù)流進(jìn)行聚合和分析。這在需要基于時(shí)間范圍進(jìn)行數(shù)據(jù)處理的場(chǎng)景中非常有用,例如,計(jì)算過去5分鐘內(nèi)的平均交易價(jià)格。2.3.4連接操作KafkaStreams支持流與流之間的連接操作,這使得多個(gè)數(shù)據(jù)流可以被合并和關(guān)聯(lián),以執(zhí)行更復(fù)雜的分析和處理。例如,將用戶行為數(shù)據(jù)流與用戶信息數(shù)據(jù)流連接,以分析用戶行為模式。2.4示例:使用KafkaStreams進(jìn)行實(shí)時(shí)數(shù)據(jù)聚合下面是一個(gè)使用KafkaStreams進(jìn)行實(shí)時(shí)數(shù)據(jù)聚合的簡(jiǎn)單示例。假設(shè)我們有一個(gè)名為transactions的主題,其中包含交易數(shù)據(jù),我們想要計(jì)算每個(gè)用戶的交易總額。importorg.apache.kafka.streams.KafkaStreams;

importorg.apache.kafka.streams.StreamsBuilder;

importorg.apache.kafka.streams.StreamsConfig;

importorg.apache.kafka.streams.kstream.KStream;

importorg.apache.kafka.streams.kstream.Materialized;

importmon.serialization.Serdes;

publicclassTransactionAggregator{

publicstaticvoidmain(String[]args){

finalStreamsConfigconfig=newStreamsConfig(newHashMap<String,String>(){{

put(StreamsConfig.APPLICATION_ID_CONFIG,"transaction-aggregator");

put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());

put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());

}});

finalStreamsBuilderbuilder=newStreamsBuilder();

finalKStream<String,String>transactions=builder.stream("transactions");

//將交易數(shù)據(jù)轉(zhuǎn)換為用戶ID和交易金額的鍵值對(duì)

finalKStream<String,Double>transactionAmounts=transactions

.mapValues(value->{

//假設(shè)交易數(shù)據(jù)格式為:user_id,transaction_amount

finalString[]parts=value.split(",");

returnDouble.parseDouble(parts[1]);

});

//使用用戶ID作為鍵,對(duì)交易金額進(jìn)行聚合

transactionAmounts.groupByKey()

.reduce((aggValue,newValue)->aggValue+newValue,Materialized.as("user-aggregates"))

.to("user-aggregates");

finalKafkaStreamsstreams=newKafkaStreams(builder.build(),config);

streams.start();

//添加關(guān)閉鉤子,以優(yōu)雅地關(guān)閉應(yīng)用程序

Runtime.getRuntime().addShutdownHook(newThread(streams::close));

}

}在這個(gè)示例中,我們首先配置了KafkaStreams應(yīng)用程序的基本參數(shù),然后使用StreamsBuilder構(gòu)建了處理拓?fù)?。我們從transactions主題讀取數(shù)據(jù),將每條交易數(shù)據(jù)轉(zhuǎn)換為用戶ID和交易金額的鍵值對(duì),然后對(duì)交易金額進(jìn)行聚合,將結(jié)果寫入到user-aggregates主題中。通過這個(gè)示例,我們可以看到KafkaStreams如何簡(jiǎn)化了實(shí)時(shí)數(shù)據(jù)流處理的復(fù)雜性,使得開發(fā)者可以專注于業(yè)務(wù)邏輯的實(shí)現(xiàn),而無(wú)需關(guān)心底層的分布式處理細(xì)節(jié)。3實(shí)時(shí)計(jì)算:KafkaStreams架構(gòu)與原理3.1KafkaStreams架構(gòu)3.1.1KafkaStreams的組件KafkaStreams是一個(gè)用于處理和分析實(shí)時(shí)數(shù)據(jù)流的客戶端庫(kù),它基于ApacheKafka構(gòu)建。KafkaStreams的架構(gòu)設(shè)計(jì)圍繞幾個(gè)核心組件:StreamProcessingApplication:這是用戶編寫的流處理應(yīng)用程序,它使用KafkaStreamsAPI來(lái)定義數(shù)據(jù)流的處理邏輯。應(yīng)用程序可以讀取一個(gè)或多個(gè)主題的數(shù)據(jù),處理數(shù)據(jù),然后將結(jié)果寫入一個(gè)或多個(gè)輸出主題。KafkaStreamsClient:這是KafkaStreams的運(yùn)行時(shí)環(huán)境,它負(fù)責(zé)執(zhí)行流處理應(yīng)用程序定義的邏輯??蛻舳税ㄒ粋€(gè)或多個(gè)線程,每個(gè)線程負(fù)責(zé)處理一部分?jǐn)?shù)據(jù)流。KafkaBroker:KafkaStreams與Kafka集群集成,從KafkaBroker讀取數(shù)據(jù)并寫入結(jié)果。Broker保證了數(shù)據(jù)的高吞吐量、低延遲和持久性。StateStores:KafkaStreams使用狀態(tài)存儲(chǔ)來(lái)保存中間結(jié)果和狀態(tài)信息,這使得流處理應(yīng)用程序能夠進(jìn)行復(fù)雜的狀態(tài)化操作,如窗口操作和聚合。Topology:流處理應(yīng)用程序的邏輯被定義為一個(gè)處理拓?fù)?,它描述了?shù)據(jù)流的路徑和處理步驟。拓?fù)淇梢园ǘ鄠€(gè)處理節(jié)點(diǎn),如源節(jié)點(diǎn)、處理器節(jié)點(diǎn)和匯節(jié)點(diǎn)。3.1.2流處理任務(wù)的生命周期KafkaStreams中的流處理任務(wù)(Task)是流處理應(yīng)用程序執(zhí)行的基本單位。每個(gè)任務(wù)都有一個(gè)生命周期,包括以下幾個(gè)階段:創(chuàng)建:當(dāng)KafkaStreams應(yīng)用程序啟動(dòng)時(shí),它會(huì)根據(jù)定義的處理拓?fù)鋭?chuàng)建任務(wù)。每個(gè)任務(wù)負(fù)責(zé)處理一部分分區(qū)的數(shù)據(jù)。初始化:在任務(wù)開始處理數(shù)據(jù)之前,它會(huì)初始化狀態(tài)存儲(chǔ),并從KafkaBroker中讀取最新的偏移量。運(yùn)行:任務(wù)開始處理數(shù)據(jù)流,執(zhí)行定義的處理邏輯。數(shù)據(jù)被連續(xù)地讀取、處理和寫入。提交:任務(wù)定期向KafkaBroker提交處理過的數(shù)據(jù)的偏移量,這確保了即使在故障發(fā)生時(shí),處理狀態(tài)也不會(huì)丟失。故障恢復(fù):如果任務(wù)失敗,KafkaStreams會(huì)自動(dòng)重啟任務(wù),并從最近提交的偏移量開始重新處理數(shù)據(jù),以確保數(shù)據(jù)處理的正確性和一致性。關(guān)閉:當(dāng)應(yīng)用程序關(guān)閉或重新平衡發(fā)生時(shí),任務(wù)會(huì)關(guān)閉其狀態(tài)存儲(chǔ)并釋放資源。3.1.3KafkaStreams與Kafka的集成KafkaStreams與Kafka集群的集成是其架構(gòu)的關(guān)鍵部分。這種集成使得KafkaStreams能夠:讀取數(shù)據(jù):KafkaStreams應(yīng)用程序可以訂閱一個(gè)或多個(gè)Kafka主題,從Broker中讀取數(shù)據(jù)。寫入數(shù)據(jù):處理后的數(shù)據(jù)可以被寫入到一個(gè)或多個(gè)輸出主題中。狀態(tài)存儲(chǔ):KafkaStreams使用Kafka的主題作為狀態(tài)存儲(chǔ),這提供了高可用性和持久性。故障恢復(fù):KafkaStreams利用Kafka的特性來(lái)實(shí)現(xiàn)故障恢復(fù),確保數(shù)據(jù)處理的正確性和一致性。3.2示例代碼:KafkaStreams應(yīng)用程序下面是一個(gè)簡(jiǎn)單的KafkaStreams應(yīng)用程序示例,該程序讀取一個(gè)主題的數(shù)據(jù),將每個(gè)消息轉(zhuǎn)換為大寫,然后寫入到另一個(gè)主題:importorg.apache.kafka.streams.KafkaStreams;

importorg.apache.kafka.streams.StreamsBuilder;

importorg.apache.kafka.streams.StreamsConfig;

importorg.apache.kafka.streams.kstream.KStream;

importmon.serialization.Serdes;

importjava.util.Properties;

publicclassUppercaseStreamApp{

publicstaticvoidmain(String[]args){

Propertiesprops=newProperties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"uppercase-stream-app");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

StreamsBuilderbuilder=newStreamsBuilder();

KStream<String,String>input=builder.stream("input-topic");

KStream<String,String>output=input.mapValues(value->value.toUpperCase());

output.to("output-topic");

KafkaStreamsstreams=newKafkaStreams(builder.build(),props);

streams.start();

//AddshutdownhooktorespondtoSIGTERMandgracefullycloseKafkaStreams

Runtime.getRuntime().addShutdownHook(newThread(streams::close));

}

}3.2.1代碼解釋配置:首先,我們創(chuàng)建一個(gè)Properties對(duì)象來(lái)配置KafkaStreams應(yīng)用程序。配置包括應(yīng)用程序ID、Broker地址、默認(rèn)的鍵和值序列化器。構(gòu)建拓?fù)洌菏褂肧treamsBuilder來(lái)定義處理拓?fù)?。我們?chuàng)建一個(gè)KStream對(duì)象來(lái)讀取input-topic主題的數(shù)據(jù),然后使用mapValues方法將每個(gè)消息值轉(zhuǎn)換為大寫,最后將結(jié)果寫入到output-topic主題。創(chuàng)建并啟動(dòng)Streams:使用StreamsBuilder構(gòu)建的拓?fù)浜团渲脛?chuàng)建一個(gè)KafkaStreams對(duì)象,并啟動(dòng)它。關(guān)閉鉤子:添加一個(gè)關(guān)閉鉤子,當(dāng)接收到SIGTERM信號(hào)時(shí),優(yōu)雅地關(guān)閉KafkaStreams應(yīng)用程序。3.3數(shù)據(jù)樣例假設(shè)input-topic主題包含以下數(shù)據(jù):{

"id":"1",

"name":"john",

"age":"30"

}

{

"id":"2",

"name":"jane",

"age":"25"

}經(jīng)過KafkaStreams應(yīng)用程序處理后,output-topic主題將包含以下數(shù)據(jù):{

"ID":"1",

"NAME":"JOHN",

"AGE":"30"

}

{

"ID":"2",

"NAME":"JANE",

"AGE":"25"

}注意,雖然示例中的轉(zhuǎn)換是簡(jiǎn)單的大小寫轉(zhuǎn)換,KafkaStreams支持更復(fù)雜的處理邏輯,包括聚合、窗口操作和連接操作。3.4結(jié)論KafkaStreams提供了一個(gè)強(qiáng)大而靈活的框架,用于構(gòu)建實(shí)時(shí)數(shù)據(jù)流處理應(yīng)用程序。通過其組件、任務(wù)生命周期和與Kafka的緊密集成,KafkaStreams能夠處理大規(guī)模的數(shù)據(jù)流,同時(shí)保證數(shù)據(jù)處理的正確性和一致性。上述代碼示例展示了如何使用KafkaStreamsAPI來(lái)定義和執(zhí)行一個(gè)簡(jiǎn)單的流處理任務(wù)。4實(shí)時(shí)計(jì)算:KafkaStreams架構(gòu)與原理4.1KafkaStreams原理4.1.1數(shù)據(jù)流模型KafkaStreams是一個(gè)用于處理和分析實(shí)時(shí)數(shù)據(jù)流的客戶端庫(kù),它允許開發(fā)者在應(yīng)用程序中直接處理存儲(chǔ)在ApacheKafka中的數(shù)據(jù)。數(shù)據(jù)流模型是KafkaStreams的核心,它將數(shù)據(jù)視為連續(xù)的、無(wú)界的流,而不是靜態(tài)的、有界的數(shù)據(jù)集。這種模型非常適合實(shí)時(shí)處理場(chǎng)景,因?yàn)樗梢猿掷m(xù)地處理數(shù)據(jù),而不需要等待數(shù)據(jù)集的完整加載。示例:使用KafkaStreams進(jìn)行數(shù)據(jù)流處理importorg.apache.kafka.streams.KafkaStreams;

importorg.apache.kafka.streams.StreamsBuilder;

importorg.apache.kafka.streams.StreamsConfig;

importorg.apache.kafka.streams.kstream.KStream;

importmon.serialization.Serdes;

publicclassWordCountApplication{

publicstaticvoidmain(String[]args){

//配置KafkaStreams

finalPropertiesstreamsConfiguration=newProperties();

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-stream-processing");

streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());

streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

//創(chuàng)建StreamsBuilder

finalStreamsBuilderbuilder=newStreamsBuilder();

//讀取輸入主題的數(shù)據(jù)流

finalKStream<String,String>textLines=builder.stream("input-topic");

//處理數(shù)據(jù)流,進(jìn)行單詞計(jì)數(shù)

finalKStream<String,Long>wordCounts=textLines

.flatMapValues(value->Arrays.asList(value.toLowerCase().split("\\W+")))

.groupBy((key,word)->word)

.count(Materialized.as("counts-store"));

//將處理后的數(shù)據(jù)流寫入輸出主題

wordCounts.to("output-topic");

//創(chuàng)建并啟動(dòng)KafkaStreams

finalKafkaStreamsstreams=newKafkaStreams(builder.build(),streamsConfiguration);

streams.start();

//等待應(yīng)用程序結(jié)束

Runtime.getRuntime().addShutdownHook(newThread(streams::close));

}

}在這個(gè)示例中,我們創(chuàng)建了一個(gè)簡(jiǎn)單的單詞計(jì)數(shù)應(yīng)用程序。首先,我們配置了KafkaStreams的參數(shù),然后使用StreamsBuilder構(gòu)建了數(shù)據(jù)流處理邏輯。我們從input-topic讀取數(shù)據(jù),將每行文本分割成單詞,對(duì)每個(gè)單詞進(jìn)行計(jì)數(shù),并將結(jié)果寫入output-topic。4.1.2狀態(tài)存儲(chǔ)機(jī)制狀態(tài)存儲(chǔ)是KafkaStreams的另一個(gè)關(guān)鍵特性,它允許應(yīng)用程序在處理數(shù)據(jù)流時(shí)保持狀態(tài)。狀態(tài)存儲(chǔ)可以用于實(shí)現(xiàn)復(fù)雜的流處理操作,如窗口聚合、狀態(tài)更新和連接操作。KafkaStreams提供了多種狀態(tài)存儲(chǔ)類型,包括內(nèi)存存儲(chǔ)、磁盤存儲(chǔ)和時(shí)間窗口存儲(chǔ),以滿足不同的性能和持久性需求。示例:使用狀態(tài)存儲(chǔ)進(jìn)行窗口聚合importorg.apache.kafka.streams.KafkaStreams;

importorg.apache.kafka.streams.StreamsBuilder;

importorg.apache.kafka.streams.StreamsConfig;

importorg.apache.kafka.streams.kstream.KStream;

importorg.apache.kafka.streams.kstream.TimeWindowedKStream;

importmon.serialization.Serdes;

publicclassWindowAggregationApplication{

publicstaticvoidmain(String[]args){

//配置KafkaStreams

finalPropertiesstreamsConfiguration=newProperties();

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,"window-aggregation");

streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());

streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Long().getClass());

//創(chuàng)建StreamsBuilder

finalStreamsBuilderbuilder=newStreamsBuilder();

//讀取輸入主題的數(shù)據(jù)流

finalKStream<String,Long>input=builder.stream("input-topic");

//使用時(shí)間窗口進(jìn)行聚合

finalTimeWindowedKStream<String,Long>windowed=input.windowedBy(TimeWindows.of(Duration.ofMinutes(5)));

finalKStream<String,Long>aggregated=windowed.reduce(

(aggValue,newValue)->aggValue+newValue,

Materialized.<String,Long,WindowStore<Bytes,byte[]>>as("aggregated-store")

.withValueSerde(Serdes.Long())

);

//將處理后的數(shù)據(jù)流寫入輸出主題

aggregated.toStream().foreach((k,v)->System.out.println(k+":"+v));

aggregated.to("output-topic");

//創(chuàng)建并啟動(dòng)KafkaStreams

finalKafkaStreamsstreams=newKafkaStreams(builder.build(),streamsConfiguration);

streams.start();

//等待應(yīng)用程序結(jié)束

Runtime.getRuntime().addShutdownHook(newThread(streams::close));

}

}在這個(gè)示例中,我們使用了時(shí)間窗口進(jìn)行聚合操作。我們從input-topic讀取數(shù)據(jù),然后使用windowedBy方法創(chuàng)建了一個(gè)時(shí)間窗口流,窗口大小為5分鐘。接著,我們使用reduce方法對(duì)窗口內(nèi)的數(shù)據(jù)進(jìn)行聚合,將結(jié)果存儲(chǔ)在aggregated-store中,并將最終的聚合結(jié)果寫入output-topic。4.1.3窗口和時(shí)間概念窗口是KafkaStreams中用于處理數(shù)據(jù)流的另一個(gè)重要概念。窗口允許應(yīng)用程序在特定的時(shí)間段內(nèi)聚合數(shù)據(jù),這對(duì)于需要基于時(shí)間進(jìn)行分析的場(chǎng)景非常有用。KafkaStreams支持兩種類型的窗口:滑動(dòng)窗口和會(huì)話窗口?;瑒?dòng)窗口在固定的時(shí)間間隔內(nèi)聚合數(shù)據(jù),而會(huì)話窗口則基于數(shù)據(jù)的活動(dòng)性進(jìn)行聚合。時(shí)間概念在KafkaStreams中也非常重要,它包括事件時(shí)間(EventTime)和處理時(shí)間(ProcessingTime)。事件時(shí)間是指數(shù)據(jù)事件實(shí)際發(fā)生的時(shí)間,而處理時(shí)間是指數(shù)據(jù)事件被處理的時(shí)間。KafkaStreams支持基于事件時(shí)間的窗口處理,這使得應(yīng)用程序可以更準(zhǔn)確地處理和分析數(shù)據(jù)。示例:使用滑動(dòng)窗口進(jìn)行數(shù)據(jù)處理importorg.apache.kafka.streams.KafkaStreams;

importorg.apache.kafka.streams.StreamsBuilder;

importorg.apache.kafka.streams.StreamsConfig;

importorg.apache.kafka.streams.kstream.KStream;

importorg.apache.kafka.streams.kstream.TimeWindowedKStream;

importmon.serialization.Serdes;

publicclassSlidingWindowApplication{

publicstaticvoidmain(String[]args){

//配置KafkaStreams

finalPropertiesstreamsConfiguration=newProperties();

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,"sliding-window");

streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());

streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Long().getClass());

//創(chuàng)建StreamsBuilder

finalStreamsBuilderbuilder=newStreamsBuilder();

//讀取輸入主題的數(shù)據(jù)流

finalKStream<String,Long>input=builder.stream("input-topic");

//使用滑動(dòng)窗口進(jìn)行聚合

finalTimeWindowedKStream<String,Long>windowed=input.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).advanceBy(Duration.ofMinutes(1)));

finalKStream<String,Long>aggregated=windowed.reduce(

(aggValue,newValue)->aggValue+newValue,

Materialized.<String,Long,WindowStore<Bytes,byte[]>>as("sliding-aggregated-store")

.withValueSerde(Serdes.Long())

);

//將處理后的數(shù)據(jù)流寫入輸出主題

aggregated.toStream().foreach((k,v)->System.out.println(k+":"+v));

aggregated.to("output-topic");

//創(chuàng)建并啟動(dòng)KafkaStreams

finalKafkaStreamsstreams=newKafkaStreams(builder.build(),streamsConfiguration);

streams.start();

//等待應(yīng)用程序結(jié)束

Runtime.getRuntime().addShutdownHook(newThread(streams::close));

}

}在這個(gè)示例中,我們使用了滑動(dòng)窗口進(jìn)行數(shù)據(jù)聚合。我們從input-topic讀取數(shù)據(jù),然后使用windowedBy方法創(chuàng)建了一個(gè)滑動(dòng)窗口流,窗口大小為5分鐘,窗口滑動(dòng)步長(zhǎng)為1分鐘。接著,我們使用reduce方法對(duì)窗口內(nèi)的數(shù)據(jù)進(jìn)行聚合,將結(jié)果存儲(chǔ)在sliding-aggregated-store中,并將最終的聚合結(jié)果寫入output-topic。通過這些示例,我們可以看到KafkaStreams如何利用數(shù)據(jù)流模型、狀態(tài)存儲(chǔ)和窗口概念來(lái)處理和分析實(shí)時(shí)數(shù)據(jù)。這些特性使得KafkaStreams成為構(gòu)建實(shí)時(shí)數(shù)據(jù)處理和分析應(yīng)用程序的理想選擇。5KafkaStreams的開發(fā)與使用5.1KafkaStreamsAPI概覽KafkaStreams是一個(gè)用于構(gòu)建實(shí)時(shí)流處理應(yīng)用程序的Java庫(kù)。它提供了高級(jí)API,允許開發(fā)者以聲明式的方式處理數(shù)據(jù)流,包括讀取、轉(zhuǎn)換、聚合和寫入數(shù)據(jù)。KafkaStreams的核心概念包括:StreamsBuilder:用于構(gòu)建流處理應(yīng)用程序的入口點(diǎn)。通過StreamsBuilder,開發(fā)者可以定義數(shù)據(jù)流的處理邏輯。KStream:代表原始的流數(shù)據(jù),可以進(jìn)行各種轉(zhuǎn)換操作,如map、filter、flatMap等。KTable:代表一個(gè)可以進(jìn)行查詢的、更新的數(shù)據(jù)結(jié)構(gòu),通常用于存儲(chǔ)聚合數(shù)據(jù)。GlobalKTable:與KTable類似,但其數(shù)據(jù)是全局共享的,可以跨多個(gè)KafkaStreams實(shí)例進(jìn)行訪問。5.1.1示例代碼importorg.apache.kafka.streams.KafkaStreams;

importorg.apache.kafka.streams.StreamsBuilder;

importorg.apache.kafka.streams.StreamsConfig;

importorg.apache.kafka.streams.kstream.KStream;

importmon.serialization.Serdes;

importjava.util.Properties;

publicclassWordCountApplication{

publicstaticvoidmain(String[]args){

Propertiesprops=newProperties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

StreamsBuilderbuilder=newStreamsBuilder();

KStream<String,String>textLines=builder.stream("input-topic");

KStream<String,Long>wordCounts=textLines

.flatMapValues(value->Arrays.asList(value.toLowerCase().split("\\W+")))

.groupBy((key,word)->word)

.count(Materialized.as("counts-store"));

wordCounts.to("output-topic");

KafkaStreamsstreams=newKafkaStreams(builder.build(),props);

streams.start();

}

}這段代碼展示了如何使用KafkaStreamsAPI構(gòu)建一個(gè)簡(jiǎn)單的單詞計(jì)數(shù)應(yīng)用程序。它從input-topic讀取數(shù)據(jù),將數(shù)據(jù)轉(zhuǎn)換為小寫并分割成單詞,然后對(duì)每個(gè)單詞進(jìn)行計(jì)數(shù),并將結(jié)果寫入output-topic。5.2構(gòu)建流處理應(yīng)用程序構(gòu)建KafkaStreams應(yīng)用程序涉及以下步驟:配置應(yīng)用程序:設(shè)置應(yīng)用程序ID、Kafka服務(wù)器地址、序列化和反序列化器等。定義數(shù)據(jù)流:使用StreamsBuilder定義輸入和輸出流,以及流的處理邏輯。處理數(shù)據(jù)流:通過KStream和KTableAPI對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換、聚合等操作。啟動(dòng)應(yīng)用程序:創(chuàng)建KafkaStreams實(shí)例并啟動(dòng)它。5.2.1示例代碼importorg.apache.kafka.streams.KafkaStreams;

importorg.apache.kafka.streams.StreamsBuilder;

importorg.apache.kafka.streams.kstream.KStream;

importmon.serialization.Serdes;

importorg.apache.kafka.streams.StreamsConfig;

importjava.util.Properties;

publicclassStreamApplication{

publicstaticvoidmain(String[]args){

Propertiesprops=newProperties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"stream-app");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

StreamsBuilderbuilder=newStreamsBuilder();

KStream<String,String>input=builder.stream("input-topic");

KStream<String,String>output=input.mapValues(value->value.toUpperCase());

output.to("output-topic");

KafkaStreamsstreams=newKafkaStreams(builder.build(),props);

streams.start();

}

}此示例展示了如何構(gòu)建一個(gè)簡(jiǎn)單的流處理應(yīng)用程序,該程序?qū)膇nput-topic讀取的字符串轉(zhuǎn)換為大寫,并將結(jié)果寫入output-topic。5.3配置與優(yōu)化KafkaStreams提供了多種配置選項(xiàng),以優(yōu)化應(yīng)用程序的性能和資源使用。關(guān)鍵配置包括:processing.guarantee:設(shè)置數(shù)據(jù)處理的一致性級(jí)別,可以選擇at_least_once或exactly_once。cache.max.bytes.buffering:設(shè)置緩存的最大字節(jié)數(shù),以減少對(duì)Kafka的讀寫操作。erval.ms:設(shè)置狀態(tài)更改的提交間隔,以平衡延遲和吞吐量。5.3.1示例配置Propertiesprops=newProperties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"my-stream-app");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,"exactly_once");

props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,50*1024*1024L);

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,10000);這些配置示例展示了如何設(shè)置應(yīng)用程序ID、Kafka服務(wù)器地址、序列化器、數(shù)據(jù)處理保證、緩存大小和提交間隔,以優(yōu)化KafkaStreams應(yīng)用程序的性能。5.4總結(jié)通過上述內(nèi)容,我們了解了KafkaStreamsAPI的基本概念,學(xué)習(xí)了如何構(gòu)建流處理應(yīng)用程序,并探討了關(guān)鍵的配置選項(xiàng)以優(yōu)化應(yīng)用程序的性能。KafkaStreams為開發(fā)者提供了一個(gè)強(qiáng)大的工具,用于處理實(shí)時(shí)數(shù)據(jù)流,實(shí)現(xiàn)復(fù)雜的數(shù)據(jù)處理邏輯。6實(shí)時(shí)數(shù)據(jù)分析示例:KafkaStreams在實(shí)際場(chǎng)景中的應(yīng)用在實(shí)時(shí)數(shù)據(jù)處理領(lǐng)域,KafkaStreams以其高效、靈活和可擴(kuò)展的特性,成為處理流數(shù)據(jù)的首選工具。本教程將通過一個(gè)具體的案例分析,深入探討KafkaStreams在實(shí)際場(chǎng)景中的應(yīng)用,包括其架構(gòu)設(shè)計(jì)和核心原理。6.1案例背景假設(shè)我們正在為一家電子商務(wù)公司開發(fā)一個(gè)實(shí)時(shí)數(shù)據(jù)分析系統(tǒng),該系統(tǒng)需要從多個(gè)數(shù)據(jù)源(如用戶行為日志、產(chǎn)品信息、庫(kù)存狀態(tài)等)收集數(shù)據(jù),并實(shí)時(shí)分析這些數(shù)據(jù)以提供個(gè)性化推薦、庫(kù)存預(yù)警和實(shí)時(shí)報(bào)告等功能。為了實(shí)現(xiàn)這一目標(biāo),我們選擇使用KafkaStreams作為數(shù)據(jù)處理引擎。6.2KafkaStreams架構(gòu)概述KafkaStreams架構(gòu)主要由以下組件構(gòu)成:Kafka集群:作為數(shù)據(jù)的存儲(chǔ)和傳輸平臺(tái)。KafkaStreams應(yīng)用:運(yùn)行在獨(dú)立的JVM上,負(fù)責(zé)讀取Kafka中的數(shù)據(jù),進(jìn)行處理,并將結(jié)果寫回Kafka或外部系統(tǒng)。StateStores:用于存儲(chǔ)中間結(jié)果,支持窗口操作和聚合操作。Topology:定義數(shù)據(jù)流的處理邏輯,包括數(shù)據(jù)源、處理步驟和數(shù)據(jù)目標(biāo)。6.3實(shí)時(shí)數(shù)據(jù)分析流程數(shù)據(jù)收集:從Kafka的多個(gè)主題中收集數(shù)據(jù)。數(shù)據(jù)處理:使用KafkaStreamsAPI進(jìn)行數(shù)據(jù)清洗、轉(zhuǎn)換和聚合。結(jié)果輸出:將處理后的數(shù)據(jù)輸出到另一個(gè)Kafka主題或外部系統(tǒng)。6.4KafkaStreams核心原理KafkaStreams基于流處理模型,其核心原理包括:無(wú)狀態(tài)和有狀態(tài)處理:無(wú)狀態(tài)處理是對(duì)每條數(shù)據(jù)進(jìn)行獨(dú)立處理,有狀態(tài)處理則需要維護(hù)數(shù)據(jù)的狀態(tài),進(jìn)行聚合或窗口操作。時(shí)間窗口:KafkaStreams支持基于時(shí)間的窗口操作,可以對(duì)特定時(shí)間范圍內(nèi)的數(shù)據(jù)進(jìn)行聚合。并行處理:KafkaStreams應(yīng)用可以并行運(yùn)行在多個(gè)JVM上,每個(gè)實(shí)例處理數(shù)據(jù)流的一部分,提高處理效率。6.5代碼示例:實(shí)時(shí)數(shù)據(jù)分析下面是一個(gè)使用KafkaStreams進(jìn)行實(shí)時(shí)數(shù)據(jù)分析的Java代碼示例,該示例展示了如何從一個(gè)主題讀取數(shù)據(jù),進(jìn)行簡(jiǎn)單的聚合操作,并將結(jié)果寫入另一個(gè)主題。importorg.apache.kafka.streams.KafkaStreams;

importorg.apache.kafka.streams.StreamsBuilder;

importorg.apache.kafka.streams.StreamsConfig;

importorg.apache.kafka.streams.kstream.KStream;

importorg.apache.kafka.streams.kstream.Materialized;

importmon.serialization.Serdes;

importjava.util.Properties;

publicclassRealTimeDataAnalysis{

publicstaticvoidmain(String[]args){

Propertiesprops=newProperties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"real-time-data-analysis");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

StreamsBuilderbuilder=newStreamsBuilder();

//從主題"user-activity"讀取數(shù)據(jù)

KStream<String,String>userActivity=builder.stream("user-activity");

//對(duì)數(shù)據(jù)進(jìn)行聚合,計(jì)算每個(gè)用戶的活動(dòng)次數(shù)

KStream<String,Long>userActivityCount=userActivity

.groupByKey()

.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))

.aggregate(

()->0L,

(key,value,aggregate)->aggregate+1,

Materialized.as("user-activity-count-store")

)

.toStream()

.peek((key,value)->System.out.println("User"+key+"has"+value+"activitiesinthelast5minutes."))

.mapValues(value->value.toString());

//將結(jié)果寫入主題"user-activity-count"

userActivityCount.to("user-activity-count");

KafkaStreamsstreams=newKafkaStreams(builder.build(),props);

streams.start();

}

}6.5.1代碼解析配置KafkaStreams應(yīng)用:通過Properties對(duì)象設(shè)置應(yīng)用ID、Kafka服務(wù)器地址以及默認(rèn)的序列化和反序列化類。創(chuàng)建流處理拓?fù)洌菏褂肧treamsBuilder定義數(shù)據(jù)流的處理邏輯。讀取數(shù)據(jù):從主題user-activity讀取數(shù)據(jù),數(shù)據(jù)類型為<String,String>,即用戶ID和活動(dòng)描述。聚合操作:對(duì)數(shù)據(jù)進(jìn)行分組和窗口操作,計(jì)算每個(gè)用戶在最近5分鐘內(nèi)的活動(dòng)次數(shù)。輸出結(jié)果:將聚合后的結(jié)果輸出到主題user-activity-count。通過這個(gè)示例,我們可以看到KafkaStreams如何簡(jiǎn)化實(shí)時(shí)數(shù)據(jù)處理的復(fù)雜性,提供了一種高效、靈活的方式來(lái)處理流數(shù)據(jù)。6.6結(jié)論KafkaStreams在實(shí)時(shí)數(shù)據(jù)分析領(lǐng)域提供了強(qiáng)大的支持,通過其靈活的API和高效的處理能力,可以輕松實(shí)現(xiàn)復(fù)雜的數(shù)據(jù)處理邏輯。在實(shí)際應(yīng)用中,KafkaStreams不僅可以處理簡(jiǎn)單的數(shù)據(jù)轉(zhuǎn)換,還可以進(jìn)行復(fù)雜的有狀態(tài)處理,如窗口操作和聚合,為實(shí)時(shí)數(shù)據(jù)分析提供了無(wú)限可能。7進(jìn)階主題7.1KafkaStreams的故障恢復(fù)KafkaStreams在設(shè)計(jì)上充分考慮了故障恢復(fù)機(jī)制,確保在處理流數(shù)據(jù)時(shí)的高可用性和數(shù)據(jù)一致性。其故障恢復(fù)機(jī)制主要依賴于Kafka的持久化存儲(chǔ)和狀態(tài)存儲(chǔ)特性,以及任務(wù)的重新分配。7.1.1狀態(tài)存儲(chǔ)KafkaStreams使用狀態(tài)存儲(chǔ)(StateStores)來(lái)保存中間處理結(jié)果,這些狀態(tài)存儲(chǔ)可以是內(nèi)存中的,也可以是磁盤上的。當(dāng)流處理應(yīng)用程序運(yùn)行時(shí),它會(huì)定期將狀態(tài)存儲(chǔ)中的數(shù)據(jù)持久化到Kafka的Topic中,這被稱為Checkpoint。Checkpoint機(jī)制確保了在應(yīng)用程序或節(jié)點(diǎn)失敗時(shí),可以從最近的Checkpoint恢復(fù)狀態(tài),從而繼續(xù)處理數(shù)據(jù)。7.1.2任務(wù)重新分配KafkaStreams將流處理任務(wù)分解為多個(gè)子任務(wù),每個(gè)子任務(wù)可以獨(dú)立運(yùn)行在不同的工作線程或節(jié)點(diǎn)上。當(dāng)一個(gè)節(jié)點(diǎn)或工作線程失敗時(shí),KafkaStreams會(huì)自動(dòng)將失敗的任務(wù)重新分配給集群中的其他節(jié)點(diǎn)或線程,確保流處理的連續(xù)性。7.1.3故障恢復(fù)流程當(dāng)KafkaStreams檢測(cè)到應(yīng)用程序或節(jié)點(diǎn)故障時(shí),它會(huì)觸發(fā)故障恢復(fù)流

溫馨提示

  • 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論