java8中3个参数的reduce方法怎么理解?

例如这个练习题,使用reduce和lambda表达式来实现map。
不明白的是reduce第三个参数的意义,感觉多此一举

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

public class MapUsingReduce {
   
        public static <I, O> List<O> map(Stream<I> stream, Function<I, O> mapper) {
            return stream.reduce(new ArrayList<O>(), (acc, x) -> {
                // We are copying data from acc to new list instance. It is very inefficient,
                // but contract of Stream.reduce method requires that accumulator function does
                // not mutate its arguments.
                // Stream.collect method could be used to implement more efficient mutable reduction,
                // but this exercise asks to use reduce method.
                List<O> newAcc = new ArrayList<>(acc);
                newAcc.add(mapper.apply(x));
                return newAcc;
            }, (List<O> left, List<O> right) -> {
                // We are copying left to new list to avoid mutating it. 
                List<O> newLeft = new ArrayList<>(left);
                newLeft.addAll(right);
                return newLeft;
            });
        }
    
    }

下面是文档的说明,仍然不明白。。。

<U> U reduce(U identity,
              BiFunction<U,? super T,U> accumulator,
              BinaryOperator<U> combiner) 

Performs a reduction on the elements of this stream, using the provided identity, accumulation and combining functions. This is equivalent to:

  U result = identity;
  for (T element : this stream)
      result = accumulator.apply(result, element)
  return result;

but is not constrained to execute sequentially.

The identity value must be an identity for the combiner function. This means that for all u, combiner(identity, u) is equal to u. Additionally, the combiner function must be compatible with the accumulator function; for all u and t, the following must hold:

combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)

望指教,谢谢!

阅读 24.1k
4 个回答

我觉得可以这样理解

首先理解方法本身的意思:
Streamreduce方法,翻译过来是聚合或者是汇聚成一个的意思,由于Stream本身就代表着一堆数据,那stream.reduce()方法顾名思义就是把一堆数据聚合成一个数据

理解了reduce方法的意思,再来看看这个方法挂靠的对象是stream,是一个流,了解一下流的工作方式:
流底层核心其实是Spliterator接口的一个实现,而这个Spliterator接口其实本身就是Fork/Join并行框架的一个实现,所以归根结底要明白流的工作方式,就要明白一下Fork/Join框架的基本思想,即:以递归的方式将可以并行的任务拆分成更小的子任务,然后将每个子任务的结果合并起来生成整体的最后结果,画了个草图如下

clipboard.png

理解了方法本身的意思以及流的工作方式,再结合到一起理解一下stream.reduce()方法,即用Fork/Join的方式把一堆数据聚合成一个数据,因此可以画出reduce方法的运行草图

clipboard.png

结合草图,要实现stream.reduce()方法,必须要告诉JDK

  1. 你有什么需求数据要汇聚?(Stream已经提供了数据源,对应上面草图的A元素)
  2. 最后要汇聚成怎样的一个数据类型(对应reduce方法的参数一,对应上面草图的B元素)
  3. 如何将需求数据处理或转化成一个汇聚数据(对应reduce方法的参数二,对应上面草图的汇聚方式1)
  4. 如何将多个汇聚数据进行合并(对应reduce方法的参数三,对应上面草图的汇聚方式2)

