io.reactivex.Observable.forEach()方法的使用及代码示例

x33g5p2x  于2022-01-25 转载在 其他  
字(12.7k)|赞(0)|评价(0)|浏览(115)

本文整理了Java中io.reactivex.Observable.forEach()方法的一些代码示例,展示了Observable.forEach()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Observable.forEach()方法的具体详情如下:
包路径:io.reactivex.Observable
类名称:Observable
方法名:forEach

Observable.forEach介绍

[英]Subscribes to the ObservableSource and receives notifications for each element.

Alias to #subscribe(Consumer). Scheduler: forEach does not operate by default on a particular Scheduler.
[中]订阅ObservableSource并接收每个元素的通知。
该方法是 #subscribe(Consumer) 的别名。调度器(Scheduler):默认情况下,forEach不会在特定的调度器上运行。

代码示例

代码示例来源:origin: ReactiveX/RxJava

/**
 * Expects a {@link NullPointerException} when {@code forEach} is handed a {@code null}
 * callback on {@code just1} (a fixture declared outside this snippet).
 */
@Test(expected = NullPointerException.class)
public void forEachNull() {
  just1.forEach(null);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Expects a {@link NullPointerException} when {@code forEach} receives a {@code null}
 * callback, here on a source built with {@code Observable.error}.
 */
@Test(expected = NullPointerException.class)
public void testForEachWithNull() {
  Observable.error(new Exception("boo"))
  //
  .forEach(null);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Checks {@code retry()} without arguments: the mapper fails once on the value 2, and the
 * consumer is expected to have recorded 1, then 1, 2, 3 after the source is replayed.
 */
@Test
public void testIssue3008RetryInfinite() {
  final List<Long> received = new CopyOnWriteArrayList<Long>();
  final AtomicBoolean firstAttempt = new AtomicBoolean(true);

  // Throws only the first time the value 2 passes through; later passes succeed.
  final Function<Long, Long> failOnceOnTwo = new Function<Long, Long>() {
    @Override
    public Long apply(Long value) {
      System.out.println("map " + value);
      if (value == 2 && firstAttempt.getAndSet(false)) {
        throw new RuntimeException("retryable error");
      }
      return value;
    }
  };

  // Prints every delivered item and keeps it for the final assertion.
  final Consumer<Long> recorder = new Consumer<Long>() {
    @Override
    public void accept(Long item) {
      System.out.println(item);
      received.add(item);
    }
  };

  Observable.<Long> just(1L, 2L, 3L)
      .map(failOnceOnTwo)
      .retry()
      .forEach(recorder);

  assertEquals(Arrays.asList(1L, 1L, 2L, 3L), received);
}

代码示例来源:origin: ReactiveX/RxJava

/**
 * Checks {@code retry(BiPredicate)} with a predicate that always answers {@code true}: the
 * mapper fails once on the value 2, and the consumer is expected to have recorded 1, then
 * 1, 2, 3 after the source is replayed.
 */
@Test
public void testIssue3008RetryWithPredicate() {
  final List<Long> received = new CopyOnWriteArrayList<Long>();
  final AtomicBoolean firstAttempt = new AtomicBoolean(true);

  // Throws only the first time the value 2 passes through; later passes succeed.
  final Function<Long, Long> failOnceOnTwo = new Function<Long, Long>() {
    @Override
    public Long apply(Long value) {
      System.out.println("map " + value);
      if (value == 2 && firstAttempt.getAndSet(false)) {
        throw new RuntimeException("retryable error");
      }
      return value;
    }
  };

  // Approves every retry regardless of attempt count or error.
  final BiPredicate<Integer, Throwable> alwaysRetry = new BiPredicate<Integer, Throwable>() {
    @Override
    public boolean test(Integer attempt, Throwable error) {
      return true;
    }
  };

  // Prints every delivered item and keeps it for the final assertion.
  final Consumer<Long> recorder = new Consumer<Long>() {
    @Override
    public void accept(Long item) {
      System.out.println(item);
      received.add(item);
    }
  };

  Observable.<Long> just(1L, 2L, 3L)
      .map(failOnceOnTwo)
      .retry(alwaysRetry)
      .forEach(recorder);

  assertEquals(Arrays.asList(1L, 1L, 2L, 3L), received);
}

代码示例来源:origin: Tristan971/Lyrebird

/**
 * Subscribes to the values of {@code currentMessage}, maps each one to its text, and hands
 * that text to {@code messageContent::setText} after switching to the JavaFX platform
 * scheduler.
 */
private void textualContent() {
  JavaFxObservable.valuesOf(currentMessage)
          .map(DirectMessage::getText)
          .observeOn(JavaFxScheduler.platform())
          .forEach(messageContent::setText);
}

代码示例来源:origin: LendingClub/mercator

/**
 * Links the given ELB node to each listed EC2 instance node and prunes older links.
 *
 * <p>For every entry in {@code instances} a {@code DISTRIBUTES_TRAFFIC_TO} relationship is
 * merged and stamped with the current time. The smallest stamp seen so far is then used as
 * the cutoff below which relationships of this ELB are deleted.
 *
 * @param instances nodes that are each read for an {@code instanceId} field
 * @param elbArn ARN of the load balancer node
 * @param region region name used when building each instance ARN
 */
protected void mapElbToInstance(JsonNode instances, String elbArn, String region) {
  AtomicLong minUpdateTs = new AtomicLong(Long.MAX_VALUE);
  for (JsonNode instance : instances) {
    String instanceId = instance.path("instanceId").asText();
    String instanceArn = String.format("arn:aws:ec2:%s:%s:instance/%s", region, getAccountId(), instanceId);

    String mergeCypher = "match (x:AwsElb {aws_arn:{elbArn}}), (y:AwsEc2Instance {aws_arn:{instanceArn}}) "
        + "merge (x)-[r:DISTRIBUTES_TRAFFIC_TO]->(y) set r.updateTs=timestamp() return x,r,y";
    getNeoRxClient().execCypher(mergeCypher, "elbArn", elbArn, "instanceArn", instanceArn)
        .forEach(row -> minUpdateTs.set(
            Math.min(row.path("r").path("updateTs").asLong(), minUpdateTs.get())));

    // Only prune once a usable timestamp has been recorded.
    if (minUpdateTs.get() > 0 && minUpdateTs.get() < Long.MAX_VALUE) {
      String pruneCypher = "match (x:AwsElb {aws_arn:{elbArn}})-[r:DISTRIBUTES_TRAFFIC_TO]-(y:AwsEc2Instance) where r.updateTs<{oldest}  delete r";
      getNeoRxClient().execCypher(pruneCypher, "elbArn", elbArn, "oldest", minUpdateTs.get());
    }
  }
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges the {@code DockerHost} node for a swarm member and attaches it to its swarm.
 *
 * @param swarmClusterId id of the {@code DockerSwarm} node to connect to
 * @param n node properties; {@code swarmNodeId} is read from it
 * @return the smallest {@code updateTs} among the rows returned by the merge, or
 *     {@code Long.MAX_VALUE} when no row supplied one
 */
long saveDockerNode(String swarmClusterId, JsonNode n) {
  final String nodeId = n.get("swarmNodeId").asText();
  final AtomicLong earliestUpdateTs = new AtomicLong(Long.MAX_VALUE);

  final String mergeHost =
      "merge (n:DockerHost {swarmNodeId:{nodeId}}) set n+={props}, n.updateTs=timestamp() return n";
  dockerScanner.getNeoRxClient().execCypher(mergeHost, "nodeId", nodeId, "props", n).forEach(persisted -> {
    removeDockerLabels("DockerHost", "swarmNodeId", nodeId, n, persisted);
    // Rows without an updateTs fall back to Long.MAX_VALUE and so never lower the minimum.
    earliestUpdateTs.set(
        Math.min(earliestUpdateTs.get(), persisted.path("updateTs").asLong(Long.MAX_VALUE)));
  });

  logger.info("connecting swarm={} to node={}", swarmClusterId, nodeId);

  final String linkSwarm =
      "match (s:DockerSwarm {swarmClusterId:{swarmClusterId}}), (n:DockerHost {swarmNodeId:{nodeId}}) merge (s)-[x:CONTAINS]->(n) set x.updateTs=timestamp()";
  dockerScanner.getNeoRxClient().execCypher(linkSwarm, "swarmClusterId", swarmClusterId, "nodeId", nodeId);

  return earliestUpdateTs.get();
}

代码示例来源:origin: Tristan971/Lyrebird

/**
 * Calls {@code createTabForPal} for every key already present in
 * {@code directMessages.directMessages()}, then subscribes to additions so the key of each
 * newly added entry is passed to {@code createTabForPal} as well.
 */
private void listenToNewConversations() {
  directMessages.directMessages().keySet().forEach(this::createTabForPal);
  JavaFxObservable.additionsOf(directMessages.directMessages())
          .map(Map.Entry::getKey)
          .forEach(this::createTabForPal);
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges an {@code AwsSnsTopic} node for the given topic and then runs the query that merges
 * {@code OWNS} relationships between the account and its topics.
 *
 * @param topic the SNS topic to project; its ARN keys the merge
 */
private void projectTopic(Topic topic) {
  final String topicArn = topic.getTopicArn();
  final List<String> arnSegments = Splitter.on(":").splitToList(topicArn);
  incrementEntityCount();

  final ObjectNode props = mapper.createObjectNode();
  props.put("aws_account", getAccountId());
  props.put("aws_region", getRegion().getName());
  // The name is taken from the last colon-separated segment of the ARN.
  props.put("name", arnSegments.get(arnSegments.size() - 1));

  final String mergeTopic =
      "merge (t:AwsSnsTopic {aws_arn:{arn}}) set t+={props}, t.updateTs=timestamp() return t";
  getNeoRxClient().execCypher(mergeTopic, "arn", topicArn, "props", props).forEach(row -> {
    getShadowAttributeRemover().removeTagAttributes("AwsSnsTopic", props, row);
  });

  final String linkAccount =
      "match (a:AwsAccount {aws_account:{account}}), (t:AwsSnsTopic {aws_account:{account}}) MERGE (a)-[r:OWNS]->(t) set r.updateTs=timestamp()";
  getNeoRxClient().execCypher(linkAccount, "account", getAccountId());
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges an {@code AwsS3Bucket} node for the bucket and then runs the query that merges
 * {@code OWNS} relationships between the account and its buckets.
 *
 * @param b bucket whose name is recorded on the node
 */
private void scanBucket(Bucket b) {
  ObjectNode bucketProps = mapper.createObjectNode();
  bucketProps.put("name", b.getName());
  // computeArn is handed the props built so far; when it yields nothing, null is stored.
  bucketProps.put("aws_arn", computeArn(bucketProps).orElse(null));
  bucketProps.put("aws_account", getAccountId());

  String mergeBucket =
      "merge (b:AwsS3Bucket { aws_arn:{aws_arn} }) set b+={props}, b.updateTs=timestamp()";
  getNeoRxClient()
      .execCypher(mergeBucket, "aws_arn", bucketProps.get("aws_arn"), "props", bucketProps)
      .forEach(row -> {
        getShadowAttributeRemover().removeTagAttributes("AwsS3Bucket", bucketProps, row);
      });
  incrementEntityCount();

  String linkAccount =
      "match (a:AwsAccount {aws_account:{account}}), (b:AwsS3Bucket {aws_account:{account}}) MERGE (a)-[r:OWNS]->(b) set r.updateTs=timestamp()";
  getNeoRxClient().execCypher(linkAccount, "account", getAccountId());
}
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges an {@code AwsInstanceProfile} node for the profile, passes each returned row to
 * {@code gc.MERGE_ACTION}, and links the profile to its roles with {@code HAS_ROLE}.
 *
 * @param gc collector whose {@code MERGE_ACTION} receives every merged row
 * @param instanceProfile profile to project; its ARN keys the merge
 */
private void scanInstanceProfile(GraphNodeGarbageCollector gc, InstanceProfile instanceProfile) {
  ObjectNode profileProps = convertAwsObject(instanceProfile, null);
  NeoRxClient client = getNeoRxClient();

  String mergeProfile =
      "merge (n:AwsInstanceProfile { aws_arn: {a} }) set n += {p}, n.updateTs = timestamp() return n";
  client.execCypher(mergeProfile, "a", instanceProfile.getArn(), "p", profileProps)
      .forEach(row -> gc.MERGE_ACTION.accept(row));
  incrementEntityCount();

  // Link targets are the ARNs of the roles attached to this profile.
  newLinkageHelper()
      .withTargetLabel("AwsIamRole")
      .withFromArn(instanceProfile.getArn())
      .withLinkLabel("HAS_ROLE")
      .withTargetValues(instanceProfile.getRoles().stream().map(Role::getArn).collect(Collectors.toList()))
      .execute();
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges an {@code AwsElb} node for the load balancer and then maps its relationships.
 *
 * @param elb load balancer description to project
 * @param gc collector whose {@code MERGE_ACTION} receives every merged row; may be
 *     {@code null}, in which case rows are not forwarded
 */
private void projectElb(LoadBalancerDescription elb, GraphNodeGarbageCollector gc) {
  ObjectNode elbProps = convertAwsObject(elb, getRegion());
  incrementEntityCount();

  String arn = elbProps.path("aws_arn").asText();
  logger.debug("Scanning elb: {}", arn);

  String mergeElb =
      "merge (x:AwsElb {aws_arn:{aws_arn}}) set x+={props} set x.updateTs=timestamp() return x";
  Preconditions.checkNotNull(getNeoRxClient());
  getNeoRxClient().execCypher(mergeElb, "aws_arn", arn, "props", elbProps).forEach(row -> {
    if (gc != null) {
      gc.MERGE_ACTION.accept(row);
    }
  });

  mapElbRelationships(elb, arn, getRegion().getName());
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges an {@code AwsVpcPeeringConnection} node keyed by {@code aws_arn} and passes each
 * returned row to {@code gc.MERGE_ACTION}.
 *
 * @param gc collector whose {@code MERGE_ACTION} receives every merged row
 * @param peeringConnection peering connection to project
 */
private void scanConnection(GraphNodeGarbageCollector gc, VpcPeeringConnection peeringConnection) {
  NeoRxClient client = getNeoRxClient();
  ObjectNode props = convertAwsObject(peeringConnection, getRegion());
  String mergeCypher = "merge (n:AwsVpcPeeringConnection { aws_arn: {a} }) set n += {p} return n";
  client.execCypher(mergeCypher, "a", props.path("aws_arn"), "p", props)
      .forEach(gc.MERGE_ACTION);
  incrementEntityCount();
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges an {@code AwsVpnGateway} node keyed by {@code aws_arn} and passes each returned row
 * to {@code gc.MERGE_ACTION}.
 *
 * @param gc collector whose {@code MERGE_ACTION} receives every merged row
 * @param c VPN gateway to project
 */
private void scanVpnGateway(GraphNodeGarbageCollector gc, VpnGateway c) {
  NeoRxClient client = getNeoRxClient();
  ObjectNode props = convertAwsObject(c, getRegion());
  String mergeCypher = "merge (n:AwsVpnGateway { aws_arn: {a} }) set n += {p} return n";
  client.execCypher(mergeCypher, "a", props.path("aws_arn"), "p", props)
      .forEach(gc.MERGE_ACTION);
  incrementEntityCount();
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges an {@code AwsInternetGateway} node keyed by {@code aws_arn} and passes each returned
 * row to {@code gc.MERGE_ACTION}.
 *
 * @param gc collector whose {@code MERGE_ACTION} receives every merged row
 * @param c internet gateway to project
 */
private void scanInternetGateway(GraphNodeGarbageCollector gc, InternetGateway c) {
  NeoRxClient client = getNeoRxClient();
  ObjectNode props = convertAwsObject(c, getRegion());
  String mergeCypher = "merge (n:AwsInternetGateway { aws_arn: {a} }) set n += {p} return n";
  client.execCypher(mergeCypher, "a", props.path("aws_arn"), "p", props)
      .forEach(gc.MERGE_ACTION);
  incrementEntityCount();
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges an {@code AwsSqsQueue} node from the given properties and then runs the query that
 * merges {@code OWNS} relationships between the account and its queues.
 *
 * @param n queue properties; {@code aws_arn} is read from it to key the merge
 */
private void mergeQueue(ObjectNode n) {
  incrementEntityCount();

  final String mergeQueueCypher =
      "merge (t:AwsSqsQueue {aws_arn:{aws_arn}}) set t+={props}, t.updateTs=timestamp() return t";
  getNeoRxClient()
      .execCypher(mergeQueueCypher, "aws_arn", n.path("aws_arn").asText(), "props", n)
      .forEach(row -> {
        getShadowAttributeRemover().removeTagAttributes("AwsSqsQueue", n, row);
      });

  final String linkAccount =
      "match (a:AwsAccount {aws_account:{account}}), (q:AwsSqsQueue {aws_account:{account}}) MERGE (a)-[r:OWNS]->(q) set r.updateTs=timestamp()";
  getNeoRxClient().execCypher(linkAccount, "account", getAccountId());
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges one {@code AwsRegion} node per region returned by {@code describeRegions()}, passing
 * merged rows to the collector's {@code MERGE_ACTION} and counting each region on the current
 * scanner context when one is present. Runtime failures for a single region are routed
 * through {@code maybeThrow}.
 */
@Override
protected void doScan() {
  GraphNodeGarbageCollector gc = newGarbageCollector().label("AwsRegion").bindScannerContext();
  rateLimit();
  DescribeRegionsResult described = getClient().describeRegions();
  described.getRegions().forEach(awsRegion -> {
    try {
      ObjectNode props = convertAwsObject(awsRegion, getRegion());
      // The account attribute is dropped from the properties before they are written.
      props.remove(AccountScanner.ACCOUNT_ATTRIBUTE);

      String mergeCypher = "merge (x:AwsRegion {aws_regionName:{aws_regionName}}) set x+={props}  remove x.aws_region,x.aws_account set x.updateTs=timestamp() return x";
      NeoRxClient client = getNeoRxClient();
      Preconditions.checkNotNull(client);

      String regionName = props.path("aws_regionName").asText();
      String regionAttribute = props.path(AWSScanner.AWS_REGION_ATTRIBUTE).asText();
      client.execCypher(mergeCypher, "aws_regionName", regionName,
          AWSScanner.AWS_REGION_ATTRIBUTE, regionAttribute, "props", props)
          .forEach(gc.MERGE_ACTION);

      ScannerContext.getScannerContext().ifPresent(sc -> sc.incrementEntityCount());
    } catch (RuntimeException e) {
      maybeThrow(e, "problem scanning regions");
    }
  });
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges one {@code AwsSubnet} node per subnet returned by {@code describeSubnets()}. Each
 * merged row goes to the collector's {@code MERGE_ACTION} and to the shadow attribute remover;
 * runtime failures are recorded on the collector before being routed through
 * {@code maybeThrow}. The collector is invoked once all subnets have been processed.
 */
@Override
protected void doScan() {
  rateLimit();
  DescribeSubnetsResult described = getClient().describeSubnets();
  GraphNodeGarbageCollector gc = newGarbageCollector().label("AwsSubnet").region(getRegion());

  described.getSubnets().forEach(subnet -> {
    try {
      ObjectNode props = convertAwsObject(subnet, getRegion());
      String mergeCypher = "MERGE (v:AwsSubnet {aws_arn:{aws_arn}}) set v+={props}, v.updateTs=timestamp() return v";

      NeoRxClient neo = getNeoRxClient();
      Preconditions.checkNotNull(neo);
      neo.execCypher(mergeCypher, "aws_arn", props.get("aws_arn").asText(), "props", props).forEach(row -> {
        gc.MERGE_ACTION.accept(row);
        getShadowAttributeRemover().removeTagAttributes("AwsSubnet", props, row);
      });
      incrementEntityCount();
    } catch (RuntimeException e) {
      // Record the failure on the collector first, then let maybeThrow decide what to do.
      gc.markException(e);
      maybeThrow(e, "problem scanning subnets");
    }
  });

  gc.invoke();
}

代码示例来源:origin: LendingClub/mercator

/**
 * Merges one {@code AwsAsg} node per auto scaling group visited by {@code forEachAsg} and maps
 * each group's relationships. Runtime failures for a single group are routed through
 * {@code maybeThrow}.
 *
 * @param asgNames names forwarded to {@code forEachAsg}; when {@code null} or empty the
 *     garbage collector is bound to the scanner context
 */
private void doScan(String... asgNames) {
  GraphNodeGarbageCollector gc = newGarbageCollector();
  final boolean noNamesGiven = asgNames == null || asgNames.length == 0;
  if (noNamesGiven) {
    gc.bindScannerContext();
  }

  forEachAsg(group -> {
    try {
      ObjectNode props = convertAwsObject(group, getRegion());
      String arn = props.path("aws_arn").asText();
      String mergeCypher = "merge (x:AwsAsg {aws_arn:{aws_arn}}) set x+={props}, x.updateTs=timestamp() return x";

      Preconditions.checkNotNull(getNeoRxClient());
      getNeoRxClient().execCypher(mergeCypher, "aws_arn", arn, "props", props).forEach(row -> {
        gc.MERGE_ACTION.accept(row);
        getShadowAttributeRemover().removeTagAttributes("AwsAsg", props, row);
      });
      incrementEntityCount();
      mapAsgRelationships(group, arn, getRegion().getName());
    } catch (RuntimeException e) {
      maybeThrow(e, "problem scanning asg");
    }
  }, asgNames);
}

代码示例来源:origin: Tristan971/Lyrebird

/**
 * Subscribes to the values of {@code currentMessage} and, for each one, aligns the container
 * and assigns the sender/receiver picture boxes depending on whether the session manager
 * reports the sender as the current user.
 */
private void profilePictures() {
  JavaFxObservable.valuesOf(currentMessage).forEach(event -> {
    final boolean sentByCurrentUser = sessionManager.isCurrentUser(event.getSenderId());
    final User author = cachedTwitterInfoService.getUser(event.getSenderId());

    if (!sentByCurrentUser) {
      // Message from the other party: left-aligned, their box shows the sender.
      container.setAlignment(Pos.TOP_LEFT);
      ppSetupSender(otherPpBox, author);
      ppSetupReceiver(currentUserPpBox);
      return;
    }

    // Message from the current user: right-aligned, own box shows the sender.
    container.setAlignment(Pos.TOP_RIGHT);
    ppSetupSender(currentUserPpBox, author);
    ppSetupReceiver(otherPpBox);
  });
}

相关文章

Observable类方法