Stream流式计算
优点
- 结合着函数式接口和链式调用,对于数据的处理变得更加简单,同时增强可读性。
- 并行执行效率高,大数据下耗时更少
问题
从五个用户中
- 选出一个
- id是偶数
- 年龄大于23
- 用户名转为大写
- 用户名倒序排列
public static void main(String[] args) {
User u1=new User(21,1,"a");
User u2=new User(22,2,"b");
User u3=new User(23,3,"c");
User u4=new User(24,4,"d");
User u5=new User(25,6,"e");
List<User> users = Arrays.asList(u1, u2, u3, u4, u5);
//选择偶数id
users.stream().filter((u)->{return u.getId()%2==0;})
// 在偶数id中选择age>23
.filter((u)->{return u.getAge()>23;})
// 将name转为大写
.map((u) -> {u.setName(u.getName().toUpperCase());return u;})
// 排序
.sorted((uu1,uu2)->{return uu2.getName().compareTo(uu1.getName());})
//限制个数
.limit(1)
.forEach(System.out::println);
}
复制代码
Why
filter
// 返回与predicate匹配的流
Stream<T> filter(Predicate<? super T> predicate);
复制代码
而Predicate在函数式接口中我们已经了解了。
它有一个boolean test(T t);
是进行判断的依据。
因此我们通过lambda表达式实现了该方法。
filter((u)->{return u.getId()%2==0;})
map
// 返回经过mapper操作后的结果组成的流。
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
复制代码
而Function我们已经知道他有一个R apply(T t);
是需要实现的。这也是该map对流进行的操作。
sorted
//返回由该流的元素组成的流,根据提供的 Comparator进行排序。
Stream<T> sorted(Comparator<? super T> comparator);
复制代码
而Comparator中需要实现的是int compare(T o1, T o2);
。我们实现的该方法即是排序的判断依据。
limit
//返回由此流的元素组成的流,截短长度不能超过 maxSize 。
Stream<T> limit(long maxSize);
复制代码
以上都是需要传递一个函数式接口而我们通过lambda表达式节省了代码量。 而并行的高效将在Forkjoin中一同比较
ForkJoin
Forkjoin:将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。
工作窃取
当执行新的任务时它可以将其拆分成 更小的任务执行,并将小任务加到线程队列中,当没有任务执行时,再从一个随机线程的队列中偷一个并把它放在自己的队列中
问题:计算0-100_0000_0000的和
- 方法一
static void test1(){
long sum=0;
long start = System.currentTimeMillis();
for (long i = 0; i <= 100_0000_0000L; i++) {
sum+=i;
}
System.out.println("simple sum="+sum);
long end=System.currentTimeMillis();
System.out.println("time cost"+(end-start));
}
复制代码
- 方法二
利用forkjoin,将任务拆解
static void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
//通过Forkjoin池来管理
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> task=new ForkjoinDemo(0, 100_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(task);
System.out.println("forkjoin sum->"+submit.get());
long end=System.currentTimeMillis();
System.out.println("time cost"+(end-start));
}
复制代码
class ForkjoinDemo extends RecursiveTask<Long> {
public static long num=0;
private long start;
private long end;
private final long t=10000L;
public ForkjoinDemo(long start,long end) {
this.start = start;
this.end=end;
}
@Override
protected Long compute() {
// 与递归类似
if ((end-start)<t){
long sum=0L;
for (long i=start;i<=end;i++){
sum+=i;
}
return sum;
}
else {
Long temp=(end +start)/2;
ForkjoinDemo forkJoinTask=new ForkjoinDemo(start,temp);
forkJoinTask.fork();
ForkjoinDemo forkJoinTask2=new ForkjoinDemo(temp+1,end);
forkJoinTask2.fork();
return forkJoinTask.join()+forkJoinTask2.join();
}
}
}
复制代码
- 方法三
Stream流计算
static void test3(){
long start = System.currentTimeMillis();
System.out.println("stream sum->"+LongStream.rangeClosed(0L, 100_0000_0000L).parallel().reduce(0, Long::sum));
long end=System.currentTimeMillis();
System.out.println("time cost"+(end-start))
}
复制代码
结果:
simple sum=-5340232216128654848
time cost3174
forkjoin sum->-5340232216128654848
time cost1835
stream sum->-5340232216128654848
time cost1125
复制代码
Stream流并行的高效在此得到体现。尽管forkjoin效率不如stream流,但是它什么时候停止递归是可以规划的,有进一步提升的可能。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END