Important: Maven artifacts that depend on Scala are now suffixed with the Scala major version, e.g. "2.10" or "2.11". Please consult the migration guide.

Table API - Relational Queries (Beta)

The Table API is an experimental feature.

Flink provides an API that allows specifying operations using SQL-like expressions. Instead of manipulating a DataSet or DataStream, you work with a Table on which relational operations can be performed.

The following dependency must be added to your project when using the Table API:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>

Note that the Table API is currently not part of the binary distribution. See the instructions on linking with it for cluster execution.

Scala Table API

The Table API can be enabled by importing org.apache.flink.api.scala.table._. This enables implicit conversions that allow converting a DataSet or DataStream to a Table. This example shows how a DataSet can be converted, how relational queries can be specified and how a Table can be converted back to a DataSet:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

case class WC(word: String, count: Int)
val env = ExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
val expr = input.toTable
val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC]

The expression DSL uses Scala symbols to refer to field names and we use code generation to transform expressions to efficient runtime code. Please note that the conversion to and from Tables only works when using Scala case classes or Flink POJOs. Please check out the programming guide to learn the requirements for a class to be considered a POJO.

This is another example that shows how you can join two Tables:

case class MyResult(a: String, d: Int)

val input1 = env.fromElements(...).toTable('a, 'b)
val input2 = env.fromElements(...).toTable('c, 'd)
val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toDataSet[MyResult]

Notice how a DataSet can be converted to a Table and given new field names in the same step, as toTable('a, 'b) does above. This can also be used to disambiguate fields before a join operation. The example also shows that relational expressions can be specified as Strings.

Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a description of the expression syntax.

Java Table API

When using Java, Tables can be converted to and from DataSet and DataStream using TableEnvironment. This example is equivalent to the first Scala example above:

public class WC {

  public WC(String word, int count) {
    this.word = word; this.count = count;
  }

  public WC() {} // empty constructor to satisfy POJO requirements

  public String word;
  public int count;
}

...

ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
TableEnvironment tableEnv = new TableEnvironment();

DataSet<WC> input = env.fromElements(
        new WC("Hello", 1),
        new WC("Ciao", 1),
        new WC("Hello", 1));

Table table = tableEnv.fromDataSet(input);

Table filtered = table
        .groupBy("word")
        .select("word.count as count, word")
        .filter("count = 2");

DataSet<WC> result = tableEnv.toDataSet(filtered, WC.class);

When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions are supported. They support exactly the same feature set as the expression DSL.
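To make the semantics of the Java query above concrete, the following sketch reproduces the same grouping, counting, and filtering with plain Java collections. It does not use Flink at all; the class WordCountSketch and the helper wordsWithCount are hypothetical names introduced only for illustration, and the sketch shows the result one would expect from the query rather than how Flink executes it.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: plain Java, no Flink classes involved.
final class WordCountSketch {

    // Hypothetical helper mirroring
    //   groupBy("word").select("word.count as count, word").filter("count = 2")
    // with `wanted` playing the role of the literal 2.
    static Map<String, Integer> wordsWithCount(List<String> words, int wanted) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum); // group by word, count rows per group
        }
        Map<String, Integer> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() == wanted) { // keep only groups with the wanted count
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        // Same input words as the WC elements in the example above.
        System.out.println(wordsWithCount(Arrays.asList("Hello", "Ciao", "Hello"), 2));
    }
}
```

With the three input elements from the example, only the group for "Hello" has a count of 2, so it is the only row that survives the filter.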

Table API Operators

The Table API provides a domain-specific language to execute language-integrated queries on structured data in Scala and Java. This section gives a brief overview of all available operators. You can find more details on the operators in the Javadoc.


Operators and descriptions (Java)
Select

Similar to a SQL SELECT statement. Perform a select operation.

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.select("a, c as d");
As

Rename fields.

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.as("d, e, f");
Filter

Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.filter("a % 2 = 0");
Where

Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.where("b = 'red'");
GroupBy

Similar to a SQL GROUP BY clause. Groups the elements on the grouping keys; a following aggregation operator aggregates on a per-group basis.

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.groupBy("a").select("a, b.sum as d");

Note: Flink can refer to nonaggregated columns in the select list that are not named in the groupBy clause. This can improve performance by avoiding unnecessary column sorting and grouping when the nonaggregated column is cogrouped with the columns in the groupBy clause. For example:

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.groupBy("a").select("a, b, c.sum as d");
Join

Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names, and a where clause is mandatory to specify the join condition.

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.join(right).where("a = d").select("a, b, e");
Union

Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical schemas (field names and types).

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);
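The Join and Union rows above can be read as follows: a join pairs rows from both tables and keeps the pairs that satisfy the where condition, while a union with UNION ALL semantics concatenates both inputs and keeps duplicates. The sketch below illustrates this with plain Java lists. It is not Flink code; JoinUnionSketch, joinOnFirstField, and unionAll are hypothetical names, and the nested-loop strategy is chosen for readability only and says nothing about how Flink executes a join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: plain Java, no Flink classes involved.
final class JoinUnionSketch {

    // Mirrors left.join(right).where("a = d").select("a, b, e"):
    // left rows are (a, b, c), right rows are (d, e, f).
    static List<List<Object>> joinOnFirstField(List<Object[]> left, List<Object[]> right) {
        List<List<Object>> out = new ArrayList<>();
        for (Object[] l : left) {
            for (Object[] r : right) {
                if (l[0].equals(r[0])) {                      // where a = d
                    out.add(Arrays.asList(l[0], l[1], r[1])); // select a, b, e
                }
            }
        }
        return out;
    }

    // UNION ALL semantics: plain concatenation, duplicates are kept.
    static <T> List<T> unionAll(List<T> left, List<T> right) {
        List<T> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    public static void main(String[] args) {
        List<Object[]> left = Arrays.asList(
                new Object[]{1, "x", 10.0}, new Object[]{2, "y", 20.0});
        List<Object[]> right = Arrays.asList(
                new Object[]{1, "red", true}, new Object[]{3, "blue", false});
        System.out.println(joinOnFirstField(left, right));
        System.out.println(unionAll(Arrays.asList(1, 2), Arrays.asList(2, 3)));
    }
}
```

In this sample data only the rows with key 1 match, so the join yields a single row, and the union keeps the value 2 twice.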


Operators and descriptions (Scala)
Select

Similar to a SQL SELECT statement. Perform a select operation.

val in = ds.as('a, 'b, 'c);
val result = in.select('a, 'c as 'd);
As

Rename fields.

val in = ds.as('a, 'b, 'c);
Filter

Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.

val in = ds.as('a, 'b, 'c);
val result = in.filter('a % 2 === 0)
Where

Similar to a SQL WHERE clause. Filter out elements that do not pass the filter predicate.

val in = ds.as('a, 'b, 'c);
val result = in.where('b === "red");
GroupBy

Similar to a SQL GROUP BY clause. Groups the elements on the grouping keys; a following aggregation operator aggregates on a per-group basis.

val in = ds.as('a, 'b, 'c);
val result = in.groupBy('a).select('a, 'b.sum as 'd);

Note: Flink can refer to nonaggregated columns in the select list that are not named in the groupBy clause. This can improve performance by avoiding unnecessary column sorting and grouping when the nonaggregated column is cogrouped with the columns in the groupBy clause. For example:

val in = ds.as('a, 'b, 'c);
val result = in.groupBy('a).select('a, 'b, 'c.sum as 'd);
Join

Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names, and a where clause is mandatory to specify the join condition.

val left = ds1.as('a, 'b, 'c);
val right = ds2.as('d, 'e, 'f);
val result = left.join(right).where('a === 'd).select('a, 'b, 'e);
Union

Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical schemas (field names and types).

val left = ds1.as('a, 'b, 'c);
val right = ds2.as('a, 'b, 'c);
val result = left.union(right);

Expression Syntax

Some of the operators in the previous section expect an expression. Expressions can be specified either using the embedded Scala DSL or as a String. Please refer to the examples above to learn how expressions can be formulated.

This is the complete EBNF grammar for expressions:

expression = single expression , { "," , single expression } ;

single expression = alias | logic ;

alias = logic | logic , "AS" , field reference ;

logic = comparison , [ ( "&&" | "||" ) , comparison ] ;

comparison = term , [ ( "=" | "!=" | ">" | ">=" | "<" | "<=" ) , term ] ;

term = product , [ ( "+" | "-" ) , product ] ;

product = binary bitwise , [ ( "*" | "/" | "%" ) , binary bitwise ] ;

binary bitwise = unary , [ ( "&" | "|" | "^" ) , unary ] ;

unary = [ "!" | "-" | "~" ] , suffix ;

suffix = atom | aggregation | cast | as | substring ;

aggregation = atom , [ ".sum" | ".min" | ".max" | ".count" | ".avg" ] ;

cast = atom , ".cast(" , data type , ")" ;

data type = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOL" | "BOOLEAN" | "STRING" | "DATE" ;

as = atom , ".as(" , field reference , ")" ;

substring = atom , ".substring(" , substring start , ["," substring end] , ")" ;

substring start = single expression ;

substring end = single expression ;

atom = ( "(" , single expression , ")" ) | literal | field reference ;

Here, literal is a valid Java literal and field reference specifies a column in the data. The column names follow Java identifier syntax.
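To show how the layered rules of the grammar encode operator precedence, here is a small recursive-descent evaluator for a reduced subset: only the term, product, unary, and atom levels, with non-negative integer literals and no field references. It is a sketch written for this page, not Flink's expression parser, and ExprSketch and eval are hypothetical names. The sketch follows the optional brackets of the grammar literally, so at most one operator is accepted per level and longer chains need parentheses.

```java
// Illustration only: evaluates a reduced subset of the expression grammar.
final class ExprSketch {
    private final String src;
    private int pos = 0;

    private ExprSketch(String src) { this.src = src; }

    // Parses and evaluates the whole input; trailing input is an error.
    static long eval(String text) {
        ExprSketch p = new ExprSketch(text);
        long value = p.term();
        p.skipSpaces();
        if (p.pos != p.src.length()) {
            throw new IllegalArgumentException("unexpected input at " + p.pos);
        }
        return value;
    }

    // term = product , [ ( "+" | "-" ) , product ] ;
    private long term() {
        long left = product();
        if (accept('+')) return left + product();
        if (accept('-')) return left - product();
        return left;
    }

    // product = unary , [ ( "*" | "/" | "%" ) , unary ] ;  (bitwise level omitted)
    private long product() {
        long left = unary();
        if (accept('*')) return left * unary();
        if (accept('/')) return left / unary();
        if (accept('%')) return left % unary();
        return left;
    }

    // unary = [ "-" ] , atom ;
    private long unary() {
        return accept('-') ? -atom() : atom();
    }

    // atom = ( "(" , term , ")" ) | integer literal ;
    private long atom() {
        if (accept('(')) {
            long value = term();
            if (!accept(')')) throw new IllegalArgumentException("expected ')' at " + pos);
            return value;
        }
        skipSpaces();
        int start = pos;
        while (pos < src.length() && Character.isDigit(src.charAt(pos))) pos++;
        if (start == pos) throw new IllegalArgumentException("expected literal at " + pos);
        return Long.parseLong(src.substring(start, pos));
    }

    // Consumes the character c if it is next (after whitespace).
    private boolean accept(char c) {
        skipSpaces();
        if (pos < src.length() && src.charAt(pos) == c) { pos++; return true; }
        return false;
    }

    private void skipSpaces() {
        while (pos < src.length() && Character.isWhitespace(src.charAt(pos))) pos++;
    }

    public static void main(String[] args) {
        System.out.println(eval("1 + 2 * 3"));   // product binds tighter than term
        System.out.println(eval("(1 + 2) * 3")); // parentheses restart at the top rule
    }
}
```

Because product sits below term in the rule hierarchy, the multiplication in "1 + 2 * 3" is evaluated before the addition, which is the same effect the full grammar achieves for its comparison, logic, and bitwise levels.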

Only the types LONG and STRING can be cast to DATE and vice versa. A LONG cast to DATE must be a milliseconds timestamp. A STRING cast to DATE must have the format “yyyy-MM-dd HH:mm:ss.SSS”, “yyyy-MM-dd”, “HH:mm:ss”, or be a milliseconds timestamp. By default, all timestamps are in milliseconds since January 1, 1970, 00:00:00 UTC.
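The accepted STRING formats can be illustrated with a small converter that maps each of them to a milliseconds timestamp in UTC. This is a sketch built on java.time, not Flink's cast implementation, and DateCastSketch and toEpochMillis are hypothetical names. It assumes that a time-only string refers to January 1, 1970, and that a date-only string refers to midnight; Flink's actual behavior for these cases should be checked against the Javadoc.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustration only: plain Java, no Flink classes involved.
final class DateCastSketch {

    private static final DateTimeFormatter FULL =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Converts one of the documented string formats to milliseconds since
    // January 1, 1970, 00:00:00 UTC.
    static long toEpochMillis(String s) {
        if (s.matches("-?\\d+")) {
            return Long.parseLong(s); // already a milliseconds timestamp
        }
        LocalDateTime dateTime;
        if (s.length() == 23) {
            dateTime = LocalDateTime.parse(s, FULL); // yyyy-MM-dd HH:mm:ss.SSS
        } else if (s.length() == 10) {
            dateTime = LocalDate.parse(s).atStartOfDay(); // yyyy-MM-dd, assumed midnight
        } else if (s.length() == 8) {
            // HH:mm:ss, assumed to be on 1970-01-01
            dateTime = LocalDate.ofEpochDay(0).atTime(LocalTime.parse(s));
        } else {
            throw new IllegalArgumentException("unsupported date format: " + s);
        }
        return dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("1970-01-01 00:00:01.500"));
        System.out.println(toEpochMillis("1970-01-02"));
        System.out.println(toEpochMillis("00:01:00"));
    }
}
```

All conversions use ZoneOffset.UTC explicitly so that the result does not depend on the default time zone of the machine running the sketch.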