Flink DataStream API 编程指南#

Flink 中的 DataStream 程序是对数据流进行转换(例如,过滤、更新状态、定义窗口、聚合)的常用方式。数据流起于各种 sources(例如,消息队列,socket流,文件)。通过 sinks 返回结果,例如将数据写入文件或标准输出(例如命令行终端)。Flink 程序可以运行在各种上下文环境中,独立或嵌入其他程序中。 执行过程可能发生在本地 JVM 或在由许多机器组成的集群上。

请参考 基本概念 了解关于Flink API 的介绍。

为了创建你的 Flink DataStream 程序,我们鼓励你从 剖析 Flink 程序 开始,并且逐渐添加你的 stream transformations 。其余部分作为附加操作和高级特性的参考。

程序示例#

下面的程序是单词计数应用程序的一个完整的工作示例,其中使用了流式窗口,对来自web socket的单词,以5秒为窗口进行计数。您可以复制和粘贴代码在本地运行。

Java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

Scala


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.print()

    env.execute("Window Stream WordCount")
  }
}

要运行示例程序,首先从终端启动 netcat 输入流:

nc -lk 9999

然后输入一些单词,回车,再输入新一行的单词。这些输入将作为示例程序的输入。如果要使得某个单词的计数结果大于1,请在5秒钟内重复输入相同的单词(如果5秒钟输入相同单词对你来说太快,请把示例程序中的窗口大小从5秒调大 ☺)。

数据 Sources#

Java#

你的程序从数据源读取数据。你可以通过 StreamExecutionEnvironment.addSource(sourceFunction) 将 Source 添加到你的程序中。Flink 提供了若干已经实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并发的 source ,实现 ParallelSourceFunction 接口或扩展 RichParallelSourceFunction 来自定义并发的 source。

StreamExecutionEnvironment 中可以使用以下几个已实现的 stream sources :

基于文件:

  • readTextFile(path) - 按行读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。

  • readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType参数 ,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉不需要处理的文件。

    实现:

    在具体实现上,Flink把文件读取过程分为两个子任务,即目录监控数据读取。每个子任务都由单独的实体实现。目录监控由单个非并行(并行度为1)的任务执行,而数据读取由并发运行的多个任务执行。后者的并发度等于作业的并发度。监控目录任务的作用是扫描目录(根据 watchType 定期扫描或仅扫描一次),查找要处理的文件并把文件分割成切分片(splits),然后将这些切分片(splits)分配给下游 reader 。 reader 负责读取数据。每个切分片(splits)只能由一个 reader 读取,但一个 reader 可以逐个读取多个切分片(splits)。

    重要注意事项:

    1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY ,则当文件被修改时,其内容将被重新处理。这会打破 “exactly-once” 语义,因为在文件末尾附加数据将导致其所有内容被重新处理。

    2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE ,则 source 仅扫描路径一次然后退出,而不等待 reader 读完文件内容。当然 reader 会继续阅读,直到读取所有的文件内容。关闭 source 后就不会再有checkpoints。这可能导致节点故障后的恢复速度较慢,因为该作业将从最后一个checkpoint处重新读取数据。

基于 Socket:

  • socketTextStream - 从 socket 读取。元素可以用分隔符切分。

基于集合:

  • fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。

  • fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。

  • fromElements(T ...) - 从给定的对象序列中创建数据流。所有对象类型必须相同。

  • fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。

  • generateSequence(from, to) - 并发生成指定间隔的数字序列。

自定义:

  • addSource - 添加一个新的 source function 。例如,你可以 addSource(new FlinkKafkaConsumer08<>(...)) 以从 Apache Kafka 读取数据。详情参阅 connectors

Scala#

Sources 是你的程序读取输入的地方。你可以通过 StreamExecutionEnvironment.addSource(sourceFunction) 将 Source 添加到你的程序中。Flink 提供了若干已经实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

StreamExecutionEnvironment 中可以使用以下几个已实现的 stream sources :

基于文件:

  • readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。

  • readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。

  • readFile(fileInputFormat, path, watchType, interval, pathFilter) - 这是上面两个方法内部调用的方法。它根据给定的fileInputFormat读取路径读取文件。根据提供的 watchType ,这个source可以定期(每隔 interval 毫秒)监测给定路径的新数据( FileProcessingMode.PROCESS_CONTINUOUSLY ),或者处理一次路径对应文件的数据并退出( FileProcessingMode.PROCESS_ONCE )。你可以通过 pathFilter 进一步排除掉需要处理的文件。

    实现:

    在具体实现上,Flink把文件读取过程分为两个子任务,即目录监控数据读取。每个子任务都由单独的实体实现。目录监控由单个非并行(并行度为1)的任务执行,而数据读取由并行运行的多个任务执行。后者的并行性等于作业的并行性。单个目录监控任务的作用是扫描目录(根据 watchType 定期扫描或仅扫描一次),查找要处理的文件并把文件分割成切分片,然后将这些切分片分配给下游 reader 。 reader 负责读取数据。每个切分片只能由一个 reader 读取,但一个 reader 可以逐个读取多个切分片。

    重要注意事项:

    1. 如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY ,则当文件被修改时,其内容将被重新处理。这会打破 “exactly-once” 语义,因为在文件末尾附加数据将导致其所有内容被重新处理。

    2. 如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE ,则 source 仅扫描路径一次然后退出,而不等待 reader 完成文件内容的读取。当然 reader 会继续阅读,直到读取所有的文件内容。关闭 source 后就不会再有检查点。这可能导致节点故障后的恢复速度较慢,因为该作业将从最后一个检查点恢复读取。

