本文整理了Java中akka.actor.Props.withMailbox()
方法的一些代码示例,展示了Props.withMailbox()
的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Props.withMailbox()
方法的具体详情如下:
包路径:akka.actor.Props
类名称:Props
方法名:withMailbox
暂无
代码示例来源:origin: org.opendaylight.controller/sal-distributed-datastore
/**
 * Creates the shard manager actor, retrying when creation fails.
 *
 * <p>At most 100 attempts are made. After a failed attempt the calling thread sleeps
 * uninterruptibly for 100 ms, and the failure is then logged at debug level before the
 * next attempt.
 *
 * @param actorSystem actor system used to create the actor
 * @param creator supplies the props of the shard manager
 * @param shardDispatcher dispatcher id applied to the props
 * @param shardManagerId name given to the created actor
 * @return reference to the created actor
 * @throws IllegalStateException if every attempt fails; the last failure is attached as the cause
 */
private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
        final String shardDispatcher, final String shardManagerId) {
    Exception lastFailure = null;
    int attempt = 0;
    while (attempt < 100) {
        try {
            return actorSystem.actorOf(
                    creator.props().withDispatcher(shardDispatcher).withMailbox(ActorContext.BOUNDED_MAILBOX),
                    shardManagerId);
        } catch (Exception e) {
            // Remember the failure so it can be reported as the cause if all attempts fail.
            lastFailure = e;
            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
            LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying (retry count = {})",
                    shardManagerId, e.getMessage(), attempt);
        }
        attempt++;
    }
    throw new IllegalStateException("Failed to create Shard Manager", lastFailure);
}
代码示例来源:origin: krudolph/akkaflow
ext.props("supervisor").withMailbox("akka.priority-mailbox"));
代码示例来源:origin: opendaylight/controller
/**
 * Starts the RPC child actors and registers a listener with the local RPC services.
 *
 * <p>The invoker, registrar and registry actors are created in that order, each with the
 * mailbox named by {@code config.getMailBoxName()}. The registry receives references to the
 * invoker and the registrar, and the listener is constructed around the registry.
 *
 * @throws Exception if start-up fails
 */
@Override
public void preStart() throws Exception {
    super.preStart();

    rpcInvoker = getContext().actorOf(
            RpcInvoker.props(rpcServices).withMailbox(config.getMailBoxName()),
            config.getRpcBrokerName());
    LOG.debug("Listening for RPC invocation requests with {}", rpcInvoker);

    rpcRegistrar = getContext().actorOf(
            RpcRegistrar.props(config, rpcProvisionRegistry).withMailbox(config.getMailBoxName()),
            config.getRpcRegistrarName());
    LOG.debug("Registering remote RPCs with {}", rpcRegistrar);

    rpcRegistry = getContext().actorOf(
            RpcRegistry.props(config, rpcInvoker, rpcRegistrar).withMailbox(config.getMailBoxName()),
            config.getRpcRegistryName());
    LOG.debug("Propagating RPC information with {}", rpcRegistry);

    // Keep the registration handle in a field so it stays reachable after start-up.
    final RpcListener localListener = new RpcListener(rpcRegistry);
    LOG.debug("Registering local availabitility listener {}", localListener);
    listenerReg = rpcServices.registerRpcListener(localListener);
}
代码示例来源:origin: org.opendaylight.controller/sal-distributed-datastore
/**
 * Returns the notifier actor, creating it on the first call.
 *
 * <p>On creation the actor is given the dispatcher path resolved for
 * {@code Dispatchers.DispatcherType.Notification} and the bounded mailbox, and is named
 * {@code actorName}. The reference is cached in {@code notifierActor}.
 *
 * @return the cached or newly created notifier actor
 */
private ActorRef getNotifierActor() {
    if (notifierActor != null) {
        return notifierActor;
    }

    LOG.debug("Creating actor {}", actorName);
    final Dispatchers dispatchers = new Dispatchers(actorContext.system().dispatchers());
    final String dispatcherPath = dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
    notifierActor = actorContext.actorOf(
            ShardDataTreeNotificationPublisherActor.props(actorName)
                    .withDispatcher(dispatcherPath)
                    .withMailbox(org.opendaylight.controller.cluster.datastore.utils.ActorContext.BOUNDED_MAILBOX),
            actorName);
    return notifierActor;
}
}
代码示例来源:origin: org.opendaylight.controller/sal-remoterpc-connector
/**
 * Creates the RPC registry and RPC broker child actors, then tells the registry which
 * broker is its local router.
 *
 * <p>Both actors use the mailbox named by {@code config.getMailBoxName()}.
 */
