canal rdb适配器

女爷i 2022-10-13 01:57 236阅读 0赞

canal rdb适配器

官网:https://github.com/alibaba/canal/wiki/Sync-RDB

应用:**实现数据库之间的增量同步、数据备份**

*********************

配置文件

application.yml:启动器配置文件

  1. server:
  2.   port: 8081
  3. spring:
  4.   jackson:
  5.     date-format: yyyy-MM-dd HH:mm:ss
  6.     time-zone: GMT+8
  7.     default-property-inclusion: non_null
  8. canal.conf:
  9.   mode: tcp #tcp kafka rocketMQ rabbitMQ
  10.   flatMessage: true
  11.   zookeeperHosts:
  12.   syncBatchSize: 1000
  13.   retries: 0
  14.   timeout:
  15.   accessKey:
  16.   secretKey:
  17.   consumerProperties:
  18.     # canal tcp consumer
  19.     canal.tcp.server.host: 127.0.0.1:11111
  20.     canal.tcp.zookeeper.hosts:
  21.     canal.tcp.batch.size: 500
  22.     canal.tcp.username:
  23.     canal.tcp.password:
  24.     # kafka consumer
  25.     kafka.bootstrap.servers: 127.0.0.1:9092
  26.     kafka.enable.auto.commit: false
  27.     kafka.auto.commit.interval.ms: 1000
  28.     kafka.auto.offset.reset: latest
  29.     kafka.request.timeout.ms: 40000
  30.     kafka.session.timeout.ms: 30000
  31.     kafka.isolation.level: read_committed
  32.     kafka.max.poll.records: 1000
  33.     # rocketMQ consumer
  34.     rocketmq.namespace:
  35.     rocketmq.namesrv.addr: 127.0.0.1:9876
  36.     rocketmq.batch.size: 1000
  37.     rocketmq.enable.message.trace: false
  38.     rocketmq.customized.trace.topic:
  39.     rocketmq.access.channel:
  40.     rocketmq.subscribe.filter:
  41.     # rabbitMQ consumer
  42.     rabbitmq.host:
  43.     rabbitmq.virtual.host:
  44.     rabbitmq.username:
  45.     rabbitmq.password:
  46.     rabbitmq.resource.ownerId:
  47.   srcDataSources:
  48.     defaultDS:
  49.       url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
  50.       username: root
  51.       password: 121212
  52.   canalAdapters:
  53.   - instance: example # canal instance Name or mq topic name
  54.     groups:
  55.     - groupId: g1
  56.       outerAdapters:
  57.       - name: rdb
  58.         key: mysql1
  59.         properties:
  60.           jdbc.driverClassName: com.mysql.jdbc.Driver
  61.           jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
  62.           jdbc.username: root
  63.           jdbc.password: 121212
  64. #      - name: rdb
  65. #        key: oracle1
  66. #        properties:
  67. #          jdbc.driverClassName: oracle.jdbc.OracleDriver
  68. #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
  69. #          jdbc.username: mytest
  70. #          jdbc.password: m121212
  71. #      - name: rdb
  72. #        key: postgres1
  73. #        properties:
  74. #          jdbc.driverClassName: org.postgresql.Driver
  75. #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
  76. #          jdbc.username: postgres
  77. #          jdbc.password: 121212
  78. #          threads: 1
  79. #          commitSize: 3000

mytest_user.yml:canal adapter会自动加载conf/rdb下的所有以.yml结尾的适配器配置文件

  1. dataSourceKey: defaultDS
  2. destination: example
  3. groupId: g1
  4. outerAdapterKey: mysql1
  5. concurrent: true
  6. dbMapping:
  7.   database: mytest # 源库
  8.   table: user # 源表
  9.   targetTable: mytest2.user # 目标数据库、表
  10.   targetPk:
  11.     id: id
  12.   #mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样
  13.   # 如果targetColumns也配置了映射,则以targetColumns配置为准
  14.   targetColumns: # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填
  15.     id:
  16.     name:
  17.     role_id:
  18.     c_time:
  19.     test1:
  20.   etlCondition: "where c_time>={}"
  21.   commitBatch: 3000 # 批量提交的大小
  22. ## Mirror schema synchronize config
  23. #dataSourceKey: defaultDS
  24. #destination: example
  25. #groupId: g1
  26. #outerAdapterKey: mysql1
  27. #concurrent: true
  28. #dbMapping:
  29. #  mirrorDb: true #镜像配置,需要源库、目标库一模一样
  30. #  database: mytest

*********************

适配器配置类

