Flink from Scratch (2)

This is day 2 of my participation in the Writing Challenge. For details, see: Writing Challenge

Flink API Explained & Hands-On

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/51f722a1-f8fe-4de7-bd4b-d1ef3ecea0a9/ozmU46.jpg


  • Stateful Stream Processing

The lowest-level abstraction is the stateful streaming interface. It is integrated into the DataStream API via ProcessFunction. This interface lets users freely process events from one or more streams, with consistent, fault-tolerant state. In addition, users can register event-time and processing-time callbacks to implement complex computations

  • DataStream/DataSet API

The DataStream / DataSet APIs are the core APIs Flink provides. DataSet handles bounded data sets, while DataStream handles bounded or unbounded data streams. Users can transform and compute over the data with a variety of operations (map / flatMap / window / keyBy / sum / max / min / avg / join, etc.)

  • Table API

The Table API offers operations such as select, project, join, group-by, and aggregate, yet is more concise to use. It can switch seamlessly between tables and DataStream/DataSet, and programs may freely mix the Table API with DataStream and DataSet

  • SQL

The highest-level abstraction Flink provides is SQL. This layer is similar to the Table API in syntax and expressiveness. The SQL abstraction interacts closely with the Table API, and SQL queries can be executed directly on tables defined with the Table API

Dataflows

In Flink's world view, everything is a data stream; batch computation is just a special case of stream computation

A Flink dataflow consists of three parts: source, transformation, and sink

The source continuously produces data, transformations apply the business logic to that data, and finally the sink writes the results to an external system (console, Kafka, Redis, a database, ...)

Every program built on Flink can be mapped to a dataflow
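To make source → transformation → sink concrete without a Flink cluster, here is a minimal plain-Java sketch of the three stages (the class `DataflowSketch` and its `run` method are illustrative, not Flink API):

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/** A plain-Java sketch of a dataflow's three stages; not actual Flink API. */
public class DataflowSketch {

    // transformation stage: apply a function to every record of the source
    public static List<String> run(List<String> source, Function<String, String> transform) {
        return source.stream().map(transform).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> source = List.of("flink", "dataflow");          // source: a bounded stand-in for a stream
        List<String> transformed = run(source, String::toUpperCase); // transformation
        Consumer<String> sink = System.out::println;                 // sink: write results to the console
        transformed.forEach(sink);
    }
}
```

In a real Flink job the source would typically be unbounded (e.g. socketTextStream), the transformation an operator such as flatMap, and the sink a connector (Kafka, Redis, print, ...).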

https://s3-us-west-2.amazonaws.com/secure.notion-static.com/979f0960-5bd0-4340-b2a9-328be870db08/u3RagR.jpg

Dataflows Transformations

FlatMap

DataStream → DataStream
Iterates over every element of the stream and emits N elements, where N = 0, 1, 2, ...


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * flatMap demo
 * DataStream → DataStream
 * Iterates over every element of the stream and emits N elements (N = 0, 1, 2, ...)
 */
public class FlinkDemo0001 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream= environment
                .socketTextStream("192.168.88.180",8888);
        DataStream<User> newStream= stream
                .flatMap(new FlatMapFunction<String, User>() {
            @Override
            public void flatMap(String s, Collector<User> collector)  {
                try {
                    String[] sp=s.split(",");
                    collector.collect(User.builder()
                            .id(Long.parseLong(sp[0]))
                            .name(sp[1])
                            .age(sp[2])
                            .build());
                }catch (Exception ex){
                    System.out.println(ex);
                }
            }
        }).setParallelism(2);

        newStream.print();
        environment.execute("flatMap Demo");

    }
}
/*User Model*/
@Data
@Builder
public class User {
    private  Long id;
    private  String name;
    private  String age;
}

Test data
### console
nc -lk  8888
### input
1,1,1
2,2,2
,3,3,3
3,3,3
#### output
User(id=1, name=1, age=1)
User(id=2, name=2, age=2)
java.lang.NumberFormatException: For input string: ""
User(id=3, name=3, age=3)


Filter

DataStream → DataStream
The filter operator computes a boolean for each element of the stream: true keeps the element, false drops it

/**
 * filter demo
 * DataStream → DataStream
 * The filter operator computes a boolean for each element: true keeps it, false drops it
 */
public class FlinkDemo0002 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream= environment
                .socketTextStream("192.168.88.180",8888);
        DataStream<User> newStream= stream
                .flatMap(new FlatMapFunction<String, User>() {
                    @Override
                    public void flatMap(String s, Collector<User> collector)  {
                        try {
                            String[] sp=s.split(",");
                            collector.collect(User.builder()
                                    .id(Long.parseLong(sp[0]))
                                    .name(sp[1])
                                    .age(sp[2])
                                    .build());
                        }catch (Exception ex){
                            System.out.println(ex);
                        }
                    }
                })
                .filter(new RichFilterFunction<User>() {
                    @Override
                    public boolean filter(User user) throws Exception {
                        /** only keep users with age greater than 25 */
                        return Integer.parseInt(user.getAge()) > 25;
                    }
                }).setParallelism(2);

        newStream.print();
        environment.execute("filter demo");

    }
}
### console
nc -lk  8888
### input
1,huang,26
2,kun,24
3,jie,23
4,la,29
#### output
 User(id=1, name=huang, age=26)
 User(id=4, name=la, age=29)
## users aged 25 or below were all filtered out

keyBy

(usually used together with grouped computation)

DataStream → KeyedStream
Partitions the stream by the specified field(s); records with the same key value always land in the same partition (internally a hash partitioner is used)

There are three ways to specify the partitioning field:

  1. By field index
  2. Via an anonymous function
  3. By implementing the KeySelector interface
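The guarantee that equal keys always land in the same partition comes from deterministic hashing. Below is a rough plain-Java sketch of the idea; note it is a simplification (Flink's real routing goes through murmur-hashed key groups) and `partitionFor` is a hypothetical helper, not a Flink method:

```java
/** Simplified sketch of hash partitioning; Flink's real routing uses key groups. */
public class KeyPartitionSketch {

    // same key -> same partition index, for a fixed parallelism
    public static int partitionFor(Object key, int parallelism) {
        // Math.floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // the same shop id always maps to the same partition
        System.out.println(partitionFor(1L, parallelism) == partitionFor(1L, parallelism)); // true
    }
}
```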
public class FlinkDemo0003_1 {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream= environment
                .socketTextStream("192.168.88.180",8888);
     DataStream <CoffeeShop> dataStream=   stream.flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
         @Override
         public void flatMap(String s, Collector<CoffeeShop> collector) {
             try{
                 String [] sp= s.split(",");
                 collector.collect(CoffeeShop.builder()
                         .id(Long.parseLong(sp[0]))
                         .date(format.parse(sp[1]))
                         .sales(Integer.parseInt(sp[2]))
                         .build());
             }catch (Exception ex){
                 System.out.println(ex);
             }

         }
     })           /** group by id */
                .keyBy(CoffeeShop::getId)
                 /** sum the sales */
                .sum("sales");
        dataStream.print();
        environment.execute("keyBy &  sum demo");
    }
}

### console
nc -lk  8888
### input
1,2021-10-1,1
2,2021-10-1,1
3,2021-10-1,1
1,2021-10-2,2
2,2021-10-2,2
3,2021-10-2,2
1,2021-10-3,1
2,2021-10-3,2
3,2021-10-3,3
#### output
## Sales for the same id are accumulated; the last record emitted for an id holds the running total
CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=2)
CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=4)
CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
CoffeeShop(id=3, date=Sat Oct 02 00:00:00 CST 2021, sales=2)
CoffeeShop(id=3, date=Sat Oct 02 00:00:00 CST 2021, sales=3)
CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=3)
CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=5)
CoffeeShop(id=3, date=Sat Oct 02 00:00:00 CST 2021, sales=6)

Reduce

KeyedStream (grouped by key) → DataStream
reduce aggregates over a keyed (partitioned) stream; in other words, a plain DataStream cannot call reduce (the stream must be keyed first)

public class FlinkDemo0004 {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180", 8888);
        DataStream dataStream = stream.
                flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
            @Override
            public void flatMap(String s, Collector<CoffeeShop> collector) {
                try {
                    String[] sp = s.split(",");
                    collector.collect(CoffeeShop.builder()
                            .id(Long.parseLong(sp[0]))
                            .date(format.parse(sp[1]))
                            .sales(Integer.parseInt(sp[2]))
                            .build());
                } catch (Exception ex) {
                    System.out.println(ex);
                }

            }
        })        /** group by id */
                .keyBy(CoffeeShop::getId).reduce(new ReduceFunction<CoffeeShop>() {
                    @Override
                    public CoffeeShop reduce(CoffeeShop coffeeShop, CoffeeShop t1)
                            throws Exception {
                        /**
                         * After keying, keep only the larger sales value
                         */
                        return new CoffeeShop(coffeeShop.getId(),
                                coffeeShop.getDate(),
                                Math.max(coffeeShop.getSales(),
                                        t1.getSales())
                        );
                    }
                });
        dataStream.print().setParallelism(1);
        environment.execute("reduce demo");
    }
}

Aggregations

KeyedStream → DataStream
Aggregations is a family of aggregation operators:

  • sum
  • min
  • max
  • minBy
  • maxBy
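A subtlety worth calling out: min/max only track the aggregated field (the other fields typically keep the values of the first element seen), while minBy/maxBy return the entire element holding the extreme value. Here is a plain-Java sketch of the two semantics; `AggSketch` and its methods are illustrative helpers, not the Flink operators themselves:

```java
import java.util.Comparator;
import java.util.List;

/** Illustrates min vs. minBy semantics outside Flink. */
public class AggSketch {

    public record Shop(long id, int sales) {}

    // minBy semantics: the whole element with the smallest sales wins
    public static Shop minBy(List<Shop> shops) {
        return shops.stream().min(Comparator.comparingInt(Shop::sales)).orElseThrow();
    }

    // min semantics: only the sales field is minimized; other fields keep the first element's values
    public static Shop min(List<Shop> shops) {
        int minSales = shops.stream().mapToInt(Shop::sales).min().orElseThrow();
        return new Shop(shops.get(0).id(), minSales);
    }

    public static void main(String[] args) {
        List<Shop> shops = List.of(new Shop(1, 9), new Shop(2, 3));
        System.out.println(min(shops));   // Shop[id=1, sales=3] -- id still from the first element
        System.out.println(minBy(shops)); // Shop[id=2, sales=3] -- the whole winning element
    }
}
```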

union (a true merge)

DataStream* → DataStream
Merges two or more data streams into a new stream that contains all elements of the merged streams
Note: the element types of the streams must be identical


/**
 * union demo
 * DataStream* → DataStream
 * Merges two or more data streams into a new stream containing all their elements
 * Note: the element types of the streams must be identical
 */
public class FlinkDemo0006 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream1 = environment
                .socketTextStream("192.168.88.180", 8888);
        DataStreamSource<String> stream2 = environment
                .socketTextStream("192.168.88.181", 8888);
        stream1.union(stream2).print();
        environment.execute("union demo");
    }
}
### console 192.168.88.180
nc -lk  8888
### input
1,2021-10-1,1
### console 192.168.88.181
nc -lk  8888
### input
2,2021-10-1,1
#### output
1,2021-10-1,1
2,2021-10-1,1
## the two streams were merged into a single stream for processing

Connect (a pseudo merge)

DataStream, DataStream → ConnectedStreams
Connects two data streams while preserving each stream's data type, and allows the two streams to share state


public class FlinkDemo0007 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
           StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream1 = environment
                .socketTextStream("192.168.88.180", 8888);
        DataStreamSource<String> stream2 = environment
                .socketTextStream("192.168.88.181", 8888);
        ConnectedStreams connectedStreams = stream1.connect(stream2);
        connectedStreams.getFirstInput().print();
        connectedStreams.getSecondInput().print();
        DataStream dataStream = connectedStreams
                       .map(new RichCoMapFunction<String, String, String>() {
            @Override
            public String map1(String s) throws Exception {
                return s;
            }

            @Override
            public String map2(String s) throws Exception {
                return s;
            }
        });
        environment.execute("connect demo");
    }
}
### console 192.168.88.180
nc -lk  8888
### input
1,2021-10-1,1
### console 192.168.88.181
nc -lk  8888
### input
2,2021-10-1,1
#### output
1,2021-10-1,1
2,2021-10-1,1
## the two streams were merged into one, but each still has to be processed by its own function


Question: how does union differ from Connect?

  1. Connect can merge streams of different data types, but only two streams at a time;
  2. Union can merge multiple streams, but their data types must be identical;

CoMap, CoFlatMap

ConnectedStreams → DataStream
CoMap and CoFlatMap are not concrete operator names but classes of operations:
a map over a ConnectedStreams is called a CoMap;
a flatMap over a ConnectedStreams is called a CoFlatMap

public class FlinkDemo0008 {

    public static void main(String[] args) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream1 = environment
                .socketTextStream("192.168.88.180", 8888);
        DataStreamSource<String> stream2 = environment
                .socketTextStream("192.168.88.181", 8888);
        ConnectedStreams connectedStreams = stream1.connect(stream2);
        DataStream dataStream = connectedStreams.flatMap(new RichCoFlatMapFunction<String,String, CoffeeShop>() {

            @Override
            public void flatMap1(String s, Collector<CoffeeShop> collector) 
                    throws Exception {
             String[] sp=s.split(",");
                collector.collect(CoffeeShop.builder()
                        .id(Long.parseLong(sp[0]))
                        .date(format.parse(sp[2]))
                        .sales(Integer.parseInt(sp[3]))
                        .build());
            }
            @Override
            public void flatMap2(String s, Collector<CoffeeShop> collector) 
                    throws Exception {
                String[] sp=s.split(",");
                collector.collect(CoffeeShop.builder()
                        .id(Long.parseLong(sp[0]))
                        .date(format.parse(sp[1]))
                        .sales(Integer.parseInt(sp[2]))
                        .build());
            }
        });
        dataStream.print(">>>>>>>").setParallelism(1);
        environment.execute("connect demo");
    }
}

### console 192.168.88.180
nc -lk  8888
### input
1,统一优乐美,2021-10-1,1
### console 192.168.88.181
nc -lk  8888
### input
1,2021-10-1,1
#### output
>>>>>>>> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
>>>>>>>> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
## the two streams were merged into one, but each is still processed by its own function
## coFlatMap/coMap can turn two streams of different shapes into a single stream for downstream processing


Split (removed in Flink 1.12)

DataStream → SplitStream

Splits one stream into two or more streams based on a condition

Select (removed in Flink 1.12)

SplitStream → DataStream

Selects one or more streams from a SplitStream

side output

During stream processing you may need to split a stream according to different conditions; splitting with filter causes unnecessary copies of the data, which side outputs avoid

public class FlinkDemo0009 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180", 8888);
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

        final OutputTag<CoffeeShop> outputTag = new OutputTag<CoffeeShop>("side-output") {
        };
        SingleOutputStreamOperator dataStream = stream

                .flatMap(new RichFlatMapFunction<String, CoffeeShop>() {

                    @Override
                    public void flatMap(String s, Collector<CoffeeShop> collector) {
                        try {
                            String[] sp = s.split(",");

                            collector.collect(CoffeeShop.builder()
                                    .id(Long.parseLong(sp[0]))
                                    .date(format.parse(sp[1]))
                                    .sales(Integer.parseInt(sp[2]))
                                    .build());
                        } catch (Exception ex) {

                            System.out.println(ex);

                        }
                    }
                })
                .process(new ProcessFunction<CoffeeShop, CoffeeShop>() {

                    @Override
                    public void processElement(CoffeeShop s, Context context, Collector<CoffeeShop> collector) {
                        try {
                            if (s.getId() % 2 == 0) {
                                collector.collect(s);
                            } else {
                                context.output(outputTag, s);
                            }

                        } catch (Exception ex) {
                            System.out.println(ex);

                        }
                    }
                });

        dataStream.getSideOutput(outputTag).print("outputTag>>>>>>>>>>>>>");
        dataStream.print("stream>>>>>>>>>>>>>");

        environment.execute("process demo");
    }
}
### console 192.168.88.180
nc -lk  8888
### input
1,2021-10-1,1
2,2021-10-1,2
3,2021-10-1,3
4,2021-10-2,4
5,2021-10-2,5
6,2021-10-2,6
7,2021-10-3,7
8,2021-10-3,8
9,2021-10-3,9

#### output
outputTag>>>>>>>>>>>>>:1> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
outputTag>>>>>>>>>>>>>:3> CoffeeShop(id=3, date=Fri Oct 01 00:00:00 CST 2021, sales=3)
stream>>>>>>>>>>>>>:4> CoffeeShop(id=4, date=Sat Oct 02 00:00:00 CST 2021, sales=4)
stream>>>>>>>>>>>>>:2> CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=2)
outputTag>>>>>>>>>>>>>:3> CoffeeShop(id=7, date=Sun Oct 03 00:00:00 CST 2021, sales=7)
stream>>>>>>>>>>>>>:4> CoffeeShop(id=8, date=Sun Oct 03 00:00:00 CST 2021, sales=8)
stream>>>>>>>>>>>>>:2> CoffeeShop(id=6, date=Sat Oct 02 00:00:00 CST 2021, sales=6)
outputTag>>>>>>>>>>>>>:1> CoffeeShop(id=5, date=Sat Oct 02 00:00:00 CST 2021, sales=5)
outputTag>>>>>>>>>>>>>:1> CoffeeShop(id=9, date=Sun Oct 03 00:00:00 CST 2021, sales=9)
## the stream is split into two streams by whether id is even or odd

Iterate (important)

DataStream → IterativeStream → DataStream
The Iterate operator provides support for iterating over a data stream
An iteration consists of two parts: an iteration body and a termination condition
Elements that do not satisfy the termination condition are fed back into the stream for the next iteration
Elements that satisfy it are forwarded downstream


public class FlinkDemo0010 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180", 8888);
        IterativeStream<Long> iterate = stream
                .map(new RichMapFunction<String, Long>() {
                    @Override
                    public Long map(String s) {
                        return Long.parseLong(s);
                    }
                }).iterate();

        DataStream<Long> feedback = iterate
                .map(new RichMapFunction<Long, Long>() {

                    @Override
                    public Long map(Long aLong) throws Exception {

                        return aLong > 2 ? aLong : (aLong + 1);
                    }
                }).filter(new FilterFunction<Long>() {
                    @Override
                    public boolean filter(Long s) throws Exception {
                        if (s > 2) {
                            return false;
                        } else {
                            return true;
                        }
                    }
                });
        iterate.closeWith(feedback).print("feedback");
        SingleOutputStreamOperator<Long> result = iterate.
filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value >= 2;
            }
        });
        result.print("result:");
        environment.execute("iterate demo");
    }
}

### console 192.168.88.180
nc -lk  8888
### input
1
2
3
4
5
#### output
feedback> 2
result> 2
result> 2
result> 3
result> 4
result> 5
# the value 1 does not satisfy the exit condition, so it is fed back through the map until it does

Function classes and rich function classes

When using Flink operators, you can pass in an anonymous function or a function-class object

Function classes fall into two kinds: plain function classes and rich function classes (an informal distinction)

Compared with a plain function, a rich function can access the runtime context, has lifecycle methods, and can manage state, enabling more complex functionality

Function class hierarchy

https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/942f06e2b1e94e1fbd71e2a6dd7e5076~tplv-k3u1fbpfcp-zoom-1.image

Because rich function classes extend AbstractRichFunction, they can use its methods to obtain the runtime context and perform some special operations

public class FlinkDemo0011 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180", 8888);
         stream.flatMap(new RichFlatMapFunction<String, Long>() {
             @Override
             public void flatMap(String s, Collector<Long> collector) throws Exception {
                 collector.collect(Long.parseLong(s));
             }
         }).filter(new RichFilterFunction<Long>() {
             @Override
             public void open(Configuration parameters)  {
                 /**
                  * getRuntimeContext(), provided by AbstractRichFunction,
                  * gives access to runtime context information
                  */
                 System.out.println(getRuntimeContext().getTaskName());
                 System.out.println(getRuntimeContext().getTaskNameWithSubtasks());
             }
            @Override
            public boolean filter(Long s)  {
                return s<100;
            }
         }).print();

        environment.execute("demo");
    }
}

Low-level API (ProcessFunction API)

This is the low-level API; the operators covered above (map, filter, flatMap, etc.) are high-level abstractions built on top of it
The lower the API level, the more powerful it is and the more information is available to the user, e.g. element state, event time, and timers


public class FlinkDemo0012 {

    public static void main(String[] args) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");

        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment
                .socketTextStream("192.168.88.180", 8888);
        final OutputTag<CoffeeShop> outputTag = new OutputTag<CoffeeShop>("output"){};
        SingleOutputStreamOperator dataStream = stream.
                flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
                    @Override
                    public void flatMap(String s, Collector<CoffeeShop> collector) {
                        try {
                            String[] sp = s.split(",");
                            collector.collect(CoffeeShop.builder()
                                    .id(Long.parseLong(sp[0]))
                                    .date(format.parse(sp[1]))
                                    .sales(Integer.parseInt(sp[2]))
                                    .build());
                        } catch (Exception ex) {
                            System.out.println(ex);
                        }

                    }
                })        /** group by id */
                .keyBy(CoffeeShop::getId)
                .process(new KeyedProcessFunction<Long, CoffeeShop, String>() {

                    @Override
                    public void processElement(
                            CoffeeShop coffeeShop,
                            Context context,
                            Collector<String> collector)  {
                        try{
                            /** current processing time */
                            long currentTime=  context.timerService()
                                    .currentProcessingTime();
                            if(coffeeShop.getSales()>100){
                                 long timerTime = currentTime + 3 * 1000;
                                 context.timerService()
                                         .registerProcessingTimeTimer(timerTime);
                            }else{
                                collector.collect(coffeeShop
                                        .toString() + " has not reached 100");
                            }

                        }
                        catch (Exception ex){
                            System.out.println(ex);
                        }
                    }
                    /** timer callback */
                    @Override
                    public void onTimer(
                            long timestamp,
                            OnTimerContext ctx,
                            Collector<String> out)  {
                        out.collect(ctx.getCurrentKey() + " sales reached 100");
                    }
                });
          dataStream.print("dataStream").setParallelism(1);

        environment.execute("process demo");
    }

}

### console 192.168.88.180
nc -lk  8888
### input
1,2021-10-1,99
2,2021-10-1,1011
3,2021-10-1,100
4,2021-10-2,88
5,2021-10-2,101
6,2021-10-2,102
#### output
dataStream> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=99) has not reached 100
dataStream> CoffeeShop(id=4, date=Sat Oct 02 00:00:00 CST 2021, sales=88) has not reached 100
dataStream> CoffeeShop(id=3, date=Fri Oct 01 00:00:00 CST 2021, sales=100) has not reached 100
dataStream> 2 sales reached 100  (emitted 3 seconds later)
dataStream> 5 sales reached 100  (emitted 3 seconds later)
dataStream> 6 sales reached 100  (emitted 3 seconds later)


Summary

When using operators such as map and filter, you can pass in an anonymous function,

a plain function-class object (MapFunction, FilterFunction),

or a rich function-class object (RichMapFunction, RichFilterFunction)

A rich function object gives access to the task execution context, lifecycle methods, state management, and so on

If the business logic is too complex for the operators Flink provides, the process operator exposes the lower-level API directly (context, lifecycle methods, side outputs, timer service)

A KeyedStream calls process with a KeyedProcessFunction

A DataStream calls process with a ProcessFunction

When actually writing the code, just follow the IDE hints