private void createRpcActors() {
    LOG.debug("Create rpc registry and broker actors");

    rpcRegistry = getContext().actorOf(
            RpcRegistry.props(config).withMailbox(config.getMailBoxName()),
            config.getRpcRegistryName());
    rpcBroker = getContext().actorOf(
            RpcBroker.props(rpcServices).withMailbox(config.getMailBoxName()),
            config.getRpcBrokerName());

    // The registry is sent the broker reference, with this actor as the sender.
    rpcRegistry.tell(new RpcRegistry.Messages.SetLocalRouter(rpcBroker), self());
}
代码示例来源:origin: sabomichal/akka-java-springfactory
/**
 * Builds the actor props from the configured properties and creates the actor.
 *
 * <p>After the base props are resolved, the router config, deploy, mailbox and dispatcher
 * are applied in that order, each only when it is non-null.
 *
 * @return reference to the newly created actor
 * @throws BeanCreationException if no props could be derived from the configured properties
 * @throws Exception if a configured class name cannot be loaded or props creation fails
 */
private ActorRef doCreateObject() throws Exception {
    Props actorProps = resolveBaseProps();
    if (actorProps == null) {
        throw new BeanCreationException("Can not create ActorRef for given parameters, actorClass=" + actorClass + ", actorBeanClass=" + actorBeanClass + ", actorBeanName=" + actorBeanName);
    }

    if (routerConfig != null) {
        actorProps = actorProps.withRouter(routerConfig);
    }
    if (deploy != null) {
        actorProps = actorProps.withDeploy(deploy);
    }
    if (mailbox != null) {
        actorProps = actorProps.withMailbox(mailbox);
    }
    if (dispatcher != null) {
        actorProps = actorProps.withDispatcher(dispatcher);
    }
    return actorSystem.actorOf(actorProps);
}

/**
 * Selects the base props: an explicit actor class wins, then bean name plus bean class,
 * then bean class alone, and finally bean name alone.
 */
@SuppressWarnings("unchecked")
private Props resolveBaseProps() throws Exception {
    if (actorClass != null) {
        return Props.create(new SpringCreator(ctx, Class.forName(actorClass), args));
    }
    if (actorBeanClass == null) {
        return SpringProps.create(actorSystem, actorBeanName);
    }
    if (actorBeanName != null) {
        return SpringProps.create(actorSystem, actorBeanName,
                (Class<? extends AbstractActor>) Class.forName(actorBeanClass));
    }
    return SpringProps.create(actorSystem, (Class<? extends AbstractActor>) Class.forName(actorBeanClass));
}
代码示例来源:origin: opendaylight/controller
/**
 * Stores the provider's default address in {@code selfAddress} and, when the provider is a
 * {@code ClusterActorRefProvider}, creates a child actor named "gossiper" with the
 * configured mailbox.
 */
@Override
public void preStart() {
    final ActorRefProvider refProvider = getContext().provider();
    selfAddress = refProvider.getDefaultAddress();

    if (!(refProvider instanceof ClusterActorRefProvider)) {
        return;
    }
    getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
}
代码示例来源:origin: org.opendaylight.controller/sal-remoterpc-connector
/**
 * Stores the provider's default address in {@code selfAddress} and, when the provider is a
 * {@code ClusterActorRefProvider}, creates a {@code Gossiper} child actor named "gossiper"
 * with the configured mailbox.
 */
@Override
public void preStart() {
    final ActorRefProvider refProvider = getContext().provider();
    selfAddress = refProvider.getDefaultAddress();

    if (!(refProvider instanceof ClusterActorRefProvider)) {
        return;
    }
    getContext().actorOf(
            Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()),
            "gossiper");
}
代码示例来源:origin: opendaylight/controller
/**
 * Sends more messages than the actor's mailbox can hold while the actor is blocked on
 * {@code lock}, and expects a {@code DeadLetter} to be published on the event stream.
 */
