Usage and code examples of the io.reactivex.Flowable.fromPublisher() method

x33g5p2x, reposted on 2022-01-19 under "Other"

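This article collects code examples of the io.reactivex.Flowable.fromPublisher() method in Java and shows how Flowable.fromPublisher() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.fromPublisher() method are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: fromPublisher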

Introduction to Flowable.fromPublisher

Converts an arbitrary Reactive-Streams Publisher into a Flowable if it is not already a Flowable.

The Publisher must follow the Reactive-Streams specification. Violating the specification may result in undefined behavior.

If possible, use Flowable.create(FlowableOnSubscribe, BackpressureStrategy) to create a source-like Flowable instead.
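
As a minimal sketch of that alternative, assuming RxJava 2.x is on the classpath, the following builds a source-like Flowable with Flowable.create. The class name CreateSketch and the method countTo are hypothetical names chosen for illustration, and BackpressureStrategy.BUFFER is only one of the available strategies, picked here for simplicity.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.util.List;

// Hypothetical helper class for illustration; not part of RxJava.
final class CreateSketch {

    // Emits 1..n through a FlowableEmitter, which handles the
    // Reactive Streams bookkeeping on behalf of the caller.
    static Flowable<Integer> countTo(int n) {
        return Flowable.create(emitter -> {
            for (int i = 1; i <= n && !emitter.isCancelled(); i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, BackpressureStrategy.BUFFER); // assumption: buffer items the consumer has not requested yet
    }

    public static void main(String[] args) {
        List<Integer> values = countTo(3).toList().blockingGet();
        System.out.println(values); // prints [1, 2, 3]
    }
}
```

With this approach the emitter, rather than hand-written code, tracks cancellation and applies the chosen backpressure strategy.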

Note that even though Publisher appears to be a functional interface, it is not recommended to implement it through a lambda, because the specification requires state management that is not achievable with a stateless lambda.

Backpressure: The operator is a pass-through for backpressure, and its behavior is determined by the backpressure behavior of the wrapped publisher.

Scheduler: fromPublisher does not operate by default on a particular Scheduler.
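
To illustrate the state a compliant Publisher has to keep, the sketch below implements a small range source by hand and wraps it with Flowable.fromPublisher. RangePublisher, RangeSubscription, and FromPublisherSketch are hypothetical classes written for this example and are not part of RxJava. The code assumes RxJava 2.x, is deliberately simplified, and has not been checked against the Reactive Streams TCK. Each subscriber gets its own demand counter and position, which is the kind of per-subscriber state a stateless lambda cannot hold.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Hypothetical Publisher emitting 1..count; written for illustration only.
final class RangePublisher implements Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
        // Every subscriber receives its own Subscription with its own state.
        subscriber.onSubscribe(new RangeSubscription(subscriber, count));
    }

    static final class RangeSubscription implements Subscription {
        private final Subscriber<? super Integer> downstream;
        private final int end;
        private final AtomicLong requested = new AtomicLong(); // outstanding demand
        private int next = 1;                                  // next value to emit
        private volatile boolean done;                         // cancelled or terminated

        RangeSubscription(Subscriber<? super Integer> downstream, int end) {
            this.downstream = downstream;
            this.end = end;
        }

        private static long addCap(long a, long b) {
            long sum = a + b;
            return sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
        }

        @Override
        public void request(long n) {
            if (done) {
                return;
            }
            if (n <= 0) {
                done = true;
                downstream.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            // Only the call that moves the demand away from zero starts emitting.
            if (requested.getAndAccumulate(n, RangeSubscription::addCap) == 0) {
                drain();
            }
        }

        private void drain() {
            long demand = requested.get();
            long emitted = 0;
            for (;;) {
                while (emitted != demand && next <= end) {
                    if (done) {
                        return;
                    }
                    downstream.onNext(next++);
                    emitted++;
                }
                if (done) {
                    return;
                }
                if (next > end) {
                    done = true;
                    downstream.onComplete();
                    return;
                }
                demand = requested.get();
                if (demand == emitted) {
                    // No new requests arrived while emitting: settle the account.
                    demand = requested.addAndGet(-emitted);
                    if (demand == 0) {
                        return;
                    }
                    emitted = 0;
                }
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }
}

final class FromPublisherSketch {
    public static void main(String[] args) {
        // test(2) subscribes with an initial request of two items.
        TestSubscriber<Integer> ts = Flowable.fromPublisher(new RangePublisher(5)).test(2);
        System.out.println(ts.values()); // prints [1, 2]
        ts.requestMore(3);
        System.out.println(ts.values()); // prints [1, 2, 3, 4, 5]
    }
}
```

Because fromPublisher passes requests through, the initial request of 2 reaches RangeSubscription unchanged, and only two items arrive until more are requested.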

Code examples

Code example source: ReactiveX/RxJava

@Override
public Flowable<T> apply(final Publisher<T> onSubscribe) {
  return Flowable.fromPublisher(onSubscribe);
}

Code example source: ReactiveX/RxJava

@Override
public Publisher<R> apply(Flowable<T> t) throws Exception {
  Publisher<R> p = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Publisher");
  return Flowable.fromPublisher(p).observeOn(scheduler);
}

Code example source: ReactiveX/RxJava

@Override
public Iterator<T> iterator() {
  LatestSubscriberIterator<T> lio = new LatestSubscriberIterator<T>();
  Flowable.<T>fromPublisher(source).materialize().subscribe(lio);
  return lio;
}

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void fromPublisherNull() {
  Flowable.fromPublisher(null);
}

Code example source: ReactiveX/RxJava

@Test(expected = TestException.class)
public void firstOnError() {
  Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onError(new TestException());
    }
  });
  source.blockingFirst();
}

Code example source: ReactiveX/RxJava

@Test
public void firstIgnoredCancelAndOnNext() {
  Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onNext(1);
      s.onNext(2);
    }
  });
  assertEquals(1, source.blockingFirst().intValue());
}

Code example source: ReactiveX/RxJava

/**
 * Concatenates the Publisher sequence of Publishers into a single sequence by subscribing to each inner Publisher,
 * one after the other, one at a time, and delays any errors until all inner Publishers and the outer Publisher terminate.
 * <p>
 * <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.concatDelayError.p.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>{@code concatDelayError} fully supports backpressure.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the Publisher sequence of Publishers
 * @return the new Publisher with the concatenating behavior
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatDelayError(Publisher<? extends MaybeSource<? extends T>> sources) {
  return Flowable.fromPublisher(sources).concatMapDelayError((Function)MaybeToPublisher.instance());
}

Code example source: ReactiveX/RxJava

/**
 * Concatenates the Publisher sequence of Publishers into a single sequence by subscribing to each inner Publisher,
 * one after the other, one at a time, and delays any errors until all inner Publishers and the outer Publisher terminate.
 *
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>{@code concatDelayError} fully supports backpressure.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
 * </dl>
 *
 * @param <T> the common element base type
 * @param sources the Publisher sequence of Publishers
 * @param prefetch the number of elements to prefetch from the outer Publisher
 * @param tillTheEnd if true, exceptions from the outer and all inner Publishers are delayed to the end;
 *                   if false, an exception from the outer Publisher is delayed until the current Publisher terminates
 * @return the new Publisher with the concatenating behavior
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources, int prefetch, boolean tillTheEnd) {
  return fromPublisher(sources).concatMapDelayError((Function)Functions.identity(), prefetch, tillTheEnd);
}
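
A short usage sketch of the overload above, assuming RxJava 2.x. The class ConcatDelayErrorSketch and its run method are hypothetical names used only for this example. With tillTheEnd set to true, the failure of the second inner source should be held back until the third inner source has finished.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

// Hypothetical demo class for illustration; not part of RxJava.
final class ConcatDelayErrorSketch {

    // Concatenates three inner sources, the second of which fails immediately.
    static TestSubscriber<Integer> run(boolean tillTheEnd) {
        Flowable<Flowable<Integer>> sources = Flowable.just(
            Flowable.just(1, 2),
            Flowable.<Integer>error(new IllegalStateException("inner failure")),
            Flowable.just(3));
        // A prefetch of 2 is an arbitrary value chosen for this sketch.
        return Flowable.concatDelayError(sources, 2, tillTheEnd).test();
    }

    public static void main(String[] args) {
        // Throws an AssertionError unless 1, 2, 3 arrive before the delayed error.
        run(true).assertFailure(IllegalStateException.class, 1, 2, 3);
        System.out.println("error was delivered after all values");
    }
}
```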

Code example source: ReactiveX/RxJava

public final <R> CylonDetectorObservable<R> boop(Function<? super T, ? extends R> func) {
  return new CylonDetectorObservable<R>(new FlowableMap<T, R>(Flowable.fromPublisher(onSubscribe), func));
}

Code example source: ReactiveX/RxJava

public final CylonDetectorObservable<T> beep(Predicate<? super T> predicate) {
  return new CylonDetectorObservable<T>(new FlowableFilter<T>(Flowable.fromPublisher(onSubscribe), predicate));
}

Code example source: ReactiveX/RxJava

@Test
public void wrap() {
  Flowable.fromPublisher(new Publisher<Integer>() {
    @Override
    public void subscribe(Subscriber<? super Integer> subscriber) {
      subscriber.onSubscribe(new BooleanSubscription());
      subscriber.onNext(1);
      subscriber.onNext(2);
      subscriber.onNext(3);
      subscriber.onNext(4);
      subscriber.onNext(5);
      subscriber.onComplete();
    }
  })
  .test()
  .assertResult(1, 2, 3, 4, 5);
}

Code example source: ReactiveX/RxJava

@Test
public void onCompleteCrash() {
  Flowable.fromPublisher(new Publisher<Object>() {
    @Override
    public void subscribe(Subscriber<? super Object> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onComplete();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      throw new IOException();
    }
  })
  .test()
  .assertFailure(IOException.class);
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void backpressureWithBufferDropOldest() throws InterruptedException {
  int bufferSize = 3;
  final AtomicInteger droppedCount = new AtomicInteger(0);
  Action incrementOnDrop = new Action() {
    @Override
    public void run() throws Exception {
      droppedCount.incrementAndGet();
    }
  };
  TestSubscriber<Long> ts = createTestSubscriber();
  Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_OLDEST))
      .subscribe(ts);
  // we request 10 but only 3 should come from the buffer
  ts.request(10);
  ts.awaitTerminalEvent();
  assertEquals(bufferSize, ts.values().size());
  ts.assertNoErrors();
  assertEquals(497, ts.values().get(0).intValue());
  assertEquals(498, ts.values().get(1).intValue());
  assertEquals(499, ts.values().get(2).intValue());
  assertEquals(500 - bufferSize, droppedCount.get());
}

Code example source: ReactiveX/RxJava

@Test(timeout = 2000)
public void backpressureWithBufferDropLatest() throws InterruptedException {
  int bufferSize = 3;
  final AtomicInteger droppedCount = new AtomicInteger(0);
  Action incrementOnDrop = new Action() {
    @Override
    public void run() throws Exception {
      droppedCount.incrementAndGet();
    }
  };
  TestSubscriber<Long> ts = createTestSubscriber();
  Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_LATEST))
      .subscribe(ts);
  // we request 10 but only 3 should come from the buffer
  ts.request(10);
  ts.awaitTerminalEvent();
  assertEquals(bufferSize, ts.values().size());
  ts.assertNoErrors();
  assertEquals(0, ts.values().get(0).intValue());
  assertEquals(1, ts.values().get(1).intValue());
  assertEquals(499, ts.values().get(2).intValue());
  assertEquals(500 - bufferSize, droppedCount.get());
}

Code example source: ReactiveX/RxJava

@Test
public void firstIgnoredCancelAndOnError() {
  List<Throwable> list = TestHelper.trackPluginErrors();
  try {
    Flowable<Integer> source = Flowable.fromPublisher(new Publisher<Integer>() {
      @Override
      public void subscribe(Subscriber<? super Integer> s) {
        s.onSubscribe(new BooleanSubscription());
        s.onNext(1);
        s.onError(new TestException());
      }
    });
    assertEquals(1, source.blockingFirst().intValue());
    TestHelper.assertUndeliverable(list, 0, TestException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void onCompleteCrashConditional() {
  Flowable.fromPublisher(new Publisher<Object>() {
    @Override
    public void subscribe(Subscriber<? super Object> s) {
      s.onSubscribe(new BooleanSubscription());
      s.onComplete();
    }
  })
  .doOnComplete(new Action() {
    @Override
    public void run() throws Exception {
      throw new IOException();
    }
  })
  .filter(Functions.alwaysTrue())
  .test()
  .assertFailure(IOException.class);
}

Code example source: ReactiveX/RxJava

@Test
public void onErrorAfterCrash() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable.fromPublisher(new Publisher<Object>() {
      @Override
      public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new BooleanSubscription());
        s.onError(new TestException());
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .test()
    .assertFailure(TestException.class);
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void onCompleteAfterCrash() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable.fromPublisher(new Publisher<Object>() {
      @Override
      public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new BooleanSubscription());
        s.onComplete();
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .test()
    .assertResult();
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void overflowReported() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Completable.concat(
      Flowable.fromPublisher(new Publisher<Completable>() {
        @Override
        public void subscribe(Subscriber<? super Completable> s) {
          s.onSubscribe(new BooleanSubscription());
          s.onNext(Completable.never());
          s.onNext(Completable.never());
          s.onNext(Completable.never());
          s.onNext(Completable.never());
          s.onComplete();
        }
      }), 1
    )
    .test()
    .assertFailure(MissingBackpressureException.class);
    TestHelper.assertError(errors, 0, MissingBackpressureException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}

Code example source: ReactiveX/RxJava

@Test
public void onCompleteAfterCrashConditional() {
  List<Throwable> errors = TestHelper.trackPluginErrors();
  try {
    Flowable.fromPublisher(new Publisher<Object>() {
      @Override
      public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new BooleanSubscription());
        s.onComplete();
      }
    })
    .doAfterTerminate(new Action() {
      @Override
      public void run() throws Exception {
        throw new IOException();
      }
    })
    .filter(Functions.alwaysTrue())
    .test()
    .assertResult();
    TestHelper.assertUndeliverable(errors, 0, IOException.class);
  } finally {
    RxJavaPlugins.reset();
  }
}
