本文整理了Java中io.reactivex.Flowable.collect()
方法的一些代码示例,展示了Flowable.collect()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Flowable.collect()
方法的具体详情如下:
包路径:io.reactivex.Flowable
类名称:Flowable
方法名:collect
[英]Collects items emitted by the finite source Publisher into a single mutable data structure and returns a Single that emits this structure.
This is a simplified version of reduce that does not need to return the state on each pass.
Note that this operator requires the upstream to signal onComplete for the accumulator object to be emitted. Sources that are infinite and never complete will never emit anything through this operator and an infinite source may lead to a fatal OutOfMemoryError. Backpressure: This operator does not support backpressure because by intent it will receive all values and reduce them to a single onNext. Scheduler: collect does not operate by default on a particular Scheduler.
[中]将有限源发布服务器发出的项收集到单个可变数据结构中,并返回发出此结构的单个数据结构。
这是reduce的简化版本,不需要在每次传递时返回状态。
请注意,此运算符要求上游发出信号onComplete,以发出累加器对象。无限且永远不完整的源永远不会通过此运算符发出任何信息,无限源可能导致致命的OutOfMemoryError。背压:此运算符不支持背压,因为它将接收所有值,并将它们减少到单个onNext。调度程序:默认情况下,collect不会在特定调度程序上运行。
代码示例来源:origin: ReactiveX/RxJava
@Override
public Single<ArrayList<Integer>> apply(Flowable<Integer> f) throws Exception {
return f.collect(Functions.justCallable(new ArrayList<Integer>()),
new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> a, Integer b) throws Exception {
a.add(b);
}
});
}
});
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void collectInitialSupplierNull() {
just1.collect((Callable<Integer>)null, new BiConsumer<Integer, Integer>() {
@Override
public void accept(Integer a, Integer b) { }
});
}
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void collectInitialCollectorNull() {
just1.collect(new Callable<Object>() {
@Override
public Object call() {
return 1;
}
}, null);
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Flowable<ArrayList<Integer>> apply(Flowable<Integer> f) throws Exception {
return f.collect(Functions.justCallable(new ArrayList<Integer>()),
new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> a, Integer b) throws Exception {
a.add(b);
}
}).toFlowable();
}
});
代码示例来源:origin: ReactiveX/RxJava
@Test(expected = NullPointerException.class)
public void collectInitialSupplierReturnsNull() {
just1.collect(new Callable<Object>() {
@Override
public Object call() {
return null;
}
}, new BiConsumer<Object, Integer>() {
@Override
public void accept(Object a, Integer b) { }
}).blockingGet();
}
代码示例来源:origin: ReactiveX/RxJava
@Override
public Publisher<List<Integer>> createPublisher(final long elements) {
return
Flowable.range(1, 1000).collect(Functions.<Integer>createArrayList(128), new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> a, Integer b) throws Exception {
a.add(b);
}
}).toFlowable()
;
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCollectToList() {
Single<List<Integer>> o = Flowable.just(1, 2, 3)
.collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> list, Integer v) {
list.add(v);
}
});
List<Integer> list = o.blockingGet();
assertEquals(3, list.size());
assertEquals(1, list.get(0).intValue());
assertEquals(2, list.get(1).intValue());
assertEquals(3, list.get(2).intValue());
// test multiple subscribe
List<Integer> list2 = o.blockingGet();
assertEquals(3, list2.size());
assertEquals(1, list2.get(0).intValue());
assertEquals(2, list2.get(1).intValue());
assertEquals(3, list2.get(2).intValue());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void dispose() {
TestHelper.checkDisposed(Flowable.just(1, 2)
.collect(Functions.justCallable(new ArrayList<Integer>()), new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> a, Integer b) throws Exception {
a.add(b);
}
}));
TestHelper.checkDisposed(Flowable.just(1, 2)
.collect(Functions.justCallable(new ArrayList<Integer>()), new BiConsumer<ArrayList<Integer>, Integer>() {
@Override
public void accept(ArrayList<Integer> a, Integer b) throws Exception {
a.add(b);
}
}).toFlowable());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCollectToString() {
String value = Flowable.just(1, 2, 3)
.collect(
new Callable<StringBuilder>() {
@Override
public StringBuilder call() {
return new StringBuilder();
}
},
new BiConsumer<StringBuilder, Integer>() {
@Override
public void accept(StringBuilder sb, Integer v) {
if (sb.length() > 0) {
sb.append("-");
}
sb.append(v);
}
}).blockingGet().toString();
assertEquals("1-2-3", value);
}
代码示例来源:origin: ReactiveX/RxJava
/**
* This uses the public API collect which uses scan under the covers.
*/
@Test
public void testSeedFactory() {
Single<List<Integer>> o = Flowable.range(1, 10)
.collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> list, Integer t2) {
list.add(t2);
}
});
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.blockingGet());
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), o.blockingGet());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCollectToStringFlowable() {
String value = Flowable.just(1, 2, 3)
.collect(
new Callable<StringBuilder>() {
@Override
public StringBuilder call() {
return new StringBuilder();
}
},
new BiConsumer<StringBuilder, Integer>() {
@Override
public void accept(StringBuilder sb, Integer v) {
if (sb.length() > 0) {
sb.append("-");
}
sb.append(v);
}
}).toFlowable().blockingLast().toString();
assertEquals("1-2-3", value);
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
try {
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
RxJavaPlugins.setErrorHandler(addToList(list));
final RuntimeException e1 = new RuntimeException();
final RuntimeException e2 = new RuntimeException();
Burst.items(1).error(e2) //
.collect(callableListCreator(), biConsumerThrows(e1)) //
.test() //
.assertError(e1) //
.assertNotComplete();
assertEquals(1, list.size());
assertEquals(e2, list.get(0).getCause());
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCollectorFailureDoesNotResultInTwoErrorEmissionsFlowable() {
try {
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
RxJavaPlugins.setErrorHandler(addToList(list));
final RuntimeException e1 = new RuntimeException();
final RuntimeException e2 = new RuntimeException();
Burst.items(1).error(e2) //
.collect(callableListCreator(), biConsumerThrows(e1))
.toFlowable()
.test() //
.assertError(e1) //
.assertNotComplete();
assertEquals(1, list.size());
assertEquals(e2, list.get(0).getCause());
} finally {
RxJavaPlugins.reset();
}
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testFactoryFailureResultsInErrorEmissionFlowable() {
final RuntimeException e = new RuntimeException();
Flowable.just(1).collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() throws Exception {
throw e;
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> list, Integer t) {
list.add(t);
}
})
.test()
.assertNoValues()
.assertError(e)
.assertNotComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testFactoryFailureResultsInErrorEmission() {
final RuntimeException e = new RuntimeException();
Flowable.just(1).collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() throws Exception {
throw e;
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> list, Integer t) {
list.add(t);
}
})
.test()
.assertNoValues()
.assertError(e)
.assertNotComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissions() {
final RuntimeException e = new RuntimeException();
Burst.item(1).create() //
.collect(callableListCreator(), biConsumerThrows(e)) //
.test() //
.assertError(e) //
.assertNotComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissions() {
final RuntimeException e = new RuntimeException();
final AtomicBoolean added = new AtomicBoolean();
BiConsumer<Object, Integer> throwOnFirstOnly = new BiConsumer<Object, Integer>() {
boolean once = true;
@Override
public void accept(Object o, Integer t) {
if (once) {
once = false;
throw e;
} else {
added.set(true);
}
}
};
Burst.items(1, 2).create() //
.collect(callableListCreator(), throwOnFirstOnly)//
.test() //
.assertError(e) //
.assertNoValues() //
.assertNotComplete();
assertFalse(added.get());
}
代码示例来源:origin: ReactiveX/RxJava
/**
* This uses the public API collect which uses scan under the covers.
*/
@Test
public void testSeedFactoryFlowable() {
Flowable<List<Integer>> f = Flowable.range(1, 10)
.collect(new Callable<List<Integer>>() {
@Override
public List<Integer> call() {
return new ArrayList<Integer>();
}
}, new BiConsumer<List<Integer>, Integer>() {
@Override
public void accept(List<Integer> list, Integer t2) {
list.add(t2);
}
}).toFlowable().takeLast(1);
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), f.blockingSingle());
assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), f.blockingSingle());
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissionsFlowable() {
final RuntimeException e = new RuntimeException();
Burst.item(1).create() //
.collect(callableListCreator(), biConsumerThrows(e)) //
.toFlowable()
.test() //
.assertError(e) //
.assertNotComplete();
}
代码示例来源:origin: ReactiveX/RxJava
@Test
public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissionsFlowable() {
final RuntimeException e = new RuntimeException();
final AtomicBoolean added = new AtomicBoolean();
BiConsumer<Object, Integer> throwOnFirstOnly = new BiConsumer<Object, Integer>() {
boolean once = true;
@Override
public void accept(Object o, Integer t) {
if (once) {
once = false;
throw e;
} else {
added.set(true);
}
}
};
Burst.items(1, 2).create() //
.collect(callableListCreator(), throwOnFirstOnly)//
.toFlowable()
.test() //
.assertError(e) //
.assertNoValues() //
.assertNotComplete();
assertFalse(added.get());
}
内容来源于网络,如有侵权,请联系作者删除!