度量

Flink公开了一个度量系统,允许收集和公开指标到外部系统。

注册指标

您可以通过调用从扩展RichFunction任何用户函数访问度量标准系统getRuntimeContext().getMetricGroup()此方法返回一个MetricGroup对象,您可以在该对象上创建和注册新指标。

度量类型

Flink支持CountersGaugesHistogramsMeters

计数器

A Counter用于计算某些东西。可以使用inc()/inc(long n)来Reduce当前值dec()/dec(long n)您可以创建并注册Counter调用counter(String name)MetricGroup

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}
class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter")
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

或者,您也可以使用自己的Counter实现:

public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter());
  }

  @Override
  public String map(String value) throws Exception {
    this.counter.inc();
    return value;
  }
}
class MyMapper extends RichMapFunction[String,String] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter())
  }

  override def map(value: String): String = {
    counter.inc()
    value
  }
}

测量

A根据需要Gauge提供任何类型的值。为了使用a,Gauge您必须首先创建一个实现该org.apache.flink.metrics.Gauge接口的类返回值的类型没有限制。你可以通过调用注册一个计gauge(String name, Gauge gauge)MetricGroup

public class MyMapper extends RichMapFunction<String, String> {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }

  @Override
  public String map(String value) throws Exception {
    valueToExpose++;
    return value;
  }
}
new class MyMapper extends RichMapFunction[String,String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }

  override def map(value: String): String = {
    valueToExpose += 1
    value
  }
}

请注意,报告会将公开的对象转换为a String,这意味着需要进行有意义的toString()实现。

直方图

A Histogram衡量长值的分布。你可以通过调用注册一个histogram(String name, Histogram histogram)上一个MetricGroup

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Long map(Long value) throws Exception {
    this.histogram.update(value);
    return value;
  }
}
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var histogram: Histogram = _

  override def open(parameters: Configuration): Unit = {
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram())
  }

  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

Flink没有提供默认实现Histogram,但提供了一个允许使用Codahale / DropWizard直方图Wrapper要使用此打包,请在以下内容中添加以下依赖项pom.xml

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.7-SNAPSHOT</version>
</dependency>

然后你可以像这样注册一个Codahale / DropWizard直方图:

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
  }
  
  @Override
  public Long map(Long value) throws Exception {
    this.histogram.update(value);
    return value;
  }
}
class MyMapper extends RichMapFunction[Long, Long] {
  @transient private var histogram: Histogram = _

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Histogram dropwizardHistogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))
        
    histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram))
  }
  
  override def map(value: Long): Long = {
    histogram.update(value)
    value
  }
}

仪表

A Meter衡量平均吞吐量。可以使用该markEvent()方法注册事件的发生可以使用markEvent(long n)方法注册同时发生多个事件你可以通过调用注册一个仪表meter(String name, Meter meter)MetricGroup

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @Override
  public Long map(Long value) throws Exception {
    this.meter.markEvent();
    return value;
  }
}
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter())
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

Flink提供了一个允许使用Codahale / DropWizard表打包器要使用此打包,请在以下内容中添加以下依赖项pom.xml

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.7-SNAPSHOT</version>
</dependency>

然后你可以像这样注册一个Codahale / DropWizard仪表:

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
  }

  @Override
  public Long map(Long value) throws Exception {
    this.meter.markEvent();
    return value;
  }
}
class MyMapper extends RichMapFunction[Long,Long] {
  @transient private var meter: Meter = _

  override def open(config: Configuration): Unit = {
    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter()
  
    meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter))
  }

  override def map(value: Long): Long = {
    meter.markEvent()
    value
  }
}

范围

为每个度量标准分配一个标识符和一组键值对,在该键值对下将报告度量标准。

标识符基于3个组件:注册度量标准时的用户定义名称,可选的用户定义范围和系统提供的范围。例如,如果A.B是系统范围,C.D用户范围和E名称,则度量标识符将是A.B.C.D.E

