重要: 依赖于Scala的maven artifacts现在会添加一个Scala主版本的后缀,例如 "2.10" 或 "2.11". 请查阅迁移指南.

快速起步: Scala API

构建工具

Flink 工程可以用不同的构建工具构建。 为了让用户快速开始, Flink 为以下构建工具提供了工程模版:

这些模版帮助用户创建工程结构并初始化一些构建文件。

SBT

创建工程

$ g8 tillrohrmann/flink-project
这将在 指定 的工程目录下,从 flink-project 模版 创建一个 Flink 工程。 如果你没有安装 giter8, 请参照此 安装指南
$ git clone https://github.com/tillrohrmann/flink-project.git
这将在 flink-project 目录下创建 Flink 工程。
$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh)
这将在 指定 的工程目录下创建 Flink 工程

构建工程

为了构建工程, 只需要执行 sbt clean assembly 命令。 这将会在目录 target/scala_your-major-scala-version/ 下创建 fat-jar your-project-name-assembly-0.1-SNAPSHOT.jar

运行工程

如果想要运行你的工程, 输入 sbt run 命令。

作业将会默认运行在 sbt 运行的 JVM 上。 如果你想让作业运行在不同的 JVM 上, 将以下代码加入至 build.sbt

fork in run := true

IntelliJ

我们推荐使用 IntelliJ 作为你的 Flink job 开发环境。 首先, 你需要将新创建的工程导入至 IntelliJ。 你可以通过打开 File -> New -> Project from Existing Sources... , 然后选择你的工程目录来导入工程。 IntelliJ 会检测到 build.sbt 文件并自动导入。

如果你想要运行 Flink 作业, 建议将 mainRunner 模块 作为 Run/Debug Configuration 的 classpath 路径。 这将保证在执行过程中, 所有标识为”provided”的依赖都可用。 你可以通过打开 Run -> Edit Configurations... 来配置 Run/Debug Configurations, 然后从 Use classpath of module 下拉框中选择 mainRunner

Eclipse

如果你想要将新建的工程导入至 Eclipse, 首先你得为它创建 Eclipse 工程文件。 这些工程文件可以通过 sbteclipse 插件来创建。 将以下代码加入至 PROJECT_DIR/project/plugins.sbt 文件:

addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

sbt 交互式环境下使用下列命令创建 Eclipse 工程文件:

> eclipse

现在你可以通过打开 File -> Import... -> Existing Projects into Workspace 并选择你的工程目录, 将之导入至 Eclipse。

Maven

要求

唯一的要求是使用 Maven 3.0.4 (或者更高) 和 Java 7.x (或者更高) 版本。

创建工程

使用下列命令中的一个 创建工程:

$ mvn archetype:generate                               \
  -DarchetypeGroupId=org.apache.flink              \
  -DarchetypeArtifactId=flink-quickstart-scala     \
  -DarchetypeVersion=1.1-SNAPSHOT
这种创建方式允许你 给新创建的工程命名。它会提示你输入 groupId、 artifactId, 以及 package name。
$ curl https://flink.apache.org/q/quickstart-scala.sh | bash

检查工程

您的工作目录中会出现一个新的目录。如果你使用了 curl 建立工程, 这个目录就是 quickstart。 否则, 就以你输入的 artifactId 命名。

这个示例工程是一个包含两个类的 Maven 工程Job 是一个基本的框架程序, WordCountJob 是一个示例程序。 请注意,这两个类的 main 方法都允许你在开发/测试模式下启动 Flink。

推荐 把这个工程导入你的 IDE 进行测试和开发。 如果用的是 Eclipse, 你需要从常用的 Eclipse 更新站点上下载并安装以下插件:

IntelliJ IDE 也支持 Maven 并提供了一个用于 Scala 开发的插件。

构建工程

如果想要 构建你的工程, 进入工程目录并输入 mvn clean package -Pbuild-jar 命令。 你会 找到一个jar包: target/your-artifact-id-1.0-SNAPSHOT.jar , 它可以在任意 Flink 集群上运行。还有一个 fat-jar, target/your-artifact-id-1.0-SNAPSHOT-flink-fat-jar.jar, 包含了所有添加到 Maven 工程的依赖。

下一步

编写你自己的应用程序!

Quickstart 工程包含了一个 WordCount 的实现, 也就是大数据处理系统的 “Hello World”。 WordCount 的目标是计算文本中单词出现的频率。 比如: 单词 “the” 或者 “house” 在所有的维基百科文本中出现了多少次。

示例输入:

big data is big

示例输出:

big 2
data 1
is 1

下面的代码就是 Quickstart 工程的 WordCount 实现, 它使用两种操作( FlatMap 和 Reduce )处理了一些文本, 并且在标准输出中打印了单词的计数结果。

object WordCountJob {
  def main(args: Array[String]) {

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // get input data
    val text = env.fromElements("To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    // emit result
    counts.print()
  }
}

GitHub 中查看完整的示例代码。

想要了解完整的 API, 可以查阅 编程指南更多的实例。 如果你有任何疑问, 咨询我们的 Mailing List, 我们很高兴可以提供帮助。