rxjava如何使flatmap在多线程上运行

zpjtge22  于 2021-06-29  发布在  Java
关注(0)|答案(1)|浏览(545)

我希望从flatmap发出的每个项目都在自己的线程上运行
这是一个实际用法的简化示例,其中每个项目都是一个url请求。
在每个线程上添加subscribeon(schedulers.io())仍然在单个线程上运行
这里有什么规定?

Integer[] array= new Integer[100];
for (int i = 0; i < 100; i++){
    array[i] = i+1;
}

Observable.fromArray(array)
        .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
            @Override
            public SingleSource<Integer> apply(Integer i) throws Throwable {
                Log.i(TAG, "apply " +  i + " " + Thread.currentThread().getName());
                return Single.just(i).subscribeOn(Schedulers.io());
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Integer i) {
               // Log.i(TAG, "onNext " + Thread.currentThread().getName() + i);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {
            }
        });

结果:

2020-12-16 22:54:47.010 10649-10700/com.example.rxjava I/MYTAG: apply 1 RxCachedThreadScheduler-1
2020-12-16 22:54:47.037 10649-10700/com.example.rxjava I/MYTAG: apply 2 RxCachedThreadScheduler-1
2020-12-16 22:54:47.038 10649-10700/com.example.rxjava I/MYTAG: apply 3 RxCachedThreadScheduler-1
2020-12-16 22:54:47.039 10649-10700/com.example.rxjava I/MYTAG: apply 4 RxCachedThreadScheduler-1
2020-12-16 22:54:47.040 10649-10700/com.example.rxjava I/MYTAG: apply 5 RxCachedThreadScheduler-1
2020-12-16 22:54:47.043 10649-10700/com.example.rxjava I/MYTAG: apply 6 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 7 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 8 RxCachedThreadScheduler-1
h7wcgrx3

h7wcgrx31#

除了使用 just ,它获取一个现有对象,从而获取之前创建和计算该对象的内容。在这种情况下,它是 flatMapSingle 从同一线程调用。
您必须使计算本身成为流的一部分,以便通过 fromCallable 例如:

Observable.fromArray(array)
.flatMapSingle(i -> {
    return Single.fromCallable(() -> {
        Log.i(TAG, "apply " +  i + " " + Thread.currentThread().getName());
        return i + 1000;
    })
    .subscribeOn(Schedulers.io());
})
.observeOn(AndroidSchedulers.mainThread())
// ...
;

相关问题