版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)
文檔簡介
第8章消息驅(qū)動框架SpringCloudStream《SpringCloud微服務(wù)架構(gòu)開發(fā)(第2版)》學(xué)習(xí)目標/Target了解SpringCloudStream概述,能夠簡述使用SpringCloudStream的好處哥SpringCloudStream核心概念,以及常見接口、注解掌握SpringCloudStream快速入門,能夠在SpringBoot應(yīng)用中接收RabbitMQ中的消息掌握SpringCloudStream的發(fā)布-訂閱模式,能夠通過SpringCloudStream發(fā)布消息,并由多個訂閱者同時消費所訂閱的消息掌握消費組,能夠通過SpringCloudStream消費組實現(xiàn)生產(chǎn)者發(fā)送的消息只能被消費組中的一個實例消費掌握消息分區(qū),能夠通過SpringCloudStream消息分區(qū)實現(xiàn)相同特征的消息都被同一個消費者實例處理章節(jié)概述/Summary在實際開發(fā)中,當系統(tǒng)需要實現(xiàn)異步通信、保證高可用性和可伸縮性時,通常會使用消息中間件實現(xiàn)服務(wù)與服務(wù)之間的通信,但是,以往使用的一些消息中間件對系統(tǒng)的耦合性較高,例如,當前系統(tǒng)使用RabbitMQ,如果要將現(xiàn)有的RabbitMQ替換為Kafka等其他消息中間件,會導(dǎo)致系統(tǒng)需要做較大的調(diào)整。為了降低系統(tǒng)與消息中間件間的耦合性,我們可以使用SpringCloudStream整合消息中間件。本章將對SpringCloudStream的相關(guān)內(nèi)容進行詳細講解。目錄/Contents8.18.2SpringCloudStream概述SpringCloudStream快速入門8.3SpringCloudStream的發(fā)布-訂閱模式8.4消費組和消息分區(qū)SpringCloudStream概述8.18.1
SpringCloudStream概述先定一個小目標!
先定一個小目標!了解SpringCloudStream概述,能夠簡述使用SpringCloudStream的好處,以及SpringCloudStream核心概念、常見接口和注解8.1
SpringCloudStream概述SpringCloudStream是一個用來為微服務(wù)應(yīng)用構(gòu)建消息驅(qū)動能力的框架,它封裝了SpringCloud對消息中間件的操作,使得我們在使用SpringCloudStream時可以忽略消息中間件的差異,降低了消息中間件的使用復(fù)雜度。8.1
SpringCloudStream概述1.SpringCloudStream核心概念綁定器(Binder)是SpringCloudStream的一個關(guān)鍵組件,它負責(zé)將應(yīng)用程序與消息中間件進行連接,充當了應(yīng)用程序與消息代理之間的適配器,隱藏了底層消息中間件的細節(jié),使開發(fā)人員可以專注于業(yè)務(wù)邏輯而不用擔(dān)心與特定消息代理之間的交互。當需要升級或者更換消息中間件時,開發(fā)者只需更換對應(yīng)的Binder綁定器,而不需要修改任何應(yīng)用程序的邏輯。綁定器提供了一組統(tǒng)一的接口,用于發(fā)送和接收消息。(1)綁定器8.1
SpringCloudStream概述1.SpringCloudStream核心概念SpringCloudStream的通道是用于在應(yīng)用程序內(nèi)部傳遞消息的抽象概念,可以看作是消息的管道,它連接了消息的生產(chǎn)者和消費者。SpringCloudStream提供了如下兩種類型的通道。輸入通道(InputChannel):用于接收外部傳入的消息。輸出通道(OutputChannel):用于向外部發(fā)送消息。(2)通道8.1
SpringCloudStream概述2.SpringCloudStream常見接口和注解Sink接口是SpringCloudStream的輸入通道接口,繼承自SpringIntegration的MessageChannel接口,在應(yīng)用程序中實現(xiàn)Sink接口可以讓應(yīng)用程序作為消息的訂閱者,接收到從消息中間件發(fā)送的消息,并進行相應(yīng)的處理。(1)Sink接口8.1
SpringCloudStream概述2.SpringCloudStream常見接口和注解Source接口是SpringCloudStream的輸出通道接口,繼承自SpringIntegration的MessageChannel接口,在應(yīng)用程序中實現(xiàn)該接口后,可以調(diào)用該通道的方法將消息發(fā)送到消息中間件。(2)Source接口8.1
SpringCloudStream概述2.SpringCloudStream常見接口和注解Processor接口是一個結(jié)合了輸入和輸出通道的特殊接口,它繼承了Sink接口和Source接口。通過實現(xiàn)Processor接口,應(yīng)用程序可以同時具備接收和發(fā)送消息的功能,充當消息的處理器。(3)Processor接口8.1
SpringCloudStream概述2.SpringCloudStream常見接口和注解@Input注解用于定義輸入通道,通過在方法上添加@Input注解,可以將方法與相應(yīng)的輸入通道進行綁定,從而接收訂閱的消息。@Input注解有兩個主要的屬性,具體如下。value:用于指定輸入通道的名稱,以便在應(yīng)用程序中引用和使用它。如果只定義value屬性,則可以省略屬性名。binding(可選):用于指定與輸入通道綁定的分組或者綁定器,以便將相同綁定器或分組的輸入通道進行關(guān)聯(lián)。(4)@Input8.1
SpringCloudStream概述2.SpringCloudStream常見接口和注解@Output注解用于定義輸出通道,通過在方法上添加@Output注解,并通過其value屬性指定輸出通道的名稱,將方法與相應(yīng)的輸出通道進行綁定,從而發(fā)送消息到指定的目標。(5)@Output8.1
SpringCloudStream概述2.SpringCloudStream常見接口和注解@EnableBinding注解用于啟用綁定器,將應(yīng)用程序與消息中間件進行綁定。通過在配置類上添加@EnableBinding注解,可以通過其value屬性指定要綁定的輸入通道和輸出通道,如果只定義value屬性,則可以省略屬性名。(6)@EnableBinding8.1
SpringCloudStream概述2.SpringCloudStream常見接口和注解@StreamListener注解用于將被修飾的方法注冊為消息中間件上數(shù)據(jù)流的事件監(jiān)聽器。使用@StreamListener注解可以將方法與輸入通道進行綁定,當有消息到達該輸入通道時,綁定的方法會自動被調(diào)用,處理接收到的消息。(7)@StreamListener8.1
SpringCloudStream概述2.SpringCloudStream常見接口和注解StreamBridge是SpringCloudStream提供的一個幫助類,可以用于在消息通道之間進行消息發(fā)送和接收。StreamBridge的send()方法可以將消息發(fā)送到指定的目標(輸出通道)。StreamBridge的receive()方法從指定的源頭(輸入通道)接收消息。(8)StreamBridge8.1
SpringCloudStream概述2.SpringCloudStream常見接口和注解從SpringCloudStream3.1版本開始,官方不再推薦使用@Input、@Output、@EnableBinding和@StreamListener注解進行消息處理,而是推薦使用函數(shù)式編程模型,通過定義函數(shù)式Bean處理消息。在推薦的函數(shù)式編程模型中,定義與消息代理交互的函數(shù)式Bean可以是Supplier、Consumer和Function類型,分別用于處理輸出、處理輸入、處理輸入和輸出,并可以使用spring.cloud.function.definition屬性顯式指定要使用的函數(shù)式Bean,更直觀和便捷地編寫消息處理邏輯。SpringCloudStream快速入門8.28.2SpringCloudStream快速入門先定一個小目標!
先定一個小目標!掌握SpringCloudStream快速入門,能夠在SpringBoot應(yīng)用中接收RabbitMQ中的消息8.2SpringCloudStream快速入門1.創(chuàng)建SpringBoot項目文件8-1pom.xml源代碼在IDEA中創(chuàng)建一個名稱為stream_quickstart的Maven項目,并在項目的pom.xml文件中添加SpringCloud、SpringBoot和SpringCloudStream的相關(guān)依賴,具體如文件8-1所示。8.2SpringCloudStream快速入門2.創(chuàng)建消息消費者文件8-2RabbitMQBean.java源代碼在stream_quickstart項目下創(chuàng)建com.itheima.rabittmq包,在該包下創(chuàng)建RabbitMQBean類,并在該類中創(chuàng)建定義Consumer類型的函數(shù)式Bean用于處理消息,具體如文件8-2所示。8.2SpringCloudStream快速入門3.創(chuàng)建啟動類文件8-3StreamQuickStartApplication.java源代碼在stream_quickstart項目的com.itheima包下創(chuàng)建項目的啟動類,具體如文件8-3所示。8.2SpringCloudStream快速入門4.查看效果依次啟動RabbitMQ和stream_quickstart項目的啟動類StreamQuickStartApplication,StreamQuickStartApplication啟動成功后,控制臺輸出的啟動日志。8.2SpringCloudStream快速入門4.查看效果在瀏覽器訪問http://localhost:15672,使用guest賬號登錄RabbitMQ可視化管理頁面成功后,單擊導(dǎo)航欄中的Connections選項,查看RabbitMQ的連接信息。8.2SpringCloudStream快速入門4.查看效果單擊Queues選項,查看RabbitMQ中的隊列信息。8.2SpringCloudStream快速入門4.查看效果在隊列的管理頁面中,通過Publishmessage功能來發(fā)送一條消息到隊列中。8.2SpringCloudStream快速入門4.查看效果在RabbitMQ中完成消息的發(fā)送后,查看stream_quickstart項目的控制臺中輸出的消息。SpringCloudStream的發(fā)布-訂閱模式8.38.3SpringCloudStream的發(fā)布-訂閱模式先定一個小目標!
先定一個小目標!掌握SpringCloudStream的發(fā)布-訂閱模式,能夠通過SpringCloudStream發(fā)布消息,并由多個訂閱者同時消費所訂閱的消息8.3SpringCloudStream的發(fā)布-訂閱模式SpringCloudStream中采用的消息通信方式是基于發(fā)布-訂閱模式。當消息被發(fā)送到消息中間件后,SpringCloudStream會通過共享的主題進行廣播,訂閱了該主題的消息消費者會接收消息,并觸發(fā)相應(yīng)的業(yè)務(wù)邏輯處理。在SpringCloudStream中,主題是一個抽象概念,代表消息發(fā)布給消費者的地方。在不同的消息中間件中,主題可能對應(yīng)不同的實現(xiàn)方式,比如在RabbitMQ中對應(yīng)的是Exchange,在Kafka中對應(yīng)的是Topic。為了讀者可以直觀地感受SpringCloudStream使用發(fā)布-訂閱模式時,消息是如何被分發(fā)到多個訂閱者,下面通過案例演示SpringCloudStream的發(fā)布-訂閱模式。8.3SpringCloudStream的發(fā)布-訂閱模式1.創(chuàng)建項目父工程文件8-4pom.xml源代碼在IDEA中創(chuàng)建一個名稱為stream-publishsubscribe的Maven工程,在該工程的pom.xml文件中指定SpringCloud和SpringBoot依賴的版本,具體如文件8-4所示。8.3SpringCloudStream的發(fā)布-訂閱模式2.創(chuàng)建消息生產(chǎn)者文件8-5pom.xml源代碼在stream-publishsubscribe下創(chuàng)建名稱為stream-supplier的模塊,在該項目的pom.xml文件中引入SpringCloudStream集成RabbitMQ的依賴,以及SpringBoot構(gòu)建Web應(yīng)用程序的起步依賴,具體如文件8-5所示。8.3SpringCloudStream的發(fā)布-訂閱模式2.創(chuàng)建消息生產(chǎn)者SpringCloudStream中提供了一個spring.cloud.stream.bindings屬性用于配置消息通道綁定,通過該屬性綁定的通道命名約定的格式為“消息處理的方法名稱-in/out-索引”,其中,in和out對應(yīng)于綁定的類型為輸入和輸出,索引是輸入或輸出綁定的索引值,對于應(yīng)用程序只包含單個輸入或輸出方法,索引值始終為0。8.3SpringCloudStream的發(fā)布-訂閱模式2.創(chuàng)建消息生產(chǎn)者文件8-6application.yml源代碼在stream-supplier模塊下創(chuàng)建配置文件application.yml,并在該配置文件中設(shè)置應(yīng)用程序與RabbitMQ的連接信息,以及消息通道的綁定關(guān)系,具體如文件8-6所示。8.3SpringCloudStream的發(fā)布-訂閱模式2.創(chuàng)建消息生產(chǎn)者文件8-7MessageController.java源代碼在stream-supplier模塊下創(chuàng)建包com.itheima.controller,在該包下創(chuàng)建一個控制器類,在該類中定義方法通過StreamBridge對象向RabbitMQ發(fā)送消息,具體如文件8-7所示。8.3SpringCloudStream的發(fā)布-訂閱模式2.創(chuàng)建消息生產(chǎn)者文件8-8StreamSupplierApplication.java源代碼在stream-supplier模塊下的com.itheima包下創(chuàng)建一個項目啟動類,具體如文件8-8所示。8.3SpringCloudStream的發(fā)布-訂閱模式3.創(chuàng)建消息消費者文件8-9MessageService.java源代碼在stream-supplier模塊下創(chuàng)建包com.itheima.service,在該包下創(chuàng)建一個類通過函數(shù)式編程的方式創(chuàng)建一個Bean,并在該Bean中定義消息處理方法,具體如文件8-9所示。8.3SpringCloudStream的發(fā)布-訂閱模式3.創(chuàng)建消息消費者文件8-10pom.xml源代碼在stream-consumer模塊的pom.xml文件中引入SpringCloudStream集成RabbitMQ的依賴,具體如文件8-10所示。8.3SpringCloudStream的發(fā)布-訂閱模式3.創(chuàng)建消息消費者文件8-11application.yml源代碼在stream-consumer模塊下創(chuàng)建配置文件application.yml,并在該配置文件中設(shè)置應(yīng)用程序與RabbitMQ的連接信息,以及消息通道的綁定關(guān)系,具體如文件8-11所示。8.3SpringCloudStream的發(fā)布-訂閱模式3.創(chuàng)建消息消費者文件8-12application.yml源代碼在stream-consumer模塊下創(chuàng)建包com.itheima.service,在該包下創(chuàng)建一個類,在該類中通過函數(shù)式編程的方式創(chuàng)建一個Bean,并在該Bean中定義消息處理方法,具體如文件8-12所示。8.3SpringCloudStream的發(fā)布-訂閱模式3.創(chuàng)建消息消費者文件8-13StreamConsumerApplication.java源代碼在stream-consumer模塊下的com.itheima包下創(chuàng)建一個項目啟動類,具體如文件8-13所示。8.3SpringCloudStream的發(fā)布-訂閱模式4.效果測試依次啟動RabbitMQ、StreamSupplierApplication和StreamConsumerApplication,在瀏覽器訪問http://localhost:8801/data?msg=中國高鐵。8.3SpringCloudStream的發(fā)布-訂閱模式4.效果測試此時IDEA中StreamSupplierApplication和StreamConsumerApplication控制臺輸出信息。消費組和消息分區(qū)8.48.4
消費組和消息分區(qū)為了有效解決一些特定的應(yīng)用場景和需求,如負載均衡、消息分區(qū)、高可用性和故障轉(zhuǎn)移等,SpringCloudStream引入消費組和消息分區(qū)。通過消費組和消息分區(qū),SpringCloudStream為消息驅(qū)動的微服務(wù)架構(gòu)提供了更靈活和可靠的消息處理機制。接下來,分別對SpringCloudStream中的消費組和消息分區(qū)分別進行講解。8.4.1
消費組先定一個小目標!
先定一個小目標!掌握消費組,能夠通過SpringCloudStream消費組實現(xiàn)生產(chǎn)者發(fā)送的消息只能被消費組中的一個實例消費8.4.1
消費組通常在生產(chǎn)環(huán)境中,每個服務(wù)往往會以多個實例的方式運行。當這些實例啟動時,他們會綁定到同一個消息通道的目標主題上。默認情況下,當生產(chǎn)者發(fā)送一條消息到綁定通道上時,每個消費者實例都會接收和處理該消息。然而,在某些業(yè)務(wù)場景下,我們希望生產(chǎn)者發(fā)送的消息只能被一個實例消費。為了實現(xiàn)這一功能,我們可以為這些消費者設(shè)置消費組。8.4.1
消費組(1)設(shè)置多實例運行在IDEA中設(shè)置stream-consumer允許多個實例運行。首先演示stream-consumer模塊不設(shè)置分組時,啟動2個實例消費消息的效果。8.4.1
消費組(1)設(shè)置多實例運行復(fù)制StreamConsumerApplication并將復(fù)制后的應(yīng)用命名為StreamConsumerApplication-02。8.4.1
消費組(2)測試效果在瀏覽器訪問http://localhost:8801/data?msg=祝融號,此時IDEA中StreamConsumerApplication和StreamConsumerApplication-02控制臺輸出信息。8.4.1
消費組如果開發(fā)者希望每個消息只被多個消費實例中的一個實例進行消費,可以對服務(wù)設(shè)置消費組,這樣服務(wù)中的所有實例默認都會在同一個分組。為消費者服務(wù)設(shè)置消費組實現(xiàn)的方式非常簡單,只需要在服務(wù)消費者端通過spring.cloud.stream.bindings.input.group屬性設(shè)置分組即可,其中input為綁定的輸入通道名稱。設(shè)置了消費組的多個實例,默認情況下是采用輪詢的方式消費消息,消息將按照順序依次分配給不同的實例進行消費,每個實例依次消費一個消息。通過輪詢的方式,可以實現(xiàn)基本的負載均衡,每個實例能夠均勻地消費消息,提高整體的處理能力。8.4.1
消費組(1)設(shè)置消費組在stream-consumer模塊下的application.yml文件中添加消費組的設(shè)置,添加后application.yml文件內(nèi)容如文件8-14所示。演示設(shè)置消費組后同一個服務(wù)不同消費者實例消費消息的效果。文件8-14application.yml源代碼8.4.1
消費組(2)測試效果在瀏覽器訪問http://localhost:8801/data?msg=祝融號,此時IDEA中StreamConsumerApplication和StreamConsumerApplication-02控制臺輸出信息。8.4.1
消費組(2)測試效果再次在瀏覽器訪問http://localhost:8801/data?msg=祝融號,此時IDEA中StreamConsumerApplication和StreamConsumerApplication-02控制臺輸出信息。8.4.2消息分區(qū)先定一個小目標!
先定一個小目標!掌握消息分區(qū),能夠通過SpringCloudStream消息分區(qū)實現(xiàn)相同特征的消息都被同一個消費者實例處理8.4.2消息分區(qū)通過設(shè)置消費組,可以確保在多實例環(huán)境中,同一條消息只會被一個消費者實例接收和處理。然而,在某些特殊情況下,除了要求只有一個實例處理消息外,我們還希望具有相同特征的消息都被同一個消費者實例處理。例如,對于同一ID的傳感器監(jiān)測數(shù)據(jù),我們需要確保他們被同一個實例進行統(tǒng)計、計算和分析,否則可能無法獲取到完整的數(shù)據(jù)。這時,可以對消息進行分區(qū)處理。8.4.2消息分區(qū)要實現(xiàn)Stream的消息分區(qū),開發(fā)者可以在生產(chǎn)者和消費者的配置文件中分別進行設(shè)置。1.生產(chǎn)者配置(1)ducer.partition-key-expression該屬性用于指定分區(qū)鍵表達式,以決定將消息發(fā)送到哪個分區(qū),其中output為綁定的輸出通道名稱。該屬性的取值可以是任何有效的SpEL(SpringExpressionLanguage,Spring表達式語言)表達式,用于從消息中提取分區(qū)鍵的值。8.4.2消息分區(qū)1.生產(chǎn)者配置ducer.partition-key-expression屬性取值的常見用法。①值為單個屬性的值直接使用請求中對象屬性的值作為分區(qū)鍵。spring:cloud:stream:bindings:output:producer:partition-key-expression:"payload.partitionKey"8.4.2消息分區(qū)1.生產(chǎn)者配置②值為消息頭(headers)中屬性的值spring:cloud:stream:bindings:output:producer:partition-key-expression:"headers['partitionKey']"8.4.2消息分區(qū)1.生產(chǎn)者配置③值為自定義SpEL表達式的值spring:cloud:stream:bindings:handleMessage-out-0:producer:partition-key-expression:"'myKeyPrefix'+payload.myId"8.4.2消息分區(qū)1.生產(chǎn)者配置(2)ducer.partition-count用于指定參與消息分區(qū)的消費者實例數(shù)量,其中output為綁定的輸出通道名稱。8.4.2消息分區(qū)2.消費者配置(1)spring.cloud.stream.bindings.input.consumer.partitioned:用于開啟消費者的消息分區(qū)功能,其中input為綁定的輸入通道名稱。(2)s
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 二零二五年度新型公寓托管出租合同范本3篇
- 二零二五年度智能廣告牌匾技術(shù)研發(fā)與應(yīng)用合同3篇
- 2025年度校園體育設(shè)施租賃管理服務(wù)合同3篇
- 二零二五年度服務(wù)貿(mào)易統(tǒng)計與分析合同3篇
- 二零二五年份白酒行業(yè)創(chuàng)新技術(shù)研發(fā)與應(yīng)用合同3篇
- 二零二五年度化妝品店會員折扣合同3篇
- 英語ai課程設(shè)計
- 通源課程設(shè)計哪個簡單
- 二零二五年度智慧教育項目技術(shù)服務(wù)費合同模板3篇
- 二零二五年度歷史文化名城保護建設(shè)項目合同補充條款3篇
- DZ∕T 0388-2021 礦區(qū)地下水監(jiān)測規(guī)范
- 計算機網(wǎng)絡(luò)信息安全理論與實踐教程
- 2024年重慶市學(xué)業(yè)水平模擬考試地理試卷(二)
- 西師大版2023-2024學(xué)年五年級數(shù)學(xué)上冊期末測試卷含答案
- 2024年浙江省寧波寧??h事業(yè)單位公開招聘85人歷年公開引進高層次人才和急需緊缺人才筆試參考題庫(共500題)答案詳解版
- 校區(qū)熱水供水系統(tǒng)維護服務(wù)第冊維保服務(wù)方案
- (2024年)Maya三維建模教案
- 國開電大本科《理工英語4》機考真題(第六套)
- 公共資源交易培訓(xùn)課件
- 2024年二級造價師題庫(鞏固)
- 業(yè)主與物業(yè)公司調(diào)解協(xié)議書
評論
0/150
提交評論