akka.actor.Props.withMailbox()方法的使用及代码示例

x33g5p2x  于2022-01-26 转载在 其他  
字(7.4k)|赞(0)|评价(0)|浏览(78)

本文整理了Java中akka.actor.Props.withMailbox()方法的一些代码示例,展示了Props.withMailbox()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Props.withMailbox()方法的具体详情如下:
包路径:akka.actor.Props
类名称:Props
方法名:withMailbox

Props.withMailbox介绍

暂无

代码示例

代码示例来源:origin: org.opendaylight.controller/sal-distributed-datastore

/**
 * Creates the shard manager actor, retrying on failure.
 *
 * <p>Up to 100 attempts are made. After each failed attempt the method sleeps for 100 ms and logs
 * the failure at debug level before trying again.
 *
 * @param actorSystem the actor system in which the actor is created
 * @param creator supplies the {@code Props} for the shard manager
 * @param shardDispatcher dispatcher id applied to the props
 * @param shardManagerId name given to the created actor
 * @return the reference of the created actor
 * @throws IllegalStateException if every attempt failed; the last failure is attached as cause
 */
private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
    final String shardDispatcher, final String shardManagerId) {
  Exception mostRecentFailure = null;
  int attempt = 0;
  while (attempt < 100) {
    try {
      return actorSystem.actorOf(
          creator.props().withDispatcher(shardDispatcher).withMailbox(ActorContext.BOUNDED_MAILBOX),
          shardManagerId);
    } catch (Exception failure) {
      mostRecentFailure = failure;
      Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
      LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying (retry count = {})",
        shardManagerId, failure.getMessage(), attempt);
    }
    attempt++;
  }
  throw new IllegalStateException("Failed to create Shard Manager", mostRecentFailure);
}

代码示例来源:origin: krudolph/akkaflow

ext.props("supervisor").withMailbox("akka.priority-mailbox"));

代码示例来源:origin: opendaylight/controller

/**
 * Starts the RPC invoker, registrar and registry child actors, all using the mailbox named in
 * {@code config}, and then registers an {@code RpcListener} with {@code rpcServices}.
 *
 * @throws Exception if {@code super.preStart()} fails
 */
@Override
public void preStart() throws Exception {
  super.preStart();

  final String mailboxName = config.getMailBoxName();

  rpcInvoker = getContext().actorOf(
      RpcInvoker.props(rpcServices).withMailbox(mailboxName),
      config.getRpcBrokerName());
  LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);

  rpcRegistrar = getContext().actorOf(
      RpcRegistrar.props(config, rpcProvisionRegistry).withMailbox(mailboxName),
      config.getRpcRegistrarName());
  LOG.debug("Registering remote RPCs with {}", rpcRegistrar);

  // The registry is handed both actors created above, so it has to be started last.
  rpcRegistry = getContext().actorOf(
      RpcRegistry.props(config, rpcInvoker, rpcRegistrar).withMailbox(mailboxName),
      config.getRpcRegistryName());
  LOG.debug("Propagating RPC information with {}", rpcRegistry);

  final RpcListener localListener = new RpcListener(rpcRegistry);
  LOG.debug("Registering local availabitility listener {}", localListener);
  listenerReg = rpcServices.registerRpcListener(localListener);
}

代码示例来源:origin: org.opendaylight.controller/sal-distributed-datastore

/**
 * Returns the notifier actor, creating it on the first call.
 *
 * <p>The actor is created with the notification dispatcher path and the bounded mailbox, and the
 * resulting reference is cached in {@code notifierActor} for later calls.
 *
 * @return the cached or newly created notifier actor
 */
private ActorRef getNotifierActor() {
  if (notifierActor != null) {
    return notifierActor;
  }

  LOG.debug("Creating actor {}", actorName);

  final String notificationDispatcher = new Dispatchers(actorContext.system().dispatchers())
      .getDispatcherPath(Dispatchers.DispatcherType.Notification);
  notifierActor = actorContext.actorOf(
      ShardDataTreeNotificationPublisherActor.props(actorName)
          .withDispatcher(notificationDispatcher)
          .withMailbox(org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX),
      actorName);
  return notifierActor;
}
}

代码示例来源:origin: org.opendaylight.controller/sal-remoterpc-connector

/**
 * Creates the RPC registry and RPC broker child actors with the configured mailbox and then
 * sends the registry a {@code SetLocalRouter} message carrying the broker.
 */
