ZMQ通信機制-BCC(強婕)_第1頁
ZMQ通信機制-BCC(強婕)_第2頁
ZMQ通信機制-BCC(強婕)_第3頁
ZMQ通信機制-BCC(強婕)_第4頁
ZMQ通信機制-BCC(強婕)_第5頁
已閱讀5頁,還剩6頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

1、ZeroMQ使用說明1.ZeroMQ介紹ZMQ(0MQ、ZeroMQ,OMQ)套嵌入式的網(wǎng)絡看起來像是一鏈接庫,但工作起來更像是一個并發(fā)式的框架,介于應用層和傳輸層之間(按照TCP/IP劃分)。它提供的套接字可以在多種協(xié)議中傳輸消息,如線程間、進程間、TCP、廣播等。你可以使用套接字構建多對多的連接模式,如扇出、發(fā)布-訂閱、任務分發(fā)、請求-應答等。一開始ZMQ代表零中間件、零延遲,同時,它又有了新的含義:零管理、零成本、零浪費。總的來說,零表示最小、最簡,這是貫穿于該項目的哲理。傳統(tǒng)的TCPSocket的連接是1對1的,可以認為“1個Socket=1個連接”,每一個線程獨立的維護一個Socke

2、t。但是ZMQ摒棄了這種1對1的模式,ZMQ的Socket可以很輕松的實現(xiàn)1對N,N對1和N對N的連接模式,一個ZMQ的Socket可以自動的維護一組連接,用戶無法操作這些連接,用戶只能操作套接字,而不是連接本身,所以說ZMQ的世界里,連接是私有的。2.基礎知識:套接字APIzmq建立在標準套接字api之上,用IMessageSocket類封裝,其生命周期主要包含四個部分:創(chuàng)建和銷毀套接字:IMessageSocket:IMessageSocket-zmq_socket(void*,inttype),IMessageSocket:close()-zmq_close()配置和讀取套接字選項:zm

3、q_setsockopt(void*s,intoption,constvoid*optval,size_toptvallen),zmq_getsockopt(void*s,intoption,void*optval,size_t*optvallen)為套接字建立連接:IMessageSocket:Bind-zmq_bind(void*s,constchar*addr),IMessageSocket:Connect-zmq_connect(void*s,constchar*addr)發(fā)送和接收消息:zmq_send(void*s,constvoid*buf,size_tlen,intflags)

4、,zmq_recv(void*s,void*buf,size_tlen,intflags)3.ZMQ常用模式1)發(fā)布-訂閱(PUB-SUB:1-N)PublisherPUBsueSiabscfiberCMmectbmdlCflllllflCtCMneGtSUBSubscriber2)請求-應答(REQ-REP)對于Request類型的socket,它是同步的,它一個時刻只能對一個連接進行操作,在一個連接上發(fā)送了數(shù)據(jù)之后,必須接著在這個連接上執(zhí)行recv,也就是send與recv必須同時匹配出現(xiàn)。因此ZMQ在使用一個Socket處理請求的過程中,會阻塞同一個端口的其他請求。Response類型的

5、socket也是同步的,與Request的意思差不多,不過順序是先recv再send。3)獨立對模式(PAIR-PAIR):線程間l-t0-1隊列的實現(xiàn),采用了lockfree實現(xiàn),所以速度很快。對于特定的線程PAIR是最好的選擇。4)管道模式(PUSH-PULL):這種模式主要用于發(fā)布數(shù)據(jù)到由管道排列的節(jié)點上面,數(shù)據(jù)總是沿著管道流動。每個管道階段連接了至少一個節(jié)點。push會負載均衡的將消息分發(fā)到pull端,worker可以隨時自由加入。push端無法recv,pull無法send。PULLSXrtk信封機制(ROUTER-DEALER):Replyaddress4EnvelopeEmpty

