用例有两个数据源:
服务1-从源1获取
服务2-从源2获取
应用程序应该至少从source-1返回数据。如果source-2一切正常-数据将被“增强”,比如乘以100。
服务1呼叫服务2。
如果所有成功的用户都从服务1和服务2获取数据(如果服务2上有错误),则用户仅从服务1获取数据(至少),如果服务1上有错误,则用户将获取错误。
有一个hello world bench代码,它模拟了这个场景:
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;
class Response {
public Integer value;
public String warning;
public Response(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Response{" +
"value=" + value +
", warning='" + warning + '\'' +
'}';
}
}
class Service1 {
public Observable<Response> call(int arg) {
return Observable
.just(
new Response(1),
new Response(2),
new Response(3),
new Response(4))
.delay(100, TimeUnit.MILLISECONDS);
}
}
class Service2 {
public Observable<Response> call(int arg) {
if ( arg % 2 == 0) {
System.out.println("service 2: " + arg);
return Observable
.just(new Response(100 * arg)) // service 2 multiplies x 100 on the result it gets from the service 1
.delay(10, TimeUnit.MILLISECONDS);
} else {
System.out.println("service 2: " + arg);
return Observable.error(new RuntimeException("service 2 error"));
}
}
}
public class Step1 {
static Service1 service1 = new Service1();
static Service2 service2 = new Service2();
public static void main(String[] args) throws InterruptedException {
var oo1 = service1.call(1);
var oo3 = oo1.switchMapDelayError(x -> {
final Observable<Response> oo2 = service2.call(x.value);
return oo2
.onErrorReturn((ex) -> {
//System.out.println("Error handling..." + ex.getMessage() + " " + x);
x.warning = ex.getMessage();
return x; // returns at least service1 result
});
});
oo3.subscribe(x -> {
System.out.println(x);
});
Thread.sleep(100000);
}
}
此代码的结果是:
service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}
问题是:没有预期的: value=200
2*100
但是,如果我在service2.call()//.delay(10,timeunit.millishes)处注解一个延迟,那么它将得到预期的结果:
service 2: 1
Response{value=1, warning='service 2 error'}
service 2: 2
Response{value=200, warning='null'}
service 2: 3
Response{value=3, warning='service 2 error'}
service 2: 4
Response{value=400, warning='null'}
问题是:为什么 .delay(10, TimeUnit.MILLISECONDS) on service2.call()
它不能产生值=200?这个解决方案有什么问题,我错过了什么?
谢谢。
1条答案
按热度按时间6rqinv9w1#
你的问题是
switchMapDelayError
接线员。您应该使用concatmap或flatmap我冒昧地为您的用例编写了一个测试。请注意,始终使用重载来提供
Scheduler
为了提供TestScheduler
用于测试。switchmap做什么?
在每个上游emit switchmap上订阅给定的内部流。当一个新的值从上游发出时,旧的内部流被取消订阅,并且switchmap的lambda被再次调用以订阅新的内部流。
问题可能是以下代码:
它几乎一个接一个地在堆栈上发出响应1到4,并且每个发出都在另一个线程上延迟。因此,响应1到4几乎会立即发出。它们不会像以下那样发出:100ms时的响应(1),200ms时的响应(2),等等。
让我们看看输出是为了什么
输出
因此,所有值几乎都会立即发出,并用switchmap相互覆盖。以前发出的值几乎立即被新值抵消。
解决方案
使用concatmap或flatmap或更改测试设置以100ms的间隔发出每个值。
flatmap只订阅每个值,默认情况下最多128个内部流。当内部流完成时,concatmap将只订阅下一个值。
测试
域
注意
不要使用可变对象。始终确保发出的值是不可变的,否则会遇到麻烦。