我有一个场景,在这个场景中,如果一个查询的一部分与一个事件匹配,我想从数据存储中提取一些其他事件来测试查询的其余部分例如,“如果无名氏从我店里买东西,她在过去3年里还买了其他东西吗”之类的话。flink、storm或wso2是否为这种复杂的事件处理提供支持?
hc2pp10m1#
flink可以做到这一点,但它要求您从最早关心的事件(例如3年前)开始处理所有事件,以便为每个客户构建状态。flink允许您管理这个状态(通常使用rocksdb),这样您就不必在系统出现故障时重放所有事件。如果您不能重放所有的历史记录,那么通常您会将其放入其他具有所需可伸缩性和性能特征的存储(cassandra/hbase、elasticsearch等),然后在收到新事件时使用flink的异步函数支持来查询它。
2j4z5cfb2#
wso2流处理器让您通过它的时间增量分析功能实现这样的功能。要实现您提到的场景,您可以将客户到达时触发的事件提供给名为“aggregate”的构造。当您不断地将事件提供给聚合时,它将随着时间的推移汇总数据,并将保存在已配置的持久性存储(如db)中。您可以查询此聚合以获取给定时间段的状态。例如,下面的查询将获取2014-2015年的名称、购买的物品总数和平均交易价值
from CustomerSummaryRetrievalStream as b join CustoemrAggregation as a on a.name == b.name within "2014-01-01 00:00:00 +05:30", "2015-01-01 00:00:00 +05:30" per “years” select a.name, a.total, a.avgTxValue insert into CustomerSummaryStream;
2条答案
按热度按时间hc2pp10m1#
flink可以做到这一点,但它要求您从最早关心的事件(例如3年前)开始处理所有事件,以便为每个客户构建状态。flink允许您管理这个状态(通常使用rocksdb),这样您就不必在系统出现故障时重放所有事件。
如果您不能重放所有的历史记录,那么通常您会将其放入其他具有所需可伸缩性和性能特征的存储(cassandra/hbase、elasticsearch等),然后在收到新事件时使用flink的异步函数支持来查询它。
2j4z5cfb2#
wso2流处理器让您通过它的时间增量分析功能实现这样的功能。要实现您提到的场景,您可以将客户到达时触发的事件提供给名为“aggregate”的构造。当您不断地将事件提供给聚合时,它将随着时间的推移汇总数据,并将保存在已配置的持久性存储(如db)中。
您可以查询此聚合以获取给定时间段的状态。例如,下面的查询将获取2014-2015年的名称、购买的物品总数和平均交易价值