公開課canal數(shù)據(jù)同步終極解決方案實(shí)現(xiàn)_第1頁
公開課canal數(shù)據(jù)同步終極解決方案實(shí)現(xiàn)_第2頁
公開課canal數(shù)據(jù)同步終極解決方案實(shí)現(xiàn)_第3頁
公開課canal數(shù)據(jù)同步終極解決方案實(shí)現(xiàn)_第4頁
公開課canal數(shù)據(jù)同步終極解決方案實(shí)現(xiàn)_第5頁
已閱讀5頁,還剩28頁未讀, 繼續(xù)免費(fèi)閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)

文檔簡介

1、課程目標(biāo)目標(biāo)一:數(shù)據(jù)同步方案分析目標(biāo)二:Canal 的及工作原理目標(biāo)三Canal 的部署及配置:目標(biāo)四使用 Canal 完成 solr 的數(shù)據(jù)同步:1需求在現(xiàn)代的系統(tǒng)開發(fā)中, 為了提高搜索效率 , 以及搜索的精準(zhǔn)度, 會(huì)大量的使用 redis ,memcache 等 nosql 系統(tǒng)的數(shù)據(jù)庫 , 以及 solr , elasticsearch 類似的全文檢索服務(wù); 那么這個(gè)時(shí)候, 就又有一個(gè)問題需要我們來考慮, 就是數(shù)據(jù)同步的問題,如何將實(shí)時(shí)變化的數(shù)據(jù)庫中的數(shù)據(jù)同步到 solr 的索引庫中或者 redis 中呢?2數(shù)據(jù)同步方案2.1方案一: 業(yè)務(wù)代碼中同步在增加、修改、刪除之后,執(zhí)行操作 so

2、lr 索引庫的邏輯代碼。市昌平區(qū)建材城西路龍辦公樓一層:優(yōu)點(diǎn) : 操作簡便缺點(diǎn) :業(yè)務(wù)耦合度高執(zhí)行效率變低1)2)2.2方案二: 定時(shí)任務(wù)同步在增加、修改、刪除,操作數(shù)據(jù)庫中的數(shù)據(jù)變更之后 ,通過定時(shí)任務(wù)定時(shí)的將數(shù)據(jù)庫的數(shù)據(jù)同步到 solr 的索引庫中。定時(shí)任務(wù)技術(shù) : SpringTask , Quartz優(yōu)點(diǎn):同步 solr 索引庫操作與業(yè)務(wù)代碼完全解耦。缺點(diǎn):數(shù)據(jù)的實(shí)時(shí)性并不高。2.3方案三: 通過 MQ 實(shí)現(xiàn)同步在增加、修改、刪除之后, 往 MQ 中發(fā)送一條消息;同步程序作為 MQ 中的消費(fèi)者,從消息隊(duì)列中獲取消息,然后執(zhí)行同步 solr 索引庫的邏輯。市昌平區(qū)建材城西路龍辦公樓一層:

3、優(yōu)點(diǎn):業(yè)務(wù)代碼解耦, 并且可以做到準(zhǔn)實(shí)時(shí)缺點(diǎn):需要在業(yè)務(wù)代碼中加入發(fā)送消息到 MQ 中的代碼 , API 耦合2.4方案四: 通過 Canal 實(shí)現(xiàn)實(shí)時(shí)同步通過 Canal 來數(shù)據(jù)庫的日志, 來檢測數(shù)據(jù)庫中表結(jié)構(gòu)的數(shù)據(jù)變化,從而更新 solr 索引庫。優(yōu)點(diǎn):業(yè)務(wù)代碼完全解耦,API 完全解耦,可以做到準(zhǔn)實(shí)時(shí)。缺點(diǎn):無3Canal3.1Canal 概述阿里巴巴 mysql 數(shù)據(jù)庫 binlog 的增量訂閱 & 消費(fèi)組件。名稱:canal k'næl譯意:水道 / 管道 / 溝渠語言:純 java 開發(fā)定位:基于數(shù)據(jù)庫增量日志,提供增量數(shù)據(jù)訂閱 & 消費(fèi),目前主要

4、支持了mysql: mysql binlog parser / real-time / queue&topic市昌平區(qū)建材城西路龍辦公樓一層:3.2Canal官網(wǎng):這里我們選擇了 Canal 的 1.0.24 版本.canal.deployer-1.0.24.tar.gz : 這個(gè)是 canalServer 的部署包c(diǎn)anal.example-1.0.24.tar.gz : 這個(gè)是樣例Source code(zip) : 是 canal 的源碼包4Canal 工作原理4.1mysql 主從同步實(shí)現(xiàn)原理:市昌平區(qū)建材城西路龍辦公樓一層:從上層來看,主從分成三步:1. master 將改變

5、到二進(jìn)制日志(binary log) 中(這些叫做二進(jìn)制日志,binary log events ,可以通過 show binlog events 進(jìn)行查看);2. slave 將 master 的 binary log events拷貝到它的中繼日志(relaylog);3. slave 重做中繼日志中的將改變反映它的數(shù)據(jù)。4.2Canal 內(nèi)部原理原理圖:原理相對(duì)比較簡單:市昌平區(qū)建材城西路龍辦公樓一層:1. canal 模擬 mysql slave 的交互協(xié)議,為 mysqlslave ,向 mysqlmaster 發(fā)送 dump 協(xié)議。2. mysql master 收到 dump 請(qǐng)

6、求, 開始推送binarylog給slave(也就是canal) 。binary log 對(duì)象 ( 原始為 byte流 ) 。3. canal4.3Canal 內(nèi)部結(jié)構(gòu)說明:1)2)Server : 代表一個(gè) canal 運(yùn)行實(shí)例,對(duì)應(yīng)于一個(gè) jvmInstance : 對(duì)應(yīng)于一個(gè)數(shù)據(jù)隊(duì)列 (1 個(gè) server 對(duì)應(yīng) 1.n 個(gè) instance)instance 下的子模塊:eventParser : (數(shù)據(jù)源接入,模擬 slave 協(xié)議和 master 進(jìn)行交互,協(xié)議)eventSink: (Parser 和 Store 鏈接器,進(jìn)行數(shù)據(jù)過濾,分發(fā)的工作)市昌平區(qū)建材城西路龍辦公樓一層:

7、eventStore: (數(shù)據(jù))metaManager : (增量訂閱&消費(fèi)管理器)5Canal 環(huán)境準(zhǔn)備5.1Mysql 數(shù)據(jù)庫 rootgrantflushall privileges on *.* to 'root' '%' identified by '2143'privileges;5.2Mysql 配置canal 的原理是基于 mysqlbinlog 技術(shù),所以這里一定需要開啟mysql的binlog寫入功能,建議配置 binlog模式為 row。查看方式:SHOW VARIABLES LIKE'binlog_form

8、at'修改配置:修改以下配置項(xiàng):mysqldlog-bin=mysql-bin#添加這一行就 okbinlog_format=ROW#選擇 row 模式server_id=1#配置 mysql replaction 需要定義,不能與 canal 的 slaveId 重復(fù)市昌平區(qū)建材城西路龍辦公樓一層:注: 修改完成之后 , 需要重啟Mysql 服務(wù)市昌平區(qū)建材城西路龍辦公樓一層:知識(shí)小貼士 :1) Row日志中會(huì)成每一行數(shù)據(jù)被修改的形式,然后在 slave 端再對(duì)相同的數(shù)據(jù)進(jìn)行修改。優(yōu)點(diǎn):在 row 模式下,bin-log 中可以不執(zhí)行的 SQL 語句的上下文相關(guān)的,僅僅只需要那一條被

9、修改了,修改成什么樣了。所以 row 的日志內(nèi)容會(huì)非常清楚的下每一行數(shù)據(jù)修改的細(xì)節(jié),非常容易理解。而且出現(xiàn)某些特定情況下的過程或 function ,以及 trigger 的調(diào)用和觸發(fā)無法被正確的問題。2) Statement每一條會(huì)修改數(shù)據(jù)的 SQL 都會(huì) 到 master 的 bin-log 中。slave 在 的時(shí)候 SQL 進(jìn)程會(huì) 成和原來 master 端執(zhí)行過的相同的 SQL 再次執(zhí)行。優(yōu)點(diǎn):在 statement 模式下,首先就是解決了 row 模式的缺點(diǎn),不需要 每一行數(shù)據(jù)的變化, 減少了 bin-log 日志量,節(jié)省 I/O 以及 資源,提高性能。因?yàn)樗恍枰?在 maste

10、r 上所執(zhí)行的語句的細(xì)節(jié),以及執(zhí)行語句時(shí)候的上下文的 。缺點(diǎn):在 statement 模式下,由于他是 的執(zhí)行語句,所以,為了讓這些語句在 slave 端也能正確執(zhí)行,那么他還必須 每條語句在執(zhí)行的時(shí)候的一些相關(guān) ,也就是上下文 ,以保證所有語句在 slave 端杯執(zhí)行的時(shí)候能夠得到和在 master 端執(zhí)行時(shí)候相同的結(jié)果。另外就是,由于 MySQL 現(xiàn)在發(fā)展比較快,很多的新功能不斷的加入,使 MySQL 的 遇到了不小的 ,自然 的時(shí)候涉及到越復(fù)雜的內(nèi)容,bug 也就越容易出現(xiàn)。在 statement 中,目前已經(jīng)發(fā)現(xiàn)的就有不少情況會(huì)造成 MySQL的 出現(xiàn)問題,主要是修改數(shù)據(jù)的時(shí)候使用了某

11、些特定的函數(shù)或者功能的時(shí)候會(huì)出現(xiàn),比如:sleep()函數(shù)在有些版本中就不能被正確 ,在 過程中使用了 last_insert_id() 函數(shù),可能會(huì)使5.3Mysql 創(chuàng)建用戶canal 的原理是模擬為 mysql slave ,所以這里一定需要做為 mysql slave的相關(guān)權(quán)限。 創(chuàng)建一個(gè)主從同步的賬戶,并且賦予權(quán)限:CREATE USER canal'localhost' IDENTIFIED BY 'canal'GRANT SELECT, REPLICATION SLAVE, REPLICATIONON*.*TO'canal''

12、;localhost'FLUSH PRIVILEGES;6Canal 部署安裝6.1上傳解壓解壓后的目錄如下:目錄:bin :的是可執(zhí)行腳本conf liblogs:存放 canal 的配置文件:存放 canal 的 lib 目錄:存放的是日志文件市昌平區(qū)建材城西路龍辦公樓一層:slave 和 master 上得到不一致的 id 等等。由于 row 是基于每一行來的變化,所以出現(xiàn)類似的問題。6.2配置編輯 canal/conf/example/perties:選項(xiàng)含義:1)canal.instance.mysql.slaveId : mysql 集群配置中的 s

13、erverId 概念,需要保證和當(dāng)前 mysql 集群中 id 唯一;canal.instance.master.address: mysql 主庫鏈接地址; canal.instance.dbUsername : mysql 數(shù)據(jù)庫帳號(hào);2)3)4)5)6)7)canal.instance.dbPassword : mysql 數(shù)據(jù)庫;canal.instance.defauatabaseName : mysql 鏈接時(shí)默認(rèn)數(shù)據(jù)庫;canal.instance.connectionCharset : mysql canal.instance.filter.regex : mysql 數(shù)據(jù)數(shù)據(jù)

14、編碼;關(guān)注的表,Perl 正則表達(dá)式.6.3啟動(dòng)/停止市昌平區(qū)建材城西路龍辦公樓一層:# mysql serverId canal.instance.mysql.slaveId = 1234#position info,需要改成的數(shù)據(jù)庫canal.instance.master.address = :3306 = canal.instance.master.position = canal.instance.master.timestamp =#canal.instance.standby.address

15、= # = #canal.instance.standby.position = #canal.instance.standby.timestamp =#username/password,需要改成的數(shù)據(jù)庫canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defauatabaseName =canaldb canal.instance.connectionCharset = UTF-8#table regexc

16、anal.instance.filter.regex = canaldb.* #1) startup.sh : 啟動(dòng)腳本2) stop.sh : 停止腳本7數(shù)據(jù)拉取測試7.1官方源碼導(dǎo)入在源碼目錄中,有一個(gè)工程 example, 這個(gè)工程中存放的就是一些樣例工程.7.2測試類修改可以通過其中的一個(gè) SimpleCanalTest 類進(jìn)試.需要修改 CanalServer 的 IP 地址,及端.市昌平區(qū)建材城西路龍辦公樓一層:7.3數(shù)據(jù)變更測試7.3.1 創(chuàng)建表創(chuàng)建 tb_book 表:CREATE TABLE tb_book (id INT(11) NOT NULL AUTO_INCREMEN

17、T COMMENT '主鍵',name VARCHAR(100) NOT NULL COMMENT '書名',author VARCHAR(100) DEFAULT NULL COMMENT '作者',publishtime DATETIME DEFAULT NULL COMMENT '日期',price DOUBLE(10,2) DEFAULT NULL COMMENT '價(jià)格',publishgroup VARCHAR(100) DEFAULT NULL COMMENT '發(fā)版社', PRIMA

18、RY KEY (id) ENGINE=INNODB DEFAULT CHARSET=utf8mb47.3.2數(shù)據(jù)執(zhí)行 SQL :INSERT INTO tb_book(NAME , author , publishtime,price ');price');, publishgroup)VALUES('子講安全協(xié)議','吳瀚請(qǐng)',NOW(),99.00,'工業(yè)INSERT INTO tb_book(NAME , author , publishtime, publishgroup)VALUES('子講安全協(xié)議 2',

19、9;吳瀚請(qǐng)',NOW(),99.00,'工業(yè)Canal 數(shù)據(jù)監(jiān)測結(jié)果 :7.3.3 更新數(shù)據(jù)執(zhí)行 SQL 語句:UPDATE tb_book SET NAME = '子講安全協(xié)議第二版' WHEREid =2;Canal 數(shù)據(jù)監(jiān)測結(jié)果:市昌平區(qū)建材城西路龍辦公樓一層:7.3.4 刪除數(shù)據(jù)執(zhí)行 SQL :DELETE FROM tb_book WHEREid = 1;Canal 數(shù)據(jù)監(jiān)測結(jié)果:8數(shù)據(jù)同步實(shí)現(xiàn)8.1需求描述將數(shù)據(jù)庫數(shù)據(jù)的變化, 通過 canalbinlog 日志,實(shí)時(shí)更新到 solr 的索引庫中.8.2Solr 環(huán)境的搭建略8.3同步程序8.3.1引

20、入依賴市昌平區(qū)建材城西路龍辦公樓一層:<dependency><groupId>com.alibaba.otter</groupId>8.3.2 定義 POJO市昌平區(qū)建材城西路龍辦公樓一層:public class Book private Integer id; private String name; private String author;private Date publishtime; private Double price; private String publishgroup; public Integer getId() retur

21、n id;public void setId(Integer id) <artifactId>canal.</artifactId><version>1.0.24</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>tocol</artifactId><version>1.0.24</version></depende

22、ncy><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.6</version></dependency><dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId><ve

23、rsion>1.8.9</version></dependency><dependency><groupId>org.apache.solr</groupId><artifactId>solr-solrj</artifactId><version>4.10.3</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</ar

24、tifactId><version>4.9</version><scope>test</scope></dependency>8.3.3 定義 solr 的域與 pojo 屬性的市昌平區(qū)建材城西路龍辦公樓一層:Fieldprivate Integer id;Field("book_name")private String name;Field("book_author")this.id = id;public String getName() return name;public void

25、 setName(String name) = name;public String getAuthor() return author;public void setAuthor(String author) this.author = author;public Date getPublishtime() return publishtime;public void setPublishtime(Date publishtime) this.publishtime = publishtime;public Double getPrice() return price;p

26、ublic void setPrice(Double price) this.price = price;public String getPublishgroup() return publishgroup;public void setPublishgroup(String publishgroup) this.publishgroup = publishgroup;8.3.4同步程序編寫市昌平區(qū)建材城西路龍辦公樓一層:public class CanalPullData private static Logger logger = LoggerFactory.getLogger(Cana

27、lPullData.class);public static void main(String args) throws Exception String destination = "example"String hostname = "52" Integer port = 11111;CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, "&q

28、uot;, "");connector.connect(); connector.subscribe();("Canal Server" + hostname + " : " + port + " 連接");Integer batchSize = 5*1024;while (true)Message message = connector.getWithoutAck(batchSize);long messageId = message.getId();int size = message.g

29、etEntries().size();if(messageId = -1 | size = 0)try TimeUnit.SECONDS.sleep(1); catch (InterruptedException e) e.printStackTrace();connector.ack(messageId);elseprivate String author;Field("book_publishtime")private Date publishtime;Field("book_price")private Double price;Field(&qu

30、ot;book_publishgroup")private String publishgroup;("binlog 分析開始");List<InnerBinlogEntry> entryList = CanalDataParser.convertToInnerBinlogEntry(message);syncDataToSolr(entryList);("檢測到修改的 Entry 數(shù)量size="+entryList.size()+"");private static

31、 void syncDataToSolr(List<InnerBinlogEntry> entryList) throws Exception SolrServer solrServer = new HttpSolrServer("");if(entryList != null)for (InnerBinlogEntry innerBinlogEntry : entryList) Book book = new Book();if(innerBinlogEntry.getEventType() = CanalEntry.EventType.INSERT |inn

32、erBinlogEntry.getEventType() = CanalEntry.EventType.UPDATE)List<Map<String, BinlogValue>> mapList = innerBinlogEntry.getRows();if(mapList != null)for (Map<String, BinlogValue> valueMap : mapList) BinlogValue idValue = valueMap.get("id"); BinlogValue nameValue = valueMap.g

33、et("name"); BinlogValue authorValue = valueMap.get("author");BinlogValue publishtimeValue = valueMap.get("publishtime"); BinlogValue priceValue = valueMap.get("price");BinlogValue publishgroupValue = valueMap.get("publishgroup");book.setId(Integer.pa

34、rseInt(idValue.getValue(); book.setName(nameValue.getValue(); book.setAuthor(authorValue.getValue(); book.setPublishtime(DateUtils.parseDate(publishtimeValue.getValue(); book.setPublishgroup(publishgroupValue.getValue();book.setPrice(Double.parseDouble(priceValue.getValue();/添加/更新數(shù)據(jù)到 solr 索引庫logger.

35、info("-> 添加/更新 solr 索引庫 : " + book); solrServer.addBean(book);solrSmit();else if(innerBinlogEntry.getEventType() = CanalEntry.EventType.DELETE) List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows(); if(rows != null)for (Map<String, BinlogValue> row : rows) 市

36、昌平區(qū)建材城西路龍辦公樓一層:8.3.5 工具類CanalDataParser:用來轉(zhuǎn)換從 CanalServer 中獲取的 Message 對(duì)象.市昌平區(qū)建材城西路龍辦公樓一層:public class CanalDataParser protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss" protected static final String yyyyMMddHHmmss = "yyyyMMddHHmmss" protected static final String y

37、yyyMMdd= "yyyyMMdd"protected static final String SEP= SystemUtils.LINE_SEPARATOR;protected static String context_format= null; protected static String row_format= null; protected static String tranion_format = null; protected static String row_log = null;private static Logger logger = Logg

38、erFactory.getLogger(CanalDataParser.class);static context_format = SEP + "*" + SEP; context_format += "* Batch Id: ,count : , memsize : , Time : " + SEP; context_format += "* Start : " + SEP;context_format += "* End : " + SEP;context_format += "*" +

39、SEP;row_format = SEP+ "> binlog:,name, eventType: , executeTime : , delay : ms"+ SEP;tranion_format = SEP + "=> binlog: , executeTime : , delay : ms" + SEP;row_log = "schema, table"BinlogValue idValue = row.get("id"); String id = idValue.getBeforeValue()

40、;/根據(jù) ID 刪除 solr 索引庫的數(shù)據(jù)("-> 刪除 solr 索引庫 : " + id); solrServer.deleteById(id);solrSmit();public static List<InnerBinlogEntry> convertToInnerBinlogEntry(Message message) List<InnerBinlogEntry> innerBinlogEntryList = new ArrayList<InnerBinlogEntry>();if(message

41、= null) ("接收到空的 message; 忽略");return innerBinlogEntryList;long batchId = message.getId();int size = message.getEntries().size();if (batchId = -1 | size = 0) ("接收到空的 messagesize=" + size + " 忽略");return innerBinlogEntryList;printLog(message, batchId

42、, size);List<Entry> entrys = message.getEntries();/輸出日志for (Entry entry : entrys) long executeTime = entry.getHeader().getExecuteTime();long delayTime = new Date().getTime() - executeTime;if (entry.getEntryType() = EntryType.TRAN EntryType.TRANIONEND) if (entry.getEntryType() = EntryType.TRANI

43、ONBEGIN | entry.getEntryType() =IONBEGIN) Trantry ionBegin begin = null;begin = TranionBegin.parseFrom(entry.getStoreValue(); catch (InvalidProtocolBufferException e) throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);/ 打印事務(wù)頭,執(zhí)行的線程 id,事務(wù)耗時(shí)("

44、;BEGIN -> Thid: ", begin.getThId();(tranion_format, new Object entry.getHeader().getLogfileName(),String.valueOf(entry.getHeader().getLogfileOffset(),String.valueOf(entry.getHeader().getExecuteTime(), String.valueOf(delayTime) ); else if (entry.getEntryType() = EntryType.TRAN Tran

45、ionEnd end = null;try IONEND) end = TranionEnd.parseFrom(entry.getStoreValue(); catch (InvalidProtocolBufferException e) throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);/ 打印事務(wù)提交,事務(wù) id市昌平區(qū)建材城西路龍辦公樓一層:("END -> tranion id: ", end.ge

46、tTranionId();(tranion_format,new Object entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset(),String.valueOf(entry.getHeader().getExecuteTime(), String.valueOf(delayTime) );continue;/結(jié)果if (entry.getEntryType() = EntryType.ROWDATA) RowChange rowChage = nul

47、l; try rowChage = RowChange.parseFrom(entry.getStoreValue(); catch (Exception e) throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);EventType eventType = rowChage.getEventType();(row_format, new Object entry.getHeader().getLogfileName(),String.v

48、alueOf(entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(),entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime(), String.valueOf(delayTime) );/組裝數(shù)據(jù)結(jié)果if (eventType = EventType.INSERT | eventType = EventType.DELETE | eventType = EventType.UPD

49、ATE)String schemaName = entry.getHeader().getSchemaName(); String tableName = entry.getHeader().getTableName();List<Map<String, BinlogValue>> rows = parseEntry(entry);InnerBinlogEntry innerBinlogEntry = new InnerBinlogEntry(); innerBinlogEntry.setEntry(entry); innerBinlogEntry.setEventTy

50、pe(eventType); innerBinlogEntry.setSchemaName(schemaName); innerBinlogEntry.setTableName(tableName.toLowerCase();innerBinlogEntry.setRows(rows);innerBinlogEntryList.add(innerBinlogEntry); else ("INSERT INSERT UPDATE 操作之外的 SQL " + eventType.toString() + "");continue;市昌平

51、區(qū)建材城西路龍辦公樓一層:return innerBinlogEntryList;private static List<Map<String, BinlogValue>> parseEntry(Entry entry) List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();try String schemaName = entry.getHeader().getSchemaName(); String tableNa

52、me = entry.getHeader().getTableName();RowChange rowChage = RowChange.parseFrom(entry.getStoreValue();EventType eventType = rowChage.getEventType();/ 處理每個(gè) Entry 中的每行數(shù)據(jù)for (RowData rowData : rowChage.getRowDatasList() StringBuilder rowlog = new StringBuilder("rowlog schema" + schemaName + &q

53、uot;, table" + tableName +", event" + eventType.toString() + "");Map<String, BinlogValue> row = new HashMap<String, BinlogValue>(); List<Column> beforeColumns = rowData.getBeforeColumnsList(); List<Column> afterColumns = rowData.getAfterColumnsList();

54、beforeColumns = rowData.getBeforeColumnsList();if (eventType = EventType.DELETE) /deletefor(Column column : beforeColumns) BinlogValue binlogValue = new BinlogValue(); binlogValue.setValue(column.getValue();binlogValue.setBeforeValue(column.getValue(); row.put(column.getName(), binlogValue); else if

55、(eventType = EventType.UPDATE) /updatefor(Column column : beforeColumns) BinlogValue binlogValue = new BinlogValue();binlogValue.setBeforeValue(column.getValue(); row.put(column.getName(), binlogValue);for(Column column : afterColumns) BinlogValue binlogValue = row.get(column.getName();if(binlogValu

56、e = null) binlogValue = new BinlogValue();binlogValue.setValue(column.getValue(); row.put(column.getName(), binlogValue); else / insertfor(Column column : afterColumns) BinlogValue binlogValue = new BinlogValue();市昌平區(qū)建材城西路龍辦公樓一層:binlogValue.setValue(column.getValue();binlogValue.setBeforeValue(column.getValue(); row.put(column.getName(), binlogValue);rows.add(row);String rowjson = JacksonUtil.obj2str(row);("# Data Parse Result #");(rowlog + " , " + rowjson);logger

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。

評(píng)論

0/150

提交評(píng)論