flink-jdbc-ex 「爱情、让人受尽委屈。」 2022-12-27 11:26 72阅读 0赞 # flink-jdbc-ex # flink-jdbc一些简单的扩展 关键词: flink流表joinmysql维表 flink流表join维表 flink维流join ## [github地址][github] ## flink-jdbc懂得都懂,限制死了dialect ![在这里插入图片描述][20201210195610529.png]虽说flink也提供了类似flink-sql-connector-hive-2.2.0这样专用的hive连接器 起初是为了解决phoenix jdbc的问题 但是涉及到phoenix等操作的时候,flink-jdbc简直令人抓狂 后来发现流表和维表,百度上都是零零星星,没有可用代码 -------------------- 这次主要加了几个强力特性 1. dialect可spi自由扩展 2. **异步source** 3. **LookupableTableSource**`流表join维表` 4. 表名和字段小写转换 `总所周知phoenix全是大写,这里提供connector.case-sensitivity可以自动将ddl里小写转换为大写` 5. 一些有的没的乱搞的注释 ## 上使用方式 ## -------------------- 1.初始化环境 //step1. 创建phoenix维表 CREATE TABLE DIM_USER ( ID BIGINT NOT NULL, NAME VARCHAR, SEX VARCHAR, CONSTRAINT DIM_USER_PK PRIMARY KEY (ID) ); INSERT INTO DIM_USER (ID,NAME,SEX) VALUES (1,'小明是高手','男'); INSERT INTO DIM_USER (ID,NAME,SEX) VALUES (2,'小红不是高手','女'); //step2. 连接kafka产生实时数据 kafka-console-producer --topic test_phone --broker-list hadoop1:9092 -------------------- 1. flink-sql代码编写 //首先是流表 tEnv.sqlUpdate( """ |create table phone( | id bigint comment '主键id', | user_id bigint comment '用户', | name varchar comment '手机名称', | price int comment '手机价格', | phone_type varchar comment '手机类型', | buy_time varchar comment '购买时间', | process_time as PROCTIME() |) |with ( | 'connector.type' = 'kafka', | 'connector.version' = 'universal', | 'connector.topic' = 'test_phone', | 'connector.properties.group.id' = 'flink_consumer', | 'connector.startup-mode' = 'latest-offset', | 'connector.properties.zookeeper.connect' = 'hadoop1:2181,hadoop2:2181,hadoop3:2181', | 'connector.properties.bootstrap.servers' = 'hadoop1:9092,hadoop2:9092,hadoop3:9092', | 'format.type' = 'json' |) |""".stripMargin) //然后是phoenix维表 tEnv.sqlUpdate( """ |create table dim_user( | id bigint comment '用户ID', | name varchar comment '用户名称', | sex varchar comment '用户性别' | ) | with( | 'connector.type' = 'jdbc-async', | 'connector.url' = 'jdbc:phoenix:hadoop1,hadoop2,hadoop3:2181', | 'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver', | 'connector.table' = 'dim_user', | 'connector.username' = '', | 'connector.password' = '', | 'connector.write.flush.max-rows' = '1', | 'connector.case-sensitivity' = 'true' | ) |""".stripMargin) //最后是流维join一气呵成 tEnv.sqlQuery( """ |select * from phone p |join dim_user FOR SYSTEM_TIME AS OF p.process_time as u |on u.id = p.user_id |""".stripMargin) .toRetractStream[Row].print() -------------------- 1. kafka发送消息 { "id":1 ,"user_id":1 ,"name":"小米手机坏了","price":100,"phone_type":"家用","buy_time":"2020-12-09 12:10:13"} { "id":2 ,"user_id":1 ,"name":"华为手机","price":200,"phone_type":"办公","buy_time":"2020-12-08 11:52:11"} { "id":3 ,"user_id":2 ,"name":"大米手机坏了","price":200,"phone_type":"办公","buy_time":"2020-12-08 11:52:11"} -------------------- 最后在控制台乱输出一通 ,完成 phone:8> (true,1,1,小米手机坏了,100,家用,2020-12-09 12:10:13,2020-12-10T11:39:18.266) 8> (true,1,1,小米手机坏了,100,家用,2020-12-09 12:10:13,2020-12-10T11:39:21.529,1,小明是高手,男) phone:8> (true,1,1,小米手机坏了,100,家用,2020-12-09 12:10:13,2020-12-10T11:39:38.683) 8> (true,1,1,小米手机坏了,100,家用,2020-12-09 12:10:13,2020-12-10T11:39:38.749,1,小明是高手,男) phone:8> (true,3,2,大米手机坏了,200,办公,2020-12-08 11:52:11,2020-12-10T11:39:56.573) 8> (true,3,2,大米手机坏了,200,办公,2020-12-08 11:52:11,2020-12-10T11:39:56.601,2,小红不是高手,女) phone:8> (true,2,1,华为手机,200,办公,2020-12-08 11:52:11,2020-12-10T11:40:11.456) 8> (true,2,1,华为手机,200,办公,2020-12-08 11:52:11,2020-12-10T11:40:11.513,1,小明是高手,男) phone:8> (true,1,1,小米手机坏了,100,家用,2020-12-09 12:10:13,2020-12-10T11:40:24.103) 8> (true,1,1,小米手机坏了,100,家用,2020-12-09 12:10:13,2020-12-10T11:40:24.483,1,小明是高手,男) -------------------- 感谢 -------------------- future 1. 异步sink [github]: https://github.com/aly8246/flink-jdbc-ex/blob/master/README.md [20201210195610529.png]: /images/20221120/ebe9b698b0484179adc1f1c52e2a5c61.png
还没有评论,来说两句吧...