6、messagepart*Oaia信封機制的根本作用是讓ROUTER知道如何將消息遞送給正確的應答目標。*從ROUTER中讀取一條消息時,0MQ會包上一層信封,上面注明了消息的來源。Frame1Frame2Frarne3*向ROUTER寫入一條消息時(包含信封),0MQ會將信封拆開,并將消息遞送給相應的對象。作用:實現(xiàn)多線程的請求-應答。4.消息收發(fā)接口發(fā)送進程間請求消息:發(fā)送消息體:SccMqContext_T:instance()-GetContext()-SendMessage/GetContext()-SendSyncMessage組幀:BccDealerHander:OnDealMes

7、sage發(fā)送數(shù)據(jù)包:SccMqContext_T:instance()-GetMessagSender()-SendMessage()接收進程間請求消息:接收:SccMqContext_T:instance()-GetMessagQueue()-OnMessage()處理:SccMqContext_T:instance()-bccAsynHandler.onDealAsynMsg(pAppFrame)發(fā)送線程間請求消息:發(fā)送:SCCManager_T:instance()-GetDispatcher()-PushMsg接收處理線程間請求消息:接收:SccWorkTask:OnMessage處理

8、:session_op發(fā)送響應消息:發(fā)送:MsgDealWorker:SendMsg2Dispatcher轉發(fā):SccMqContext_T:instance()-GetContext()-SendAsynResponse接收并處理響應消息:分發(fā):DealerTask:HandleRpnMsg接收:AsynReqWorker:RecvReponse處理:AsynMsgCallbackltf:OnRecvResponse()發(fā)送訂閱消息:SccMqContext_T:instance()-GetContext()-Notify訂閱消息處理:BccSubHandler:OnMessage()超時處

9、理:AsynMsgCallbackltf:OnTimeout()5.消息通信流程1)進程間通信消息發(fā)送SccMqContext:StartNotify()訂閱消息SccMqContext_T:instance()-GetContext()-SendMessage/SendSyncMessage進程間通信請求消息ptrReq-、endMessageGetContext()-NotifyclientPairSocket-SendPtdINDINGASYNREQTiyEOUTSTRtimeoutSocket_接收超時任務請求,并添加至timeoutReqMapasynReqWorkerSocket_

10、發(fā)送超時任務請求,TimeoutApynRequestPUSH起任務定時器ptrPublisher-Notify、PAIRTAIR用serverPairSocket接收請求消息AsynReqWorker:RecvPairy器時./定起ptrReq_-pTimeoutTask_-HandlerTimeout()/時1同:internalSocket-SendUBAsynReqWorker:RegAsynRemoteService注冊一個DEALER保存至到AsynReqMap、用AsynSocketInfo-pSocket轉發(fā)請求;并保存至msgMap_遍歷timeoutReqMap_判斷超時后

11、由pushWorkSockyt發(fā)送超吋消息PUSHBINDING_ASYNPULLREQ_WORKER_TIMEOUT_STRtimeoutRecvSocket接收消息msgMap_中找sequence寸應的AsynMsgCallbackItf消息接收SUBConsumerRecvTask*pRecvTask_-pSocke接收轉BccSubHandler:OnMessag處理(通過RegMsgConsume綁定IMessageConsume,bccSubHandler)-DEALERROUTERcallback-OnTimeoutpDealTask_-dealerSocket接收消息轉Bcc

12、DealerHander:OnDealMessag處理通過ptrDealer_-RegMsgDeale綁定msgid,bccDea1erHand1erpQueue-OPULL)nMessage()g.pSender-SendMessage()發(fā)送至主線程PUSHbccAsynHandler.onDealAsynMsg在OnAsynMsgMap找消息處理函數(shù)pDealTask_-dealerSocket接收消息Task:異步/同步消息處理過程DealerRecvTDealerTask::HandleCallback.同步消息DealerTask:ReponseMsg異步消息DealerTask:

