Flink 消费kafka 序列化

r囧r小猫 2024-04-18 15:50 159阅读 0赞

今天测试今天尝试了flink从kafka获取数据的测试程序编写,主要测试的kafka发送json的接收例子,尝试了几个kafka的DeserializationSchema(反序列化模式),包括了SimpleStringSchema,JSONKeyValueDeserializationSchema以及自定义DeserializationSchema.代码通过Flink计算引擎从Kafka相应的Topic中读取数据,通过FlinkKafkaConsumer010来实现.

1.SimpleStringSchema

官网上有SimpleStringSchema的示例,它可以构建DataStream[String],返回的就是kafka生产者发过来的信息。

以下是代码:

  1. package whTest
  2. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  3. import java.util.Properties
  4. import org.apache.flink.api.common.serialization.SimpleStringSchema
  5. import org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic
  6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
  7. import org.apache.flink.streaming.api.scala._
  8. object Fromkafka {
  9. case class Person (name:String,sex:String,age:Int)
  10. def main(args: Array[String]): Unit = {
  11. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  12. //开启checkPoint, Time interval between state checkpoints 5000 milliseconds.
  13. /**
  14. * 如果我们启用了Flink的Checkpint机制,
  15. * 那么Flink Kafka Consumer将会从指定的Topic中消费消息,
  16. * 然后定期地将Kafka offsets信息、状态信息以及其他的操作信息进行Checkpint。
  17. * 所以,如果Flink作业出故障了,Flink将会从最新的Checkpint中恢复,
  18. * 并且从上一次偏移量开始读取Kafka中消费消息。
  19. */
  20. env.enableCheckpointing(5000)
  21. import org.apache.flink.streaming.api.TimeCharacteristic
  22. //设置系统基本时间特性为事件时间
  23. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  24. //kafka连接配置信息
  25. val properties = new Properties()
  26. properties.setProperty("bootstrap.servers", "localhost:9092")
  27. properties.setProperty("zookeeper.connect", "localhost:2181")
  28. properties.setProperty("group.id", "test")
  29. val kafkaStream = env
  30. .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
  31. .print()
  32. // execute program
  33. env.execute("kafkaTest")
  34. }
  35. }

测试结果:

  1. {"ID_Link":"11111","CarNum":100,"speed":10.0}//即为生产者发送的信息

如果我们需要将消息进行封装,DataStream[String]->DataStream[MyType],可以在DataStream[String]后追加map函数进行转换,当然也可以使用下文的自定义DeserializationSchema。

2. JSONKeyValueDeserializationSchema

JSONKeyValueDeserializationSchema可以将序列化的JSON转换为ObjectNode对象,可以用objectNode.get(“field”)访问字段。新建JSONKeyValueDeserializationSchema需要带一个boolean类型参数,为true表示需要指明是否需要包含“元数据”、偏移量、分区和主题等信息,为false表明只需要数据。
以下是代码和结果:

  1. package whTest
  2. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  3. import java.util.Properties
  4. import org.apache.flink.api.common.serialization.SimpleStringSchema
  5. import org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic
  6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
  7. import org.apache.flink.streaming.api.scala._
  8. object Fromkafka {
  9. case class Person (name:String,sex:String,age:Int)
  10. def main(args: Array[String]): Unit = {
  11. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  12. //开启checkPoint, Time interval between state checkpoints 5000 milliseconds.
  13. /**
  14. * 如果我们启用了Flink的Checkpint机制,
  15. * 那么Flink Kafka Consumer将会从指定的Topic中消费消息,
  16. * 然后定期地将Kafka offsets信息、状态信息以及其他的操作信息进行Checkpint。
  17. * 所以,如果Flink作业出故障了,Flink将会从最新的Checkpint中恢复,
  18. * 并且从上一次偏移量开始读取Kafka中消费消息。
  19. */
  20. env.enableCheckpointing(5000)
  21. import org.apache.flink.streaming.api.TimeCharacteristic
  22. //设置系统基本时间特性为事件时间
  23. // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  24. val properties = new Properties()
  25. properties.setProperty("bootstrap.servers", "localhost:9092")
  26. // only required for Kafka 0.8
  27. properties.setProperty("zookeeper.connect", "localhost:2181")
  28. properties.setProperty("group.id", "test")
  29. val kafkaStream = env
  30. .addSource(new FlinkKafkaConsumer010("test", new JSONKeyValueDeserializationSchema(true), properties))
  31. .print()
  32. // execute program
  33. env.execute("kafkaTest")
  34. }
  35. }