再结合你给的map方法,其实是要把I类数据的流,最后转化为一个O类数据的List,因此按照上面的步骤可以进行对照

  1. 你有什么需求数据要汇聚?(I类数据流)
  2. 最后要汇聚成怎样的一个数据类型(一个集合,new ArrayList()
  3. 如何将需求数据处理或转化成一个汇聚数据(根据mapper把I转化为O,再用List.add方法)
  4. 如何将多个汇聚数据进行合并(两个集合合并,用List.addAll()

最后补充一点,若是你的参数真是Stream<I> streamFunction<I, O> mapper,建议不要用reduce方法,这么写可能会更好一点

public static <I, O> List<O> map(Stream<I> stream, Function<I, O> mapper) {
        return stream.map(mapper).collect(Collectors.toList());
    }

楼上说的不错,但是补充一点:
BinaryOperator是供多线程使用的,如果不在Stream中声明使用多线程,就不会使用子任务,自然也不会调用到该方法。另外多线程下使用BinaryOperator的时候是需要考虑线程安全的问题。
另外自问自答下为什么需要BinaryOperator
因为这个重载的方法和其他两个不相同,允许改变返回值,所以返回值并不一定是Collection的子类;因此必须显示的声明如何拼接两个子任务产生的结果。
但是java8函数编程一书中这一题返回的结果恰好是List,就造成了BinaryOperator并不需要的假象。
个人愚见,如有不对,恳请扶正。

@wanghuizuo
你的代码中每次BinaryOperator的两个形参是相等的原因是reduce操作的第三个参数BinaryOperator<U> combiner 操作的对象是第二个参数BiFunction<U,? super T,U> accumulator的返回值。你的accumulator中将累加的值添加到初始值中,然后把初始值返回。也就是说所有线程运行的accumlator都操作的是同一个list ,返回值也都是同一个list。所以combiner 接收的参数都是同一个list。如果你的accumulator中不改变初始值而是返回一个新的值,combiner接收的结果就是不同线程的accumulator的不同的结果了。

代码如下:

public class ParallelStreamReduce {
    public static void main(String[] args) {
        //accumulator不写入list,不需要线程同步,初始值使用普通的list
        List<Integer> list = new ArrayList<>();
        AtomicInteger accumulateCount = new AtomicInteger(0);
        AtomicInteger combineCount = new AtomicInteger(0);
        List<Integer> reduceResult = IntStream.range(0, 100)
                .parallel()
                .boxed()
                .reduce(list, (i, j) -> {
                    accumulateCount.incrementAndGet();
                    //不改变初始的i,而是返回一个新的i
                    ArrayList<Integer> newI = new ArrayList<>(i);
                    newI.add(j);
                    return newI;
                }, (i, j) -> {
                    combineCount.incrementAndGet();
                    System.out.println(String.format("i==j: %s, thread name:%s", i == j, Thread.currentThread().getName()));
                    ArrayList<Integer> newI = new ArrayList<>(i);
                    newI.addAll(j);
                    return newI;
                });
        System.out.println("---------------------------------------");
        System.out.println("reduce result size: "+reduceResult.size());
        System.out.println("reduce result : "+reduceResult);
        System.out.println("accumulateCount: "+accumulateCount.get());
        System.out.println("combineCount: "+combineCount.get());
    }
}

结果:

i==j: false, thread name:ForkJoinPool.commonPool-worker-1
i==j: false, thread name:ForkJoinPool.commonPool-worker-3
i==j: false, thread name:main
i==j: false, thread name:ForkJoinPool.commonPool-worker-1
i==j: false, thread name:ForkJoinPool.commonPool-worker-2
i==j: false, thread name:main
i==j: false, thread name:ForkJoinPool.commonPool-worker-1
i==j: false, thread name:main
i==j: false, thread name:ForkJoinPool.commonPool-worker-1
i==j: false, thread name:ForkJoinPool.commonPool-worker-2
i==j: false, thread name:ForkJoinPool.commonPool-worker-3
i==j: false, thread name:ForkJoinPool.commonPool-worker-2
i==j: false, thread name:ForkJoinPool.commonPool-worker-3
i==j: false, thread name:ForkJoinPool.commonPool-worker-2
i==j: false, thread name:ForkJoinPool.commonPool-worker-2
---------------------------------------
reduce result size: 100
reduce result : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
accumulateCount: 100
combineCount: 15
新手上路,请多包涵

@imango
你好,图画的非常清晰,但是我有一点疑问。
按照图的意思,多线程下,第三个参数BinaryOperator的两个行参一定是不同的。但是运行结果不是这样。
而是每次BinaryOperator的两个行参是相等的。

代码如下:

public static void main(String[] args) {
        List<Integer> list = Collections.synchronizedList(Lists.newArrayList());

        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        List<Integer> reduce = IntStream.range(0, 100)
                .boxed()
                .parallel()
                .reduce(list, (i, j) -> {
                    atomicInteger2.incrementAndGet();
                    i.add(j);
                    return i;
                }, (i, j) -> {
                    atomicInteger.incrementAndGet();
                    System.out.println(StrUtil.format("{},name:{}", i == j, Thread.currentThread().getName()));
                    return i;
                });

        System.out.println(reduce.size());
        System.out.println(atomicInteger.get() + "次!");
        System.out.println(atomicInteger2.get() + "次!");
    }

运行结果:

true,name:ForkJoinPool.commonPool-worker-3
true,name:ForkJoinPool.commonPool-worker-5
true,name:ForkJoinPool.commonPool-worker-1
true,name:ForkJoinPool.commonPool-worker-7
true,name:ForkJoinPool.commonPool-worker-2
true,name:ForkJoinPool.commonPool-worker-4
true,name:ForkJoinPool.commonPool-worker-6
true,name:main
true,name:ForkJoinPool.commonPool-worker-4
true,name:ForkJoinPool.commonPool-worker-2
true,name:ForkJoinPool.commonPool-worker-1
true,name:ForkJoinPool.commonPool-worker-7
true,name:ForkJoinPool.commonPool-worker-3
true,name:ForkJoinPool.commonPool-worker-5
true,name:ForkJoinPool.commonPool-worker-7
true,name:ForkJoinPool.commonPool-worker-3
true,name:ForkJoinPool.commonPool-worker-1
true,name:ForkJoinPool.commonPool-worker-2
true,name:main
true,name:ForkJoinPool.commonPool-worker-4
true,name:ForkJoinPool.commonPool-worker-6
true,name:main
true,name:ForkJoinPool.commonPool-worker-2
true,name:ForkJoinPool.commonPool-worker-3
true,name:ForkJoinPool.commonPool-worker-5
true,name:ForkJoinPool.commonPool-worker-3
true,name:ForkJoinPool.commonPool-worker-2
true,name:main
true,name:ForkJoinPool.commonPool-worker-6
true,name:ForkJoinPool.commonPool-worker-5
true,name:ForkJoinPool.commonPool-worker-6
true,name:ForkJoinPool.commonPool-worker-5
true,name:ForkJoinPool.commonPool-worker-6
true,name:ForkJoinPool.commonPool-worker-5
true,name:ForkJoinPool.commonPool-worker-5
100
35次!
100次!
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
宣传栏