zookeeper源碼分析和運維總結_第1頁
zookeeper源碼分析和運維總結_第2頁
zookeeper源碼分析和運維總結_第3頁
zookeeper源碼分析和運維總結_第4頁
zookeeper源碼分析和運維總結_第5頁
已閱讀5頁,還剩11頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

publicpublicstaticvoidmain(String[]args)QuorumPeerMainmain=newtrytry}catch(IllegalArgumentExceptione)LOG.error("Invalidarguments,exitingabnormally",}catch(ConfigExceptione)LOG.error("Invalidconfig,exitingabnormally",e);System.err.println("Invalidconfig,exitingabnormally");}catch(Exceptione)LOG.error("Unexpectedexception,exitingabnormally",e);}LOG.info("Exitingnormally");}protectedprotectedvoidinitializeAndRun(String[]args)throwsConfigException,IOException QuorumPeerConfigconfig=newQuorumPeerConfig();if(args.length==1){}DatadirCleanupManagerpurgeMgr=new.getDataDir(),config.getDataLogDir(),if(args.length==1&&config.servers.size()>}}elseLOG.warn("Eithernoconfigornoquorumdefinedinconfig,running+"instandalone//thereisonlyserverinthequorum--runas}}publicpublicvoidrunFromConfig(QuorumPeerConfigconfig)throws{try}catch(JMExceptione)LOG.warn("Unabletoregisterlog4jJMXcontrol",}LOG.info("StartingquorumtryxnFactorycnxnFactory=列quorumPeer=new quorumPeer.setTxnFactory(newnewFile(config.getDataDir()))); }catch(InterruptedExceptione)//warn,butgenerallythisisokLOG.warn("QuorumPeerinterrupted",e);}}publicsynchronizedvoid super.start();//這才真正調(diào)用線程的start方法也就會執(zhí)行run} publicpublicvoidrun()while{trysynchronized(this){selected=}ArrayList<SelectionKey>selectedList=newArrayList<SelectionKey>(selected);for(SelectionKeyk:selectedList){if((k.readyOps()&SelectionKey.OP_ACCEPT)!={SocketChannelsc=((ServerSocketChannel)InetAddressia=sc.socket().getInetAddress();intcnxncount= if xns>0&&cnxncount LOG.warn("Toomanyconnectionsfrom"++"-maxis" xns}elseLOG.info("Acceptedsocketconnectionfrom SelectionKeysk=sc.register(selector,xncnxn=createConnection(sc,}}elseif((k.readyOps()&(SelectionKey.OP_READ|SelectionKey.OP_WRITE))!=0){//處理讀和寫 xnc= xn) }elseif(LOG.isDebugEnabled())LOG.debug("Unexpectedopsinselect+}}}}catch(RuntimeExceptione)LOG.warn("Ignoringunexpectedruntimeexception",}catch(Exceptione)LOG.warn("Ignoringexception",}} 
xnfactoryexitedrun}synchronizedsynchronizedpublicvoid{trycurrentVote=newVote(myid,getLastLoggedZxid(),}catch(IOExceptione)RuntimeExceptionre=newRuntimeException(e.getMessage());throw}for(QuorumServerp:{if(p.id==myid){myQuorumAddr=p.addr;}}if(myQuorumAddr==null)thrownewRuntimeException("Myid"+myid+"notinthepeer}if(electionType=={tryudpSocket=newDatagramSocket(myQuorumAddr.getPort());responder=newResponderThread();}catch(SocketExceptione)throwthrownew}}}privateprivatevoidstarter(QuorumPeer{this.self=self;proposedLeader=-1;proposedZxid=-xManagersendqueue=newrecvqueue=newLinkedBlockingQueue<Notification>();this.messenger=newMessenger(manager);}FastLeaderElection中的lookForLeaderrunleader客戶端publicpublicZooKeeper(StringconnectString,intsessionTimeout,Watcherwatcher,booleancanBeReadOnly)throws{LOG.info("Initiatingconnection,connectString="++"sessionTimeout="+sessionTimeout+"watcher="+watcher); anager.defaultWatcher=watcher;ConnectStringParserconnectStringParser=newConnectStringParser(connectString);HostProviderhostProvider=s());//拿到ip端cnxn=new hostProvider,sessionTimeout,this,watanager,nSocket(),canBeReadOnly);//創(chuàng) xn對cnxn.start();//非thread線程啟} xn(StringchrootPath,HostProviderhostProvider,sessionTimeout,ZooKeeper anagerwatcher, longsessionId,byte[]sessionPasswd,booleancanBeReadOnly)this.zooKeeper=zooKeeper;this.watcher=watcher;this.sessionId=sessionId;this.sessionPasswd=sessionPasswd;this.sessionTimeout=sessionTimeout;this.hostProvider=hostProvider;this.chrootPath=connectTimeout=sessionTimeout/readTimeout=sessionTimeout*2/3;readOnly=canBeReadOnly;sendThread=new eventThread=new} publicpublicvoid{;} publicvoid intlonglast RwServer=System.currentTimeMillis();finalintMAX_SEND_ _INTERVAL=10000;//10secondswhile(state.isAlive()){tryif {}catch(InterruptedExceptione)LOG.warn("Unexpectedexception",}}//don'tre-establishconnectionifweareif(closing||{} }if(state.isConnected())//determinewhetherweneedtosendanAuthFailedif !={booleansendAuthEvent=if(zooKeeperSasl 
.SaslState.INITIAL){try }catch(SaslExceptione)LOG.error("SASLauthenticationwithZookeeperQuorummember"+

state=States.AUTH_FAILED;sendAuthEvent=true;}}KeeperStateauthState=zooKeeperSasl if(authState!=null){if(authState==KeeperState.AuthFailed)//AnauthenticationerroroccurredduringauthenticationwithZookeeper

state=States.AUTH_FAILED;sendAuthEvent=true;}elseif(authState=={sendAuthEvent=}}}if(sendAuthEvent=={eventThread.queueEvent(newWatchedEvent(}}to=readTimeout }elseto=connectTimeout }if(to<={StringwarnInfo= sessiontimedout,havenotheardfromserverin ++"forsessionid+thrownew}if(state.isConnected())//1000(1second)istopreventraceconditionmissingtosendthe//alsomakesurenottosendtoo swhenreadTimeoutisinttimeToNext =readTimeout/2- xnSocket.getIdleSend()- xnSocket.getIdleSend()>1000)?1000:0);//sendarequesteithertimeisdueornopacketsentout if(timeToNext <=0|| xnSocket.getIdleSend()> _INTERVAL){ }elseif <{to= }}}//Ifweareinread-onlymode,seekforread/writeif(state=={longnow=intidle RwServer=(int)(now-last if(idle RwServer>= RwTimeout){ RwServer=now; RwServer=0;RwTimeout RwTimeout,ma }to= RwTimeout- }xnSocket.doTransport(to,pendingQueue,

}catch(Throwable{if(closing)if(LOG.isDebugEnabled())//closingsothisisLOG.debug("Anexceptionwasthrownwhileclosingsendthreadfor++":"+}}else//thisisugly,youhaveabetterwayspeakif(einstanceofSessionExpiredException)LOG.info(e.getMessage()+",closingsocket}elseif(einstanceofSessionTimeoutException)LOG.info(e.getMessage()+}elseif(einstanceofEndOfStreamException)LOG.info(e.getMessage()+}elseif(einstanceofRWServerFoundException)}else"Session++"forserver +",unexpected+RETRY_CONN_MSG,}if{eventThread.queueEvent(newWatchedEvent(} }}}if(state.isAlive()){eventThread.queueEvent(newEvent.KeeperState.Disconnected,}ZooTrace.logTraceMessage(LOG,ZooTrace.getTextTraceLevel(),"SendThreadexitedloopforsession:0x"+} voiddoTransport(intwaitTimeOut,List<Packet>pendingQueue,LinkedList<Packet>xnthrowsIOException,Set<SelectionKey>selected;synchronized(this){selected=}//nonblocking,sotimeiseffective

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論