Flink: the difference between ValueState<Boolean> and a plain Boolean in KeyedProcessFunction

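Background

During project development I ran into the following scenario: records are grouped by business logic, and every subsequent computation runs per group. When a group's first record arrives, a timer is registered for the end of the day that the record's timestamp falls in. Each later timer fires 86400 s after the previous one.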
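The timer arithmetic described above can be sketched in plain Java. This is only an illustration under stated assumptions: "end of the day" is taken to mean the first instant of the following day, the day boundary is computed in UTC, and the class name EndOfDayTimer and the method endOfDayMillis are hypothetical helpers that do not appear in the original code.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class EndOfDayTimer {
    // Interval between two consecutive timers: 86400 s, in milliseconds
    public static final long ONE_DAY_MS = 86400 * 1000L;

    /**
     * Returns the epoch millis of the first instant of the day following the one
     * that contains epochMillis, in the given zone (assumed meaning of "end of day").
     */
    public static long endOfDayMillis(long epochMillis, ZoneId zone) {
        LocalDate day = Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDate();
        return day.plusDays(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long recordTs = 1603788065000L;                 // timestamp taken from the test data
        long firstTimer = endOfDayMillis(recordTs, ZoneOffset.UTC);
        long secondTimer = firstTimer + ONE_DAY_MS;     // next firing, one day later
        System.out.println(firstTimer);                 // 1603843200000 (2020-10-28T00:00:00Z)
        System.out.println(secondTimer);                // 1603929600000 (2020-10-29T00:00:00Z)
    }
}
```

In a real job the zone would normally be the business time zone rather than UTC. Note that a fixed 86400 s interval drifts away from local midnight in zones that observe daylight saving time.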

Data structure and test data

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
 * Raw data structure.
 *
 * @author DSH12138
 * @since 2020-11-02
 */
@Data
@NoArgsConstructor
@ToString
public class SourceDataScheme {

    private String plant;
    private Float values;
    private String device;
    private Long timestamp;

    public SourceDataScheme(String plant, Float values, String device, Long timestamp) {
        this.plant = plant;
        this.values = values;
        this.device = device;
        this.timestamp = timestamp;
    }
}

Test data:

SourceDataScheme(plant=MS003, values=1, device=DXL480, timestamp=1603788065000)
SourceDataScheme(plant=MS001, values=2, device=DGS301, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=3, device=DXL479, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=4, device=DXL478, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=5, device=DXL477, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=6, device=DXL476, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=7, device=DXL475, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=8, device=DXL474, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=9, device=DXL473, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=10, device=DXL472, timestamp=1603788065000)
SourceDataScheme(plant=MS003, values=11, device=DXL471, timestamp=1603788065000)
================================================================================
SourceDataScheme(plant=MS003, values=12, device=DXL480, timestamp=1603788066000)
SourceDataScheme(plant=MS001, values=13, device=DGS301, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=14, device=DXL479, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=15, device=DXL478, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=16, device=DXL477, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=17, device=DXL476, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=18, device=DXL475, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=19, device=DXL474, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=20, device=DXL473, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=21, device=DXL472, timestamp=1603788066000)
SourceDataScheme(plant=MS003, values=22, device=DXL471, timestamp=1603788066000)

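The wrong approach: using a plain Boolean

Idea: all we need is to recognize the first record when it arrives. Add a field to the KeyedProcessFunction with an initial value of true, and set it to false once the timer has been registered.

So the code below declares the field Boolean firstValue = true; in the KeyedProcessFunction, and processElement handles it like this: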

if (firstValue) {
    context.timerService().registerEventTimeTimer(sourceDataScheme.getTimestamp() + 10);
    // Prints "registered a timer for record ..."
    System.out.println("做了数据" + sourceDataScheme + "的定时");
    firstValue = false;
}

The full code is below:

import com.leayun.merak.po.source.SourceDataScheme;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;

/**
 * Tests the difference between ValueState<Boolean> and a plain Boolean in KeyedProcessFunction.
 *
 * @author DSH12138
 * @since 2020-11-02
 */
public class KeyedHandle {

    public DataStream<SourceDataScheme> keyedProcess(DataStream<SourceDataScheme> sourceDataSchemeDataStream) {

        SingleOutputStreamOperator<SourceDataScheme> resultDataStream = sourceDataSchemeDataStream
        // Emit a watermark on every record
        .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<SourceDataScheme>() {
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(SourceDataScheme lastElement, long extractedTimestamp) {
                long newTimestamp = lastElement.getTimestamp();
                if (extractedTimestamp > newTimestamp) {
                    newTimestamp = extractedTimestamp;
                }
                return new Watermark(newTimestamp);
            }

            @Override
            public long extractTimestamp(SourceDataScheme element, long recordTimestamp) {
                long newTimestamp = element.getTimestamp();
                if (recordTimestamp > newTimestamp) {
                    newTimestamp = recordTimestamp;
                }
                return newTimestamp;
            }
        })
        .keyBy(new KeySelector<SourceDataScheme, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(SourceDataScheme sourceDataScheme) throws Exception {
                return new Tuple2<>(sourceDataScheme.getPlant(), sourceDataScheme.getDevice());
            }
        })
        .process(new KeyedProcessFunction<Tuple2<String, String>, SourceDataScheme, SourceDataScheme>() {
            // Plain instance field: one copy per parallel subtask, shared by every key routed to it
            Boolean firstValue = true;

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<SourceDataScheme> out) throws Exception {
                // Prints "timer fired, current key = ..."
                System.out.println("定时触发,当前key = " + ctx.getCurrentKey().f0 + ", " + ctx.getCurrentKey().f1);
                // Re-register the timer for one day (86400 s) later
                ctx.timerService().registerEventTimeTimer(timestamp + 86400 * 1000L);
            }

            @Override
            public void processElement(SourceDataScheme sourceDataScheme, Context context, Collector<SourceDataScheme> collector) throws Exception {
                if (firstValue) {
                    context.timerService().registerEventTimeTimer(sourceDataScheme.getTimestamp() + 10);
                    // Prints "registered a timer for record ..."
                    System.out.println("做了数据" + sourceDataScheme + "的定时");
                    firstValue = false;
                }
            }
        }).setParallelism(2);

        return resultDataStream;
    }
}

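However, running the test data through this code produces the following console output:

// Output after feeding in the records with timestamp=1603788065000 (each line reads "registered a timer for record ...")
做了数据SourceDataScheme(plant=MS003, values=1, device=DXL480, timestamp=1603788065000)的定时
做了数据SourceDataScheme(plant=MS001, values=2, device=DGS301, timestamp=1603788065000)的定时
// Output after feeding in the remaining records (each line reads "timer fired, current key = ...")
定时触发,当前key = MS001, DGS301
定时触发,当前key = MS003, DXL480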

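As the output shows, none of the other groups got a timer. With a parallelism of 2, the first two records were assigned to two different tasks. The field Boolean firstValue = true; belongs to the function instance running in each task, not to a key. When records from the other groups reach one of these tasks, its firstValue has already been set to false, so no timer is registered for them.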
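The effect can be reproduced without Flink. The following is a minimal simulation, not Flink code: the class SharedFlagDemo, the nested Subtask class, and the hash-modulo routing are all assumptions made for illustration (Flink's real key-to-task assignment works differently). Each Subtask object stands in for one parallel function instance. It holds a plain boolean field and, for comparison, a per-key flag that mimics keyed state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SharedFlagDemo {
    /** Stand-in for one parallel instance of the function (one object per task). */
    static class Subtask {
        boolean firstValue = true;                     // plain field: shared by all keys on this task
        final Set<String> keysSeen = new HashSet<>();  // per-key flag, mimicking keyed state
        int timersFromField = 0;
        int timersFromKeyedFlag = 0;

        void process(String key) {
            if (firstValue) {          // true only for the first record this task ever sees
                timersFromField++;
                firstValue = false;
            }
            if (keysSeen.add(key)) {   // true for the first record of each key
                timersFromKeyedFlag++;
            }
        }
    }

    /** Returns {timers registered via the field, timers registered via the per-key flag}. */
    public static int[] run(List<String> keys, int parallelism) {
        Subtask[] subtasks = new Subtask[parallelism];
        for (int i = 0; i < parallelism; i++) {
            subtasks[i] = new Subtask();
        }
        for (String key : keys) {
            // Simplified routing for the simulation only
            subtasks[Math.floorMod(key.hashCode(), parallelism)].process(key);
        }
        int field = 0;
        int keyed = 0;
        for (Subtask s : subtasks) {
            field += s.timersFromField;
            keyed += s.timersFromKeyedFlag;
        }
        return new int[] {field, keyed};
    }

    /** The (plant, device) keys of the test data, in arrival order, for both batches. */
    public static List<String> testKeys() {
        List<String> batch = new ArrayList<>();
        batch.add("MS003/DXL480");
        batch.add("MS001/DGS301");
        for (int device = 479; device >= 471; device--) {
            batch.add("MS003/DXL" + device);
        }
        List<String> all = new ArrayList<>(batch);
        all.addAll(batch);   // second batch repeats the same keys
        return all;
    }

    public static void main(String[] args) {
        int[] result = run(testKeys(), 2);
        System.out.println("timers via plain field:  " + result[0]);
        System.out.println("timers via per-key flag: " + result[1]);
    }
}
```

With a parallelism of 2 the plain field can register at most 2 timers, one per task, while the per-key flag registers one for each of the 11 distinct keys in the test data.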

The right approach: using ValueState

Idea: replace the field with a ValueState.

import com.leayun.merak.po.source.SourceDataScheme;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;

/**
 * Tests the difference between ValueState<Boolean> and a plain Boolean in KeyedProcessFunction.
 *
 * @author DSH12138
 * @since 2020-11-02
 */
public class KeyedHandle {

    public DataStream<SourceDataScheme> keyedProcess(DataStream<SourceDataScheme> sourceDataSchemeDataStream) {

        SingleOutputStreamOperator<SourceDataScheme> resultDataStream = sourceDataSchemeDataStream
        // Emit a watermark on every record
        .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<SourceDataScheme>() {
            @Nullable
            @Override
            public Watermark checkAndGetNextWatermark(SourceDataScheme lastElement, long extractedTimestamp) { 
                long newTimestamp = lastElement.getTimestamp();
                if (extractedTimestamp > newTimestamp) {
                    newTimestamp = extractedTimestamp;
                }
                return new Watermark(newTimestamp);
            }

            @Override
            public long extractTimestamp(SourceDataScheme element, long recordTimestamp) {
                long newTimestamp = element.getTimestamp();
                if (recordTimestamp > newTimestamp) {
                    newTimestamp = recordTimestamp;
                }
                return newTimestamp;
            }
        })
        .keyBy(new KeySelector<SourceDataScheme, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> getKey(SourceDataScheme sourceDataScheme) throws Exception {
                return new Tuple2<>(sourceDataScheme.getPlant(), sourceDataScheme.getDevice());
            }
        })
        .process(new KeyedProcessFunction<Tuple2<String, String>, SourceDataScheme, SourceDataScheme>() {
            // Keyed state: Flink keeps one value per key, so each group has its own flag
            ValueState<Boolean> firstState;

            @Override
            public void open(Configuration parameters) throws Exception {
                firstState = getRuntimeContext().getState(new ValueStateDescriptor<>("first-value", Boolean.class));
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<SourceDataScheme> out) throws Exception {
                // Prints "timer fired, current key = ..."
                System.out.println("定时触发,当前key = " + ctx.getCurrentKey().f0 + ", " + ctx.getCurrentKey().f1);
                // Re-register the timer for one day (86400 s) later
                ctx.timerService().registerEventTimeTimer(timestamp + 86400 * 1000L);
            }

            @Override
            public void processElement(SourceDataScheme sourceDataScheme, Context context, Collector<SourceDataScheme> collector) throws Exception {
                // value() returns null until the state for the current key is first written
                if (firstState.value() == null) {
                    context.timerService().registerEventTimeTimer(sourceDataScheme.getTimestamp() + 10);
                    firstState.update(false);
                }
            }
        }).setParallelism(2);

        return resultDataStream;
    }
}

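Summary

A field declared in a KeyedProcessFunction is tied to the operator's parallelism: each task maintains one copy of it. Because several groups can be processed by the same task, a group that arrives earlier modifies the field, and groups that arrive later can no longer run the corresponding logic.

A ValueState used in a KeyedProcessFunction, by contrast, maps one-to-one to a group: every group maintains its own state, and that state is also saved when Flink takes a checkpoint.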
