Metrics

Flink公开了一个指标系统,可以收集和暴露指标给外部系统.

Registering metrics

你可以调用 getRuntimeContext().getMetricGroup()方法来访问任何继承自RichFunction函数的用户函数的指标系统.这个方法返回一个MetricGroup对象,通过这个对象可以创建和注册新的指标.

Metric types

Flink支持的指标类型:Counters,Gauges,HistogramsMeters.

Counter

Counter用作某方面计数,通过调用inc()/inc(long n) 或者 dec()/dec(long n)方法来使当前的值增加或者减少. 通过调用MetricGroupcounter(String name)方法可以创建和注册一个Counter.

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @public Integer map(String value) throws Exception {
    this.counter.inc();
  }
}

或者你也可以使用自己实现的Counter

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter());
  }
}

Gauge

Gauge按需提供任意类型的值,要使用Gauge,你必须首先创建一个类并实现org.apache.flink.metrics.Gauge接口。 这里对返回值的类型没有限制。 可以调用MetricGroupgauge(String name,Gauge gauge)方法来注册一个gauge。

public class MyMapper extends RichMapFunction<String, Integer> {
  private int valueToExpose;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }
}
public class MyMapper extends RichMapFunction[String,Int] {
  val valueToExpose = 5

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }
  ...
}

请注意reporters会将暴露的对象转化成String型,这意味着需要去实现一个有意义的toString()方法

Histogram

Histogram用于度量长值分布情况, 你可以通过调用MetricGrouphistogram(String name, Histogram histogram)方法来注册一个Histogram

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @public Integer map(Long value) throws Exception {
    this.histogram.update(value);
  }
}

Flink没有提供一个默认的Histogram实现。但是提供了一个Wrapper来允许使用 Codahale/DropWizard 直方图。 如需使用此包装器,请在您的pom.xml中添加以下依赖:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.2</version>
</dependency>

你可以注册一个Codahale/DropWizard 直方图类似于:

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Histogram histogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(histogram));
  }
}

Meter

Meter用于度量平均吞吐量,使用markEvent()方法可以注册一个发生的事件.同时发生的多个事件可以使用markEvent(long n)方法来进行注册。 通过调用MetricGroupmeter(String name, Meter meter)方法来注册一个meter

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @public Integer map(Long value) throws Exception {
    this.meter.markEvent();
  }
}

Flink提供了一个 Wrapper来允许使用 Codahale/DropWizard meters. 要使用此包装器,请在您的pom.xml中添加以下依赖:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.2</version>
</dependency>

您可以注册Codahale / DropWizard meter类似于这样:

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Meter meter;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter();

    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new DropwizardMeterWrapper(meter));
  }
}

Scope

每个指标被分配一个标识符,该标识符将基于3个组件进行汇报:注册指标时用户提供的名称,可选的用户自定义域和系统提供的域。例如,如果A.B是系统域,C.D是用户域,E是名称,那么指标的标识符将是A.B.C.D.E. 你可以通过设置conf/flink-conf.yam里面的metrics.scope.delimiter参数来配置标识符的分隔符(默认:.).

User Scope

你可以通过调用MetricGroup#addGroup(String name)MetricGroup#addGroup(int name)来定义一个用户域

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

System Scope

系统域包含关于这个指标的上下文信息,例如其注册的任务或该任务属于哪个作业.

可以通过在conf/flink-conf.yaml中设置以下关键字来配置它的上下文信息.这些关键字的每一个都期望可以包含常量的格式字符串(例如:“taskmanager”)和将在运行时被替换的变量(例如:”<task_id>“)

  • metrics.scope.jm
    • 默认: <host>.jobmanager
    • 适用于属于一个job manager的所有指标.
  • metrics.scope.jm.job
    • 默认: <host>.jobmanager.<job_name>
    • 适用于属于一个job manager和job的所有指标.
  • metrics.scope.tm
    • 默认: <host>.taskmanager.<tm_id>
    • 适用于属于一个task manager的所有指标.
  • metrics.scope.tm.job
    • 默认: <host>.taskmanager.<tm_id>.<job_name>
    • 适用于属于一个task manager和job的所有指标.
  • metrics.scope.task
    • 默认: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    • 适用于属于一个task的所有指标.
  • metrics.scope.operator
    • 默认: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
    • 适用于属于一个operator的所有指标.

这里对变量的数量和顺序没有限制,变量区分大小写.

运算指标的默认域将导致类似的标识符:

localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric

如果你想包含任务名称,但省略task manager的信息,你可以指定以下格式:

metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>

这可以创建标识符localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.

注意对于此格式的字符串,如果同一作业同时运行多次,则可能会发生标识符冲突,导致指标标准数据不一致.因此,建议使用格式字符串,通过包括ID(例如 <job_id>)或通过为作业和操作符分配唯一的名称来保证一定程度上的唯一性.

变量列表

  • JobManager: <host>
  • TaskManager: <host>, <tm_id>
  • Job: <job_id>, <job_name>
  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Operator: <operator_name>, <subtask_index>

Reporter

