I want to implement a Kafka consumer and producer that send and receive Java objects. This is what I tried:
Producer:
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
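Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
// Key and value serializers are required; without them the producer cannot be created.
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);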
}
@Bean
public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(ProducerFactory<String, SaleRequestFactory> producerFactory, ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) {
ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer = factory.createContainer("tp-sale");
kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
}
}
Sending the object:
@RestController
@RequestMapping("/checkout")
public class CheckoutController {
private TransactionService transactionService;
private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
private ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;
private static String topic = "tp-sale";
@Autowired
public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate){
this.transactionService = transactionService;
this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
}
@PostMapping("test")
private void performPayment() throws ExecutionException, InterruptedException, TimeoutException {
Transaction transaction = new Transaction();
transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());
Transaction insertedTransaction = transactionService.save(transaction);
SaleRequestFactory obj = new SaleRequestFactory();
obj.setId(100);
ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>("tp-sale", obj);
RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
SendResult<String, SaleRequestFactory> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
ConsumerRecord<String, SaleResponseFactory> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
SaleResponseFactory value = consumerRecord.value();
System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
}
}
Consumer:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
private String groupId = "test";
@Bean
public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Receiving the object:
@Component
public class ProcessingSaleListener {
private static String topic = "tp-sale";
@KafkaListener(topics = "tp-sale")
public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {
System.out.println(tf.getId());
SaleResponseFactory resObj = new SaleResponseFactory();
resObj.setUnique_id("123123");
return resObj;
}
}
Custom object:
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable {
private static final long serialVersionUID = 1744050117179344127L;
private int id;
}
Serializer:
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {
@Override
public byte[] serialize(String topic, SaleRequestFactory data)
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
try
{
ObjectOutputStream outputStream = new ObjectOutputStream(out);
outputStream.writeObject(data);
outputStream.close(); // flushes the object stream; the byte array stays readable afterwards
}
catch (IOException e)
{
throw new RuntimeException("Unhandled", e);
}
return out.toByteArray();
}
}
Response object:
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable {
private static final long serialVersionUID = 1744050117179344127L;
private String unique_id;
}
Response deserializer:
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {
@Override
public SaleRequestFactory deserialize(String topic, byte[] data)
{
SaleRequestFactory saleRequestFactory = null;
try
{
ByteArrayInputStream bis = new ByteArrayInputStream(data);
ObjectInputStream in = new ObjectInputStream(bis);
saleRequestFactory = (SaleRequestFactory) in.readObject();
in.close();
}
catch (IOException | ClassNotFoundException e)
{
throw new RuntimeException("Unhandled", e);
}
return saleRequestFactory;
}
}
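I want to send and receive different serialized Java objects depending on the object type. For example, sometimes I send SaleRequestFactory and receive SaleResponseFactory, or send AuthRequestFactory and receive AuthResponseFactory. Is it possible to send and receive different Java objects using a single topic?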
2 answers
rur96b6h 1#
Use Object as the value type. Below is an example that uses Boot's auto-configured infrastructure beans...
uujelgoq 2#
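This is possible, but you would need two separate producer factories, one for each object type. Alternatively, use ByteArraySerializer and serialize the objects yourself (equivalent to Gary's answer).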
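As a rough illustration of "serialize the objects yourself", the sketch below converts any Serializable object to bytes and back using only the JDK. The class name JavaObjectCodec and its methods are hypothetical and do not come from either answer; the assumption is that the resulting byte[] would be sent through a template whose value serializer is ByteArraySerializer and read back through ByteArrayDeserializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper: turns any Serializable object into bytes and back. */
final class JavaObjectCodec {
    private JavaObjectCodec() {
    }

    /** Serializes the value with standard Java serialization. */
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources closes (and therefore flushes) the object stream
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not serialize value", e);
        }
        return buffer.toByteArray();
    }

    /** Deserializes the bytes; the stream itself records which class was written. */
    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException("Could not deserialize value", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of serialized value is not on the classpath", e);
        }
    }
}
```

On the receiving side, the caller would check the result with instanceof (for example SaleRequestFactory versus AuthRequestFactory) and branch accordingly. Java deserialization should only be applied to data from trusted producers.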
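The same goes for the consumer if you really want to deserialize the objects properly. Otherwise, you would use ByteArrayDeserializer (again, equivalent to the deserializer Gary showed). Then, assuming Java cannot determine the object type from the bytes (serialized object streams do carry it), you would include extra metadata in the record, such as a header, or parse a specific key to decide how to deserialize the data, and then call the corresponding deserialization method yourself.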
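The header-based approach can be sketched as a small registry that maps a type tag to a decoding function. Everything here is an assumption for illustration: the class TypeTagDispatcher, the tag values, and the idea that the tag bytes come from a record header (for instance one named "payload-type", a made-up name) read on the consumer side. The sketch uses only the JDK and leaves the Kafka wiring out.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry: chooses a decoder from a type tag sent alongside the payload. */
final class TypeTagDispatcher {
    private final Map<String, Function<byte[], Object>> decoders = new HashMap<>();

    /** Registers the decoder to use for payloads carrying the given tag. */
    TypeTagDispatcher register(String typeTag, Function<byte[], Object> decoder) {
        decoders.put(typeTag, decoder);
        return this;
    }

    /**
     * Decodes a payload. typeTagHeader stands in for the raw bytes of the
     * metadata header; payload stands in for the record value as byte[].
     */
    Object decode(byte[] typeTagHeader, byte[] payload) {
        String typeTag = new String(typeTagHeader, StandardCharsets.UTF_8);
        Function<byte[], Object> decoder = decoders.get(typeTag);
        if (decoder == null) {
            throw new IllegalArgumentException("No decoder registered for type tag: " + typeTag);
        }
        return decoder.apply(payload);
    }
}
```

A consumer would register one decoder per message type (for example one for sale requests and one for auth requests) and call decode with the header bytes and the record value. Failing loudly on an unknown tag is a design choice in this sketch; skipping or dead-lettering such records would be equally reasonable.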
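Overall, I would suggest re-evaluating why you need to put different types of records in one topic, or looking at other message formats, including the CloudEvents specification, or using Avro, Protobuf, or polymorphic JSON types, which are better suited to clients other than Kafka.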