redis实时分析处理系统设计

vcirk6k6  于 2021-06-26  发布在  Storm
关注(0)|答案(2)|浏览(493)

我正在设计一个系统,它应该分析大量的用户事务,并生成聚合度量(如趋势等)。系统应该工作快速、健壮和可扩展。系统是基于java的(基于linux)。
数据来自生成用户事务日志文件(基于csv)的系统。系统每分钟生成一个文件,每个文件包含不同用户的事务(按时间排序),每个文件可能包含数千个用户。
csv文件的示例数据结构:
10:30:01,用户1,。。。
10:30:01,用户1,。。。
10:30:02,用户78,。。。
10:30:02,用户2,。。。
10:30:03,用户1,。。。
10:30:04,用户2,。。。
. . .
我计划的系统应该实时处理文件并执行一些分析。它必须收集输入,将其发送到多个算法和其他系统,并将计算结果存储在数据库中。数据库不保存实际的输入记录,只保存有关事务的高级聚合分析。例如趋势等。
我计划使用的第一个算法要求最佳操作至少有10个用户记录,如果5分钟后找不到10个记录,则应使用任何可用数据。
我想使用storm来实现,但我更愿意将此讨论尽可能留在设计级别。
系统组件列表:
每分钟监视传入文件的任务。
读取文件、解析文件并使其可用于其他系统组件和算法的任务。
一个组件为一个用户缓冲10条记录(不超过5分钟),当收集到10条记录时,或者已经过了5分钟,是时候将数据发送给算法进行进一步处理了。因为要求为算法提供至少10条记录,所以我考虑使用storm field分组(这意味着同一个用户调用同一个任务)并跟踪任务中10条用户记录的集合,当然我计划有几个这样的任务,每个任务处理一部分用户。
还有其他一些组件在单个事务上工作,因此我计划为它们创建其他任务,在解析每个事务时接收这些事务(与其他任务并行)。
我需要你的帮助。
设计这样一个组件的最佳实践是什么?很明显,它需要为每个用户维护10条记录的数据。键值Map可能会有所帮助,是在任务本身中管理Map更好,还是使用分布式缓存更好?例如redis是一个键值存储(我以前从未使用过)。
谢谢你的帮助

myss37ts

myss37ts1#

根据您的要求1和2:[ApacheFlume或Kafka]
满足您的要求3:【埃斯珀·博尔特在Storm中。在redis中,要实现这一点,您必须重写esper逻辑。]

pieyvz9o

pieyvz9o2#

我和redis合作过不少。因此,我将对您使用redis的想法进行评论

3有3个要求

每个用户的缓冲区
10个任务的缓冲区
应该每5分钟过期一次
1.每用户缓冲区:redis只是一个键值存储。尽管它支持多种数据类型,但它们总是Map到字符串键的值。因此,您应该决定如何唯一地标识用户,以防需要每个用户的缓冲区。因为在redis中,重写一个键的新值时永远不会出错。一种解决方案是在写之前检查是否存在。
2.10个任务的缓冲区:显然可以在redis中实现一个队列。但是限制它的大小是留给你的。例:使用 LPUSH 以及 LTRIM 或使用 LLEN 检查长度并决定是否触发进程。与此队列关联的密钥应该是您在第1部分中决定的密钥。
3.缓冲区将在5分钟内过期:这是一项最艰巨的任务。在redis中,每个键,不管它的值具有什么底层数据类型,都可以有一个 expiry . 但到期过程是沉默的。任何密钥过期时都不会通知您。因此,如果使用此属性,您将悄悄地丢失缓冲区。解决这个问题的方法之一是,建立一个索引。也就是说,索引会将时间戳Map到所有需要在该时间戳值处过期的密钥。然后在后台,您可以每分钟读取一次索引,然后手动从redis中删除[读取后]键,并用缓冲区数据调用所需的进程。要有这样一个索引,您可以查看排序集。时间戳将是您的 score 并设置 member 将是您希望在该时间戳处删除的密钥[在第1部分中确定的Map到队列的每个用户的唯一密钥]。你能做到的 zrangebyscore 读取具有指定时间戳的所有集合成员
总体情况:
使用redis list实现队列。
使用llen确保你没有超过你的10限制。
每当你创建一个新的列表时,在索引[排序集]中做一个条目,分数为 Current Timestamp + 5 min 值作为列表的键。
当llen达到10时,记住读取,然后从索引[sorted set]和数据库[delete the key->list]中删除键。然后用数据触发进程。
对于每一分钟,生成当前时间戳,读取索引,对于每个键,读取数据,然后从数据库中删除键并触发进程。
这也许是我实现它的方法。也许还有其他更好的方法可以在redis中为数据建模

相关问题