MappingConfig:映射配置类

  1. public class MappingConfig implements AdapterConfig {
  2. private String dataSourceKey; // 数据源key
  3. private String destination; // canal实例或MQ的topic
  4. private String groupId; // groupId
  5. private String outerAdapterKey; // 对应适配器的key
  6. private boolean concurrent = false; // 是否并行同步
  7. private DbMapping dbMapping; // db映射配置
  8. // ……(此处省略部分代码)
  9. public void validate() {
  10. if (dbMapping.database == null || dbMapping.database.isEmpty()) {
  11. throw new NullPointerException("dbMapping.database");
  12. } //database不能为空,否则抛空指针异常
  13. if (!dbMapping.getMirrorDb() && (dbMapping.table == null || dbMapping.table.isEmpty())) {
  14. throw new NullPointerException("dbMapping.table");
  15. } //如果没有开启镜像(dbMapping.mirrorDb=false),则table不能为空,否则抛空指针异常
  16. if (!dbMapping.getMirrorDb() && (dbMapping.targetTable == null || dbMapping.targetTable.isEmpty())) {
  17. throw new NullPointerException("dbMapping.targetTable");
  18. } //如果没有开启镜像(dbMapping.mirrorDb=false),则targetTable不能为空,否则抛空指针异常
  19. }
  20. ***********
  21. 内部类:DbMapping
  22. public static class DbMapping implements AdapterMapping {
  23. private boolean mirrorDb = false; // 是否镜像库
  24. private String database; // 数据库名或schema名
  25. private String table; // 表名
  26. private Map<String, String> targetPk = new LinkedHashMap<>(); // 目标表主键字段
  27. private boolean mapAll = false; // 映射所有字段
  28. private String targetDb; // 目标库名
  29. private String targetTable; // 目标表名
  30. private Map<String, String> targetColumns; // 目标表字段映射
  31. private boolean caseInsensitive = false; // 目标表是否不区分大小写,默认为否
  32. private String etlCondition; // etl条件sql
  33. private int readBatch = 5000;
  34. private int commitBatch = 5000; // etl等批量提交大小
  35. private Map<String, String> allMapColumns;

*********************

示例

创建 mysql实例:源数据库

  1. docker run -it -d --net fixed --ip 172.18.0.2 -p 3306:3306 --privileged=true \
  2. --name mysql -e MYSQL_ROOT_PASSWORD=123456 mysql
  3. # 创建用户、授权
  4. mysql> create user canal identified with mysql_native_password by "123456";
  5. Query OK, 0 rows affected (0.02 sec)
  6. mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  7. Query OK, 0 rows affected (0.00 sec)
  8. mysql> flush privileges;
  9. Query OK, 0 rows affected (0.00 sec)

创建 canal实例

  1. docker run -it -d --net fixed --ip 172.18.0.3 \
  2. -p 11111:11111 --name canal-server \
  3. -e canal.instance.master.address=172.18.0.2:3306 \
  4. -e canal.instance.dbUsername=canal \
  5. -e canal.instance.dbPassword=123456 canal/canal-server

创建 mysql实例:目标数据库

  1. docker run -it -d --net fixed --ip 172.18.0.5 -p 3307:3306 --privileged=true \
  2. --name mysql2 -e MYSQL_ROOT_PASSWORD=123456 mysql
  3. # 创建数据库
  4. mysql> create database lihu;
  5. Query OK, 1 row affected (0.00 sec)
  6. mysql> use lihu;
  7. Database changed
  8. mysql> create table user(id int not null auto_increment primary key,custom_name varchar(20) not null, custom_age int not null);
  9. Query OK, 0 rows affected (0.02 sec)

创建 canal adapter

  1. docker run -it -d --net fixed --ip 172.18.0.4 -p 8081:8081 \
  2. -v /usr/canal/canal-adapter/rdb/application.yml:/opt/canal-adapter/conf/application.yml \
  3. -v /usr/canal/canal-adapter/rdb/adapter:/opt/canal-adapter/conf/rdb \
  4. --name canal-adapter slpcat/canal-adapter:v1.1.5-jdk8
  5. *****************
  6. application.yml
  7. server:
  8.   port: 8081
  9. spring:
  10.   jackson:
  11.     date-format: yyyy-MM-dd HH:mm:ss
  12.     time-zone: GMT+8
  13.     default-property-inclusion: non_null
  14. canal.conf:
  15.   mode: tcp #tcp kafka rocketMQ rabbitMQ
  16.   flatMessage: true
  17.   zookeeperHosts:
  18.   syncBatchSize: 1000
  19.   retries: 0
  20.   timeout:
  21.   accessKey:
  22.   secretKey:
  23.   consumerProperties:
  24.     # canal tcp consumer
  25.     canal.tcp.server.host: 172.18.0.3:11111
  26.     canal.tcp.zookeeper.hosts:
  27.     canal.tcp.batch.size: 500
  28.     canal.tcp.username:
  29.     canal.tcp.password:
  30.     # kafka consumer
  31.     kafka.bootstrap.servers: 127.0.0.1:9092
  32.     kafka.enable.auto.commit: false
  33.     kafka.auto.commit.interval.ms: 1000
  34.     kafka.auto.offset.reset: latest
  35.     kafka.request.timeout.ms: 40000
  36.     kafka.session.timeout.ms: 30000
  37.     kafka.isolation.level: read_committed
  38.     kafka.max.poll.records: 1000
  39.     # rocketMQ consumer
  40.     rocketmq.namespace:
  41.     rocketmq.namesrv.addr: 127.0.0.1:9876
  42.     rocketmq.batch.size: 1000
  43.     rocketmq.enable.message.trace: false
  44.     rocketmq.customized.trace.topic:
  45.     rocketmq.access.channel:
  46.     rocketmq.subscribe.filter:
  47.     # rabbitMQ consumer
  48.     rabbitmq.host:
  49.     rabbitmq.virtual.host:
  50.     rabbitmq.username:
  51.     rabbitmq.password:
  52.     rabbitmq.resource.ownerId:
  53.   srcDataSources:
  54.     defaultDS:
  55.       url: jdbc:mysql://172.18.0.2:3306/lihu?useUnicode=true
  56.       username: root
  57.       password: 123456
  58.   canalAdapters:
  59.   - instance: example # canal instance Name or mq topic name
  60.     groups:
  61.     - groupId: g1
  62.       outerAdapters:
  63. #      - name: logger
  64.       - name: rdb
  65.         key: mysql1
  66.         properties:
  67.           jdbc.driverClassName: com.mysql.jdbc.Driver
  68.           jdbc.url: jdbc:mysql://172.18.0.5:3306/lihu?useUnicode=true
  69.           jdbc.username: root
  70.           jdbc.password: 123456
  71. *****************
  72. mytest_user.yml:适配器配置文件
  73. dataSourceKey: defaultDS
  74. destination: example
  75. groupId: g1
  76. outerAdapterKey: mysql1
  77. concurrent: true
  78. dbMapping:
  79.   database: lihu
  80.   table: user
  81.   targetTable: lihu.user
  82.   targetPk:
  83.     id: id
  84. #  mapAll: true
  85.   targetColumns:
  86.     id: id
  87.     custom_name: name
  88.     custom_age: age
  89. #  etlCondition: "where c_time>={}"
  90.   commitBatch: 3000 # 批量提交的大小

******************

使用测试

mysql:源数据库插入数据(需先在源库中创建数据库 lihu 及表 user,字段为 id、name、age,与上面的 targetColumns 映射对应)

  1. mysql> insert into user(id,name,age) values(3,'lihu',20),(4,'lihu2',21);
  2. Query OK, 2 rows affected (0.10 sec)
  3. Records: 2 Duplicates: 0 Warnings: 0

canal-adapter 日志

  1. # rdb适配器加载成功
  2. 2021-07-07 23:19:42.700 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: rdb succeed
  3. 2021-07-07 23:19:42.770 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /opt/canal-adapter/plugin
  4. 2021-07-07 23:19:42.886 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
  5. 2021-07-07 23:19:42.886 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
  6. 2021-07-07 23:19:42.899 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
  7. 2021-07-07 23:19:42.925 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
  8. 2021-07-07 23:19:43.036 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
  9. 2021-07-07 23:19:43.201 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
  10. 2021-07-07 23:19:43.235 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 21.316 seconds (JVM running for 23.705)
  11. 2021-07-07 23:19:43.332 [Thread-4] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
  12. Wed Jul 07 23:20:13 GMT+08:00 2021 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
  13. # 插入的行记录:{"id":3,"name":"lihu","age":20}
  14. 2021-07-07 23:20:14.284 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":3,"name":"lihu","age":20},"database":"lihu","destination":"example","old":null,"table":"user","type":"INSERT"}
  15. # 插入的行记录:{"id":4,"name":"lihu2","age":21}
  16. 2021-07-07 23:20:14.285 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":4,"name":"lihu2","age":21},"database":"lihu","destination":"example","old":null,"table":"user","type":"INSERT"}

目标数据库 mysql2

  1. mysql> select * from user;
  2. +----+-------------+------------+
  3. | id | custom_name | custom_age |
  4. +----+-------------+------------+
  5. | 3 | lihu | 20 |
  6. | 4 | lihu2 | 21 |
  7. +----+-------------+------------+
  8. 2 rows in set (0.00 sec)

源数据库的新增数据同步成功

发表评论

表情:
评论列表 (有 0 条评论,236人围观)

还没有评论,来说两句吧...

相关阅读

    相关 canal简介

    1.简介 canal是纯Java开发的,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。 原理相对比较简单: 1. 1.      canal