LazyInitializationException:如何将DB密集型进程卸载到Executor,同时仍然具有Hibernate的事务支持?

zrfyljdw  于 2023-11-21  发布在  其他
关注(0)|答案(2)|浏览(147)

我拥有的
我有一个正在运行的Web应用程序,它有一个端点来搜索项目。中间层的search方法看起来像这样:

private ListingScoringFunction scoringFunc;

@Transactional(readOnly = true)
public Page<Listing> search(ListingSearchParameters params) {
  Page<Long> pageOfListingIds = listingSearchRepo.searchForListingIds(params);
  return pageOfListingIds.map(eachListingId -> {
    var eachEntity = listingRepo.getOne(eachListingId).get();
    var score = scoringFunc.score(eachEntity);
    // Map the Hibernate entity to a serializable DTO
    return ModelMappings.toListing(eachEntity, score);
  });
}

字符串
当我调用ModelMappings.toListing()时,我访问了一堆延迟加载的字段。目前,这是可行的,因为所有操作都封装在活动会话/事务中。
我想要的
由于延迟加载的特性,我需要多次访问数据库。由于系统中的其他限制,我不想将这些字段更改为快速加载。我想通过多线程来增加吞吐量:当一个线程正在等待DB I/O时,将执行转移到准备运行评分函数的线程。
我创建了一个具有固定线程池的ExecutorService,将其连接到我的Service中,并修改代码如下所示:

private ListingScoringFunction scoringFunc;
private ExecutorService scoringExecutor;

@Transactional(readOnly = true)
public Page<Listing> search(ListingSearchParameters params) {
  Page<Long> pageOfListingIds = listingSearchRepo.searchForListingIds(params);
  List<Listing> scoredListings = scoreListingsById(pageOfListingIds);
  return new PageImpl<>(scoredListings);
}

private List<Listing> scoreListingsById(Page<Long> listingIds) {
  List<Listing> scoredListings;
  List<CompletableFuture<Listing>> resultList = listingIds.stream()
    .map(eachListingId -> {
        Supplier<Listing> scoreTask = () -> this.convertToScoredListing(eachListingId);
        return scoreTask;
     })
     .map(each -> CompletableFuture.supplyAsync(each, scoringExecutor))
     .toList();
  CompletableFuture.allOf(resultList.toArray(new CompletableFuture[0])).join();
  scoredListings = resultList.stream()
    .map(eachFuture -> eachFuture.getNow(null))
    .filter(Objects::nonNull)
    .toList();
  return scoredListings;
}

@Transactional
public Listing convertToScoredListing(Long eachListingId) {
  var eachListing = listingRepo.findById(eachListingId).get();
  var score = scoringFunc.score(eachListing);
  Listing scoredListing = ModelMappings.toListing(eachListing, score);
  return scoredListing;
}

出了什么问题

当我运行这段新代码并尝试从线程池中访问延迟加载的字段时,我得到了可怕的“no Session”错误:

org.hibernate.LazyInitializationException: failed to lazily initialize a collection of ___, 
  could not initialize proxy - no Session


据我所知,这通常意味着我调用代码的方式使得Spring/Hibernate无法判断我已经越过了事务边界(由@Transactional注解定义)。奇怪的是findById()调用成功,但是随后的ListingEntity.getLazyLoadedField()调用失败并抛出异常。我打开了一些Hibernate日志记录,看到它开始了一个transaction为listingRepo.findById()调用,但随后立即提交它,因此当我访问lazy字段时,transaction已提交,session已关闭。我解释这意味着当我调用@Repository示例时,Hibernate可以判断出这是一个已经越过的事务边界。ExecutorService,我调用convertToScoredListing(),它被注解为@Transactional。我不明白为什么这还不足以为整个方法的执行提供一个会话。
可能的混乱解决方案,我想避免:

  • convertToScoredListing()移到另一个类中,以使其更明显地表明边界已被跨越。当一个方法与该类紧密耦合时,将其从该类中强制出来似乎很混乱。
  • 手动管理线程中的会话/事务。
  • 在视图/表示层中启动会话。

什么是正确的配置卸载DB访问代码到一个线程?我如何让Hibernate在一个会话中运行我的线程?

o2g1uqev

o2g1uqev1#

好吧,我认为你的第一种方法有n+1issue,因为你在循环内查询getOnefindById。如果你打开spring.jpa.show-sql=true并计算查询次数,它将是pageSize + 1counting page又是1)。这是问题的根本原因。
有几种方法可以解决这个问题,但我在你的帖子中看到的最简单的方法是,在Repository中创建这个方法,而不是使用方法getOnefindById

List<ListingEntity> findByIdIn(Collection<Long> ids);

字符串
然后应用于服务

@Transactional(readOnly = true)
public Page<Listing> search(ListingSearchParameters params) {
  Page<Long> pageOfListingIds = listingSearchRepo.searchForListingIds(params);
  List<ListingEntity> listingEntities = repo.findByIdIn(pageOfListingIds.getContent());
  List<Listing> listings = convertToScoredListings(listingEntities);
  return new PageImpl<>(listings, pageable, pageOfListingIds.getTotalElements());
}

@Transactional
public Listing convertToScoredListings(List<ListingEntity> listingEntities) {
  return listingEntities.stream()
    .map(entity -> {
       var score = scoringFunc.score(eachListing);
       return ModelMappings.toListing(eachListing, score);
    })
    .collect(toList());
}


现在你可以检查一下,Hibernate只生成了3个查询(选择id,选择count,选择listingEntities by ids)
文章的第二部分很有趣,多线程可以提高性能。是的,但是当我们申请写数据而不是像这样的阅读(只有3个查询)时,它会很有帮助。

yquaqz18

yquaqz182#

最后,我通过在ExecutorService中使用TransactionTemplate来解决这个问题。每当线程异步运行时,我都会创建会话。基本策略是创建我自己的ExecutorService子类,它只是将提交的任务 Package 在事务中:

public class TransactionalExecutorService implements ExecutorService {
  private final ExecutorService internalExecutor;
  private final TransactionTemplate txnTemplate;

  public TransactionalExecutorService(
      int threadCount,
      PlatformTransactionManager transactionManager
  ) {
    internalExecutor = Executors.newFixedThreadPool(threadCount);
    txnTemplate = new TransactionTemplate(transactionManager);
  }

  private Runnable wrapInTransaction(Runnable threadWork) {
    return () -> txnTemplate.execute((txnStatus) -> {
      threadWork.run();
      return null;
    });
  }

  @Override
  public void execute(Runnable command) {
    internalExecutor.execute(wrapInTransaction(command));
  }

  /* ...similar implementations for submit/invoke methods */
}

字符串
它可以工作,但我仍然觉得我错过了框架已经提供的东西,或者误解了一个概念。

相关问题