Usage and code examples for the io.reactivex.Flowable.timestamp() method

x33g5p2x, reposted on 2022-01-19 under Other

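This article collects code examples for the Java method io.reactivex.Flowable.timestamp() and shows how Flowable.timestamp() is used in practice. The examples come mainly from GitHub, Stack Overflow, Maven, and similar platforms and were extracted from selected projects, so they should serve as useful references. Details of Flowable.timestamp() are as follows:
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: timestamp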

About Flowable.timestamp

Returns a Flowable that emits each item emitted by the source Publisher, wrapped in a Timed object.

Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: timestamp does not operate on any particular scheduler but uses the current time from the computation Scheduler.
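
To make the wrapping concrete, here is a dependency-free sketch of what the operator conceptually does to each item: read the current time from a time source and pair it with the value. `Stamped`, `stampAll`, and the `LongSupplier` clock are hypothetical stand-ins for RxJava's `Timed`, the operator itself, and the scheduler's clock; this is a model of the idea, not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

final class TimestampSketch {

    /** Hypothetical stand-in for RxJava's Timed: a value plus the time it was seen. */
    static final class Stamped<T> {
        final T value;
        final long time;
        final TimeUnit unit;

        Stamped(T value, long time, TimeUnit unit) {
            this.value = value;
            this.time = time;
            this.unit = unit;
        }

        @Override
        public String toString() {
            return "Stamped[time=" + time + ", unit=" + unit + ", value=" + value + "]";
        }
    }

    /** Pairs each item with the clock reading (in milliseconds) taken when the item is processed. */
    static <T> List<Stamped<T>> stampAll(List<T> items, LongSupplier clockMillis) {
        List<Stamped<T>> out = new ArrayList<>();
        for (T item : items) {
            out.add(new Stamped<>(item, clockMillis.getAsLong(), TimeUnit.MILLISECONDS));
        }
        return out;
    }

    public static void main(String[] args) {
        // A fake clock that advances 10 ms per reading, so the output is deterministic.
        long[] now = {0L};
        LongSupplier fakeClock = () -> now[0] += 10;
        System.out.println(stampAll(Arrays.asList("a", "b", "c"), fakeClock));
    }
}
```

Passing the clock in as a parameter mirrors the overloads that accept a Scheduler: the time source can be swapped for a controllable one in tests.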

Code examples

Example source: ReactiveX/RxJava

@Override
public Publisher<Timed<Integer>> createPublisher(long elements) {
  return Flowable.range(0, (int)elements).timestamp();
}

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void timestampSchedulerNull() {
  just1.timestamp(TimeUnit.SECONDS, null);
}

Example source: ReactiveX/RxJava

/**
 * Returns a Flowable that emits each item emitted by the source Publisher, wrapped in a
 * {@link Timed} object whose timestamps are provided by a specified Scheduler.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timestamp.s.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
 *  behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This operator does not operate on any particular scheduler but uses the current time
 *  from the specified {@link Scheduler}.</dd>
 * </dl>
 *
 * @param scheduler
 *            the {@link Scheduler} to use as a time source
 * @return a Flowable that emits timestamped items from the source Publisher with timestamps provided by
 *         the {@code scheduler}
 * @see <a href="http://reactivex.io/documentation/operators/timestamp.html">ReactiveX operators documentation: Timestamp</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE) // Supplied scheduler is only used for creating timestamps.
public final Flowable<Timed<T>> timestamp(Scheduler scheduler) {
  return timestamp(TimeUnit.MILLISECONDS, scheduler);
}

Example source: ReactiveX/RxJava

@Test(expected = NullPointerException.class)
public void timestampUnitNull() {
  just1.timestamp(null, Schedulers.single());
}

Example source: ReactiveX/RxJava

/**
 * Returns a Flowable that emits each item emitted by the source Publisher, wrapped in a
 * {@link Timed} object.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timestamp.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
 *  behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code timestamp} does not operate on any particular scheduler but uses the current time
 *  from the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 *
 * @return a Flowable that emits timestamped items from the source Publisher
 * @see <a href="http://reactivex.io/documentation/operators/timestamp.html">ReactiveX operators documentation: Timestamp</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<Timed<T>> timestamp() {
  return timestamp(TimeUnit.MILLISECONDS, Schedulers.computation());
}

Example source: ReactiveX/RxJava

/**
 * Returns a Flowable that emits each item emitted by the source Publisher, wrapped in a
 * {@link Timed} object.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timestamp.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
 *  behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code timestamp} does not operate on any particular scheduler but uses the current time
 *  from the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 *
 * @param unit the time unit for the current time
 * @return a Flowable that emits timestamped items from the source Publisher
 * @see <a href="http://reactivex.io/documentation/operators/timestamp.html">ReactiveX operators documentation: Timestamp</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<Timed<T>> timestamp(TimeUnit unit) {
  return timestamp(unit, Schedulers.computation());
}
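
The `unit` parameter decides how the clock reading is expressed in each `Timed` item. A minimal sketch of that conversion, assuming the time source reports milliseconds and converts with `TimeUnit.convert` (the `inUnit` helper and the sample reading are hypothetical, not part of RxJava):

```java
import java.util.concurrent.TimeUnit;

final class TimestampUnitSketch {

    /** Converts a millisecond clock reading into the requested unit, truncating any remainder. */
    static long inUnit(long epochMillis, TimeUnit unit) {
        return unit.convert(epochMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        long reading = 1_999L; // hypothetical clock reading in milliseconds
        System.out.println(inUnit(reading, TimeUnit.MILLISECONDS)); // 1999
        System.out.println(inUnit(reading, TimeUnit.SECONDS));      // 1 (truncated, not rounded)
        System.out.println(inUnit(reading, TimeUnit.MICROSECONDS)); // 1999000
    }
}
```

Under this assumption, a coarse unit such as `TimeUnit.SECONDS` can give several consecutive items the same timestamp, so a finer unit is the safer choice when the items need to be ordered or diffed by time.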

Example source: ReactiveX/RxJava

@Test
public void timeIntervalDefault() {
  final TestScheduler scheduler = new TestScheduler();
  RxJavaPlugins.setComputationSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    public Scheduler apply(Scheduler v) throws Exception {
      return scheduler;
    }
  });
  try {
    Flowable.range(1, 5)
    .timestamp()
    .map(new Function<Timed<Integer>, Long>() {
      @Override
      public Long apply(Timed<Integer> v) throws Exception {
        return v.time();
      }
    })
    .test()
    .assertResult(0L, 0L, 0L, 0L, 0L);
  } finally {
    RxJavaPlugins.reset();
  }
}

Example source: ReactiveX/RxJava

@Test
public void timeIntervalDefaultSchedulerCustomUnit() {
  final TestScheduler scheduler = new TestScheduler();
  RxJavaPlugins.setComputationSchedulerHandler(new Function<Scheduler, Scheduler>() {
    @Override
    public Scheduler apply(Scheduler v) throws Exception {
      return scheduler;
    }
  });
  try {
    Flowable.range(1, 5)
    .timestamp(TimeUnit.SECONDS)
    .map(new Function<Timed<Integer>, Long>() {
      @Override
      public Long apply(Timed<Integer> v) throws Exception {
        return v.time();
      }
    })
    .test()
    .assertResult(0L, 0L, 0L, 0L, 0L);
  } finally {
    RxJavaPlugins.reset();
  }
}
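
Every timestamp in the two tests above is 0 because the TestScheduler's virtual clock never moves while `range` emits. A sketch of the complementary case, assuming RxJava 2.x is on the classpath (the class and method names are hypothetical): the same TestScheduler drives `interval` and serves as the time source, so advancing virtual time produces distinct timestamps.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.schedulers.Timed;
import io.reactivex.subscribers.TestSubscriber;

import java.util.List;
import java.util.concurrent.TimeUnit;

final class TimestampVirtualTime {

    /** Returns the timestamps (in ms of virtual time) attached to three interval ticks. */
    static long[] stampedTimes() {
        TestScheduler scheduler = new TestScheduler();

        TestSubscriber<Timed<Long>> subscriber = Flowable
                .interval(1, TimeUnit.SECONDS, scheduler)    // ticks at 1 s, 2 s, 3 s of virtual time
                .take(3)
                .timestamp(TimeUnit.MILLISECONDS, scheduler) // same scheduler is the time source
                .test();

        // Nothing is emitted until virtual time is moved forward.
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        subscriber.assertComplete();

        List<Timed<Long>> values = subscriber.values();
        long[] times = new long[values.size()];
        for (int i = 0; i < times.length; i++) {
            times[i] = values.get(i).time();
        }
        return times;
    }

    public static void main(String[] args) {
        for (long t : stampedTimes()) {
            System.out.println(t);
        }
    }
}
```

Run as written, this should print 1000, 2000, and 3000, one per line, matching the tick times of the interval.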

Example source: bsideup/graphql-java-reactive

@Test
public void testCompletableFuture() throws Exception {
  ReactiveExecutionStrategy strategy = new ReactiveExecutionStrategy();
  GraphQLSchema schema = newQuerySchema(it -> it
      .field(newStringField("a").dataFetcher(env -> CompletableFuture.completedFuture("static")))
  );
  ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a }");
  Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
  subscriber
      .assertChanges(it -> it.containsExactly(
          tuple("00:000", "", ImmutableMap.of("a", "static"))
      ))
      .assertComplete();
}

Example source: bsideup/graphql-java-reactive

@Test
public void testPlainField() throws Exception {
  GraphQLSchema schema = newQuerySchema(it -> it
      .field(newLongField("a").dataFetcher(env -> Flowable.interval(1, SECONDS, scheduler).take(2)))
  );
  ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a }");
  assertThat(executionResult)
      .isNotNull()
      .satisfies(it -> assertThat(it.<Publisher<Change>>getData()).isNotNull().isInstanceOf(Publisher.class));
  Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
  scheduler.advanceTimeBy(2, SECONDS);
  subscriber
      .assertChanges(it -> it.containsExactly(
          tuple("01:000", "", ImmutableMap.of("a", 0L)),
          tuple("02:000", "a", 1L)
      ))
      .assertComplete();
}

Example source: bsideup/graphql-java-reactive

@Test
public void testArrayStartsWith() throws Exception {
  GraphQLObjectType innerType = newObject()
      .name("Inner")
      .field(newStringField("b").staticValue("foo"))
      .build();
  GraphQLSchema schema = newQuerySchema(it -> it
      .field(
          newField("a", new GraphQLList(innerType))
              .dataFetcher(env -> Flowable
                  .just(asList(true, true, true))
                  .delay(1, SECONDS, scheduler)
                  .startWith(new ArrayList<Boolean>())
              )
      )
  );
  ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a { b } }");
  Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
  scheduler.advanceTimeBy(2, SECONDS);
  subscriber
      .assertChanges(it -> it.containsExactly(
          tuple("00:000", "", ImmutableMap.of("a", emptyList())),
          tuple("01:000", "a", asList(ImmutableMap.of("b", "foo"), ImmutableMap.of("b", "foo"), ImmutableMap.of("b", "foo"))),
          // FIXME these values are duplicated. For now, let's call it "at least once" delivery :D
          tuple("01:000", "a[1]", ImmutableMap.of("b", "foo")),
          tuple("01:000", "a[2]", ImmutableMap.of("b", "foo"))
      ))
      .assertComplete();
}

Example source: bsideup/graphql-java-reactive

@Test
public void testStaticValues() throws Exception {
  GraphQLSchema schema = newQuerySchema(it -> it
      .field(newStringField("a").dataFetcher(env -> "staticA"))
      .field(newStringField("b").dataFetcher(env -> "staticB"))
  );
  ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a, b }");
  Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
  subscriber
      .assertChanges(it -> it.containsExactly(
          tuple("00:000", "", ImmutableMap.of("a", "staticA", "b", "staticB"))
      ))
      .assertComplete();
}

Example source: bsideup/graphql-java-reactive

@Test
public void testStartWith() throws Exception {
  GraphQLSchema schema = newQuerySchema(it -> it
      .field(newLongField("a").dataFetcher(env -> Flowable.interval(1, SECONDS, scheduler).take(2).startWith(-42L)))
  );
  ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a }");
  Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
  scheduler.advanceTimeBy(2, SECONDS);
  subscriber
      .assertChanges(it -> it.containsExactly(
          tuple("00:000", "", ImmutableMap.of("a", -42L)),
          tuple("01:000", "a", 0L),
          tuple("02:000", "a", 1L)
      ))
      .assertComplete();
}

Example source: bsideup/graphql-java-reactive

@Test
public void testArray() throws Exception {
  GraphQLObjectType innerType = newObject()
      .name("Inner")
      .field(newStringField("b").dataFetcher(env -> Flowable.interval(300, MILLISECONDS, scheduler).take(4).map(i -> i + " " + env.getSource())))
      .build();
  GraphQLSchema schema = newQuerySchema(it -> it
      .field(newField("a", new GraphQLList(innerType)).dataFetcher(env -> Flowable.interval(500, MILLISECONDS, scheduler).take(3).toList().toFlowable()))
  );
  ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a { b } }");
  Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
  scheduler.advanceTimeBy(3, SECONDS);
  subscriber
      .assertChanges(it -> it.containsExactly(
          tuple("01:800", "", ImmutableMap.of("a", asList(ImmutableMap.of("b", "0 0"), ImmutableMap.of("b", "0 1"), ImmutableMap.of("b", "0 2")))),
          tuple("02:100", "a[0].b", "1 0"),
          tuple("02:100", "a[1].b", "1 1"),
          tuple("02:100", "a[2].b", "1 2"),
          tuple("02:400", "a[0].b", "2 0"),
          tuple("02:400", "a[1].b", "2 1"),
          tuple("02:400", "a[2].b", "2 2"),
          tuple("02:700", "a[0].b", "3 0"),
          tuple("02:700", "a[1].b", "3 1"),
          tuple("02:700", "a[2].b", "3 2")
      ))
      .assertComplete();
}

Example source: bsideup/graphql-java-reactive

@Test
public void testNested() throws Exception {
  GraphQLObjectType innerType = newObject()
      .name("Inner")
      .field(newStringField("b").dataFetcher(env -> Flowable.interval(300, MILLISECONDS, scheduler).take(4).map(i -> i + " " + env.getSource())))
      .build();
  GraphQLSchema schema = newQuerySchema(it -> it
      .field(newField("a", innerType).dataFetcher(env -> Flowable.interval(1, SECONDS, scheduler).take(3)))
  );
  ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a { b } }");
  Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
  scheduler.advanceTimeBy(5, SECONDS);
  subscriber
      .assertChanges(it -> it.containsExactly(
          tuple("01:300", "", ImmutableMap.of("a", ImmutableMap.of("b", "0 0"))),
          tuple("01:600", "a.b", "1 0"),
          tuple("01:900", "a.b", "2 0"),
          tuple("02:300", "a", ImmutableMap.of("b", "0 1")),
          tuple("02:600", "a.b", "1 1"),
          tuple("02:900", "a.b", "2 1"),
          tuple("03:300", "a", ImmutableMap.of("b", "0 2")),
          tuple("03:600", "a.b", "1 2"),
          tuple("03:900", "a.b", "2 2"),
          tuple("04:200", "a.b", "3 2")
      ))
      .assertComplete();
}

Example source: bsideup/graphql-java-reactive

@Test
public void testMultipleFields() throws Exception {
  GraphQLSchema schema = newQuerySchema(it -> it
      .field(newLongField("a").dataFetcher(env -> Flowable.interval(300, MILLISECONDS, scheduler).take(8)))
      .field(newLongField("b").dataFetcher(env -> Flowable.timer(2, SECONDS, scheduler)))
      .field(newLongField("c").dataFetcher(env -> Flowable.just(42L)))
  );
  ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a, b, c }");
  Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
  // c resolved, wait for a and b
  subscriber.assertEmpty();
  scheduler.advanceTimeBy(1, SECONDS);
  // a & c resolved, wait for b
  subscriber.assertEmpty();
  scheduler.advanceTimeBy(2, SECONDS);
  subscriber
      .assertChanges(it -> it.containsExactly(
          tuple("02:000", "", ImmutableMap.of("a", 5L, "b", 0L, "c", 42L)),
          tuple("02:100", "a", 6L),
          tuple("02:400", "a", 7L)
      ))
      .assertComplete();
}
