我已经查阅了很多关于我正在开发的java应用程序的kafka文档。我已经试过学习Java8中引入的lambda语法,但我对它的理解有点粗略,我还不太相信它应该是我目前使用的语法。
我有一个kafka/zookeeper服务运行没有任何问题,我想做的是写一个小的例子程序,根据输入将写出来,但不做字数,因为已经有这么多的例子。
至于示例数据,我将得到以下结构的字符串:
示例数据
This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.
问题
我想能够提取3个字母的关键字,并打印他们与一个 System.out.println
. 如何获取包含输入的字符串变量?我知道如何应用正则表达式,甚至只是搜索字符串来获得关键字。
代码
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
//How do I assign the input from in-stream to the following variable?
String variable = ?
}
我有Zookeeper,Kafka,生产者和消费者运行都连接到同一个主题,所以我想基本上看到相同的 String
出现在所有示例(生产者、消费者和流)上。
1条答案
按热度按时间fykwrbwg1#
如果使用kafka流,则需要对数据流应用函数/运算符。在您的例子中,您创建了一个
KStream
对象,因此,您希望对source
.根据您要执行的操作,有一些操作符可以将函数独立地应用于流中的每个记录(例如。
map()
),或将函数应用于多个记录的其他运算符(例如。aggregateByKey()
). 您应该查看一下文档:http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-流dsl和示例https://github.com/confluentinc/kafka-streams-examples因此,您决不会像上面的示例中那样使用kafka流创建局部变量,而是将所有内容嵌入到链接在一起的操作符/函数中。
例如,如果要将所有输入记录打印到stdout,可以
因此,在您通过
streams.start()
,它将消费来自您输入主题的记录,并为您的主题的每条记录调用apply(...)
完成后,在标准输出上打印记录。当然,将流打印到控制台的一种更自然的方法是使用
source.print()
(内部与图示基本相同)foreach()
已给定的运算符ForeachAction
.)对于将字符串赋给局部变量的示例,需要将代码放入
apply(...)
并做你的正则表达式的东西等有“提取3个字母的关键字”。然而,表达这一点的最佳方式是将
flatMapValues()
以及print()
(即,source.flatMapValues(...).print()
).flatMapValues()
为每个输入记录调用null
所以你可以忽略它)。在你的flatMapValue
函数中,应用regex并为每个匹配项将匹配项添加到最终返回的值列表中。输出
flatMapValues
将是一个KStream
同样,为每个找到的关键字包含一个记录(即,输出流是一个“union”,它覆盖了所有返回的列表)ValueMapper#apply()
). 最后,您只需通过将结果打印到控制台print()
. (当然,你也可以用一个foreach
而不是flatMapValue
+print
但这就没有那么模块化了。)