Returns a Flowable that emits the items emitted by the source Publisher, converted to the specified type.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: cast does not operate by default on a particular Scheduler.
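The conversion described above can be illustrated without RxJava. The following is a minimal plain-Java sketch, not the library's implementation: it assumes the operator applies Class.cast to each item in turn, so the first item of the wrong type raises a ClassCastException. CastSketch and castAll are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CastSketch {
    /** Casts every item to the target class, failing on the first mismatch. */
    static <U> List<U> castAll(List<?> items, Class<U> clazz) {
        List<U> out = new ArrayList<>();
        for (Object item : items) {
            // Class.cast throws ClassCastException if item is not an instance of U
            out.add(clazz.cast(item));
        }
        return out;
    }

    public static void main(String[] args) {
        List<?> source = Arrays.asList(1, 2);
        System.out.println(castAll(source, Integer.class)); // prints [1, 2]
        try {
            castAll(source, Boolean.class);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getClass().getSimpleName());
        }
    }
}
```

In a real Flowable the failure would be delivered through onError rather than thrown to the caller, as the testCastWithWrongType example on this page suggests.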
Code example source: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void castNull() {
    Flowable.just(1).cast(null);
}
Code example source: ReactiveX/RxJava
public Flowable<Object> apply(Flowable<? extends Throwable> t1) {
    return t1.map(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable t1) {
            return 0;
        }
    }).cast(Object.class);
}
Code example source: ReactiveX/RxJava
@Test
public void testCast() {
    Flowable<?> source = Flowable.just(1, 2);
    Flowable<Integer> flowable = source.cast(Integer.class);
    Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
    flowable.subscribe(subscriber);
    // Each source item should arrive exactly once, followed by completion.
    verify(subscriber, times(1)).onNext(1);
    verify(subscriber, times(1)).onNext(2);
    verify(subscriber, never()).onError(any(Throwable.class));
    verify(subscriber, times(1)).onComplete();
}
Code example source: ReactiveX/RxJava
@Test
public void testCastWithWrongType() {
    Flowable<?> source = Flowable.just(1, 2);
    Flowable<Boolean> flowable = source.cast(Boolean.class);
    Subscriber<Boolean> subscriber = TestHelper.mockSubscriber();
    flowable.subscribe(subscriber);
    // Integer items cannot be cast to Boolean, so the failure is signalled via onError.
    verify(subscriber, times(1)).onError(any(ClassCastException.class));
}
Code example source: ReactiveX/RxJava
/**
 * Filters the items emitted by a Publisher, only emitting those of the specified type.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/ofClass.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
 *  behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code ofType} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <U> the output type
 * @param clazz
 *            the class type to filter the items emitted by the source Publisher
 * @return a Flowable that emits items from the source Publisher of type {@code clazz}
 * @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
 */
public final <U> Flowable<U> ofType(final Class<U> clazz) {
    ObjectHelper.requireNonNull(clazz, "clazz is null");
    return filter(Functions.isInstanceOf(clazz)).cast(clazz);
}
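The ofType source above composes a type filter with cast. As a rough plain-Java analogue (a sketch using java.util.stream rather than RxJava, with the hypothetical names OfTypeSketch and ofType), the same two steps look like this:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class OfTypeSketch {
    /** Keeps only the items that are instances of clazz, then casts them. */
    static <U> List<U> ofType(List<?> items, Class<U> clazz) {
        return items.stream()
                .filter(clazz::isInstance) // drop items of any other type
                .map(clazz::cast)          // safe: only instances of U remain
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<?> mixed = Arrays.asList(1, "two", 3.0, 4);
        System.out.println(ofType(mixed, Integer.class)); // prints [1, 4]
    }
}
```

Because the filter runs first, the cast step never sees a mismatched item, which is why ofType silently skips other types where a bare cast would fail.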
Code example source: redisson/redisson (same ofType source as the ReactiveX/RxJava example above)
Code example source: Blankj/RxBus
private <T> Flowable<T> toFlowable(final Class<T> eventType,
                                   final String tag,
                                   final Scheduler scheduler) {
    Flowable<T> flowable = mBus.ofType(TagMessage.class)
            .filter(new Predicate<TagMessage>() {
                @Override
                public boolean test(TagMessage tagMessage) {
                    return tagMessage.isSameType(eventType, tag);
                }
            })
            .map(new Function<TagMessage, Object>() {
                @Override
                public Object apply(TagMessage tagMessage) {
                    return tagMessage.mEvent;
                }
            })
            .cast(eventType);
    if (scheduler != null) {
        return flowable.observeOn(scheduler);
    }
    return flowable;
}
Code example source: cr330326/DemoComponent
/**
 * Returns an observable of the specific type (eventType), selected by the given code and eventType.
 *
 * @param code the event code
 * @param eventType the event type
 */
private <T> Flowable<T> toObservable(final int code, final Class<T> eventType) {
    return bus.toFlowable(BackpressureStrategy.BUFFER).ofType(Message.class)
            .filter(new Predicate<Message>() {
                @Override
                public boolean test(Message o) throws Exception {
                    return o.getCode() == code && eventType.isInstance(o.getObject());
                }
            }).map(new Function<Message, Object>() {
                @Override
                public Object apply(Message o) throws Exception {
                    return o.getObject();
                }
            }).cast(eventType);
}
Code example source: LuckSiege/PictureSelectorLight (same toObservable code as the cr330326/DemoComponent example above)
Code example source: gravitee-io/graviteeio-access-management
.onErrorResumeNext(Single.error(new InvalidClientMetadataException("Unable to parse sector_identifier_uri : "+ uri.toString())))
.collect(HashSet::new,(set, value)->set.add(value))
.flatMap(allowedRedirectUris -> Observable.fromIterable(request.getRedirectUris().get())
Code example source: io.gravitee.am.gateway.handlers/gravitee-am-gateway-handler (same fragment as the gravitee-io/graviteeio-access-management example above)