Open-Source Distributed Database PolarDB-X: Source Code Walkthrough
[…] features.

CN is a multi-module Java project. Modules expose services to one another through interfaces; the relationships between modules are recorded in pom.xml and can be inspected with mvn dependency:tre[…]

1. Overall understanding

  • Protocol parsing is the process of dispatching protocol data objects to the concrete execution logic; its entry point is in […]
  • […] physical plan optimization, five steps in all. Optimization produces a physical execution plan, which is passed to the executor. The optimizer uses […]
  • After the executor receives the physical execution plan, it first determines the execution mode from the plan type, including […]

2. Deeper understanding

3. Summary

The CN Server layer code lives mainly in the polardbx-server module; the main function is in TddlLauncher.

1. Creating the CobarServer object

2. Loading parameters

    […] -> CobarConfig.initCobarConfig() -> S[…]

    String conf = System.getProperty("server.conf", "classpath:[…]perties");

3. Reading metadata from MetaDB and initializing the instance-level system components

    TddlLauncher.main() -> CobarServer.new() -> CobarCon[…]Config.initCobarConfig() -> ServerLoader.load() -> Se[…]
    MetaDbDataSource.initMetaDbDataSo[…]
    SchemaChangeManager.ge[…]

polardbx-gms\src\main\resources\ddl\ holds the table definitions of the system tables, and uses […]

    polardbx.meta.table.d1.t1

[…] and registers the corresponding listener, so that when the inst_config table changes, a callback is made to […]

7) StorageHaManager

4. Creating the thread pools

5. CobarServer.init

Path: TddlLauncher.main() -> […]

2) GmsClusterLoader.loadPolarDbXCluster
3) warmup

6. Initializing the network layer

    processors = new NIOProcessor[system.getProcesso[…]
    processors[i] = new NIOProces[…]
    }

    public NIOAcceptor(String name, int […] FrontendConnectionFactory factory, boolea[…]
        this.serverChannel = ServerSocket[…]
        this.serverChannel.[…]
    }

NIOAcceptor is also a thread, and it handles connection-establishment requests. Once a connection has been established, […]

7. Conclusion

Modules of polardbx-cdc:

  • [module] polardbx-cdc-assemble
  • [module] polardbx-cdc-canal
  • [module] polardbx-cdc-daemon
  • [module] polardbx-cdc-dumper
  • [module] polardbx-cdc-format
  • [module] polardbx-cdc-meta: […] the basic support for reshaping. This module also maintains the SQL script definitions of the CDC system databases and tables.
  • [module] polardbx-cdc-storage
  • [module] polardbx-cdc-task
  • [module] polardbx-cdc-transfer

CDC system tables:

  • binlog_system_config
  • binlog_task_config
  • binlog_node_info
  • binlog_dumper_info
  • binlog_task_info
  • binlog_logic_meta_history: […] information.
  • binlog_phy_ddl_history
  • binlog_oss_record
  • binlog_polarx_command: […] and so on.
  • binlog_schedule_history
  • binlog_storage_history
  • binlog_env_config_history
  • binlog_schema_history

Configuration parameters:

    polardbx.instance.id
    mem_size
    metaDb_url
    metaDb_username
    metaDbPasswd
    polarx_url
    polarx_username
    polarx_password
    dnPasswordKey

Prepare the test database and table:

    create database transfer_test;
    CREATE TABLE `transfer_test`.`accounts` (
      `id` int(11) NOT NULL,
      `balance` int(11) NOT NULL,
      `gmt_created` datetime not null,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 dbpartition by hash(`id`) tbpartition by hash(`id`) tbpartitions 2;
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (1,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (2,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (3,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (4,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (5,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (6,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (7,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (8,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (9,100,now());
    INSERT INTO `transfer_test`.`accounts` (`id`,`balance`,`gmt_created`) VALUES (10,100,now());

Start a MySQL instance in Docker:

    docker run -itd --name mysql_3309 -p 3309:3306 -e MYSQL_ROOT_PASSWORD=root mysql

Log in to the Docker instance:

    docker exec -it mysql_3309 bash

Edit /etc/mysql/[…]f:

a. Add the following settings to turn GTID off (the polardbx-cdc global binlog does not yet support GTID):

    gtid_mode=OFF
    enforce_gtid_consistency=OFF

b. Change the server id so that it does not collide with the primary:

    server_id=2

Restart the Docker instance:

    docker restart mysql_3309

Then set up replication:

    stop slave;
    reset slave;
    CHANGE MASTER TO
      MASTER_HOST='xxx',
      MASTER_USER='xxx',
      MASTER_PASSWORD='xxx',
      MASTER_PORT=xxx,
      MASTER_LOG_FILE='binlog.000001',
      MASTER_LOG_POS=4,
      MASTER_CONNECT_RETRY=100;
    start slave;

After running the test program, the following SQL can be used to verify that the data on both sides is exactly the same:

    SELECT […] ISNULL(balance)))) AS UNSIGNED)) AS checksum FROM accou[…]

[…] this logic is in the NIOAcceptor constructor; each CN process starts only one […]

[…] encapsulates the logic of the optimization and execution stages, where Planner#plan is the optimizer entry point, […]

IV. Parser

    SELECT * FROM t1 WHERE id > 1;

is split into the tokens:

    SELECT (keyword), * (identifier), FROM (keyword), t1 (identifier), WHERE (keyword), id (identifier), > (gt), 1 (literal_int), ; (semicolon)

[…] plan management.

VI. Validator

[…] some rewrites of the AST, mainly to hide the differences between syntactic structures that have the same semantics. Then […]

VIII. PlanEnumerator

[…] replace the original subtree; instead, the newly generated execution plan is stored in the RelSubset, and later from […]

X. PostPlanner

    SELECT * FROM r JOIN t ON […] = […] WHERE r.id = 0 AND t.id = 1;

ExecutorHelper#execute selects the execution path according to the execution mode chosen by the optimizer. Taking […] below: in the Cursor path, the handler for each operator in the execution plan is located first; the code is in […]

    # A simple sharded (database- and table-partitioned) table in PolarDB-X: sbtest
    CREATE TABLE `sbtest` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `k` int(11) NOT NULL DEFAULT '0',
      `c` char(120) NOT NULL DEFAULT '',
      `pad` char(60) NOT NULL DEFAULT '',
      PRIMARY KEY (`id`)
    ) dbpartition by hash(`id`) tbpartition by hash(`id`) tbpartitions 2;

    insert into sbtest(id) values(100);

After PolarDB-X receives this statement string, it begins executing the SQL, as can be seen in […]

    ExecutionPlan plan = Planner.getInstance().plan(sql, executionContext);

    SqlNode validatedNode = converter.validate(ast);

    ...
    final SqlValidatorNamespace targetNamespace = getNamespace(insert);
    validateNamespace(targetNamespace, unknownType);
    ...
    final SqlNode source = insert.getSource();
    if (source instanceof SqlSelect) {
        final SqlSelect sqlSelect = (SqlSelect) source;
        validateSelect(sqlSelect, targetRowType);
    } else {
        final SqlValidatorScope scope = scopes.get(source);
        validateQuery(source, scope, targetRowType);
    }
    ...

    RelNode relNode = converter.toRel(validatedNode, plannerContext);

    ...
    final SqlToRelConverter sqlToRelConverter = new TddlSqlToRelConverter(...);
    RelRoot root = sqlToRelConverter.convertQuery(validatedNode, false, true);
    ...

    RelNode relNode = super.convertInsert(call);
    if (relNode instanceof TableModify) {
        ...
    }

    ToDrdsRelVisitor toDrdsRelVisitor = new ToDrdsRelVisitor(validatedNode, plannerContext);
    RelNode drdsRelNode = relNode.accept(toDrdsRelVisitor);

    if ((other instanceof LogicalTableModify)) {
        ...
        if (operation == TableModify.Operation.INSERT || ...) {
            LogicalInsert logicalInsert = new LogicalInsert(modify);
            ...
        }
    }

    private RelNode sqlRewriteAndPlanEnumerate(RelNode input, PlannerContext plannerContext) {
        CalcitePlanOptimizerTrace.getOptimizerTracer().get().addSnapshot("Start", input, plannerContext);
        // RBO optimization
        RelNode logicalOutput = optimizeBySqlWriter(input, plannerContext);
        CalcitePlanOptimizerTrace.getOptimizerTracer().get()
            .addSnapshot("PlanEnumerate", logicalOutput, plannerContext);
        // CBO optimization
        RelNode bestPlan = optimizeByPlanEnumerator(logicalOutput, plannerContext);
        // finally we should clear the planner to release memory
        bestPlan.getCluster().getPlanner().clear();
        bestPlan.getCluster().invalidateMetadataQuery();
        return bestPlan;
    }

    public enum ExecutionStrategy {
        /**
         * Foreach row, exists only one target partition.
         * Pushdown origin statement, with function call not pushable (like sequence call) replaced by RexCallParam.
         * Typical for single table and partitioned table without gsi.
         */
        PUSHDOWN,
        /**
         * Foreach row, might exists more than one target partition.
         * Pushdown origin statement, with nondeterministic function call replaced by RexCallParam.
         * Typical for broadcast table.
         */
        DETERMINISTIC_PUSHDOWN,
        /**
         * Foreach row, might exists more than one target partition, and data in different target partitions might be different.
         * Select then execute, with all function call replaced by RexCallParam.
         * Typical for table with gsi or table are doing scale out.
         */
        LOGICAL;
    };

    BuildFinalPlanVisitor visitor = new BuildFinalPlanVisitor(executionPlan.getAst(), plannerContext);
    executionPlan = executionPlan.copy(executionPlan.getPlan().accept(visitor));

    ResultCursor resultCursor = executor.execute(plan, executionContext);

    ...
    MyPhyTableModifyCursor modifyCursor =
        (MyPhyTableModifyCursor) repo.getCursorFactory().repoCursor(executionContext, logicalPlan);
    ...
    affectRows = modifyCursor.batchUpdate();
    ...

    public int[] batchUpdate() {
        try {
            return handler.executeUpdate(this.plan);
        } catch (SQLException e) {
            throw GeneralUtil.nestedException(e);
        }
    }

    public Cursor handle(RelNode logicalPlan, ExecutionContext executionContext) {
        ...
        LogicalInsert logicalInsert = (LogicalInsert) logicalPlan;
        ...
        if (!logicalInsert.isSourceSelect()) {
            affectRows = doExecute(logicalInsert, executionContext, handlerParams);
        } else {
            affectRows = selectForInsert(logicalInsert, executionContext, handlerParams);
        }
        ...
    }

The handler chooses a different execution path depending on whether the source is a SELECT statement; the concrete execution takes place in […]

    ...
    final InsertWriter primaryWriter = logicalInsert.getPrimaryInsertWriter();
    List<RelNode> inputs = primaryWriter.getInput(executionContext);
    ...
    // If there are GSIs, generate the physical execution plans for the GSI tables
    final List<InsertWriter> gsiWriters = logicalInsert.getGsiInsertWriters();
    gsiWriters.stream().map(gsiWriter -> gsiWriter.getInput(executionContext))...;
    ...
    final int totalAffectRows = executePhysicalPlan(allPhyPlan, executionContext, schemaName, isBroadcast);
    ...

Here dbIndex is the physical database name, tableNames holds the physical table names, and param stores this […]

    public int[] executeUpdate(BaseQueryOperation phyTableModify) throws SQLException {
        ...
        Pair<String, Map<Integer, ParameterContext>> dbIndexAndParam =
            phyTableModify.getDbIndexAndParam(
                executionContext.getParams() == null ? null : executionContext.getParams().getCurrentParameter(),
                executionContext);
        ...
        connection = getPhyConnection(transaction, rw, groupName);
        ...
        // Build the SQL string from the parameters
        String sql = buildSql(sqlAndParam.sql, executionContext);
        ...
        // Create the prepared statement on the connection
        ps = prepareStatement(sql, connection, executionContext, isInsert, false);
        ...
        // Set the parameters
        ParameterMethod.setParameters(ps, sqlAndParam.param);
        ...
        // Execute
        affectRow = ((PreparedStatement) ps).executeUpdate();
        ...
    }
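To make the idea of a logical INSERT being turned into per-shard physical writes more concrete, here is a minimal sketch of routing a sharding key to a physical database and table. Everything in it is an assumption made for illustration: the plain modulo rule, the counts of 4 databases and 2 tables per database, and the names db_N and sbtest_N are invented, and the class ShardRoutingSketch is hypothetical. It is not PolarDB-X's actual partitioning rule.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy router (hypothetical): maps a sharding key to a physical database and table. */
class ShardRoutingSketch {

    /** Target of one physical write; field names echo dbIndex / tableNames in the text. */
    static final class Target {
        final String dbIndex;
        final String tableName;

        Target(String dbIndex, String tableName) {
            this.dbIndex = dbIndex;
            this.tableName = tableName;
        }

        @Override
        public String toString() {
            return dbIndex + "." + tableName;
        }
    }

    /** Assumed rule: slot = id mod (dbCount * tablesPerDb); the slot picks the table, slot / tablesPerDb the database. */
    static Target route(long id, int dbCount, int tablesPerDb) {
        long slot = Math.floorMod(id, (long) dbCount * tablesPerDb);
        long dbNo = slot / tablesPerDb;
        return new Target("db_" + dbNo, "sbtest_" + slot);
    }

    /** Groups keys by physical target, so each group could become one physical plan. */
    static Map<String, List<Long>> groupByTarget(List<Long> ids, int dbCount, int tablesPerDb) {
        Map<String, List<Long>> groups = new LinkedHashMap<>();
        for (long id : ids) {
            String key = route(id, dbCount, tablesPerDb).toString();
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(id);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Under the assumed rule, id = 100 with 4 databases x 2 tables lands in slot 4.
        System.out.println(route(100L, 4, 2));
        System.out.println(groupByTarget(List.of(1L, 9L, 2L), 4, 2));
    }
}
```

Grouping rows by target before execution means that each physical connection receives one batch, which is roughly the shape of the per-shard physical plans described above.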
Deadlock detection belongs to the transaction module, and the deadlock detection task is attached to the transaction manager […]

    if (!hasLeadership()) {
        return;
    }

    // Get all global transaction information
    final TrxLookupSet lookupSet = fetchTransInfo();

    final long beforeTimeMillis = System.currentTimeMillis() - 1000L;
    final long beforeTxid = IdGenerator.assembleId(beforeTimeMillis, 0, 0);

    for (ITransaction tran : transactions) {
        if (!tran.isDistributed()) {
            continue;
        }
        // Do deadlock detection only for transactions that take longer than 1s.
        if (tran.getId() >= beforeTxid) {
            continue;
        }
        // Get information from this tran.
        ...
    }

    // Get all group data sources, and group by DN's ID (host:port)
    final Map<String, List<TGroupDataSource>> instId2GroupList = ExecUtils.getInstId2GroupList(allSchemas);

    final DiGraph<TrxLookupSet.Transaction> graph = new DiGraph<>();

    for (List<TGroupDataSource> groupDataSources : instId2GroupList.values()) {
        if (CollectionUtils.isNotEmpty(groupDataSources)) {
            // Since all data sources are in the same DN, any data source is ok.
            final TGroupDataSource groupDataSource = groupDataSources.get(0);
            // Get all group names in this DN.
            final Set<String> groupNames =
                groupDataSources.stream().map(TGroupDataSource::getDbGroupKey).collect(Collectors.toSet());
            // Fetch lock-wait information for this DN,
            // and update the lookup set and the graph with the information.
            fetchLockWaits(groupDataSource, groupNames, lookupSet, graph);
        }
    }

    graph.detect().ifPresent((cycle) -> {
        DeadlockParser.parseGlobalDeadlock(cycle);
        killByFrontendConnId(cycle.get(0));
    });

    private void doCancel() throws SQLException {
        // futureCancelErrorCode is used later when the error is classified;
        // a kill caused by a deadlock always carries ERR_TRANS_DEADLOCK
        futureCancelErrorCode = this.errorCode;
        // Kill the SQL running on all physical connections
        if (conn != null) {
            conn.kill();
        }
        // f is the task that is currently executing the logical SQL
        Future f = executingFuture;
        if (f != null) {
            f.cancel(true);
        }
    }

[…] when the error code is ERR_TRANS_DEADLOCK, the current transaction is rolled back and the client is sent the error message "Deadlock found when trying to get lock; try restarting transaction".

    // Handle deadlock error.
    if (isDeadLockException(t)) {
        // Prevent this transaction from committing.
        this.conn.getTrx().setCrucialError(ERR_TRANS_DEADLOCK);
        // Rollback this trx.
        try {
            innerRollback();
        } catch (SQLException exception) {
            logger.warn("rollback failed when deadlock found", exception);
        }
    }

[…] a lean, customized Reactor framework implemented on […]. This code is adapted from the […] code in polardbx-sql. […] session. […] schedule the timed tasks related to the private protocol, which is the job of XConnectionManager, […] in XConnection. The JDBC compatibility layer provides, among others, DataSource, Connection, […]

    public class GalaxyTest {

        public final static String SERVER_IP = "";
        public final static int SERVER_PORT = 31306;
        public final static String SERVER_USR = "root";
        public final static String SERVER_PSW = "root";
        private final static String DATABASE = "test";

        static XDataSource dataSource =
            new XDataSource(SERVER_IP, SERVER_PORT, SERVER_USR, SERVER_PSW, DATABASE, null);

        public static XConnection getConn() throws Exception {
            return (XConnection) dataSource.getConnection();
        }

        public static List<List<Object>> getResult(XResult result) throws Exception {
            return getResult(result, false);
        }

        public static List<List<Object>> getResult(XResult result, boolean stringOrBytes) throws Exception {
            final List<PolarxResultset.ColumnMetaData> metaData = result.getMetaData();
            final List<List<Object>> ret = new ArrayList<>();
            while (result.next() != null) {
                final List<ByteString> data = result.current().getRow();
                assert metaData.size() == data.size();
                final List<Object> row = new ArrayList<>();
                for (int i = 0; i < metaData.size(); ++i) {
                    final Pair<Object, byte[]> pair = XResultUtil.resultToObject(
                        metaData.get(i), data.get(i), true, result.getSession().getDefaultTimezone());
                    final Object obj = stringOrBytes
                        ? (pair.getKey() instanceof byte[] || null == pair.getValue()
                            ? pair.getKey() : new String(pair.getValue()))
                        : pair.getKey();
                    row.add(obj);
                }
                ret.add(row);
            }
            return ret;
        }

        private void show(XResult result) throws Exception {
            List<PolarxResultset.ColumnMetaData> metaData = result.getMetaData();
            for (PolarxResultset.ColumnMetaData meta : metaData) {
                System.out.print(meta.getName().toStringUtf8() + "\t");
            }
            System.out.println();
            final List<List<Object>> objs = getResult(result);
            for (List<Object> list : objs) {
                for (Object obj : list) {
                    System.out.print(obj + "\t");
                }
                System.out.println();
            }
            System.out.println("" + result.getRowsAffected() + " rows affected.");
        }

        @Ignore
        @Test
        public void playground() throws Exception {
            try (XConnection conn = getConn()) {
                conn.setStreamMode(true);
                final XResult result = conn.execQuery("select 1");
                show(result);
            }
        }
    }

First, XDataSource uses the stored (IP, port, username) triple to look up […]

In the scenario of this code, the data source has only just been created and the background scheduled task has not yet run, so […]

[…] we call execQuery on XConnection directly; this function is equivalent to directly creating a […]

execQuery first records various call information for statistics, and then enters the key […]

After a series of variable settings and the lazy DB setting, we construct a […] that is used to send the actual request […]

[…] result, and caches it in rows; in the streaming-execution case used by the test code above, […] result.

II. Task component

    private void consume(TxnMessage message, MessageType processType) throws IOException, InterruptedException {
        ...
        switch (processType) {
        case BEGIN:
            ...
            break;
        case DATA:
            ...
            break;
        case END:
            ...
            break;
        case TAG:
            currentToken = message.getTxnTag().getTxnMergedToken();
            if (currentToken.getType() == TxnType.META_DDL) {
                ...
            } else if (currentToken.getType() == TxnType.META_DDL_PRIVATE) {
                ...
            } else if (currentToken.getType() == TxnType.META_SCALE) {
                ...
            } else if (currentToken.getType() == TxnType.META_HEARTBEAT) {
                ...
            } else if (currentToken.getType() == TxnType.META_CONFIG_ENV_CHANGE) {
                ...
            }
            break;
        default:
            throw new PolardbxException("invalid message type for logfile generator: " + processType);
        }
    }

[…] we focus on how a DDL statement executes in the executor. Before reading this article, you may first read […] related to […]

  • After parsing, a logical DDL statement enters the optimizer, which performs only a simple type conversion and then generates […]

    public Cursor handle(RelNode logicalPlan, ExecutionContext executionContext) {
        BaseDdlOperation logicalDdlPlan = (BaseDdlOperation) logicalPlan;

        initDdlContext(logicalDdlPlan, executionContext);

        // Validate the plan first and then return immediately if needed.
        boolean returnImmediately = validatePlan(logicalDdlPlan, executionContext);

        .....

        setPartitionDbIndexAndPhyTable(logicalDdlPlan);
        // Build a specific DDL job by subclass that override buildDdlJob
        DdlJob ddlJob = returnImmediately ?
            new TransientDdlJob() :
            // @override
            buildDdlJob(logicalDdlPlan, executionContext);

        // Validate the DDL job before request.
        // @override
        validateJob(logicalDdlPlan, ddlJob, executionContext);

        // Handle the client DDL request on the worker side.
        handleDdlRequest(ddlJob, executionContext);

        .....
        return buildResultCursor(logicalDdlPlan, executionContext);
    }

    ## SQL statements used in this article ##
    create database db1 mode = "auto";
    use db1;
    create table t1(x int, y int);

    ## The DDL statement examined in detail in this article
    alter table t1 add global index `g_i_y`(`y`) COVERING(`x`) partition by hash(`y`);

    private ResultCursor executeQuery(ByteString sql, ExecutionContext executionContext,
                                      AtomicBoolean trxPolicyModified) {
        // Get all meta version before optimization
        final long[] metaVersions = MdlContext.snapshotMetaVersions();

        // Planner
        ExecutionPlan plan = Planner.getInstance().plan(sql, executionContext);

        ...
        // for requireMDL transaction
        if (requireMdl && enableMdl) {
            if (!isClosed()) {
                // Acquire meta data lock for each statement modifies table data
                acquireTransactionalMdl(sql.toString(), plan, executionContext);
            }
            ...
            // update Plan if metaVersion changed, which indicate meta updated
        }
        ...
        ResultCursor resultCursor = executor.execute(plan, executionContext);
        ...
        return resultCursor;
    }

    public class CreatePartitionGsiJobFactory extends CreateGsiJobFactory {

        @Override
        protected void excludeResources(Set<String> resources) {
            super.excludeResources(resources);
            // meta data lock in MetaDB
            resources.add(concatWithDot(schemaName, primaryTableName)); // db1.t1
            resources.add(concatWithDot(schemaName, indexTableName));   // db1.g_i_y
            ...
        }

        @Override
        protected ExecutableDdlJob doCreate() {
            ...
            if (needOnlineSchemaChange) {
                bringUpGsi = GsiTaskFactory.addGlobalIndexTasks(
                    schemaName,
                    primaryTableName,
                    indexTableName,
                    stayAtDeleteOnly,
                    stayAtWriteOnly,
                    stayAtBackFill
                );
            }
            ...
            List<DdlTask> taskList = new ArrayList<>();
            // 1. validate
            taskList.add(validateTask);

            // 2. create gsi table
            // 2.1 insert tablePartition meta for gsi table
            taskList.add(createTableAddTablesPartitionInfoMetaTask);
            // 2.2 create gsi physical table
            CreateGsiPhyDdlTask createGsiPhyDdlTask =
                new CreateGsiPhyDdlTask(schemaName, primaryTableName, indexTableName, physicalPlanData);
            taskList.add(createGsiPhyDdlTask);
            // 2.3 insert tables meta for gsi table
            taskList.add(addTablesMetaTask);
            taskList.add(showTableMetaTask);

            // 3.
            // 3.1 insert indexes meta for primary table
            taskList.add(addIndexMetaTask);
            // 3.2 gsi status: CREATING -> DELETE_ONLY -> WRITE_ONLY -> WRITE_REORG -> PUBLIC
            taskList.addAll(bringUpGsi);

            // last tableSyncTask
            DdlTask tableSyncTask = new TableSyncTask(schemaName, indexTableName);
            taskList.add(tableSyncTask);

            final ExecutableDdlJob4CreatePartitionGsi result = new ExecutableDdlJob4CreatePartitionGsi();
            result.addSequentialTasks(taskList);
            ....
            return result;
        }
        ...
    }

    public class CreateTableAddTablesMetaTask extends BaseGmsTask {

        @Override
        public void executeImpl(Connection metaDbConnection, ExecutionContext executionContext) {
            PhyInfoSchemaContext phyInfoSchemaContext = TableMetaChanger.buildPhyInfoSchemaContext(
                schemaName, logicalTableName, dbIndex, phyTableName, sequenceBean, tablesExtRecord,
                partitioned, ifNotExists, sqlKind, executionContext);
            FailPoint.injectRandomExceptionFromHint(executionContext);
            FailPoint.injectRandomSuspendFromHint(executionContext);
            TableMetaChanger.addTableMeta(metaDbConnection, phyInfoSchemaContext);
        }

        @Override
        public void rollbackImpl(Connection metaDbConnection, ExecutionContext executionContext) {
            TableMetaChanger.removeTableMeta(metaDbConnection, schemaName, logicalTableName, false,
                executionContext);
        }

        @Override
        protected void onRollbackSuccess(ExecutionContext executionContext) {
            TableMetaChanger.afterRemovingTableMeta(schemaName, logicalTableName);
        }
    }

  • addTableMetaTask
  • TableSyncTask

    executor/src/main/java/com/alibaba/polardbx/execu[…]
    com.alibaba.polardbx.executor.ddl.job.task.fact[…]

    public static List<DdlTask> addGlobalIndexTasks(String schemaName,
                                                    String primaryTableName,
                                                    String indexName,
                                                    boolean stayAtDeleteOnly,
                                                    boolean stayAtWriteOnly,
                                                    boolean stayAtBackFill) {
        ....
        DdlTask writeOnlyTask = new GsiUpdateIndexStatusTask(
            schemaName,
            primaryTableName,
            indexName,
            IndexStatus.DELETE_ONLY,
            IndexStatus.WRITE_ONLY
        ).onExceptionTryRecoveryThenRollback();
        ....
        taskList.add(deleteOnlyTask);
        taskList.add(new TableSyncTask(schemaName, primaryTableName));
        ....
        taskList.add(writeOnlyTask);
        taskList.add(new TableSyncTask(schemaName, primaryTableName));
        ...
        taskList.add(new LogicalTableBackFillTask(schemaName, primaryTableName, indexName));
        ...
        taskList.add(writeReorgTask);
        taskList.add(new TableSyncTask(schemaName, primaryTableName));
        taskList.add(publicTask);
        taskList.add(new TableSyncTask(schemaName, primaryTableName));
        return taskList;
    }

    com.alibaba.polardbx.gms.metadb.table.IndexStatus

    // OptimizeLogicalInsertRule.java
    private LogicalInsert handlePushdown(LogicalInsert origin, boolean deterministicPushdown, ExecutionContext ec) {
        ...
        // other writers
        final List<InsertWriter> gsiInsertWriters = new ArrayList<>();
        IntStream.range(0, gsiMetas.size()).forEach(i -> {
            final TableMeta gsiMeta = gsiMetas.get(i);
            final RelOptTable gsiTable =
                catalog.getTableForMember(ImmutableList.of(schema, gsiMeta.getTableName()));
            final List<Integer> gsiValuePermute = gsiColumnMappings.get(i);
            final boolean isGsiBroadcast = TableTopologyUtil.isBroadcast(gsiMeta);
            final boolean isGsiSingle = TableTopologyUtil.isSingle(gsiMeta);
            // different write strategy for corresponding table type.
            gsiInsertWriters.add(WriterFactory.createInsertOrReplaceWriter(newInsert, gsiTable, sourceRowType,
                gsiValuePermute, gsiMeta, gsiKeywords, null, isReplace, isGsiBroadcast, isGsiSingle,
                isValueSource, ec));
        });
        ...
    }

    // LogicalInsertWriter.java
    protected int executeInsert(LogicalInsert logicalInsert, ExecutionContext executionContext,
                                HandlerParams handlerParams) {
        ...
        final List<InsertWriter> gsiWriters = logicalInsert.getGsiInsertWriters();
        gsiWriters.stream()
            .map(gsiWriter -> gsiWriter.getInput(executionContext))
            .filter(w -> !w.isEmpty())
            .forEach(w -> {
                writableGsiCount.incrementAndGet();
                allPhyPlan.addAll(w);
            });

    // IndexStatus.java
    ...
    public static final EnumSet<IndexStatus> WRITABLE =
        EnumSet.of(WRITE_ONLY, WRITE_REORG, PUBLIC, DROP_WRITE_ONLY);

    public boolean isWritable() {
        return WRITABLE.contains(this);
    }
    ...

  • expireSchemaManager(t1, g_i1, v0): discards the old version of the metadata, and […] the new version of the metadata […]

    com.alibaba.polardbx.executor.gms.GmsTableMet[…]

    public void tonewversion(String tableName, boolean preemptive, Long initWait, Long interval,
                             TimeUnit timeUnit) {
        synchronized (OptimizerContext.getContext(schemaName)) {
            GmsTableMetaManager oldSchemaManager =
                (GmsTableMetaManager) OptimizerContext.getContext(schemaName).getLatestSchemaManager();
            TableMeta currentMeta = oldSchemaManager.getTableWithNull(tableName);

            long version = -1;

            .... // Query the current metadata version in MetaDB and assign it to version

            // 1. loadSchema
            SchemaManager newSchemaManager = new GmsTableMetaManager(oldSchemaManager, tableName, rule);
            newSchemaManager.init();
            OptimizerContext.getContext(schemaName).setSchemaManager(newSchemaManager);

            // 2. mdl(v0).writeLock
            final MdlContext context;
            if (preemptive) {
                context = MdlManager.addContext(schemaName, initWait, interval, timeUnit);
            } else {
                context = MdlManager.addContext(schemaName, false);
            }
            MdlTicket ticket = context.acquireLock(new MdlRequest(1L,
                MdlKey.getTableKeyWithLowerTableName(schemaName, currentMeta.getDigest()),
                MdlType.MDL_EXCLUSIVE,
                MdlDuration.MDL_TRANSACTION));

            // 3. expireSchemaManager(t1, g_i1, v0)
            oldSchemaManager.expire();

            .... // Invalidate the PlanCache entries that use the old version of the metadata.

            context.releaseLock(1L, ticket);
        }
    }

2. DDL Task

3. Worker and Leader

    com.alibaba.polardbx.executor.handler.ddl.Lo[…]
    com.alibaba.polardbx.executor.ddl.newengine.[…]
    com.alibaba.polardbx.executor.ddl.newengine.DdlEngin[…]

    public class DdlEngineDagExecutor {

        public static void restoreAndRun(String schemaName, Long jobId, ExecutionContext executionContext) {
            boolean restoreSuccess = DdlEngineDagExecutorMap.restore(schemaName, jobId, executionContext);
            DdlEngineDagExecutor dag = DdlEngineDagExecutorMap.get(schemaName, jobId);
            dag.run();
        }

        private void run() {
            // Start the job state machine.
            if (ddlContext.getState() == DdlState.QUEUED) {
                onQueued();
            }
            if (ddlContext.getState() == DdlState.RUNNING) {
                onRunning();
            }
            if (ddlContext.getState() == DdlState.ROLLBACK_RUNNING) {
                onRollingBack();
            }
            // Handle the terminated states.
            switch (ddlContext.getState()) {
            case ROLLBACK_PAUSED:
            case PAUSED:
                onTerminated();
                break;
            case ROLLBACK_COMPLETED:
            case COMPLETED:
                onFinished();
                break;
            default:
                break;
            }
        }
    }

    com.alibaba.polardbx.executor.ddl.newengine.[…]

[…] queue.

    private void onRunning() {
        while (true) {
            if (hasFailureOnState(DdlState.RUNNING)) {
                if (waitForAllTasksToStop(50L, TimeUnit.MILLISECONDS)) {
                    LOGGER.info(String.format("JobId:[%s], all tasks stopped", ddlContext.getJobId()));
                    return;
                } else {
                    continue;
                }
            }
            if (executingTaskScheduler.isAllTaskDone()) {
                updateDdlState(DdlState.RUNNING, DdlState.COMPLETED);
                return;
            }
            if (executingTaskScheduler.hasMoreExecutable()) {
                // fetch & execute next batch
                submitDdlTask(executingTaskScheduler.pollBatch(), true, executingTaskScheduler);
                continue;
            }
            // get some rest
            sleep(50L);
        }
    }

    private void onRollingBack() {
        if ([…]
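The polling loop in onRunning can be pictured with a much smaller, single-threaded sketch: keep asking a scheduler for the batch of tasks whose predecessors have all finished, run that batch, and stop when nothing is left. The class DagSchedulerSketch and the task names used below are hypothetical. Only the method names isAllTaskDone and pollBatch are borrowed from the excerpt, and their behavior here is an assumption, not the real DDL engine's implementation (which submits tasks to workers and handles failures and rollback).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy DAG scheduler (hypothetical): runs tasks batch by batch in dependency order. */
class DagSchedulerSketch {
    // task name -> names of the tasks it depends on; holds only tasks not yet handed out
    private final Map<String, Set<String>> pending = new LinkedHashMap<>();
    private final Set<String> done = new LinkedHashSet<>();

    void addTask(String name, String... dependsOn) {
        pending.put(name, new LinkedHashSet<>(Arrays.asList(dependsOn)));
    }

    boolean isAllTaskDone() {
        return pending.isEmpty();
    }

    /** Removes and returns every pending task whose dependencies have all finished. */
    List<String> pollBatch() {
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Set<String>> e : pending.entrySet()) {
            if (done.containsAll(e.getValue())) {
                batch.add(e.getKey());
            }
        }
        pending.keySet().removeAll(batch);
        return batch;
    }

    /** Simplified counterpart of the onRunning loop; returns the order in which tasks ran. */
    List<String> run() {
        List<String> order = new ArrayList<>();
        while (!isAllTaskDone()) {
            List<String> batch = pollBatch();
            if (batch.isEmpty()) {
                throw new IllegalStateException("cycle or unknown dependency among: " + pending.keySet());
            }
            order.addAll(batch); // "executing" a task just records its name in this sketch
            done.addAll(batch);  // mark the batch finished only after it has run
        }
        return order;
    }

    public static void main(String[] args) {
        DagSchedulerSketch scheduler = new DagSchedulerSketch();
        scheduler.addTask("validate");
        scheduler.addTask("createPhyTable", "validate");
        scheduler.addTask("addMeta", "validate");
        scheduler.addTask("tableSync", "createPhyTable", "addMeta");
        System.out.println(scheduler.run());
    }
}
```

A purely sequential task list, such as the one built with addSequentialTasks, is the special case in which every task depends only on the one before it, so each batch contains exactly one task.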
