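This article collects code examples for the Java method io.reactivex.Flowable.timestamp() and shows how Flowable.timestamp() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, and were extracted from selected projects, so they should serve as a useful reference. Details of the Flowable.timestamp() method are as follows: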
Package path: io.reactivex.Flowable
Class name: Flowable
Method name: timestamp
Returns a Flowable that emits each item emitted by the source Publisher, wrapped in a Timed object.
Backpressure: The operator doesn't interfere with backpressure, which is determined by the source Publisher's backpressure behavior.
Scheduler: timestamp does not operate on any particular scheduler but uses the current time from the computation Scheduler.
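To make the description above concrete, here is a minimal plain-Java sketch of the idea: each item is paired with a clock reading taken when the item is processed. It is a model of the semantics only, not RxJava's implementation. `Stamped`, `TimestampSketch`, `stampAll`, and the `clockMillis` supplier are hypothetical names introduced for illustration; `Stamped` stands in for `Timed`, and the supplier stands in for the Scheduler's clock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical stand-in for Timed: a value plus the time at which it was observed.
final class Stamped<T> {
    final T value;
    final long time;
    final TimeUnit unit;

    Stamped(T value, long time, TimeUnit unit) {
        this.value = value;
        this.time = time;
        this.unit = unit;
    }

    @Override
    public String toString() {
        return "Stamped[time=" + time + ", unit=" + unit + ", value=" + value + "]";
    }
}

final class TimestampSketch {
    // Wraps every item with the clock reading taken at the moment the item is processed.
    // clockMillis is an assumed millisecond time source playing the role of the Scheduler.
    static <T> List<Stamped<T>> stampAll(List<T> items, TimeUnit unit, LongSupplier clockMillis) {
        List<Stamped<T>> out = new ArrayList<>();
        for (T item : items) {
            long now = unit.convert(clockMillis.getAsLong(), TimeUnit.MILLISECONDS);
            out.add(new Stamped<>(item, now, unit));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Stamped<Integer>> stamped =
                stampAll(Arrays.asList(0, 1, 2), TimeUnit.MILLISECONDS, System::currentTimeMillis);
        stamped.forEach(System.out::println);
    }
}
```

Passing the clock in as a parameter mirrors why the operator accepts a Scheduler: the caller decides where "now" comes from.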
Code example source: origin: ReactiveX/RxJava
@Override
public Publisher<Timed<Integer>> createPublisher(long elements) {
    return Flowable.range(0, (int) elements).timestamp();
}
Code example source: origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void timestampSchedulerNull() {
    // just1 is a Flowable defined elsewhere in the enclosing test class
    just1.timestamp(TimeUnit.SECONDS, null);
}
Code example source: origin: ReactiveX/RxJava
/**
 * Returns a Flowable that emits each item emitted by the source Publisher, wrapped in a
 * {@link Timed} object whose timestamps are provided by a specified Scheduler.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timestamp.s.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
 *  behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>This operator does not operate on any particular scheduler but uses the current time
 *  from the specified {@link Scheduler}.</dd>
 * </dl>
 *
 * @param scheduler
 *            the {@link Scheduler} to use as a time source
 * @return a Flowable that emits timestamped items from the source Publisher with timestamps provided by
 *         the {@code scheduler}
 * @see <a href="http://reactivex.io/documentation/operators/timestamp.html">ReactiveX operators documentation: Timestamp</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE) // Supplied scheduler is only used for creating timestamps.
public final Flowable<Timed<T>> timestamp(Scheduler scheduler) {
    return timestamp(TimeUnit.MILLISECONDS, scheduler);
}
Code example source: origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void timestampUnitNull() {
    // just1 is a Flowable defined elsewhere in the enclosing test class
    just1.timestamp(null, Schedulers.single());
}
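The two null tests expect a NullPointerException as soon as the call is made with a null unit or a null scheduler. The sketch below shows one way such eager argument validation can look, using `Objects.requireNonNull` from the JDK. The source does not show how RxJava performs its own check, so this is an assumed equivalent; `NullCheckSketch`, the `clockMillis` supplier, and the error messages are hypothetical.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

final class NullCheckSketch {
    // Validates both arguments up front, so a null fails at call time
    // rather than later when the first item would be stamped.
    // clockMillis is an assumed stand-in for the Scheduler's millisecond clock.
    static long timestamp(TimeUnit unit, LongSupplier clockMillis) {
        Objects.requireNonNull(unit, "unit is null");
        Objects.requireNonNull(clockMillis, "scheduler is null");
        return unit.convert(clockMillis.getAsLong(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        try {
            timestamp(TimeUnit.SECONDS, null);
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```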
Code example source: origin: ReactiveX/RxJava
/**
 * Returns a Flowable that emits each item emitted by the source Publisher, wrapped in a
 * {@link Timed} object.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timestamp.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
 *  behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code timestamp} does not operate on any particular scheduler but uses the current time
 *  from the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 *
 * @return a Flowable that emits timestamped items from the source Publisher
 * @see <a href="http://reactivex.io/documentation/operators/timestamp.html">ReactiveX operators documentation: Timestamp</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<Timed<T>> timestamp() {
    return timestamp(TimeUnit.MILLISECONDS, Schedulers.computation());
}
Code example source: origin: ReactiveX/RxJava
/**
 * Returns a Flowable that emits each item emitted by the source Publisher, wrapped in a
 * {@link Timed} object.
 * <p>
 * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timestamp.png" alt="">
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
 *  behavior.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code timestamp} does not operate on any particular scheduler but uses the current time
 *  from the {@code computation} {@link Scheduler}.</dd>
 * </dl>
 *
 * @param unit the time unit for the current time
 * @return a Flowable that emits timestamped items from the source Publisher
 * @see <a href="http://reactivex.io/documentation/operators/timestamp.html">ReactiveX operators documentation: Timestamp</a>
 */
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<Timed<T>> timestamp(TimeUnit unit) {
    return timestamp(unit, Schedulers.computation());
}
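The three overloads listed so far each fill in a default and forward to the two-argument form `timestamp(TimeUnit, Scheduler)`: milliseconds as the default unit, the computation scheduler as the default time source. The plain-Java sketch below models that delegation pattern. `OverloadSketch`, `DEFAULT_CLOCK_MILLIS`, and the `LongSupplier` clock are hypothetical stand-ins, and each method returns the clock reading rather than a Flowable so that the example stays free of dependencies.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

final class OverloadSketch {
    // Assumed default time source, standing in for Schedulers.computation().
    static final LongSupplier DEFAULT_CLOCK_MILLIS = System::currentTimeMillis;

    // No arguments: default unit and default clock.
    static long timestamp() {
        return timestamp(TimeUnit.MILLISECONDS, DEFAULT_CLOCK_MILLIS);
    }

    // Custom unit, default clock.
    static long timestamp(TimeUnit unit) {
        return timestamp(unit, DEFAULT_CLOCK_MILLIS);
    }

    // Custom clock, default unit.
    static long timestamp(LongSupplier clockMillis) {
        return timestamp(TimeUnit.MILLISECONDS, clockMillis);
    }

    // The single place where the reading is taken and converted to the requested unit.
    static long timestamp(TimeUnit unit, LongSupplier clockMillis) {
        return unit.convert(clockMillis.getAsLong(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(timestamp());
        System.out.println(timestamp(TimeUnit.SECONDS));
    }
}
```

Keeping the logic in one method means the defaults are the only thing that differs between overloads.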
Code example source: origin: ReactiveX/RxJava
@Test
public void timeIntervalDefault() {
    final TestScheduler scheduler = new TestScheduler();
    RxJavaPlugins.setComputationSchedulerHandler(new Function<Scheduler, Scheduler>() {
        @Override
        public Scheduler apply(Scheduler v) throws Exception {
            return scheduler;
        }
    });
    try {
        Flowable.range(1, 5)
        .timestamp()
        .map(new Function<Timed<Integer>, Long>() {
            @Override
            public Long apply(Timed<Integer> v) throws Exception {
                return v.time();
            }
        })
        .test()
        .assertResult(0L, 0L, 0L, 0L, 0L);
    } finally {
        RxJavaPlugins.reset();
    }
}
Code example source: origin: ReactiveX/RxJava
@Test
public void timeIntervalDefaultSchedulerCustomUnit() {
    final TestScheduler scheduler = new TestScheduler();
    RxJavaPlugins.setComputationSchedulerHandler(new Function<Scheduler, Scheduler>() {
        @Override
        public Scheduler apply(Scheduler v) throws Exception {
            return scheduler;
        }
    });
    try {
        Flowable.range(1, 5)
        .timestamp(TimeUnit.SECONDS)
        .map(new Function<Timed<Integer>, Long>() {
            @Override
            public Long apply(Timed<Integer> v) throws Exception {
                return v.time();
            }
        })
        .test()
        .assertResult(0L, 0L, 0L, 0L, 0L);
    } finally {
        RxJavaPlugins.reset();
    }
}
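Both tests above replace the computation scheduler with a TestScheduler that is never advanced, which is why every timestamp comes out as 0 regardless of the unit. The following plain-Java sketch models that behaviour with a hand-rolled virtual clock. `VirtualClock`, `VirtualTimeSketch`, and `stampRange` are hypothetical names, and this is a simplified model rather than how TestScheduler is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical virtual clock: time only moves when advanceTimeBy is called.
final class VirtualClock {
    private long nowMillis;

    long now(TimeUnit unit) {
        return unit.convert(nowMillis, TimeUnit.MILLISECONDS);
    }

    void advanceTimeBy(long amount, TimeUnit unit) {
        nowMillis += unit.toMillis(amount);
    }
}

final class VirtualTimeSketch {
    // Records the clock reading once per item of the range [start, start + count).
    static List<Long> stampRange(int start, int count, TimeUnit unit, VirtualClock clock) {
        List<Long> times = new ArrayList<>();
        for (int i = start; i < start + count; i++) {
            times.add(clock.now(unit));
        }
        return times;
    }

    public static void main(String[] args) {
        VirtualClock clock = new VirtualClock();
        // The clock is never advanced, so all five readings are identical.
        System.out.println(stampRange(1, 5, TimeUnit.MILLISECONDS, clock)); // prints [0, 0, 0, 0, 0]
    }
}
```

Because virtual time does not drift, assertions on exact timestamp values stay deterministic.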
Code example source: origin: bsideup/graphql-java-reactive
@Test
public void testCompletableFuture() throws Exception {
    ReactiveExecutionStrategy strategy = new ReactiveExecutionStrategy();
    GraphQLSchema schema = newQuerySchema(it -> it
            .field(newStringField("a").dataFetcher(env -> CompletableFuture.completedFuture("static")))
    );
    ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a }");
    Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
    subscriber
            .assertChanges(it -> it.containsExactly(
                    tuple("00:000", "", ImmutableMap.of("a", "static"))
            ))
            .assertComplete();
}
Code example source: origin: bsideup/graphql-java-reactive
@Test
public void testPlainField() throws Exception {
    GraphQLSchema schema = newQuerySchema(it -> it
            .field(newLongField("a").dataFetcher(env -> Flowable.interval(1, SECONDS, scheduler).take(2)))
    );
    ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a }");
    assertThat(executionResult)
            .isNotNull()
            .satisfies(it -> assertThat(it.<Publisher<Change>>getData()).isNotNull().isInstanceOf(Publisher.class));
    Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
    scheduler.advanceTimeBy(2, SECONDS);
    subscriber
            .assertChanges(it -> it.containsExactly(
                    tuple("01:000", "", ImmutableMap.of("a", 0L)),
                    tuple("02:000", "a", 1L)
            ))
            .assertComplete();
}
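In the test above, a one-second interval on a test scheduler that is advanced by two seconds yields changes stamped "01:000" and "02:000". The sketch below reproduces that arithmetic in plain Java under two loudly stated assumptions: that the label is seconds and milliseconds rendered as "ss:SSS" (the subscriber's real formatting code is not shown in the source), and that virtual time jumps straight to each due emission. `IntervalStampSketch`, `format`, and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class IntervalStampSketch {
    // Assumed rendering of a millisecond timestamp as "ss:SSS"; the real test helper may differ.
    static String format(long millis) {
        return String.format(Locale.ROOT, "%02d:%03d", millis / 1000, millis % 1000);
    }

    // Models interval(periodMillis).take(count) on a virtual clock advanced to endMillis:
    // emission i is due at (i + 1) * periodMillis and is stamped with that virtual time.
    static List<String> run(long periodMillis, int count, long endMillis) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            long due = (i + 1) * periodMillis;
            if (due > endMillis) {
                break; // the clock was not advanced far enough for this emission
            }
            out.add(format(due) + " -> " + i);
        }
        return out;
    }

    public static void main(String[] args) {
        run(1000, 2, 2000).forEach(System.out::println);
    }
}
```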
Code example source: origin: bsideup/graphql-java-reactive
@Test
public void testArrayStartsWith() throws Exception {
    GraphQLObjectType innerType = newObject()
            .name("Inner")
            .field(newStringField("b").staticValue("foo"))
            .build();
    GraphQLSchema schema = newQuerySchema(it -> it
            .field(
                    newField("a", new GraphQLList(innerType))
                            .dataFetcher(env -> Flowable
                                    .just(asList(true, true, true))
                                    .delay(1, SECONDS, scheduler)
                                    .startWith(new ArrayList<Boolean>())
                            )
            )
    );
    ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a { b } }");
    Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
    scheduler.advanceTimeBy(2, SECONDS);
    subscriber
            .assertChanges(it -> it.containsExactly(
                    tuple("00:000", "", ImmutableMap.of("a", emptyList())),
                    tuple("01:000", "a", asList(ImmutableMap.of("b", "foo"), ImmutableMap.of("b", "foo"), ImmutableMap.of("b", "foo"))),
                    // FIXME these values are duplicated. For now, let's call it "at least once" delivery :D
                    tuple("01:000", "a[1]", ImmutableMap.of("b", "foo")),
                    tuple("01:000", "a[2]", ImmutableMap.of("b", "foo"))
            ))
            .assertComplete();
}
Code example source: origin: bsideup/graphql-java-reactive
@Test
public void testStaticValues() throws Exception {
    GraphQLSchema schema = newQuerySchema(it -> it
            .field(newStringField("a").dataFetcher(env -> "staticA"))
            .field(newStringField("b").dataFetcher(env -> "staticB"))
    );
    ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a, b }");
    Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
    subscriber
            .assertChanges(it -> it.containsExactly(
                    tuple("00:000", "", ImmutableMap.of("a", "staticA", "b", "staticB"))
            ))
            .assertComplete();
}
Code example source: origin: bsideup/graphql-java-reactive
@Test
public void testStartWith() throws Exception {
    GraphQLSchema schema = newQuerySchema(it -> it
            .field(newLongField("a").dataFetcher(env -> Flowable.interval(1, SECONDS, scheduler).take(2).startWith(-42L)))
    );
    ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a }");
    Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
    scheduler.advanceTimeBy(2, SECONDS);
    subscriber
            .assertChanges(it -> it.containsExactly(
                    tuple("00:000", "", ImmutableMap.of("a", -42L)),
                    tuple("01:000", "a", 0L),
                    tuple("02:000", "a", 1L)
            ))
            .assertComplete();
}
Code example source: origin: bsideup/graphql-java-reactive
@Test
public void testArray() throws Exception {
    GraphQLObjectType innerType = newObject()
            .name("Inner")
            .field(newStringField("b").dataFetcher(env -> Flowable.interval(300, MILLISECONDS, scheduler).take(4).map(i -> i + " " + env.getSource())))
            .build();
    GraphQLSchema schema = newQuerySchema(it -> it
            .field(newField("a", new GraphQLList(innerType)).dataFetcher(env -> Flowable.interval(500, MILLISECONDS, scheduler).take(3).toList().toFlowable()))
    );
    ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a { b } }");
    Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
    scheduler.advanceTimeBy(3, SECONDS);
    subscriber
            .assertChanges(it -> it.containsExactly(
                    tuple("01:800", "", ImmutableMap.of("a", asList(ImmutableMap.of("b", "0 0"), ImmutableMap.of("b", "0 1"), ImmutableMap.of("b", "0 2")))),
                    tuple("02:100", "a[0].b", "1 0"),
                    tuple("02:100", "a[1].b", "1 1"),
                    tuple("02:100", "a[2].b", "1 2"),
                    tuple("02:400", "a[0].b", "2 0"),
                    tuple("02:400", "a[1].b", "2 1"),
                    tuple("02:400", "a[2].b", "2 2"),
                    tuple("02:700", "a[0].b", "3 0"),
                    tuple("02:700", "a[1].b", "3 1"),
                    tuple("02:700", "a[2].b", "3 2")
            ))
            .assertComplete();
}
Code example source: origin: bsideup/graphql-java-reactive
@Test
public void testNested() throws Exception {
    GraphQLObjectType innerType = newObject()
            .name("Inner")
            .field(newStringField("b").dataFetcher(env -> Flowable.interval(300, MILLISECONDS, scheduler).take(4).map(i -> i + " " + env.getSource())))
            .build();
    GraphQLSchema schema = newQuerySchema(it -> it
            .field(newField("a", innerType).dataFetcher(env -> Flowable.interval(1, SECONDS, scheduler).take(3)))
    );
    ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a { b } }");
    Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
    scheduler.advanceTimeBy(5, SECONDS);
    subscriber
            .assertChanges(it -> it.containsExactly(
                    tuple("01:300", "", ImmutableMap.of("a", ImmutableMap.of("b", "0 0"))),
                    tuple("01:600", "a.b", "1 0"),
                    tuple("01:900", "a.b", "2 0"),
                    tuple("02:300", "a", ImmutableMap.of("b", "0 1")),
                    tuple("02:600", "a.b", "1 1"),
                    tuple("02:900", "a.b", "2 1"),
                    tuple("03:300", "a", ImmutableMap.of("b", "0 2")),
                    tuple("03:600", "a.b", "1 2"),
                    tuple("03:900", "a.b", "2 2"),
                    tuple("04:200", "a.b", "3 2")
            ))
            .assertComplete();
}
Code example source: origin: bsideup/graphql-java-reactive
@Test
public void testMultipleFields() throws Exception {
    GraphQLSchema schema = newQuerySchema(it -> it
            .field(newLongField("a").dataFetcher(env -> Flowable.interval(300, MILLISECONDS, scheduler).take(8)))
            .field(newLongField("b").dataFetcher(env -> Flowable.timer(2, SECONDS, scheduler)))
            .field(newLongField("c").dataFetcher(env -> Flowable.just(42L)))
    );
    ExecutionResult executionResult = new GraphQL(schema, strategy).execute("{ a, b, c }");
    Flowable.fromPublisher((Publisher<Change>) executionResult.getData()).timestamp(scheduler).subscribe(subscriber);
    // c resolved, wait for a and b
    subscriber.assertEmpty();
    scheduler.advanceTimeBy(1, SECONDS);
    // a & c resolved, wait for b
    subscriber.assertEmpty();
    scheduler.advanceTimeBy(2, SECONDS);
    subscriber
            .assertChanges(it -> it.containsExactly(
                    tuple("02:000", "", ImmutableMap.of("a", 5L, "b", 0L, "c", 42L)),
                    tuple("02:100", "a", 6L),
                    tuple("02:400", "a", 7L)
            ))
            .assertComplete();
}