Checkpoint#

概览#

checkpoint使Fink的状态具有非常好的容错性,通过Checkpoint,Flink可以对作业的状态和计算位置进行恢复,因此Flink作业具备高容错执行语意。 通过 Checkpointing 查看如何在程序中开启和配置checkpoint。

保留Checkpoint#

默认情况下,Checkpoint仅用于恢复失败的作业,是不保留的,程序结束时Checkpoints也会被删除。然而,你可以配置周期性的保留checkpoint。当作业失败或被取消时,这些checkpoints将不会被自动清除。这样,你就可以用该checkpoint来恢复失败的作业。

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

“ExternalizedCheckpointCleanup”配置项定义了当作业取消时,对作业checkpoints的操作:

  • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 作业取消时,保留作业的checkpoint。注意,这种情况下,需要手动清除该作业的checkpoint。
  • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 作业取消时,删除作业的checkpoint。仅当作业失败时,作业的checkpoint才会被使用。

目录结构#

savepoints 类似, checkpoint由元数据文件、额外的数据文件(与state backend相关)组成。可以通过配置文件中“state.checkpoints.dir”配置项,指定元数据文件和数据文件的存储路径,也可以在代码中针对单个作业指定该配置。

通过配置文件全局配置#

state.checkpoints.dir: hdfs:///checkpoints/

创建state backend时对单个作业进行配置#

env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/");

checkpoint与savepoint的区别#

checkpoint与savepoint有一些区别。 他们

  • 的数据格式与state backend密切相关,可能以增量方式存储。
  • 不支持Flink的特殊功能,如扩缩容。

从checkpoint中恢复状态#

同savepoint一样,作业也可以使用checkpoint的元数据文件进行错误恢复 ( savepoint恢复指南)。注意若元数据文件中信息不够,那么jobmanager就需要使用相关的数据文件来恢复作业(详见目录结构)。

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]