这是我参与更文挑战的第2天,活动详情查看: 更文挑战
Flink API详解&实操
- Stateful Stream Processing
最低级的抽象接口是状态化的数据流接口(stateful streaming)。这个接口是通过 ProcessFunction 集成到 DataStream API 中的。该接口允许用户自由的处理来自一个或多个流中的事件,并使用一致的容错状态。另外,用户也可以通过注册event time 和 processing time 处理回调函数的方法来实现复杂的计算
- DataStream/DataSet API
DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map /flatmap / window / keyby / sum / max /min / avg / join 等)将数据进行转换 / 计算
- Table API
Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁,可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与DataStream 以及 DataSet 混合使用
- SQL
Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行
Dataflows数据流图
在Flink的世界观中,一切都是数据流,所以对于批计算来说,那只是流计算的一个特例而已
Flink Dataflflows是由三部分组成,分别是:source、transformation、sink结束
source数据源会源源不断的产生数据,transformation将产生的数据进行各种业务逻辑的数据处理,最终由sink输出到外部(console、kafka、redis、DB……)
基于Flink开发的程序都能够映射成一个dataflows
Dataflows Transformations
FlatMap
DataStream → DataStream
遍历数据流中的每一个元素,产生N个元素 N=0,1,2
/**
* flatMap Demo
* DataStream → DataStream
* 遍历数据流中的每一个元素,产生N个元素 N=0,1,2
*/
public class FlinkDemo0001 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment=
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream= environment
.socketTextStream("192.168.88.180",8888);
DataStream<User> newStream= stream
.flatMap(new FlatMapFunction<String, User>() {
@Override
public void flatMap(String s, Collector<User> collector) {
try {
String[] sp=s.split(",");
collector.collect(User.builder()
.id(Long.parseLong(sp[0]))
.name(sp[1])
.age(sp[2])
.build());
}catch (Exception ex){
System.out.println(ex);
}
}
}).setParallelism(2);
newStream.print();
environment.execute("flatMap Demo");
}
}
/*User Model*/
@Data
@Builder
public class User {
private Long id;
private String name;
private String age;
}
复制代码
测试数据
### 控制台
nc -lk 8888
###输入参数
1,1,1
2,2,2
,3,3,3
3,3,3
#### sout
User(id=1, name=1, age=1)
User(id=2, name=2, age=2)
java.lang.NumberFormatException: For input string: ""
User(id=3, name=3, age=3)
复制代码
Filter
DataStream → DataStream
过滤算子 根据数据流的元素计算出一个boolean类型的值,true代表保留,false代表过滤掉
/**
* filter Demo
* DataStream → DataStream
* 过滤算子,根据数据流的元素计算出一个boolean类型的值,true代表保留,false代表过滤掉
*/
public class FlinkDemo0002 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment=
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream= environment
.socketTextStream("192.168.88.180",8888);
DataStream<User> newStream= stream
.flatMap(new FlatMapFunction<String, User>() {
@Override
public void flatMap(String s, Collector<User> collector) {
try {
String[] sp=s.split(",");
collector.collect(User.builder()
.id(Long.parseLong(sp[0]))
.name(sp[1])
.age(sp[2])
.build());
}catch (Exception ex){
System.out.println(ex);
}
}
})
.filter(new RichFilterFunction<User>() {
@Override
public boolean filter(User user) throws Exception {
/**只处理age 大于25的参数*/
if (Integer.parseInt(user.getAge())>25){
return true;
}
return false;
}
}).setParallelism(2);
newStream.print();
environment.execute("filter demo");
}
}
复制代码
### 控制台
nc -lk 8888
###输入参数
1,huang,26
2,kun,24
3,jie,23
4,la,29
#### sout
User(id=1, name=huang, age=26)
User(id=4, name=la, age=29)
## 可以看出age 小于25的都被过滤掉了
复制代码
keyBy
(一般配合分组计算)
DataStream → KeyedStream
根据数据流中指定的字段来分区,相同指定字段值的数据一定是在同一个分区中,内部分区使用的是HashPartitioner
指定分区字段的方式有三种:
- 根据索引号指定
- 通过匿名函数来指定
- 通过实现KeySelector接口指定分区字段
public class FlinkDemo0003_1 {
public static void main(String[] args) throws Exception {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
StreamExecutionEnvironment environment=
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream= environment
.socketTextStream("192.168.88.180",8888);
DataStream <CoffeeShop> dataStream= stream.flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
@Override
public void flatMap(String s, Collector<CoffeeShop> collector) {
try{
String [] sp= s.split(",");
collector.collect(CoffeeShop.builder()
.id(Long.parseLong(sp[0]))
.date(format.parse(sp[1]))
.sales(Integer.parseInt(sp[2]))
.build());
}catch (Exception ex){
System.out.println(ex);
}
}
}) /** 依据id分组*/
.keyBy(CoffeeShop::getId)
/** 对销量进行求和*/
.sum("sales");
dataStream.print();
environment.execute("keyBy & sum demo");
}
}
复制代码
### 控制台
nc -lk 8888
###输入参数
1,2021-10-1,1
2,2021-10-1,1
3,2021-10-1,1
1,2021-10-2,2
2,2021-10-2,2
3,2021-10-2,2
1,2021-10-3,1
2,2021-10-3,2
3,2021-10-3,3
#### sout
User(id=1, name=huang, age=26)
User(id=4, name=la, age=29)
## 可以看出每次id相同的sales 都会被累加 id 相同的最后一个sales 必是总和
CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=2)
CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=4)
CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
CoffeeShop(id=3, date=Sat Oct 02 00:00:00 CST 2021, sales=2)
CoffeeShop(id=3, date=Sat Oct 02 00:00:00 CST 2021, sales=3)
CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=3)
CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=5)
CoffeeShop(id=3, date=Sat Oct 02 00:00:00 CST 2021, sales=6)
复制代码
Reduce
KeyedStream:根据key分组 → DataStream
reduce是基于分区后的流对象进行聚合,也就是说,DataStream类型的对象无法调用reduce方法(必须先进行分组)
public class FlinkDemo0004 {
public static void main(String[] args) throws Exception {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = environment
.socketTextStream("192.168.88.180", 8888);
DataStream dataStream = stream.
flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
@Override
public void flatMap(String s, Collector<CoffeeShop> collector) {
try {
String[] sp = s.split(",");
collector.collect(CoffeeShop.builder()
.id(Long.parseLong(sp[0]))
.date(format.parse(sp[1]))
.sales(Integer.parseInt(sp[2]))
.build());
} catch (Exception ex) {
System.out.println(ex);
}
}
}) /** 依据id分组*/
.keyBy(CoffeeShop::getId).reduce(new ReduceFunction<CoffeeShop>() {
@Override
public CoffeeShop reduce(CoffeeShop coffeeShop, CoffeeShop t1)
throws Exception {
/***
* 聚合 分区后 只保留销量大
*/
return new CoffeeShop(coffeeShop.getId(),
coffeeShop.getDate(),
Math.max(coffeeShop.getSales(),
t1.getSales())
);
}
});
dataStream.print().setParallelism(1);
environment.execute("reduce demo");
}
}
复制代码
Aggregations
KeyedStream → DataStream
Aggregations代表的是一类聚合算子,具体算子如下:
- sum
- min
- max
- minBy
- maxBy
union 真合并
DataStream* → DataStream
合并两个或者更多的数据流产生一个新的数据流,这个新的数据流中包含了所合并的数据流的元素
注意:需要保证数据流中元素类型一致
/**
* union Demo
* DataStream * → DataStream
* 合并两个或者更多的数据流产生一个新的数据流,这个新的数据流中包含了所合并的数据流的元素
* 注意:需要保证数据流中元素类型一致
*/
public class FlinkDemo0006 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream1 = environment
.socketTextStream("192.168.88.180", 8888);
DataStreamSource<String> stream2 = environment
.socketTextStream("192.168.88.181", 8888);
stream1.union(stream2).print();
environment.execute("union demo");
}
}
复制代码
### 控制台 192.168.88.180
nc -lk 8888
###输入参数
1,2021-10-1,1
### 控制台 192.168.88.181
nc -lk 8888
###输入参数
2,2021-10-1,1
#### sout
1,2021-10-1,1
2,2021-10-1,1
## 可以看出两个流被合并为了一个流进行处理
复制代码
Connect 假合并
DataStream,DataStream → ConnectedStreams
合并两个数据流并且保留两个数据流的数据类型,能够共享两个流的状态
public class FlinkDemo0007 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream1 = environment
.socketTextStream("192.168.88.180", 8888);
DataStreamSource<String> stream2 = environment
.socketTextStream("192.168.88.181", 8888);
ConnectedStreams connectedStreams = stream1.connect(stream2);
connectedStreams.getFirstInput().print();
connectedStreams.getSecondInput().print();
DataStream dataStream = connectedStreams
.map(new RichCoMapFunction<String, String, String>() {
@Override
public String map1(String s) throws Exception {
return s;
}
@Override
public String map2(String s) throws Exception {
return s;
}
});
environment.execute("connect demo");
}
}
复制代码
### 控制台 192.168.88.180
nc -lk 8888
###输入参数
1,2021-10-1,1
### 控制台 192.168.88.181
nc -lk 8888
###输入参数
2,2021-10-1,1
#### sout
1,2021-10-1,1
2,2021-10-1,1
## 可以看出两个流被合并为了一个流进行处理 但是在处理上还是得对两个流进行分别处理
复制代码
问题:和Connect的区别?
- Connect 的数据类型可以不同,Connect 只能合并两个流;
- Union可以合并多条流,Union的数据结构必须是一样的;
CoMap, CoFlatMap
ConnectedStreams → DataStream
CoMap, CoFlatMap并不是具体算子名字,而是一类操作名称
凡是基于ConnectedStreams数据流做map遍历,这类操作叫做CoMap
凡是基于ConnectedStreams数据流做flatMap遍历,这类操作叫做CoFlatMap
public class FlinkDemo0008 {
public static void main(String[] args) throws Exception {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream1 = environment
.socketTextStream("192.168.88.180", 8888);
DataStreamSource<String> stream2 = environment
.socketTextStream("192.168.88.181", 8888);
ConnectedStreams connectedStreams = stream1.connect(stream2);
DataStream dataStream = connectedStreams.flatMap(new RichCoFlatMapFunction<String,String, CoffeeShop>() {
@Override
public void flatMap1(String s, Collector<CoffeeShop> collector)
throws Exception {
String[] sp=s.split(",");
collector.collect(CoffeeShop.builder()
.id(Long.parseLong(sp[0]))
.date(format.parse(sp[2]))
.sales(Integer.parseInt(sp[3]))
.build());
}
@Override
public void flatMap2(String s, Collector<CoffeeShop> collector)
throws Exception {
String[] sp=s.split(",");
collector.collect(CoffeeShop.builder()
.id(Long.parseLong(sp[0]))
.date(format.parse(sp[1]))
.sales(Integer.parseInt(sp[2]))
.build());
}
});
dataStream.print(">>>>>>>").setParallelism(1);
environment.execute("connect demo");
}
}
复制代码
### 控制台 192.168.88.180
nc -lk 8888
###输入参数
1,统一优乐美,2021-10-1,1
### 控制台 192.168.88.181
nc -lk 8888
###输入参数
1,2021-10-1,1
#### sout
>>>>>>>> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
>>>>>>>> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
## 可以看出两个流被合并为了一个流进行处理 但是在处理上还是得对两个流进行分别处理
##coFlapMap/coMap 可以将不完全合并的两个流合并为一个流进行相应的处理
复制代码
Split(2.12 已删)
DataStream → SplitStream
根据条件将一个流分成两个或者更多的流
Select(2.12 已删)
SplitStream → DataStream
从SplitStream中选择一个或者多个数据流
side output侧输出流
流计算过程,可能遇到根据不同的条件来分隔数据流。filter分割造成不必要的数据复制
public class FlinkDemo0009 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = environment
.socketTextStream("192.168.88.180", 8888);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
final OutputTag<CoffeeShop> outputTag = new OutputTag<CoffeeShop>("side-output") {
};
SingleOutputStreamOperator dataStream = stream
.flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
@Override
public void flatMap(String s, Collector<CoffeeShop> collector) {
try {
String sp[] = s.split(",");
collector.collect(CoffeeShop.builder()
.id(Long.parseLong(sp[0]))
.date(format.parse(sp[1]))
.sales(Integer.parseInt(sp[2]))
.build());
} catch (Exception ex) {
System.out.println(ex);
}
}
})
.process(new ProcessFunction<CoffeeShop, CoffeeShop>() {
@Override
public void processElement(CoffeeShop s, Context context, Collector<CoffeeShop> collector) {
try {
if (s.getId() % 2 == 0) {
collector.collect(s);
} else {
context.output(outputTag, s);
}
} catch (Exception ex) {
System.out.println(ex);
}
}
});
dataStream.getSideOutput(outputTag).print("outputTag>>>>>>>>>>>>>");
dataStream.print("stream>>>>>>>>>>>>>");
environment.execute("process demo");
}
}
复制代码
### 控制台 192.168.88.180
nc -lk 8888
###输入参数
1,2021-10-1,1
2,2021-10-1,2
3,2021-10-1,3
4,2021-10-2,4
5,2021-10-2,5
6,2021-10-2,6
7,2021-10-3,7
8,2021-10-3,8
9,2021-10-3,9
#### sout
outputTag>>>>>>>>>>>>>:1> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=1)
outputTag>>>>>>>>>>>>>:3> CoffeeShop(id=3, date=Fri Oct 01 00:00:00 CST 2021, sales=3)
stream>>>>>>>>>>>>>:4> CoffeeShop(id=4, date=Sat Oct 02 00:00:00 CST 2021, sales=4)
stream>>>>>>>>>>>>>:2> CoffeeShop(id=2, date=Fri Oct 01 00:00:00 CST 2021, sales=2)
outputTag>>>>>>>>>>>>>:3> CoffeeShop(id=7, date=Sun Oct 03 00:00:00 CST 2021, sales=7)
stream>>>>>>>>>>>>>:4> CoffeeShop(id=8, date=Sun Oct 03 00:00:00 CST 2021, sales=8)
stream>>>>>>>>>>>>>:2> CoffeeShop(id=6, date=Sat Oct 02 00:00:00 CST 2021, sales=6)
outputTag>>>>>>>>>>>>>:1> CoffeeShop(id=5, date=Sat Oct 02 00:00:00 CST 2021, sales=5)
outputTag>>>>>>>>>>>>>:1> CoffeeShop(id=9, date=Sun Oct 03 00:00:00 CST 2021, sales=9)
## 可以看出流被切分成为了两个流 通过id 奇偶
复制代码
Iterate(比较重要)
DataStream → IterativeStream → DataStream
Iterate算子提供了对数据流迭代的支持
迭代由两部分组成:迭代体、终止迭代条件
不满足终止迭代条件的数据流会返回到stream流中,进行下一次迭代
满足终止迭代条件的数据流继续往下游发送
public class FlinkDemo0010 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = environment
.socketTextStream("192.168.88.180", 8888);
IterativeStream<Long> iterate = stream
.map(new RichMapFunction<String, Long>() {
@Override
public Long map(String s) {
return Long.parseLong(s);
}
}).iterate();
DataStream<Long> feedback = iterate
.map(new RichMapFunction<Long, Long>() {
@Override
public Long map(Long aLong) throws Exception {
return aLong > 2 ? aLong : (aLong + 1);
}
}).filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long s) throws Exception {
if (s > 2) {
return false;
} else {
return true;
}
}
});
iterate.closeWith(feedback).print("feedback");
SingleOutputStreamOperator<Long> result = iterate.
filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return value >= 2;
}
});
result.print("result:");
environment.execute("iterate demo");
}
}
复制代码
### 控制台 192.168.88.180
nc -lk 8888
###输入参数
1
2
3
4
5
#### sout
feedback> 2
result> 2
result> 2
result> 3
result> 4
result> 5
#可以看出 数字为1 不满足跳出的条件会返回map迭代 到满足条件才跳出
复制代码
函数类和富函数类
在使用Flink算子的时候,可以通过传入匿名函数和函数类对象
函数类分为:普通函数类、富函数类**(自行划分)**
富函数类相比于普通的函数,可以获取运行环境的上下文(Context),拥有一些生命周期方法,管理状态,可以实现更加复杂的功能
富函数类由于继承了AbstractRichFunction 可以适应AbstractRichFunction中的方法获取上下文及完成一些特殊操作
public class FlinkDemo0011 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = environment
.socketTextStream("192.168.88.180", 8888);
stream.flatMap(new RichFlatMapFunction<String, Long>() {
@Override
public void flatMap(String s, Collector<Long> collector) throws Exception {
collector.collect(Long.parseLong(s));
}
}).filter(new RichFilterFunction<Long>() {
@Override
public void open(Configuration parameters) {
/**
* 由AbstractRichFunction提供的getRuntimeContext()方法
* 可以获取运行时候的上下文信息
* **/
System.out.println(getRuntimeContext().getTaskName());
System.out.println(getRuntimeContext().getTaskNameWithSubtasks());
}
@Override
public boolean filter(Long s) {
return s<100;
}
}).print();
environment.execute("demo");
}
}
复制代码
底层API(ProcessFunctionAPI)
属于低层次的API,我们前面讲的map、filter、flatMap等算子都是基于这层高层封装出来的
越低层次的API,功能越强大,用户能够获取的信息越多,比如可以拿到元素状态信息、事件时间、设置定时器 等
public class FlinkDemo0012 {
public static void main(String[] args) throws Exception {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = environment
.socketTextStream("192.168.88.180", 8888);
final OutputTag<CoffeeShop> outputTag = new OutputTag<CoffeeShop>("output"){};
SingleOutputStreamOperator dataStream = stream.
flatMap(new RichFlatMapFunction<String, CoffeeShop>() {
@Override
public void flatMap(String s, Collector<CoffeeShop> collector) {
try {
String[] sp = s.split(",");
collector.collect(CoffeeShop.builder()
.id(Long.parseLong(sp[0]))
.date(format.parse(sp[1]))
.sales(Integer.parseInt(sp[2]))
.build());
} catch (Exception ex) {
System.out.println(ex);
}
}
}) /** 依据id分组*/
.keyBy(CoffeeShop::getId)
.process(new KeyedProcessFunction<Long, CoffeeShop, String>() {
@Override
public void processElement(
CoffeeShop coffeeShop,
Context context,
Collector<String> collector) {
try{
/**当前处理时间*/
long currentTime= context.timerService()
.currentProcessingTime();
if(coffeeShop.getSales()>100){
long timerTime = currentTime + 3 * 1000;
context.timerService()
.registerProcessingTimeTimer(timerTime);
}else{
collector.collect(coffeeShop
.toString()+"未到达100");
}
}
catch (Exception ex){
System.out.println(ex);
}
}
/**设置时钟时间*/
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<String> out
) {
out.collect( ctx.getCurrentKey()+"销量到达:100");
}
});
dataStream.print("dataStream").setParallelism(1);
environment.execute("process demo");
}
}
复制代码
### 控制台 192.168.88.180
nc -lk 8888
###输入参数
1,2021-10-1,99
2,2021-10-1,1011
3,2021-10-1,100
4,2021-10-2,88
5,2021-10-2,101
6,2021-10-2,102
#### sout
dataStream> CoffeeShop(id=1, date=Fri Oct 01 00:00:00 CST 2021, sales=99)未到达100
dataStream> CoffeeShop(id=4, date=Sat Oct 02 00:00:00 CST 2021, sales=88)未到达100
dataStream> CoffeeShop(id=3, date=Fri Oct 01 00:00:00 CST 2021, sales=100)未到达100
dataStream> 2销量到达:100 3秒后到达
dataStream> 5销量到达:100 3秒后到达
dataStream> 6销量到达:100 3秒后到达
复制代码
总结
使用Map Filter….算子的适合,可以直接传入一个匿名函数、
普通函数类对象(MapFuncation FilterFunction)
富函数类对象(RichMapFunction、RichFilterFunction)
传入的富函数类对象:可以拿到任务执行的上下文,生命周期方法、管理状态…..
如果业务比较复杂,通过Flink提供这些算子无法满足我们的需求,通过process算子直接使用比较底层API(使用这套API 上下文、生命周期方法、测输出流、时间服务)
KeyedDataStream调用process KeyedProcessFunction
DataStream调用process ProcessFunction
具体写代码的适合,看提示就行