我正在拼命寻找esper-cep-kafka适配器代码的示例代码。我已经安装了kafka,并使用producer将数据写入kafka主题,现在我想用espercep处理它。不幸的是,Kafka适配器的esper文档不是很有意义。有人举过一个非常简单的例子吗?
编辑:
到目前为止,我添加了一个适配器,它似乎工作。但是,我不知道如何读取适配器,也不知道如何将cep模式与此适配器链接。这是我目前的代码:
config.addImport(KafkaOutputDefault.class);
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group.id");
props.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, EsperIOKafkaInputSubscriberByTopicList.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "test123");
props.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, EsperIOKafkaInputProcessorDefault.class.getName());
props.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());
Configuration config2 = new Configuration();
config2.addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), props, null);
EsperIOKafkaInputAdapter adapter = new EsperIOKafkaInputAdapter(props, "default");
adapter.start();
2条答案
按热度按时间flvlnr441#
下面是示例代码。此代码假设主题中已经有一些消息。这不会循环并等待更多消息。
gev0vcfq2#
我也有同样的问题。我创建了一个示例项目,你可以看看,特别是平原埃斯珀分支。
更简化的版本是: