我们有一个spark应用程序,它来自kafka,并使用客户活动。我正在尝试确定客户是否已在我们的系统上停止活动3分钟(即3分钟内未收到该客户的另一笔交易)。
我不确定我是否试图以正确的方式实现它,或者在spark中使用这个逻辑是否有意义,但是我正在尝试使用recurringtimer类来实现这一点。有人实现过类似的东西吗?如果有,spark库中使用了什么实用函数?
任何例子,指针等也将不胜感激
我们有一个spark应用程序,它来自kafka,并使用客户活动。我正在尝试确定客户是否已在我们的系统上停止活动3分钟(即3分钟内未收到该客户的另一笔交易)。
我不确定我是否试图以正确的方式实现它,或者在spark中使用这个逻辑是否有意义,但是我正在尝试使用recurringtimer类来实现这一点。有人实现过类似的东西吗?如果有,spark库中使用了什么实用函数?
任何例子,指针等也将不胜感激
1条答案
按热度按时间8gsdolmq1#
看一看
mapWithState
,基本上您将聚合到一个键/值对中,该键/值对由客户的某个标识符和接收到的最后一个事务的时间戳组成。每个微批处理在执行此聚合之后,您可以检查并查看其中是否有任何用户具有
timestamp < now() - 3min
然后做一些事情(例如,将一条消息推入另一个Kafka队列等)上的样本
mapWithState
在这里可以买到