canal rdb适配器
canal rdb适配器
官网:https://github.com/alibaba/canal/wiki/Sync-RDB
应用:**实现数据库之间的增量同步、数据备份**
*********************
配置文件
application.yml:启动器配置文件
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
      username: root
      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
          jdbc.username: root
          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
mytest_user.yml:canal adapter会自动加载conf/rdb下的所有以.yml结尾的适配器配置文件
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: mytest # 源库
  table: user # 源表
  targetTable: mytest2.user # 目标数据库、表
  targetPk:
    id: id
  #mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样
  # 如果targetColumns也配置了映射,则以targetColumns配置为准
  targetColumns: # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填
    id:
    name:
    role_id:
    c_time:
    test1:
  etlCondition: "where c_time>={}"
  commitBatch: 3000 # 批量提交的大小
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
# mirrorDb: true #镜像配置,需要源库、目标库一模一样
# database: mytest
*********************
适配器配置类
MappingConfig:映射配置类
public class MappingConfig implements AdapterConfig {
private String dataSourceKey; // 数据源key
private String destination; // canal实例或MQ的topic
private String groupId; // groupId
private String outerAdapterKey; // 对应适配器的key
private boolean concurrent = false; // 是否并行同步
private DbMapping dbMapping; // db映射配置
。。。
/**
 * Validates the required entries of the dbMapping configuration.
 *
 * @throws NullPointerException if dbMapping.database is null or empty, or, when
 *         mirror mode is off, if dbMapping.table or dbMapping.targetTable is
 *         null or empty; the exception message names the offending setting
 */
public void validate() {
if (dbMapping.database == null || dbMapping.database.isEmpty()) {
throw new NullPointerException("dbMapping.database");
} // database is mandatory: a null or empty value raises NullPointerException
if (!dbMapping.getMirrorDb() && (dbMapping.table == null || dbMapping.table.isEmpty())) {
throw new NullPointerException("dbMapping.table");
} // when mirror mode is off (getMirrorDb() returns false), table must not be null or empty
if (!dbMapping.getMirrorDb() && (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty())) {
throw new NullPointerException("dbMapping.targetTable");
} // when mirror mode is off (getMirrorDb() returns false), targetTable must not be null or empty
}
***********
内部类:DbMapping
public static class DbMapping implements AdapterMapping {
private boolean mirrorDb = false; // 是否镜像库
private String database; // 数据库名或schema名
private String table; // 表名
private Map<String, String> targetPk = new LinkedHashMap<>(); // 目标表主键字段
private boolean mapAll = false; // 映射所有字段
private String targetDb; // 目标库名
private String targetTable; // 目标表名
private Map<String, String> targetColumns; // 目标表字段映射
private boolean caseInsensitive = false; // 目标表不区分大小写,默认是否
private String etlCondition; // etl条件sql
private int readBatch = 5000;
private int commitBatch = 5000; // etl等批量提交大小
private Map<String, String> allMapColumns;
*********************
示例
创建 mysql实例:源数据库
docker run -it -d --net fixed --ip 172.18.0.2 -p 3306:3306 --privileged=true \
--name mysql -e MYSQL_ROOT_PASSWORD=123456 mysql
# 创建用户、授权
mysql> create user canal identified with mysql_native_password by "123456";
Query OK, 0 rows affected (0.02 sec)
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
创建 canal实例
docker run -it -d --net fixed --ip 172.18.0.3 \
-p 11111:11111 --name canal-server \
-e canal.instance.master.address=172.18.0.2:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=123456 canal/canal-server
创建 mysql实例:目标数据库
docker run -it -d --net fixed --ip 172.18.0.5 -p 3307:3306 --privileged=true \
--name mysql2 -e MYSQL_ROOT_PASSWORD=123456 mysql
# 创建数据库
mysql> create database lihu;
Query OK, 1 row affected (0.00 sec)
mysql> use lihu;
Database changed
mysql> create table user(id int not null auto_increment primary key,custom_name varchar(20) not null, custom_age int not null);
Query OK, 0 rows affected (0.02 sec)
创建 canal adapter
docker run -it -d --net fixed --ip 172.18.0.4 -p 8081:8081 \
-v /usr/canal/canal-adapter/rdb/application.yml:/opt/canal-adapter/conf/application.yml \
-v /usr/canal/canal-adapter/rdb/adapter:/opt/canal-adapter/conf/rdb \
--name canal-adapter slpcat/canal-adapter:v1.1.5-jdk8
*****************
application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 172.18.0.3:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.18.0.2:3306/lihu?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
#      - name: logger
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://172.18.0.5:3306/lihu?useUnicode=true
          jdbc.username: root
          jdbc.password: 123456
*****************
mytest_user.yml:适配器配置文件
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: lihu
  table: user
  targetTable: lihu.user
  targetPk:
    id: id
  # mapAll: true
  targetColumns:
    id: id
    custom_name: name
    custom_age: age
  # etlCondition: "where c_time>={}"
  commitBatch: 3000 # 批量提交的大小
******************
使用测试
mysql:源数据库插入数据(需先在源数据库中创建 lihu 库及 user 表,字段为 id、name、age)
mysql> insert into user(id,name,age) values(3,'lihu',20),(4,'lihu2',21);
Query OK, 2 rows affected (0.10 sec)
Records: 2 Duplicates: 0 Warnings: 0
canal-adapter 日志
# rdb 适配器加载成功
2021-07-07 23:19:42.700 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: rdb succeed
2021-07-07 23:19:42.770 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /opt/canal-adapter/plugin
2021-07-07 23:19:42.886 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2021-07-07 23:19:42.886 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2021-07-07 23:19:42.899 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2021-07-07 23:19:42.925 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2021-07-07 23:19:43.036 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2021-07-07 23:19:43.201 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2021-07-07 23:19:43.235 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 21.316 seconds (JVM running for 23.705)
2021-07-07 23:19:43.332 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
Wed Jul 07 23:20:13 GMT+08:00 2021 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
# 插入的行记录:{"id":3,"name":"lihu","age":20}
2021-07-07 23:20:14.284 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":3,"name":"lihu","age":20},"database":"lihu","destination":"example","old":null,"table":"user","type":"INSERT"}
# 插入的行记录:{"id":4,"name":"lihu2","age":21}
2021-07-07 23:20:14.285 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":4,"name":"lihu2","age":21},"database":"lihu","destination":"example","old":null,"table":"user","type":"INSERT"}
目标数据库 mysql2
mysql> select * from user;
+----+-------------+------------+
| id | custom_name | custom_age |
+----+-------------+------------+
| 3 | lihu | 20 |
| 4 | lihu2 | 21 |
+----+-------------+------------+
2 rows in set (0.00 sec)
源数据库的新增数据同步成功
还没有评论,来说两句吧...