13、DealerMsgmsgDealerMapj找pRpnHandlermsgHandleMapj找pRpnHandler:起響應定時器reqMsgSocket發(fā)送給piDeabr-AplRpuL、timeoutRegSocket發(fā)送時)響應超時請求和PUSHPULL;rpnSocket_收ReponseTask:Rec處理*I.BiNDING_DPULLI異步:pRpnHandler-OnDealMessage!同步:pRpnHandlerpOnReponseMessage:同步消息Iu:IMessageDealen:SendAsynResponse響應超時請求TimeoutAsynReques

14、tPUSH)LE_TIMEOUTreponseTimeoutSocket接收響應超時任務請求,并添口至pnTimeoutMap停定時器從rpnTimeoutMapf刪除2)線程間通信消息發(fā)送消息接收SccMqContext_T:instance()-GetContext()-SendAsynResponseSCCManager_T:instance()-GetDispatcher()-PushMsg定Workerid)線程間通信(指MsgDispatcher-OnMessage(20)用pPushSockeL獲取待發(fā)送的響應消息clientPairSocket-SendPtr1PAIRrMgD

15、ispatchePAIRrOnMessaoe1rMsgDispatcher:RecvPair用serverPairSocket_接收并由pPushSocket轉發(fā)出去ROUTERROUTERSCCManager_T:instance()-DEALER例如:MsgDealWorker的子類::SccWorkTask:OnMessage(調用:!session_op中消息對應的處理函數(shù))GetDispatcher()SccMqContext:ReceiveZmqLoopDealer在MsgDispatcher的taskMap_中找到workerid對應MsgDealWorker,用pDealSoc

16、ket_接收消息,并調OnMessgeO處理用pDealSocket發(fā)送消息若需要回處理結果響應I處理完成后發(fā)送響應結果MIsgDealWorker:SendMsg2Dispatcher.回結果響應3)響應機制響應發(fā)送III:ptrDealer_-SendAsynResponse(sequence,msgbody)asynRpnSender2AsynRpnServerSocket_發(fā)送響應消息,c;/ubp.mq.dealer.asyn.rpn_*PUSHPULLptrDealer_-pDealTask_-asynRpnServerSocket_接收響應消息pDealTask-HandleRp

17、nMsg停響應定時器pDealTask-dealerSocket發(fā)送響應ROUTERDEALERptrReq_-workTask_-asynReqMap_AsynSocketInfo-pSocket_接收消息AsynReqWorker:RecvReponse在msgMap_中找到原始請求,并調用對應handler處理響應AsynMsgCallbackItf:OnRecvResponse(同步消息:AsynMsgRpnHandler:OnRecvResponse,把結果冋灌到promise)6.關鍵類SccMqContextclassSccMqContext:privateNonCopyable

18、,publicubp:platform:thread:WorkerRequestpublic:BccAsynMsgHandlerbccAsynHandler;/處理pMsgQueue里的消息(OnAsynMsgMap)private:IMessageContext*contextPtr_;/綁定上下文BccSubHandlerbccSubHandler_;消息訂閱handler,SUB注冊到contextPtr_-consumerList_(msgTopic_,pHandler_)BccSubHandlerbccSubStatusHandler_;狀態(tài)訂閱handler,SUBBccDeale

19、rHanderbccDealerHandler_;請求消息處理handler,注冊到contextPtr_-ptrDealer_-MsgDealerMapmsgId,IMessageDealerHandler*IMessageQueue*pMsgQueue_;/PULL:接收隊列IMessageSender*pMsgSender_;/PUSH:發(fā)送隊列;IMessageContextlmpl:classMQ_IMPORT_EXPORTIMessageContextImpl:private:void*zmqContext_;/ZMQcontextstd:auto_ptrptrReq_;/std:

