Flink流连接器之Kafka【一】【Kafka安装、Kafka版本差异、序列化类型】
一.简介
Flink提供了特殊的Kafka连接器,用于在Kafka主题之间读写数据。Flink Kafka Consumer与Flink的检查点机制集成在一起,以提供一次精确的处理语义。为了实现这一目标,Flink不仅依赖于Kafka的消费者组偏移量跟踪,而且在内部也跟踪并检查这些偏移量。
对于大多数用户来说,FlinkKafkaConsumer08【08代表Kafka的版本】是合适的。
具体如下:
二.安装Apache Kafka
参考博客:Kafka分布式安装
备注:
- 按照Kafka快速入门中的说明下载代码并启动服务器(每次启动该应用程序之前,都需要启动Zookeeper和Kafka服务器)。
- 如果Kafka和Zookeeper服务器在远程计算机上运行,则文件中的advertised.host.name设置config/server.properties必须设置为计算机的IP地址。
三.Kafka 1.0.0+连接器
从Flink 1.7开始,有一个新的通用Kafka连接器,它不跟踪特定的Kafka主版本。而是在Flink发布时跟踪Kafka的最新版本。
如果Kafka broker 版本是1.0.0或更高版本,则应使用此Kafka连接器。如果使用Kafka的旧版本(0.11、0.10、0.9或0.8),则应使用与代理版本相对应的连接器。
兼容性
通用的Kafka连接器通过Kafka客户端API和代理的兼容性保证旧的和新的Kafka代理兼容。它与代理版本0.11.0或更高版本兼容,具体取决于所使用的功能。
将Kafka连接器从0.11迁移到通用
须知:
- 在整个过程中使用Flink 1.9或更高版本。
- 不要同时升级Flink和操作。
- 确保作业中使用的Kafka Consumer和Kafka Producer分配了唯一的标识符(uid)。
- 使用带有保存点功能的CLI命令停止功能来获取保存点(例如,通过使用stop —withSavepoint)。
四.Kafka消费者
Flink的Kafka使用者被称为FlinkKafkaConsumer09(对于Kafka 0.10.0.x版本,则称为010,11与此类似;FlinkKafkaConsumer对于Kafka> = 1.0.0版本)。它提供对一个或多个Kafka主题的访问。
构造函数接受以下参数:
- 主题名称/主题名称列表。
- DeserializationSchema / KafkaDeserializationSchema用于反序列化来自Kafka的数据。
Kafka消费者的属性。需要以下属性:
“ bootstrap.servers”(以逗号分隔的Kafka broker列表)
“ zookeeper.connect”(Zookeeper服务器的逗号分隔列表)
“ group.id”消费者组的ID
代码案例:
val properties = new Properties()
properties.setProperty("bootstrap.servers" , "master:9092,slave01:9092,slave02:9092")
properties.setProperty("zookeeper.connect" , "master:2181,slave01:2181,slave02:2181")
properties.setProperty("group.id" , "spark")
properties.setProperty("enable.auto.commit" , "true")
properties.setProperty("auto.commit.interval.ms" ,"5000")
/** * 配置下次重新消费的话,从哪里开始消费: * latest:从上一次提交的offset位置开始的 * earlist:从头开始进行(重复消费数据) */
properties.setProperty("auto.offset.reset" , "latest")
// 配置序列化和反序列化
properties.setProperty("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer")
properties.setProperty("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer")
//获取数据源 kafka
val consumer : FlinkKafkaConsumer09[String] = new FlinkKafkaConsumer09[String](
"spark", new SimpleStringSchema(), properties
)
五.DeserializationSchema
Flink Kafka使用者需要了解如何将Kafka中的二进制数据转换为Java / Scala对象。在 DeserializationSchema允许用户指定这样的一个架构。T deserialize(byte[] message) 每条Kafka消息都会调用该方法,并传递Kafka中的值。
从AbstractDeserializationSchema开始通常会很有帮助,该过程将产生的Java / Scala类型描述为Flink的类型系统。实现DeserializationSchema需要自己实现getProducedType(…)方法。
为了访问Kafka消息的键,值和元数据,KafkaDeserializationSchema具有以下反序列化方法T deserialize(ConsumerRecord
为了方便起见,Flink提供以下架构:
- TypeInformationSerializationSchema(和TypeInformationKeyValueSerializationSchema)基于Flink的创建架构TypeInformation。如果数据由Flink写入和读取,这将很有用。该模式是其他通用序列化方法的高性能Flink特定替代方案。
- JsonDeserializationSchema(和JSONKeyValueDeserializationSchema)将序列化的JSON转换为ObjectNode对象,可以使用objectNode.get(“field”).as(Int/String/…)()从中访问字段。KeyValue objectNode包含一个“键”和“值”字段,其中包含所有字段,以及一个可选的“元数据”字段,用于显示此消息的偏移量/分区/主题。
- AvroDeserializationSchema它使用静态提供的架构读取以Avro格式序列化的数据。它可以从Avro生成的类(AvroDeserializationSchema.forSpecific(…))推断模式,也可以GenericRecords 与手动提供的模式(AvroDeserializationSchema.forGeneric(…))一起使用。该反序列化模式期望序列化的记录不包含嵌入式模式。
该模式还有一个可用的版本,可以在Confluent Schema Registry中查找作者的模式(用于写入记录的模式)。使用这些反序列化模式记录,将读取从Schema Registry检索并转换为静态提供(通过 ConfluentRegistryAvroDeserializationSchema.forGeneric(…)或ConfluentRegistryAvroDeserializationSchema.forSpecific(…))的模式。
遇到由于某种原因而无法反序列化的损坏消息时,有两种选择-从deserialize(…)方法中引发异常,这将导致作业失败并重新启动,或者返回null以允许Flink Kafka使用者静默跳过损坏的消息。请注意,由于消费者的容错能力,如果在损坏的消息上执行失败的工作,消费者将尝试再次反序列化消息。因此,如果反序列化仍然失败,则消费者将陷入该错误消息的不间断重启和失败循环中。
还没有评论,来说两句吧...