Table API 和SQL是Flink提供的相对DataStream更加高级的API,他是对DataStream的封装,它有一下的有点:
- 支持SQL
- Table API有更加简单的编程
- 由于Flink对Table API做了一些优化,所以在使用Table API的时候会更加的高效
- Table API意在支持统一的批流一体,可以同时兼容流处理以及批处理。
Table API中的table概念
在Table API和SQL中,Flink将数据看成是一张表,流数据就是一张无限追加的表,而批数据就是一张有限的数据表。既然是一张表就有特定的structure,可以是自己指定也可以根据数据源推断。
Table的层级结构
Flink中表的层级结构是:catalog -> database -> table.
catalog可以基于内存的,也是可以基于外部存储系统,比如hive,postgre。
- 如果是基于内存,那么catalog中的数据库和表就都是临时存在的,一旦会话结束,这些表元数据就被销毁。
- 如果是基于外部存储系统,那么这些表就是持久化表,可以被多个flink集群共用,可以在多次会话中使用。
当然在Flink中也支持视图。视图可以从现有的Table对象创建,通常用作Table API/SQL查询的结果。而表通常用来描述外部数据,例如文件、数据库表或消息队列。
本地IDE创建Table API的项目
- 创建一个maven项目,并添加依赖:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
复制代码
- 构建Table API ENV:
//方式1,使用setting的方式创建
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//方式2, 使用StreamExecutionEnvironment来构建table env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// table api and sql to process data , including source, transfrom, sink
//execute job
tEnv.execute();
复制代码
Table API 与 DataStream 转换
DataStream to Table
fromDataStream(DataStream, Expression…)
fromDataStream(DataStream, String)
fromDataStream(DataStream, Schema)
//fromDataStream(DataStream), 不指定field name,field name就是f0,f1...
tableEnv.fromDataStream(stream)
//fromDataStream(DataStream, String)
tableEnv.fromDataStream(stream, "field-name1, field-name2.sub-name1, field-name3");
//fromDataStream(DataStream, Expression...)
tableEnv.fromDataStream(stream, $("field-name1"), ${"field-name2.sub-name1"), $("field-name3"));
//fromDataStream(DataStream, Schema)
tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
//.columnByMetadata("time1","TIMESTAMP_LTZ(3)")
//.column("field1","STRING”)
//.column("field2","TIMESTAMP_LTZ(3)”)
//.column("f0",DataTypes.of(User.class)”)
//.column("f0",DataTypes.STRUCTURE(User.class,DataTypes.FIELD("name",DataTypes.STRING()))”)
.watermark("timeField", "timeField - INTERVAL '10' MINUTE")
//.watermark("timeField", "SOURCE_WATERMARK()")
.build());
//fromChangelogStream(stream)
//fromChangelogStream(stream,Schema)
//创建view, 与创建table一样
tableEnv.createTemporaryView("table-name", stream)
tableEnv.createTemporaryView("table-name", stream, String);
tableEnv.createTemporaryView("table-name", stream, Expression...);
tableEnv.createTemporaryView("table-name", stream, Schema);
复制代码
Table to DataStream
//toDataStream(table)
tableEnv.toDataStream(table);
//toDataStream(table, Class)
tableEnv.toDataStream(table, User.class);
//toDataStream(table, AbstractDataType)
tableEnv.toDataStream(table, DataTypes.STRUCTURED(User.class,DataTypes.FIELD("name",DataTypes.STRING())));
//Append table, 只能insert
tEnv.toAppendStream(table, Types.ROW(Types.INT, Types.SQL_TIMESTAMP, Types.STRING, Types.SQL_TIMESTAMP));
tEnv.toAppendStream(table, Row.class);
tEnv.toAppendStream(table, new TupleTypeInfo(Types.STRING, Types.INT);
//toChangelogStream(table),支持表更新
tEnv.toChangelogStream(table)
//toChangelogStream(table,Schema),支持表更新
tEnv.toChangelogStream(table,Schema)
//Retract table:可以update,会在结果stream中增加一个boolean字段,在更新的时候先删除这条记录(将boolean标记为删除状态),然后再inset一条新的数据进去,新的数据Boolean状态为未删除状态
tEnv.toRetractStream(table, Types.ROW(Types.INT, Types.SQL_TIMESTAMP, Types.STRING, Types.SQL_TIMESTAMP));
tEnv.toRetractStream(table, Row.class);
tEnv.toRetractStream(table, new TupleTypeInfo(Types.STRING, Types.INT);
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END