如何使用esper对来自csv数据的流执行复杂的事件处理?

vbkedwbf  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(368)

我使用kafka producer连续发送csv行到kafka主题,并生成如下csv行流
温度=23.3,压力=1015,湿度=63.4,风速=7.4。。。。。
现在,我想使用esper对这个流执行复杂的事件处理,但是esper使用预定义的pojo类并使用epl语句来过滤事件。
如何在运行时将csv流转换为事件流,以便esper可以基于某些逻辑(例如,在5分钟时间段内变化的提取参数)提取或过滤参数,如温度或压力?

6ljaweal

6ljaweal1#

下面是示例代码。此示例假设主题中已经有一些消息。这不会循环并等待更多消息。

Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
    KafkaConsumer consumer = new KafkaConsumer<>(consumerProps);
    ConsumerRecords<String, String> rows = consumer.poll(1000);
    Iterator<ConsumerRecord<String, String>> it = rows.iterator();
    while (it.hasNext()) {
        ConsumerRecord<String, String> row = it.next();
        MyEvent event = new MyEvent(row.value()); // transform string to event

        // process event
        runtime.sendEvent(event);
    }

相关问题