您可以.通过设置metrics.scope.delimiterKeys来配置要用于标识符的分隔符(默认值:) conf/flink-conf.yaml

用户范围

你可以通过调用定义用户范围MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)Metric#addGroup(String key, String value)这些方法影响什么MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents返回。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter")

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

系统范围

系统范围包含有关度量标准的上下文信息,例如,它在哪个任务中注册或该任务属于哪个作业。

可以通过设置以下键来配置应包含哪些上下文信息conf/flink-conf.yaml这些键中的每一个都期望一个格式字符串可能包含常量(例如“taskmanager”)和变量(例如“<task_id>”),它们将在运行时被替换。

  • metrics.scope.jm
    • 默认值:<host> .jobmanager
    • 应用于作用域JobManager的所有指标。
  • metrics.scope.jm.job
    • 默认值:<host> .jobmanager。<job_name>
    • 应用于作用于JobManager和作业的所有度量标准。
  • metrics.scope.tm
    • 默认值:<host> .taskmanager。<tm_id>
    • 应用于作用于TaskManager的所有度量标准。
  • metrics.scope.tm.job
    • 默认值:<host> .taskmanager。<tm_id>。<job_name>
    • 应用于作用于TaskManager和作业的所有度量标准。
  • metrics.scope.task
    • 默认值:<host> .taskmanager。<tm_id>。<job_name>。<task_name>。<subtask_index>
    • 应用于作用于任务的所有指标。
  • metrics.scope.operator
    • 默认值:<host> .taskmanager。<tm_id>。<job_name>。<operator_name>。<subtask_index>
    • 应用于作用于算子的所有指标。

变量的数量或顺序没有限制。变量区分大小写。

算子指标的默认范围将产生类似于的标识符 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric

如果您还想包含任务名称但省略TaskManager信息,则可以指定以下格式:

metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

这可以创建标识符localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric

请注意,对于此格式字符串,如果同时多次运行同一作业,则可能发生标识符冲突,这可能导致度量标准数据不一致。因此,建议使用通过包含ID(例如<job_id>)或通过为作业和 算子分配唯一名称来提供一定程度的唯一性的格式字符串。

所有变量列表

  • JobManager:<host>
  • TaskManager:<host>,<tm_id>
  • 作业:<job_id>,<作业名称>
  • 任务:<task_id>,<task_name>,<task_attempt_id>,<task_attempt_num>,<subtask_index>
  • 算子:<operator_id>,<operator_name>,<subtask_index>

要点:对于Batch API,<operator_id>始终等于<task_id>。

用户变量

您可以通过调用来定义用户变量MetricGroup#addGroup(String key, String value)这种方法会影响什么MetricGroup#getMetricIdentifierMetricGroup#getScopeComponentsMetricGroup#getAllVariables()返回。

重要提示:用户变量不能用于范围格式。

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter")

报告

