Java8 Stream原理深度解析 亦凉 2022-01-23 08:34 311阅读 0赞 ## 常用的流操作 ## 在深入原理之前,我们有必要知道关于Stream的一些基础知识,关于Stream的操作分类,如表1-1所示。 表1-1 Stream的常用操作分类([表格引自这里][Link 1]) ![表1-1][1-1] 如表1-1中所示,**Stream中的操作可以分为两大类**: 中间操作与结束操作,中间操作只是对操作进行了记录,只有结束操作才会触发实际的计算(即惰性求值),这也是Stream在迭代大集合时高效的原因之一。 中间操作又可以分为无状态(Stateless)操作与有状态(Stateful)操作,前者是指元素的处理不受之前元素的影响;后者是指该操作只有拿到所有元素之后才能继续下去。 结束操作又可以分为短路与非短路操作,这个应该很好理解,前者是指遇到某些符合条件的元素就可以得到最终结果;而后者是指必须处理所有元素才能得到最终结果。 ## 原理探秘 ## 在探究Stream的执行原理之前,我们先看如下两段代码(本文将以code\_1为例进行说明): code\_1 public static void main(String[] args) { List<String> list = Lists.newArrayList( "bcd", "cde", "def", "abc"); List<String> result = list.stream() //.parallel() .filter(e -> e.length() >= 3) .map(e -> e.charAt(0)) //.peek(System.out :: println) //.sorted() //.peek(e -> System.out.println("++++" + e)) .map(e -> String.valueOf(e)) .collect(Collectors.toList()); System.out.println("----------------------------"); System.out.println(result); } code\_2 public void targetMethod() { List<String> list = Lists.newArrayList( "bcd", "cde", "def", "abc"); List<String> result = Lists.newArrayListWithCapacity(list.size()); for (String str : list) { if (str.length() >= 3) { char e = str.charAt(0); String tempStr = String.valueOf(e); result.add(tempStr); } } System.out.println("----------------------------"); System.out.println(result); } 很明显,在最终结果上而言,code\_1与code\_2是等价的。那么,Stream是怎么做的呢?显然不是每次操作都进行迭代,因为这对于执行时间与存储中间变量来说都将是噩梦。 ### 要解决的问题 ### 显然,如果code\_2只对集合迭代了一次,也就是说相当高效。那么这么做有没有弊端?有!模板代码、中间变量、不利于并行都是其存在的问题。但是按着code\_2的思路可以知道**有以下几个问题需要解决:** * **如何记录每次操作?** * **操作如何叠加?** * **叠加后的操作如何执行?** * **最后的结果如何存储?** ### 包结构分析 ### 那么Stream是如何解决的呢?所谓源码之下,无所遁形。那么,首先来看一下Stream包的结构(如图1-1所示)。 ![图1-1][1-1 1] 图1-1 Stream包的结构示意图 其中各个部分的主要功能为: 1. 主要是各种操作的工厂类、数据的存储结构以及收集器的工厂类等; 2. 主要用于Stream的惰性求值实现; 3. Stream的并行计算框架; 4. 存储并行流的中间结果; 5. 终结操作的定义; 我们单独把第二部分拎出来用于**说明Stream的惰性求值实现**,如图1-2所示,Java8针对Int、long、double进行了优化,主要用于频繁的拆装箱。我们以引用类型进行介绍,在图中已经标为绿色。 * BaseStream规定了流的基本接口,比如iterator、spliterator、isParallel等; * Stream中定义了map、filter、flatmap等用户关注的常用操作; * PipelineHelper主要用于Stream执行过程中相关结构的构建; * Head、StatelessOp、StatefulOp为ReferencePipeline中的内部类。 ![图1-2][1-2] 图1-2 ### 操作如何记录 ### 关于操作如何记录,在JDK源码注释中多次用(操作)stage来标识用户的每一次操作,而通常情况下Stream的操作又需要一个回调函数,所以一个完整的操作是由数据来源、操作、回调函数组成的三元组来表示。而在具体实现中,使用实例化的ReferencePipeline来表示,即图1-2中的Head、StatelessOp、StatefulOp的实例。 如code\_3、code\_4所示为调用stream.map()的关键的两个方法,在用户 调用一系列操作后会形成如图1-3所示的双链表结构。 ![图1-3][1-3] 图1-3 code\_3(ReferencePipeline.map()) @Override @SuppressWarnings("unchecked") public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; } code\_4(AbstractPipeline.AbstractPipeline()) AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; } ### 如何叠加 ### 在上一步已经在stage中记录了每一步操作,此时并没有执行。但是stage只是保存了当前的操作,并不能确定下一个stage需要何种操作,何种数据,其实JDK为此定义了Sink接口,其中只有begin()、end()、cancellationRequested()、accept()四个接口(如表1-2所示,[摘自这里][Link 1]),其中中间操作的子类中包含一个指向下游sink的指针。 表1-2 ![表1-2][1-2 1] 现在转向code\_3,可以看出,在satge链中,每一步都包含了opWrapSink()。当调用终结操作时,将会触发code\_5从最后一个stage(终结操作产生的satge)开始,递归产生图1-4所示的结构。 code\_5(AbstractPipeline.wrapSink()) @Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; } ![图1-4][1-4] 图1-4 ### 如何执行 ### 所有的操作已经形成了图1-4的结构,接下来就会触发code\_6,此时结果就会产生对应的结果啦! code\_6(AbstractPipelie.copyInto()) @Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } } ## 并行原理 ## 那么,Stream是如何并行执行的呢?其实产生stage链的过程和串行并没有区别,只是在最终执行时进行了相应的调整,我们将code\_1改变为code\_7 code\_7 public static void main(String[] args) { List<String> list = Lists.newArrayList( "bcd", "cde", "def", "abc"); List<String> result = list.stream() .parallel() .filter(e -> e.length() >= 3) //.map(e -> e.charAt(0)) //.peek(System.out :: println) .sorted() //.peek(e -> System.out.println("++++" + e)) .map(e -> String.valueOf(e)) .collect(Collectors.toList()); System.out.println("----------------------------"); System.out.println(result); } 那么最终产生的stage链与sink的结构如图1-5所示,因为此时stage链中有一个有状态操作(sorted()),也就是说在这里必须处理完所有元素才能进行下一步操作。那么此时无论是并行还是串行,此时都会产生两个sink链,也就是代表了两次迭代,才产生了最终结果。 ![图1-5][1-5] 图1-5 那么,究竟是如何并行的呢?其实当调用collect操作时会调用code\_8,其中的evaluateParallel()如code\_9所示。 code\_8(AbstractPipeline.evaluate()) final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); } code\_9(ReduceOp.evaluateParallel()) @Override public <P_IN> R evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return new ReduceTask<>(this, helper, spliterator).invoke().get(); } 其实Stream的并行处理是基于ForkJoin框架的,相关类与接口的结构如图1-6所示。其中AbstractShortCircuitTask用于处理短路操作,其他相关操作类似,会产生对应的Task。 ![图1-6][1-6] 图1-6 关于code\_8中获取源Spliterator,如code\_10所示, code\_10(AbstractPipeline.sourceSpliterator()) @SuppressWarnings("unchecked") private Spliterator<?> sourceSpliterator(int terminalFlags) { Spliterator<?> spliterator = null; if (sourceStage.sourceSpliterator != null) { spliterator = sourceStage.sourceSpliterator; sourceStage.sourceSpliterator = null; } else if (sourceStage.sourceSupplier != null) { spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; } else { throw new IllegalStateException(MSG_CONSUMED); } if (isParallel() && sourceStage.sourceAnyStateful) { //如果是并行流并且有stage包含stateful操作 //那么就会依次遍历stage,直到遇到stateful stage时 int depth = 1; for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage) { int thisOpFlags = p.sourceOrOpFlags; if (p.opIsStateful()) { depth = 0; if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { //如果有短路操作,则去除相应标记 thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; } //尽量以惰性求值的方式进行操作 spliterator = p.opEvaluateParallelLazy(u, spliterator); thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED) ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED; } p.depth = depth++; p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); } } if (terminalFlags != 0) { // Apply flags from the terminal operation to last pipeline stage combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); } return spliterator; } ### 如何并行执行 ### 关于各个task就行是如何并行执行,其实最终调用的是code\_11所示,对应的流程如图1-7所示,其中交替fork子节点是为了缓和数据分片不均造成的性能退化。 code\_11(AbstractTask.compute()) @Override public void compute() { Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators long sizeEstimate = rs.estimateSize(); long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings("unchecked") K task = (K) this; while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; task.leftChild = leftChild = task.makeChild(ls); task.rightChild = rightChild = task.makeChild(rs); task.setPendingCount(1); if (forkRight) { forkRight = false; rs = ls; task = leftChild; taskToFork = rightChild; } else { forkRight = true; task = rightChild; taskToFork = leftChild; } taskToFork.fork(); sizeEstimate = rs.estimateSize(); } task.setLocalResult(task.doLeaf()); task.tryComplete(); } ![图1-7][1-7] 图1-7 ## 影响并行流的因素 ## 数据大小;源数据结构(分割越容易越好),arraylist、数组比较好,hashSet、treeSet次之,linked最差;装箱;核的数量(可使用);单元处理开销(越大越好) ## 建议: ## 终结操作以外的操作,尽量避免副作用,避免突变基于堆栈的引用,或者在执行过程中进行任何I/O;传递给流操作的数据源应该是互不干扰(避免修改数据源)。 ## 小结 ## 本文主要探究了Stream的实现原理,并没有涉及到具体的流操作的用法(读者可以参考《java8函数式编程》),并且给出了使用Stream的部分建议。 转载自: [https://www.cnblogs.com/xiaonihao444/p/8783771.html][https_www.cnblogs.com_xiaonihao444_p_8783771.html] [Link 1]: http://www.cnblogs.com/CarpenterLee/p/6637118.html [1-1]: /images/20220123/1e6f48cd2a12497c8056208c3039fff4.png [1-1 1]: /images/20220123/832b6827474548d0bbd907dff9dafe39.png [1-2]: /images/20220123/9ecbcf48bcb744519067283981dc6c72.png [1-3]: /images/20220123/ff5d0a1651e6435b80dc694ffa9b593b.png [1-2 1]: /images/20220123/d4485ede3d8643ffaa68a04943b22f8d.png [1-4]: /images/20220123/522208101c074d3694a2a8da4f80062b.png [1-5]: /images/20220123/7193d9a03d594fbf96c5b753521e7142.png [1-6]: /images/20220123/0779ac6e66b14a03bfd34bff714542b6.png [1-7]: /images/20220123/5bbc24a4f3b44c8ea1bccbf0a8a897a4.png [https_www.cnblogs.com_xiaonihao444_p_8783771.html]: https://www.cnblogs.com/xiaonihao444/p/8783771.html
相关 Java 8 Stream API使用误区解析 在Java 8的Stream API中,虽然提供了一种高效、简洁的数据处理方式,但还是存在一些常见的使用误区。以下是一些解析: 1. **过度使用**:Stream API的 Love The Way You Lie/ 2024年10月18日 12:09/ 0 赞/ 42 阅读
相关 Java 8 Stream API 实战案例解析 Java 8 Stream API 是 Java 进一步优化集合操作的一个重要特性。它提供了一种简洁、高效的方式来处理数据集,例如排序、过滤、组合等。 以下是几个实战案例的解 刺骨的言语ヽ痛彻心扉/ 2024年09月30日 01:33/ 0 赞/ 76 阅读
相关 Java 8 Stream API 实例解析 Java 8的Stream API是Java集合框架的一个重大升级,它提供了一种新的方式来处理数据流。 以下是一些Java 8 Stream API实例的解析: 1. ** ゞ 浴缸里的玫瑰/ 2024年09月13日 03:21/ 0 赞/ 59 阅读
相关 Java 8 Stream API 操作案例解析 Java 8的Stream API提供了一种新的、并行的处理集合元素的方式。以下是一些常见的Stream操作案例: 1. **过滤**:根据某种条件筛选元素。 ```java 墨蓝/ 2024年09月11日 07:51/ 0 赞/ 67 阅读
相关 Java 8 Stream流API解析 1. Stream流简介 Java 8 API添加了一个新的抽象称为流Stream,可以以一种声明的方式处理数据。 Stream流使用一种类似用 SQL 语句从数据库查 拼搏现实的明天。/ 2022年10月07日 12:54/ 0 赞/ 215 阅读
相关 Java 8 Optional类深度解析 转载自 http://www.importnew.com/6675.html 身为一名Java程序员,大家可能都有这样的经历:调用一个方法得到了返回值却不能直接将返 ゞ 浴缸里的玫瑰/ 2022年06月02日 09:10/ 0 赞/ 186 阅读
相关 Java 8 Optional类深度解析 尊重原创,感谢原创无私分享 [Optional类解析][Optional] [如何正确的使用Optional类][Optional 1] [Optional]: ht 迷南。/ 2022年06月02日 03:41/ 0 赞/ 172 阅读
相关 Java8 Stream原理深度解析 常用的流操作 在深入原理之前,我们有必要知道关于Stream的一些基础知识,关于Stream的操作分类,如表1-1所示。 表1-1 Stream的常用操作分类([表格引 亦凉/ 2022年01月23日 08:34/ 0 赞/ 312 阅读
相关 Flume原理深度解析 一、Flume简介 flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(or 我不是女神ヾ/ 2022年01月16日 10:23/ 0 赞/ 268 阅读
还没有评论,来说两句吧...