Flink Example: Flink-CDC

你的名字 2022-11-19 04:14

Version Information

    Product                Version
    ---------------------  ---------
    Flink                  1.11.1
    flink-cdc-connectors   1.1.0
    Java                   1.8.0_231
    MySQL                  5.7.16

Note: officially, MySQL 5.7 and 8 are supported, but I have also briefly tested mariadb-10.0.38 (roughly equivalent to MySQL 5.6). Inserts, deletes, updates, and aggregations all worked so far, though unknown issues cannot be ruled out.

Maven Dependencies

pom.xml (dependencies section)

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.1</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <type>test-jar</type>
        </dependency>
        <!-- Flink-CDC -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>

Replication Configuration and Data Preparation

  • Stop the MySQL service.
  • On the MySQL node whose changes are to be captured, add the following configuration (reference documentation):

    [mysqld]
    # ... other existing configuration above ...

    # added section
    server-id        = 12345
    log-bin          = mysql-bin

    # must be ROW
    binlog_format    = ROW

    # must be FULL (this parameter is not available in older MySQL versions)
    binlog_row_image = FULL
    expire_logs_days = 10

  • Start the MySQL service.

  • Use the following statement to check the binlog-related variables:

    SHOW VARIABLES LIKE '%binlog%';

  • Create the test database, table, and rows:

    CREATE DATABASE db_inventory_cdc;

    CREATE TABLE tb_products_cdc (
        id INT PRIMARY KEY AUTO_INCREMENT,
        name VARCHAR(64),
        description VARCHAR(128)
    );

    INSERT INTO tb_products_cdc
    VALUES
        (DEFAULT, 'zhangsan', 'aaa'),
        (DEFAULT, 'lisi', 'bbb'),
        (DEFAULT, 'wangwu', 'ccc');
  • Create a user for replication and grant the required privileges (reference documentation):

    -- create the user that will read the binlog
    CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpassword';
    -- grant the replication-related privileges
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser';

  • For the sql-client approach, the official documentation already provides an example.
  • The programmatic approach below is more convenient for submitting a jar:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    /**
     * Description: Flink-CDC test
     * Date: 2020/9/16 14:03
     *
     * @author ALion
     */
    public class FlinkCDCSQLTest {

        public static void main(String[] args) throws Exception {
            EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);

            // source table reading the MySQL binlog
            String sourceDDL =
                    "CREATE TABLE mysql_binlog (\n" +
                    "  id INT NOT NULL,\n" +
                    "  name STRING,\n" +
                    "  description STRING\n" +
                    ") WITH (\n" +
                    "  'connector' = 'mysql-cdc',\n" +
                    "  'hostname' = 'localhost',\n" +
                    "  'port' = '3306',\n" +
                    "  'username' = 'flinkuser',\n" +
                    "  'password' = 'flinkpassword',\n" +
                    "  'database-name' = 'db_inventory_cdc',\n" +
                    "  'table-name' = 'tb_products_cdc'\n" +
                    ")";

            // sink table that prints the changelog to stdout
            String sinkDDL =
                    "CREATE TABLE tb_sink (\n" +
                    "  name STRING,\n" +
                    "  countSum BIGINT,\n" +
                    "  PRIMARY KEY (name) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "  'connector' = 'print'\n" +
                    ")";

            // a simple aggregation
            String transformSQL =
                    "INSERT INTO tb_sink " +
                    "SELECT name, COUNT(1) " +
                    "FROM mysql_binlog " +
                    "GROUP BY name";

            tableEnv.executeSql(sourceDDL);
            tableEnv.executeSql(sinkDDL);
            TableResult result = tableEnv.executeSql(transformSQL);

            // The INSERT job is submitted asynchronously and never finishes on
            // its own; block the main thread so the job keeps running and the
            // 'print' sink can show changelog rows as the MySQL table changes.
            result.getJobClient().get()
                    .getJobExecutionResult(FlinkCDCSQLTest.class.getClassLoader())
                    .get();
        }
    }

  • Run the job.
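Conceptually, the mysql-cdc connector turns each binlog row event into one or two Flink changelog rows, following Flink's row-kind convention (+I insert, -U retract of the old value, +U new value, -D delete). The sketch below illustrates that mapping in plain Java; the types and method names are hypothetical, not the connector's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration of how a binlog row event becomes Flink changelog
// rows. These types are hypothetical, NOT the connector's real classes.
public class BinlogToChangelog {

    // Flink's four changelog row kinds: +I, -U, +U, -D
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    public static class Row {
        public final RowKind kind;
        public final String name;  // stand-in for the row's payload

        public Row(RowKind kind, String name) {
            this.kind = kind;
            this.name = name;
        }

        @Override
        public String toString() {
            return kind + "(" + name + ")";
        }
    }

    // An insert event yields +I, a delete yields -D, and an update yields
    // the retraction of the old row (-U) followed by the new row (+U).
    public static List<Row> toChangelog(String op, String before, String after) {
        List<Row> out = new ArrayList<>();
        switch (op) {
            case "insert":
                out.add(new Row(RowKind.INSERT, after));
                break;
            case "delete":
                out.add(new Row(RowKind.DELETE, before));
                break;
            case "update":
                out.add(new Row(RowKind.UPDATE_BEFORE, before));
                out.add(new Row(RowKind.UPDATE_AFTER, after));
                break;
        }
        return out;
    }

    public static void main(String[] args) {
        // an UPDATE in MySQL becomes two changelog rows
        System.out.println(toChangelog("update", "lisi", "wangwu"));
    }
}
```

This is why the GROUP BY query downstream stays consistent: an updated row is first subtracted from its old group before being added to the new one.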

A Simple Test

  • For a simple test, modify the data in the MySQL table:

    -- SQL test data; compare each statement with the Flink application's output

    INSERT INTO tb_products_cdc VALUES (DEFAULT, 'lisi', 'ddd');

    DELETE FROM tb_products_cdc WHERE id = 4;

    UPDATE tb_products_cdc SET name = 'wangwu' WHERE id = 2;

  • Execute the statements one at a time and watch how the Flink output changes.
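To see why each statement changes the aggregate, the GROUP BY COUNT can be simulated in plain Java. This is a minimal sketch of the query's semantics, not Flink's runtime: every change to tb_products_cdc adjusts the count for the affected name (the initial rows give ids 1-3, the new 'lisi' row gets id 4).

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the changelog semantics of
// "SELECT name, COUNT(1) FROM mysql_binlog GROUP BY name".
public class GroupByCountSketch {

    final Map<String, Long> counts = new HashMap<>();

    // +I: a new source row increments the count for its name
    void insert(String name) {
        counts.merge(name, 1L, Long::sum);
    }

    // -D: a deleted source row decrements (and drops the group at zero)
    void delete(String name) {
        counts.computeIfPresent(name, (k, v) -> v == 1 ? null : v - 1);
    }

    // -U/+U: an update retracts from the old group and adds to the new one
    void update(String oldName, String newName) {
        delete(oldName);
        insert(newName);
    }

    public static void main(String[] args) {
        GroupByCountSketch agg = new GroupByCountSketch();
        // initial snapshot: zhangsan (id=1), lisi (id=2), wangwu (id=3)
        agg.insert("zhangsan");
        agg.insert("lisi");
        agg.insert("wangwu");

        agg.insert("lisi");           // INSERT ... VALUES (DEFAULT, 'lisi', 'ddd')
        agg.delete("lisi");           // DELETE ... WHERE id = 4
        agg.update("lisi", "wangwu"); // UPDATE ... SET name = 'wangwu' WHERE id = 2

        System.out.println(agg.counts); // final counts per name
    }
}
```

After the three test statements, 'zhangsan' remains at 1, 'wangwu' rises to 2, and the 'lisi' group disappears, which is the sequence of updates the print sink should reflect.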