通过在conf/flink-conf.yaml中配置一个或者一些reporters,能够让指标暴露给一个外部系统.这些reporters将在每个job和task manager启动时被实例化.

  • metrics.reporters: reporters的名称列表.
  • metrics.reporter.<name>.<config>: 给定reporter名称<name>的通用设置.
  • metrics.reporter.<name>.class: 给定reporter名称<name>的reporter类 .
  • metrics.reporter.<name>.interval: 给定reporter名称<name>的reporter间隔.
  • metrics.reporter.<name>.scope.delimiter: 给定reporter名称<name>所使用的分割符标识(默认值用:metrics.scope.delimiter)

所有的reporters必须至少具备class属性,有些允许指定一个reporting的interval,以下,我们将列举更多针对每个reporter的设置.

举例说明指定多个reporters的配置

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

重要提示:当Flink启动的时候,通过放入到/lib目录下包含reporter的jar文件必须可访问.

你可以通过实现org.apache.flink.metrics.reporter.MetricReporter接口来定义自己的Reporter, 如果这个Reporter必须定期发送报告,那你也必须同时实现Scheduled接口.

下面的章节列举了支持的reporters.

JMX (org.apache.flink.metrics.jmx.JMXReporter)

不必包含其他依赖关系,因为JMX reporter默认可用,但是并没有被激活

参数: - port - (可选) JMX侦听连接的端口,也可以是端口范围。当指定范围时,相关job或者task manager 日志将显示实际端口。如果设置了此设置,Flink将为给定的端口/范围启动一个额外的JMX连接器。在默认本地JMX接口上指标始终可用.

示例配置:

metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

通过JMX暴露出来的指标由一个域和一个键-属性列表来标识,它们一起形成对象名称。

域始终以 org.apache.flink打头,后面跟着通用指标标识。相对于常用的标识符,它不受域格式影响,不包含任何变量,并且作业之间是不变的,这样域的一个列子是:

org.apache.flink.job.task.numBytesOut.

键-属性列表包含所有变量的值,与配置的域格式无关,它们与给定的指标相关联,这样列表的列子是:

host=localhost,job_name=MyJob,task_name=MyTask.

因此域识别一个指标类,而键-属性性列表识别一个(或多个)指标实例

Ganglia (org.apache.flink.metrics.ganglia.GangliaReporter)

为了使用这个reporter,你必须将/opt/flink-metrics-ganglia-1.2.jar拷贝到Flink发行版本下的/lib文件夹中

参数:

  • host - 在gmond.conf中的udp_recv_channel.bind下配置的gmond主机地址
  • port - 在gmond.conf中的udp_recv_channel.port下配置的gmond端口
  • tmax - 旧指标能够保留软性限制的最长时间
  • dmax - 旧指标能够保留硬性限制的最长时间
  • ttl - 传输UDP包的生存时间
  • addressingMode - UDP使用的寻址模式(UNICAST/MULTICAST)

示例配置:

metrics.reporters: gang
metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST

Graphite (org.apache.flink.metrics.graphite.GraphiteReporter)

为了使用这个reporter,你必须将/opt/flink-metrics-graphite-1.2.jar拷贝到Flink发行版本下的/lib文件夹中

参数:

  • host - Graphite 服务器地址
  • port - Graphite 服务器端口
  • protocol - 使用的协议 (TCP/UDP)

示例配置:

metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

StatsD (org.apache.flink.metrics.statsd.StatsDReporter)

为了使用reporter,你必须将/opt/flink-metrics-statsd-1.2.jar拷贝到Flink发行版本下/lib文件夹中

参数:

  • host - the StatsD 服务器地址
  • port - the StatsD 服务器端口

示例配置

metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

System metrics

默认情况下,Flink收集了几个能够深入了解当前状态的指标,本章节是所有这些指标的一个参考 以下表格通常有4列:

  • “Scope”列描述了用于生成系统域的域格式,例如,如果单元格包含“Operator”,则使用“metric.scope.operator”的作用域格式,如果单元格包含以斜杠分割的多个值,则会根据不同的实体报告多个指标,例如job-和taskmanagers.

  • “Infix”(可选) 列描述了哪些中缀附加到系统域中.

  • “Metrics” 列中列出了给定域和中缀的所有注册指标的名称.

  • “Description” 列提供有关给定指标度量的相关信息.

请注意,中缀/指标名称列中的所有点仍然遵循“metrics.delimiter”设置,因此,为了推断指标标识符:

  1. 根据域列选择域格式
  2. 添加这个值在中缀列,如果存在,则表示“metrics.delimiter”设置
  3. 添加指标名称

CPU:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.CPU Load JVM最近CPU使用情况.
Time JVM使用的CPU时间.