通过配置一个或多个报告,可以将度量标准暴露给外部系统conf/flink-conf.yaml这些报告将在每个工作和TaskManager启动时进行实例化。

  • metrics.reporter.<name>.<config><config>报告的通用设置命名<name>
  • metrics.reporter.<name>.class:报告类用于为报告命名<name>
  • metrics.reporter.<name>.interval:报告间隔用于报告的名字<name>
  • metrics.reporter.<name>.scope.delimiter:用于名称的报告者的标识符(默认值使用metrics.scope.delimiter的分隔符<name>
  • metrics.reporters:(可选)以逗号分隔的包含报告名称列表。默认情况下,将使用所有已配置的报告。

所有报告必须至少拥有该class财产,其中一些允许指定报告interval下面,我们将列出针对每位报告的更多设置。

示例报表配置,指定多个报告:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

重要说明:启动Flink时,通过将其放在/ lib文件夹中,可以访问包含报告者的jar。

您可以Reporter通过实现org.apache.flink.metrics.reporter.MetricReporter接口编写自己如果Reporter应定期发送报告,您还必须实现该Scheduled接口。

以下部分列出了受支持的报告。

JMX(org.apache.flink.metrics.jmx.JMXReporter)

您不必包含其他依赖项,因为默认情况下JMX报告器可用但未激活。

参数:

  • port - (可选)JMX侦听连接的端口。为了能够在一个主机上运行多个报告实例(例如,当一个TaskManager与JobManager共同使用时),建议使用类似的端口范围9250-9260指定范围时,实际端口将显示在相关作业或TaskManager日志中。如果设置此设置,Flink将为给定的端口/范围启动额外的JMX连接器。度量标准始终在默认的本地JMX界面上可用。

配置示例:

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

通过JMX公开的度量标准由域和键属性列表标识,这些键属性一起形成对象名称。

域始终以org.apache.flink广义度量标识符开头与通常的标识符相反,它不受作用域格式的影响,不包含任何变量,并且在作业中保持不变。这种域的一个例子是org.apache.flink.job.task.numBytesOut

键属性列表包含与给定度量关联的所有变量的值,无论配置的范围格式如何。这样一个列表的一个例子是host=localhost,job_name=MyJob,task_name=MyTask

因此,域标识度量标准类,而关键属性列表标识该度量标准的一个(或多个)实例。

Ganglia(org.apache.flink.metrics.ganglia.GangliaReporter)

要使用此报告,您必须复制/opt/flink-metrics-ganglia-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

  • host-下配置的的gmond主机地址udp_recv_channel.bindgmond.conf
  • port-下配置的端口的gmond udp_recv_channel.portgmond.conf
  • tmax - 应保存旧度量标准的软限制
  • dmax - 应保存旧指标多长时间的硬限制
  • ttl - 传输的UDP数据包的生存时间
  • addressingMode - 要使用的UDP寻址模式(UNICAST / MULTICAST)

配置示例:

metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST

Graphite(org.apache.flink.metrics.graphite.GraphiteReporter)

要使用此报告,您必须复制/opt/flink-metrics-graphite-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

  • host - Graphite服务器主机
  • port - Graphite服务器端口
  • protocol - 使用协议(TCP / UDP)

配置示例:

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)

要使用此报告,您必须复制/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

  • port- (可选)Prometheus导出器侦听的端口,默认为9249为了能够在一个主机上运行多个报告实例(例如,当一个TaskManager与JobManager共同使用时),建议使用类似的端口范围9250-9260

配置示例:

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink度量标准类型映射到Prometheus度量标准类型,如下所示:

Flink Prometheus 注意
计数器 测量 Prometheus 计数器不能Reduce。
测量 测量 仅支持数字和布尔值。
直方图 概要 分位数.5,.75,.95,.98,.99和.999
仪表 测量 仪表输出仪表的速率。

所有Flink度量变量(请参阅所有变量列表)都将作为标签导出到Prometheus。

PrometheusPushGateway(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)

要使用此报告,您必须复制/opt/flink-metrics-prometheus-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

默认 描述
deleteOnShutdown
true 指定是否在关闭时从PushGateway中删除指标。
Host
(none) PushGateway服务器主机。
jobName
(none) 将推送指标的作业名称
port
-1 PushGateway服务器端口。
randomJobNameSuffix
true 指定是否应将随机后缀附加到作业名称。

配置示例:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false

PrometheusPushGatewayReporter将指标推送到Pushgateway,可由Prometheus 抓取

有关用例,请参阅Prometheus文档

StatsD(org.apache.flink.metrics.statsd.StatsDReporter)

要使用此报告,您必须复制/opt/flink-metrics-statsd-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

参数:

  • host - StatsD服务器主机
  • port - StatsD服务器端口

配置示例:

metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

Datadog(org.apache.flink.metrics.datadog.DatadogHttpReporter)

要使用此报告,您必须复制/opt/flink-metrics-datadog-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

注意Flink指标,如任何变量<host><job_name><tm_id><subtask_index><task_name>,和<operator_name>,将被发送到Datadog的标签。标签看起来像host:localhostjob_name:myjobname

