重要: 依赖于Scala的maven artifacts现在会添加一个Scala主版本的后缀,例如 "2.10" 或 "2.11". 请查阅迁移指南.

快速起步:Java API

简单几步开始你的 Flink Java 程序。

要求

唯一的要求是使用 Maven 3.0.4 (或者更高) 和 Java 7.x (或者更高) 版本。

创建工程

使用下列命令中的一个 创建工程:

$ mvn archetype:generate                               \
  -DarchetypeGroupId=org.apache.flink              \
  -DarchetypeArtifactId=flink-quickstart-java      \
  -DarchetypeVersion=1.1-SNAPSHOT
这种创建方式允许你 给新创建的工程命名。 他会提示你输入 groupId、 artifactId, 以及 package name。
$ curl https://flink.apache.org/q/quickstart.sh | bash

检查工程

您的工作目录中会出现一个新的目录。如果你使用了 curl 建立工程,这个目录就是 quickstart。否则,就以你输入的 artifactId 命名。

这个示例工程是一个 Maven 工程, 包含两个类。 Job 是一个基本的框架程序, WordCountJob 是一个示例。 请注意,这两个类的 main 方法都允许你在开发/测试模式下启动Flink。

推荐 __把这个工程导入你的 IDE __ ,进行测试和开发。 如果用的是 Eclipse, 可以用 m2e 插件 导入 Maven 工程。有些 Eclipse 默认捆绑了这个插件,有些需要你手动安装。 IntelliJ IDE 也提供了对 Maven 工程的支持。

给 Mac OS X 用户的提示:默认的 JVM 堆内存对Flink来说太小了,你必须手动调高它。 在 Eclipse 里,选择 “运行时配置” -> 参数, 在“VM 参数” 里写入: “-Xmx800m”。

构建工程

如果你想要 构建你的工程, 进入工程目录,输入 mvn clean install -Pbuild-jar 命令。 你会__找到一个jar包__:target/your-artifact-id-1.0-SNAPSHOT.jar,它可以在任意Flink机器运行。 还有一个 fat-jar, target/your-artifact-id-1.0-SNAPSHOT-flink-fat-jar.jar,包含了所有添加到 Maven 工程的依赖。

下一步

编写你的应用程序

Quickstart 工程包含了一个 WordCount 的实现,也就是大数据处理系统的 “Hello World”。WordCount 的目标是计算文本中单词出现的频率。比如: 单词 “the” 或者 “house” 在所有的维基百科文本中出现了多少次。

示例输入:

big data is big

示例输出:

big 2
data 1
is 1

下面的代码就是 Quickstart 工程的 WordCount 实现,它使用两种操作( FlatMap 和 Reduce )处理了一些文本,并且在标准输出中打印了单词的计数结果。

public class WordCount {
  
  public static void main(String[] args) throws Exception {
    
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // get input data
    DataSet<String> text = env.fromElements(
        "To be, or not to be,--that is the question:--",
        "Whether 'tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
        "Or to take arms against a sea of troubles,"
        );
    
    DataSet<Tuple2<String, Integer>> counts = 
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new LineSplitter())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .aggregate(Aggregations.SUM, 1);

    // emit result
    counts.print();
  }
}

这些操作是在专门的类中定义的,下面是 LineSplitter 类。

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

  @Override
  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    // normalize and split the line into words
    String[] tokens = value.toLowerCase().split("\\W+");
    
    // emit the pairs
    for (String token : tokens) {
      if (token.length() > 0) {
        out.collect(new Tuple2<String, Integer>(token, 1));
      }
    }
  }
}

GitHub 中查看完整的示例代码。 想要了解完整的 API,可以查阅 编程指南更进一步的实例程序。 如果你有任何疑问,咨询我们的 Mailing List。我们很高兴可以提供帮助。