Flink计算过程中有很多的中间状态,我们如果要获取这些状态,可以将这些状态sink到文件系统,然后从文件系统中获取。但是这种方式就必须依赖外部存储系统,并且存在延时,Flink提供了Client,让我们可以从Flink中实时获取这些中间状态。
Architecture
前提条件
- 将
opt/flink-queryable-state-runtime_2.11-1.13.2.jar
复制到目录lib/
下。 - 设置
queryable-state.enable
为true
.
Flink Code
Flink中需要将state转化成可被查询的state:
//ValueState转换成QueryableState
QueryableStateStream asQueryableState(String queryableStateName, ValueStateDescriptor stateDescriptor)
//ReducingState转换成QueryableState
QueryableStateStream asQueryableState(String queryableStateName, ReducingStateDescriptor stateDescriptor)
//通过setQueryable(String queryableStateName)
MapStateDescriptor<EmailId, EmailInformation> stateDescriptor =
new MapStateDescriptor<>(
"state-name",
TypeInformation.of(new TypeHint<EmailId>() {
}),
TypeInformation.of(new TypeHint<EmailInformation>() {
})
);
stateDescriptor.setQueryable("queryable-name");
复制代码
Client Code
依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java</artifactId>
<version>1.13.2</version>
</dependency>
复制代码
Code
//create client
QueryableStateClient client = new QueryableStateClient(host, port);
//
CompletableFuture<MapState<EmailId, EmailInformation>> resultFuture =
client.getKvState(
JobID.fromHexString(jobId),
"queryable-name",
"key-of-keyed-state",
BasicTypeInfo.STRING_TYPE_INFO,//key的类型
stateDescriptor);//stateDescriptor与Flink代码中你要查询的state的Descriptor一模一样
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END