参数:

  • apikey - Datadog APIKeys
  • tags - (可选)发送到Datadog时将应用于度量标准的全局标记。标签应仅以逗号分隔

配置示例:

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod

Slf4j(org.apache.flink.metrics.slf4j.Slf4jReporter)

要使用此报告,您必须复制/opt/flink-metrics-slf4j-1.7-SNAPSHOT.jar/libFlink发行版文件夹中。

配置示例:

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS

系统指标

默认情况下,Flink会收集几个指标,这些指标可以提供有关当前状态的深入见解。本节是所有这些指标的参考。

下表通常包含5列:

  • “范围”列描述了用于生成系统范围的范围格式。例如,如果单元格包含“Operator”,则使用“metrics.scope.operator”的范围格式。如果单元格包含多个值(以斜杠分隔),则会针对不同的实体多次报告度量标准,例如作业和TaskManager。

  • (可选)“Infix”列描述了哪个中缀附加到系统范围。

  • “度量标准”列列出了为给定范围和中缀注册的所有度量标准的名称。

  • “描述”列提供有关给定度量正在测量的信息。

  • “类型”列描述了用于测量的度量类型。

请注意,中缀/指标名称列中的所有点仍受“metrics.delimiter”设置的约束。

因此,为了推断度量标识符:

  1. 根据“范围”列获取范围格式
  2. 如果存在,则将值附加到“中缀”列中,并考虑“metrics.delimiter”设置
  3. 附加指标名称。

中央处理器

范围 中缀 度量 描述 类型
Job-/TaskManager Status.JVM.CPU 加载 JVM最近的CPU使用情况。 测量
时间 JVM使用的CPU时间。 测量

内存

范围 中缀 度量 描述 类型
Job-/TaskManager Status.JVM.Memory Heap.Used 当前使用的堆内存量(以字节为单位)。 测量
Heap.Committed 保证可供JVM使用的堆内存量(以字节为单位)。 测量
Heap.Max 可用于内存管理的最大堆内存量(以字节为单位)。 测量
NonHeap.Used 当前使用的非堆内存量(以字节为单位)。 测量
NonHeap.Committed 保证JVM可用的非堆内存量(以字节为单位)。 测量
NonHeap.Max 可用于内存管理的最大非堆内存量(以字节为单位)。 测量
Direct.Count 直接缓冲池中的缓冲区数。 测量
Direct.MemoryUsed JVM用于直接缓冲池的内存量(以字节为单位)。 测量
Direct.TotalCapacity 直接缓冲池中所有缓冲区的总容量(以字节为单位)。 测量
Mapped.Count 映射缓冲池中的缓冲区数。 测量
Mapped.MemoryUsed JVM用于映射缓冲池的内存量(以字节为单位)。 测量
Mapped.TotalCapacity 映射缓冲池中的缓冲区数(以字节为单位)。 测量

线程

范围 中缀 度量 描述 类型
Job-/TaskManager Status.JVM.Threads 计数 活动线程总数。 测量

垃圾收集

范围 中缀 度量 描述 类型
Job-/TaskManager Status.JVM.GarbageCollector <GarbageCollector> .Count之间 已发生的集合总数。 测量
<GarbageCollector>。时间 执行垃圾收集所花费的总时间。 测量

类加载器

范围 中缀 度量 描述 类型
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded 自JVM启动以来加载的类总数。 测量
ClassesUnloaded 自JVM启动以来卸载的类总数。 测量

网络

