java—用spring事件替换计划任务

2guxujil  于 2021-06-29  发布在  Java
关注(0)|答案(5)|浏览(483)

赏金两天后到期。回答此问题可获得+350声望奖励。dó纳尔想引起更多的注意**这个问题。

在我的springboot应用程序中,客户可以提交文件。每个客户的文件都由每分钟运行一次的计划任务合并在一起。合并是由调度程序执行的这一事实有许多缺点,例如,很难编写端到端测试,因为在测试中,您必须等待调度程序运行,然后才能检索合并结果。
因此,我希望使用基于事件的方法,即。
客户提交文件
已发布包含此客户id的事件
合并服务侦听这些事件,并在事件对象中为客户执行合并操作
这样做的好处是在有文件可供合并后立即触发合并操作。
但是,这种方法有很多问题,我想得到一些帮助

并发

合并是一个相当昂贵的操作。这可能需要20秒,这取决于涉及的文件数量。因此,合并必须异步进行,也就是说,不能作为发布合并事件的同一线程的一部分。另外,我不想同时对同一个客户执行多个合并操作,以避免出现以下情况
customer1保存file2触发file1和file2的合并操作2
不久之后,customer1保存file3,触发file1、file2和file3的合并操作3
合并操作3完成保存合并文件3
合并操作2完成用merge-file2覆盖merge-file3
为了避免这种情况,我计划在事件侦听器中使用锁依次处理同一客户的合并操作,例如。

@Component
public class MergeEventListener implements ApplicationListener<MergeEvent> {

    private final ConcurrentMap<String, Lock> customerLocks = new ConcurrentHashMap<>();

    @Override
    public void onApplicationEvent(MergeEvent event) {
        var customerId = event.getCustomerId();
        var customerLock = customerLocks.computeIfAbsent(customerId, key -> new ReentrantLock());
        customerLock.lock();
        mergeFileForCustomer(customerId);
        customerLock.unlock();
    }

    private void mergeFileForCustomer(String customerId) {
        // implementation omitted
    }
}

容错

例如,如果应用程序在合并操作过程中关闭,或者在合并操作过程中发生错误,如何恢复?
计划方法的优点之一是它包含一个隐式重试机制,因为每次运行它都会查找具有未合并文件的客户。

摘要

我怀疑我提出的解决方案可能是针对这类问题重新实现了(糟糕的)现有技术,例如jms。我建议的解决方案是可取的,还是应该改用jms之类的东西?应用程序托管在azure上,因此我可以使用它提供的任何服务。
如果我的解决方案是可取的,我应该如何处理容错?

jk9hmnmh

jk9hmnmh1#

KafkaSpring Boot可以解决你的容错问题。
Kafka支持生产者-消费者模式。让客户将活动发布给Kafka制作人。
为kafka配置复制以避免丢失任何事件。
使用可以为每个事件调用合并服务的使用者。
一旦消费者读取customerid事件并合并,然后提交偏移量。
如果在合并事件之间发生任何故障,则不会提交偏移量,以便在应用程序再次启动时可以再次读取。
如果合并服务可以检测到具有给定数据的重复事件,则重新处理同一消息不应引起任何问题(kafka承诺事件的单次传递)。重复事件检测是对已完全处理但未能提交到kafka的事件的安全检查。

2lpgd968

2lpgd9682#

关于并发部分,如果每个客户提交的文件数(在给定的时间范围内)足够小,我认为使用锁的方法可以很好地工作。
您最终可以监视等待锁的线程数,以查看是否存在大量争用。如果有,那么您可能可以累积一些合并事件(在特定的时间段上),然后运行并行合并操作,这实际上会导致一个与调度器类似的解决方案。
在容错方面,基于消息队列的方法是可行的(还没有使用jms,但我看到它是一个消息队列的实现)。
出于可靠性的考虑,我会选择基于云的消息队列(例如sqs)。方法是:
将合并事件推入队列
合并服务一次扫描一个事件并启动合并作业
合并作业完成后,消息将从队列中删除
这样,如果在合并过程中出现问题,消息将保留在队列中,并在应用程序重新启动时再次读取。

yb3bgrhw

yb3bgrhw3#

