java:newsinglethreadscheduledexecutor在与akka框架并行使用时未给出预期结果

eeq64g8w  于 2021-07-12  发布在  Java
关注(0)|答案(1)|浏览(263)

在我的用例中使用akka框架,我创建了一个supervisoractor和两个子Actor,现在并行于我的token服务,它需要在到期前更新我的缓存,请查找代码:

public class TokenCacheService {
    final Logger logger = LoggerFactory.getLogger(TokenCacheService.class);
    private static final String KEY = "USER_TOKEN";
    private LoadingCache<String, String> tokenCache;
    private final ScheduledExecutorService cacheScheduler;
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("MyCacheRefresher-pool-%d").setDaemon(true)
            .build();

    public UserTokenCacheService(CacheConfig cacheConfig) {
        cacheScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        buildCache(cacheConfig);
    }

    public String getToken() {
        String token = StringUtils.EMPTY;
        try {
            token = tokenCache.get(KEY);
        } catch (ExecutionException ex) {
            logger.debug("unable to process get token...");
        }
        return token;
    }

    private void buildCache(CacheConfig cacheConfig) {
        tokenCache = CacheBuilder.newBuilder()
                .refreshAfterWrite(4, "HOURS")
                .expireAfterWrite(5, "HOURS")
                .maximumSize(2)
                .build(new CacheLoader<String, String>() {
                    @Override
                    @ParametersAreNonnullByDefault
                    public String load(String queryKey) {
                        logger.debug("cache load()");
                        return <token method call which return token>
                    }

                    @Override
                    @ParametersAreNonnullByDefault
                    public ListenableFutureTask<String> reload(final String key, String prevToken) {
                        logger.debug("cache reload()");
                        ListenableFutureTask<String> task = ListenableFutureTask.create(() -> return <token method call which return token>);
                        cacheScheduler.execute(task);
                        return task;
                    }
                });
        cacheScheduler.scheduleWithFixedDelay(() -> tokenCache.refresh(KEY), 0,
                4, "HOURS");
    }
}

它在测试类中运行良好:

public static void main(String[] args) throws InterruptedException {
        TokenCacheService userTokenCacheService = new TokenCacheService();
        while(true){
            System.out.println(tokenCacheService.getToken());
            Thread.sleep(180000);
        }
    }

上面的方法打印正确的日志4小时后,这是预期的,但当我运行上述代码与我的实际应用程序(与akka演员)我只能看到第一个日志 cache load() 除此之外,它不会打印进一步的日志来重新加载缓存。
请告诉我我做错了什么。

qzwqbdag

qzwqbdag1#

我将priority设置为1并将其替换,从而稍微调整了代码 scheduleWithFixedDelayscheduleAtFixedRate ```
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("MyCacheRefresher-pool-%d")
.setPriority(1)
.build();

public UserTokenCacheService(CacheConfig cacheConfig) {
    idsTokenApplication = new IdsTokenApplication();
    cacheScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
    buildCache(cacheConfig);
}

cacheScheduler.scheduleAtFixedRate(() -> tokenCache.refresh(KEY), 0,
cacheConfig.getReloadCache(), TimeUnit.valueOf(cacheConfig.getReloadCacheTimeUnit()));

相关问题