io.reactivex.Observable.combineLatestDelayError()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(8.8k)|赞(0)|评价(0)|浏览(159)

本文整理了Java中io.reactivex.Observable.combineLatestDelayError()方法的一些代码示例,展示了Observable.combineLatestDelayError()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.combineLatestDelayError()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:combineLatestDelayError

Observable.combineLatestDelayError介绍

[英]Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of the source ObservableSources each time an item is received from any of the source ObservableSources, where this aggregation is defined by a specified function and delays any error from the sources until all source ObservableSources terminate.

Note on method signature: since Java doesn't allow creating a generic array with new T[], the implementation of this operator has to create an Object[] instead. Unfortunately, a Function passed to the method would trigger a ClassCastException.

If any of the sources never produces an item but only terminates (normally or with an error), the resulting sequence terminates immediately (normally or with all the errors accumulated till that point). If that input source is also synchronous, other sources after it will not be subscribed to.

If there are no ObservableSources provided, the resulting sequence completes immediately without emitting any items and without any calls to the combiner function. Scheduler: combineLatestDelayError does not operate by default on a particular Scheduler.
[中]通过在每次从任何源可观测资源接收到一个项目时,发送一个项目来聚合每个源可观测资源的最新值,从而组合源可观测资源的集合,其中,该聚合由指定函数定义,并延迟源的任何错误,直到所有源可观测源终止。
关于方法签名的注意事项:由于Java不允许使用新的t[]创建泛型数组,因此该操作符的实现必须创建一个对象[]。不幸的是,传递给该方法的函数将触发ClassCastException。
如果任何一个源从未生成一个项,但只终止(正常或有错误),则生成的序列立即终止(正常或所有错误累积到该点)。如果该输入源也是同步的,则不会订阅它之后的其他源。
如果没有提供可观察的资源,则生成的序列将立即完成,而不会发出任何项,也不会调用组合器函数。调度程序:CombineRelatestDelayError默认情况下不会在特定调度程序上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsNull() {
  Observable.combineLatestDelayError(new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128, (Observable<Object>[])null);
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableNull() {
  Observable.combineLatestDelayError((Iterable<Observable<Object>>)null, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableFunctionNull() {
  Observable.combineLatestDelayError(Arrays.asList(just1), null, 128);
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableFunctionReturnsNull() {
  Observable.combineLatestDelayError(Arrays.asList(just1), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return null;
    }
  }, 128).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsFunctionNull() {
  Observable.combineLatestDelayError(null, 128, Observable.never());
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@SuppressWarnings("unchecked")
public void combineLatestDelayErrorIterableOfSources() {
  Observable.combineLatestDelayError(Arrays.asList(
      Observable.just(1), Observable.just(2)
  ), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) throws Exception {
      return Arrays.toString(a);
    }
  })
  .test()
  .assertResult("[1, 2]");
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsFunctionReturnsNull() {
  Observable.combineLatestDelayError(new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return null;
    }
  }, 128, just1).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends ObservableSource<? extends T>> sources,
    Function<? super Object[], ? extends R> combiner) {
  return combineLatestDelayError(sources, combiner, bufferSize());

代码示例来源:origin: ReactiveX/RxJava

public static <T, R> Observable<R> combineLatestDelayError(ObservableSource<? extends T>[] sources,
    Function<? super Object[], ? extends R> combiner) {
  return combineLatestDelayError(sources, combiner, bufferSize());

代码示例来源:origin: ReactiveX/RxJava

@Test
@SuppressWarnings("unchecked")
public void combineLatestDelayErrorArrayOfSources() {
  Observable.combineLatestDelayError(new ObservableSource[] {
      Observable.just(1), Observable.just(2)
  }, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) throws Exception {
      return Arrays.toString(a);
    }
  })
  .test()
  .assertResult("[1, 2]");
}

代码示例来源:origin: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableIteratorNull() {
  Observable.combineLatestDelayError(new Iterable<Observable<Object>>() {
    @Override
    public Iterator<Observable<Object>> iterator() {
      return null;
    }
  }, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorIterableOneIsNull() {
  Observable.combineLatestDelayError(Arrays.asList(Observable.never(), null), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test(expected = NullPointerException.class)
public void combineLatestDelayErrorVarargsOneIsNull() {
  Observable.combineLatestDelayError(new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] v) {
      return 1;
    }
  }, 128, Observable.never(), null).blockingLast();
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@SuppressWarnings("unchecked")
public void combineLatestDelayErrorIterableOfSourcesWithError() {
  Observable.combineLatestDelayError(Arrays.asList(
      Observable.just(1), Observable.just(2).concatWith(Observable.<Integer>error(new TestException()))
  ), new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) throws Exception {
      return Arrays.toString(a);
    }
  })
  .test()
  .assertFailure(TestException.class, "[1, 2]");
}

代码示例来源:origin: ReactiveX/RxJava

@Test
@SuppressWarnings("unchecked")
public void combineLatestDelayErrorArrayOfSourcesWithError() {
  Observable.combineLatestDelayError(new ObservableSource[] {
      Observable.just(1), Observable.just(2).concatWith(Observable.<Integer>error(new TestException()))
  }, new Function<Object[], Object>() {
    @Override
    public Object apply(Object[] a) throws Exception {
      return Arrays.toString(a);
    }
  })
  .test()
  .assertFailure(TestException.class, "[1, 2]");
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void combineLatestDelayErrorEmpty() {
  assertSame(Observable.empty(), Observable.combineLatestDelayError(new ObservableSource[0], Functions.<Object[]>identity(), 16));
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorDelayed() {
  Observable.combineLatestDelayError(
      new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
          return a;
        }
      },
      128,
      Observable.error(new TestException()),
      Observable.just(1)
  )
  .test()
  .assertFailure(TestException.class);
}

代码示例来源:origin: ReactiveX/RxJava

@Test
  @SuppressWarnings("unchecked")
  public void syncFirstErrorsAfterItemDelayError() {
    Observable.combineLatestDelayError(Arrays.asList(
          Observable.just(21).concatWith(Observable.<Integer>error(new TestException())),
          Observable.just(21).delay(100, TimeUnit.MILLISECONDS)
        ),
        new Function<Object[], Object>() {
          @Override
          public Object apply(Object[] a) throws Exception {
            return (Integer)a[0] + (Integer)a[1];
          }
        }
        )
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertFailure(TestException.class, 42);
  }
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void dontSubscribeIfDone2() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    final int[] count = { 0 };
    Observable.combineLatestDelayError(
        Arrays.asList(Observable.empty(),
          Observable.error(new TestException())
          .doOnSubscribe(new Consumer<Disposable>() {
            @Override
            public void accept(Disposable d) throws Exception {
              count[0]++;
            }
          })
        ),
        new Function<Object[], Object>() {
          @Override
          public Object apply(Object[] a) throws Exception {
            return 0;
          }
        })
    .test()
    .assertResult();
    assertEquals(0, count[0]);
    assertTrue(errors.toString(), errors.isEmpty());
  } finally {
    RxJavaPlugins.reset();
  }
}

代码示例来源:origin: ReactiveX/RxJava

@SuppressWarnings("unchecked")
@Test
public void errorDelayed2() {
  Observable.combineLatestDelayError(
      new Function<Object[], Object>() {
        @Override
        public Object apply(Object[] a) throws Exception {
          return a;
        }
      },
      128,
      Observable.error(new TestException()).startWith(1),
      Observable.empty()
  )
  .test()
  .assertFailure(TestException.class);
}

相关文章

Observable类方法