经过一些考虑,我对这件事的看法是正确的。
根据op的规范,我限制了azure托管服务提供的可能解决方案。

azure blob存储函数触发器

因为这个问题是关于存储文件的,所以让我们先来研究blob存储,它带有在创建文件时触发的触发器函数。根据doc的说法,azure函数最多可以运行230秒,默认重试次数为5次。
但是,这个解决方案将要求来自单个客户的文件以一种不会引起并发问题的方式到达,因此我们暂时不讨论这个解决方案。

azure队列存储

不保证先进先出(fifo)订购交货,因此不符合要求。
存储队列和服务总线队列-比较和对比:https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted

azure服务总线

azure服务总线是一个fifo队列,似乎符合要求。
https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted#compare-存储队列和服务总线队列
从上面的文档中,我们看到大文件不适合作为消息负载。为了解决这个问题,文件可能存储在azure blob存储中,消息将包含在哪里可以找到文件的信息。
选择了azure服务总线和azure blob存储之后,让我们讨论一下实现注意事项。
队列生产者
在aws上,生产商方面的解决方案如下:
专用端点为客户应用程序提供预签名的url
客户应用程序将文件上载到s3
s3对象创建触发的lambda将消息插入队列
不幸的是,azure还没有一个预签名的url等价物(它们有不相等的共享访问签名),因此文件上传必须通过一个端点来完成,该端点反过来将文件存储到azure blob存储。当需要文件上载端点时,让文件上载端点也负责将消息插入队列似乎是合适的。
排队消费者
因为文件合并需要大量的时间(大约20秒),所以应该可以扩展到使用者端。对于多个使用者,我们必须确保一个单一的使用者不会被多个使用者示例处理。这可以通过使用消息会话来解决:https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions
为了实现容错,使用者应该在文件合并期间使用peek锁(而不是receive和delete),并在文件合并完成时将消息标记为completed。当消息标记为已完成时,使用者可能负责删除blob存储中的多余文件。
现有解决方案和未来解决方案可能存在的问题
如果客户a开始上传一个大文件#1,然后紧接着开始上传一个小文件#2,则文件#2的文件上传可能在文件#1之前完成,并导致出现故障情况。
我假设这是一个通过使用某种锁定机制或文件名约定在现有解决方案中解决的问题。

3pvhb19x

3pvhb19x4#

首先,基于事件的方法适合这种情况。对于发布子事件消息,应该使用外部代理。
请注意,默认情况下,spring发布事件是同步的。
假设您有3个服务:
应用程序服务
合并服务
cdc服务(更改数据捕获)
经纪服务(kafka、rabbitmq等)
基于“发件箱模式”的主流程:
应用服务将事件消息保存到发件箱消息表
cdc服务监视发件箱表并将事件消息从发件箱表发布到代理服务
合并服务订阅到代理服务器并接收事件消息(消息是有序的)
合并服务执行合并操作
您可以将eventuate lib用于此流。
此外,您还可以将ddd应用于您的体系结构。利用axon框架实现cqrs模式,对公共域事件进行处理。
请参阅:
发件箱模式:https://microservices.io/patterns/data/transactional-outbox.html

2exbekwf

2exbekwf5#

听起来你真的需要一个 Stream 或者一个 ETL 工作的工具。当你在开发一个应用程序,并且你有一些优先排序/排队/批处理的需求时,很容易看到你如何用一个 Cron + SQL Database ,可能有一个队列来将工作与生产工作解耦。
这很可能是最容易构建的,因为您对这种方法有很多粒度和控制。如果您相信您实际上可以以低风险的方式以相当快的速度满足您的需求,那么您可以这样做。
有一些软件组件更适合这些任务,但它们确实有一些学习曲线,并且取决于您可能使用的paas或云。您将获得开箱即用的监控、可扩展性和可用性恢复能力。开放源代码或云服务将减轻您的管理负担。
使用什么还取决于您的优先级和要求。如果您想使用etl方法,它非常适合存储作业,那么您可能需要使用诸如glue t之类的方法。如果您想使用优先级排序功能,您可能需要使用多个队列,这取决于具体情况。您还需要使用 Jmeter 板进行监视,以查看无论采用哪种方法,合并所需的等待时间。

相关问题