Kafka consumer displays numbers in an unreadable format

jvidinwx posted on 2021-06-05 in Kafka

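I am trying out Kafka Streams. I read messages from a topic, call groupByKey, and then count the records in each group. The problem is that the counts arrive as unreadable "boxes".
If I run the console consumer, they show up as empty strings.
This is the word count code I wrote: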

package streams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;

public class WordCount {
    public static void main(String[] args) {

        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streams-demo-2");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());

        // topology
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("temp-in");
        KStream<String, Long> fil = input.flatMapValues(val -> Arrays.asList(val.split(" "))) // making stream of text line to stream of words
                .selectKey((k, v) -> v) // changing the key
                .groupByKey().count().toStream(); // getting count after groupBy

        fil.to("temp-out");

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();

        System.out.println(streams.toString());

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

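This is the output I get in the consumer. It is on the right side of the picture.

I tried casting the long to Long again to see whether that would help, but it did not work.
In case it helps, I am also attaching the consumer code.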

package tutorial;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Once the consumer starts running it keeps running even after we stop in console
        // We should create new consumer to read from earliest because the previous one had already consumed until certain offset
        // when we run the same consumer in two consoles kafka detects it and re balances
        // In this case the consoles split the partitions they consume forming a consumer group
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-application-1"); // -> consumer group id
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // -> where to start when the group has no committed offset

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("temp-out"));

        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record: consumerRecords) {
                System.out.println(record.key() + " " + record.value());
                System.out.println(record.partition() + " " + record.offset());
            }
        }
    }
}

Any help is appreciated. Thanks in advance.

w6lpcovy #1

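The message value you are writing with Kafka Streams is a Long, but you are consuming it as a String.
If you make the following changes to your Consumer class, the counts will be printed correctly to standard output: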

        // Change this from StringDeserializer to LongDeserializer
        // (requires: import org.apache.kafka.common.serialization.LongDeserializer;).
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());

        ...

        // The value you're consuming here is a Long, not a String.
        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("temp-out"));

        while (true) {
            ConsumerRecords<String, Long> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, Long> record : consumerRecords) {
                System.out.println(record.key() + " " + record.value());
                System.out.println(record.partition() + " " + record.offset());
            }
        }
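
To see why the counts looked like boxes, here is a minimal sketch that uses only the JDK. It assumes the count is written the way Kafka's LongSerializer writes it, as 8 big-endian bytes, and that StringDeserializer reads bytes as UTF-8 text. The class LongBytesDemo and its method names are made up for this illustration and are not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: JDK only, mimicking what the Kafka (de)serializers do.
class LongBytesDemo {

    // Same layout LongSerializer produces: 8 bytes, big-endian.
    static byte[] serializeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Roughly what StringDeserializer does: interpret the raw bytes as UTF-8 text.
    static String decodeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Roughly what LongDeserializer does: read the 8 bytes back as a big-endian long.
    static long decodeAsLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] payload = serializeLong(3L); // a word count of 3

        // Read as text, the payload is eight control characters (seven NUL, one ETX),
        // which a console typically renders as boxes or as nothing at all.
        String asText = decodeAsString(payload);
        System.out.println("as String: length=" + asText.length()
                + ", last char code=" + (int) asText.charAt(asText.length() - 1));

        // Read as a long, the original count comes back.
        System.out.println("as Long: " + decodeAsLong(payload));
    }
}
```

The bytes on the topic are fine in both cases. Only the consumer-side interpretation differs, which is why switching the value deserializer is enough and the Streams application does not need to change.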
