Flink流连接器之Kafka【一】【Kafka安装、Kafka版本差异、序列化类型】

灰太狼 2023-02-20 13:43 140阅读 0赞

一.简介

Flink提供了特殊的Kafka连接器,用于在Kafka主题之间读写数据。Flink Kafka Consumer与Flink的检查点机制集成在一起,以提供一次精确的处理语义。为了实现这一目标,Flink不仅依赖于Kafka的消费者组偏移量跟踪,而且在内部也跟踪并检查这些偏移量。

对于大多数用户来说,FlinkKafkaConsumer08【08代表Kafka的版本】是合适的。
具体如下:
在这里插入图片描述

二.安装Apache Kafka

参考博客:Kafka分布式安装
备注:

  • 按照Kafka快速入门中的说明下载代码并启动服务器(每次启动该应用程序之前,都需要启动Zookeeper和Kafka服务器)。
  • 如果Kafka和Zookeeper服务器在远程计算机上运行,​​则文件中的advertised.host.name设置config/server.properties必须设置为计算机的IP地址。

三.Kafka 1.0.0+连接器

从Flink 1.7开始,有一个新的通用Kafka连接器,它不跟踪特定的Kafka主版本。而是在Flink发布时跟踪Kafka的最新版本。

如果Kafka broker 版本是1.0.0或更高版本,则应使用此Kafka连接器。如果使用Kafka的旧版本(0.11、0.10、0.9或0.8),则应使用与代理版本相对应的连接器。
兼容性
通用的Kafka连接器通过Kafka客户端API和代理的兼容性保证旧的和新的Kafka代理兼容。它与代理版本0.11.0或更高版本兼容,具体取决于所使用的功能。
将Kafka连接器从0.11迁移到通用
须知:

  • 在整个过程中使用Flink 1.9或更高版本。
  • 不要同时升级Flink和操作。
  • 确保作业中使用的Kafka Consumer和Kafka Producer分配了唯一的标识符(uid)。
  • 使用带有保存点功能的CLI命令停止功能来获取保存点(例如,通过使用stop —withSavepoint)。

四.Kafka消费者

Flink的Kafka使用者被称为FlinkKafkaConsumer09(对于Kafka 0.10.0.x版本,则称为010,11与此类似;FlinkKafkaConsumer对于Kafka> = 1.0.0版本)。它提供对一个或多个Kafka主题的访问。

构造函数接受以下参数:

  1. 主题名称/主题名称列表。
  2. DeserializationSchema / KafkaDeserializationSchema用于反序列化来自Kafka的数据。
  3. Kafka消费者的属性。需要以下属性:

    “ bootstrap.servers”(以逗号分隔的Kafka broker列表)
    “ zookeeper.connect”(Zookeeper服务器的逗号分隔列表)
    “ group.id”消费者组的ID

代码案例:

  1. val properties = new Properties()
  2. properties.setProperty("bootstrap.servers" , "master:9092,slave01:9092,slave02:9092")
  3. properties.setProperty("zookeeper.connect" , "master:2181,slave01:2181,slave02:2181")
  4. properties.setProperty("group.id" , "spark")
  5. properties.setProperty("enable.auto.commit" , "true")
  6. properties.setProperty("auto.commit.interval.ms" ,"5000")
  7. /** * 配置下次重新消费的话,从哪里开始消费: * latest:从上一次提交的offset位置开始的 * earlist:从头开始进行(重复消费数据) */
  8. properties.setProperty("auto.offset.reset" , "latest")
  9. // 配置序列化和反序列化
  10. properties.setProperty("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer")
  11. properties.setProperty("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer")
  12. //获取数据源 kafka
  13. val consumer : FlinkKafkaConsumer09[String] = new FlinkKafkaConsumer09[String](
  14. "spark", new SimpleStringSchema(), properties
  15. )

五.DeserializationSchema

Flink Kafka使用者需要了解如何将Kafka中的二进制数据转换为Java / Scala对象。在 DeserializationSchema允许用户指定这样的一个架构。T deserialize(byte[] message) 每条Kafka消息都会调用该方法,并传递Kafka中的值。

从AbstractDeserializationSchema开始通常会很有帮助,该过程将产生的Java / Scala类型描述为Flink的类型系统。实现DeserializationSchema需要自己实现getProducedType(…)方法。

为了访问Kafka消息的键,值和元数据,KafkaDeserializationSchema具有以下反序列化方法T deserialize(ConsumerRecord record)。

为了方便起见,Flink提供以下架构:

  • TypeInformationSerializationSchema(和TypeInformationKeyValueSerializationSchema)基于Flink的创建架构TypeInformation。如果数据由Flink写入和读取,这将很有用。该模式是其他通用序列化方法的高性能Flink特定替代方案。
  • JsonDeserializationSchema(和JSONKeyValueDeserializationSchema)将序列化的JSON转换为ObjectNode对象,可以使用objectNode.get(“field”).as(Int/String/…)()从中访问字段。KeyValue objectNode包含一个“键”和“值”字段,其中包含所有字段,以及一个可选的“元数据”字段,用于显示此消息的偏移量/分区/主题。
  • AvroDeserializationSchema它使用静态提供的架构读取以Avro格式序列化的数据。它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(…))推断模式,也可以GenericRecords 与手动提供的模式(AvroDeserializationSchema.forGeneric(…))一起使用。该反序列化模式期望序列化的记录不包含嵌入式模式。
    该模式还有一个可用的版本,可以在Confluent Schema Registry中查找作者的模式(用于写入记录的模式)。使用这些反序列化模式记录,将读取从Schema Registry检索并转换为静态提供(通过 ConfluentRegistryAvroDeserializationSchema.forGeneric(…)或ConfluentRegistryAvroDeserializationSchema.forSpecific(…))的模式。

遇到由于某种原因而无法反序列化的损坏消息时,有两种选择-从deserialize(…)方法中引发异常,这将导致作业失败并重新启动,或者返回null以允许Flink Kafka使用者静默跳过损坏的消息。请注意,由于消费者的容错能力,如果在损坏的消息上执行失败的工作,消费者将尝试再次反序列化消息。因此,如果反序列化仍然失败,则消费者将陷入该错误消息的不间断重启和失败循环中。

发表评论

表情:
评论列表 (有 0 条评论,140人围观)

还没有评论,来说两句吧...

相关阅读