基于 Socket:

  • socketTextStream - 从 socket 读取。元素可以用分隔符切分。

基于集合:

  • fromCollection(Seq) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。

  • fromCollection(Iterator) - 从一个迭代器中创建数据流。 Class 指定了该迭代器返回元素的类型。

  • fromElements(elements: _*) - 从给定的对象序列中创建数据流。所有对象类型必须相同。

  • fromParallelCollection(SplittableIterator) - 从一个迭代器中创建并行数据流。 Class 指定了该迭代器返回元素的类型。

  • generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。

自定义:

  • addSource - 添加一个新的 source function 。例如,你可以 addSource(new FlinkKafkaConsumer08<>(...)) 以从 Apache Kafka 读取数据。详情参阅 connectors

DataStream 转换#

查看流的转换请参阅 operators

数据 Sinks#

Java#

数据 sinks 消费 DataStream 的数据,并将结果写入文件、socket、外部系统或进行打印。Flink 自带多种内置的输出格式,这些都被封装在对 DataStream 的操作后:

  • writeAsText() / TextOutputFormat - 将元素以字符串形式按行写入。通过调用每个元素的 toString() 方法获得字符串。

  • writeAsCsv(...) / CsvOutputFormat - 将元组写入逗号分隔的csv文件。行和字段隔符均可配置。通过调用每个元素的 toString() 方法获得每个字段的字符串。

  • print() / printToErr() - 打印每个元素的 toString() 值到标准输出/错误输出流。可以配置前缀信息添加到输出,以区分不同 print 的结果。如果并行度大于1,则 task id 也会添加到输出结果的前缀上。

  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法/基类。支持自定义的对象到字节的转换。

  • writeToSocket - 根据 SerializationSchema 把元素写到 socket 。

  • addSink - 调用自定义 sink function 。Flink自带了很多连接其他系统的 connectors(如 Apache Kafka ),这些connectors都实现了 sink function 。

Scala#

数据 sinks 消费 DataStream 并将其发往文件、socket、外部系统或进行打印。Flink 自带多种内置的输出格式,这些都被封装在对 DataStream 的操作后:

  • writeAsText() / TextOutputFormat - 将元素以字符串形式写入。字符串 通过调用每个元素的 toString() 方法获得。

  • writeAsCsv(...) / CsvOutputFormat - 将元组写入逗号分隔的csv文件。行和字段 分隔符均可配置。每个字段的值来自对象的 toString() 方法。

  • print() / printToErr() - 打印每个元素的 toString() 值到标准输出/错误输出流。可以配置前缀信息添加到输出,以区分不同 print 的结果。如果并行度大于1,则 task id 也会添加到输出前缀上。

  • writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法/基类。支持自定义的对象到字节的转换。

  • writeToSocket - 根据 SerializationSchema 把元素写到 socket 。

  • addSink - 调用自定义 sink function 。Flink自带了很多连接其他系统的连接器( connectors )(如 Apache Kafka ),这些连接器都实现了 sink function 。

请注意, DataStream 上的 write*() 方法主要用于调试目的。它们没有实现Flink的checkpoint机制,这意味着这些 function 通常都有 at-least-once 语义。数据刷新到目标系统取决于 OutputFormat 的实现。这意味着并非所有发送到 OutputFormat 的元素都会立即在目标系统中可见。此外,在失败的情况下,这些记录可能会丢失。

为了可靠,在把流写到文件系统时,使用 flink-connector-filesystem 来实现 exactly-once 语义。此外,通过 .addSink(...) 方法自定义的实现可以参与Flink的checkpoint机制以实现 exactly-once 语义。

迭代#

Java#

迭代流程序实现一个 step function 并将其嵌入到 IterativeStream 中。由于 DataStream 程序可能永远不会结束,所以没有最大迭代次数。实际上,你需要指定哪一部分的流被反馈到迭代过程,哪个部分通过 splitfilter transformation 向下游转发。在这里,我们展示一个使用过滤器的例子。首先,我们定义一个 IterativeStream

IterativeStream<Integer> iteration = input.iterate();

然后,我们使用一系列 transformations 来指定在循环内执行的逻辑(这里示意一个简单的 map transformation)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

要关闭迭代并定义迭代结束,需要调用 IterativeStreamcloseWith(feedbackStream) 方法。传给 closeWith function 的 DataStream 将被反馈给迭代的头部。一种常见的模式是使用 filter 来分离流中需要反馈的部分和需要继续发往下游的部分。这些 filter 可以定义“终止”逻辑,以控制元素是流向下游而不是反馈迭代。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

