本文整理了Java中io.reactivex.Observable.forEach()
方法的一些代码示例,展示了Observable.forEach()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Observable.forEach()
方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:forEach
[英]Subscribes to the ObservableSource and receives notifications for each element.
Alias to #subscribe(Consumer). Scheduler: forEach does not operate by default on a particular Scheduler.
[中]订阅ObservableSource并接收每个元素的通知。
它是#subscribe(Consumer)的别名。调度器(Scheduler):默认情况下,forEach不会在特定的调度器上运行。
代码示例来源:origin: ReactiveX/RxJava
/**
 * Passing {@code null} as the consumer to {@code forEach} is expected to fail
 * with a {@link NullPointerException}.
 */
@Test(expected = NullPointerException.class)
public void forEachNull() {
// just1 is a fixture declared outside this snippet.
just1.forEach(null);
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * {@code forEach(null)} is expected to throw a {@link NullPointerException}
 * when invoked on an Observable created with {@code Observable.error(...)}.
 */
@Test(expected = NullPointerException.class)
public void testForEachWithNull() {
    final Observable<Object> failing = Observable.error(new Exception("boo"));
    failing.forEach(null);
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Checks the no-argument {@code retry()}: the mapper throws once, the first time
 * the value 2 passes through, and the consumer is expected to have recorded
 * 1, 1, 2, 3 by the time {@code forEach} returns.
 */
@Test
public void testIssue3008RetryInfinite() {
    final List<Long> received = new CopyOnWriteArrayList<Long>();
    final AtomicBoolean firstAttempt = new AtomicBoolean(true);

    // Fails exactly once: getAndSet(false) flips the flag on the first 2 seen.
    final Function<Long, Long> failOnceOnTwo = new Function<Long, Long>() {
        @Override
        public Long apply(Long value) {
            System.out.println("map " + value);
            if (value == 2 && firstAttempt.getAndSet(false)) {
                throw new RuntimeException("retryable error");
            }
            return value;
        }
    };

    // Prints every delivered item and keeps it for the final assertion.
    final Consumer<Long> recorder = new Consumer<Long>() {
        @Override
        public void accept(Long item) {
            System.out.println(item);
            received.add(item);
        }
    };

    Observable.<Long> just(1L, 2L, 3L)
            .map(failOnceOnTwo)
            .retry()
            .forEach(recorder);

    assertEquals(Arrays.asList(1L, 1L, 2L, 3L), received);
}
代码示例来源:origin: ReactiveX/RxJava
/**
 * Checks {@code retry(BiPredicate)} with a predicate that always answers
 * {@code true}: the mapper throws once, the first time the value 2 passes
 * through, and the consumer is expected to have recorded 1, 1, 2, 3.
 */
@Test
public void testIssue3008RetryWithPredicate() {
    final List<Long> received = new CopyOnWriteArrayList<Long>();
    final AtomicBoolean firstAttempt = new AtomicBoolean(true);

    // Fails exactly once: getAndSet(false) flips the flag on the first 2 seen.
    final Function<Long, Long> failOnceOnTwo = new Function<Long, Long>() {
        @Override
        public Long apply(Long value) {
            System.out.println("map " + value);
            if (value == 2 && firstAttempt.getAndSet(false)) {
                throw new RuntimeException("retryable error");
            }
            return value;
        }
    };

    // Unconditionally asks for another attempt.
    final BiPredicate<Integer, Throwable> alwaysRetry = new BiPredicate<Integer, Throwable>() {
        @Override
        public boolean test(Integer attempt, Throwable error) {
            return true;
        }
    };

    // Prints every delivered item and keeps it for the final assertion.
    final Consumer<Long> recorder = new Consumer<Long>() {
        @Override
        public void accept(Long item) {
            System.out.println(item);
            received.add(item);
        }
    };

    Observable.<Long> just(1L, 2L, 3L)
            .map(failOnceOnTwo)
            .retry(alwaysRetry)
            .forEach(recorder);

    assertEquals(Arrays.asList(1L, 1L, 2L, 3L), received);
}
代码示例来源:origin: Tristan971/Lyrebird
/**
 * Feeds the text of every value observed on {@code currentMessage} into
 * {@code messageContent.setText}, after switching to the JavaFX platform scheduler.
 */
private void textualContent() {
    JavaFxObservable.valuesOf(currentMessage)
            .map(message -> message.getText())
            // setText is invoked downstream of observeOn(JavaFxScheduler.platform()).
            .observeOn(JavaFxScheduler.platform())
            .forEach(messageContent::setText);
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges a {@code DISTRIBUTES_TRAFFIC_TO} relationship from the ELB node to each
 * listed EC2 instance node, then deletes relationships of that ELB that were not
 * refreshed by this call.
 *
 * <p>The stale-relationship cleanup runs once, after every instance has been merged.
 * Running it inside the loop would delete still-valid links to instances that simply
 * had not been reached yet (they would be recreated a moment later) and would issue
 * one redundant delete query per instance.
 *
 * @param instances JSON nodes to iterate; the {@code instanceId} field of each is read
 * @param elbArn    value matched against {@code aws_arn} of the {@code AwsElb} node
 * @param region    region segment used when building each instance ARN
 */
protected void mapElbToInstance(JsonNode instances, String elbArn, String region) {
    String mergeCypher = "match (x:AwsElb {aws_arn:{elbArn}}), (y:AwsEc2Instance {aws_arn:{instanceArn}}) "
            + "merge (x)-[r:DISTRIBUTES_TRAFFIC_TO]->(y) set r.updateTs=timestamp() return x,r,y";

    // Smallest updateTs among relationships touched in this call; MAX_VALUE means "none touched".
    AtomicLong oldestRelationshipTs = new AtomicLong(Long.MAX_VALUE);

    for (JsonNode i : instances) {
        String instanceName = i.path("instanceId").asText();
        String instanceArn = String.format("arn:aws:ec2:%s:%s:instance/%s", region, getAccountId(), instanceName);
        getNeoRxClient().execCypher(mergeCypher, "elbArn", elbArn, "instanceArn", instanceArn).forEach(r -> {
            oldestRelationshipTs.accumulateAndGet(r.path("r").path("updateTs").asLong(), Math::min);
        });
    }

    // Skip cleanup when nothing was merged (MAX_VALUE) or a row lacked updateTs (asLong() gives 0).
    long oldest = oldestRelationshipTs.get();
    if (oldest > 0 && oldest < Long.MAX_VALUE) {
        String deleteCypher = "match (x:AwsElb {aws_arn:{elbArn}})-[r:DISTRIBUTES_TRAFFIC_TO]-(y:AwsEc2Instance) where r.updateTs<{oldest} delete r";
        getNeoRxClient().execCypher(deleteCypher, "elbArn", elbArn, "oldest", oldest);
    }
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges a {@code DockerHost} node for the given swarm node and links it to its
 * {@code DockerSwarm} node with a {@code CONTAINS} relationship.
 *
 * @param swarmClusterId value matched against {@code swarmClusterId} of the swarm node
 * @param n              JSON for the node; {@code swarmNodeId} is read from it and the
 *                       whole object is passed to the merge as {@code props}
 * @return the smallest {@code updateTs} among the rows returned by the merge, or
 *         {@link Long#MAX_VALUE} when no row carried one
 */
long saveDockerNode(String swarmClusterId, JsonNode n) {
    final String swarmNodeId = n.get("swarmNodeId").asText();
    final AtomicLong oldestUpdateTs = new AtomicLong(Long.MAX_VALUE);

    final String mergeHost = "merge (n:DockerHost {swarmNodeId:{nodeId}}) set n+={props}, n.updateTs=timestamp() return n";
    dockerScanner.getNeoRxClient()
            .execCypher(mergeHost, "nodeId", swarmNodeId, "props", n)
            .forEach(row -> {
                removeDockerLabels("DockerHost", "swarmNodeId", swarmNodeId, n, row);
                long rowTs = row.path("updateTs").asLong(Long.MAX_VALUE);
                oldestUpdateTs.set(Math.min(oldestUpdateTs.get(), rowTs));
            });

    logger.info("connecting swarm={} to node={}", swarmClusterId, swarmNodeId);

    final String linkToSwarm = "match (s:DockerSwarm {swarmClusterId:{swarmClusterId}}), (n:DockerHost {swarmNodeId:{nodeId}}) merge (s)-[x:CONTAINS]->(n) set x.updateTs=timestamp()";
    dockerScanner.getNeoRxClient().execCypher(linkToSwarm, "swarmClusterId", swarmClusterId, "nodeId", swarmNodeId);

    return oldestUpdateTs.get();
}
代码示例来源:origin: Tristan971/Lyrebird
/**
 * Calls {@code createTabForPal} for every key currently in the direct-messages map,
 * then again for the key of every entry subsequently reported by
 * {@code JavaFxObservable.additionsOf}.
 */
private void listenToNewConversations() {
    // Keys already present in the map.
    directMessages.directMessages().keySet().forEach(pal -> createTabForPal(pal));

    // Keys of entries added from now on.
    JavaFxObservable.additionsOf(directMessages.directMessages())
            .map(entry -> entry.getKey())
            .forEach(pal -> createTabForPal(pal));
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges an {@code AwsSnsTopic} node for the given topic and then merges
 * {@code OWNS} relationships from the account node to the account's topic nodes.
 *
 * @param topic topic whose ARN identifies the node; the text after the last
 *              {@code ':'} of the ARN is stored as {@code name}
 */
private void projectTopic(Topic topic) {
    final String topicArn = topic.getTopicArn();
    final List<String> arnSegments = Splitter.on(":").splitToList(topicArn);
    incrementEntityCount();

    final ObjectNode props = mapper.createObjectNode();
    props.put("aws_account", getAccountId());
    props.put("aws_region", getRegion().getName());
    props.put("name", arnSegments.get(arnSegments.size() - 1));

    final String mergeTopic = "merge (t:AwsSnsTopic {aws_arn:{arn}}) set t+={props}, t.updateTs=timestamp() return t";
    getNeoRxClient().execCypher(mergeTopic, "arn", topicArn, "props", props)
            .forEach(row -> getShadowAttributeRemover().removeTagAttributes("AwsSnsTopic", props, row));

    final String linkToAccount = "match (a:AwsAccount {aws_account:{account}}), (t:AwsSnsTopic {aws_account:{account}}) MERGE (a)-[r:OWNS]->(t) set r.updateTs=timestamp()";
    getNeoRxClient().execCypher(linkToAccount, "account", getAccountId());
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges an {@code AwsS3Bucket} node for the given bucket, removes tag attributes
 * via the shadow attribute remover, and then merges {@code OWNS} relationships from
 * the account node to the account's bucket nodes.
 *
 * <p>The merge statement ends with {@code return b}. A Cypher write query without a
 * {@code RETURN} clause produces no result rows, so without it the {@code forEach}
 * callback below would never run and {@code removeTagAttributes} would be skipped.
 *
 * @param b bucket to project; only its name is read here
 */
private void scanBucket(Bucket b) {
    ObjectNode props = mapper.createObjectNode();
    props.put("name", b.getName());
    props.put("aws_arn", computeArn(props).orElse(null));
    props.put("aws_account", getAccountId());

    // "return b" is required so that the merged node is handed to the callback.
    String cypher = "merge (b:AwsS3Bucket { aws_arn:{aws_arn} }) set b+={props}, b.updateTs=timestamp() return b";
    getNeoRxClient().execCypher(cypher, "aws_arn", props.get("aws_arn"), "props", props).forEach(r -> {
        getShadowAttributeRemover().removeTagAttributes("AwsS3Bucket", props, r);
    });
    incrementEntityCount();

    cypher = "match (a:AwsAccount {aws_account:{account}}), (b:AwsS3Bucket {aws_account:{account}}) MERGE (a)-[r:OWNS]->(b) set r.updateTs=timestamp()";
    getNeoRxClient().execCypher(cypher, "account", getAccountId());
}
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges an {@code AwsInstanceProfile} node for the given profile, passes each
 * returned row to {@code gc.MERGE_ACTION}, and then links the profile to its roles
 * through a {@code LinkageHelper} using the {@code HAS_ROLE} link label.
 *
 * @param gc              collector whose {@code MERGE_ACTION} receives each merged row
 * @param instanceProfile profile to project; its ARN and role ARNs are read
 */
private void scanInstanceProfile(GraphNodeGarbageCollector gc, InstanceProfile instanceProfile) {
    ObjectNode profileJson = convertAwsObject(instanceProfile, null);
    NeoRxClient client = getNeoRxClient();

    String mergeProfile = "merge (n:AwsInstanceProfile { aws_arn: {a} }) set n += {p}, n.updateTs = timestamp() return n";
    client.execCypher(mergeProfile, "a", instanceProfile.getArn(), "p", profileJson)
            .forEach(row -> gc.MERGE_ACTION.accept(row));
    incrementEntityCount();

    LinkageHelper roleLinks = newLinkageHelper();
    roleLinks.withTargetLabel("AwsIamRole")
            .withFromArn(instanceProfile.getArn())
            .withLinkLabel("HAS_ROLE")
            .withTargetValues(instanceProfile.getRoles().stream().map(Role::getArn).collect(Collectors.toList()))
            .execute();
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges an {@code AwsElb} node for the given load balancer, passes each returned
 * row to {@code gc.MERGE_ACTION} when a collector was supplied, and then delegates
 * to {@code mapElbRelationships}.
 *
 * @param elb load balancer description to project
 * @param gc  collector notified of merged rows; may be {@code null}, in which case
 *            rows are ignored
 */
private void projectElb(LoadBalancerDescription elb, GraphNodeGarbageCollector gc) {
    ObjectNode elbJson = convertAwsObject(elb, getRegion());
    incrementEntityCount();

    String elbArn = elbJson.path("aws_arn").asText();
    logger.debug("Scanning elb: {}", elbArn);

    String mergeElb = "merge (x:AwsElb {aws_arn:{aws_arn}}) set x+={props} set x.updateTs=timestamp() return x";
    Preconditions.checkNotNull(getNeoRxClient());
    getNeoRxClient().execCypher(mergeElb, "aws_arn", elbArn, "props", elbJson).forEach(row -> {
        if (gc != null) {
            gc.MERGE_ACTION.accept(row);
        }
    });

    mapElbRelationships(elb, elbArn, getRegion().getName());
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges an {@code AwsVpcPeeringConnection} node keyed by {@code aws_arn} and hands
 * each returned row to {@code gc.MERGE_ACTION}.
 *
 * @param gc                collector whose {@code MERGE_ACTION} receives each merged row
 * @param peeringConnection connection to project
 */
private void scanConnection(GraphNodeGarbageCollector gc, VpcPeeringConnection peeringConnection) {
    NeoRxClient client = getNeoRxClient();
    ObjectNode connectionJson = convertAwsObject(peeringConnection, getRegion());

    // NOTE(review): this statement does not set n.updateTs — confirm the collector does not depend on it.
    String mergeConnection = "merge (n:AwsVpcPeeringConnection { aws_arn: {a} }) set n += {p} return n";
    client.execCypher(mergeConnection, "a", connectionJson.path("aws_arn"), "p", connectionJson)
            .forEach(gc.MERGE_ACTION);

    incrementEntityCount();
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges an {@code AwsVpnGateway} node keyed by {@code aws_arn} and hands each
 * returned row to {@code gc.MERGE_ACTION}.
 *
 * @param gc collector whose {@code MERGE_ACTION} receives each merged row
 * @param c  VPN gateway to project
 */
private void scanVpnGateway(GraphNodeGarbageCollector gc, VpnGateway c) {
    NeoRxClient client = getNeoRxClient();
    ObjectNode gatewayJson = convertAwsObject(c, getRegion());

    // NOTE(review): this statement does not set n.updateTs — confirm the collector does not depend on it.
    String mergeGateway = "merge (n:AwsVpnGateway { aws_arn: {a} }) set n += {p} return n";
    client.execCypher(mergeGateway, "a", gatewayJson.path("aws_arn"), "p", gatewayJson)
            .forEach(gc.MERGE_ACTION);

    incrementEntityCount();
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges an {@code AwsInternetGateway} node keyed by {@code aws_arn} and hands each
 * returned row to {@code gc.MERGE_ACTION}.
 *
 * @param gc collector whose {@code MERGE_ACTION} receives each merged row
 * @param c  internet gateway to project
 */
private void scanInternetGateway(GraphNodeGarbageCollector gc, InternetGateway c) {
    NeoRxClient client = getNeoRxClient();
    ObjectNode gatewayJson = convertAwsObject(c, getRegion());

    // NOTE(review): this statement does not set n.updateTs — confirm the collector does not depend on it.
    String mergeGateway = "merge (n:AwsInternetGateway { aws_arn: {a} }) set n += {p} return n";
    client.execCypher(mergeGateway, "a", gatewayJson.path("aws_arn"), "p", gatewayJson)
            .forEach(gc.MERGE_ACTION);

    incrementEntityCount();
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges an {@code AwsSqsQueue} node from the given JSON, removes tag attributes via
 * the shadow attribute remover, and then merges {@code OWNS} relationships from the
 * account node to the account's queue nodes.
 *
 * @param n queue properties; {@code aws_arn} is read as the node key and the whole
 *          object is passed to the merge as {@code props}
 */
private void mergeQueue(ObjectNode n) {
    incrementEntityCount();

    final String queueArn = n.path("aws_arn").asText();
    final String mergeQueue = "merge (t:AwsSqsQueue {aws_arn:{aws_arn}}) set t+={props}, t.updateTs=timestamp() return t";
    getNeoRxClient().execCypher(mergeQueue, "aws_arn", queueArn, "props", n)
            .forEach(row -> getShadowAttributeRemover().removeTagAttributes("AwsSqsQueue", n, row));

    final String linkToAccount = "match (a:AwsAccount {aws_account:{account}}), (q:AwsSqsQueue {aws_account:{account}}) MERGE (a)-[r:OWNS]->(q) set r.updateTs=timestamp()";
    getNeoRxClient().execCypher(linkToAccount, "account", getAccountId());
}
代码示例来源:origin: LendingClub/mercator
/**
 * Describes the regions through the client and merges one {@code AwsRegion} node per
 * region, handing each merged row to the collector's {@code MERGE_ACTION} and
 * incrementing the entity count on the scanner context when one is present.
 *
 * <p>A {@link RuntimeException} raised while handling one region is passed to
 * {@code maybeThrow}.
 */
@Override
protected void doScan() {
    GraphNodeGarbageCollector gc = newGarbageCollector().label("AwsRegion").bindScannerContext();
    rateLimit();

    final String mergeRegion = "merge (x:AwsRegion {aws_regionName:{aws_regionName}}) set x+={props} remove x.aws_region,x.aws_account set x.updateTs=timestamp() return x";
    DescribeRegionsResult described = getClient().describeRegions();

    described.getRegions().forEach(awsRegion -> {
        try {
            ObjectNode regionJson = convertAwsObject(awsRegion, getRegion());
            // The account attribute is dropped from the properties before they are written.
            regionJson.remove(AccountScanner.ACCOUNT_ATTRIBUTE);

            NeoRxClient client = getNeoRxClient();
            Preconditions.checkNotNull(client);
            client.execCypher(mergeRegion,
                    "aws_regionName", regionJson.path("aws_regionName").asText(),
                    AWSScanner.AWS_REGION_ATTRIBUTE, regionJson.path(AWSScanner.AWS_REGION_ATTRIBUTE).asText(),
                    "props", regionJson)
                    .forEach(gc.MERGE_ACTION);

            ScannerContext.getScannerContext().ifPresent(sc -> sc.incrementEntityCount());
        } catch (RuntimeException e) {
            // NOTE(review): the failure is not reported to gc here — confirm that is intended.
            maybeThrow(e, "problem scanning regions");
        }
    });
}
代码示例来源:origin: LendingClub/mercator
/**
 * Describes the subnets through the client and merges one {@code AwsSubnet} node per
 * subnet. Each merged row is handed to the collector's {@code MERGE_ACTION} and has
 * its tag attributes removed via the shadow attribute remover. The collector is
 * invoked once all subnets have been handled.
 *
 * <p>A {@link RuntimeException} raised while handling one subnet is recorded on the
 * collector with {@code markException} and then passed to {@code maybeThrow}.
 */
@Override
protected void doScan() {
    rateLimit();
    DescribeSubnetsResult described = getClient().describeSubnets();
    GraphNodeGarbageCollector gc = newGarbageCollector().label("AwsSubnet").region(getRegion());

    final String mergeSubnet = "MERGE (v:AwsSubnet {aws_arn:{aws_arn}}) set v+={props}, v.updateTs=timestamp() return v";

    described.getSubnets().forEach(subnet -> {
        try {
            ObjectNode subnetJson = convertAwsObject(subnet, getRegion());

            NeoRxClient neo4j = getNeoRxClient();
            Preconditions.checkNotNull(neo4j);
            neo4j.execCypher(mergeSubnet, "aws_arn", subnetJson.get("aws_arn").asText(), "props", subnetJson)
                    .forEach(row -> {
                        gc.MERGE_ACTION.accept(row);
                        getShadowAttributeRemover().removeTagAttributes("AwsSubnet", subnetJson, row);
                    });

            incrementEntityCount();
        } catch (RuntimeException e) {
            gc.markException(e);
            maybeThrow(e, "problem scanning subnets");
        }
    });

    gc.invoke();
}
代码示例来源:origin: LendingClub/mercator
/**
 * Merges one {@code AwsAsg} node for every auto scaling group supplied by
 * {@code forEachAsg}, hands each merged row to the collector's {@code MERGE_ACTION},
 * removes its tag attributes, and then maps the group's relationships.
 *
 * <p>The collector is bound to the scanner context only when no names were given.
 * A {@link RuntimeException} raised while handling one group is passed to
 * {@code maybeThrow}.
 *
 * @param asgNames names forwarded to {@code forEachAsg}; may be {@code null} or empty
 */
private void doScan(String... asgNames) {
    GraphNodeGarbageCollector gc = newGarbageCollector();
    final boolean noNamesGiven = asgNames == null || asgNames.length == 0;
    if (noNamesGiven) {
        gc.bindScannerContext();
    }

    final String mergeAsg = "merge (x:AwsAsg {aws_arn:{aws_arn}}) set x+={props}, x.updateTs=timestamp() return x";

    forEachAsg(group -> {
        try {
            ObjectNode asgJson = convertAwsObject(group, getRegion());
            String asgArn = asgJson.path("aws_arn").asText();

            Preconditions.checkNotNull(getNeoRxClient());
            getNeoRxClient().execCypher(mergeAsg, "aws_arn", asgArn, "props", asgJson).forEach(row -> {
                gc.MERGE_ACTION.accept(row);
                getShadowAttributeRemover().removeTagAttributes("AwsAsg", asgJson, row);
            });

            incrementEntityCount();
            mapAsgRelationships(group, asgArn, getRegion().getName());
        } catch (RuntimeException e) {
            // NOTE(review): the failure is not reported to gc here — confirm that is intended.
            maybeThrow(e, "problem scanning asg");
        }
    }, asgNames);
}
代码示例来源:origin: Tristan971/Lyrebird
/**
 * For every value observed on {@code currentMessage}, aligns the container and
 * assigns the sender/receiver picture boxes depending on whether the session manager
 * reports the sender as the current user: top-right with the current user's box as
 * sender when it does, top-left with the other box as sender when it does not.
 */
private void profilePictures() {
    JavaFxObservable.valuesOf(currentMessage).forEach(message -> {
        final boolean sentByOther = !sessionManager.isCurrentUser(message.getSenderId());
        final User author = cachedTwitterInfoService.getUser(message.getSenderId());

        if (sentByOther) {
            container.setAlignment(Pos.TOP_LEFT);
            ppSetupSender(otherPpBox, author);
            ppSetupReceiver(currentUserPpBox);
            return;
        }

        container.setAlignment(Pos.TOP_RIGHT);
        ppSetupSender(currentUserPpBox, author);
        ppSetupReceiver(otherPpBox);
    });
}
内容来源于网络,如有侵权,请联系作者删除!