Flink queryable state

Flink计算过程中有很多的中间状态,我们如果要获取这些状态,可以将这些状态sink到文件系统,然后从文件系统中获取。但是这种方式就必须依赖外部存储系统,并且存在延时,Flink提供了Client,让我们可以从Flink中实时获取这些中间状态。

Architecture 

image.png

前提条件

  1. opt/flink-queryable-state-runtime_2.11-1.13.2.jar复制到目录lib/下。
  2. 设置queryable-state.enabletrue.

Flink Code

Flink中需要将state转化成可被查询的state:

//ValueState转换成QueryableState
QueryableStateStream asQueryableState(String queryableStateName, ValueStateDescriptor stateDescriptor)
//ReducingState转换成QueryableState
QueryableStateStream asQueryableState(String queryableStateName, ReducingStateDescriptor stateDescriptor)

//通过setQueryable(String queryableStateName)
MapStateDescriptor<EmailId, EmailInformation> stateDescriptor =
		new MapStateDescriptor<>(
				"state-name",
				TypeInformation.of(new TypeHint<EmailId>() {

				}),
				TypeInformation.of(new TypeHint<EmailInformation>() {

				})
		);
stateDescriptor.setQueryable("queryable-name");
复制代码

Client Code

依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.13.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-queryable-state-client-java</artifactId>
  <version>1.13.2</version>
</dependency>
复制代码

Code

//create client
QueryableStateClient client = new QueryableStateClient(host, port);
//
CompletableFuture<MapState<EmailId, EmailInformation>> resultFuture =
client.getKvState(
		JobID.fromHexString(jobId),
		"queryable-name",
		"key-of-keyed-state", 
		BasicTypeInfo.STRING_TYPE_INFO,//key的类型
		stateDescriptor);//stateDescriptor与Flink代码中你要查询的state的Descriptor一模一样
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享