Flink metrics: a simple trial

「爱情、让人受尽委屈。」 2022-10-14 12:58

Reference documentation:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html


The actual code:

    package metric

    import java.util.Properties

    import com.sjb.constant.Constants
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.metrics.{Counter, Histogram, Meter, MeterView}
    import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    //import quartz.QuartzScheduler

    // TODO: the entry point is metric.MetricTest
    object MetricTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        println("Starting a scheduled task.........")
        // QuartzScheduler.refreshScheduler()
        println("Scheduled task started.........")

        // Kafka consumer settings; the values come from the project's own Constants object
        val properties = new Properties
        properties.put("bootstrap.servers", Constants.BOOTSTRAP_SERVERS)
        properties.put("group.id", Constants.GROUP_ID_TEST)
        // Look for new partitions every 5 minutes
        properties.setProperty("flink.partition-discovery.interval-millis", (300 * 1000).toString)

        val stream = env.addSource(
          new FlinkKafkaConsumer[String]("cep_test", new SimpleStringSchema(), properties)
            .setStartFromEarliest())

        stream.map(new RichMapFunction[String, String] {
          // Metrics are registered in open(), once the runtime context is available
          private var counter: Counter = _
          private var meter: Meter = _
          private var histogram: Histogram = _

          override def open(parameters: Configuration): Unit = {
            // addGroup returns the same group when called again with the same name,
            // so it is looked up once and shared by all three metrics
            val group = getRuntimeContext.getMetricGroup.addGroup("myGroup")
            counter = group.counter("myCounter")
            // Rate averaged over a 10-second time span
            meter = group.meter("myMeter", new MeterView(10))
            // Statistics over a sliding window of the last 1000 samples
            histogram = group.histogram("myHistogram", new DescriptiveStatisticsHistogram(1000))
          }

          override def map(in: String): String = {
            // Was commented out in the original, which left myCounter at 0
            counter.inc()
            meter.markEvent()
            // Constant sample value, for demonstration only
            histogram.update(1024)
            // Emit the current events-per-second rate as the output record
            meter.getRate.toString
          }
        }).print("==>")

        env.execute("aaaa")
      }
    }
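The map function above prints `meter.getRate` for every record. As far as I understand `MeterView`, the rate is not recomputed on each `markEvent()` call; Flink refreshes it on a periodic tick (every 5 seconds), so the printed value stays at 0.0 at first and then changes in steps. Below is a minimal sketch of that behaviour in plain Scala. `WindowedRate` and `WindowedRateDemo` are hypothetical names, and this is a simplified model, not Flink's source code.

```scala
// Simplified model of a time-window meter. NOT Flink's MeterView implementation;
// the class name and the default update interval are assumptions made for illustration.
final class WindowedRate(timeSpanSeconds: Int, updateIntervalSeconds: Int = 5) {
  require(timeSpanSeconds >= updateIntervalSeconds, "time span must cover at least one interval")
  require(timeSpanSeconds % updateIntervalSeconds == 0, "time span must be a multiple of the interval")

  // One counter snapshot per tick, plus one slot for the oldest value in the window
  private val snapshots = new Array[Long](timeSpanSeconds / updateIntervalSeconds + 1)
  private var index = 0
  private var count = 0L
  private var rate = 0.0

  // Only increments the counter; the rate is left untouched
  def markEvent(n: Long = 1L): Unit = count += n

  // Meant to be called once per update interval by a timer
  def update(): Unit = {
    index = (index + 1) % snapshots.length
    snapshots(index) = count
    val oldest = snapshots((index + 1) % snapshots.length)
    rate = (count - oldest).toDouble / timeSpanSeconds
  }

  def getRate: Double = rate
}

object WindowedRateDemo {
  def main(args: Array[String]): Unit = {
    val meter = new WindowedRate(timeSpanSeconds = 10)
    (1 to 50).foreach(_ => meter.markEvent())
    println(meter.getRate) // 0.0, because no tick has happened yet
    meter.update()
    println(meter.getRate) // 5.0, i.e. 50 events over a 10-second span
  }
}
```

If the real meter behaves like this model, reading the rate inside `map` is fine for a quick look, but the value is more meaningful when read from the web UI or a metrics reporter.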

Result in the web UI:

[Screenshot: the metrics shown in the Flink web UI]
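Because the job calls `histogram.update(1024)` for every record, the histogram's window only ever contains the value 1024, so its min, max, mean and quantiles are all 1024. The sketch below models a sliding-window histogram in plain Scala to show how the statistics change once real values (for example a record length) are recorded. `WindowHistogram` is a hypothetical class, and the nearest-rank quantile is my own simplification, not what `DescriptiveStatisticsHistogram` does internally.

```scala
import scala.collection.mutable

// Simplified model of a sliding-window histogram. NOT Flink's
// DescriptiveStatisticsHistogram; names and the quantile method are assumptions.
final class WindowHistogram(windowSize: Int) {
  require(windowSize > 0, "window size must be positive")

  private val window = mutable.Queue.empty[Long]

  // Keep only the most recent `windowSize` samples
  def update(value: Long): Unit = {
    if (window.size == windowSize) window.dequeue()
    window.enqueue(value)
  }

  // The statistics below assume at least one sample has been recorded
  def min: Long = window.min
  def max: Long = window.max
  def mean: Double = window.sum.toDouble / window.size

  // Nearest-rank quantile over the current window, for 0 < q <= 1
  def quantile(q: Double): Long = {
    val sorted = window.toVector.sorted
    val rank = math.max(1, math.ceil(q * sorted.size).toInt)
    sorted(rank - 1)
  }
}

object WindowHistogramDemo {
  def main(args: Array[String]): Unit = {
    // Same pattern as the job: a constant sample for every record
    val constant = new WindowHistogram(3)
    (1 to 5).foreach(_ => constant.update(1024))
    println((constant.min, constant.max, constant.mean)) // (1024,1024,1024.0)

    // Varying samples: only the last 3 values (2, 3, 4) remain in the window
    val varying = new WindowHistogram(3)
    Seq(1L, 2L, 3L, 4L).foreach(varying.update)
    println((varying.min, varying.max, varying.mean)) // (2,4,3.0)
  }
}
```

In the job itself, replacing the constant with something like `histogram.update(in.length)` would give the histogram a distribution worth looking at.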

For more detailed references, see:

https://www.jianshu.com/p/a5ed47cd320a

https://www.cnblogs.com/qiu-hua/p/13910798.html

Some of the important parts:

[Image: excerpt from the references above]
