Flink Example: Flink-CDC
Version Information

| Product | Version |
| --- | --- |
| Flink | 1.11.1 |
| flink-cdc-connectors | 1.1.0 |
| Java | 1.8.0_231 |
| MySQL | 5.7.16 |
Note: officially only MySQL 5.7 and 8 are supported, but I also briefly tested MariaDB 10.0.38 (roughly equivalent to MySQL 5.6). Inserts, deletes, updates, and aggregations all worked, though unknown issues cannot be ruled out.
Maven Dependencies

pom.xml, dependencies section:
```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.11.1</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
        <type>test-jar</type>
    </dependency>
    <!-- Flink-CDC -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>
```
Replication Configuration and Data Preparation
- Stop the MySQL service.
- On the MySQL node whose data will be captured, add the following configuration (see the reference documentation):

```ini
[mysqld]
# ... existing configuration above ...

# added settings
server-id = 12345
log-bin = mysql-bin
# must be ROW
binlog_format = ROW
# must be FULL; this parameter exists only since MySQL 5.7
binlog_row_image = FULL
expire_logs_days = 10
```

- Start the MySQL service.
The following statement shows the binlog-related variable settings:

```sql
SHOW VARIABLES LIKE '%binlog%';
```
Create the database, table, and data for testing:

```sql
CREATE DATABASE db_inventory_cdc;
USE db_inventory_cdc;

CREATE TABLE tb_products_cdc(
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(64),
    description VARCHAR(128)
);

INSERT INTO tb_products_cdc
VALUES(DEFAULT, 'zhangsan', 'aaa'),
      (DEFAULT, 'lisi', 'bbb'),
      (DEFAULT, 'wangwu', 'ccc');
```
Create the user for replication and grant the required privileges (see the reference documentation):

```sql
-- create the user that will perform replication
CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
-- grant the replication-related privileges
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';
```
Using Flink-CDC
- For the sql-client approach, the official documentation already provides an example.
- The coding approach makes it convenient to submit a jar; an example follows:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Description: Flink-CDC test
 * Date: 2020/9/16 14:03
 * @author ALion
 */
public class FlinkCDCSQLTest {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

        // source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                "  id INT NOT NULL,\n" +
                "  name STRING,\n" +
                "  description STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = 'localhost',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'flinkuser',\n" +
                "  'password' = 'flinkpassword',\n" +
                "  'database-name' = 'db_inventory_cdc',\n" +
                "  'table-name' = 'tb_products_cdc'\n" +
                ")";

        // sink table
        String sinkDDL =
                "CREATE TABLE tb_sink (\n" +
                "  name STRING,\n" +
                "  countSum BIGINT,\n" +
                "  PRIMARY KEY (name) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'print'\n" +
                ")";

        // a simple aggregation
        String transformSQL =
                "INSERT INTO tb_sink " +
                "SELECT name, COUNT(1) " +
                "FROM mysql_binlog " +
                "GROUP BY name";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        // With the 'print' sink, every changelog row is written to the
        // TaskManager's stdout; block here so the streaming job keeps running.
        result.getJobClient()
                .get()
                .getJobExecutionResult(Thread.currentThread().getContextClassLoader())
                .get();
    }
}
```
Run the Job

A Simple Test

For a simple test, start modifying the data in the MySQL table:
```sql
-- SQL test data; compare with the Flink application's output
INSERT INTO tb_products_cdc VALUES(DEFAULT, 'lisi', 'ddd');
DELETE FROM tb_products_cdc WHERE id=4;
UPDATE tb_products_cdc SET name='wangwu' WHERE id=2;
```

Execute the statements one at a time and watch how Flink's output changes.
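To see why the sink's counts evolve the way they do, it helps to model the changelog that the mysql-cdc source emits: an INSERT becomes an insert row (+1 for its name), a DELETE becomes a retraction (-1), and an UPDATE becomes a retraction of the old row followed by an insert of the new one. The following minimal, Flink-independent Java sketch (class and method names are made up for illustration) replays the initial snapshot plus the three statements above:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of an upsert sink consuming a CDC changelog (names are illustrative).
class ChangelogCountSketch {

    final Map<String, Long> counts = new HashMap<>();

    // +I: a row was inserted -> increment the count for its name
    void insert(String name) {
        counts.merge(name, 1L, Long::sum);
    }

    // -D: a row was deleted -> retract the count for its name
    void delete(String name) {
        counts.merge(name, -1L, Long::sum);
    }

    // -U/+U: an update retracts the old row and inserts the new one
    void update(String oldName, String newName) {
        delete(oldName);
        insert(newName);
    }

    public static void main(String[] args) {
        ChangelogCountSketch sink = new ChangelogCountSketch();

        // initial snapshot: (1,'zhangsan'), (2,'lisi'), (3,'wangwu')
        sink.insert("zhangsan");
        sink.insert("lisi");
        sink.insert("wangwu");

        // INSERT INTO tb_products_cdc VALUES(DEFAULT, 'lisi', 'ddd');  -- creates id=4
        sink.insert("lisi");
        // DELETE FROM tb_products_cdc WHERE id=4;
        sink.delete("lisi");
        // UPDATE tb_products_cdc SET name='wangwu' WHERE id=2;  -- old name was 'lisi'
        sink.update("lisi", "wangwu");

        System.out.println("zhangsan=" + sink.counts.get("zhangsan")); // zhangsan=1
        System.out.println("lisi=" + sink.counts.get("lisi"));         // lisi=0
        System.out.println("wangwu=" + sink.counts.get("wangwu"));     // wangwu=2
    }
}
```

The replay ends with the same counts the GROUP BY query reports: zhangsan stays at 1, lisi ends at 0 (its extra row was inserted then deleted, and its original row was renamed), and wangwu reaches 2.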