无法在SpringCloudKafka中设置每个主题/活页夹级别的serde、producer和consumer属性

x8goxv8g  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(481)

我尝试使用springcloudkafka活页夹创建一个简单的pub-sub应用程序。但是,我无法在application.yml中设置序列化程序、反序列化程序属性以及其他生产者和使用者属性。我总是得到序列化/反序列化错误。甚至kafka在springboot项目中的日志也显示生产者和消费者仍然通过realyserializer配置用户。下面是代码示例。
pom.xml文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.2.4.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>io.github.kprasad99.kafka</groupId>
  12. <artifactId>kp-kafka-streams-example</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>kp-kafka-streams-example</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>11</java.version>
  18. <spring-cloud.version>Hoxton.SR1</spring-cloud.version>
  19. </properties>
  20. <dependencies>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-web</artifactId>
  24. </dependency>
  25. <!-- <dependency>
  26. <groupId>org.apache.kafka</groupId>
  27. <artifactId>kafka-streams</artifactId>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.springframework.cloud</groupId>
  31. <artifactId>spring-cloud-stream</artifactId>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.springframework.cloud</groupId>
  35. <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
  36. </dependency> -->
  37. <dependency>
  38. <groupId>org.springframework.cloud</groupId>
  39. <artifactId>spring-cloud-stream-binder-kafka</artifactId>
  40. </dependency>
  41. <dependency>
  42. <groupId>org.springframework.kafka</groupId>
  43. <artifactId>spring-kafka</artifactId>
  44. </dependency>
  45. <dependency>
  46. <groupId>org.springframework.cloud</groupId>
  47. <artifactId>spring-cloud-stream</artifactId>
  48. </dependency>
  49. <dependency>
  50. <groupId>org.springframework.boot</groupId>
  51. <artifactId>spring-boot-devtools</artifactId>
  52. <scope>runtime</scope>
  53. <optional>true</optional>
  54. </dependency>
  55. <dependency>
  56. <groupId>org.springframework.boot</groupId>
  57. <artifactId>spring-boot-configuration-processor</artifactId>
  58. <optional>true</optional>
  59. </dependency>
  60. <dependency>
  61. <groupId>org.projectlombok</groupId>
  62. <artifactId>lombok</artifactId>
  63. <optional>true</optional>
  64. </dependency>
  65. <dependency>
  66. <groupId>org.springframework.boot</groupId>
  67. <artifactId>spring-boot-starter-test</artifactId>
  68. <scope>test</scope>
  69. <exclusions>
  70. <exclusion>
  71. <groupId>org.junit.vintage</groupId>
  72. <artifactId>junit-vintage-engine</artifactId>
  73. </exclusion>
  74. </exclusions>
  75. </dependency>
  76. <dependency>
  77. <groupId>org.springframework.cloud</groupId>
  78. <artifactId>spring-cloud-stream-test-support</artifactId>
  79. <scope>test</scope>
  80. </dependency>
  81. </dependencies>
  82. <dependencyManagement>
  83. <dependencies>
  84. <dependency>
  85. <groupId>org.springframework.cloud</groupId>
  86. <artifactId>spring-cloud-dependencies</artifactId>
  87. <version>${spring-cloud.version}</version>
  88. <type>pom</type>
  89. <scope>import</scope>
  90. </dependency>
  91. </dependencies>
  92. </dependencyManagement>
  93. <build>
  94. <plugins>
  95. <plugin>
  96. <groupId>org.springframework.boot</groupId>
  97. <artifactId>spring-boot-maven-plugin</artifactId>
  98. </plugin>
  99. </plugins>
  100. </build>
  101. </project>

处理器.java

  1. public interface Processor {
  2. String INPUT="k-msg-source";
  3. String OUTPUT="k-msg-sink";
  4. @Input(INPUT)
  5. SubscribableChannel input();
  6. @Output(OUTPUT)
  7. MessageChannel output();
  8. }

克雷斯特. java
@restcontroller公共类krest{

  1. @Autowired
  2. private Processor processor;
  3. @GetMapping("/send")
  4. public ResponseEntity<Void> send(@RequestParam("key")String key, @RequestParam("msg") String text){
  5. processor.input().send(MessageBuilder.withPayload(Message.builder().text(text).build()).setHeader(KafkaHeaders.MESSAGE_KEY, key).build());
  6. return ResponseEntity.ok().build();
  7. }

}
消息.java

  1. @Data
  2. @NoArgsConstructor
  3. @AllArgsConstructor
  4. @Builder
  5. public class Message {
  6. private String text;
  7. }

最后是application.yml

  1. spring:
  2. cloud:
  3. stream:
  4. bindings:
  5. k-msg-source:
  6. binder: kafka
  7. content-type: application/json
  8. destination: topic.kp.msg
  9. group: kp.msg.source
  10. k-msg-sink:
  11. binder: kafka
  12. content-type: application/json
  13. destination: topic.kp.msg
  14. group: kp.msg.sink
  15. producer:
  16. partition-count: 10
  17. binders:
  18. kafka:
  19. type: kafka
  20. environment:
  21. spring.cloud.stream.kafka.binder:
  22. brokers: localhost:9092
  23. configuration:
  24. value.serde: JsonSerde
  25. key.serde: StringSerde
  26. producer:
  27. value.serde: JsonSerde
  28. key.serde: StringSerde
  29. replication-factor: 1

版本
Spring Boot:2.2.4
Spring Cloud:hoxton.sr1
Spring Cloud溪Kafka活页夹:3.0.1

5lhxktic

5lhxktic1#

最终 application.yml ```
spring.cloud.stream:
bindings:
k-msg-source:
binder: kafka
destination: topic.kp.msg
content-type: application/json
producer:
partition-count: 10
use-native-encoding: true
k-msg-sink:
binder: kafka
destination: topic.kp.msg
group: ne-publisher
content-type: application/json
consumer:
use-native-decoding: true
binders:
kafka:
type: kafka
environment:
spring.cloud.stream.kafka.binder:
brokers: PLAINTEXT://localhost:19092,PLAINTEXT://localhost:29092,PLAINTEXT://localhost:39092
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
consumer-properties:
spring.json.trusted.packages: "*"

展开查看全部
mzmfm0qo

mzmfm0qo2#

Serde Kafka流活页夹使用。
MessageChannel 粘合剂,性能 value.serializer 以及 value.deserializer (和 key... ),和 key/value.deserializer .
还必须指定类的完全限定名。

相关问题