Looking for a very simple EsperIO Kafka example

3vpjnl9f posted on 2021-06-06 in Kafka

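I am desperately looking for sample code for the Esper CEP Kafka adapter. I have installed Kafka and used a producer to write data to a Kafka topic, and now I want to process that data with Esper CEP. Unfortunately, the Esper documentation for the Kafka adapter is not very informative. Does anyone have a very simple example?
Edit:
So far I have added an adapter, and it seems to work. However, I don't know how to read from the adapter, nor how to link a CEP pattern to it. This is my code so far: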

config.addImport(KafkaOutputDefault.class);
Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group.id");

props.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, EsperIOKafkaInputSubscriberByTopicList.class.getName());
props.put(EsperIOKafkaConfig.TOPICS_CONFIG, "test123");
props.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, EsperIOKafkaInputProcessorDefault.class.getName());
props.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());

Configuration config2 = new Configuration();
config2.addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), props, null);

EsperIOKafkaInputAdapter adapter = new EsperIOKafkaInputAdapter(props, "default");
adapter.start();

Answer 1 (flvlnr44)

Here is some sample code. It assumes the topic already contains messages: it polls once and does not loop to wait for more.

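import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// `ip` is the broker address, e.g. "localhost:9092"; poll(Duration) needs kafka-clients 2.0 or later.
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
// Without committed offsets the default is "latest", which would skip the existing messages.
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
// A consumer must subscribe before polling; "test123" is the topic name used in the question.
consumer.subscribe(Collections.singletonList("test123"));
// Single poll of up to one second; it can come back empty while the group is still joining.
ConsumerRecords<String, String> rows = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> row : rows) {
    MyEvent event = new MyEvent(row.value()); // transform string to event

    // process event; `runtime` is the Esper runtime held by the surrounding application
    runtime.sendEvent(event);
}
consumer.close();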
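
The snippet above leaves `MyEvent` undefined. A minimal sketch of such a class follows, assuming (this is not stated in the answer) that each Kafka message value is a comma-separated string such as `IBM,101.5`. The field names `symbol` and `price` are hypothetical. Esper reads POJO event properties through JavaBean-style getters, so the class exposes `getSymbol()` and `getPrice()`.

```java
// Hypothetical event class: assumes message values look like "IBM,101.5".
// Declare it public in its own file (MyEvent.java) when registering it with Esper.
final class MyEvent {
    private final String symbol;
    private final double price;

    MyEvent(String raw) {
        // Split into exactly two parts: symbol and price.
        String[] parts = raw.split(",", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected 'symbol,price' but got: " + raw);
        }
        this.symbol = parts[0].trim();
        // Throws NumberFormatException (an IllegalArgumentException) on a bad price.
        this.price = Double.parseDouble(parts[1].trim());
    }

    public String getSymbol() {
        return symbol;
    }

    public double getPrice() {
        return price;
    }

    @Override
    public String toString() {
        return "MyEvent{symbol=" + symbol + ", price=" + price + "}";
    }
}
```

Rejecting malformed payloads in the constructor keeps bad records out of the engine; whether to skip or log them instead is a choice for the polling loop.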

Answer 2 (gev0vcfq)

I had the same problem. I created a sample project that you can look at, especially the plain-esper branch.
A more simplified version is:

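// KafkaExample.java (Esper 8 API)
import java.util.Properties;
import java.util.UUID;
import com.espertech.esper.common.client.EPCompiled;
import com.espertech.esper.common.client.EventBean;
import com.espertech.esper.common.client.configuration.Configuration;
import com.espertech.esper.compiler.client.CompilerArguments;
import com.espertech.esper.compiler.client.EPCompileException;
import com.espertech.esper.compiler.client.EPCompilerProvider;
import com.espertech.esper.runtime.client.DeploymentOptions;
import com.espertech.esper.runtime.client.EPDeployException;
import com.espertech.esper.runtime.client.EPDeployment;
import com.espertech.esper.runtime.client.EPRuntime;
import com.espertech.esper.runtime.client.EPRuntimeProvider;
import com.espertech.esper.runtime.client.EPStatement;
import com.espertech.esperio.kafka.EsperIOKafkaConfig;
import com.espertech.esperio.kafka.EsperIOKafkaInputAdapterPlugin;
import com.espertech.esperio.kafka.EsperIOKafkaInputTimestampExtractorConsumerRecord;
import com.espertech.esperio.kafka.KafkaOutputDefault;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaExample implements Runnable {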
    private String runtimeURI;

    public KafkaExample(String runtimeURI) {
        this.runtimeURI = runtimeURI;
    }

    public static void main(String[] args){
        new KafkaExample("KafkaExample").run();
    }

    @Override
    public void run() {
        Configuration configuration = new Configuration();
        configuration.getCommon().addImport(KafkaOutputDefault.class);
        configuration.getCommon().addEventType(String.class);

        Properties consumerProps = new Properties();

        // Kafka Consumer Properties
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());

        // EsperIO Kafka Input Adapter Properties
        consumerProps.put(EsperIOKafkaConfig.INPUT_SUBSCRIBER_CONFIG, Consumer.class.getName());
        consumerProps.put(EsperIOKafkaConfig.INPUT_PROCESSOR_CONFIG, InputProcessor.class.getName());
        consumerProps.put(EsperIOKafkaConfig.INPUT_TIMESTAMPEXTRACTOR_CONFIG, EsperIOKafkaInputTimestampExtractorConsumerRecord.class.getName());

        configuration.getRuntime().addPluginLoader("KafkaInput", EsperIOKafkaInputAdapterPlugin.class.getName(), consumerProps, null);

        String stmt = "@name('sampleQuery') select * from String";
        EPCompiled compiled;
        try {
            compiled = EPCompilerProvider.getCompiler().compile(stmt, new CompilerArguments(configuration));
        } catch (EPCompileException ex) {
            throw new RuntimeException(ex);
        }

        EPRuntime runtime = EPRuntimeProvider.getRuntime(runtimeURI, configuration);
        EPDeployment deployment;
        try {
            deployment = runtime.getDeploymentService().deploy(compiled, new DeploymentOptions().setDeploymentId(UUID.randomUUID().toString()));
        } catch (EPDeployException ex) {
            throw new RuntimeException(ex);
        }

        EPStatement statement = runtime.getDeploymentService().getStatement(deployment.getDeploymentId(), "sampleQuery");

        statement.addListener((newData, oldData, sta, run) -> {
            for (EventBean nd : newData) {
                System.out.println(nd.getUnderlying());
            }
        });

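        // Keep the main thread alive while the adapter polls Kafka; sleep instead of spinning.
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Thread.sleep(1000);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }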
    }
}
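
// Consumer.java: subscribes the adapter's Kafka consumer to the "input" topic.
// Needs java.util.Collections plus EsperIOKafkaInputSubscriber and
// EsperIOKafkaInputSubscriberContext from com.espertech.esperio.kafka.
public class Consumer implements EsperIOKafkaInputSubscriber {
    @Override
    public void subscribe(EsperIOKafkaInputSubscriberContext context) {
        context.getConsumer().subscribe(Collections.singletonList("input"));
    }
}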

// InputProcessor.java: forwards each Kafka record value to the Esper runtime.
public class InputProcessor implements EsperIOKafkaInputProcessor {
    private EPRuntime runtime;

    @Override
    public void init(EsperIOKafkaInputProcessorContext context) {
        this.runtime = context.getRuntime();
    }

    @Override
    public void process(ConsumerRecords<Object, Object> records) {
        for (ConsumerRecord<Object, Object> record : records) {
            if (record.value() != null) {
                // Send the value as an event of the registered type "String",
                // which the statement "select * from String" picks up.
                runtime.getEventService().sendEventBean(record.value().toString(), "String");
            }
        }
    }

    // Nothing to release here.
    public void close() {}
}
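
The `run()` method above has to keep the main thread alive after the statement is deployed. One way to block without using CPU is a `CountDownLatch` released by a JVM shutdown hook. This is only a sketch built on JDK classes; the class name `KeepAlive` and its methods are made up for illustration and are not part of Esper or EsperIO.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: blocks a thread until release() is called or the JVM shuts down.
final class KeepAlive {
    private final CountDownLatch latch = new CountDownLatch(1);

    // Registers a shutdown hook so that Ctrl+C releases the waiting thread.
    void releaseOnShutdown() {
        Runtime.getRuntime().addShutdownHook(new Thread(latch::countDown));
    }

    // Releases any thread blocked in await().
    void release() {
        latch.countDown();
    }

    // Blocks until released.
    void await() throws InterruptedException {
        latch.await();
    }

    // Blocks until released or the timeout elapses; returns true if released.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }
}
```

In `run()`, calling `releaseOnShutdown()` and then `await()` on such a helper could stand in for the final loop.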
