apachestorm:使不可伸缩的资源可伸缩

bogh5gae  于 2021-06-21  发布在  Storm
关注(0)|答案(1)|浏览(282)

我最近开始向bigdata世界介绍自己,并尝试使用apachestorm。我曾经面对过下面的问题,想了很多如何解决它,但我所有的方法似乎都不适用ï爱。

技术

apache storm 0.9.3、java 1.8.0\u 20

上下文

需要逐行读取一个大的xml文件(~400mb)(xml文件喷口)。然后,每个读取的文件行由一系列螺栓发出和处理。
它必须是有保证的消息处理(用锚定发出…)

问题

由于文件相当大(包含大约200亿行),我使用扫描仪读取它,基于缓冲流而不是将整个文件加载到内存中。到现在为止,一直都还不错。当处理过程中的某个地方出现错误时,问题就会出现:xml文件本身停止工作,或者存在内部问题。。。
光环会重新启动喷口,但整个过程从一开始就开始了;
这种方法根本无法扩展。

解决思路

解决第一个问题的最初想法是将当前状态保存到某个地方:分布式缓存、jms队列、本地磁盘文件。当喷口打开时,它应该找到这样的存储,读取状态并从指定的文件行开始。在这里,我也考虑过将状态存储在storm的zookeeper中,但我不知道是否有可能从喷口中寻址zookeeper(有这样的能力吗)?你能建议一下这方面的最佳做法吗?
对于问题2,我考虑将初始文件分解为一组子文件并并行处理它们。它可以通过引入一个新的“打破”喷口来完成,每个文件将由一个专用的螺栓处理。在这种情况下,有保证的处理会引发一个大问题,因为在出现错误的情况下,必须完全重新处理包含失败行的子文件(saut的ack/fail方法)。。。你能提出解决这个问题的最佳做法吗?

更新

好吧,到目前为止我做了什么。
先决条件
以下拓扑之所以有效,是因为它的所有部分(喷口和螺栓)都是幂等的。
引入了一个单独的spout,它读取文件行(一行一行)并将它们发送到中间activemq队列(“file-line-queue”),以便能够轻松地重放失败的文件行(请参阅下一步);
为“文件行队列”队列创建了一个单独的喷口,该喷口接收每个文件行并将其发送到后续队列。就我所使用的保证消息处理而言,如果任何螺栓出现故障,则会重新处理一条消息,如果螺栓链成功,则会确认相应的消息(客户端确认模式)。
在第一个(文件读取)喷口失败的情况下,会抛出runtimeexception,从而终止喷口。稍后,一个专门的主管重新启动喷口,重新读取输入输出文件。这将导致重复的消息,但只要一切都是幂等的,这就不是问题。此外,这里值得考虑使用状态存储库来生成较少的重复项。。。
新发行
为了使中间jms更加可靠,我添加了一个on异常监听器,用于恢复使用者和生产者的连接和会话。问题出在使用者身上:如果会话被还原,并且在bolt处理过程中我有一个jms消息未确认,在成功处理之后,我需要确认它,但是只要会话是新的,我就会收到“找不到相关id”问题。
有人能建议怎么处理吗?

cgvd09ve

cgvd09ve1#

首先回答您的问题:
是的,您可以将状态存储在zookeeper之类的地方,并使用ApacheCurator之类的库来处理它。
分解文件可能会有帮助,但仍然不能解决必须管理状态的问题。
让我们在这里谈谈设计。storm是为流媒体而建的,不是为批量生产而建的。在我看来,hadoop技术更适合批处理:mapreduce、hive、spark等等。
如果您打算使用storm,那么它将有助于将数据流传输到更易于使用的地方。您可以将文件写入kafka或队列,以帮助解决管理状态、确认/失败和重试的问题。

相关问题