Flink metric 简单尝试
参考文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
实际代码:
package metric
import java.util.Properties
import com.sjb.constant.Constants
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.{Counter, Histogram, Meter, MeterView}
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram
//import quartz.QuartzScheduler
//todo metric.MetricTest
/** Minimal Flink streaming job that demonstrates user-defined metrics.
  *
  * Reads strings from the Kafka topic "cep_test", updates a counter, a meter and a
  * histogram (all registered under the "myGroup" metric group) for every record,
  * and prints the meter's current rate for each record.
  */
object MetricTest {

  /** Builds and runs the job.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    println("启动启动一个定时任务.........")
    // QuartzScheduler.refreshScheduler();
    println("启动定时任务完成.........")

    // setProperty keeps keys and values typed as String, as java.util.Properties recommends.
    val properties = new Properties
    properties.setProperty("bootstrap.servers", Constants.BOOTSTRAP_SERVERS)
    properties.setProperty("group.id", Constants.GROUP_ID_TEST)
    properties.setProperty("flink.partition-discovery.interval-millis", (300 * 1000).toString)

    val consumer = new FlinkKafkaConsumer[String]("cep_test", new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()
    val stream = env.addSource(consumer)

    stream.map(new RichMapFunction[String, String] {
      // Metrics can only be registered once the runtime context exists, so they are
      // assigned in open() rather than at construction time.
      private var counter: Counter = _
      private var meter: Meter = _
      private var histogram: Histogram = _

      /** Registers the three metrics under a single "myGroup" metric group. */
      override def open(parameters: Configuration): Unit = {
        val group = getRuntimeContext.getMetricGroup.addGroup("myGroup")
        counter = group.counter("myCounter")
        meter = group.meter("myMeter", new MeterView(10))
        histogram = group.histogram("myHistogram", new DescriptiveStatisticsHistogram(1000))
      }

      /** Updates every metric for the incoming record and returns the meter's rate as text.
        *
        * @param in the record read from Kafka (its content is not inspected)
        * @return the meter's current rate, rendered as a string
        */
      override def map(in: String): String = {
        // Count the record; without this the registered "myCounter" would stay at 0.
        counter.inc()
        meter.markEvent()
        // Demo value: every record contributes the same sample to the histogram.
        histogram.update(1024)
        meter.getRate.toString
      }
    }).print("==>")

    env.execute("aaaa")
  }
}
Web 端效果:
更详细的内容可参考:
https://www.jianshu.com/p/a5ed47cd320a
https://www.cnblogs.com/qiu-hua/p/13910798.html
部分重要的:
还没有评论,来说两句吧...