在一个kafka主题下发送两个序列化的java对象

zbq4xfa0  于 2021-07-24  发布在  Java
关注(0)|答案(2)|浏览(472)

我想实现kafka消费者和生产者发送和接收java对象。完整来源我试过这个:
制作人:

  1. @Configuration
  2. public class KafkaProducerConfig {
  3. @Value(value = "${kafka.bootstrapAddress}")
  4. private String bootstrapAddress;
  5. @Bean
  6. public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
  7. Map<String, Object> configProps = new HashMap<>();
  8. configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
  9. configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  10. configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
  11. return new DefaultKafkaProducerFactory<>(configProps);
  12. }
  13. @Bean
  14. public ProducerFactory<String, String> producerFactory() {
  15. Map<String, Object> configProps = new HashMap<>();
  16. configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
  17. return new DefaultKafkaProducerFactory<>(configProps);
  18. }
  19. @Bean
  20. public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
  21. return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
  22. }
  23. @Bean
  24. public KafkaTemplate<String, String> kafkaTemplate() {
  25. return new KafkaTemplate<>(producerFactory());
  26. }
  27. @Bean
  28. public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(ProducerFactory<String, SaleRequestFactory> producerFactory, ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) {
  29. ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer = factory.createContainer("tp-sale");
  30. kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
  31. return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
  32. }
  33. }

发送对象:

  1. @RestController
  2. @RequestMapping("/checkout")
  3. public class CheckoutController {
  4. private TransactionService transactionService;
  5. private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
  6. private ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;
  7. private static String topic = "tp-sale";
  8. @Autowired
  9. public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
  10. KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
  11. ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate){
  12. this.transactionService = transactionService;
  13. this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
  14. this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
  15. }
  16. @PostMapping("test")
  17. private void performPayment() throws ExecutionException, InterruptedException, TimeoutException {
  18. Transaction transaction = new Transaction();
  19. transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());
  20. Transaction insertedTransaction = transactionService.save(transaction);
  21. SaleRequestFactory obj = new SaleRequestFactory();
  22. obj.setId(100);
  23. ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>("tp-sale", obj);
  24. RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
  25. SendResult<String, SaleRequestFactory> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
  26. ConsumerRecord<String, SaleResponseFactory> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
  27. SaleResponseFactory value = consumerRecord.value();
  28. System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
  29. }
  30. }

消费者:

  1. @EnableKafka
  2. @Configuration
  3. public class KafkaConsumerConfig {
  4. @Value(value = "${kafka.bootstrapAddress}")
  5. private String bootstrapAddress;
  6. private String groupId = "test";
  7. @Bean
  8. public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
  9. Map<String, Object> props = new HashMap<>();
  10. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
  11. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  12. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  13. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
  14. return new DefaultKafkaConsumerFactory<>(props);
  15. }
  16. @Bean
  17. public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
  18. ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
  19. new ConcurrentKafkaListenerContainerFactory<>();
  20. factory.setConsumerFactory(consumerFactory());
  21. return factory;
  22. }
  23. }

接收对象

  1. @Component
  2. public class ProcessingSaleListener {
  3. private static String topic = "tp-sale";
  4. @KafkaListener(topics = "tp-sale")
  5. public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {
  6. System.out.println(tf.getId());
  7. SaleResponseFactory resObj = new SaleResponseFactory();
  8. resObj.setUnique_id("123123");
  9. return resObj;
  10. }
  11. }

自定义对象

  1. import java.io.Serializable;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Builder;
  4. import lombok.Getter;
  5. import lombok.NoArgsConstructor;
  6. import lombok.Setter;
  7. @Getter
  8. @Setter
  9. @NoArgsConstructor
  10. @AllArgsConstructor
  11. @Builder(toBuilder = true)
  12. public class SaleRequestFactory implements Serializable {
  13. private static final long serialVersionUID = 1744050117179344127L;
  14. private int id;
  15. }

序列化程序

  1. import org.apache.kafka.common.serialization.Serializer;
  2. import java.io.ByteArrayOutputStream;
  3. import java.io.IOException;
  4. import java.io.ObjectOutputStream;
  5. import java.io.Serializable;
  6. public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {
  7. @Override
  8. public byte[] serialize(String topic, SaleRequestFactory data)
  9. {
  10. ByteArrayOutputStream out = new ByteArrayOutputStream();
  11. try
  12. {
  13. ObjectOutputStream outputStream = new ObjectOutputStream(out);
  14. outputStream.writeObject(data);
  15. out.close();
  16. }
  17. catch (IOException e)
  18. {
  19. throw new RuntimeException("Unhandled", e);
  20. }
  21. return out.toByteArray();
  22. }
  23. }

响应对象

  1. import java.io.Serializable;
  2. import java.time.LocalDateTime;
  3. import lombok.AllArgsConstructor;
  4. import lombok.Builder;
  5. import lombok.Getter;
  6. import lombok.NoArgsConstructor;
  7. import lombok.Setter;
  8. @Getter
  9. @Setter
  10. @NoArgsConstructor
  11. @AllArgsConstructor
  12. @Builder(toBuilder = true)
  13. public class SaleResponseFactory implements Serializable {
  14. private static final long serialVersionUID = 1744050117179344127L;
  15. private String unique_id;
  16. }