范围 中缀 度量 描述 类型
TaskManager Status.Network AvailableMemorySegments 未使用的内存段数。 测量
TotalMemorySegments 分配的内存段数。 测量
Task buffers inputQueueLength 排队的输入缓冲区数。 测量
outputQueueLength 排队输出缓冲区的数量。 测量
inPoolUsage 估计输入缓冲区的使用情况。 测量
outPoolUsage 估计输出缓冲区的使用情况。 测量
Network.<Input|Output>.<gate>(only available if taskmanager.net.detailed-metrics config option is set) totalQueueLen 所有输入/输出通道中排队缓冲区的总数。 测量
minQueueLen 所有输入/输出通道中的最小排队缓冲区数。 测量
maxQueueLen 所有输入/输出通道中的最大排队缓冲区数。 测量
avgQueueLen 所有输入/输出通道中的平均缓冲区数。 测量

集群

范围 度量 描述 类型
JobManager numRegisteredTaskManagers 注册任务管理员的数量。 测量
numRunningJobs 正在运行的作业数量。 测量
taskSlotsAvailable 可用任务槽的数量。 测量
taskSlotsTotal 任务槽的总数。 测量

可用性

范围 度量 描述 类型
Job (only available on JobManager) restartingTime 重新启动作业所花费的时间,或当前重新启动的持续时间(以毫秒为单位)。 测量
uptime 作业运行的时间不间断。

对于已完成的作业,返回-1(以毫秒为单位)。

测量
downtime 对于当前处于故障/恢复状态的作业,在此中断期间经过的时间。

对于正在运行的作业返回0,对于已完成的作业返回-1(以毫秒为单位)。

测量
fullRestarts 自提交此作业以来完全重新启动的总次数。 测量

检查点

范围 度量 描述 类型
Job (only available on JobManager) lastCheckpointDuration 完成最后一个检查点所花费的时间(以毫秒为单位)。 测量
lastCheckpointSize 最后一个检查点的总大小(以字节为单位)。 测量
lastCheckpointExternalPath 存储最后一个外部检查点的路径。 测量
lastCheckpointRestoreTimestamp 在协调器上恢复最后一个检查点时的时间戳(以毫秒为单位)。 测量
lastCheckpointAlignmentBuffered 在最后一个检查点的所有子任务上进行对齐期间的缓冲字节数(以字节为单位)。 测量
numberOfInProgressCheckpoints 进行中检查点的数量。 测量
numberOfCompletedCheckpoints 成功完成检查点的数量。 测量
numberOfFailedCheckpoints 失败检查点的数量。 测量
totalNumberOfCheckpoints 总检查点的数量(正在进行,已完成,失败)。 测量
Task checkpointAlignmentTime 最后一次屏障对齐完成所花费的时间(以纳秒为单位),或当前对齐到目前为止所用的时间(以纳秒为单位)。 测量

IO

范围 度量 描述 类型
Job (only available on TaskManager) <SOURCE_ID> <source_subtask_index> <operator_id> <operator_subtask_index> .latency 从给定源子任务到算子子任务的延迟分布(以毫秒为单位)。 直方图
任务 numBytesInLocal 此任务从本地源读取的总字节数。 计数器
numBytesInLocalPerSecond 此任务每秒从本地源读取的字节数。 仪表
numBytesInRemote 此任务从远程源读取的总字节数。 计数器
numBytesInRemotePerSecond 此任务每秒从远程源读取的字节数。 仪表
numBuffersInLocal 此任务从本地源读取的网络缓冲区总数。 计数器
numBuffersInLocalPerSecond 此任务每秒从本地源读取的网络缓冲区数。 仪表
numBuffersInRemote 此任务从远程源读取的网络缓冲区总数。 计数器
numBuffersInRemotePerSecond 此任务每秒从远程源读取的网络缓冲区数。 仪表
numBytesOut 此任务已发出的总字节数。 计数器
numBytesOutPerSecond 此任务每秒发出的字节数。 仪表
numBuffersOut 此任务已发出的网络缓冲区总数。 计数器
numBuffersOutPerSecond 此任务每秒发出的网络缓冲区数。 仪表
任务/算子 numRecordsIn 此 算子/任务已收到的记录总数。 计数器
numRecordsInPerSecond 此 算子/任务每秒接收的记录数。 仪表
numRecordsOut 此 算子/任务已发出的记录总数。 计数器
numRecordsOutPerSecond 此 算子/任务每秒发送的记录数。 仪表
numLateRecordsDropped 此算子/任务因迟到而丢失的记录数。 计数器
currentInputWatermark 此 算子/任务收到的​​最后一个水印(以毫秒为单位)。

