DataStream API与Table API/SQL集成
DataStream API与Table API/SQL集成
在定义数据处理管道时,Table API 和 DataStream API 同样重要。
DataStream API 在一个相对较低级别的命令式编程 API 中提供了流处理的原语(即时间、状态和数据流管理)。 Table API 抽象了许多内部结构,并提供了结构化和声明性的 API。
两种 API 都可以处理有界和无界流。
处理历史数据时需要管理有界流。 无限流发生在可能首先用历史数据初始化的实时处理场景中。
为了高效执行,这两个 API 都以优化的批处理执行模式提供处理有界流。 但是,由于批处理只是流的一种特殊情况,因此也可以在常规流执行模式下运行有界流的管道。
目前,DataStream API 和 Table API 都提供了自己的方式来启用批处理执行模式。 在不久的将来,这将进一步统一。
一个 API 中的管道可以端到端定义,而不依赖于另一个 API。 但是,出于各种原因,混合使用这两种 API 可能会很有用:
- 在 DataStream API 中实现主管道之前,使用表生态系统轻松访问目录或连接到外部系统。
- 在 DataStream API 中实现主管道之前,访问一些用于无状态数据规范化和清理的 SQL 函数。
- 如果 Table API 中不存在更底层的操作(例如自定义计时器处理),请不时切换到 DataStream API。
Flink 提供了特殊的桥接功能,使与 DataStream API 的集成尽可能顺畅。
在 DataStream 和 Table API 之间切换会增加一些转换开销。 例如,部分处理二进制数据的表运行时的内部数据结构(即 RowData)需要转换为对用户更友好的数据结构(即 Row)。 通常,可以忽略此开销,但为了完整起见,在此提及。
DataStream和Table之间的转换
Flink 在 Java 和 Scala 中提供了一个专门的 StreamTableEnvironment 用于与 DataStream API 集成。 这些环境使用其他方法扩展常规 TableEnvironment,并将 DataStream API 中使用的 StreamExecutionEnvironment 作为参数。
目前 StreamTableEnvironment 暂不支持开启批量执行模式。 尽管如此,有界流可以使用流式执行模式处理,但效率较低。 但是请注意,通用 TableEnvironment 可以在流式执行或优化的批处理执行模式下工作。
以下代码显示了如何在两个 API 之间来回切换的示例。 Table 的列名和类型自动派生自 DataStream 的 TypeInformation。 由于 DataStream API 本身不支持变更日志处理,因此代码在流到表和表到流的转换过程中假定仅附加/仅插入语义。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// create a DataStream
DataStream<String> dataStream = env.fromElements("Alice", "Bob", "John");
// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream);
// register the Table object as a view and query it
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable");
// interpret the insert-only Table as a DataStream again
DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();
// prints:
// +I[Alice]
// +I[Bob]
// +I[John]
fromDataStream 和 toDataStream 的完整语义可以在下面的专用部分中找到。 特别是,本节讨论了如何使用更复杂和嵌套的类型来影响模式派生。 它还涵盖了使用事件时间和水印。
根据查询的类型,在许多情况下,生成的动态表是一个管道,它不仅在将表覆盖到 DataStream 时产生仅插入更改,而且还会产生撤回和其他类型的更新。 在表到流的转换过程中,这可能会导致类似于以下的异常
Table sink 'Unregistered_DataStream_Sink_1' doesn't support consuming update changes [...].
在这种情况下,需要再次修改查询或切换到 toChangelogStream。
以下示例显示了如何转换更新表。 每个结果行都表示更改日志中的一个条目,该条目带有一个更改标志,可以通过对其调用 row.getKind() 进行查询。 在该示例中,Alice 的第二个分数在 (-U) 更改之前和 (+U) 更改之后创建更新。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// create a DataStream
DataStream<Row> dataStream = env.fromElements(
Row.of("Alice", 12),
Row.of("Bob", 10),
Row.of("Alice", 100));
// interpret the insert-only DataStream as a Table
Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "score");
// register the Table object as a view and query it
// the query contains an aggregation that produces updates
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery(
"SELECT name, SUM(score) FROM InputTable GROUP BY name");
// interpret the updating Table as a changelog DataStream
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();
// prints:
// +I[Alice, 12]
// +I[Bob, 10]
// -U[Alice, 12]
// +U[Alice, 112]
fromChangelogStream 和 toChangelogStream 的完整语义可以在下面的专用部分中找到。 特别是,本节讨论了如何使用更复杂和嵌套的类型来影响模式派生。 它涵盖了使用事件时间和水印。 它讨论了如何为输入和输出流声明主键和更改日志模式。
依赖与导入
将 Table API 与 DataStream API 结合的项目需要添加以下桥接模块之一。 它们包括对 flink-table-api-java 或 flink-table-api-scala 的传递依赖以及相应的特定于语言的 DataStream API 模块。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.13.5</version>
<scope>provided</scope>
</dependency>
配置
TableEnvironment 将采用来自传递的 StreamExecutionEnvironment 的所有配置选项。 但是,不能保证对 StreamExecutionEnvironment 的配置的进一步更改会在 StreamTableEnvironment 实例化后传播到它。 此外,不支持将选项从 Table API 反向传播到 DataStream API。
我们建议在切换到 Table API 之前尽早在 DataStream API 中设置所有配置选项。
import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// create Java DataStream API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set various configuration early
env.setMaxParallelism(256);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// then switch to Java Table API
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// set configuration early
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));
// start defining your pipelines in both APIs...
执行行为
这两个 API 都提供了执行管道的方法。 换句话说:如果需要,他们会编译一个作业图,该作业图将提交到集群并触发执行。 结果将流式传输到声明的接收器。
通常,这两个 API 都使用方法名称中的术语执行来标记此类行为。 但是,Table API 和 DataStream API 的执行行为略有不同。
DataStream API
DataStream API 的 StreamExecutionEnvironment 充当构建器模式来构建复杂的管道。 管道可能会分成多个分支,这些分支可能会或可能不会以接收器结束。
必须至少定义一个接收器。 否则,将引发以下异常:
java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
StreamExecutionEnvironment.execute() 提交整个构建的管道并随后清除构建器。 换句话说:不再声明源和接收器,并且可以将新管道添加到构建器中。 因此,每个 DataStream 程序通常以调用 StreamExecutionEnvironment.execute() 结束。 或者,DataStream.executeAndCollect() 隐式定义了一个接收器,用于将结果流式传输到本地客户端,并且只执行当前分支。
Table API
在 Table API 中,仅在 StatementSet 中支持分支管道,其中每个分支都必须声明一个最终接收器。 TableEnvironment 和 StreamTableEnvironment 都不提供专用的通用 execute() 方法。 相反,它们提供了提交单个源到接收器管道或语句集的方法:
// execute with explicit sink
tableEnv.from("InputTable").executeInsert("OutputTable")
tableEnv.executeSql("INSERT INTO OutputTable SELECT * FROM InputTable")
tableEnv.createStatementSet()
.addInsert("OutputTable", tableEnv.from("InputTable"))
.addInsert("OutputTable2", tableEnv.from("InputTable"))
.execute()
tableEnv.createStatementSet()
.addInsertSql("INSERT INTO OutputTable SELECT * FROM InputTable")
.addInsertSql("INSERT INTO OutputTable2 SELECT * FROM InputTable")
.execute()
// execute with implicit local sink
tableEnv.from("InputTable").execute().print()
tableEnv.executeSql("SELECT * FROM InputTable").print()
为了结合这两种执行行为,对 StreamTableEnvironment.toDataStream 或 StreamTableEnvironment.toChangelogStream 的每次调用都将具体化(即编译)表 API 子管道并将其插入到 DataStream API 管道构建器中。 这意味着之后必须调用 StreamExecutionEnvironment.execute() 或 DataStream.executeAndCollect。 Table API 中的执行不会触发这些“外部部分”。
// (1)
// adds a branch with a printing sink to the StreamExecutionEnvironment
tableEnv.toDataStream(table).print()
// (2)
// executes a Table API end-to-end pipeline as a Flink job and prints locally,
// thus (1) has still not been executed
table.execute().print()
// executes the DataStream API pipeline with the sink defined in (1) as a
// Flink job, (2) was already running before
env.execute()
处理(仅插入)流
StreamTableEnvironment 提供以下方法来转换和转换为 DataStream API:
fromDataStream(DataStream):将仅插入更改和任意类型的流解释为表。默认情况下不传播事件时间和水印。
fromDataStream(DataStream, Schema):将仅插入更改和任意类型的流解释为表。可选模式允许丰富列数据类型并添加时间属性、水印策略、其他计算列或主键。
createTemporaryView(String, DataStream):在一个名称下注册流,以便在 SQL 中访问它。它是 createTemporaryView(String, fromDataStream(DataStream)) 的快捷方式。
createTemporaryView(String, DataStream, Schema):在一个名称下注册流,以便在 SQL 中访问它。它是 createTemporaryView(String, fromDataStream(DataStream, Schema)) 的快捷方式。
toDataStream(DataStream):将表转换为只插入更改的流。默认流记录类型是 org.apache.flink.types.Row。单个行时间属性列被写回到 DataStream API 的记录中。水印也被传播。
toDataStream(DataStream, AbstractDataType):将表转换为只插入更改的流。此方法接受一种数据类型来表达所需的流记录类型。规划器可能会插入隐式强制转换和重新排序列以将列映射到(可能是嵌套的)数据类型的字段。
toDataStream(DataStream, Class):toDataStream(DataStream, DataTypes.of(Class)) 的快捷方式,可以快速反射地创建所需的数据类型。
从 Table API 的角度来看,与 DataStream API 之间的转换类似于读取或写入已使用 SQL 中的 CREATE TABLE DDL 定义的虚拟表连接器。
virtual CREATE TABLE name (schema) WITH (options) 语句中的 schema 部分可以从 DataStream 的类型信息中自动派生、丰富或完全使用 org.apache.flink.table.api.Schema 手动定义。
虚拟 DataStream 表连接器为每一行公开以下元数据:
Key | Data Type | Description | R/W |
---|---|---|---|
rowtime | TIMESTAMP_LTZ(3) NOT NULL | Stream record's timestamp. | R/W |
虚拟 DataStream 表源实现 SupportsSourceWatermark,因此允许调用 SOURCE_WATERMARK() 内置函数作为水印策略,以采用来自 DataStream API 的水印。
fromDataStream
例子
下面的代码展示了如何将 fromDataStream 用于不同的场景。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import java.time.Instant;
// some example POJO
public static class User {
public String name;
public Integer score;
public Instant event_time;
// default constructor for DataStream API
public User() {}
// fully assigning constructor for Table API
public User(String name, Integer score, Instant event_time) {
this.name = name;
this.score = score;
this.event_time = event_time;
}
}
// create a DataStream
DataStream<User> dataStream =
env.fromElements(
new User("Alice", 4, Instant.ofEpochMilli(1000)),
new User("Bob", 6, Instant.ofEpochMilli(1001)),
new User("Alice", 10, Instant.ofEpochMilli(1002)));
// === EXAMPLE 1 ===
// derive all physical columns automatically
Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9)
// )
// === EXAMPLE 2 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a proctime attribute column)
Table table = tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.build());
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT NOT NULL,
// `event_time` TIMESTAMP_LTZ(9),
// `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
//)
// === EXAMPLE 3 ===
// derive all physical columns automatically
// but add computed columns (in this case for creating a rowtime attribute column)
// and a custom watermark strategy
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
.build());
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )
// === EXAMPLE 4 ===
// derive all physical columns automatically
// but access the stream record's timestamp for creating a rowtime attribute column
// also rely on the watermarks generated in the DataStream API
// we assume that a watermark strategy has been defined for `dataStream` before
// (not part of this example)
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
table.printSchema();
// prints:
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )
// === EXAMPLE 5 ===
// define physical columns manually
// in this example,
// - we can reduce the default precision of timestamps from 9 to 3
// - we also project the columns and put `event_time` to the beginning
Table table =
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.column("event_time", "TIMESTAMP_LTZ(3)")
.column("name", "STRING")
.column("score", "INT")
.watermark("event_time", "SOURCE_WATERMARK()")
.build());
table.printSchema();
// prints:
// (
// `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
// `name` VARCHAR(200),
// `score` INT
// )
// note: the watermark strategy is not shown due to the inserted column reordering projection
示例 1 说明了一个不需要基于时间的操作的简单用例。
示例 4 是最常见的用例,其中基于时间的操作(例如窗口或间隔连接)应该是管道的一部分。 示例 2 是这些基于时间的操作应该在处理时间内工作的最常见用例。
示例 5 完全依赖于用户的声明。 这对于将 DataStream API 中的泛型类型(在 Table API 中为 RAW)替换为适当的数据类型很有用。
由于 DataType 比 TypeInformation 更丰富,我们可以轻松启用不可变 POJO 和其他复杂的数据结构。 以下 Java 示例显示了可能的情况。 另请查看 DataStream API 的 Data Types & Serialization 页面以获取有关那里支持的类型的更多信息。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
// the DataStream API does not support immutable POJOs yet,
// the class will result in a generic type that is a RAW type in Table API by default
public static class User {
public final String name;
public final Integer score;
public User(String name, Integer score) {
this.name = name;
this.score = score;
}
}
// create a DataStream
DataStream<User> dataStream = env.fromElements(
new User("Alice", 4),
new User("Bob", 6),
new User("Alice", 10));
// since fields of a RAW type cannot be accessed, every stream record is treated as an atomic type
// leading to a table with a single column `f0`
Table table = tableEnv.fromDataStream(dataStream);
table.printSchema();
// prints:
// (
// `f0` RAW('User', '...')
// )
// instead, declare a more useful data type for columns using the Table API's type system
// in a custom schema and rename the columns in a following `as` projection
Table table = tableEnv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column("f0", DataTypes.of(User.class))
.build())
.as("user");
table.printSchema();
// prints:
// (
// `user` *User<`name` STRING,`score` INT>*
// )
// data types can be extracted reflectively as above or explicitly defined
Table table3 = tableEnv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column(
"f0",
DataTypes.STRUCTURED(
User.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT())))
.build())
.as("user");
table.printSchema();
// prints:
// (
// `user` *User<`name` STRING,`score` INT>*
// )
createTemporaryView
示例
DataStream 可以直接注册为视图(可能通过模式丰富)。
从 DataStream 创建的视图只能注册为临时视图。 由于它们的内联/匿名性质,无法将它们注册到永久目录中。
下面的代码展示了如何在不同的场景下使用 createTemporaryView。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
// create some DataStream
DataStream<Tuple2<Long, String>> dataStream = env.fromElements(
Tuple2.of(12L, "Alice"),
Tuple2.of(0L, "Bob"));
// === EXAMPLE 1 ===
// register the DataStream as view "MyView" in the current session
// all columns are derived automatically
tableEnv.createTemporaryView("MyView", dataStream);
tableEnv.from("MyView").printSchema();
// prints:
// (
// `f0` BIGINT NOT NULL,
// `f1` STRING
// )
// === EXAMPLE 2 ===
// register the DataStream as view "MyView" in the current session,
// provide a schema to adjust the columns similar to `fromDataStream`
// in this example, the derived NOT NULL information has been removed
tableEnv.createTemporaryView(
"MyView",
dataStream,
Schema.newBuilder()
.column("f0", "BIGINT")
.column("f1", "STRING")
.build());
tableEnv.from("MyView").printSchema();
// prints:
// (
// `f0` BIGINT,
// `f1` STRING
// )
// === EXAMPLE 3 ===
// use the Table API before creating the view if it is only about renaming columns
tableEnv.createTemporaryView(
"MyView",
tableEnv.fromDataStream(dataStream).as("id", "name"));
tableEnv.from("MyView").printSchema();
// prints:
// (
// `id` BIGINT NOT NULL,
// `name` STRING
// )
toDataStream
示例
下面的代码展示了如何在不同的场景中使用 toDataStream。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import java.time.Instant;
// POJO with mutable fields
// since no fully assigning constructor is defined, the field order
// is alphabetical [event_time, name, score]
public static class User {
public String name;
public Integer score;
public Instant event_time;
}
tableEnv.executeSql(
"CREATE TABLE GeneratedTable "
+ "("
+ " name STRING,"
+ " score INT,"
+ " event_time TIMESTAMP_LTZ(3),"
+ " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
+ ")"
+ "WITH ('connector'='datagen')");
Table table = tableEnv.from("GeneratedTable");
// === EXAMPLE 1 ===
// use the default conversion to instances of Row
// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
DataStream<Row> dataStream = tableEnv.toDataStream(table);
// === EXAMPLE 2 ===
// a data type is extracted from class `User`,
// the planner reorders fields and inserts implicit casts where possible to convert internal
// data structures to the desired structured type
// since `event_time` is a single rowtime attribute, it is inserted into the DataStream
// metadata and watermarks are propagated
DataStream<User> dataStream = tableEnv.toDataStream(table, User.class);
// data types can be extracted reflectively as above or explicitly defined
DataStream<User> dataStream =
tableEnv.toDataStream(
table,
DataTypes.STRUCTURED(
User.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT()),
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));
请注意,toDataStream 仅支持非更新表。 通常,基于时间的操作(例如窗口、间隔连接或 MATCH_RECOGNIZE 子句)非常适合与投影和过滤器等简单操作相邻的仅插入管道。
具有产生更新的操作的管道可以使用 toChangelogStream。
处理变更流
在内部,Flink 的表运行时是一个变更日志处理器。 概念页面描述了动态表和流如何相互关联。
StreamTableEnvironment 提供以下方法来公开这些变更数据捕获 (CDC) 功能:
fromChangelogStream(DataStream):将变更日志条目流解释为表格。流记录类型必须是 org.apache.flink.types.Row,因为它的 RowKind 标志是在运行时评估的。默认情况下不传播事件时间和水印。此方法需要一个包含各种更改的更改日志(在 org.apache.flink.types.RowKind 中枚举)作为默认的 ChangelogMode。
fromChangelogStream(DataStream, Schema):允许为 DataStream 定义类似于 fromDataStream(DataStream, Schema) 的模式。否则语义等于 fromChangelogStream(DataStream)。
fromChangelogStream(DataStream, Schema, ChangelogMode):完全控制如何将流解释为变更日志。传递的 ChangelogMode 有助于规划器区分仅插入、更新插入或收回行为。
toChangelogStream(Table):fromChangelogStream(DataStream)的逆操作。它生成一个包含 org.apache.flink.types.Row 实例的流,并在运行时为每条记录设置 RowKind 标志。该方法支持各种更新表。如果输入表包含单个行时间列,它将被传播到流记录的时间戳中。水印也将被传播。
toChangelogStream(Table, Schema):fromChangelogStream(DataStream, Schema)的逆操作。该方法可以丰富产生的列数据类型。如有必要,计划者可能会插入隐式强制转换。可以将行时间写为元数据列。
toChangelogStream(Table, Schema, ChangelogMode):完全控制如何将表转换为变更日志流。传递的 ChangelogMode 有助于规划器区分仅插入、更新插入或收回行为。
从 Table API 的角度来看,与 DataStream API 之间的转换类似于读取或写入已使用 SQL 中的 CREATE TABLE DDL 定义的虚拟表连接器。
因为 fromChangelogStream 的行为与 fromDataStream 类似,我们建议在继续之前阅读上一节。
此虚拟连接器还支持读取和写入流记录的行时元数据。
虚拟表源实现 SupportsSourceWatermark。
fromChangelogStream
使用示例
下面的代码展示了如何将 fromChangelogStream 用于不同的场景。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
// === EXAMPLE 1 ===
// interpret the stream as a retract stream
// create a changelog DataStream
DataStream<Row> dataStream =
env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
// interpret the DataStream as a Table
Table table = tableEnv.fromChangelogStream(dataStream);
// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
.print();
// prints:
// +----+--------------------------------+-------------+
// | op | name | score |
// +----+--------------------------------+-------------+
// | +I | Bob | 5 |
// | +I | Alice | 12 |
// | -D | Alice | 12 |
// | +I | Alice | 100 |
// +----+--------------------------------+-------------+
// === EXAMPLE 2 ===
// interpret the stream as an upsert stream (without a need for UPDATE_BEFORE)
// create a changelog DataStream
DataStream<Row> dataStream =
env.fromElements(
Row.ofKind(RowKind.INSERT, "Alice", 12),
Row.ofKind(RowKind.INSERT, "Bob", 5),
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100));
// interpret the DataStream as a Table
Table table =
tableEnv.fromChangelogStream(
dataStream,
Schema.newBuilder().primaryKey("f0").build(),
ChangelogMode.upsert());
// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table);
tableEnv
.executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
.print();
// prints:
// +----+--------------------------------+-------------+
// | op | name | score |
// +----+--------------------------------+-------------+
// | +I | Bob | 5 |
// | +I | Alice | 12 |
// | -U | Alice | 12 |
// | +U | Alice | 100 |
// +----+--------------------------------+-------------+
示例 1 中显示的默认 ChangelogMode 对于大多数用例来说应该足够了,因为它接受各种更改。
但是,示例 2 显示了如何通过使用 upsert 模式将更新消息的数量减少 50% 来限制传入更改的种类以提高效率。 可以通过为 toChangelogStream 定义主键和 upsert 更改日志模式来减少结果消息的数量。
toChangelogStream
使用示例
下面的代码展示了如何在不同的场景下使用 toChangelogStream。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.*;
// create Table with event-time
tableEnv.executeSql(
"CREATE TABLE GeneratedTable "
+ "("
+ " name STRING,"
+ " score INT,"
+ " event_time TIMESTAMP_LTZ(3),"
+ " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
+ ")"
+ "WITH ('connector'='datagen')");
Table table = tableEnv.from("GeneratedTable");
// === EXAMPLE 1 ===
// convert to DataStream in the simplest and most general way possible (no event-time)
Table simpleTable = tableEnv
.fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
.as("name", "score")
.groupBy($("name"))
.select($("name"), $("score").sum());
tableEnv
.toChangelogStream(simpleTable)
.executeAndCollect()
.forEachRemaining(System.out::println);
// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]
// === EXAMPLE 2 ===
// convert to DataStream in the simplest and most general way possible (with event-time)
DataStream<Row> dataStream = tableEnv.toChangelogStream(table);
// since `event_time` is a single time attribute in the schema, it is set as the
// stream record's timestamp by default; however, at the same time, it remains part of the Row
dataStream.process(
new ProcessFunction<Row, Void>() {
@Override
public void processElement(Row row, Context ctx, Collector<Void> out) {
// prints: [name, score, event_time]
System.out.println(row.getFieldNames(true));
// timestamp exists twice
assert ctx.timestamp() == row.<Instant>getFieldAs("event_time").toEpochMilli();
}
});
env.execute();
// === EXAMPLE 3 ===
// convert to DataStream but write out the time attribute as a metadata column which means
// it is not part of the physical schema anymore
DataStream<Row> dataStream = tableEnv.toChangelogStream(
table,
Schema.newBuilder()
.column("name", "STRING")
.column("score", "INT")
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.build());
// the stream record's timestamp is defined by the metadata; it is not part of the Row
dataStream.process(
new ProcessFunction<Row, Void>() {
@Override
public void processElement(Row row, Context ctx, Collector<Void> out) {
// prints: [name, score]
System.out.println(row.getFieldNames(true));
// timestamp exists once
System.out.println(ctx.timestamp());
}
});
env.execute();
// === EXAMPLE 4 ===
// for advanced users, it is also possible to use more internal data structures for efficiency
// note that this is only mentioned here for completeness because using internal data structures
// adds complexity and additional type handling
// however, converting a TIMESTAMP_LTZ column to `Long` or STRING to `byte[]` might be convenient,
// also structured types can be represented as `Row` if needed
DataStream<Row> dataStream = tableEnv.toChangelogStream(
table,
Schema.newBuilder()
.column(
"name",
DataTypes.STRING().bridgedTo(StringData.class))
.column(
"score",
DataTypes.INT())
.column(
"event_time",
DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
.build());
// leads to a stream of Row(name: StringData, score: Integer, event_time: Long)
有关示例 4 中的数据类型支持哪些转换的更多信息,请参阅 Table API 的数据类型页面。
toChangelogStream(Table).executeAndCollect() 的行为等同于调用 Table.execute().collect()。 但是, toChangelogStream(Table) 可能对测试更有用,因为它允许在 DataStream API 的后续 ProcessFunction 中访问生成的水印。
TypeInformation 和 DataType 之间的映射
DataStream API 使用 org.apache.flink.api.common.typeinfo.TypeInformation 的实例来描述在流中传输的记录类型。特别是,它定义了如何将记录从一个 DataStream 运算符序列化和反序列化到另一个。它还有助于将状态序列化为保存点和检查点。
Table API 使用自定义数据结构在内部表示记录,并向用户公开 org.apache.flink.table.types.DataType 以声明将数据结构转换为的外部格式,以便在源、接收器、UDF 或 DataStream 中使用API。
DataType 比 TypeInformation 更丰富,因为它还包含有关逻辑 SQL 类型的详细信息。因此,在转换过程中会隐式添加一些细节。
Table 的列名和类型自动派生自 DataStream 的 TypeInformation。使用 DataStream.getType() 检查是否已通过 DataStream API 的反射类型提取工具正确检测到类型信息。如果最外层记录的 TypeInformation 是 CompositeType,则在派生表的 schema 时将在第一级展平。
TypeInformation 转为 DataType
将 TypeInformation 转换为 DataType 时适用以下规则:
TypeInformation 的所有子类都映射到逻辑类型,包括与 Flink 内置序列化器对齐的可空性。
TupleTypeInfoBase 的子类被转换为行(对于 Row)或结构化类型(对于元组、POJO 和案例类)。
BigDecimal 默认转换为 DECIMAL(38, 18)。
PojoTypeInfo 字段的顺序由以所有字段作为参数的构造函数确定。 如果在转换过程中未找到,则字段顺序将按字母顺序排列。
无法表示为列出的 org.apache.flink.table.api.DataTypes 之一的 GenericTypeInfo 和其他 TypeInformation 将被视为黑盒 RAW 类型。 当前会话配置用于实现原始类型的序列化程序。 届时将无法访问复合嵌套字段。
有关完整的翻译逻辑,请参阅 TypeInfoDataTypeConverter。
使用 DataTypes.of(TypeInformation) 在自定义模式声明或 UDF 中调用上述逻辑。
DataType转为TypeInformation
表运行时将确保正确地将输出记录序列化到 DataStream API 的第一个运算符。
之后,需要考虑 DataStream API 的类型信息语义。