例如,如下程序从一系列整数连续减1,直到它们达到0:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

Scala#

迭代流程序实现一个 step function 并将其嵌入到 IterativeStream 中。由于 DataStream 程序可能永远不会结束,所以没有最大迭代次数。事实上你需要指定哪一部分的流被反馈到迭代过程,哪个部分通过 splitfilter transformation 向下游转发。在这里,我们展示一个迭代的例子,其中主体(被反复执行计算部分)是简单的 map transformation,迭代反馈的元素和继续发往下游的元素通过 filters 进行区分。

val iteratedStream = someDataStream.iterate(
  iteration => {
    val iterationBody = iteration.map(/* this is executed many times */)
    (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
})

例如,如下程序从一系列整数连续减1,直到它们达到0:

val someIntegers: DataStream[Long] = env.generateSequence(0, 1000)

val iteratedStream = someIntegers.iterate(
  iteration => {
    val minusOne = iteration.map( v => v - 1)
    val stillGreaterThanZero = minusOne.filter (_ > 0)
    val lessThanZero = minusOne.filter(_ <= 0)
    (stillGreaterThanZero, lessThanZero)
  }
)

执行参数#

StreamExecutionEnvironment 包含 ExecutionConfig ,需要通过ExecutionConfig,对作业运行时进行配置。

更多配置参数请参阅 execution configuration 。这些参数是 DataStream API 特有的:

  • setAutoWatermarkInterval(long milliseconds): 设置自动发射 watermark 的间隔。你可以通过 long getAutoWatermarkInterval() 获取当前的发射间隔。

容错#

State & Checkpointing 描述了如何开启和配置 Flink 的 checkpointing 机制。

控制延迟#

默认情况下,不会逐个传输元素(这将导致不必要的网络流量),而是被缓存的。缓存(实际是在机器之间传输)的大小可以在 Flink 配置文件中设置。虽然这种方法对于优化吞吐量有好处,但是当输入流不够快时,它可能会导致延迟问题。要控制吞吐量和延迟,你可以在 execution environment(或单个 operator )上使用 env.setBufferTimeout(timeoutMillis) 来设置缓冲区填满的最大等待时间。如果超过该最大等待时间,即使缓冲区未满,其中的数据也会被自动发送出去。该最大等待时间默认值为 100 ms。

用法:

Java

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

Scala

val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
env.setBufferTimeout(timeoutMillis)

env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)

为了最大化吞吐量,可以设置 setBufferTimeout(-1) ,这样就没有了超时机制,只有缓冲区填满时,才会发送数据出去。为了使延迟最小,可以把超时设置为接近 0 的值(例如 5 或 10 ms)。应避免将该超时设置为 0,因为这样可能导致性能严重下降。

调试#

在分布式集群中运行 Streaming 程序之前,最好确保实现的算法可以正常工作。因此,部署数据分析程序通常是一个渐进的过程:检查结果,调试和改进。

Flink 提供了诸多特性来大幅简化数据分析程序的开发:你可以在 IDE 中进行本地调试,注入测试数据,收集结果数据。本节给出一些如何简化 Flink 程序开发的指导。

本地执行环境#

LocalStreamEnvironment 会在其所在的进程中启动一个 Flink 引擎. 如果你在 IDE 中启动 LocalEnvironment ,你可以在你的代码中设置断点,轻松调试你的程序。

一个 LocalEnvironment 的创建和使用示例如下:

Java


final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();

Scala

val env = StreamExecutionEnvironment.createLocalEnvironment()

val lines = env.addSource(/* some source */)
// build your program

env.execute()

基于集合的数据 Sources#

Flink 提供了基于 Java 集合实现的特殊数据 sources 用于测试。一旦程序通过测试,它的 sources 和 sinks 可以方便的替换为从外部系统读写的 sources 和 sinks 。

基于集合的数据 Sources 可以像这样使用:

Java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

Scala

val env = StreamExecutionEnvironment.createLocalEnvironment()

// Create a DataStream from a list of elements
val myInts = env.fromElements(1, 2, 3, 4, 5)

// Create a DataStream from any Collection
val data: Seq[(String, Int)] = ...
val myTuples = env.fromCollection(data)

// Create a DataStream from an Iterator
val longIt: Iterator[Long] = ...
val myLongs = env.fromCollection(longIt)

注意: 当前,集合数据 source 要求数据类型和迭代器实现 Serializable 。此外,集合类数据源不能并发执行(并行度=1)。

迭代的数据 Sink#

Flink 还提供了一个 sink 来收集 DataStream 的测试和调试结果。它可以这样使用:

Java

import org.apache.flink.streaming.experimental.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

Scala

import org.apache.flink.streaming.experimental.DataStreamUtils
import scala.collection.JavaConverters.asScalaIteratorConverter

val myResult: DataStream[(String, Int)] = ...
val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala

注意: flink-streaming-contrib模块已经从 Flink 1.5.0 移除。 它的类已经被移动到 flink-streaming-javaflink-streaming-scala

下一步去哪里?#