注意:对于具有2个输入的算子/任务,这是最后收到的水印的最小值。

测量
算子 currentInput1Watermark 此 算子在其第一个输入(毫秒)中收到的最后一个水印。

注意:仅适用于具有2个输入的算子。

测量
currentInput2Watermark 此 算子在其第二个输入中接收的最后一个水印(以毫秒为单位)。

注意:仅适用于具有2个输入的算子。

测量
currentOutputWatermark 此 算子发出的最后一个水印(以毫秒为单位)。 测量
numSplitsProcessed 此数据源已处理的InputSplits总数(如果 算子是数据源)。 测量

连接器

Kafka连接器

范围 度量 用户变量 描述 类型
算子 commitsSucceeded N / A 如果启用了偏移提交并且启用了检查点,则成功向Kafka提交的偏移提交总数。 计数器
算子 commitsFailed N / A 如果启用了偏移提交并且启用了检查点,则Kafka的偏移提交失败总数。请注意,将偏移量提交回Kafka只是暴露消费者进度的一种方法,因此提交失败不会影响Flink的检查点分区偏移的完整性。 计数器
算子 committedOffsets Topic,分区 对于每个分区,最后成功提交到Kafka的偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 测量
算子 currentOffsets Topic,分区 消费者对每个分区的当前读取偏移量。可以通过主题名称和分区ID指定特定分区的度量标准。 测量

Kinesis连接器

范围 度量 用户变量 描述 类型
算子 millisBehindLatest stream,shardId 对于每个Kinesis分片,消费者在流的头部后面的毫秒数,表示消费者当前时间落后多少。可以通过流名称和分片标识指定特定分片的度量标准。值为0表示记录处理被捕获,此时没有要处理的新记录。值-1表示该度量标准尚未报告。 测量
算子 sleepTimeMillis stream,shardId 消费者在从Kinesis获取记录之前花费的毫秒数。可以通过流名称和分片标识指定特定分片的度量标准。 测量
算子 maxNumberOfRecordsPerFetch stream,shardId 消费者在单个getRecords调用Kinesis时请求的最大记录数。如果ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS设置为true,则自适应地计算此值以最大化Kinesis的2 Mbps读取限制。 测量
算子 numberOfAggregatedRecordsPerFetch stream,shardId 消费者在单个getRecords调用Kinesis时获取的聚合Kinesis记录数。 测量
算子 numberOfDeggregatedRecordsPerFetch stream,shardId 消费者在单个getRecords调用Kinesis时获取的分解Kinesis记录的数量。 测量
算子 averageRecordSizeBytes stream,shardId Kinesis记录的平均大小(以字节为单位),由消费者在单个getRecords调用中获取。 测量
算子 runLoopTimeNanos stream,shardId 消费者在运行循环中花费的实际时间(以纳秒为单位)。 测量
算子 loopFrequencyHz stream,shardId 一秒钟内调用getRecords的次数。 测量
算子 bytesRequestedPerFetch stream,shardId 在一次调用getRecords中请求的字节数(2 Mbps / loopFrequencyHz)。 测量

系统资源

默认情况下禁用系统资源报告。metrics.system-resource 启用下面列出的指标将是可利用的作业-与TaskManager。系统资源度量标准会定期更新,并显示已配置间隔(metrics.system-resource-probing-interval)的平均值

系统资源报告要求在类路径上存在可选的依赖项(例如,放在Flink的lib目录中):

  • com.github.oshi:oshi-core:3.4.0 (根据EPL 1.0许可证授权)

包括它的传递依赖:

  • net.java.dev.jna:jna-platform:jar:4.2.2
  • net.java.dev.jna:jna:jar:4.2.2

