打印kafka流输入输出到控制台?

2w3kk1z5  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(472)

我已经查阅了很多关于我正在开发的java应用程序的kafka文档。我已经试过学习Java8中引入的lambda语法,但我对它的理解有点粗略,我还不太相信它应该是我目前使用的语法。
我有一个kafka/zookeeper服务运行没有任何问题,我想做的是写一个小的例子程序,根据输入将写出来,但不做字数,因为已经有这么多的例子。
至于示例数据,我将得到以下结构的字符串:

示例数据

This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.

问题

我想能够提取3个字母的关键字,并打印他们与一个 System.out.println . 如何获取包含输入的字符串变量?我知道如何应用正则表达式,甚至只是搜索字符串来获得关键字。

代码

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final Serde<String> stringSerde = Serdes.String();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    //How do I assign the input from in-stream to the following variable?
    String variable = ?
}

我有Zookeeper,Kafka,生产者和消费者运行都连接到同一个主题,所以我想基本上看到相同的 String 出现在所有示例(生产者、消费者和流)上。

fykwrbwg

fykwrbwg1#

如果使用kafka流,则需要对数据流应用函数/运算符。在您的例子中,您创建了一个 KStream 对象,因此,您希望对 source .
根据您要执行的操作,有一些操作符可以将函数独立地应用于流中的每个记录(例如。 map() ),或将函数应用于多个记录的其他运算符(例如。 aggregateByKey() ). 您应该查看一下文档:http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-流dsl和示例https://github.com/confluentinc/kafka-streams-examples
因此,您决不会像上面的示例中那样使用kafka流创建局部变量,而是将所有内容嵌入到链接在一起的操作符/函数中。
例如,如果要将所有输入记录打印到stdout,可以

KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
source.foreach(new ForeachAction<String, String>() {
    void apply(String key, String value) {
        System.out.println(key + ": " + value);
    }
 });

因此,在您通过 streams.start() ,它将消费来自您输入主题的记录,并为您的主题的每条记录调用 apply(...) 完成后,在标准输出上打印记录。
当然,将流打印到控制台的一种更自然的方法是使用 source.print() (内部与图示基本相同) foreach() 已给定的运算符 ForeachAction .)
对于将字符串赋给局部变量的示例,需要将代码放入 apply(...) 并做你的正则表达式的东西等有“提取3个字母的关键字”。
然而,表达这一点的最佳方式是将 flatMapValues() 以及 print() (即, source.flatMapValues(...).print() ). flatMapValues() 为每个输入记录调用 null 所以你可以忽略它)。在你的 flatMapValue 函数中,应用regex并为每个匹配项将匹配项添加到最终返回的值列表中。

source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        ArrayList<String> keywords = new ArrayList<String>();

        // apply regex to value and for each match add it to keywords

        return keywords;
    }
}

输出 flatMapValues 将是一个 KStream 同样,为每个找到的关键字包含一个记录(即,输出流是一个“union”,它覆盖了所有返回的列表) ValueMapper#apply() ). 最后,您只需通过将结果打印到控制台 print() . (当然,你也可以用一个 foreach 而不是 flatMapValue + print 但这就没有那么模块化了。)

相关问题