Spring Cloud Stream Kafka: KTable as input not working

w1jd8yoj · posted 2021-06-04 in Kafka

Spring Cloud Stream Kafka with a KTable as input is not working.
EventSink.java

```java
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.annotation.Input;

public interface EventSink {

    @Input("inputTable")
    KTable<?, ?> inputTable();
}
```

MessageReceiver.java

```java
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(EventSink.class)
public class MessageReceiver {

    @StreamListener
    public void process(@Input("inputTable") KTable<String, Event> table) {
        // The code below is just for illustration; I need to do a lot more
        // with this KTable after receiving it.
        table.toStream()
             .foreach((key, value) -> System.out.println(value));
    }
}
```

application.yml

```yaml
server:
  port: 8083
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            application-id: kafka-stream-demo
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.springframework.kafka.support.serializer.JsonSerde
            bindings:
              inputTable:
                materialized-as: event_store
        binder:
          brokers: localhost:9092
      bindings:
        inputTable:
          destination: nscevent
          group: nsceventGroup
```

I am getting the error below:

```
Exception in thread "kafka-stream-demo-1e64cf93-de19-4185-bee4-8fc882275010-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:677)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:370)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    ... 7 more
```
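To make the root cause concrete, here is a minimal stand-in (not Spring's actual `JsonDeserializer`, just a sketch of its type-resolution logic) showing why deserialization fails when neither a type header nor a default type is available:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class TypeInfoDemo {

    // Simplified deserializer: it must learn the target type either from a
    // record header or from a configured default type, mirroring the check
    // behind "No type information in headers and no default type provided".
    static Object deserialize(byte[] data, Map<String, String> headers, Class<?> defaultType) {
        String typeName = headers.get("__TypeId__"); // header Spring Kafka uses for type info
        if (typeName == null && defaultType == null) {
            throw new IllegalStateException("No type information in headers and no default type provided");
        }
        // Real code would hand off to Jackson here; we just return the raw JSON text.
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        try {
            deserialize(payload, Map.of(), null); // no header, no default -> fails
        } catch (IllegalStateException e) {
            System.out.println("Failed: " + e.getMessage());
        }
        // With a default type configured, deserialization can proceed.
        System.out.println("Ok: " + deserialize(payload, Map.of(), Object.class));
    }
}
```

Records consumed natively by the Kafka Streams binder carry no Spring type headers, so without a configured default type the deserializer has nothing to fall back on.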

Can someone please advise what the issue is? With a KStream as input it works, but not with a KTable. Thanks in advance.

jyztefdp1#

A KTable is always converted using the native Serde feature of Kafka Streams; framework-level conversion is not done on KTable (although there is an open issue to add it). Since you are using a custom type for the value, you need to specify the proper Serde instead of the default String serde. You can add these to the configuration:

```
spring.cloud.stream.kafka.streams.binder.configuration:
  default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
  spring.json.value.default.type: RawAccounting
```
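For reference, the same settings can be written in application.yml form (a sketch; `com.example.Event` is a placeholder for the fully qualified name of your own `Event` value class, which is what `spring.json.value.default.type` expects):

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
              # Placeholder: substitute the fully qualified name of your value class
              spring.json.value.default.type: com.example.Event
```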

KTable is not automatically converted on the input channel.