20、auto_ptrptrDealer_;/std:auto_ptrptrPublisher_;/訂閱消息發(fā)布者,PUBMsgConsumerListconsumerList_;DefaultReponseHander*pDftRpnHandler_;響應消息處理handler注冊至UcontextPtr_-ptrDealer_-msgHandleMap_msgId,IMessageReponseHandler*;IMessageContextlmpl-ptrReq_:std:auto_ptr-ptrDealer_:std:auto_ptr-ptrPublisher_:std:auto_ptr-c

21、onsumerList_:MsgConsumerList-pDftRpnHandler:DefaultReponseHander+SendSyncMessage(destSvcKey:std:string,message:【Message,timeout:ACE_UINT32,retry:ACE_UINT32):IMessage+SendMessage(destSvcKey:std:string,msgbody:IMessage,cb:AsynMsgCallbackltf,timeout:ACE_UINT32):ACE_INT32+SendAsynResponse(sequenee:ACE_U

22、INT64,msgbody:IMessage):ACE_INT32+RegMsgReponser(handler:IMessageReponseHandler,msgIds:char):ACE_INT32+RegMsgDealer(handler:IMessageDealerHandler,msgIds:char):ACE_INT32+Notify(header:IMessageHeader,msgBody:IMessage,topic:std:string,filter:std:string):ACE_INT32+RegMsgConsumer(destSvcKey:std:string,to

23、pic:std:string,cb:IMessageConsumerHandler,filter:std:string):ACE_INT32,IMessageAsynRequestIMessageReponseIMessageDealerIMessagePublisherIMessageAsynRequest:AsynReqWorkerIMessageAsynRequest-mqContext_:IMessageContextlmpl-clientPairSocket_:std:auto_ptr-serverPairSocket_:std:auto_ptr-workTask_:AsynReqW

24、orker-pTimeoutTask:RequestTimeoutTaskge+SendSyncMessage(destSvcKey:std:string,header:IMessageHeader,message:【Message,timeout:ACE_UINT32,tryTimes:ACE_UINT32):IMessa+SendMessage(promise:IMessagePromise,destSvcKey:std:string,message:IMessage,timeout:ACE_UINT32):ACE_INT32AsynReqWorker-context_:IMessageC

25、ontextImpl-pAsynReq_:IMessageAsynRequest-msgMap_:MsgCallbackMap-socketList_:std:vector-asynReqMap_:AsynReqMap-timeoutRecvSocket_:std:auto_ptr-asynReqWorkerSocket_:std:auto_ptr-asynpn_wait_timeout_:ACE_UINT64-maxidletime:ACEUINT64+Run(:void):ACE_INT32#FindMsgCallback(seq:ACE_UINT64):AsynMsgCallbackIt

26、f#RefreshPollSockets():zmq_pollitem_t#RecvReponse(:IMessageSocket):void#RecvPair(:IMessageSocket):void#RecvTimeout(:IMessageSocket):void#RegAsynRemoteService(endpoint:std:string,destSvcKey:std:string):AsynSocketInfo#DealIdleSocket():voidIMessageDealer:DealerTaskIMessageDealer-mqContext_:IMessageCont

27、extImpl-pMsgTimeoutTask_:TimeoutMsgTask-pDealTask_:DealerTask-msgDealerMap_:MsgDealerMap-msgHandleMap_:SyncMsgHandleMap-asynRpnSender2AsynRpnServerSocket:std:autoptr+SendAsynResponse(sequenee:ACE_UINT64/msgbody:IMessage):ACE_INT32+RegMsgDealer(handler:IMessageDealerHandler,msgIds:char):ACE_INT32+RegMsgReponser(handler:IMessageReponseHandler,msgIds:char):ACE_INT32DealerTask-pDealer_:IMessageDealer-dealerSocket_:std:auto_ptr-asynRpnServerSocket_:std:auto_ptr-dealTaskRegMsg2TimeoutSocket_:std:auto_ptr-timeout2DealTaskServerSocket_:std:auto_ptr-ptrRpn_:st

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論