Flink自定义metric监控流入量

「爱情、让人受尽委屈。」 2023-05-29 11:51 125阅读 0赞

点击蓝字

640?wx\_fmt=png

关注~~

flink任务本身提供了各种类型的指标监控,细化到了每一个Operator的流入/流出量、速率、Watermark值等,通常在实际应用中需要对接入数据做格式化例如转json,符合要求的数据会向下流动,不符合要求或者格式化异常称为脏数据会被过滤掉,现在目标实现一个通用化方式能够对正常数据与脏数据进行指标统计。

  1. flink metric类型分为Counter、Gauge、Histogram、Meter,需要统计的是一个累加值因此选取Counter类型的metirc
  2. 由于是对任务的流入监控,因此需要在Source端进行处理,通常对接的数据源是kafka, 而flink本身已经提供了kakfa connector,并且开放了数据反序列化的接口DeserializationSchema与抽象类AbstractDeserializationSchema,实现该接口或者继承抽象类可以完成数据的反序列化与格式化,由于每一条数据都需要进过反序列化处理,那么可以在反序列化的同时进行指标统计
  3. 在flink中自定义Metric入口是RuntimeContext, 但是在反序列化抽象类中并没有提供访问RuntimeContext的接口,一般是在RichFunction中,与其相关只有FlinkKafkaConsumer,那么就可以在FlinkKafkaConsumer中将获取到的RuntimeContext传给AbstractDeserializationSchema

实现步骤:

  1. 自定义一个继承AbstractDeserializationSchema的抽象类AbsDeserialization,里面包含RuntimeContext与两个统计的Counter,并且包含一个初始化Counter的方法initMetric
  2. 自定义一个继承FlinkKafkaConsumer010的抽象类,里面包含AbsDeserialization属性、构造化方法,并且重写run方法,在run方法里面给AbsDeserialization设置RuntimeContex对象并且调用其initMetric, 最后调用父类run方法

代码如下:

  1. public abstract class AbsDeserialization<T> extends AbstractDeserializationSchema<T> {
  2. private RuntimeContext runtimeContext;
  3. private String DIRTY_DATA_NAME="dirtyDataNum";
  4. private String NORMAL_DATA_NAME="normalDataNum";
  5. protected transient Counter dirtyDataNum;
  6. protected transient Counter normalDataNum;
  7. public RuntimeContext getRuntimeContext() {
  8. return runtimeContext;
  9. }
  10. public void setRuntimeContext(RuntimeContext runtimeContext) {
  11. this.runtimeContext = runtimeContext;
  12. }
  13. public void initMetric()
  14. {
  15. dirtyDataNum=runtimeContext.getMetricGroup().counter(DIRTY_DATA_NAME);
  16. normalDataNum=runtimeContext.getMetricGroup().counter(NORMAL_DATA_NAME);
  17. }
  18. }
  19. public class CustomerKafkaConsumer<T> extends FlinkKafkaConsumer010<T> {
  20. private AbsDeserialization<T> valueDeserializer;
  21. public CustomerKafkaConsumer(String topic, AbsDeserialization<T> valueDeserializer, Properties props) {
  22. super(topic, valueDeserializer, props);
  23. this.valueDeserializer=valueDeserializer;
  24. }
  25. @Override public void run(SourceContext<T> sourceContext) throws Exception {
  26. valueDeserializer.setRuntimeContext(getRuntimeContext());
  27. valueDeserializer.initMetric();
  28. super.run(sourceContext);
  29. }
  30. }

使用案例,只要定义一个继承AbsDeserialization类即可,

  1. class ParseDeserialization extends AbsDeserialization[RawData] {
  2. override def deserialize(message: Array[Byte]): RawData = {
  3. try {
  4. val msg = new String(message)
  5. val rawData = JSON.parseObject(msg, classOf[RawData])
  6. normalDataNum.inc() //正常数据指标
  7. rawData
  8. } catch {
  9. case e:Exception=>{
  10. dirtyDataNum.inc() //脏数据指标
  11. null
  12. }
  13. }
  14. }
  15. }

source使用方式:

  1. val consumer: CustomerKafkaConsumer[RawData] = new CustomerKafkaConsumer[RawData](topic, new ParseDeserialization, kafkaPro)

那么在任务运行中,可以在flink web的监控界面查看到normalDataNum 、dirtyDataNum 两个指标值,另外在AbsDeserialization里面也可以定义一些流入速率等监控。

640?wx\_fmt=png

扫一扫

640?wx\_fmt=png

640?wx\_fmt=png

发表评论

表情:
评论列表 (有 0 条评论,125人围观)

还没有评论,来说两句吧...

相关阅读

    相关 flinkmetrics开发

    flink metrics 的作用 Flink 提供的 Metrics 可以在 Flink 内部收集一些指标,通过这些指标让开发人员更好地理解作业或集群的状态。由于集群运

    相关 flink定义函数

    前言 在很多情况下,尽管flink提供了丰富的转换算子API可供开发者对数据进行各自处理,比如 map(),filter()等,但在实际使用的时候仍然不能满足所有的场景,