Usage and code examples for the io.reactivex.Flowable.doOnTerminate() method

Reposted by x33g5p2x on 2022-01-19 under Other

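This article collects code examples for the Java method io.reactivex.Flowable.doOnTerminate() and shows how Flowable.doOnTerminate() is used in practice. The examples are drawn from selected projects found on platforms such as GitHub, Stack Overflow, and Maven, so they are useful as references. Details of Flowable.doOnTerminate() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: doOnTerminate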

About Flowable.doOnTerminate

Modifies the source Publisher so that it invokes an action when it calls onComplete or onError.

This differs from doAfterTerminate in that the action runs before the onComplete or onError notification is delivered downstream.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: doOnTerminate does not operate by default on a particular Scheduler.
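The ordering described above can be observed with a small sketch. It assumes RxJava 2.x (the io.reactivex package) is on the classpath; the class name DoOnTerminateOrder and the event labels are made up for illustration and do not come from any of the projects quoted in this article.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: records the order in which callbacks fire.
class DoOnTerminateOrder {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flowable.just("one")
            // Runs before the terminal signal reaches the subscriber.
            .doOnTerminate(() -> events.add("doOnTerminate"))
            // Runs after the terminal signal has been delivered.
            .doAfterTerminate(() -> events.add("doAfterTerminate"))
            .subscribe(
                value -> events.add("onNext " + value),
                error -> events.add("onError"),
                () -> events.add("onComplete"));

        // Flowable.just emits synchronously, so the list is complete here.
        return events;
    }

    public static void main(String[] args) {
        // Prints [onNext one, doOnTerminate, onComplete, doAfterTerminate]
        System.out.println(run());
    }
}
```

The subscriber's onComplete lands between the two actions, which is the practical difference between doOnTerminate and doAfterTerminate.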

Code examples

Code example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void doOnTerminatedNull() {
  just1.doOnTerminate(null);
}

Code example source: ReactiveX/RxJava

@Test
public void doOnTerminateComplete() {
  final AtomicBoolean r = new AtomicBoolean();
  String output = Flowable.just("one").doOnTerminate(new Action() {
    @Override
    public void run() {
      r.set(true);
    }
  }).blockingSingle();

  assertEquals("one", output);
  assertTrue(r.get());
}

Code example source: ReactiveX/RxJava

@Test
public void doOnTerminateError() {
  final AtomicBoolean r = new AtomicBoolean();
  Flowable.<String>error(new TestException()).doOnTerminate(new Action() {
    @Override
    public void run() {
      r.set(true);
    }
  })
  .test()
  .assertFailure(TestException.class);
  assertTrue(r.get());
}

Code example source: vert-x3/vertx-examples

@Override
public void start() throws Exception {

  JsonObject config = new JsonObject().put("url", "jdbc:hsqldb:mem:test?shutdown=true")
    .put("driver_class", "org.hsqldb.jdbcDriver");

  JDBCClient jdbc = JDBCClient.createShared(vertx, config);

  jdbc
    .rxGetConnection() // Connect to the database
    .flatMapPublisher(conn -> { // With the connection...
      return conn.rxUpdate("CREATE TABLE test(col VARCHAR(20))") // ...create test table
        .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val1')")) // ...insert a row
        .flatMap(result -> conn.rxUpdate("INSERT INTO test (col) VALUES ('val2')")) // ...another one
        .flatMap(result -> conn.rxQueryStream("SELECT * FROM test")) // ...get values stream
        .flatMapPublisher(sqlRowStream -> {
          return sqlRowStream.toFlowable() // Transform the stream into a Flowable...
            .doOnTerminate(() -> {
              // ...and close the connection when the stream is fully read or an
              // error occurs
              conn.close();
              System.out.println("Connection closed");
            });
        });
    }).subscribe(row -> System.out.println("Row : " + row.encode()));
}

Code example source: io.reactivex.rxjava2/rxjavafx

/**
 * Performs the provided onTerminate action on the FX thread
 * @param onTerminate
 * @param <T>
 */
public static <T> FlowableTransformer<T,T> doOnTerminateFx(Action onTerminate) {
  return obs -> obs.doOnTerminate(() -> runOnFx(onTerminate));
}

Code example source: io.reactivex/rxjavafx

/**
 * Performs the provided onTerminate action on the FX thread
 * @param onTerminate
 * @param <T>
 */
public static <T> FlowableTransformer<T,T> doOnTerminateFx(Action onTerminate) {
  return obs -> obs.doOnTerminate(() -> runOnFx(onTerminate));
}

Code example source: gentics/mesh

public static Flowable<Buffer> toBufferFlow(io.vertx.reactivex.core.file.AsyncFile file) {
  return file.toFlowable()
    .map(io.vertx.reactivex.core.buffer.Buffer::getDelegate)
    .doOnTerminate(file::close)
    .doOnCancel(file::close);
}
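Examples like the one above pair doOnTerminate with doOnCancel because doOnTerminate only reacts to onComplete and onError; it is not invoked when the downstream cancels the subscription. The following sketch illustrates this, assuming RxJava 2.x on the classpath; the class name TerminateVsCancel and the event labels are hypothetical.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: shows which callback fires on early cancellation.
class TerminateVsCancel {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flowable.range(1, 5)
            // Fires only if the source itself completes or fails.
            .doOnTerminate(() -> events.add("terminate"))
            // Fires when the downstream cancels the subscription.
            .doOnCancel(() -> events.add("cancel"))
            // take(2) cancels the upstream after two items.
            .take(2)
            .subscribe(value -> events.add("onNext " + value));

        return events;
    }

    public static void main(String[] args) {
        // Prints [onNext 1, onNext 2, cancel]
        System.out.println(run());
    }
}
```

Here "terminate" never appears because range is cancelled before it can complete. When a cleanup action must run in every case, registering it with both operators (as in the snippet above) or using Flowable.doFinally are the usual options.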

Code example source: io.smallrye.reactive/smallrye-reactive-streams-operators

@Override
public Flowable<O> get() {
  CancellablePublisher<O> cancellable = new CancellablePublisher<>(engine.buildPublisher(second));
  return Flowable.concat(engine.buildPublisher(first), cancellable)
      .doOnCancel(cancellable::cancelIfNotSubscribed)
      .doOnTerminate(cancellable::cancelIfNotSubscribed);
}

Code example source: quanturium/bouquet

getMessageManager().printEvent(getComponentInfo(), RxEvent.COMPLETE);
})
.doOnTerminate(() -> {
  if (getScope() == RxLogger.Scope.ALL || getScope() == RxLogger.Scope.LIFECYCLE)
    getMessageManager().printEvent(getComponentInfo(), RxEvent.TERMINATE);
