Background
In project development we ran into the following scenario: data is grouped according to business logic, and all subsequent computation is done per group. When the first record of a group arrives, an event-time timer is registered for the end of the day that the record's timestamp falls in; every subsequent timer fires 86400 s after the previous one.
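The sample code further below registers its timer at timestamp + 10 ms, apparently just to keep the test short. For the end-of-day trigger described here, a helper along the following lines could be used; this is only a sketch, and the name endOfDayMillis and the choice of time zone are assumptions, not part of the original code:

import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

public class DayEndTimer {
    // Compute the end of the day (i.e. the start of the next day) that the
    // given event timestamp falls in, as epoch milliseconds.
    public static long endOfDayMillis(long eventTimeMillis, ZoneId zone) {
        LocalDate day = Instant.ofEpochMilli(eventTimeMillis).atZone(zone).toLocalDate();
        return day.plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // 1603788065000 is the first timestamp from the test data below.
        System.out.println(endOfDayMillis(1603788065000L, ZoneId.of("Asia/Shanghai")));
    }
}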
Data structure and test data
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
* @author DSH12138
* @since 2020-11-02
* Raw data structure
* */
@Data
@NoArgsConstructor
@ToString
public class SourceDataScheme {
private String plant;
private Float values;
private String device;
private Long timestamp;
public SourceDataScheme(String plant, Float values, String device, Long timestamp) {
this.plant = plant;
this.values = values;
this.device = device;
this.timestamp = timestamp;
}
}
Test data:
SourceDataScheme(plant=MS003, values=1, device=DXL480, timestamp=1603788065000)
SourceDataScheme(plant=MS001, values=2, device=DGS301, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=3, device=DXL479, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=4, device=DXL478, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=5, device=DXL477, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=6, device=DXL476, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=7, device=DXL475, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=8, device=DXL474, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=9, device=DXL473, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=10, device=DXL472, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=11, device=DXL471, timestamp=1603788065000)
================================================================================
SourceDataScheme(plant=MS003, values=12, device=DXL480, timestamp=1603788066000)
SourceDataScheme(plant=MS001, values=13, device=DGS301, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=14, device=DXL479, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=15, device=DXL478, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=16, device=DXL477, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=17, device=DXL476, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=18, device=DXL475, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=19, device=DXL474, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=20, device=DXL473, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=21, device=DXL472, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=22, device=DXL471, timestamp=1603788066000)
The wrong approach - using a plain Boolean
Idea: all we need is to recognize, when data first arrives, that it is the first batch for its group. Add a field to the KeyedProcessFunction with an initial value of true, and set it to false once the timer has been registered.
So in the code below the KeyedProcessFunction declares the field Boolean firstValue = true;
and processElement handles each record like this:
if (firstValue) {
    context.timerService().registerEventTimeTimer(sourceDataScheme.getTimestamp() + 10);
    System.out.println("Registered timer for " + sourceDataScheme);
    firstValue = false;
}
The complete code:
import com.leayun.merak.po.source.SourceDataScheme;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
/**
* @author DSH12138
* @since 2020-11-02
* Tests the difference between ValueState<Boolean> and a plain Boolean field in a KeyedProcessFunction
*/
public class KeyedHandle {
public DataStream<SourceDataScheme> keyedProcess(DataStream<SourceDataScheme> sourceDataSchemeDataStream) {
SingleOutputStreamOperator<SourceDataScheme> resultDataStream = sourceDataSchemeDataStream
// assign timestamps and emit a watermark for every element
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<SourceDataScheme>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(SourceDataScheme lastElement, long extractedTimestamp) {
long newTimestamp = lastElement.getTimestamp();
if (extractedTimestamp > newTimestamp) {
newTimestamp = extractedTimestamp;
}
return new Watermark(newTimestamp);
}
@Override
public long extractTimestamp(SourceDataScheme element, long recordTimestamp) {
long newTimestamp = element.getTimestamp();
if (recordTimestamp > newTimestamp) {
newTimestamp = recordTimestamp;
}
return newTimestamp;
}
})
.keyBy(new KeySelector<SourceDataScheme, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(SourceDataScheme sourceDataScheme) throws Exception {
return new Tuple2<>(sourceDataScheme.getPlant(), sourceDataScheme.getDevice());
}})
.process(new KeyedProcessFunction<Tuple2<String, String>, SourceDataScheme, SourceDataScheme>() {
Boolean firstValue = true;
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<SourceDataScheme> out) throws Exception {
System.out.println("定时触发,当前key = " + ctx.getCurrentKey().f0 + ", " + ctx.getCurrentKey().f1);
context.timerService().registerEventTimeTimer(timestamp+86400*1000L);
}
@Override
public void processElement(SourceDataScheme sourceDataScheme, Context context, Collector<SourceDataScheme> collector) throws Exception {
if (firstValue) {
context.timerService().registerEventTimeTimer(sourceDataScheme.getTimestamp() + 10);
System.out.println("做了数据" + sourceDataScheme + "的定时");
firstValue = false;
}
}
}).setParallelism(2);
return resultDataStream;
}
}
But when running against the test data, the console output looks like this:
//Output after ingesting the records with timestamp=1603788065000
Registered timer for SourceDataScheme(plant=MS003, values=1, device=DXL480, timestamp=1603788065000)
Registered timer for SourceDataScheme(plant=MS001, values=2, device=DGS301, timestamp=1603788065000)
//Output after ingesting the remaining records
Timer fired, current key = MS003, DXL480
Timer fired, current key = MS001, DGS301
As you can see, no timer was registered for the other groups. Because the parallelism is 2, the first two records were routed to two separate tasks, and the field Boolean firstValue = true;
lives in the task, not in the group. When the remaining groups are routed to those same tasks, firstValue has already been set to false there, so no further timers can be registered.
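To make the difference concrete, here is a toy illustration in plain Java (not Flink code; all names are made up for this sketch): a single object plays the role of a task, its boolean field is shared by every key routed to it, while a Map stands in for per-key state:

import java.util.HashMap;
import java.util.Map;

public class FieldVsKeyedState {
    private boolean firstValue = true;                               // one field per "task"
    private final Map<String, Boolean> keyedState = new HashMap<>(); // one entry per key

    void process(String key) {
        if (firstValue) {
            System.out.println("field:       timer registered for " + key);
            firstValue = false;                                      // blocks every later key
        }
        if (keyedState.get(key) == null) {
            System.out.println("keyed state: timer registered for " + key);
            keyedState.put(key, false);                              // blocks only this key
        }
    }

    public static void main(String[] args) {
        FieldVsKeyedState task = new FieldVsKeyedState();
        task.process("MS003,DXL480");
        task.process("MS003,DXL479"); // the field branch is skipped, the keyed-state branch still runs
    }
}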
The correct approach - using ValueState
Idea: replace the plain field with ValueState.
import com.leayun.merak.po.source.SourceDataScheme;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
/**
* @author DSH12138
* @since 2020-11-02
* Tests the difference between ValueState<Boolean> and a plain Boolean field in a KeyedProcessFunction
*/
public class KeyedHandle {
public DataStream<SourceDataScheme> keyedProcess(DataStream<SourceDataScheme> sourceDataSchemeDataStream) {
SingleOutputStreamOperator<SourceDataScheme> resultDataStream = sourceDataSchemeDataStream
// assign timestamps and emit a watermark for every element
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<SourceDataScheme>() {
@Nullable
@Override
public Watermark checkAndGetNextWatermark(SourceDataScheme lastElement, long extractedTimestamp) {
long newTimestamp = lastElement.getTimestamp();
if (extractedTimestamp > newTimestamp) {
newTimestamp = extractedTimestamp;
}
return new Watermark(newTimestamp);
}
@Override
public long extractTimestamp(SourceDataScheme element, long recordTimestamp) {
long newTimestamp = element.getTimestamp();
if (recordTimestamp > newTimestamp) {
newTimestamp = recordTimestamp;
}
return newTimestamp;
}
})
.keyBy(new KeySelector<SourceDataScheme, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> getKey(SourceDataScheme sourceDataScheme) throws Exception {
return new Tuple2<>(sourceDataScheme.getPlant(), sourceDataScheme.getDevice());
}
})
.process(new KeyedProcessFunction<Tuple2<String, String>, SourceDataScheme, SourceDataScheme>() {
// keyed state: one Boolean per (plant, device) group
ValueState<Boolean> firstState;
@Override
public void open(Configuration parameters) throws Exception {
firstState = getRuntimeContext().getState(new ValueStateDescriptor<>("first-value", Boolean.class));
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<SourceDataScheme> out) throws Exception {
System.out.println("定时触发,当前key = " + ctx.getCurrentKey().f0 + ", " + ctx.getCurrentKey().f1);
ctx.timerService().registerEventTimeTimer(timestamp+86400*1000L);
}
@Override
public void processElement(SourceDataScheme sourceDataScheme, Context context, Collector<SourceDataScheme> collector) throws Exception {
if (firstState.value() == null) { // null means no timer has been registered for this group yet
    context.timerService().registerEventTimeTimer(sourceDataScheme.getTimestamp() + 10);
    firstState.update(false);
}
}
}).setParallelism(2);
return resultDataStream;
}
}
Summary
A plain field declared in a KeyedProcessFunction is tied to the operator's parallelism: each task maintains its own copy of the field. Because several groups can be routed into the same task, a group that arrives earlier can modify the field and thereby prevent groups that arrive later from performing the corresponding computation.
A ValueState used in a KeyedProcessFunction, by contrast, is scoped one-to-one to the group: every group maintains its own state value, and that state is also persisted when Flink takes a checkpoint.
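A minimal sketch of the checkpointing point above; the class name, job name, and the 60 s interval are illustrative assumptions, not taken from the original job:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With checkpointing enabled, keyed state such as firstState is
        // snapshotted periodically and restored after a failure.
        env.enableCheckpointing(60_000L); // interval in ms; 60 s is an assumed value
        // ... build the source, call KeyedHandle#keyedProcess, add a sink ...
        env.execute("keyed-state-demo");
    }
}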