Kafka Streams Twitter Wordcount - count value is not a Long after serialization

yduiuuwa  posted on 2022-11-21  in  Apache

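I am running a Kafka cluster with Docker Compose on an AWS EC2 instance. I want to receive all tweets that contain a specific keyword and push them to Kafka. That part works fine. But I also want to count the most frequently used words in those tweets.
Here is the WordCount code: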

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.StreamsBuilder;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.util.concurrent.CountDownLatch;

import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;

public class WordCount {

    public static void main(String[] args) {

        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> textLines = builder
                .stream("test-topic");

        textLines
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.as("WordCount"))
                .toStream()
                .to("test-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();

        Properties props = new Properties();
        props.put(APPLICATION_ID_CONFIG, "streams-word-count");
        props.put(BOOTSTRAP_SERVERS_CONFIG, "ec2-ip:9092");
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KafkaStreams streams = new KafkaStreams(topology, props);

        final CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(
                new Thread("streams-shutdown-hook") {
                    @Override
                    public void run() {
                        streams.close();
                        latch.countDown();
                    }
                });
        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

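When I check the output topic in Control Center, it looks like this:
[Screenshot: Key and Value columns of the output topic]
It seems to split the tweets into individual words, but the count value is not shown as a Long, even though the code specifies it.
When I consume the topic with kafka-console-consumer, it says:
"Size of data received by LongDeserializer is not 8"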

kjthegm6  #1

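By default, the Control Center UI and the console consumer can only render UTF-8 data.
You need to pass LongDeserializer to the console consumer explicitly, and only as the value deserializer. The keys are still Strings.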
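
To see why a UTF-8 renderer shows the counts as unreadable characters, here is a minimal plain-Java sketch with no Kafka dependency. It assumes only that Serdes.Long() writes each count as 8 big-endian bytes. The class LongBytesDemo and its methods encodeLong and decodeLong are hypothetical names that imitate, but are not, Kafka's LongSerializer and LongDeserializer. The sketch also shows why an 8-byte length check fails when a long deserializer is applied to a String key.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongBytesDemo {

    /** Encodes a long as 8 big-endian bytes (assumed to match Kafka's LongSerializer layout). */
    public static byte[] encodeLong(long value) {
        // ByteBuffer uses big-endian byte order by default.
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Decodes 8 big-endian bytes and rejects any other length, imitating LongDeserializer. */
    public static long decodeLong(byte[] data) {
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException(
                    "Size of data is not 8, got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] countBytes = encodeLong(3L);

        // Rendering the raw bytes as UTF-8 yields control characters, not the digit "3".
        String asText = new String(countBytes, StandardCharsets.UTF_8);
        System.out.println("Rendered as UTF-8 text: [" + asText + "]");

        // Decoding the same bytes as a long recovers the count.
        System.out.println("Decoded as long: " + decodeLong(countBytes));

        // A word key such as "kafka" is 5 bytes of UTF-8, so a long decoder rejects it.
        byte[] keyBytes = "kafka".getBytes(StandardCharsets.UTF_8);
        try {
            decodeLong(keyBytes);
        } catch (IllegalArgumentException e) {
            System.out.println("Key rejected: " + e.getMessage());
        }
    }
}
```

With the console consumer, the equivalent is typically to keep the key as a String and set only the value deserializer, for example with --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer.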

lnlaulya  #2

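Try using a KTable:

// stringSerde is assumed to be Serdes.String(); the original snippet used it without defining it.
final Serde<String> stringSerde = Serdes.String();

KStream<String, String> textLines = builder.stream("test-topic", Consumed.with(stringSerde, stringSerde));

// count() returns a KTable holding the running total for each word.
KTable<String, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .groupBy((key, value) -> value, Grouped.with(stringSerde, stringSerde))
    .count();

// to() returns void, so the table is written to the output topic in a separate statement.
wordCounts.toStream()
    .to("test-output", Produced.with(stringSerde, Serdes.Long()));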
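
As a rough illustration of what the topology above computes, the following plain-Java sketch (no Kafka dependency) applies the same toLowerCase().split("\\W+") tokenization and keeps a running count per word. WordCountSketch, tokenize and countWords are hypothetical names. The filter for empty tokens is an addition of this sketch, since split can yield a leading empty string when a tweet starts with a non-word character such as @ or #.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {

    /** Splits a line the way the topology does, then drops empty tokens. */
    public static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            // "@kafka rocks".split("\\W+") starts with "", which would otherwise be counted as a word.
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    /** Keeps a running count per word, analogous to groupBy(word).count(). */
    public static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : tokenize(line)) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
                countWords(Arrays.asList("@Kafka rocks", "kafka Streams"));
        System.out.println(counts); // prints {kafka=2, rocks=1, streams=1}
    }
}
```

In the real topology each count is a Long value keyed by the word, which is why the output topic needs a String key deserializer and a Long value deserializer to be read back.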
