Java: newSingleThreadScheduledExecutor does not give the expected result when used alongside the Akka framework

Posted by eeq64g8w on 2021-07-12 in Java

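In my use case I am using the Akka framework: I created a supervisor actor and two child actors. Running in parallel with them is my token service, which needs to refresh my cache before the token expires. Here is the code: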

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TokenCacheService {
    final Logger logger = LoggerFactory.getLogger(TokenCacheService.class);
    private static final String KEY = "USER_TOKEN";
    private LoadingCache<String, String> tokenCache;
    private final ScheduledExecutorService cacheScheduler;
    // Single daemon thread used both for the periodic refresh and for reload tasks
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("MyCacheRefresher-pool-%d").setDaemon(true)
            .build();

    public TokenCacheService(CacheConfig cacheConfig) {
        cacheScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        buildCache(cacheConfig);
    }

    public String getToken() {
        String token = StringUtils.EMPTY;
        try {
            token = tokenCache.get(KEY);
        } catch (ExecutionException ex) {
            logger.debug("unable to process get token...");
        }
        return token;
    }

    private void buildCache(CacheConfig cacheConfig) {
        tokenCache = CacheBuilder.newBuilder()
                .refreshAfterWrite(4, TimeUnit.HOURS)
                .expireAfterWrite(5, TimeUnit.HOURS)
                .maximumSize(2)
                .build(new CacheLoader<String, String>() {
                    @Override
                    @ParametersAreNonnullByDefault
                    public String load(String queryKey) {
                        logger.debug("cache load()");
                        // fetchToken() is a placeholder: the original post elides
                        // the token method call that returns the token
                        return fetchToken();
                    }

                    @Override
                    @ParametersAreNonnullByDefault
                    public ListenableFutureTask<String> reload(final String key, String prevToken) {
                        logger.debug("cache reload()");
                        // Same placeholder token call, run asynchronously on the scheduler thread
                        ListenableFutureTask<String> task = ListenableFutureTask.create(() -> fetchToken());
                        cacheScheduler.execute(task);
                        return task;
                    }
                });
        // Trigger a refresh immediately, then 4 hours after each refresh call returns
        cacheScheduler.scheduleWithFixedDelay(() -> tokenCache.refresh(KEY), 0,
                4, TimeUnit.HOURS);
    }
}
```

It works fine in a test class:

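```java
public static void main(String[] args) throws InterruptedException {
    // The original post omits the constructor argument; a no-arg CacheConfig
    // constructor is assumed here.
    TokenCacheService tokenCacheService = new TokenCacheService(new CacheConfig());
    while (true) {
        System.out.println(tokenCacheService.getToken());
        Thread.sleep(180000); // poll every 3 minutes
    }
}
```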

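The method above prints the correct logs after 4 hours, as expected. But when I run the same code inside my actual application (with Akka actors), I only see the first log, `cache load()`. After that, no further logs for reloading the cache are printed.
Please tell me what I am doing wrong.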


qzwqbdag1#

I tweaked the code slightly: I set the thread priority to 1 and replaced `scheduleWithFixedDelay` with `scheduleAtFixedRate`.

```java
// Thread factory with priority 1 (the setDaemon(true) call from the question is not present here)
ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("MyCacheRefresher-pool-%d")
        .setPriority(1)
        .build();

public TokenCacheService(CacheConfig cacheConfig) {
    idsTokenApplication = new IdsTokenApplication();
    cacheScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
    buildCache(cacheConfig);
}

// Scheduling call: scheduleAtFixedRate replaces scheduleWithFixedDelay,
// with the period and its unit read from the configuration
cacheScheduler.scheduleAtFixedRate(() -> tokenCache.refresh(KEY), 0,
        cacheConfig.getReloadCache(), TimeUnit.valueOf(cacheConfig.getReloadCacheTimeUnit()));
```
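To make the difference between the two scheduling methods concrete, here is a minimal sketch using only the JDK. The class name `RateVsDelayDemo`, the method `countRuns`, and all timings are made up for illustration and do not come from the post. `scheduleAtFixedRate` measures the period from the start of each run, while `scheduleWithFixedDelay` measures the delay from the end of each run, so a slow task is started less often with fixed delay.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RateVsDelayDemo {

    /**
     * Schedules a task that takes about taskMillis on a single-thread scheduler
     * and returns how many times it was started within windowMillis.
     * All names and numbers here are hypothetical, for illustration only.
     */
    static int countRuns(boolean fixedRate, long taskMillis, long periodMillis, long windowMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable task = () -> {
            runs.incrementAndGet();
            try {
                Thread.sleep(taskMillis); // simulate a slow refresh
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        if (fixedRate) {
            // Period is measured from the start of one run to the start of the next
            scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        } else {
            // Delay is measured from the end of one run to the start of the next
            scheduler.scheduleWithFixedDelay(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        }
        try {
            Thread.sleep(windowMillis); // let the schedule run for the observation window
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("fixed rate runs:  " + countRuns(true, 100, 100, 1000));
        System.out.println("fixed delay runs: " + countRuns(false, 100, 100, 1000));
    }
}
```

With a task that takes as long as the period, the fixed-rate schedule should start noticeably more runs than the fixed-delay one. When the task is short compared to the period, as a quick token refresh every few hours probably is, the two methods behave almost identically, so it is not certain that this switch alone accounts for the change in behavior.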
