canal environment setup (Docker)

Official wiki: https://github.com/alibaba/canal/wiki/Docker-QuickStart

Docker repository: https://hub.docker.com/r/canal/canal-server/tags

Configuration reference: https://github.com/alibaba/canal/wiki/AdminGuide

*********************

Architecture

Standalone: a single canal client connects directly to a single canal server

(diagram: standalone deployment, canal client connected directly to canal server)

Client connection

    public class CanalConnectors {
        public static CanalConnector newSingleConnector(SocketAddress address, String destination, String username, String password) {
            SimpleCanalConnector canalConnector = new SimpleCanalConnector(address, username, password, destination);
            canalConnector.setSoTimeout(60000);
            canalConnector.setIdleTimeout(3600000);
            return canalConnector;
        }
    }
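
For reference, a minimal client loop built on newSingleConnector might look like the sketch below (assuming the canal.client dependency is on the classpath; the address, destination and empty credentials are illustrative, not taken from this article's setup):

    import java.net.InetSocketAddress;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.Message;

    public class SingleClientSketch {
        public static void main(String[] args) throws InterruptedException {
            // illustrative address/destination; adjust to your deployment
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
            try {
                connector.connect();
                connector.subscribe(".*\\..*");                    // subscribe to all schemas/tables
                while (true) {
                    Message message = connector.getWithoutAck(100); // fetch up to 100 entries
                    long batchId = message.getId();
                    if (batchId == -1 || message.getEntries().isEmpty()) {
                        Thread.sleep(1000);                         // nothing new yet
                        continue;
                    }
                    // ... process message.getEntries() here ...
                    connector.ack(batchId);                         // confirm the batch
                }
            } finally {
                connector.disconnect();
            }
        }
    }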

canal server HA: canal servers deployed as a cluster; the canal client connects directly to a static list of canal server addresses

(diagram: canal server HA, client connecting to a static canal server list)

canal server: deployed as a cluster with node information stored in ZooKeeper, providing high availability

canal client: connects through a static list of canal server addresses

    public class CanalConnectors {
        public static CanalConnector newClusterConnector(List<? extends SocketAddress> addresses, String destination, String username, String password) {
            ClusterCanalConnector canalConnector = new ClusterCanalConnector(username, password, destination, new SimpleNodeAccessStrategy(addresses));
            canalConnector.setSoTimeout(60000);
            canalConnector.setIdleTimeout(3600000);
            return canalConnector;
        }
    }
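
A connector built from a static server list can be created as in the sketch below (the server addresses are placeholders; the get/ack loop is the same as in the single-server example):

    import java.net.InetSocketAddress;
    import java.util.Arrays;
    import java.util.List;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;

    public class StaticClusterClientSketch {
        public static void main(String[] args) {
            // placeholder canal server addresses
            List<InetSocketAddress> servers = Arrays.asList(
                    new InetSocketAddress("192.168.0.101", 11111),
                    new InetSocketAddress("192.168.0.102", 11111));
            CanalConnector connector =
                    CanalConnectors.newClusterConnector(servers, "example", "", "");
            connector.connect();
            connector.subscribe(".*\\..*");
            // ... same get/ack loop as in the single-server sketch ...
            connector.disconnect();
        }
    }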

canal server + canal client HA: canal servers deployed as a cluster, multiple canal clients started

(diagram: canal server and canal client HA deployment)

canal server: deployed as a cluster with node information stored in ZooKeeper, providing high availability

canal client: clients register themselves in ZooKeeper and obtain canal server information from ZooKeeper

    public class CanalConnectors {
        public static CanalConnector newClusterConnector(String zkServers, String destination, String username, String password) {
            ClusterCanalConnector canalConnector = new ClusterCanalConnector(username, password, destination, new ClusterNodeAccessStrategy(destination, ZkClientx.getZkClient(zkServers)));
            canalConnector.setSoTimeout(60000);
            canalConnector.setIdleTimeout(3600000);
            return canalConnector;
        }
    }
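
With ZooKeeper-based discovery the client only needs the ZooKeeper address; a sketch (the address is a placeholder):

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;

    public class ZkClusterClientSketch {
        public static void main(String[] args) {
            // placeholder zookeeper address; the running canal server for the
            // "example" destination is looked up in zookeeper at connect time
            CanalConnector connector =
                    CanalConnectors.newClusterConnector("192.168.0.200:2181", "example", "", "");
            connector.connect();
            connector.subscribe(".*\\..*");
            // ... get/ack loop ...
            connector.disconnect();
        }
    }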

*********************

canal server configuration

Canal configuration loading modes: ManagerCanalInstanceGenerator, SpringCanalInstanceGenerator

(diagram: canal configuration loading)

ManagerCanalInstanceGenerator: canal parameters configured through a management UI

SpringCanalInstanceGenerator: configuration from local files (xxx-instance.xml, canal.properties, instance.properties)

*********************

xxx-instance.xml

Creates the CanalInstanceWithSpring instance; the available files are:

memory-instance.xml: metadata is kept in memory

file-instance.xml: **metadata is persisted to a file**; the log parser position is looked up in memory first, then in the file if not found

default-instance.xml: metadata is stored in ZooKeeper; the log parser position is looked up in memory first, then in ZooKeeper if not found

group-instance.xml: combines multiple parsers into one, useful for collecting data from sharded databases/tables into one place for storage and analysis; metadata is kept in memory by default

    public class CanalInstanceWithSpring extends AbstractCanalInstance {

        private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class);

        public CanalInstanceWithSpring() {
        }

**************

AbstractCanalInstance

    public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {

        private static final Logger logger = LoggerFactory.getLogger(AbstractCanalInstance.class);

        protected Long canalId;                          // canal identifier
        protected String destination;                    // instance name; one canal server can host multiple instances
        protected CanalEventStore<Event> eventStore;     // eventStore: buffers the pulled data
        protected CanalEventParser eventParser;          // parses the data source (binlog)
        protected CanalEventSink<List<Entry>> eventSink; // processes and transforms the data
        protected CanalMetaManager metaManager;          // metadata manager: parser log position, cursor position, etc.
        protected CanalAlarmHandler alarmHandler;        // alarm handler
        protected CanalMQConfig mqConfig;                // MQ configuration: supports RocketMQ, Kafka, RabbitMQ

        public AbstractCanalInstance() {
        }
*********************

properties files

canal.properties: common properties shared by all instances on a canal server

instance.properties: per-instance properties; when the same key appears in both files, instance.properties takes precedence

canal.properties

    #################################################
    #########       common argument        #############
    #################################################
    canal.id = 1                            # unique id of this canal server, default 1
    # canal server username/password; if canal.user/canal.passwd are not set, authentication is disabled
    canal.user = canal
    canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
    canal.ip =                              # ip address the canal server binds to
    canal.port = 11111                      # canal server tcp port used by clients, default 11111
    canal.metrics.pull.port = 11112         # canal server metrics port, default 11112
    canal.register.ip =                     # ip the canal server registers in zookeeper
    canal.zkServers =                       # zookeeper cluster the canal server connects to, e.g. 10.20.144.22:2181,10.20.144.51:2181
    canal.zookeeper.flush.period = 1000     # period (ms) for flushing data to zookeeper, default 1000
    # canal admin settings
    canal.admin.manager = 127.0.0.1:8089
    canal.admin.port = 11110
    canal.admin.user = admin
    canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
    # admin auto registration
    #canal.admin.register.auto = true
    #canal.admin.register.cluster =
    #canal.admin.register.name =
    canal.withoutNetty = false
    # canal server mode, options: tcp, kafka, rocketMQ, rabbitMQ
    canal.serverMode = tcp
    # flush meta cursor/parse position to file
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    # eventStore memory settings
    canal.instance.memory.batch.mode = MEMSIZE    # ITEMSIZE: buffer.size counts records;
                                                  # MEMSIZE (default): buffer.size * buffer.memunit limits the memory footprint
    canal.instance.memory.buffer.size = 16384     # number of records or amount of memory
    canal.instance.memory.buffer.memunit = 1024   # memory unit, default 1KB
    canal.instance.memory.rawEntry = true         # store raw entries without serialization
    ## heartbeat check of mysql availability
    canal.instance.detecting.enable = false            # enable the heartbeat check, default false
    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
    # heartbeat check sql
    canal.instance.detecting.interval.time = 3          # heartbeat check interval, default 3
    canal.instance.detecting.retry.threshold = 3        # heartbeat check retry count, default 3
    canal.instance.detecting.heartbeatHaEnable = false  # automatically switch to the standby database when the
                                                        # heartbeat check finds mysql unavailable, default false
    # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
    # maximum supported transaction size; larger transactions may be split when stored in the eventStore,
    # so full transaction visibility can no longer be guaranteed
    canal.instance.transaction.size = 1024
    # mysql fallback connected to new master should fallback times
    # after the canal server switches to a new mysql instance it rewinds the binlog by this interval, default 60s;
    # mysql replication lags, so rewinding avoids losing data
    canal.instance.fallbackIntervalInSeconds = 60
    # network settings
    canal.instance.network.receiveBufferSize = 16384   # max receive buffer of the canal server (data parsed from mysql)
    canal.instance.network.sendBufferSize = 16384      # max send buffer of the canal server (data sent to the canal client)
    canal.instance.network.soTimeout = 30               # read timeout of the canal server, default 30s
    # binlog filter config
    canal.instance.filter.druid.ddl = true        # use druid to parse ddl statements to extract database/table names
    canal.instance.filter.query.dcl = false       # ignore dcl statements (grant, commit, rollback)
    canal.instance.filter.query.dml = false       # ignore dml statements (insert, delete, update, ...)
    canal.instance.filter.query.ddl = false       # ignore ddl statements (create table, create view, ...)
    canal.instance.filter.table.error = false     # ignore failures to resolve the table structure from the binlog,
                                                  # mainly for replaying old binlog when the table has since been dropped
                                                  # or its structure no longer matches the binlog
    canal.instance.filter.rows = false            # ignore row changes caused by dml, default false,
                                                  # mainly for clients that only subscribe to ddl/dcl operations
    canal.instance.filter.transaction.entry = false   # ignore transaction begin/end entries, default false
    canal.instance.filter.dml.insert = false      # ignore dml insert operations, default false
    canal.instance.filter.dml.update = false      # ignore dml update operations, default false
    canal.instance.filter.dml.delete = false      # ignore dml delete operations, default false
    # binlog format/image check
    canal.instance.binlog.format = ROW,STATEMENT,MIXED   # ROW, STATEMENT and MIXED supported by default
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB    # FULL, MINIMAL and NOBLOB supported by default
    # binlog ddl isolation
    canal.instance.get.ddl.isolation = false    # return ddl statements in a separate batch, default false;
                                                # when ddl shares a batch with other ddl/dml, ordering is not guaranteed
                                                # under concurrent processing and the table structure may change unexpectedly
    # parallel settings
    canal.instance.parser.parallel = true             # whether the eventParser parses the binlog in parallel, default true
    canal.instance.parser.parallelThreadSize = 16     # number of parallel threads, default 60% of available processors
    canal.instance.parser.parallelBufferSize = 256    # size of the parallel ringBuffer, must be a power of 2
    # table meta tsdb info
    # tableMetaTSDB: tracks table structure changes caused by ddl statements
    canal.instance.tsdb.enable = true                 # enable the table meta tsdb
    # global tsdb configuration file
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
    # directory for the h2 file when h2-tsdb.xml is used, default conf/xx/h2.mv.db
    canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    # jdbc url; the h2 address is the default, define your own when using mysql
    canal.instance.tsdb.dbUsername = canal            # username
    canal.instance.tsdb.dbPassword = canal            # password
    canal.instance.tsdb.snapshot.interval = 24        # snapshot interval, default 24 hours
    canal.instance.tsdb.snapshot.expire = 360         # snapshot expiration, default 360 hours (15 days)
    #################################################
    #########       destinations        #############
    #################################################
    canal.destinations = example      # list of instances deployed on this server
    canal.conf.dir = ../conf          # conf directory
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true            # enable instance auto scan, default true;
                                      # a new instance directory is loaded automatically (and started when lazy=true),
                                      # deleting an instance directory unloads its configuration and stops the instance if running
    canal.auto.scan.interval = 5      # auto scan interval, default 5s
    # set this value to 'true' means that when binlog pos not found, skip to latest.
    # WARN: pls keep 'false' in production env, or if you know what you want.
    # true: if the binlog position is not found, start parsing from the latest position
    # keep this false in production
    canal.auto.reset.latest.pos.mode = false
    canal.instance.global.mode = spring   # global configuration loading mode, default spring (load from local files)
    canal.instance.global.lazy = false    # global lazy mode, default false
    canal.instance.global.manager.address = ${canal.admin.manager}   # global manager address, used when configuring canal server through the admin UI
    # global spring.xml used to create the CanalInstanceWithSpring object
    canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    ##################################################
    #########         MQ Properties      #############
    ##################################################
    # aliyun ak/sk , support rds/mq
    # aliyun mq settings
    canal.aliyun.accessKey =
    canal.aliyun.secretKey =
    canal.aliyun.uid=
    canal.mq.flatMessage = true
    canal.mq.canalBatchSize = 50
    canal.mq.canalGetTimeout = 100
    # Set this value to "cloud", if you want open message trace feature in aliyun.
    canal.mq.accessChannel = local
    canal.mq.database.hash = true
    canal.mq.send.thread.size = 30
    canal.mq.build.thread.size = 8
    ##################################################
    #########            Kafka           #############
    ##################################################
    # kafka settings
    kafka.bootstrap.servers = 127.0.0.1:9092
    kafka.acks = all
    kafka.compression.type = none
    kafka.batch.size = 16384
    kafka.linger.ms = 1
    kafka.max.request.size = 1048576
    kafka.buffer.memory = 33554432
    kafka.max.in.flight.requests.per.connection = 1
    kafka.retries = 0
    kafka.kerberos.enable = false
    kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
    kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
    ##################################################
    #########           RocketMQ         #############
    ##################################################
    # rocketMQ settings
    rocketmq.producer.group = test
    rocketmq.enable.message.trace = false
    rocketmq.customized.trace.topic =
    rocketmq.namespace =
    rocketmq.namesrv.addr = 127.0.0.1:9876
    rocketmq.retry.times.when.send.failed = 0
    rocketmq.vip.channel.enabled = false
    rocketmq.tag =
    ##################################################
    #########           RabbitMQ         #############
    ##################################################
    # rabbitMQ settings
    rabbitmq.host =
    rabbitmq.virtual.host =
    rabbitmq.exchange =
    rabbitmq.username =
    rabbitmq.password =
    rabbitmq.deliveryMode =

instance.properties

    #################################################
    # canal.instance.mysql.slaveId=0        # slave id canal uses when connecting; must be unique within the mysql cluster
    canal.instance.gtidon=false             # whether to subscribe using mysql gtid mode
    # master binlog info parsed by the eventParser
    canal.instance.master.address=127.0.0.1:3306   # master address
    canal.instance.master.journal.name=            # binlog file to start from
    canal.instance.master.position=                # binlog offset to start parsing from
    canal.instance.master.timestamp=               # timestamp to start parsing from
    canal.instance.master.gtid=                    # gtid position to start parsing from
    # mysql standby info
    canal.instance.standby.address =
    canal.instance.standby.journal.name =
    canal.instance.standby.position =
    canal.instance.standby.timestamp =
    canal.instance.standby.gtid=
    # rds oss binlog
    # Aliyun RDS purges local binlog after 18 hours and uploads it to OSS;
    # leave these empty if you do not need to download binlog from OSS
    canal.instance.rds.accesskey=        # accessKey of the Aliyun account
    canal.instance.rds.secretkey=        # secretKey of the Aliyun account
    canal.instance.rds.instanceId=       # instanceId of the RDS instance
    # table meta tsdb info, overrides the values in canal.properties
    canal.instance.tsdb.enable=true
    canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    canal.instance.tsdb.dbUsername=canal
    canal.instance.tsdb.dbPassword=canal
    # mysql username, password and charset
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
    canal.instance.filter.regex=.*\\..*                 # tables to parse, default .*\\..* (all tables)
    canal.instance.filter.black.regex=mysql\\.slave_.*  # tables to skip
    # examples
    # .*\\..*                          : all tables
    # database\\..*                    : all tables in database `database`
    # database\\.test.*                : tables in `database` whose names start with test
    # database\\.test                  : table test in database `database`
    # database\\.test.*,database\\.t2  : tables in `database` starting with test, plus table t2 in `database`
    # columns to parse
    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
    canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
    # mq settings
    canal.mq.topic=example                                   # static topic: all data goes to this fixed topic
    canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*  # dynamic topic: derived from database/table names
    # canal.mq.dynamicTopic entries have the form schema or schema.tableName,
    # or topicName:schema / topicName:schema.tableName (matching databases/tables are sent to topicName)
    # examples
    # mytest1.user        : sent to topic mytest1_user
    # mytest2\\..*        : sent to topic mytest2_tableName
    # test                : everything in database test is sent to topic test
    # topicName:test\\..* : every table in database test is sent to topicName
    # test,test1\\..*     : tables in database test1 go to topic test1_tableName,
    #                       everything in database test goes to topic test,
    #                       everything else goes to the topic set by canal.mq.topic
    # partition settings
    canal.mq.partition=0                                # send everything to fixed partition 0
    # hash partition config
    canal.mq.partitionsNum=3                            # number of partitions
    canal.mq.partitionHash=test.table:id^name,.*\\..*   # choose the partition by hashing on database/table/columns
    # per-topic dynamic partition numbers
    canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6   # topics starting with test get 4 partitions,
                                                           # topic mycanal gets 6 partitions
    # examples
    # test.table                : no column given, hash on the table name by default
    # test.table:id             : partition for test.table is hashed on column id
    # test.table:id^name        : partition for test.table is hashed on columns id and name
    # .*\\..*:id                : every table hashes on column id
    # .*\\..*:$pk$              : every table hashes on its primary key (detected automatically)
    # empty partitionHash       : everything goes to partition 0
    # test.table,test.table2:id : test.table is hashed on its table name, test.table2 on column id,
    #                             everything else goes to partition 0 of its topic
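
The same regex syntax is used when a client subscribes. A minimal sketch (assuming the standard canal client API; per the canal docs, a non-empty filter passed to subscribe() takes precedence over canal.instance.filter.regex):

    import java.net.InetSocketAddress;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;

    public class FilterSubscribeSketch {
        public static void main(String[] args) {
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
            connector.connect();
            // same regex syntax as canal.instance.filter.regex; a non-empty filter
            // passed here overrides the one configured in instance.properties
            connector.subscribe("database\\.test.*");
            // ... get/ack loop ...
            connector.disconnect();
        }
    }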

*********************

tsdb-related configuration

(diagram: tsdb configuration)

SQL: create the tables that store the tsdb data

    CREATE TABLE IF NOT EXISTS `meta_snapshot` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
      `gmt_create` datetime NOT NULL COMMENT 'creation time',
      `gmt_modified` datetime NOT NULL COMMENT 'modification time',
      `destination` varchar(128) DEFAULT NULL COMMENT 'channel (destination) name',
      `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog file name',
      `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog offset',
      `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog node id',
      `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog timestamp',
      `data` longtext DEFAULT NULL COMMENT 'table structure data',
      `extra` text DEFAULT NULL COMMENT 'extra information',
      PRIMARY KEY (`id`),
      UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
      KEY `destination` (`destination`),
      KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
      KEY `gmt_modified` (`gmt_modified`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='table structure snapshots';

    CREATE TABLE IF NOT EXISTS `meta_history` (
      `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key',
      `gmt_create` datetime NOT NULL COMMENT 'creation time',
      `gmt_modified` datetime NOT NULL COMMENT 'modification time',
      `destination` varchar(128) DEFAULT NULL COMMENT 'channel (destination) name',
      `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog file name',
      `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog offset',
      `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlog node id',
      `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog timestamp',
      `use_schema` varchar(1024) DEFAULT NULL COMMENT 'schema in use when the sql was executed',
      `sql_schema` varchar(1024) DEFAULT NULL COMMENT 'schema the sql refers to',
      `sql_table` varchar(1024) DEFAULT NULL COMMENT 'table the sql refers to',
      `sql_text` longtext DEFAULT NULL COMMENT 'executed sql',
      `sql_type` varchar(256) DEFAULT NULL COMMENT 'sql type',
      `extra` text DEFAULT NULL COMMENT 'extra information',
      PRIMARY KEY (`id`),
      UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
      KEY `destination` (`destination`),
      KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
      KEY `gmt_modified` (`gmt_modified`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='table structure change history';

h2-tsdb.xml: stores the tsdb data in H2

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
        http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
        default-autowire="byName">

        <!-- properties -->
        <bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
            <property name="ignoreResourceNotFound" value="true" />
            <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- allow system properties to override -->
            <property name="locationNames">
                <list>
                    <value>classpath:canal.properties</value>
                    <value>classpath:${canal.instance.destination:}/instance.properties</value>
                </list>
            </property>
        </bean>

        <!-- db-based implementation -->
        <bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
            <property name="metaHistoryDAO" ref="metaHistoryDAO"/>
            <property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
        </bean>

        <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
            <property name="driverClassName" value="org.h2.Driver" />
            <property name="url" value="${canal.instance.tsdb.url:}" />
            <property name="username" value="${canal.instance.tsdb.dbUsername:}" />
            <property name="password" value="${canal.instance.tsdb.dbPassword:}" />
            <property name="maxActive" value="30" />
            <property name="initialSize" value="0" />
            <property name="minIdle" value="1" />
            <property name="maxWait" value="10000" />
            <property name="timeBetweenEvictionRunsMillis" value="60000" />
            <property name="minEvictableIdleTimeMillis" value="300000" />
            <property name="testWhileIdle" value="true" />
            <property name="testOnBorrow" value="false" />
            <property name="testOnReturn" value="false" />
            <property name="useUnfairLock" value="true" />
            <property name="validationQuery" value="SELECT 1" />
        </bean>

        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
            <property name="dataSource" ref="dataSource"/>
            <property name="configLocation" value="classpath:spring/tsdb/sql-map/sqlmap-config.xml"/>
        </bean>

        <bean id="metaHistoryDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO">
            <property name="sqlSessionFactory" ref="sqlSessionFactory"/>
        </bean>

        <bean id="metaSnapshotDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO">
            <property name="sqlSessionFactory" ref="sqlSessionFactory"/>
        </bean>
    </beans>

mysql-tsdb.xml: stores the tsdb data in MySQL

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:lang="http://www.springframework.org/schema/lang"
        xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
        http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-2.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"
        default-autowire="byName">

        <!-- properties -->
        <bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
            <property name="ignoreResourceNotFound" value="true" />
            <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- allow system properties to override -->
            <property name="locationNames">
                <list>
                    <value>classpath:canal.properties</value>
                    <value>classpath:${canal.instance.destination:}/instance.properties</value>
                </list>
            </property>
        </bean>

        <!-- db-based implementation -->
        <bean id="tableMetaTSDB" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DatabaseTableMeta" destroy-method="destory">
            <property name="metaHistoryDAO" ref="metaHistoryDAO"/>
            <property name="metaSnapshotDAO" ref="metaSnapshotDAO"/>
        </bean>

        <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
            <property name="driverClassName" value="com.mysql.jdbc.Driver" />
            <property name="url" value="${canal.instance.tsdb.url:}" />
            <property name="username" value="${canal.instance.tsdb.dbUsername:}" />
            <property name="password" value="${canal.instance.tsdb.dbPassword:}" />
            <property name="maxActive" value="30" />
            <property name="initialSize" value="0" />
            <property name="minIdle" value="1" />
            <property name="maxWait" value="10000" />
            <property name="timeBetweenEvictionRunsMillis" value="60000" />
            <property name="minEvictableIdleTimeMillis" value="300000" />
            <property name="validationQuery" value="SELECT 1" />
            <property name="exceptionSorterClassName" value="com.alibaba.druid.pool.vendor.MySqlExceptionSorter" />
            <property name="validConnectionCheckerClassName" value="com.alibaba.druid.pool.vendor.MySqlValidConnectionChecker" />
            <property name="testWhileIdle" value="true" />
            <property name="testOnBorrow" value="false" />
            <property name="testOnReturn" value="false" />
            <property name="useUnfairLock" value="true" />
        </bean>

        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
            <property name="dataSource" ref="dataSource"/>
            <property name="configLocation" value="classpath:spring/tsdb/sql-map/sqlmap-config.xml"/>
        </bean>

        <bean id="metaHistoryDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaHistoryDAO">
            <property name="sqlSessionFactory" ref="sqlSessionFactory"/>
        </bean>

        <bean id="metaSnapshotDAO" class="com.alibaba.otter.canal.parse.inbound.mysql.tsdb.dao.MetaSnapshotDAO">
            <property name="sqlSessionFactory" ref="sqlSessionFactory"/>
        </bean>
    </beans>

*********************

canal server standalone

Create the MySQL container

    [root@centos ~]# docker run -it -d --net fixed --ip 172.18.0.2 \
    > -p 3306:3306 --privileged=true \
    > --name mysql -e MYSQL_ROOT_PASSWORD=123456 mysql
    # mysql 8 enables the binlog by default
    mysql> show variables like "log_bin";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    1 row in set (0.00 sec)
    # mysql 8 uses binlog_format=ROW by default
    mysql> show variables like "binlog_format";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | binlog_format | ROW   |
    +---------------+-------+
    1 row in set (0.01 sec)

Create a user and grant the privileges the canal server needs

    mysql> create user canal identified with mysql_native_password by "123456";
    Query OK, 0 rows affected (0.11 sec)
    mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    Query OK, 0 rows affected (0.01 sec)
    mysql> flush privileges;
    Query OK, 0 rows affected (0.00 sec)

Create the canal server container

    docker run -it -d --net fixed --ip 172.18.0.3 \
    -p 11111:11111 --name canal-server \
    -e canal.instance.master.address=172.18.0.2:3306 \
    -e canal.instance.dbUsername=canal \
    -e canal.instance.dbPassword=123456 canal/canal-server

Check the canal server logs

    [root@centos ~]# docker logs canal-server
    DOCKER_DEPLOY_TYPE=VM
    ==> INIT /alidata/init/02init-sshd.sh
    ==> EXIT CODE: 0
    ==> INIT /alidata/init/fix-hosts.py
    ==> EXIT CODE: 0
    ==> INIT DEFAULT
    Generating SSH1 RSA host key: [ OK ]
    Starting sshd: [ OK ]
    Starting crond: [ OK ]
    ==> INIT DONE
    ==> RUN /home/admin/app.sh
    ==> START ...
    start canal ...
    start canal successful

Check the instance logs

    [root@centos ~]# docker exec -it canal-server bash
    [root@55e591394ef4 admin]# cd canal-server/logs/example
    [root@55e591394ef4 example]# ls
    example.log
    [root@55e591394ef4 example]# cat example.log
    2021-06-29 01:03:54.889 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2021-06-29 01:03:54.954 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
    2021-06-29 01:03:54.954 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
    2021-06-29 01:03:55.025 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    2021-06-29 01:03:55.378 [destination = example , address = /172.18.0.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2021-06-29 01:03:55.379 [destination = example , address = /172.18.0.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    2021-06-29 01:03:57.678 [destination = example , address = /172.18.0.2:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000002,position=4,serverId=1,gtid=<null>,timestamp=1624899604000] cost : 2206ms , the next step is binlog dump

Note: the instance has connected to MySQL and started parsing the binlog. The sketch below shows one way to verify this from a client.
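
A small verification client can connect to the container, subscribe and print row changes. This is only a sketch (address, destination and empty credentials follow the walkthrough above; the entry-decoding calls are the standard canal protocol API):

    import java.net.InetSocketAddress;
    import java.util.List;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;

    public class ExampleVerifier {
        public static void main(String[] args) throws Exception {
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
            connector.connect();
            connector.subscribe(".*\\..*");
            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                List<CanalEntry.Entry> entries = message.getEntries();
                if (batchId == -1 || entries.isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
                for (CanalEntry.Entry entry : entries) {
                    if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                        continue;   // skip transaction begin/end entries
                    }
                    CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    System.out.println(entry.getHeader().getSchemaName() + "."
                            + entry.getHeader().getTableName() + " : " + rowChange.getEventType());
                    for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                            System.out.println("  " + column.getName() + " = " + column.getValue());
                        }
                    }
                }
                connector.ack(batchId);
            }
        }
    }

Inserting or updating a row in the source MySQL should then print the schema, table, event type and after-image columns.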

*********************

canal server standalone with multiple instances

Edit canal.properties

    canal.destinations = example,example2

Edit example/instance.properties

    canal.instance.master.address=172.18.0.11:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=123456

Edit example2/instance.properties

    canal.instance.master.address=172.18.0.12:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=123456

Create the MySQL containers

    docker run -it -d --net fixed --ip 172.18.0.11 -p 3306:3306 --privileged=true \
    --name mysql -e MYSQL_ROOT_PASSWORD=123456 mysql
    docker run -it -d --net fixed --ip 172.18.0.12 -p 3307:3306 --privileged=true \
    --name mysql2 -e MYSQL_ROOT_PASSWORD=123456 mysql
    # on both mysql and mysql2, create the user and grant privileges
    create user canal identified with mysql_native_password by "123456";
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    flush privileges;

Create the canal server container

    docker run -it -d --net fixed --ip 172.18.0.3 -p 11111:11111 --name canal-server \
    -v /usr/canal/single/conf:/home/admin/canal-server/conf canal/canal-server
    # host directory
    [root@centos conf]# pwd
    /usr/canal/single/conf
    [root@centos conf]# ls
    canal_local.properties canal.properties example example2 logback.xml metrics spring

Check the canal server logs

    [root@centos example]# docker logs canal-server
    DOCKER_DEPLOY_TYPE=VM
    ==> INIT /alidata/init/02init-sshd.sh
    ==> EXIT CODE: 0
    ==> INIT /alidata/init/fix-hosts.py
    ==> EXIT CODE: 0
    ==> INIT DEFAULT
    Generating SSH1 RSA host key: [ OK ]
    Starting sshd: [ OK ]
    Starting crond: [ OK ]
    ==> INIT DONE
    ==> RUN /home/admin/app.sh
    ==> START ...
    start canal ...
    start canal successful
    ==> START SUCCESSFUL ...

The canal server container started successfully.

Check the instance logs

    [root@centos example]# docker exec -it canal-server bash
    [root@ac38c13bce07 admin]# cd canal-server/logs
    [root@ac38c13bce07 logs]# pwd
    /home/admin/canal-server/logs
    [root@ac38c13bce07 logs]# ls
    canal example example2
    [root@ac38c13bce07 logs]# ls canal
    canal.log canal_stdout.log
    [root@ac38c13bce07 logs]# ls example
    example.log
    [root@ac38c13bce07 logs]# ls example2
    example2.log
    # instance log
    [root@ac38c13bce07 logs]# cat example/example.log
    2021-06-29 22:53:04.947 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2021-06-29 22:53:05.089 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
    2021-06-29 22:53:05.089 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
    2021-06-29 22:53:05.153 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    2021-06-29 22:53:05.958 [destination = example , address = /172.18.0.11:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2021-06-29 22:53:05.969 [destination = example , address = /172.18.0.11:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    2021-06-29 22:53:12.659 [destination = example , address = /172.18.0.11:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000002,position=4,serverId=1,gtid=<null>,timestamp=1624978095000] cost : 6604ms , the next step is binlog dump
    # instance2 log
    [root@ac38c13bce07 logs]# cat example2/example2.log
    2021-06-29 22:53:06.677 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example2
    2021-06-29 22:53:06.717 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
    2021-06-29 22:53:06.717 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
    2021-06-29 22:53:06.735 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    2021-06-29 22:53:06.918 [destination = example2 , address = /172.18.0.12:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2021-06-29 22:53:06.934 [destination = example2 , address = /172.18.0.12:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    2021-06-29 22:53:12.660 [destination = example2 , address = /172.18.0.12:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000002,position=4,serverId=1,gtid=<null>,timestamp=1624978126000] cost : 5665ms , the next step is binlog dump

Both instances (example and example2) started successfully; each one is consumed through its own destination name, as sketched below.
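
A brief sketch of consuming both destinations from the same canal server (addresses and empty credentials are illustrative):

    import java.net.InetSocketAddress;

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;

    public class TwoDestinationsSketch {
        public static void main(String[] args) {
            // one connector per destination on the same canal server
            CanalConnector example = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
            CanalConnector example2 = CanalConnectors.newSingleConnector(
                    new InetSocketAddress("127.0.0.1", 11111), "example2", "", "");
            // connect, subscribe and run a get/ack loop on each connector,
            // typically from separate threads
        }
    }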

*********************

canal server standalone with tsdb stored in MySQL

Create the mysql and mysql2 containers

    docker run -it -d --net fixed --ip 172.18.0.21 -p 3306:3306 --privileged=true \
    --name mysql -e MYSQL_ROOT_PASSWORD=123456 mysql
    docker run -it -d --net fixed --ip 172.18.0.20 -p 3307:3306 --privileged=true \
    --name mysql2 -e MYSQL_ROOT_PASSWORD=123456 mysql
    *************
    mysql: source database
    # create the user and grant privileges
    mysql> create user canal identified with mysql_native_password by "123456";
    Query OK, 0 rows affected (0.01 sec)
    mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    Query OK, 0 rows affected (0.00 sec)
    mysql> flush privileges;
    Query OK, 0 rows affected (0.01 sec)
    *************
    mysql2: tsdb storage
    # create the user and grant privileges
    mysql> create user canal identified with mysql_native_password by "123456";
    Query OK, 0 rows affected (0.00 sec)
    mysql> GRANT ALL ON *.* TO 'canal'@'%';
    Query OK, 0 rows affected (0.01 sec)
    mysql> flush privileges;
    Query OK, 0 rows affected (0.00 sec)
    # create the database and the tsdb tables (same DDL as the meta_snapshot/meta_history statements above)
    mysql> create database example;
    Query OK, 1 row affected (0.00 sec)
    mysql> use example;
    Database changed
    mysql> CREATE TABLE IF NOT EXISTS `meta_snapshot` (
        -> `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '',
        -> `gmt_create` datetime NOT NULL COMMENT '',
        -> `gmt_modified` datetime NOT NULL COMMENT '',
        -> `destination` varchar(128) DEFAULT NULL COMMENT '',
        -> `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog',
        -> `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog',
        -> `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlogid',
        -> `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog',
        -> `data` longtext DEFAULT NULL COMMENT '',
        -> `extra` text DEFAULT NULL COMMENT '',
        -> PRIMARY KEY (`id`),
        -> UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
        -> KEY `destination` (`destination`),
        -> KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
        -> KEY `gmt_modified` (`gmt_modified`)
        -> ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='';
    Query OK, 0 rows affected, 3 warnings (0.04 sec)
    mysql> CREATE TABLE IF NOT EXISTS `meta_history` (
        -> `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '',
        -> `gmt_create` datetime NOT NULL COMMENT '',
        -> `gmt_modified` datetime NOT NULL COMMENT '',
        -> `destination` varchar(128) DEFAULT NULL COMMENT '',
        -> `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog',
        -> `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog',
        -> `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlogid',
        -> `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog',
        -> `use_schema` varchar(1024) DEFAULT NULL COMMENT 'sqlschema',
        -> `sql_schema` varchar(1024) DEFAULT NULL COMMENT 'schema',
        -> `sql_table` varchar(1024) DEFAULT NULL COMMENT 'table',
        -> `sql_text` longtext DEFAULT NULL COMMENT 'sql',
        -> `sql_type` varchar(256) DEFAULT NULL COMMENT 'sql',
        -> `extra` text DEFAULT NULL COMMENT '',
        -> PRIMARY KEY (`id`),
        -> UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
        -> KEY `destination` (`destination`),
        -> KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
        -> KEY `gmt_modified` (`gmt_modified`)
        -> ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='';
    Query OK, 0 rows affected, 3 warnings (0.02 sec)

Edit the configuration files

    # canal.properties
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    # example/instance.properties
    canal.instance.master.address=172.18.0.21:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=123456
    canal.instance.tsdb.enable=true
    canal.instance.tsdb.url=jdbc:mysql://172.18.0.20:3306/example
    canal.instance.tsdb.dbUsername=canal
    canal.instance.tsdb.dbPassword=123456

Create the canal server container

    docker run -it -d --net fixed --ip 172.18.0.23 -p 11111:11111 --name canal-server \
    -v /usr/canal/single/tsdb/conf:/home/admin/canal-server/conf canal/canal-server

Check the canal server logs

    [root@centos tsdb]# docker logs canal-server
    DOCKER_DEPLOY_TYPE=VM
    ==> INIT /alidata/init/02init-sshd.sh
    ==> EXIT CODE: 0
    ==> INIT /alidata/init/fix-hosts.py
    ==> EXIT CODE: 0
    ==> INIT DEFAULT
    Generating SSH1 RSA host key: [ OK ]
    Starting sshd: [ OK ]
    Starting crond: [ OK ]
    ==> INIT DONE
    ==> RUN /home/admin/app.sh
    ==> START ...
    start canal ...
    start canal successful
    ==> START SUCCESSFUL ...

The canal server started successfully.

Check the instance logs

    [root@centos tsdb]# docker exec -it canal-server bash
    [root@69b22f3fc434 admin]# cd canal-server/logs/example
    [root@69b22f3fc434 example]# cat example.log
    2021-06-30 00:43:57.172 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2021-06-30 00:43:57.205 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
    2021-06-30 00:43:57.205 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
    2021-06-30 00:43:57.213 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    2021-06-30 00:43:57.410 [destination = example , address = /172.18.0.21:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2021-06-30 00:43:57.410 [destination = example , address = /172.18.0.21:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    2021-06-30 00:43:59.387 [destination = example , address = /172.18.0.21:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000002,position=4,serverId=1,gtid=<null>,timestamp=1624984902000] cost : 1966ms , the next step is binlog dump

The instance started successfully.

*********************

canal server cluster

mysql: source database

mysql2: stores the table meta (tsdb) snapshots

zookeeper: stores the cluster metadata

canal-server, canal-server2: the canal server cluster

Create the MySQL containers

    # mysql: source database
    docker run -it -d --net fixed --ip 172.18.0.31 -p 3306:3306 --privileged=true \
    --name mysql -e MYSQL_ROOT_PASSWORD=123456 mysql
    # mysql2: stores the table meta snapshot data
    docker run -it -d --net fixed --ip 172.18.0.32 -p 3307:3306 --privileged=true \
    --name mysql2 -e MYSQL_ROOT_PASSWORD=123456 mysql
    **************
    mysql: create the user and grant privileges
    mysql> create user canal identified with mysql_native_password by "123456";
    Query OK, 0 rows affected (0.01 sec)
    mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    Query OK, 0 rows affected (0.00 sec)
    mysql> flush privileges;
    Query OK, 0 rows affected (0.00 sec)
    **************
    mysql2: create the user, grant privileges, and create the database and tables
    mysql> create user canal identified with mysql_native_password by "123456";
    Query OK, 0 rows affected (0.01 sec)
    mysql> GRANT ALL ON *.* TO 'canal'@'%';
    Query OK, 0 rows affected (0.00 sec)
    mysql> flush privileges;
    Query OK, 0 rows affected (0.01 sec)
    mysql> create database example;
    Query OK, 1 row affected (0.01 sec)
    mysql> use example;
    Database changed
    mysql> CREATE TABLE IF NOT EXISTS `meta_snapshot` (
        -> `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '',
        -> `gmt_create` datetime NOT NULL COMMENT '',
        -> `gmt_modified` datetime NOT NULL COMMENT '',
        -> `destination` varchar(128) DEFAULT NULL COMMENT '',
        -> `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog',
        -> `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog',
        -> `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlogid',
        -> `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog',
        -> `data` longtext DEFAULT NULL COMMENT '',
        -> `extra` text DEFAULT NULL COMMENT '',
        -> PRIMARY KEY (`id`),
        -> UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
        -> KEY `destination` (`destination`),
        -> KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
        -> KEY `gmt_modified` (`gmt_modified`)
        -> ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='';
    Query OK, 0 rows affected, 3 warnings (0.02 sec)
    mysql> CREATE TABLE IF NOT EXISTS `meta_history` (
        -> `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '',
        -> `gmt_create` datetime NOT NULL COMMENT '',
        -> `gmt_modified` datetime NOT NULL COMMENT '',
        -> `destination` varchar(128) DEFAULT NULL COMMENT '',
        -> `binlog_file` varchar(64) DEFAULT NULL COMMENT 'binlog',
        -> `binlog_offest` bigint(20) DEFAULT NULL COMMENT 'binlog',
        -> `binlog_master_id` varchar(64) DEFAULT NULL COMMENT 'binlogid',
        -> `binlog_timestamp` bigint(20) DEFAULT NULL COMMENT 'binlog',
        -> `use_schema` varchar(1024) DEFAULT NULL COMMENT 'sqlschema',
        -> `sql_schema` varchar(1024) DEFAULT NULL COMMENT 'schema',
        -> `sql_table` varchar(1024) DEFAULT NULL COMMENT 'table',
        -> `sql_text` longtext DEFAULT NULL COMMENT 'sql',
        -> `sql_type` varchar(256) DEFAULT NULL COMMENT 'sql',
        -> `extra` text DEFAULT NULL COMMENT '',
        -> PRIMARY KEY (`id`),
        -> UNIQUE KEY binlog_file_offest(`destination`,`binlog_master_id`,`binlog_file`,`binlog_offest`),
        -> KEY `destination` (`destination`),
        -> KEY `destination_timestamp` (`destination`,`binlog_timestamp`),
        -> KEY `gmt_modified` (`gmt_modified`)
        -> ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='';
    Query OK, 0 rows affected, 3 warnings (0.02 sec)

Create the ZooKeeper container

    docker run -it -d --net fixed --ip 172.18.0.33 -p 2181:2181 --name zoo zookeeper

Edit the canal configuration files

    # canal.properties
    canal.zkServers=172.18.0.33:2181
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    # instance.properties
    canal.instance.master.address=172.18.0.31:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=123456
    canal.instance.tsdb.enable=true
    canal.instance.tsdb.url=jdbc:mysql://172.18.0.32:3306/example
    canal.instance.tsdb.dbUsername=canal
    canal.instance.tsdb.dbPassword=123456

Create the canal server cluster

    docker run -it -d --net fixed --ip 172.18.0.34 -p 11111:11111 --name canal-server \
    -v /usr/canal/cluster/server/conf:/home/admin/canal-server/conf canal/canal-server
    docker run -it -d --net fixed --ip 172.18.0.35 -p 11112:11111 --name canal-server2 \
    -v /usr/canal/cluster/server2/conf:/home/admin/canal-server/conf canal/canal-server

Check the canal server logs

    # canal-server
    [root@centos ~]# docker logs canal-server
    DOCKER_DEPLOY_TYPE=VM
    ==> INIT /alidata/init/02init-sshd.sh
    ==> EXIT CODE: 0
    ==> INIT /alidata/init/fix-hosts.py
    ==> EXIT CODE: 0
    ==> INIT DEFAULT
    Generating SSH1 RSA host key: [ OK ]
    Starting sshd: [ OK ]
    Starting crond: [ OK ]
    ==> INIT DONE
    ==> RUN /home/admin/app.sh
    ==> START ...
    start canal ...
    start canal successful
    ==> START SUCCESSFUL ...
    # canal-server2
    [root@centos ~]# docker logs canal-server2
    DOCKER_DEPLOY_TYPE=VM
    ==> INIT /alidata/init/02init-sshd.sh
    ==> EXIT CODE: 0
    ==> INIT /alidata/init/fix-hosts.py
    ==> EXIT CODE: 0
    ==> INIT DEFAULT
    Generating SSH1 RSA host key: [ OK ]
    Starting sshd: [ OK ]
    Starting crond: [ OK ]
    ==> INIT DONE
    ==> RUN /home/admin/app.sh
    ==> START ...
    start canal ...
    start canal successful
    ==> START SUCCESSFUL ...

Both canal-server and canal-server2 started successfully.

Check the instance logs

    # canal-server
    [root@centos ~]# docker exec -it canal-server bash
    [root@ddcb9672751c admin]# cd canal-server/logs
    [root@ddcb9672751c logs]# ls
    canal
    # canal-server2
    [root@centos cluster]# docker exec -it canal-server2 bash
    [root@e54128f28507 admin]# cd canal-server/logs
    [root@e54128f28507 logs]# ls
    canal example
    [root@e54128f28507 logs]# cd example
    [root@e54128f28507 example]# ls
    example.log
    [root@e54128f28507 example]# cat example.log
    2021-06-30 22:42:37.377 [ZkClient-EventThread-10-172.18.0.33:2181] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2021-06-30 22:42:37.497 [ZkClient-EventThread-10-172.18.0.33:2181] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
    2021-06-30 22:42:37.498 [ZkClient-EventThread-10-172.18.0.33:2181] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
    2021-06-30 22:42:37.597 [ZkClient-EventThread-10-172.18.0.33:2181] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    2021-06-30 22:42:38.351 [destination = example , address = /172.18.0.31:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2021-06-30 22:42:38.359 [destination = example , address = /172.18.0.31:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    2021-06-30 22:42:40.622 [destination = example , address = /172.18.0.31:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000002,position=4,serverId=1,gtid=<null>,timestamp=1625063533000] cost : 2206ms , the next step is binlog dump

canal-server did not start the example instance; it is on standby.

canal-server2 acquired and started the instance, as its startup log shows.

Metadata stored in ZooKeeper

    [root@centos cluster]# docker exec -it zoo bash
    root@bfa8dada58dc:/apache-zookeeper-3.6.2-bin# bin/zkCli.sh
    Connecting to localhost:2181
    2021-06-30 15:11:46,453 [myid:] - INFO [main:Environment@98] - Client environment:zookeeper.version=3.6.2--803c7f1a12f85978cb049af5e4ef23bd8b688715, built on 09/04/2020 12:44 GMT
    2021-06-30 15:11:46,465 [myid:] - INFO [main:Environment@98] - Client environment:host.name=bfa8dada58dc
    2021-06-30 15:11:46,466 [myid:] - INFO [main:Environment@98] - Client environment:java.version=11.0.8
    2021-06-30 15:11:46,472 [myid:] - INFO [main:Environment@98] - Client environment:java.vendor=N/A
    2021-06-30 15:11:46,472 [myid:] - INFO [main:Environment@98] - Client environment:java.home=/usr/local/openjdk-11
    2021-06-30 15:11:46,473 [myid:] - INFO [main:Environment@98] - Client environment:java.class.path=/apache-zookeeper-3.6.2-bin/bin/../zookeeper-server/target/classes:/apache-zookeeper-3.6.2-bin/bin/../build/classes:/apache-zookeeper-3.6.2-bin/bin/../zookeeper-server/target/lib/*.jar:/apache-zookeeper-3.6.2-bin/bin/../build/lib/*.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/zookeeper-prometheus-metrics-3.6.2.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/zookeeper-jute-3.6.2.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/zookeeper-3.6.2.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/snappy-java-1.1.7.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/slf4j-log4j12-1.7.25.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/slf4j-api-1.7.25.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/simpleclient_servlet-0.6.0.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/simpleclient_hotspot-0.6.0.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/simpleclient_common-0.6.0.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/simpleclient-0.6.0.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/netty-transport-native-unix-common-4.1.50.Final.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/netty-transport-native-epoll-4.1.50.Final.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/netty-transport-4.1.50.Final.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/netty-resolver-4.1.50.Final.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/netty-handler-4.1.50.Final.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/netty-common-4.1.50.Final.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/netty-codec-4.1.50.Final.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/netty-buffer-4.1.50.Final.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/metrics-core-3.2.5.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/log4j-1.2.17.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/json-simple-1.1.1.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jline-2.14.6.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jetty-util-9.4.24.v20191120.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jetty-servlet-9.4.24.v20191120.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jetty-server-9.4.24.v20191120.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jetty-security-9.4.24.v20191120.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jetty-io-9.4.24.v20191120.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jetty-http-9.4.24.v20191120.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/javax.servlet-api-3.1.0.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jackson-databind-2.10.3.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jackson-core-2.10.3.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/jackson-annotations-2.10.3.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/commons-lang-2.6.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/commons-cli-1.2.jar:/apache-zookeeper-3.6.2-bin/bin/../lib/audience-annotations-0.5.0.jar:/apache-zookeeper-3.6.2-bin/bin/../zookeeper-*.jar:/apache-zookeeper-3.6.2-bin/bin/../zookeeper-server/src/main/resources/lib/*.jar:/conf:
    2021-06-30 15:11:46,479 [myid:] - INFO [main:Environment@98] - Client environment:java.library.path=/usr/java/packages/lib:/usr/lib64:/lib64:/lib:/usr/lib
    2021-06-30 15:11:46,480 [myid:] - INFO [main:Environment@98] - Client environment:java.io.tmpdir=/tmp
    2021-06-30 15:11:46,480 [myid:] - INFO [main:Environment@98] - Client environment:java.compiler=<NA>
    2021-06-30 15:11:46,480 [myid:] - INFO [main:Environment@98] - Client environment:os.name=Linux
    2021-06-30 15:11:46,480 [myid:] - INFO [main:Environment@98] - Client environment:os.arch=amd64
    2021-06-30 15:11:46,481 [myid:] - INFO [main:Environment@98] - Client environment:os.version=3.10.0-957.el7.x86_64
    2021-06-30 15:11:46,481 [myid:] - INFO [main:Environment@98] - Client environment:user.name=root
    2021-06-30 15:11:46,481 [myid:] - INFO [main:Environment@98] - Client environment:user.home=/root
    2021-06-30 15:11:46,485 [myid:] - INFO [main:Environment@98] - Client environment:user.dir=/apache-zookeeper-3.6.2-bin
    2021-06-30 15:11:46,486 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.free=21MB
    2021-06-30 15:11:46,492 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.max=247MB
    2021-06-30 15:11:46,493 [myid:] - INFO [main:Environment@98] - Client environment:os.memory.total=29MB
    2021-06-30 15:11:46,514 [myid:] - INFO [main:ZooKeeper@1006] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@6166e06f
    2021-06-30 15:11:46,524 [myid:] - INFO [main:X509Util@77] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
    2021-06-30 15:11:46,547 [myid:] - INFO [main:ClientCnxnSocket@239] - jute.maxbuffer value is 1048575 Bytes
    2021-06-30 15:11:46,589 [myid:] - INFO [main:ClientCnxn@1716] - zookeeper.request.timeout value is 0. feature enabled=false
    Welcome to ZooKeeper!
    2021-06-30 15:11:46,687 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1167] - Opening socket connection to server localhost/127.0.0.1:2181.
    2021-06-30 15:11:46,688 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1169] - SASL config status: Will not attempt to authenticate using SASL (unknown error)
    JLine support is enabled
    2021-06-30 15:11:46,761 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@999] - Socket connection established, initiating session, client: /127.0.0.1:43956, server: localhost/127.0.0.1:2181
    2021-06-30 15:11:46,827 [myid:localhost:2181] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1433] - Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x100002977550009, negotiated timeout = 30000
    WATCHER::
    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
    {"active":true,"address":"172.18.0.35:11111"}

The currently running node is {"active":true,"address":"172.18.0.35:11111"}, i.e. canal-server2 holds the example instance. A client consumes the cluster through ZooKeeper, as sketched below.
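
A minimal HA client sketch using the walkthrough's ZooKeeper address (it must run from a host that can reach the container addresses registered in ZooKeeper; credentials are empty as in this setup). Because the active server is resolved from ZooKeeper, a failover from canal-server2 to canal-server is transparent to the client:

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.Message;

    public class HaClientSketch {
        public static void main(String[] args) throws InterruptedException {
            // zookeeper address from the walkthrough; the active canal server for
            // "example" is resolved from /otter/canal/destinations/example/running
            CanalConnector connector =
                    CanalConnectors.newClusterConnector("172.18.0.33:2181", "example", "", "");
            connector.connect();
            connector.subscribe(".*\\..*");
            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);
                    continue;
                }
                // ... process entries as in the verification example above ...
                connector.ack(batchId);
            }
        }
    }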
