前言
最近在工作中发现有同事讲很多可以并行的任务放入一个List 集合中,然后 parallelStream().forEach ,并且会在循环过程中将每个任务的执行日志、执行结果返回到主线程中使用。我由此产生了一个疑问, forEach 是否会阻塞 主线程的执行,让其等待并行任务全部执行完成之后再执行后续的操作。
产生这样的疑惑是,我忘记了自己在哪儿看到过 forEach 是不会阻塞的,只有 forEachOrdered 才会阻塞。但是同时对我说这两个都会阻塞,带着这样的疑问,决定仔细研究一下 JDK 源码。如果文章中有不对的地方请看者指正,万分感谢。
主线程阻塞
示例代码
public static void main(String[] args) {
List<Runnable> rl = new ArrayList<>();
rl.add(new Runn(5));
rl.add(new Runn(2));
rl.parallelStream().forEach(Runnable::run);
System.out.println("main end");
}
private static class Runn implements Runnable {
long seconds;
public Runn(long seconds) {
this.seconds = seconds;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(seconds);
System.out.println("休眠" + seconds + "秒钟: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 执行结果如下:
// 休眠2秒钟: main
// 休眠5秒钟: ForkJoinPool.commonPool-worker-1
// main end
详解
- parallelStream 只是构建一些实体类,并打一些标记,不会执行真正的操作,所以直接从 forEach 下手。
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
- 通过步骤1 能看出,构建的实体类是 ReferencePipeline.Head
@Override
public void forEach(Consumer<? super E_OUT> action) {
// 判断是否是并行流,是并行流的话直接调用父类 ReferencePipeline 的方法
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEach(action);
}
}
// java.util.stream.ReferencePipeline#forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
// 执行终端操作,并得出结果
evaluate(ForEachOps.makeRef(action, false));
}
- 执行操作,
// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
// TerminalOp 也就是代表终端操作,是真的会处理数据
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
// 是并行流的走 并行逻辑
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
// java.util.stream.ForEachOps.ForEachOp#evaluateParallel
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
// 两个Task都继承了 ForkJoinTask ,但这里我们走 ForEachTask
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
// java.util.concurrent.ForkJoinTask#invoke
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
// java.util.concurrent.ForkJoinTask#doInvoke
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
// 这里判断是不是 ForkJoinWorkerThread ,主线程肯定不是,所有会走到 externalAwaitDone 这个方法后面再说。
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
// java.util.concurrent.ForkJoinTask#doExec
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
// 这里执行的是 CountedCompleter 的方法,ForEachTask继承了它
// class ForEachTask<S, T> extends CountedCompleter<Void>
// java.util.concurrent.CountedCompleter#exec
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
// java.util.concurrent.CountedCompleter#exec
protected final boolean exec() {
compute();
return false;
}
- compute 方法说明,在该方法中会将任务拆分,并分配任务到线程池中。
public void compute() {
Spliterator<S> rightSplit = spliterator, leftSplit;
long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
if ((sizeThreshold = targetSize) == 0L)
targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
boolean forkRight = false;
Sink<S> taskSink = sink;
ForEachTask<S, T> task = this;
while (!isShortCircuit || !taskSink.cancellationRequested()) {
// 判断是不是值得将任务分发到线程池中,如果只有一个任务,直接由主线程执行
// sizeEstimate : 1 ; sizeThreshold : 1
if (sizeEstimate <= sizeThreshold ||
(leftSplit = rightSplit.trySplit()) == null) {
task.helper.copyInto(taskSink, rightSplit);
break;
}
ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
task.addToPendingCount(1);
ForEachTask<S, T> taskToFork;
if (forkRight) {
forkRight = false;
rightSplit = leftSplit;
taskToFork = task;
task = leftTask;
}
else {
forkRight = true;
taskToFork = leftTask;
}
// 剩下的任务 分发到 forkJoinPool 中
taskToFork.fork();
sizeEstimate = rightSplit.estimateSize();
}
task.spliterator = null;
task.propagateCompletion();
}
- 之前 4 步已经大致说了并行流的执行逻辑,现在回过头看一下 主线程的阻塞逻辑。
private int externalAwaitDone() {
// 这里会再次判断,需不需要 主线程帮助执行,如果需要 则会进一步执行任务(不只是执行一个任务)
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
// 如果不用主线程辅助,但是还有任务在执行,主线程进入 wait 状态,等待 ForkJoinThread 线程执行完成后来唤醒
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
总结
对于 Stream 来说,其实只要是终端操作 如:collect、foreach、find 。并行流操作都会阻塞主线成,如果使用 forEachOrdered 则会全部交给一个线程执行,主线程处于阻塞中。