本文整理了Java中io.reactivex.Observable.generate()
方法的一些代码示例,展示了Observable.generate()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Observable.generate()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:generate
[英]Returns a cold, synchronous and stateless generator of values.
Scheduler: generate does not operate by default on a particular Scheduler.
[中]返回一个冷的、同步的、无状态的值生成器。
调度器:默认情况下,generate不会在特定的调度器上运行。
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void generateStateFunctionInitialStateNull() {
Observable.generate(null, new BiFunction<Object, Emitter<Object>, Object>() {
@Override
public Object apply(Object s, Emitter<Object> o) { o.onNext(1); return s; }
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void generateStateConsumerNull() {
Observable.generate(new Callable<Integer>() {
@Override
public Integer call() {
return 1;
}
}, (BiConsumer<Integer, Emitter<Object>>)null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void generateStateConsumerInitialStateNull() {
BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
@Override
public void accept(Integer s, Emitter<Integer> o) {
o.onNext(1);
}
};
Observable.generate(null, generator);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void generateFunctionDisposeNull() {
Observable.generate(new Callable<Object>() {
@Override
public Object call() {
return 1;
}
}, new BiFunction<Object, Emitter<Object>, Object>() {
@Override
public Object apply(Object s, Emitter<Object> o) { o.onNext(1); return s; }
}, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void generateConsumerNull() {
Observable.generate(null);
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void generateConsumerDisposeNull() {
BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
@Override
public void accept(Integer s, Emitter<Integer> o) {
o.onNext(1);
}
};
Observable.generate(new Callable<Integer>() {
@Override
public Integer call() {
return 1;
}
}, generator, null);
}
代码示例来源:origin: ReactiveX/RxJava
public static Observable<Event> getEventStream(final String type, final int numInstances) {
return Observable.<Event>generate(new EventConsumer(numInstances, type)).subscribeOn(Schedulers.newThread());
}
代码示例来源:origin: ReactiveX/RxJava
/**
* Returns a cold, synchronous and stateful generator of values.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/generate.2.png" alt="">
* <p>
* Note that the {@link Emitter#onNext}, {@link Emitter#onError} and
* {@link Emitter#onComplete} methods provided to the function via the {@link Emitter} instance should be called synchronously,
* never concurrently and only while the function body is executing. Calling them from multiple threads
* or outside the function call is not supported and leads to an undefined behavior.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code generate} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <S> the type of the per-Observer state
* @param <T> the generated value type
* @param initialState the Callable to generate the initial state for each Observer
* @param generator the Function called with the current state whenever a particular downstream Observer has
* requested a value. The callback then should call {@code onNext}, {@code onError} or
* {@code onComplete} to signal a value or a terminal event and should return a (new) state for
* the next invocation. Signalling multiple {@code onNext}
* in a call will make the operator signal {@code IllegalStateException}.
* @return the new Observable instance
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator) {
return generate(initialState, generator, Functions.emptyConsumer());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void generateFunctionStateNullAllowed() {
Observable.generate(new Callable<Object>() {
@Override
public Object call() {
return null;
}
}, new BiFunction<Object, Emitter<Object>, Object>() {
@Override
public Object apply(Object s, Emitter<Object> o) { o.onComplete(); return s; }
}).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void generateConsumerEmitsNull() {
Observable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> s) {
s.onNext(null);
}
}).blockingLast();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void generateConsumerStateNullAllowed() {
BiConsumer<Integer, Emitter<Integer>> generator = new BiConsumer<Integer, Emitter<Integer>>() {
@Override
public void accept(Integer s, Emitter<Integer> o) {
o.onComplete();
}
};
Observable.generate(new Callable<Integer>() {
@Override
public Integer call() {
return null;
}
}, generator).blockingSubscribe();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void multipleOnComplete() {
Observable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> e) throws Exception {
e.onComplete();
e.onComplete();
}
})
.test()
.assertResult();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void multipleOnNext() {
Observable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> e) throws Exception {
e.onNext(1);
e.onNext(2);
}
})
.test()
.assertFailure(IllegalStateException.class, 1);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.generate(new Callable<Object>() {
@Override
public Object call() throws Exception {
return 1;
}
}, new BiConsumer<Object, Emitter<Object>>() {
@Override
public void accept(Object s, Emitter<Object> e) throws Exception {
e.onComplete();
}
}, Functions.emptyConsumer()));
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void generatorThrows() {
Observable.generate(new Callable<Object>() {
@Override
public Object call() throws Exception {
return 1;
}
}, new BiConsumer<Object, Emitter<Object>>() {
@Override
public void accept(Object s, Emitter<Object> e) throws Exception {
throw new TestException();
}
}, Functions.emptyConsumer())
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void stateSupplierThrows() {
Observable.generate(new Callable<Object>() {
@Override
public Object call() throws Exception {
throw new TestException();
}
}, new BiConsumer<Object, Emitter<Object>>() {
@Override
public void accept(Object s, Emitter<Object> e) throws Exception {
e.onNext(s);
}
}, Functions.emptyConsumer())
.test()
.assertFailure(TestException.class);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void statefulBiconsumer() {
Observable.generate(new Callable<Object>() {
@Override
public Object call() throws Exception {
return 10;
}
}, new BiConsumer<Object, Emitter<Object>>() {
@Override
public void accept(Object s, Emitter<Object> e) throws Exception {
e.onNext(s);
}
}, new Consumer<Object>() {
@Override
public void accept(Object d) throws Exception {
}
})
.take(5)
.test()
.assertResult(10, 10, 10, 10, 10);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void multipleOnError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Observable.generate(new Consumer<Emitter<Object>>() {
@Override
public void accept(Emitter<Object> e) throws Exception {
e.onError(new TestException("First"));
e.onError(new TestException("Second"));
}
})
.test()
.assertFailure(TestException.class);
TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void nullError() {
final int[] call = { 0 };
Observable.generate(Functions.justCallable(1),
new BiConsumer<Integer, Emitter<Object>>() {
@Override
public void accept(Integer s, Emitter<Object> e) throws Exception {
try {
e.onError(null);
} catch (NullPointerException ex) {
call[0]++;
}
}
}, Functions.emptyConsumer())
.test()
.assertFailure(NullPointerException.class);
assertEquals(0, call[0]);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void disposerThrows() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Observable.generate(new Callable<Object>() {
@Override
public Object call() throws Exception {
return 1;
}
}, new BiConsumer<Object, Emitter<Object>>() {
@Override
public void accept(Object s, Emitter<Object> e) throws Exception {
e.onComplete();
}
}, new Consumer<Object>() {
@Override
public void accept(Object d) throws Exception {
throw new TestException();
}
})
.test()
.assertResult();
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
内容来源于网络,如有侵权,请联系作者删除!