ParallelStream终端操作阻塞详解

doMore 12 2025-06-13

前言

最近在工作中发现有同事讲很多可以并行的任务放入一个List 集合中,然后 parallelStream().forEach ,并且会在循环过程中将每个任务的执行日志、执行结果返回到主线程中使用。我由此产生了一个疑问, forEach 是否会阻塞 主线程的执行,让其等待并行任务全部执行完成之后再执行后续的操作。

产生这样的疑惑是,我忘记了自己在哪儿看到过 forEach 是不会阻塞的,只有 forEachOrdered 才会阻塞。但是同时对我说这两个都会阻塞,带着这样的疑问,决定仔细研究一下 JDK 源码。如果文章中有不对的地方请看者指正,万分感谢。

主线程阻塞

示例代码

public static void main(String[] args) {
    List<Runnable> rl = new ArrayList<>();
    rl.add(new Runn(5));
    rl.add(new Runn(2));
    rl.parallelStream().forEach(Runnable::run);
    System.out.println("main end");
}

private static class Runn implements Runnable {
    long seconds;
    public Runn(long seconds) {
        this.seconds = seconds;
    }
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(seconds);
            System.out.println("休眠" + seconds + "秒钟: " + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
// 执行结果如下:
// 休眠2秒钟: main
// 休眠5秒钟: ForkJoinPool.commonPool-worker-1
// main end

详解

  1. parallelStream 只是构建一些实体类,并打一些标记,不会执行真正的操作,所以直接从 forEach 下手。
    default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }

    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
  1. 通过步骤1 能看出,构建的实体类是 ReferencePipeline.Head
    @Override
    public void forEach(Consumer<? super E_OUT> action) {
        // 判断是否是并行流,是并行流的话直接调用父类 ReferencePipeline 的方法 
        if (!isParallel()) {
            sourceStageSpliterator().forEachRemaining(action);
        }
        else {
            super.forEach(action);
        }
    }

    // java.util.stream.ReferencePipeline#forEach
    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        // 执行终端操作,并得出结果
        evaluate(ForEachOps.makeRef(action, false));
    }
  1. 执行操作,
    // java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
    // TerminalOp 也就是代表终端操作,是真的会处理数据
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        // 是并行流的走 并行逻辑
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }


    // java.util.stream.ForEachOps.ForEachOp#evaluateParallel
    @Override
    public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                     Spliterator<S> spliterator) {
        // 两个Task都继承了 ForkJoinTask ,但这里我们走 ForEachTask
        if (ordered)
            new ForEachOrderedTask<>(helper, spliterator, this).invoke();
        else
            new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
        return null;
    }

    // java.util.concurrent.ForkJoinTask#invoke
    public final V invoke() {
        int s;
        if ((s = doInvoke() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

    // java.util.concurrent.ForkJoinTask#doInvoke
    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        // 这里判断是不是 ForkJoinWorkerThread ,主线程肯定不是,所有会走到 externalAwaitDone 这个方法后面再说。
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
            externalAwaitDone();
    }

    // java.util.concurrent.ForkJoinTask#doExec
    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                // 这里执行的是 CountedCompleter 的方法,ForEachTask继承了它
                // class ForEachTask<S, T> extends CountedCompleter<Void> 
                // java.util.concurrent.CountedCompleter#exec
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

    // java.util.concurrent.CountedCompleter#exec
    protected final boolean exec() {
        compute();
        return false;
    }

  1. compute 方法说明,在该方法中会将任务拆分,并分配任务到线程池中。
    public void compute() {
        Spliterator<S> rightSplit = spliterator, leftSplit;
        long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
        if ((sizeThreshold = targetSize) == 0L)
            targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
        boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
        boolean forkRight = false;
        Sink<S> taskSink = sink;
        ForEachTask<S, T> task = this;
        while (!isShortCircuit || !taskSink.cancellationRequested()) {
            // 判断是不是值得将任务分发到线程池中,如果只有一个任务,直接由主线程执行
            // sizeEstimate : 1 ; sizeThreshold : 1
            if (sizeEstimate <= sizeThreshold ||
                (leftSplit = rightSplit.trySplit()) == null) {
                task.helper.copyInto(taskSink, rightSplit);
                break;
            }
            ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
            task.addToPendingCount(1);
            ForEachTask<S, T> taskToFork;
            if (forkRight) {
                forkRight = false;
                rightSplit = leftSplit;
                taskToFork = task;
                task = leftTask;
            }
            else {
                forkRight = true;
                taskToFork = leftTask;
            }
            // 剩下的任务 分发到 forkJoinPool 中
            taskToFork.fork();
            sizeEstimate = rightSplit.estimateSize();
        }
        task.spliterator = null;
        task.propagateCompletion();
    }

  1. 之前 4 步已经大致说了并行流的执行逻辑,现在回过头看一下 主线程的阻塞逻辑。
    private int externalAwaitDone() {
        // 这里会再次判断,需不需要 主线程帮助执行,如果需要 则会进一步执行任务(不只是执行一个任务)
        int s = ((this instanceof CountedCompleter) ? // try helping
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter<?>)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        // 如果不用主线程辅助,但是还有任务在执行,主线程进入 wait 状态,等待 ForkJoinThread 线程执行完成后来唤醒
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        return s;
    }

总结

对于 Stream 来说,其实只要是终端操作 如:collect、foreach、find 。并行流操作都会阻塞主线成,如果使用 forEachOrdered 则会全部交给一个线程执行,主线程处于阻塞中。