@Test
public void shouldSendMsgToDeadLetterWhenQueueIsFull() {
    final FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
    final TestKit probe = new TestKit(actorSystem);
    actorSystem.eventStream().subscribe(probe.testActor(), DeadLetter.class);

    ActorRef pingPongActor = actorSystem.actorOf(
            PingPongActor.props(lock).withMailbox(config.getMailBoxName()), "pingpongactor");
    // NOTE(review): the result of this call is unused; confirm whether it is still needed.
    actorSystem.mailboxes().settings();

    lock.lock();
    try {
        // Queue capacity is 10, so 12 messages are needed:
        //   - the 1st is dequeued and the actor then waits on the lock,
        //   - the 2nd to 11th fill the queue,
        //   - the 12th goes to dead letters.
        int sent = 0;
        while (sent < 12) {
            pingPongActor.tell("ping", probe.testActor());
            sent++;
        }
        probe.expectMsgClass(timeout, DeadLetter.class);
    } finally {
        lock.unlock();
    }

    probe.receiveN(11, timeout);
}
代码示例来源:origin: org.eclipse.ditto/ditto-services-thingsearch-updater-actors
/**
 * Sets up the search-updater shard region and, if requested, subscribes this actor to
 * thing and policy events.
 *
 * @param numberOfShards passed to the shard region factory for every region requested here
 * @param shardRegionFactory factory used to obtain the things, policies and search-updater shard regions
 * @param searchUpdaterPersistence persistence stored in this actor and passed to the thing updater props
 * @param circuitBreaker passed to the thing updater props
 * @param eventProcessingActive when {@code true}, this actor subscribes via the pub-sub mediator to
 *        {@code ThingEvent.TYPE_PREFIX} and {@code PolicyEvent.TYPE_PREFIX} in group {@code UPDATER_GROUP}
 * @param thingUpdaterActivityCheckInterval passed to the thing updater props
 * @param maxBulkSize passed to the thing updater props
 */
private ThingsUpdater(final int numberOfShards,
        final ShardRegionFactory shardRegionFactory,
        final ThingsSearchUpdaterPersistence searchUpdaterPersistence,
        final CircuitBreaker circuitBreaker,
        final boolean eventProcessingActive,
        final Duration thingUpdaterActivityCheckInterval,
        final int maxBulkSize) {

    final ActorSystem system = context().system();

    // The Things and Policies shard regions are started here as well.
    final ActorRef thingsRegion = shardRegionFactory.getThingsShardRegion(numberOfShards);
    final ActorRef policiesRegion = shardRegionFactory.getPoliciesShardRegion(numberOfShards);

    final ActorRef mediator = DistributedPubSub.get(system).mediator();

    final Props updaterProps = ThingUpdater
            .props(searchUpdaterPersistence, circuitBreaker, thingsRegion, policiesRegion,
                    thingUpdaterActivityCheckInterval, ThingUpdater.DEFAULT_THINGS_TIMEOUT, maxBulkSize)
            .withMailbox("akka.actor.custom-updater-mailbox");

    shardRegion = shardRegionFactory.getSearchUpdaterShardRegion(numberOfShards, updaterProps);
    this.searchUpdaterPersistence = searchUpdaterPersistence;
    materializer = ActorMaterializer.create(getContext());
    retrieveStatisticsDetailsResponseSupplier = RetrieveStatisticsDetailsResponseSupplier.of(
            shardRegion, ThingsSearchConstants.SHARD_REGION, log);

    if (eventProcessingActive) {
        mediator.tell(
                new DistributedPubSubMediator.Subscribe(ThingEvent.TYPE_PREFIX, UPDATER_GROUP, self()),
                self());
        mediator.tell(
                new DistributedPubSubMediator.Subscribe(PolicyEvent.TYPE_PREFIX, UPDATER_GROUP, self()),
                self());
    }
}
代码示例来源:origin: eclipse/ditto
policiesShardRegion, thingUpdaterActivityCheckInterval, ThingUpdater.DEFAULT_THINGS_TIMEOUT,
maxBulkSize)
.withMailbox("akka.actor.custom-updater-mailbox");
内容来源于网络,如有侵权,请联系作者删除!