这方面的失败将被报告为启动期间NoClassDefFoundError 记录的警告消息SystemResourcesMetricsInitializer

系统CPU

范围 中缀 度量 描述
Job-/TaskManager System.CPU 用法 机器上CPU使用率的总体百分比。
机器上CPU空闲使用率的百分比。
SYS 计算机上系统CPU使用率的百分比。
用户 计算机上用户CPU使用率的百分比。
IOWAIT 计算机上IOWait CPU使用率的百分比。
IRQ 机器上Irq CPU使用率的百分比。
软中断 计算机上SoftIrq CPU使用率的百分比。
尼斯 在机器上使用Nice Idle的百分比。
Load1min 平均CPU负载超过1分钟
Load5min 平均CPU负载超过5分钟
Load15min 平均CPU负载超过15分钟
UsageCPU * 每个处理器的CPU使用率百分比

系统内存

范围 中缀 度量 描述
Job-/TaskManager System.Memory 可得到 可用内存字节数
总内存(字节)
System.Swap 用过的 使用的交换字节
总交换字节数

系统网络

范围 中缀 度量 描述
Job-/TaskManager System.Network.INTERFACE_NAME ReceiveRate 平均接收速率,以每秒字节数为单位
SendRate 平均发送速率,以字节/秒为单位

延迟跟踪

Flink允许跟踪通过系统传输的记录的延迟。默认情况下禁用此函数。为了使延迟跟踪你必须设置latencyTrackingInterval在无论是正数 Flink配置ExecutionConfig

latencyTrackingInterval,源将定期发出一个特殊的记录,称为LatencyMarker标记包含从源发出记录时的时间戳。延迟标记不能超过常规用户记录,因此如果记录在算子面前排队,则会增加标记跟踪的延迟。

请注意,延迟标记不会考虑用户记录在算子中绕过它们的时间。特别是标记不考虑记录在窗口缓冲区中花费的时间。只有当算子无法接受新记录,因此他们排队时,使用标记测量的延迟才会反映出来。

所有中间 算子都会保存n每个源的最后一个延迟列表,以计算延迟分布。接收器算子保存每个源的列表,以及每个并行源实例,以允许检测由各个机器引起的延迟问题。

目前,Flink假定群集中所有计算机的时钟都是同步的。我们建议设置自动时钟同步服务(如NTP)以避免错误的延迟结果。

警告启用延迟指标可能会显着影响群集的性能。强烈建议仅将它们用于调试目的。

REST API集成

可以通过Monitoring REST API查询度量标准

下面是可用端点列表,带有示例JSON响应。所有端点都是样本表单http://hostname:8081/jobmanager/metrics,下面我们仅列出URL 路径部分。

尖括号中的值是变量,例如http://hostname:8081/jobs/<jobid>/metrics,必须请求变量http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics

请求特定实体的指标:

  • /jobmanager/metrics
  • /taskmanagers/<taskmanagerid>/metrics
  • /jobs/<jobid>/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>

请求在相应类型的所有实体之间聚合的指标:

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics

请求在相应类型的所有实体的子集上聚合的度量标准:

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3

请求可用指标列表:

GET /jobmanager/metrics

[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]

请求特定(未聚合)指标的值:

GET taskmanagers/ABCDE/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]

请求特定指标的汇总值:

GET /taskmanagers/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]

请求特定指标的特定聚合值:

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
  }
]

仪表板集成

为每个任务或算子收集的度量标准也可以在仪表板中显示。在作业的主页面上,选择Metrics选项卡。选择顶部图表中的一个任务后,您可以使用Add Metric下拉菜单选择要显示的指标

  • 任务指标列为<subtask_index>.<metric_name>
  • 算子指标列为<subtask_index>.<operator_name>.<metric_name>

每个度量将可视化为单独的图形,x轴表示时间,y轴表示测量值。所有图表每10秒自动更新一次,并在导航到另一页时继续更新。

可视化指标的数量没有限制; 但是只能显示数字指标。

回到顶部