我使用kafka producer连续发送csv行到kafka主题,并生成如下csv行流
温度=23.3,压力=1015,湿度=63.4,风速=7.4。。。。。
现在,我想使用esper对这个流执行复杂的事件处理,但是esper使用预定义的pojo类并使用epl语句来过滤事件。
如何在运行时将csv流转换为事件流,以便esper可以基于某些逻辑(例如,在5分钟时间段内变化的提取参数)提取或过滤参数,如温度或压力?
我使用kafka producer连续发送csv行到kafka主题,并生成如下csv行流
温度=23.3,压力=1015,湿度=63.4,风速=7.4。。。。。
现在,我想使用esper对这个流执行复杂的事件处理,但是esper使用预定义的pojo类并使用epl语句来过滤事件。
如何在运行时将csv流转换为事件流,以便esper可以基于某些逻辑(例如,在5分钟时间段内变化的提取参数)提取或过滤参数,如温度或压力?
1条答案
按热度按时间6ljaweal1#
下面是示例代码。此示例假设主题中已经有一些消息。这不会循环并等待更多消息。