结果:

  1. // new JSONKeyValueDeserializationSchema(true) send json :{"name":"limei","age":12,"sex":"f"} get : {"value":{"name":"limei","age":12,"sex":"f"},"metadata":{"offset":10,"topic":"test","partition":0}}
  2. // new JSONKeyValueDeserializationSchema(false) send json :{"name":"limei","age":12,"sex":"f"} get :{"value":{"name":"limei","age":12,"sex":"f"}}

3.自定义DeserializationSchema

自定义DeserializationSchema需要实现DeserializationSchema接口,这一部分代码可以参考官方代码org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer。
我需要实现的是将从kafka获取到的json数据转化为我需要的自定义pojo类(VideoData)。
主要是要实现DeserializationSchema方法的deserialize方法,这个方法的输入是byte[] message类型,我们需要将其转换为String类型,然后通过Json工具类解析成POJO类。这里我使用的是google的Gson框架。

以下是DeserializationSchema类和POJO类代码

  1. package whTest;
  2. import com.google.gson.Gson;
  3. import org.apache.flink.api.common.serialization.DeserializationSchema;
  4. import org.apache.flink.api.common.typeinfo.TypeInformation;
  5. import java.io.IOException;
  6. import java.nio.ByteBuffer;
  7. import java.nio.ByteOrder;
  8. import java.nio.charset.Charset;
  9. import java.nio.CharBuffer;
  10. import java.nio.charset.CharsetDecoder;
  11. public class VideoDataDeSerializer implements DeserializationSchema<VideoData> {
  12. private static final long serialVersionUID = 1L;
  13. @Override
  14. public VideoData deserialize(byte[] message) throws IOException {
  15. ByteBuffer buffer = ByteBuffer.wrap(message).order(ByteOrder.LITTLE_ENDIAN);
  16. String mess = this.byteBuffertoString(buffer);
  17. //封装为POJO类
  18. Gson gson = new Gson();
  19. VideoData data = gson.fromJson(mess, VideoData.class);
  20. return data;
  21. }
  22. @Override
  23. public boolean isEndOfStream(VideoData nextElement) {
  24. return false;
  25. }
  26. @Override
  27. public TypeInformation<VideoData> getProducedType() {
  28. return null;
  29. }
  30. /**
  31. * 将ByteBuffer类型转换为String类型
  32. * @param buffer
  33. * @return
  34. */
  35. public static String byteBuffertoString(ByteBuffer buffer)
  36. {
  37. Charset charset = null;
  38. CharsetDecoder decoder = null;
  39. CharBuffer charBuffer = null;
  40. try
  41. {
  42. charset = Charset.forName("UTF-8");
  43. decoder = charset.newDecoder();
  44. // charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空
  45. charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
  46. return charBuffer.toString();
  47. }
  48. catch (Exception ex)
  49. {
  50. ex.printStackTrace();
  51. return "";
  52. }
  53. }
  54. }

POJO类:

  1. package whTest;
  2. public class VideoData {
  3. public VideoData(String ID_Link,int CarNum,float speed){
  4. this.ID_Link =ID_Link;
  5. this.CarNum = CarNum;
  6. this.speed = speed;
  7. }
  8. private String ID_Link;
  9. private int CarNum;
  10. private float speed;
  11. public void setID_Link(String ID_Link) {
  12. this.ID_Link = ID_Link;
  13. }
  14. public void setCarNum(int carNum) {
  15. CarNum = carNum;
  16. }
  17. public void setSpeed(float speed) {
  18. this.speed = speed;
  19. }
  20. public String getID_Link() {
  21. return ID_Link;
  22. }
  23. public int getCarNum() {
  24. return CarNum;
  25. }
  26. public float getSpeed() {
  27. return speed;
  28. }
  29. }

主函数只需要把DeserializationSchema类修改为自定义的VideoDataDeSerializer,当kafka生产者发送过来用VideoData转换的Json类型时,返回的就是我们需要的DataStream[VideoData]。这就不需要后面再用map函数将String转换为VideoData类型了。

发表评论

表情:
评论列表 (有 0 条评论,159人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Flink消费kafka数据

    前言: Flink的DataSoures模块中,定义了DataStream API的数据输入操作,Flink将数据源主要分为内置和第三方数据源,内置数据源包含文件、Soc...