在jpa事务中使用@transactionaleventlistener发送消息时,不会提交消息(丢失)

wwodge7n  于 2021-07-22  发布在  Java
关注(0)|答案(1)|浏览(703)

代码背景:
为了复制一个生产场景,我创建了一个虚拟应用程序,它基本上将在事务中的db中保存一些内容,并且在相同的方法中,它publishevent和publishevent向rabbitmq发送消息。
类别和用法
事务从此方法开始:

@Override
    @Transactional
    public EmpDTO createEmployeeInTrans(EmpDTO empDto) {
        return createEmployee(empDto);
    }

此方法将记录保存在db中,并触发publishevent

@Override
    public EmpDTO createEmployee(EmpDTO empDTO) {

        EmpEntity empEntity = new EmpEntity();
        BeanUtils.copyProperties(empDTO, empEntity);

        System.out.println("<< In Transaction : "+TransactionSynchronizationManager.getCurrentTransactionName()+" >>  Saving data for employee " + empDTO.getEmpCode());

        // Record data into a database
        empEntity = empRepository.save(empEntity);  

        // Sending event , this will send the message.
        eventPublisher.publishEvent(new ActivityEvent(empDTO));

        return createResponse(empDTO, empEntity);
    }

这是activityevent

import org.springframework.context.ApplicationEvent;
import com.kuldeep.rabbitMQProducer.dto.EmpDTO;
public class ActivityEvent extends ApplicationEvent {
    public ActivityEvent(EmpDTO source) {
        super(source);
    }
}

这是上面事件的transactionaleventlistener。

//@Transactional(propagation = Propagation.REQUIRES_NEW)
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onActivitySave(ActivityEvent activityEvent) {
        System.out.println("Activity got event ... Sending message .. ");
        kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);       
    }

这是krabbittemplate是一个bean配置,如下所示:

@Bean
    public RabbitTemplate kRabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate kRabbitTemplate = new RabbitTemplate(connectionFactory);
        kRabbitTemplate.setChannelTransacted(true);
        kRabbitTemplate.setMessageConverter(kJsonMessageConverter());
        return kRabbitTemplate;
    }

问题定义
当我使用上述代码流在rabbitmq上保存记录并发送消息时,我的消息没有在服务器上传递意味着它们丢失了。
我对amqp交易的理解是:
如果模板是事务处理的,但是没有从spring/jpa事务调用convertandsend,那么消息将在模板的convertandsend方法中提交。

// this is a snippet from org.springframework.amqp.rabbit.core.RabbitTemplate.doSend()
    if (isChannelLocallyTransacted(channel)) {
            // Transacted channel created by this template -> commit.
            RabbitUtils.commitIfNecessary(channel);
        }

但是如果模板是事务处理的,并且convertandsend是从spring/jpa事务调用的,那么dosend方法中的ischannellocalytransated将计算false,commit将在启动spring/jpa事务的方法中完成。
我在上面的代码中调查了消息丢失的原因后发现的。
当我调用convertandsend方法时,spring事务是活动的,所以它应该在spring事务中提交消息。
为此,在org.springframework.amqp.rabbit.connection.connectionfactoryutils的bindresourcetotransaction中发送消息之前,rabbittemplate绑定资源并注册同步。

public static RabbitResourceHolder bindResourceToTransaction(RabbitResourceHolder resourceHolder,
            ConnectionFactory connectionFactory, boolean synched) {
        if (TransactionSynchronizationManager.hasResource(connectionFactory)
                || !TransactionSynchronizationManager.isActualTransactionActive() || !synched) {
            return (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); // NOSONAR never null
        }
        TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
        resourceHolder.setSynchronizedWithTransaction(true);
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,
                    connectionFactory));
        }
        return resourceHolder;
    }

在我的代码中,在资源绑定之后,它无法注册同步,因为 TransactionSynchronizationManager.isSynchronizationActive()==false . 而且,由于无法注册同步,因此rabbitmq消息的spring commit没有发生 AbstractPlatformTransactionManager.triggerAfterCompletion 为每个同步调用rabbitmq的commit。
因为上述问题我面临什么问题。
消息未在spring事务中提交,因此消息丢失。
由于在bindresourcetotransaction中添加了资源,因此此资源保持绑定状态,不允许为同一线程中发送的任何其他消息添加资源。
可能的根本原因 TransactionSynchronizationManager.isSynchronizationActive()==false 我发现启动事务的方法在org.springframework.transaction.support.abstractplatformtransactionmanager类完成后删除了triggerafter中的同步。因为 status.isNewSynchronization() 评价的 true 在db操作之后(如果在没有applicationevent的情况下调用convertandsend,通常不会发生这种情况)。

private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
        if (status.isNewSynchronization()) {
            List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
            TransactionSynchronizationManager.clearSynchronization();
            if (!status.hasTransaction() || status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.trace("Triggering afterCompletion synchronization");
                }
                // No transaction or new transaction for the current scope ->
                // invoke the afterCompletion callbacks immediately
                invokeAfterCompletion(synchronizations, completionStatus);
            }
            else if (!synchronizations.isEmpty()) {
                // Existing transaction that we participate in, controlled outside
                // of the scope of this Spring transaction manager -> try to register
                // an afterCompletion callback with the existing (JTA) transaction.
                registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
            }
        }
    }

我在这个问题上克服了什么
我只是补充了一句 @Transactional(propagation = Propagation.REQUIRES_NEW) 以及 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 在onactivitysave方法中,它在新事务启动时工作。
我需要知道的是
为什么会这样 status.isNewSynchronization 当使用applicationevent时,在triggeraftercompletion方法中?
如果事务应该在父方法中终止,为什么 TransactionSynchronizationManager.isActualTransactionActive()==true 在listner课堂上?
如果实际事务处于活动状态,是否应该删除同步?
在bindsourcetotransaction中,springamqp是否假定一个没有同步的活动事务?如果答案是肯定的,为什么不同步呢。如果没有激活,初始化?
如果我正在传播一个新的事务,那么我就失去了父事务,有没有更好的方法呢?
请帮助我在这个问题上,这是一个热门的生产问题,我不太确定我所做的修复。

a7qyws3x

a7qyws3x1#

这是一只虫子;rabbitmq事务代码早于 @TransactionalEventListener 密码,用了很多年。
问题是,在这种配置中,我们处于准事务状态,虽然确实有一个事务在处理中,但同步已经被清除,因为事务已经提交。
使用 @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) 作品。
我看到你已经提出了一个问题:
https://github.com/spring-projects/spring-amqp/issues/1309
以后,最好在这里提问,或者如果您觉得有bug,就提出问题。不要两个都做。

相关问题