我在试Kafka的音乐。我正在阅读一个主题的信息,然后进行groupbykey,然后计算组数。但问题是,邮件计数是作为不可读的“盒子”来的。
如果我运行console consumer,这些将作为空字符串出现
这是我写的字数代码
package streams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Arrays;
import java.util.Properties;
public class WordCount {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-demo-2");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
// topology
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("temp-in");
KStream<String, Long> fil = input.flatMapValues(val -> Arrays.asList(val.split(" "))) // making stream of text line to stream of words
.selectKey((k, v) -> v) // changing the key
.groupByKey().count().toStream(); // getting count after groupBy
fil.to("temp-out");
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
System.out.println(streams.toString());
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
这是我在消费者中得到的输出。它在图片的右边
我试着把这个长到长的再铸一次,看看它是否管用。但它不起作用
如果有帮助的话,我也会附上消费代码。
package tutorial;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Once the consumer starts running it keeps running even after we stop in console
// We should create new consumer to read from earliest because the previous one had already consumed until certain offset
// when we run the same consumer in two consoles kafka detects it and re balances
// In this case the consoles split the partitions they consume forming a consumer group
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-application-1"); // -> consumer id
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // -> From when consumer gets data
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("temp-out"));
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record: consumerRecords) {
System.out.println(record.key() + " " + record.value());
System.out.println(record.partition() + " " + record.offset());
}
}
}
}
感谢您的帮助。提前谢谢。
1条答案
按热度按时间w6lpcovy1#
您正在使用kafka streams编写的消息值是
Long
,而你却把它当作String
.如果您对
Consumer
类,您将能够看到正确打印到标准输出的计数: