Flink: consuming Kafka messages starting from a timestamp

  • Kafka version: 0.11
  • Flink version: 1.9

FlinkKafkaConsumer011 provides several methods that determine where to start consuming Kafka messages (a quick sketch of each call follows the list):

  • setStartFromTimestamp
  • setStartFromEarliest
  • setStartFromLatest
  • setStartFromSpecificOffsets
  • setStartFromGroupOffsets
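
For reference, here is a minimal sketch of how each of these is called. Here prop is the Kafka Properties object shown in the full example below, the topic, timestamp, and offsets are illustrative, and in practice you would configure exactly one start position per consumer:

FlinkKafkaConsumer011<String> consumer =
    new FlinkKafkaConsumer011<>("start_log", new SimpleStringSchema(), prop);

consumer.setStartFromEarliest();                 // earliest offset of every partition
consumer.setStartFromLatest();                   // latest offset of every partition
consumer.setStartFromGroupOffsets();             // offsets committed for group.id (the default)
consumer.setStartFromTimestamp(1571909309022L);  // first record with timestamp >= the given epoch millis

Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition("start_log", 0), 23L);
consumer.setStartFromSpecificOffsets(specificOffsets);  // an explicit offset per partition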

To consume Kafka messages starting from a point in time, just call setStartFromTimestamp:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties prop = new Properties();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("group.id", "flink-streaming-job");

        FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>("start_log", new SimpleStringSchema(), prop);
        // start consuming Kafka data from the specified timestamp (epoch milliseconds)
        consumer.setStartFromTimestamp(1571909309022L);

        // source
        DataStream<String> source = env.addSource(consumer);
        source.print();

        // execute program
        env.execute("Flink Streaming Java Table API Skeleton");
    }
}

Under the hood, the given timestamp is used to compute the offset of each partition of the topic, and consumption then starts from those offsets. The key code is FlinkKafkaConsumer010.fetchOffsetsWithTimestamp (FlinkKafkaConsumer011 inherits this method from FlinkKafkaConsumer010):

protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
        Collection<KafkaTopicPartition> partitions,
        long timestamp) {

    Map<TopicPartition, Long> partitionOffsetsRequest = new HashMap<>(partitions.size());
    for (KafkaTopicPartition partition : partitions) {
        partitionOffsetsRequest.put(
            new TopicPartition(partition.getTopic(), partition.getPartition()),
            timestamp);
    }

    final Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size());
    // use a short-lived consumer to fetch the offsets;
    // this is ok because this is a one-time operation that happens only on startup
    try (KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties)) {
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
                consumer.offsetsForTimes(partitionOffsetsRequest).entrySet()) {

            result.put(
                new KafkaTopicPartition(partitionToOffset.getKey().topic(), partitionToOffset.getKey().partition()),
                (partitionToOffset.getValue() == null) ? null : partitionToOffset.getValue().offset());
        }
    }
    return result;
}
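
The same lookup can be done by hand with a plain KafkaConsumer: offsetsForTimes returns, for each partition, the earliest offset whose record timestamp is greater than or equal to the requested timestamp (or null if there is none). The sketch below is illustrative rather than part of Flink (the class name TimestampToOffsets and the buildConsumer helper are made up); it resolves the offsets itself and feeds them to setStartFromSpecificOffsets, which is the same trick the Flink SQL workaround further down relies on:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TimestampToOffsets {

    /** Builds a FlinkKafkaConsumer011 that starts from the offsets matching the given timestamp. */
    public static FlinkKafkaConsumer011<String> buildConsumer(String topic, Properties prop, long timestamp) {
        // a plain Kafka consumer needs explicit deserializers
        Properties kafkaProps = new Properties();
        kafkaProps.putAll(prop);
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
            // ask for the offset of every partition of the topic at the given timestamp
            Map<TopicPartition, Long> request = new HashMap<>();
            for (PartitionInfo info : kafkaConsumer.partitionsFor(topic)) {
                request.put(new TopicPartition(topic, info.partition()), timestamp);
            }
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e :
                    kafkaConsumer.offsetsForTimes(request).entrySet()) {
                if (e.getValue() != null) {
                    specificOffsets.put(
                        new KafkaTopicPartition(e.getKey().topic(), e.getKey().partition()),
                        e.getValue().offset());
                }
            }
        }

        FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);
        consumer.setStartFromSpecificOffsets(specificOffsets);
        return consumer;
    }
}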

The above passes the timestamp in code to consume Kafka messages. What about Flink SQL, how should it be configured there?
Looking at the source code, Flink SQL does not directly support consuming Kafka messages from a timestamp; see KafkaTableSourceBase:

protected FlinkKafkaConsumerBase<Row> getKafkaConsumer(
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema) {

    FlinkKafkaConsumerBase<Row> kafkaConsumer =
        createKafkaConsumer(topic, properties, deserializationSchema);
    switch (startupMode) {
        case EARLIEST:
            kafkaConsumer.setStartFromEarliest();
            break;
        case LATEST:
            kafkaConsumer.setStartFromLatest();
            break;
        case GROUP_OFFSETS:
            kafkaConsumer.setStartFromGroupOffsets();
            break;
        case SPECIFIC_OFFSETS:
            kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
            break;
    }
    return kafkaConsumer;
}

We can compute the offset of each partition of the topic from the timestamp ourselves and then consume in SPECIFIC_OFFSETS mode, which achieves the goal of consuming Kafka messages from a timestamp.
The catch is that this can only be implemented in code, for example by rewriting the DDL before it is executed:

/**
 * Rewrites a DDL with startup-mode = specific-offsets AND 'connector.timestamp' != (null OR '')
 * into a standard DDL in startup-mode = specific-offsets with explicit per-partition offsets.
 * Statements that do not match are returned unchanged.
 */
public static String convertToSpecificOffsetSql(String sql) {
    final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
    SqlNode sqlNode = planner.parse(sql);
    if (sqlNode instanceof SqlCreateTable) {
        Operation operation = SqlToOperationConverter.convert(planner, sqlNode);
        if (operation instanceof CreateTableOperation) {
            CreateTableOperation op = (CreateTableOperation) operation;
            CatalogTable catalogTable = op.getCatalogTable();
            return convert(catalogTable, sql);
        }
    }
    // not a matching CREATE TABLE statement: return the original SQL
    return sql;
}

private static String convert(CatalogTable catalogTable, String sql) {
    String startupMode = catalogTable.getProperties().get(KafkaValidator.CONNECTOR_STARTUP_MODE);
    if (KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS.equalsIgnoreCase(startupMode)) {
        long timestamp = -1;
        try {
            timestamp = Long.parseLong(catalogTable.getProperties().get("connector.timestamp"));
        } catch (Exception e) {
            // missing or invalid 'connector.timestamp': fall through and return the original SQL
        }
        // a positive value means: consume from this timestamp
        if (timestamp > 0) {
            DescriptorProperties descriptorProperties = getValidatedProperties(catalogTable.getProperties());
            Properties kafkaProperties = getKafkaProperties(descriptorProperties);
            kafkaProperties.put(KafkaValidator.CONNECTOR_TOPIC, catalogTable.getProperties().get(KafkaValidator.CONNECTOR_TOPIC));
            kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // resolve the offset of every partition for the given timestamp
            Map<KafkaTopicPartition, Long> partitionLongMap = fetchOffsetsWithTimestamp(timestamp, kafkaProperties);
            int partitionIndex = 0;
            StringBuilder offsetConfig = new StringBuilder();
            for (Map.Entry<KafkaTopicPartition, Long> entry : partitionLongMap.entrySet()) {
                String partition = "'connector.specific-offsets." + partitionIndex + ".partition' = '" + entry.getKey().getPartition() + "'";
                String offset = "'connector.specific-offsets." + partitionIndex + ".offset' = '" + entry.getValue() + "'";
                offsetConfig.append(", " + partition + ", " + offset);
                partitionIndex++;
            }

            StringBuilder changeSql = new StringBuilder(sql);
            // assuming the WITH (...) clause comes last, the specific-offsets properties
            // can be inserted right before its closing parenthesis
            int offset = changeSql.lastIndexOf(")");
            String newSql = changeSql.insert(offset, offsetConfig).toString();
            // drop the 'connector.timestamp' property
            newSql = newSql.replaceAll("'connector.timestamp'(.*?),", "");
            return newSql;
        }
    }
    // not a matching table: return the original SQL
    return sql;
}
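
For completeness, here is a hedged usage sketch. The DDL below is made up for illustration: the table name, schema, and topic are placeholders, 'connector.timestamp' is the custom property handled by convertToSpecificOffsetSql above (not a built-in option of the Kafka connector), and the remaining keys follow the Flink 1.9 legacy connector descriptor format, so they may need adjusting for your setup:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// DDL that asks for specific-offsets mode plus the custom 'connector.timestamp' property
String ddl =
    "CREATE TABLE start_log (\n" +
    "  user_id VARCHAR,\n" +
    "  event_time BIGINT\n" +
    ") WITH (\n" +
    "  'connector.type' = 'kafka',\n" +
    "  'connector.version' = '0.11',\n" +
    "  'connector.topic' = 'start_log',\n" +
    "  'connector.properties.0.key' = 'bootstrap.servers',\n" +
    "  'connector.properties.0.value' = 'localhost:9092',\n" +
    "  'connector.startup-mode' = 'specific-offsets',\n" +
    "  'connector.timestamp' = '1571909309022',\n" +
    "  'update-mode' = 'append',\n" +
    "  'format.type' = 'json',\n" +
    "  'format.derive-schema' = 'true'\n" +
    ")";

// rewrite 'connector.timestamp' into 'connector.specific-offsets.N.partition/offset' pairs
String newDdl = convertToSpecificOffsetSql(ddl);

// the rewritten DDL is now a standard specific-offsets statement that Flink 1.9 can execute
tableEnv.sqlUpdate(newDdl);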
