Flink: consuming Kafka from a given timestamp
- Kafka version: 0.11
- Flink version: 1.9
FlinkKafkaConsumer011 provides several methods for choosing where to start consuming Kafka messages:
- setStartFromTimestamp
- setStartFromEarliest
- setStartFromLatest
- setStartFromSpecificOffsets
- setStartFromGroupOffsets
To consume Kafka messages starting from a given point in time, just call setStartFromTimestamp:
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties prop = new Properties();
        prop.put("bootstrap.servers", "localhost:9092");
        prop.put("group.id", "flink-streaming-job");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("start_log", new SimpleStringSchema(), prop);
        // start consuming Kafka data from the given timestamp
        consumer.setStartFromTimestamp(1571909309022L);

        // source
        DataStream<String> source = env.addSource(consumer);
        source.print();

        // execute program
        env.execute("Flink Streaming Java Table API Skeleton");
    }
}
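Note that setStartFromTimestamp expects an epoch timestamp in milliseconds (the 1571909309022L above). As a small sketch, a human-readable date can be converted to that value with java.time; the date and time zone below are placeholders, not values from the original example:

import java.time.LocalDateTime;
import java.time.ZoneId;

// Convert a local date-time (placeholder values) to the epoch milliseconds
// expected by setStartFromTimestamp.
long startTimestamp = LocalDateTime.of(2019, 10, 24, 17, 28, 29)
        .atZone(ZoneId.of("Asia/Shanghai"))
        .toInstant()
        .toEpochMilli();
consumer.setStartFromTimestamp(startTimestamp);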
The underlying mechanism is to use the given timestamp to compute an offset for each partition of the topic, and then start consuming from those offsets. The key code is FlinkKafkaConsumer010.fetchOffsetsWithTimestamp (which FlinkKafkaConsumer011 inherits):
protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
        Collection<KafkaTopicPartition> partitions,
        long timestamp) {

    Map<TopicPartition, Long> partitionOffsetsRequest = new HashMap<>(partitions.size());
    for (KafkaTopicPartition partition : partitions) {
        partitionOffsetsRequest.put(
            new TopicPartition(partition.getTopic(), partition.getPartition()),
            timestamp);
    }

    final Map<KafkaTopicPartition, Long> result = new HashMap<>(partitions.size());
    // use a short-lived consumer to fetch the offsets;
    // this is ok because this is a one-time operation that happens only on startup
    try (KafkaConsumer<?, ?> consumer = new KafkaConsumer(properties)) {
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> partitionToOffset :
                consumer.offsetsForTimes(partitionOffsetsRequest).entrySet()) {
            result.put(
                new KafkaTopicPartition(partitionToOffset.getKey().topic(), partitionToOffset.getKey().partition()),
                (partitionToOffset.getValue() == null) ? null : partitionToOffset.getValue().offset());
        }
    }
    return result;
}
That covers passing a timestamp in code. What about Flink SQL: how would this be configured? A look at the source shows that Flink SQL does not directly support consuming Kafka from a timestamp; KafkaTableSourceBase.getKafkaConsumer only handles the following startup modes:
protected FlinkKafkaConsumerBase<Row> getKafkaConsumer(
        String topic,
        Properties properties,
        DeserializationSchema<Row> deserializationSchema) {
    FlinkKafkaConsumerBase<Row> kafkaConsumer =
            createKafkaConsumer(topic, properties, deserializationSchema);
    switch (startupMode) {
        case EARLIEST:
            kafkaConsumer.setStartFromEarliest();
            break;
        case LATEST:
            kafkaConsumer.setStartFromLatest();
            break;
        case GROUP_OFFSETS:
            kafkaConsumer.setStartFromGroupOffsets();
            break;
        case SPECIFIC_OFFSETS:
            kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
            break;
    }
    return kafkaConsumer;
}
The workaround is to compute the offset of each partition of the topic from the timestamp and then consume in SPECIFIC_OFFSETS mode, which achieves the same effect as starting from a timestamp. The downside is that this can only be done programmatically, by rewriting the DDL before it is executed, as in the converter shown below.
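For illustration, the input DDL might look like the string below. The 'connector.timestamp' key is an ad-hoc property introduced here, not a standard Flink option; the other keys follow the Flink 1.9 Kafka connector descriptors, and the schema, topic, server address, and tableEnv variable are placeholders. The convertToSpecificOffsetSql utility defined afterwards rewrites such a statement before it is handed to the table environment:

// Hypothetical input DDL: startup-mode is specific-offsets, but the concrete
// offsets are expressed indirectly through the ad-hoc 'connector.timestamp' key.
String ddl =
    "CREATE TABLE start_log (log VARCHAR) WITH ("
        + " 'connector.type' = 'kafka',"
        + " 'connector.version' = '0.11',"
        + " 'connector.topic' = 'start_log',"
        + " 'connector.properties.0.key' = 'bootstrap.servers',"
        + " 'connector.properties.0.value' = 'localhost:9092',"
        + " 'connector.startup-mode' = 'specific-offsets',"
        + " 'connector.timestamp' = '1571909309022',"
        + " 'update-mode' = 'append',"
        + " 'format.type' = 'csv'"
        + ")";

// After conversion the 'connector.timestamp' entry is removed and pairs such as
// 'connector.specific-offsets.0.partition' = '0' / 'connector.specific-offsets.0.offset' = '12345'
// are appended to the WITH clause.
tableEnv.sqlUpdate(convertToSpecificOffsetSql(ddl));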
/**
 * Rewrites a CREATE TABLE statement that uses startup-mode = specific-offsets together with a
 * non-empty 'connector.timestamp' property into a standard specific-offsets statement.
 *
 * @return the rewritten SQL, or the original SQL if it does not match
 */
public static String convertToSpecificOffsetSql(String sql) {
    final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
    SqlNode sqlNode = planner.parse(sql);
    if (sqlNode instanceof SqlCreateTable) {
        Operation operation = SqlToOperationConverter.convert(planner, sqlNode);
        if (operation instanceof CreateTableOperation) {
            CreateTableOperation op = (CreateTableOperation) operation;
            CatalogTable catalogTable = op.getCatalogTable();
            return convert(catalogTable, sql);
        }
    }
    // not a matching CREATE TABLE statement: return the original SQL
    return sql;
}

private static String convert(CatalogTable catalogTable, String sql) {
    String startupMode = catalogTable.getProperties().get(KafkaValidator.CONNECTOR_STARTUP_MODE);
    if (KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS.equalsIgnoreCase(startupMode)) {
        long timestamp = -1;
        try {
            timestamp = Long.parseLong(catalogTable.getProperties().get("connector.timestamp"));
        } catch (Exception e) {
            // missing or malformed timestamp: keep -1 and fall through
        }
        // a positive value means the table should be consumed from this timestamp
        if (timestamp > 0) {
            DescriptorProperties descriptorProperties = getValidatedProperties(catalogTable.getProperties());
            Properties kafkaProperties = getKafkaProperties(descriptorProperties);
            kafkaProperties.put(KafkaValidator.CONNECTOR_TOPIC, catalogTable.getProperties().get(KafkaValidator.CONNECTOR_TOPIC));
            kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            Map<KafkaTopicPartition, Long> partitionLongMap = fetchOffsetsWithTimestamp(timestamp, kafkaProperties);
            int partitionIndex = 0;
            StringBuilder offsetConfig = new StringBuilder();
            for (Map.Entry<KafkaTopicPartition, Long> entry : partitionLongMap.entrySet()) {
                String partition = "'connector.specific-offsets." + partitionIndex + ".partition' = '" + entry.getKey().getPartition() + "'";
                String offset = "'connector.specific-offsets." + partitionIndex + ".offset' = '" + entry.getValue() + "'";
                offsetConfig.append(", " + partition + ", " + offset);
                partitionIndex++;
            }
            StringBuilder changedSql = new StringBuilder(sql);
            // assuming the WITH (...) clause is at the end of the statement, insert the
            // specific-offsets entries just before its closing parenthesis
            int insertPos = changedSql.lastIndexOf(")");
            String newSql = changedSql.insert(insertPos, offsetConfig).toString();
            // drop the custom connector.timestamp property
            newSql = newSql.replaceAll("'connector.timestamp'(.*?),", "");
            return newSql;
        }
    }
    // no rewrite needed: return the original SQL
    return sql;
}
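The fetchOffsetsWithTimestamp(timestamp, kafkaProperties) helper used above is not part of Flink's public API and is not shown in the snippet. A minimal sketch, following the same offsetsForTimes pattern as the FlinkKafkaConsumer010 code quoted earlier (imports from org.apache.kafka.clients.consumer, org.apache.kafka.common, and org.apache.flink.streaming.connectors.kafka.internals are assumed), could look like this:

// Minimal sketch: for every partition of the topic, ask Kafka for the earliest
// offset whose record timestamp is >= the given timestamp. A null offset means
// no such record exists in that partition.
private static Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(long timestamp, Properties kafkaProperties) {
    String topic = kafkaProperties.getProperty(KafkaValidator.CONNECTOR_TOPIC);
    Map<KafkaTopicPartition, Long> result = new HashMap<>();
    // short-lived consumer, used only once to resolve the timestamp to offsets
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties)) {
        Map<TopicPartition, Long> request = new HashMap<>();
        for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) {
            request.put(new TopicPartition(topic, partitionInfo.partition()), timestamp);
        }
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(request).entrySet()) {
            result.put(
                new KafkaTopicPartition(entry.getKey().topic(), entry.getKey().partition()),
                entry.getValue() == null ? null : entry.getValue().offset());
        }
    }
    return result;
}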