Exception when running a Kafka Streams fat jar

qoefvg9y asked on 2021-06-07 in Kafka

I am trying to learn the word-count example for Kafka Streams; below is the code I am using. I built a fat jar from the project, started producing messages to the topic word-count-input1, and am consuming from word-count-output1. But when I run the fat jar I see an exception: org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record.

```
Properties properties = new Properties();

properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"word-count-example");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass());

KStreamBuilder builder = new KStreamBuilder();

// 1. Stream from kafka

KStream<String,String> wordcountInput = builder.stream("word-count-input1");

// 2. map values to lower case

KTable<String, Long> wordcount = wordcountInput.mapValues(value -> value.toLowerCase())

                               // 3. split by space on values
                                .flatMapValues(value -> Arrays.asList(value.split(" ")))

                               // 4. Select a new key so that the word itself becomes the key

                                .selectKey((ignoredKey,words) -> words)

                                // 5. Group it by key

                                .groupByKey()

                                // 6. count occurrences per word (state store named "counts")

                                .count("counts");

// Since the StreamsConfig was set to String and String, it's mandatory to specify the output Serdes explicitly, which are String and Long in our case
wordcount.to(Serdes.String(),Serdes.Long(),"word-count-output1");

KafkaStreams streams = new KafkaStreams(builder, properties);
streams.start();
System.out.println("Topology is " + streams.toString());
```

Exception:

```

INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:1040)
INFO Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer:972)
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Removing all active tasks [0_0, 1_0, 0_1, 1_1, 0_2, 1_2] (org.apache.kafka.streams.processor.internals.StreamThread:1407)
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Removing all standby tasks [] (org.apache.kafka.streams.processor.internals.StreamThread:1421)
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Stream thread shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1072)
WARN stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Unexpected state transition from RUNNING to DEAD. (org.apache.kafka.streams.processor.internals.StreamThread:978)
Exception in thread "word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=word-count-input1, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
INFO stream-client [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a] State transition from RUNNING to PENDING_SHUTDOWN. (org.apache.kafka.streams.KafkaStreams:229)
INFO stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Informed thread to shut down (org.apache.kafka.streams.processor.internals.StreamThread:900)
WARN stream-thread [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a-StreamThread-1] Unexpected state transition from DEAD to PENDING_SHUTDOWN. (org.apache.kafka.streams.processor.internals.StreamThread:978)
INFO stream-client [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a] Stopped Kafka Streams process. (org.apache.kafka.streams.KafkaStreams:514)
INFO stream-client [word-count-example-052c3c3e-8dfd-40e7-8b5b-7ee06e3af96a] State transition from PENDING_SHUTDOWN to NOT_RUNNING. (org.apache.kafka.streams.KafkaStreams:229)

```

My setup is a single ZooKeeper instance and three brokers running on one Linux VM. Can anyone suggest what might be wrong?

xoshrz7s1#

Changing your KTable to the following should fix the problem:

```
KTable<String, Long> wordcount = source
        .flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
            }
        })
        .groupBy(new KeyValueMapper<String, String, String>() {
            @Override
            public String apply(String key, String value) {
                return value;
            }
        })
        .count("Counts");
```
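
Note that `source` here corresponds to `wordcountInput` in the question. Written in the lambda style the question already uses, the same change might look like the sketch below (old, KStreamBuilder-era API as in the question; `groupBy` replaces the `selectKey` + `groupByKey` pair):

```
// Sketch only: the answer's suggestion rewritten with lambdas.
KTable<String, Long> wordcount = wordcountInput
        // lower-case each line and split it into words
        .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
        // re-key every record by the word itself
        .groupBy((key, word) -> word)
        // count occurrences per word, backed by a state store named "Counts"
        .count("Counts");
```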

l7mqbcuq2#

You did configure LongDeserializer as the value deserializer. The actual error is:

```
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
```

So it seems your values are not 8-byte longs. I assume the values in your input topic are actually strings? You therefore need to specify a deserializer that matches the data.
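
In the question's configuration that means changing the default value serde from Long to String; the Long serde is still needed only when writing the counts, whose values really are longs. A minimal sketch of that change:

```
// Default serdes must match the *input* topic, which contains plain strings.
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// Apply the Long serde explicitly where the values are longs, i.e. on the output topic:
wordcount.to(Serdes.String(), Serdes.Long(), "word-count-output1");
```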
