我正在AWS EC2示例上运行Kafka Cluster Docker Compose。我想接收特定关键字的所有tweet并将其推送到Kafka。这很好。但我还想统计这些tweet中使用最多的单词。
下面是WordCount代码:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.StreamsBuilder;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.util.concurrent.CountDownLatch;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG;
public class WordCount {
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> textLines = builder
.stream("test-topic");
textLines
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.count(Materialized.as("WordCount"))
.toStream()
.to("test-output", Produced.with(Serdes.String(), Serdes.Long()));
final Topology topology = builder.build();
Properties props = new Properties();
props.put(APPLICATION_ID_CONFIG, "streams-word-count");
props.put(BOOTSTRAP_SERVERS_CONFIG, "ec2-ip:9092");
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final KafkaStreams streams = new KafkaStreams(topology, props);
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(
new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
}
当我在控制中心中检查输出主题时,它看起来像这样:
Key
Value
看起来它可以将tweet拆分成单个单词,但是计数值不是Long格式,尽管它在代码中指定了。
当我用kafka-console-consumer来消费这个主题时,它说:
“LongDeserializer接收的数据大小不是8”
2条答案
按热度按时间kjthegm61#
默认情况下,控制中心UI和控制台使用者只能呈现UTF8数据。
您需要将LongDeserializer显式传递给控制台使用者,仅作为值反序列化器
lnlaulya2#
请尝试使用KTable: