一、Canal 介绍
1、应用场景
Canal 就是一个很好的数据库同步工具。canal 是阿里巴巴旗下的一款开源项目,纯 Java 开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了 MySQL。
2、Canal 环境搭建
Canal 的原理是基于 mysql binlog 技术,所以这里需要开始 mysql 的 binlog 写入功能
开启 mysql 服务: service mysql start
(1) 检查 binlog 功能是否开启
1
| mysql> show variables like 'log_bin';
|
(2) 如果显示状态为 OFF 表示该功能未开启,开启 binlog 功能
- 修改 mysql 的配置文件 my.cnf
1 2 3 4 5 6
| vi /etc/my.cnf
追加内容: log-bin=mysql-bin binlog_format=ROW server_id=1
|
- 重启 mysql
1 2 3
| systemctl restart mysqld 或 service mysql restart
|
- 登录 mysql 客户端,查看 log_bin 变量
1
| mysql> show variables like 'log_bin';
|
注意:
如果显示状态为 ON 表示该功能已开启
(3)在 mysql 里面添加以下的相关用户和权限 (参考)
判断是否需要配置,先检查一下的 MySQL 是否支持远程连接,测试很容易可以使用第三方工具(Navicat 或 SQLyog )连接虚拟机的数据库,如果连接成功则不需要在以下配置,直接跳过这个步骤,如果没有连接成功,需要以下配置。
先使用 mysql -uroot -p 进入数据库:
1
| grant all privileges on *.* to root@'%' identified by 'password';
|
或
1 2 3
| CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
|
3、下载安装 Canal 服务
下载地址:https://github.com/alibaba/canal/releases
(1)下载之后,放到目录中,解压文件
1 2 3
| cd /usr/local/canal
tar zxvf canal.deployer-1.1.4.tar.gz
|
(2)修改配置文件
1
| vi conf/example/instance.properties
|
1 2 3 4 5 6 7 8 9 10 11
| canal.instance.master.address=192.168.64.131:3306
canal.instance.dbUsername=root canal.instance.dbPassword=root
canal.instance.filter.regex=guli.member
|
注:
1 2 3 4 5 6 7 8 9 10 11 12
| mysql 数据解析关注的表,Perl正则表达式. 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:
1. 所有表:.* or .*\\..* 2. canal schema下所有表: canal\\..* 3. canal下的以canal打头的表:canal\\.canal.* 4. canal schema下的一张表:canal.test1 5. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)
|
(3)进入 bin 目录下启动
启动后:
二、创建 Java 工程测试模块
1、创建 canal_client 模块
2、引入相关依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| <properties> <java.version>1.8</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-boot.version>2.3.7.RELEASE</spring-boot.version> </properties>
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>commons-dbutils</groupId> <artifactId>commons-dbutils</artifactId> <version>1.7</version> </dependency>
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency>
|
3、创建 application.properties 配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13
| server.port=10000
spring.application.name=canal-client
spring.profiles.active=dev
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.url=jdbc:mysql://localhost:3306/guli?serverTimezone=GMT%2B8 spring.datasource.username=root spring.datasource.password=root
|
4、编写 canal 客户端类
注意:
这里需要成自己的虚拟机地址, 记得把虚拟机防火墙关闭,不然有可能连接不成功。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
| import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.dbutils.DbUtils; import org.apache.commons.dbutils.QueryRunner; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.sql.DataSource; import java.net.InetSocketAddress; import java.sql.Connection; import java.sql.SQLException; import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue;
@Component public class CanalClient {
private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Resource private DataSource dataSource;
public void run() {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.64.131", 11111), "example", "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); try { while (true) { Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { Thread.sleep(1000); } else { dataHandle(message.getEntries()); } connector.ack(batchId);
if (SQL_QUEUE.size() >= 1) { executeQueueSql(); } } } catch (InterruptedException e) { e.printStackTrace(); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } finally { connector.disconnect(); } }
public void executeQueueSql() { int size = SQL_QUEUE.size(); for (int i = 0; i < size; i++) { String sql = SQL_QUEUE.poll(); System.out.println("[sql]----> " + sql);
this.execute(sql.toString()); } }
private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException { for (Entry entry : entrys) { if (EntryType.ROWDATA == entry.getEntryType()) { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); EventType eventType = rowChange.getEventType(); if (eventType == EventType.DELETE) { saveDeleteSql(entry); } else if (eventType == EventType.UPDATE) { saveUpdateSql(entry); } else if (eventType == EventType.INSERT) { saveInsertSql(entry); } } } }
private void saveUpdateSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> newColumnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set "); for (int i = 0; i < newColumnList.size(); i++) { sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'"); if (i != newColumnList.size() - 1) { sql.append(","); } } sql.append(" where "); List<Column> oldColumnList = rowData.getBeforeColumnsList(); for (Column column : oldColumnList) { if (column.getIsKey()) { sql.append(column.getName() + "=" + column.getValue()); break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } }
private void saveDeleteSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getBeforeColumnsList(); StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where "); for (Column column : columnList) { if (column.getIsKey()) { sql.append(column.getName() + "=" + column.getValue()); break; } } SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } }
private void saveInsertSql(Entry entry) { try { RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); List<RowData> rowDatasList = rowChange.getRowDatasList(); for (RowData rowData : rowDatasList) { List<Column> columnList = rowData.getAfterColumnsList(); StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " ("); for (int i = 0; i < columnList.size(); i++) { sql.append(columnList.get(i).getName()); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(") VALUES ("); for (int i = 0; i < columnList.size(); i++) { sql.append("'" + columnList.get(i).getValue() + "'"); if (i != columnList.size() - 1) { sql.append(","); } } sql.append(")"); SQL_QUEUE.add(sql.toString()); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } }
public void execute(String sql) { Connection con = null; try { if(null == sql) return; con = dataSource.getConnection(); QueryRunner qr = new QueryRunner(); int row = qr.execute(con, sql); System.out.println("update: "+ row); } catch (SQLException e) { e.printStackTrace(); } finally { DbUtils.closeQuietly(con); } } }
|
5、创建启动类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @SpringBootApplication public class CanalApplication implements CommandLineRunner { @Resource private CanalClient canalClient;
public static void main(String[] args) { SpringApplication.run(CanalApplication.class, args); }
@Override public void run(String... strings) throws Exception { canalClient.run(); } }
|
三、测试
- 在虚拟机的数据中 使用
mysql -u root -p
1 2 3
| use guli;
insert into member(name,sex) values('xiaobai','男');
|
效果:
程序端以监控到
数据库中:
虚拟机数据库:
window 本地数据库:
以上效果,则表示测试成功!!!