private void createRpcActors() {
  LOG.debug("Create rpc registry and broker actors");

  rpcRegistry = getContext().actorOf(
      RpcRegistry.props(config).withMailbox(config.getMailBoxName()),
      config.getRpcRegistryName());
  rpcBroker = getContext().actorOf(
      RpcBroker.props(rpcServices).withMailbox(config.getMailBoxName()),
      config.getRpcBrokerName());

  rpcRegistry.tell(new RpcRegistry.Messages.SetLocalRouter(rpcBroker), self());
}

代码示例来源:origin: sabomichal/akka-java-springfactory

/**
 * Builds the {@code Props} for the configured actor and creates the actor in {@code actorSystem}.
 *
 * <p>The props source is chosen in this order: {@code actorClass} if set; otherwise the bean
 * class (together with the bean name when both are set); otherwise the bean name alone. The
 * optional router, deploy, mailbox and dispatcher settings are applied afterwards, in that order.
 *
 * @return the reference of the created actor
 * @throws BeanCreationException if no props could be created from the configured values
 * @throws Exception if a configured class name cannot be loaded or creation fails
 */
@SuppressWarnings("unchecked")
private ActorRef doCreateObject() throws Exception {
  Props actorProps;
  if (actorClass != null) {
    actorProps = Props.create(new SpringCreator(ctx, Class.forName(actorClass), args));
  } else if (actorBeanClass == null) {
    actorProps = SpringProps.create(actorSystem, actorBeanName);
  } else {
    final Class<? extends AbstractActor> beanType =
        (Class<? extends AbstractActor>) Class.forName(actorBeanClass);
    if (actorBeanName != null) {
      actorProps = SpringProps.create(actorSystem, actorBeanName, beanType);
    } else {
      actorProps = SpringProps.create(actorSystem, beanType);
    }
  }

  if (actorProps == null) {
    throw new BeanCreationException("Can not create ActorRef for given parameters, actorClass=" + actorClass + ", actorBeanClass=" + actorBeanClass + ", actorBeanName=" + actorBeanName);
  }

  // Each optional setting is applied only when it has been configured.
  if (routerConfig != null) {
    actorProps = actorProps.withRouter(routerConfig);
  }
  if (deploy != null) {
    actorProps = actorProps.withDeploy(deploy);
  }
  if (mailbox != null) {
    actorProps = actorProps.withMailbox(mailbox);
  }
  if (dispatcher != null) {
    actorProps = actorProps.withDispatcher(dispatcher);
  }

  return actorSystem.actorOf(actorProps);
}

代码示例来源:origin: opendaylight/controller

/**
 * Records the provider's default address in {@code selfAddress} and, when the provider is a
 * {@code ClusterActorRefProvider}, starts a child actor named "gossiper" with the configured
 * mailbox.
 */
@Override
public void preStart() {
  final ActorRefProvider refProvider = getContext().provider();
  selfAddress = refProvider.getDefaultAddress();

  if (!(refProvider instanceof ClusterActorRefProvider)) {
    return;
  }
  getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
}

代码示例来源:origin: org.opendaylight.controller/sal-remoterpc-connector

/**
 * Stores the provider's default address in {@code selfAddress}; if the provider is a
 * {@code ClusterActorRefProvider}, a {@code Gossiper} child actor named "gossiper" is created
 * with the configured mailbox.
 */
@Override
public void preStart() {
  final ActorRefProvider refProvider = getContext().provider();
  selfAddress = refProvider.getDefaultAddress();

  if (refProvider instanceof ClusterActorRefProvider) {
    final Props gossiperProps = Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName());
    getContext().actorOf(gossiperProps, "gossiper");
  }
}

代码示例来源:origin: opendaylight/controller

/**
 * Sends more messages to an actor than its mailbox is expected to hold and checks that a
 * {@link DeadLetter} is published on the actor system's event stream.
 *
 * <p>The test actor of a {@code TestKit} is subscribed to {@code DeadLetter} events and is also
 * used as the sender of every "ping". {@code lock} is held while the 12 messages are sent and is
 * released only after the wait for a {@code DeadLetter} has finished (successfully or not).
 */