内存:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.Memory Memory.Heap.Used 当前使用的堆内存大小.
Heap.Committed 保证JVM可用的堆内存大小.
Heap.Max 可用于内存管理的堆内存最大值.
NonHeap.Used 当前使用的非堆内存大小.
NonHeap.Committed 保证JVM可用的非堆内存大小.
NonHeap.Max 可用于内存管理的非堆内存最大值.
Direct.Count 直接缓冲池中的缓冲区数量.
Direct.MemoryUsed JVM中用于直接缓冲池的内存大小.
Direct.TotalCapacity 直接缓冲池中所有缓冲区的总容量.
Mapped.Count 映射缓冲池中缓冲区的数量.
Mapped.MemoryUsed JVM中用于映射缓冲池的内存大小.
Mapped.TotalCapacity 映射缓冲池中缓冲区的数量.

线程:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.ClassLoader Threads.Count 存活线程总数.

垃圾回收:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.GarbageCollector <GarbageCollector>.Count 已发生的回收总数.
<GarbageCollector>.Time 执行垃圾回收花费的总时间.

类加载器:

Scope Infix Metrics Description
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded 自JVM启动以来加载类的总数.
ClassesUnloaded 自JVM启动以来卸载类的总数.

网络:

Scope Infix Metrics Description
TaskManager Status.Network AvailableMemorySegments 未使用的内存段数.
TotalMemorySegments 已分配的内存段数.
Task buffers inputQueueLength 队列输入缓冲区的数量.
outputQueueLength 队列输出缓冲区的数量.
inPoolUsage 输入缓冲区使用情况评估.
outPoolUsage 输出缓冲区使用情况评估.
Network.<Input|Output>.<gate>
(only available if taskmanager.net.detailed-metrics config option is set)
totalQueueLen 所有输入/输出通道中队列缓冲区的总数.
minQueueLen 所有输入/输出通道中队列缓冲区的最小数目.
maxQueueLen 所有输入/输出通道中队列缓冲区的最大数目.
avgQueueLen 所有输入/输出通道中队列缓冲区的平均数目.

集群:

Scope Metrics Description
JobManager numRegisteredTaskManagers 已注册taskmanagers的数量.
numRunningJobs 正在运行jobs的数量.
taskSlotsAvailable 可用task slots的数量
taskSlotsTotal task slots总数.

检查点:

Scope Metrics Description
Job (only available on JobManager) lastCheckpointDuration 完成上一次检测点所花费的时间.
lastCheckpointSize 上一次检测点的总大小.
lastCheckpointExternalPath 上一个检测点存储的路径.
Task checkpointAlignmentTime 最后一个障碍对齐所需的纳秒时间,或者当前对齐已经花费了多长时间.

IO:

Scope Metrics Description
Task currentLowWatermark 该任务已经获得的最低水位.
numBytesInLocal 该任务从本地源读取的字节总数.
numBytesInLocalPerSecond 该任务从本地源每秒读取的字节数.
numBytesInRemote 该任务从远端读取的字节总数.
numBytesInRemotePerSecond 该任务从远端每秒读取的字节数.
numBytesOut 该任务已发出的字节总数.
numBytesOutPerSecond 该任务每秒发出的字节数.
Task/Operator numRecordsIn 该任务/操作已收到的条目总数.
numRecordsInPerSecond 该任务/操作每秒收到的条目数.
numRecordsOut 该操作/任务已发出的条目总数.
numRecordsOutPerSecond 该操作/任务每秒发出的条目数.
Operator latency 所有输入源的延迟分布.
numSplitsProcessed 数据源已经处理的输入分片总数(如果操作是一个数据源).

延迟跟踪

Flink允许去跟踪条目在整个系统中运行的延迟,为了开启延迟跟踪,latencyTrackingInterval (毫秒)必须在ExecutionConfig中设置为一个正值. 在latencyTrackingInterval,源端将周期性的发送一个特殊条目,叫做LatencyMarker,这个标记包含一个从源端发出记录时的时间戳。延迟标记不能超过常规的用户条目,因此如果条目在一个操作的前面排队,将会通过这个标记添加延迟跟踪.

请注意延迟标记不记录用户条目在操作中所花费的时间,而是绕过它们。特别是这个标记是不用于记录在窗口缓冲区中的时间条目。只有当操作不能够接受新的条目时,它们才会排队,用这个标记测量的延迟将会反映出这一点.

所有中间操作通过保留每个源的最后n个延迟的列表,来计算一个延迟的分布。落地操作保留每个源的列表,然后每个并行源实例允许检测由单个机器所引起的延迟问题.

目前,Flink认为集群中所有机器的时钟是同步的。我们建议建立一个自动时钟同步服务(类似于NTP),以避免虚假的延迟结果.

仪表盘集成

为每个任务或者操作所收集到的指标,也可以展示在在仪表盘上。在一个作业的主页面,选择Metrics选项,在顶部图选择一个任务后,可以使用Add Metrics下拉菜单来选择要展示的指标值 * 任务指标被列为 <subtask_index>.<metric_name>. * 操作指标被列为 <subtask_index>.<operator_name>.<metric_name>. 每个指标被可视化为一个单独的图表,用x轴表示时间和y轴表示测量值。所有的图表每10秒自动更新一次,并在导航到另一页时继续执行.

这里对可视化指标的数量没有限制;但是只有数值型指标可以可视化。

Back to top