我有什么?
我在hadoop集群上有一个spark流应用程序(在kafka streams上),它聚合了每5分钟用户的点击和在网站上完成的一些操作,并将它们转换为度量。
另外,我在greenplum中有一个表(在它自己的集群中),其中包含可能更新的用户数据。此表是通过kafka使用逻辑日志流复制填充的。表大小为1亿用户。
我想要什么?
我希望每隔1或5分钟将spark streams与greenplum的静态数据连接起来,然后使用静态表中的用户年龄等现有数据进行聚合。
笔记
当然,我不需要从users表中读取所有记录。有相当稳定的核心部分+每分钟注册的新用户数。目前我使用pyspark 2.1.0
我的解决方案
将数据从greenplum集群复制到hadoop集群,并将其保存为orc/parquet文件。每5分钟为新用户添加新文件。每天一次重新加载所有文件。
在hadoop上创建新的db,并像greenplum那样通过kafka设置日志复制。从数据库读取数据并使用内置的spark流连接。
从缓存中spark上的greenplum读取数据。将流数据与缓存连接起来。
对于每5分钟在文件中保存/附加新用户数据,忽略旧用户数据。存储额外列,例如。 last_action
如果用户在过去2周内不在网站上活动,则截断此文件。因此,将此文件与流连接起来。
问题
这些解决方案中有哪些更适合mvp?为了生产?
对于这类问题有没有更好的解决方案/最佳做法。(一些文献)
2条答案
按热度按时间biswetbf1#
您可能需要检查gpdb spark连接器--
http://greenplum-spark-connector.readthedocs.io/en/latest/
https://greenplum-spark.docs.pivotal.io/130/index.html
您可以直接将数据段中的数据加载到spark中。目前,如果您想写回gpdb,您需要使用标准的jdbc到主服务器。
ecr0jaav2#
spark从ApacheGeode这样的缓存中流式读取数据可以让这一切变得更好。在实时欺诈用例中使用了这种方法。在nutshell中,我使用历史数据在greenplum数据库中生成了特征。特征数据和一些决策查找数据被推送到geode中。特征定期刷新(间隔10分钟),然后在geode中刷新。spark scoring流式作业不断地对来自greenplum的事务进行评分,而不进行读取。spark流媒体作业也会将分数放入geode中,geode使用不同的线程与greenplum同步。我用k8在cloudfoundry上运行spark流。这是一个非常高的水平,但应该给你一个想法。