@Test
public void shouldSendMsgToDeadLetterWhenQueueIsFull() {
  final TestKit mockReceiver = new TestKit(actorSystem);
  // Route DeadLetter events published on the event stream to the probe's test actor.
  actorSystem.eventStream().subscribe(mockReceiver.testActor(), DeadLetter.class);
  final FiniteDuration twentySeconds = new FiniteDuration(20, TimeUnit.SECONDS);
  // The actor under test is created with the mailbox named in the configuration.
  ActorRef pingPongActor = actorSystem.actorOf(PingPongActor.props(lock).withMailbox(config.getMailBoxName()),
                         "pingpongactor");
  // NOTE(review): the return value is discarded; the purpose of this call is not evident here.
  actorSystem.mailboxes().settings();
  lock.lock();
  try {
    //queue capacity = 10
    //need to send 12 messages; 1 message is dequeued and actor waits on lock,
    //2nd to 11th messages are put on the queue
    //12th message is sent to dead letter.
    for (int i = 0; i < 12; i++) {
      pingPongActor.tell("ping", mockReceiver.testActor());
    }
    mockReceiver.expectMsgClass(twentySeconds, DeadLetter.class);
  } finally {
    // Always release the lock, even if the expectation above fails.
    lock.unlock();
  }
  // After the lock is released, wait for 11 further messages on the probe - presumably the
  // replies to the pings that were accepted; confirm against PingPongActor.
  mockReceiver.receiveN(11, twentySeconds);
}

代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors

/**
 * Sets up the shard regions, the search-updater shard region, the materializer and the
 * statistics supplier, and optionally subscribes this actor to thing and policy events.
 *
 * @param numberOfShards passed to the shard region factory for every region that is obtained
 * @param shardRegionFactory factory used to obtain the things, policies and search-updater regions
 * @param searchUpdaterPersistence stored in a field and passed on to the {@code ThingUpdater} props
 * @param circuitBreaker passed on to the {@code ThingUpdater} props
 * @param eventProcessingActive when {@code true}, this actor subscribes to thing and policy
 *     events through the pub-sub mediator
 * @param thingUpdaterActivityCheckInterval passed on to the {@code ThingUpdater} props
 * @param maxBulkSize passed on to the {@code ThingUpdater} props
 */
private ThingsUpdater(final int numberOfShards,
    final ShardRegionFactory shardRegionFactory,
    final ThingsSearchUpdaterPersistence searchUpdaterPersistence,
    final CircuitBreaker circuitBreaker,
    final boolean eventProcessingActive,
    final Duration thingUpdaterActivityCheckInterval,
    final int maxBulkSize) {

  final ActorSystem system = context().system();

  // Start the proxy for the Things and Policies sharding, too.
  final ActorRef thingsRegion = shardRegionFactory.getThingsShardRegion(numberOfShards);
  final ActorRef policiesRegion = shardRegionFactory.getPoliciesShardRegion(numberOfShards);
  final ActorRef mediator = DistributedPubSub.get(system).mediator();

  // Thing updater actors get a dedicated mailbox.
  final Props updaterProps = ThingUpdater
      .props(searchUpdaterPersistence, circuitBreaker, thingsRegion, policiesRegion,
          thingUpdaterActivityCheckInterval, ThingUpdater.DEFAULT_THINGS_TIMEOUT, maxBulkSize)
      .withMailbox("akka.actor.custom-updater-mailbox");

  shardRegion = shardRegionFactory.getSearchUpdaterShardRegion(numberOfShards, updaterProps);
  this.searchUpdaterPersistence = searchUpdaterPersistence;
  materializer = ActorMaterializer.create(getContext());
  retrieveStatisticsDetailsResponseSupplier =
      RetrieveStatisticsDetailsResponseSupplier.of(shardRegion, ThingsSearchConstants.SHARD_REGION, log);

  if (eventProcessingActive) {
    final DistributedPubSubMediator.Subscribe thingEvents =
        new DistributedPubSubMediator.Subscribe(ThingEvent.TYPE_PREFIX, UPDATER_GROUP, self());
    mediator.tell(thingEvents, self());

    final DistributedPubSubMediator.Subscribe policyEvents =
        new DistributedPubSubMediator.Subscribe(PolicyEvent.TYPE_PREFIX, UPDATER_GROUP, self());
    mediator.tell(policyEvents, self());
  }
}

代码示例来源:origin: eclipse/ditto

policiesShardRegion, thingUpdaterActivityCheckInterval, ThingUpdater.DEFAULT_THINGS_TIMEOUT,
maxBulkSize)
.withMailbox("akka.actor.custom-updater-mailbox");

相关文章