java—在spring批处理作业中,在单个事务中编写两个kafka主题的最佳方法是什么

pieyvz9o  于 2021-06-27  发布在  Java
关注(0)|答案(2)|浏览(358)

当前实施情况
我有一个spring批处理工作,写一个Kafka主题。我从数据库中读取记录,将其转换并写入Kafka主题。
对现有作业的新更改
我想在写主旨的同时再写一个审计主题。
对于从数据库中读取的每条记录,我都会向主主题写入一个类类型的消息,对于同一条记录,我会向审计主题写入另一个实体类类型的消息。
问题陈述
目前,我正在使用不同的kakfatemplate来写入这两个主题,但问题是如何处理作业在写入主主题之后失败,而它从未写入审计主题的情况。如何回滚事务(我在当前实现中没有实现事务)。
我是否需要更改应用程序的整个实现??我应该在一个事务中同时写这两个主题,还是我当前的实现有任何解决方案?
事务管理器

@Override
protected JobRepository createJobRepo(){
JobRepositoryFactoryBean fac = new JobRepositoryFactoryBean;
fac.setDataSource(ds);
fac.setTransactionManger(transactionManger);
fac.set();
return fac.getObject();
blpfk2vs

blpfk2vs1#

从长远来看,改变实现方式将使您的生活更加轻松。您所描述的问题称为事务发件箱模式,并且有许多广为接受的实现。
批处理作业适合于kafka连接器(debezium是一种更复杂、更灵活的解决方案)。连接器自动处理缩放、协调、偏移处理和并发,否则您必须使用selectforupdate等实现这些功能。
我比较喜欢的解决办法是简化这个问题。把它分成两部分。
使用连接器将记录写入主题。使用带有smt(无状态单消息转换)的kafka streams应用程序,并使用精确的once语义生成转换后的消息到审核日志。这样,只有在生成原始主题中的消息时,adit日志中才会有消息。事务复杂性已经过时了。
kafka连接器(debezium)将处理重试、故障转移、偏移等。
另一个较老的方法是事务发件箱,可以使用debezium tx发件箱

m4pnthwp

m4pnthwp2#

为了正确地实现这一点,您需要使用协调 DatasourceTransactionManager (用于springbatch的技术元数据)和 KafkaTransactionManager (用于您的业务数据)。
在spring批处理作业中,在一个事务中编写两个kafka主题的最佳方法是什么
如果您在这里使用类似于前面问题的建议:https://stackoverflow.com/a/65287130/5019386,两个writer将在spring批处理驱动的同一事务中执行。

相关问题