响应类

  1. import org.apache.kafka.common.serialization.Deserializer;
  2. import java.io.ByteArrayInputStream;
  3. import java.io.IOException;
  4. import java.io.ObjectInputStream;
  5. import java.io.Serializable;
  6. public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {
  7. @Override
  8. public SaleRequestFactory deserialize(String topic, byte[] data)
  9. {
  10. SaleRequestFactory saleRequestFactory = null;
  11. try
  12. {
  13. ByteArrayInputStream bis = new ByteArrayInputStream(data);
  14. ObjectInputStream in = new ObjectInputStream(bis);
  15. saleRequestFactory = (SaleRequestFactory) in.readObject();
  16. in.close();
  17. }
  18. catch (IOException | ClassNotFoundException e)
  19. {
  20. throw new RuntimeException("Unhandled", e);
  21. }
  22. return saleRequestFactory;
  23. }
  24. }

我想根据对象类型发送和接收不同的序列化java对象。例如有时 SaleRequestFactory 接受 SaleResponseFactory 或发送 AuthRequestFactory 接受 AuthResponseFactory . 是否可以使用一个主题发送和接收不同的java对象?
完整示例代码

rur96b6h

rur96b6h1#

使用 Object 作为值类型-下面是一个使用boot的自动配置基础结构bean的示例。。。

  1. @SpringBootApplication
  2. public class So65866763Application {
  3. public static void main(String[] args) {
  4. SpringApplication.run(So65866763Application.class, args);
  5. }
  6. @Bean
  7. public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
  8. return args -> {
  9. template.send("so65866763", new Foo());
  10. template.send("so65866763", new Bar());
  11. };
  12. }
  13. @Bean
  14. public NewTopic topic() {
  15. return TopicBuilder.name("so65866763").partitions(1).replicas(1).build();
  16. }
  17. }
  18. class Foo implements Serializable {
  19. }
  20. class Bar implements Serializable {
  21. }
  22. @Component
  23. @KafkaListener(id = "so65866763", topics = "so65866763")
  24. class Listener {
  25. @KafkaHandler
  26. void fooListener(Foo foo) {
  27. System.out.println("In fooListener: " + foo);
  28. }
  29. @KafkaHandler
  30. void barListener(Bar bar) {
  31. System.out.println("In barListener: " + bar);
  32. }
  33. }
  1. public class JavaSerializer implements Serializer<Object> {
  2. @Override
  3. public byte[] serialize(String topic, Object data) {
  4. return null;
  5. }
  6. @Override
  7. public byte[] serialize(String topic, Headers headers, Object data) {
  8. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  9. try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
  10. oos.writeObject(data);
  11. return baos.toByteArray();
  12. }
  13. catch (IOException e) {
  14. throw new UncheckedIOException(e);
  15. }
  16. }
  17. }
  1. public class JavaDeserializer implements Deserializer<Object> {
  2. @Override
  3. public Object deserialize(String topic, byte[] data) {
  4. return null;
  5. }
  6. @Override
  7. public Object deserialize(String topic, Headers headers, byte[] data) {
  8. ByteArrayInputStream bais = new ByteArrayInputStream(data);
  9. try (ObjectInputStream ois = new ObjectInputStream(bais)) {
  10. return ois.readObject();
  11. }
  12. catch (IOException e) {
  13. throw new UncheckedIOException(e);
  14. }
  15. catch (ClassNotFoundException e) {
  16. throw new IllegalStateException(e);
  17. }
  18. }
  19. }
  1. spring.kafka.consumer.auto-offset-reset=earliest
  2. spring.kafka.producer.value-serializer=com.example.demo.JavaSerializer
  3. spring.kafka.consumer.value-deserializer=com.example.demo.JavaDeserializer
  1. In fooListener: com.example.demo.Foo@331ca660
  2. In barListener: com.example.demo.Bar@26f54288
展开查看全部
uujelgoq

uujelgoq2#

这是可能的,但每种对象类型都需要一个独立的生产者工厂。或者使用 ByteArraySerializer 并自己完成对象的序列化(等价于 Gary 答案中的做法)。
消费者端同理:如果想正确地反序列化对象,也需要相应的反序列化配置。否则可以使用 ByteArrayDeserializer(同样等价于 Gary 给出的反序列化程序)。由于 Java 序列化的字节流本身并不标明对象的具体类型,您需要在记录中附加额外的元数据(例如消息头 header),或者约定通过特定的键来判断类型,然后自己调用相应的反序列化方法。
总的来说,我建议您重新评估为什么需要在一个主题中放置不同类型的记录,或者查看其他消息格式,包括cloudevents规范,或者使用avro/protobuf/polymorphic json类型,这些类型更适合与kafka以外的客户机一起使用

相关问题