最近在瞎折腾rxjava,写了一段自认为能并发执行的代码如下:
1 | // 大小为5的线程池 |
由于我在subscribe中sleep了1s,所以我认为这五个数字会并发的执行到subscribe中去,期待会有如下的输出
1 | 1 pool-1-thread-1 |
然而事与愿违,实际的输出是这样的1
2
3
4
51 pool-1-thread-1
2 pool-1-thread-1
3 pool-1-thread-1
4 pool-1-thread-1
5 pool-1-thread-1
嗯嗯?为什么没有并发执行subscribe里的代码呢?我以为是我自己的代码有问题,又陆续尝试了内置的一些Scheduler,consumer均是在同一个线程中执行的,好吧,看来是我理解错rxjava的Schedulers了,这货的from方法接收一个Executor参数,并不是指接下来的任务会提交给这个线程池并发的执行。
这大概也是RxJava和CompletableFuture的区别之一吧。搜索了一圈,用rxjava实现并发主要以以下几个方法
在flatMap中使用obseveOn
1
2
3
4
5
6
7Flowable.just(1, 2, 3, 4, 5)
.flatMap(i -> Flowable.just(i).observeOn(Schedulers.from(exec))
.doOnNext(d -> {
System.out.println(d + "\t" + Thread.currentThread().getName());
Thread.sleep(1000L);
}))
.subscribe();在flatMap中使用Future
1
2
3
4
5
6
7
8Flowable.just(1, 2, 3, 4, 5)
.flatMap(i -> Flowable.fromFuture(CompletableFuture.completedFuture(i), Schedulers.from(exec))
.doOnNext(d -> {
System.out.println(d + "\t" + Thread.currentThread().getName());
Thread.sleep(1000L);
})
)
.subscribe();使用ParallelFlowable
1
2
3
4
5
6
7
8
9Flowable.just(1, 2, 3, 4, 5)
.parallel()
.runOn(Schedulers.from(exec))
.doOnNext(d -> {
System.out.println(d + "\t" + Thread.currentThread().getName());
Thread.sleep(1000L);
})
.sequential()
.subscribe();
有两个要注意的地方
- RxJava在执行并发的时候,并不会使用Executor的maximumPollSize这个属性,corePollSize有多大,那么最大就有多少个线程
- parallel()有一个重载方法可以传入并发数,默认为cpu核心数,在单核的服务器上这个数字是1,也就是不管Executor有多少个线程,只会用一个线程去执行任务
后续
最近发现在使用UnicastProcessor和ParallelFlowable的时候有cpu占用高的情况,经过跟踪发现是这样的问题:
Flowable是支持背压的,所以在元素弹出过快的时候会抛出异常,而我又使用了retry,使得在抛出异常的时候会重新订阅Flowable,而UnicastProcessor只能被订阅一次,所以抛出了大量的IllegalStateException