io.reactivex.Observable.forEach()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(12.7k)|赞(0)|评价(0)|浏览(136)

本文整理了Java中io.reactivex.Observable.forEach()方法的一些代码示例,展示了Observable.forEach()的具体用法。这些代码示例主要来源于GitHub/Stack Overflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Observable.forEach()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:forEach

Observable.forEach介绍

[英]Subscribes to the ObservableSource and receives notifications for each element.

Alias to #subscribe(Consumer).
Scheduler: forEach does not operate by default on a particular Scheduler.
[中]订阅ObservableSource并接收每个元素的通知。
该方法是 #subscribe(Consumer) 的别名。
Scheduler(调度器):默认情况下,forEach不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

  1. /** Expects forEach to throw a NullPointerException when it is handed a null consumer. */
  2. @Test(expected = NullPointerException.class)
  3. public void forEachNull() {
  4.     just1.forEach(null);
  5. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test(expected = NullPointerException.class)
  2. public void testForEachWithNull() {
  3. Observable.error(new Exception("boo"))
  4. //
  5. .forEach(null);
  6. }

代码示例来源:origin: ReactiveX/RxJava

  1. /**
  2.  * Runs just(1, 2, 3) through a mapper that throws once on the value 2, followed by an unbounded
  3.  * retry(), and asserts that the consumer received 1, 1, 2, 3.
  4.  */
  5. @Test
  6. public void testIssue3008RetryInfinite() {
  7.     final List<Long> received = new CopyOnWriteArrayList<Long>();
  8.     final AtomicBoolean failureArmed = new AtomicBoolean(true);
  9.     // Throws only the first time the value 2 passes through; later passes are let through.
  10.     Function<Long, Long> failOnceOnTwo = new Function<Long, Long>() {
  11.         @Override
  12.         public Long apply(Long value) {
  13.             System.out.println("map " + value);
  14.             if (value == 2 && failureArmed.getAndSet(false)) {
  15.                 throw new RuntimeException("retryable error");
  16.             }
  17.             return value;
  18.         }
  19.     };
  20.     // Prints each delivered item and records it for the final assertion.
  21.     Consumer<Long> recorder = new Consumer<Long>() {
  22.         @Override
  23.         public void accept(Long item) {
  24.             System.out.println(item);
  25.             received.add(item);
  26.         }
  27.     };
  28.     Observable.<Long> just(1L, 2L, 3L)
  29.         .map(failOnceOnTwo)
  30.         .retry()
  31.         .forEach(recorder);
  32.     assertEquals(Arrays.asList(1L, 1L, 2L, 3L), received);
  33. }

代码示例来源:origin: ReactiveX/RxJava

  1. @Test
  2. public void testIssue3008RetryWithPredicate() {
  3. final List<Long> list = new CopyOnWriteArrayList<Long>();
  4. final AtomicBoolean isFirst = new AtomicBoolean(true);
  5. Observable.<Long> just(1L, 2L, 3L).map(new Function<Long, Long>() {
  6. @Override
  7. public Long apply(Long x) {
  8. System.out.println("map " + x);
  9. if (x == 2 && isFirst.getAndSet(false)) {
  10. throw new RuntimeException("retryable error");
  11. }
  12. return x;
  13. }})
  14. .retry(new BiPredicate<Integer, Throwable>() {
  15. @Override
  16. public boolean test(Integer t1, Throwable t2) {
  17. return true;
  18. }})
  19. .forEach(new Consumer<Long>() {
  20. @Override
  21. public void accept(Long t) {
  22. System.out.println(t);
  23. list.add(t);
  24. }});
  25. assertEquals(Arrays.asList(1L, 1L, 2L, 3L), list);
  26. }

代码示例来源:origin: Tristan971/Lyrebird

  1. /**
  2.  * Subscribes to the values of currentMessage, maps each one to its text, switches to the JavaFX
  3.  * platform scheduler and passes the text to messageContent.setText.
  4.  */
  5. private void textualContent() {
  6.     JavaFxObservable.valuesOf(currentMessage)
  7.         .map((DirectMessage message) -> message.getText())
  8.         .observeOn(JavaFxScheduler.platform())
  9.         .forEach(messageContent::setText);
  10. }

代码示例来源:origin: LendingClub/mercator

  1. /**
  2.  * Merges a DISTRIBUTES_TRAFFIC_TO relationship from the ELB node to every listed EC2 instance node,
  3.  * then deletes relationships of that type on the ELB whose updateTs is older than the oldest
  4.  * timestamp reported by this pass.
  5.  *
  6.  * The delete runs once, after all instances have been merged. Running it inside the loop issued one
  7.  * delete per instance and could remove still-valid relationships that a later iteration then had to
  8.  * recreate.
  9.  *
  10.  * @param instances JSON nodes, each read for its "instanceId" field
  11.  * @param elbArn ARN used to match the AwsElb node
  12.  * @param region region segment used when building each instance ARN
  13.  */
  14. protected void mapElbToInstance(JsonNode instances, String elbArn, String region) {
  15.     AtomicLong oldestRelationshipTs = new AtomicLong(Long.MAX_VALUE);
  16.     for (JsonNode i : instances) {
  17.         String instanceName = i.path("instanceId").asText();
  18.         String instanceArn = String.format("arn:aws:ec2:%s:%s:instance/%s", region, getAccountId(), instanceName);
  19.         String cypher = "match (x:AwsElb {aws_arn:{elbArn}}), (y:AwsEc2Instance {aws_arn:{instanceArn}}) "
  20.                 + "merge (x)-[r:DISTRIBUTES_TRAFFIC_TO]->(y) set r.updateTs=timestamp() return x,r,y";
  21.         getNeoRxClient().execCypher(cypher, "elbArn", elbArn, "instanceArn", instanceArn).forEach(r -> {
  22.             oldestRelationshipTs.set(Math.min(r.path("r").path("updateTs").asLong(), oldestRelationshipTs.get()));
  23.         });
  24.     }
  25.     // Prune only when a merge reported a usable timestamp: MAX_VALUE means no row was returned,
  26.     // and a value of 0 is what asLong() yields when updateTs could not be read.
  27.     if (oldestRelationshipTs.get() > 0 && oldestRelationshipTs.get() < Long.MAX_VALUE) {
  28.         String staleCypher = "match (x:AwsElb {aws_arn:{elbArn}})-[r:DISTRIBUTES_TRAFFIC_TO]-(y:AwsEc2Instance) where r.updateTs<{oldest} delete r";
  29.         getNeoRxClient().execCypher(staleCypher, "elbArn", elbArn, "oldest", oldestRelationshipTs.get());
  30.     }
  31. }

代码示例来源:origin: LendingClub/mercator

  1. long saveDockerNode(String swarmClusterId, JsonNode n) {
  2. String swarmNodeId = n.get("swarmNodeId").asText();
  3. AtomicLong updateTs = new AtomicLong(Long.MAX_VALUE);
  4. dockerScanner.getNeoRxClient()
  5. .execCypher(
  6. "merge (n:DockerHost {swarmNodeId:{nodeId}}) set n+={props}, n.updateTs=timestamp() return n",
  7. "nodeId", swarmNodeId, "props", n)
  8. .forEach(actual -> {
  9. removeDockerLabels("DockerHost", "swarmNodeId", swarmNodeId, n, actual);
  10. updateTs.set(Math.min(updateTs.get(), actual.path("updateTs").asLong(Long.MAX_VALUE)));
  11. });
  12. logger.info("connecting swarm={} to node={}", swarmClusterId, swarmNodeId);
  13. dockerScanner.getNeoRxClient().execCypher(
  14. "match (s:DockerSwarm {swarmClusterId:{swarmClusterId}}), (n:DockerHost {swarmNodeId:{nodeId}}) merge (s)-[x:CONTAINS]->(n) set x.updateTs=timestamp()",
  15. "swarmClusterId", swarmClusterId, "nodeId", swarmNodeId);
  16. return updateTs.get();
  17. }

代码示例来源:origin: Tristan971/Lyrebird

  1. /**
  2.  * Calls createTabForPal for every key currently in directMessages.directMessages(), then subscribes
  3.  * to additions to that map and calls createTabForPal with the key of each added entry.
  4.  */
  5. private void listenToNewConversations() {
  6.     directMessages.directMessages().keySet().forEach(pal -> createTabForPal(pal));
  7.     JavaFxObservable.additionsOf(directMessages.directMessages())
  8.         .map(addition -> addition.getKey())
  9.         .forEach(pal -> createTabForPal(pal));
  10. }

代码示例来源:origin: LendingClub/mercator

  1. private void projectTopic(Topic topic) {
  2. String arn = topic.getTopicArn();
  3. List<String> parts = Splitter.on(":").splitToList(arn);
  4. incrementEntityCount();
  5. ObjectNode n = mapper.createObjectNode();
  6. n.put("aws_account", getAccountId());
  7. n.put("aws_region", getRegion().getName());
  8. n.put("name", parts.get(parts.size() - 1));
  9. String cypher = "merge (t:AwsSnsTopic {aws_arn:{arn}}) set t+={props}, t.updateTs=timestamp() return t";
  10. getNeoRxClient().execCypher(cypher, "arn", arn, "props", n).forEach(r -> {
  11. getShadowAttributeRemover().removeTagAttributes("AwsSnsTopic", n, r);
  12. });
  13. cypher = "match (a:AwsAccount {aws_account:{account}}), (t:AwsSnsTopic {aws_account:{account}}) MERGE (a)-[r:OWNS]->(t) set r.updateTs=timestamp()";
  14. getNeoRxClient().execCypher(cypher, "account", getAccountId());
  15. }

代码示例来源:origin: LendingClub/mercator

  1. /**
  2.  * Merges one S3 bucket into the graph as an AwsS3Bucket node keyed by its ARN, passes each returned
  3.  * row to the shadow attribute remover, and merges OWNS relationships from the AwsAccount node.
  4.  *
  5.  * The merge statement ends with "return b" so that forEach actually receives the merged node;
  6.  * without a RETURN clause the query yields no rows and the tag-attribute cleanup never runs.
  7.  *
  8.  * @param b bucket whose name is stored on the node and used to compute the ARN
  9.  */
  10. private void scanBucket(Bucket b) {
  11.     ObjectNode props = mapper.createObjectNode();
  12.     props.put("name", b.getName());
  13.     props.put("aws_arn", computeArn(props).orElse(null));
  14.     props.put("aws_account", getAccountId());
  15.     String cypher = "merge (b:AwsS3Bucket { aws_arn:{aws_arn} }) set b+={props}, b.updateTs=timestamp() return b";
  16.     getNeoRxClient().execCypher(cypher, "aws_arn", props.get("aws_arn"), "props", props).forEach(r -> {
  17.         getShadowAttributeRemover().removeTagAttributes("AwsS3Bucket", props, r);
  18.     });
  19.     incrementEntityCount();
  20.     cypher = "match (a:AwsAccount {aws_account:{account}}), (b:AwsS3Bucket {aws_account:{account}}) MERGE (a)-[r:OWNS]->(b) set r.updateTs=timestamp()";
  21.     getNeoRxClient().execCypher(cypher, "account", getAccountId());
  22. }

代码示例来源:origin: LendingClub/mercator

  1. /**
  2.  * Merges one instance profile into the graph as an AwsInstanceProfile node, passes each returned row
  3.  * to gc.MERGE_ACTION, and runs a LinkageHelper configured with link label HAS_ROLE, target label
  4.  * AwsIamRole and the ARNs of the profile's roles as target values.
  5.  */
  6. private void scanInstanceProfile(GraphNodeGarbageCollector gc, InstanceProfile instanceProfile) {
  7.     ObjectNode profileProps = convertAwsObject(instanceProfile, null);
  8.     String mergeProfile = "merge (n:AwsInstanceProfile { aws_arn: {a} }) set n += {p}, n.updateTs = timestamp() return n";
  9.     getNeoRxClient().execCypher(mergeProfile, "a", instanceProfile.getArn(), "p", profileProps)
  10.         .forEach(row -> gc.MERGE_ACTION.accept(row));
  11.     incrementEntityCount();
  12.     newLinkageHelper()
  13.         .withTargetLabel("AwsIamRole")
  14.         .withFromArn(instanceProfile.getArn())
  15.         .withLinkLabel("HAS_ROLE")
  16.         .withTargetValues(instanceProfile.getRoles().stream().map(role -> role.getArn()).collect(Collectors.toList()))
  17.         .execute();
  18. }

代码示例来源:origin: LendingClub/mercator

  1. /**
  2.  * Merges one load balancer into the graph as an AwsElb node keyed by its ARN, passes each returned
  3.  * row to gc.MERGE_ACTION when a collector was supplied, then maps the ELB's relationships.
  4.  *
  5.  * @param elb load balancer description to convert and store
  6.  * @param gc garbage collector to notify; may be null, in which case rows are ignored
  7.  */
  8. private void projectElb(LoadBalancerDescription elb, GraphNodeGarbageCollector gc) {
  9.     ObjectNode elbProps = convertAwsObject(elb, getRegion());
  10.     incrementEntityCount();
  11.     String elbArn = elbProps.path("aws_arn").asText();
  12.     logger.debug("Scanning elb: {}", elbArn);
  13.     String mergeElb = "merge (x:AwsElb {aws_arn:{aws_arn}}) set x+={props} set x.updateTs=timestamp() return x";
  14.     Preconditions.checkNotNull(getNeoRxClient());
  15.     getNeoRxClient().execCypher(mergeElb, "aws_arn", elbArn, "props", elbProps).forEach(row -> {
  16.         if (gc == null) {
  17.             return;
  18.         }
  19.         gc.MERGE_ACTION.accept(row);
  20.     });
  21.     mapElbRelationships(elb, elbArn, getRegion().getName());
  22. }

代码示例来源:origin: LendingClub/mercator

  1. /**
  2.  * Merges one VPC peering connection into the graph as an AwsVpcPeeringConnection node keyed by its
  3.  * ARN and passes each returned row to gc.MERGE_ACTION.
  4.  */
  5. private void scanConnection(GraphNodeGarbageCollector gc, VpcPeeringConnection peeringConnection) {
  6.     NeoRxClient client = getNeoRxClient();
  7.     ObjectNode props = convertAwsObject(peeringConnection, getRegion());
  8.     String mergeCypher = "merge (n:AwsVpcPeeringConnection { aws_arn: {a} }) set n += {p} return n";
  9.     client.execCypher(mergeCypher, "a", props.path("aws_arn"), "p", props)
  10.         .forEach(gc.MERGE_ACTION);
  11.     incrementEntityCount();
  12. }

代码示例来源:origin: LendingClub/mercator

  1. /**
  2.  * Merges one VPN gateway into the graph as an AwsVpnGateway node keyed by its ARN and passes each
  3.  * returned row to gc.MERGE_ACTION.
  4.  */
  5. private void scanVpnGateway(GraphNodeGarbageCollector gc, VpnGateway c) {
  6.     NeoRxClient client = getNeoRxClient();
  7.     ObjectNode props = convertAwsObject(c, getRegion());
  8.     String mergeCypher = "merge (n:AwsVpnGateway { aws_arn: {a} }) set n += {p} return n";
  9.     client.execCypher(mergeCypher, "a", props.path("aws_arn"), "p", props)
  10.         .forEach(gc.MERGE_ACTION);
  11.     incrementEntityCount();
  12. }

代码示例来源:origin: LendingClub/mercator

  1. /**
  2.  * Merges one internet gateway into the graph as an AwsInternetGateway node keyed by its ARN and
  3.  * passes each returned row to gc.MERGE_ACTION.
  4.  */
  5. private void scanInternetGateway(GraphNodeGarbageCollector gc, InternetGateway c) {
  6.     NeoRxClient client = getNeoRxClient();
  7.     ObjectNode props = convertAwsObject(c, getRegion());
  8.     String mergeCypher = "merge (n:AwsInternetGateway { aws_arn: {a} }) set n += {p} return n";
  9.     client.execCypher(mergeCypher, "a", props.path("aws_arn"), "p", props)
  10.         .forEach(gc.MERGE_ACTION);
  11.     incrementEntityCount();
  12. }

代码示例来源:origin: LendingClub/mercator

  1. private void mergeQueue(ObjectNode n) {
  2. incrementEntityCount();
  3. String cypher = "merge (t:AwsSqsQueue {aws_arn:{aws_arn}}) set t+={props}, t.updateTs=timestamp() return t";
  4. getNeoRxClient().execCypher(cypher, "aws_arn", n.path("aws_arn").asText(), "props", n).forEach(r -> {
  5. getShadowAttributeRemover().removeTagAttributes("AwsSqsQueue", n, r);
  6. });
  7. cypher = "match (a:AwsAccount {aws_account:{account}}), (q:AwsSqsQueue {aws_account:{account}}) MERGE (a)-[r:OWNS]->(q) set r.updateTs=timestamp()";
  8. getNeoRxClient().execCypher(cypher, "account", getAccountId());
  9. }

代码示例来源:origin: LendingClub/mercator

  1. @Override
  2. protected void doScan() {
  3. GraphNodeGarbageCollector gc = newGarbageCollector().label("AwsRegion").bindScannerContext();
  4. rateLimit();
  5. DescribeRegionsResult result = getClient().describeRegions();
  6. result.getRegions().forEach(it -> {
  7. try {
  8. ObjectNode n = convertAwsObject(it, getRegion());
  9. n.remove(AccountScanner.ACCOUNT_ATTRIBUTE);
  10. String cypher = "merge (x:AwsRegion {aws_regionName:{aws_regionName}}) set x+={props} remove x.aws_region,x.aws_account set x.updateTs=timestamp() return x";
  11. NeoRxClient neoRx = getNeoRxClient();
  12. Preconditions.checkNotNull(neoRx);
  13. neoRx.execCypher(cypher, "aws_regionName", n.path("aws_regionName").asText(),
  14. AWSScanner.AWS_REGION_ATTRIBUTE, n.path(AWSScanner.AWS_REGION_ATTRIBUTE).asText(), "props", n)
  15. .forEach(gc.MERGE_ACTION);
  16. ScannerContext.getScannerContext().ifPresent(sc -> {
  17. sc.incrementEntityCount();
  18. });
  19. } catch (RuntimeException e) {
  20. maybeThrow(e, "problem scanning regions");
  21. }
  22. });
  23. }

代码示例来源:origin: LendingClub/mercator

  1. @Override
  2. protected void doScan() {
  3. rateLimit();
  4. DescribeSubnetsResult result = getClient().describeSubnets();
  5. GraphNodeGarbageCollector gc = newGarbageCollector().label("AwsSubnet").region(getRegion());
  6. result.getSubnets().forEach(it -> {
  7. try {
  8. ObjectNode n = convertAwsObject(it, getRegion());
  9. String cypher = "MERGE (v:AwsSubnet {aws_arn:{aws_arn}}) set v+={props}, v.updateTs=timestamp() return v";
  10. NeoRxClient client = getNeoRxClient();
  11. Preconditions.checkNotNull(client);
  12. client.execCypher(cypher, "aws_arn",n.get("aws_arn").asText(),"props",n).forEach(r->{
  13. gc.MERGE_ACTION.accept(r);
  14. getShadowAttributeRemover().removeTagAttributes("AwsSubnet", n, r);
  15. });
  16. incrementEntityCount();
  17. } catch (RuntimeException e) {
  18. gc.markException(e);
  19. maybeThrow(e,"problem scanning subnets");
  20. }
  21. });
  22. gc.invoke();
  23. }

代码示例来源:origin: LendingClub/mercator

  1. private void doScan(String... asgNames) {
  2. GraphNodeGarbageCollector gc = newGarbageCollector();
  3. if (asgNames==null || asgNames.length==0) {
  4. gc.bindScannerContext();
  5. }
  6. forEachAsg(asg -> {
  7. try {
  8. ObjectNode n = convertAwsObject(asg, getRegion());
  9. String asgArn = n.path("aws_arn").asText();
  10. String cypher = "merge (x:AwsAsg {aws_arn:{aws_arn}}) set x+={props}, x.updateTs=timestamp() return x";
  11. Preconditions.checkNotNull(getNeoRxClient());
  12. getNeoRxClient().execCypher(cypher, "aws_arn", asgArn, "props", n).forEach(r -> {
  13. gc.MERGE_ACTION.accept(r);
  14. getShadowAttributeRemover().removeTagAttributes("AwsAsg", n, r);
  15. });
  16. incrementEntityCount();
  17. mapAsgRelationships(asg, asgArn, getRegion().getName());
  18. } catch (RuntimeException e) {
  19. maybeThrow(e, "problem scanning asg");
  20. }
  21. }, asgNames);
  22. }

代码示例来源:origin: Tristan971/Lyrebird

  1. /**
  2.  * Subscribes to the values of currentMessage and, for each one, aligns the container and sets up the
  3.  * two picture boxes: top-right with the current user's box as sender when the current user sent the
  4.  * message, otherwise top-left with the other box as sender.
  5.  */
  6. private void profilePictures() {
  7.     JavaFxObservable.valuesOf(currentMessage).forEach(messageEvent -> {
  8.         final boolean sentByMe = sessionManager.isCurrentUser(messageEvent.getSenderId());
  9.         final User sender = cachedTwitterInfoService.getUser(messageEvent.getSenderId());
  10.         if (!sentByMe) {
  11.             container.setAlignment(Pos.TOP_LEFT);
  12.             ppSetupSender(otherPpBox, sender);
  13.             ppSetupReceiver(currentUserPpBox);
  14.             return;
  15.         }
  16.         container.setAlignment(Pos.TOP_RIGHT);
  17.         ppSetupSender(currentUserPpBox, sender);
  18.         ppSetupReceiver(otherPpBox);
  19.     });
  20. }

相关文章

Observable类方法