我开始学习Kafka,现在,我正在发送/接收序列化/理想化的java类。我的问题是:我在配置中遗漏了什么,所以我不能从kafka反序列化对象
这是我的课:
public class Foo {
private String item;
private int quantity;
private Double price;
public Foo(String item, int quantity, final double price) {
this.item = item;
this.quantity = quantity;
this.price = price;
}
public String getItem() { return item; }
public int getQuantity() { return quantity; }
public Double getPrice() { return price; }
public void setQuantity(int quantity) { this.quantity = quantity; }
public void setPrice(double price) { this.price = price; }
@Override
public String toString() {
return "item=" + item + ", quantity=" + quantity + ", price=" + price;
}
}
我在主类中的属性:
producerpropsobject.put(producerconfig.client\u id\u config,appconfigs.applicationproducerserializedobject);producerpropsobject.put(producerconfig.bootstrap\u servers\u config,appconfigs.bootstrapservers);producerpropsobject.put(producerconfig.key\u serializer\u class\u config,stringserializer.class.getname());producerpropsobject.put(producerconfig.value\u serializer\u class\u config,fooserializer.class.getname());producerpropsobject.put(“主题”,appconfigs.topicnameforserializedobject);
consumerpropsobject.put(consumerconfig.group\u id\u config,appconfigs.applicationproducerserializedobject);consumerpropsobject.put(consumerconfig.bootstrap\u servers\u config,appconfigs.bootstrapservers);consumerpropsobject.put(consumerconfig.key_deserializer_class_config,stringdeserializer.class.getname());consumerpropsobject.put(consumerconfig.value_deserializer_class_config,foodeserializer.class.getname());consumerpropsobject.put(consumerconfig.max\u poll\u interval\u ms\u config,300000);consumerpropsobject.put(consumerconfig.enable\u auto\u commit\u config,true);consumerpropsobject.put(consumerconfig.auto_offset_reset_config,“最早”);consumerpropsobject.put(“主题”,appconfigs.topicnameforserializedobject);
以下是序列化程序/反序列化程序实现:
public class FooSerializer implements org.apache.kafka.common.serialization.Serializer {
public void configure(Map map, boolean b) { }
public byte[] serialize(String s, Object o) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();
byte[] b = baos.toByteArray();
return b;
} catch (IOException e) { return new byte[0]; }
}
public void close() { }
}
public class FooDeserializer implements org.apache.kafka.common.serialization.Deserializer {
@Override
public void close() { }
@Override
public Foo deserialize(String arg0, byte[] arg1) {
//Option #1:
//ObjectMapper mapper = new ObjectMapper();
//Option #2:
JsonFactory factory = new JsonFactory();
factory.enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES);
ObjectMapper mapper = new ObjectMapper(factory);
Foo fooObj = null;
try {
//Option #1:
//fooObj = mapper.readValue(arg1, Foo.class); // BREAKS HERE!!!
//Option #2:
fooObj = mapper.reader().forType(Foo.class).readValue(arg1); // BREAKS HERE!!!
}
catch (Exception e) { e.printStackTrace(); }
return fooObj;
}
}
最后,我试着从main生产和消费我的食物:
似乎,它工作得很好,因为我看到Kafka主题我的关键和价值以后
public void produceObjectToKafka(final Properties producerProps) {
final String[] ar = new String[]{"Matrix", "Naked Gun", "5th Element", "Die Hard", "Gone with a wind"};
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps);
final Foo j = new Foo(ar[getAnInt(4)], getAnInt(10), getAnDouble());
producer.send(new ProducerRecord<>(producerProps.getProperty("topic"), j.getItem(), j.toString().getBytes()));
producer.flush();
producer.close();
}
然而,当我的消费者正在捕捉输出时:
public void consumeFooFromKafka(final Properties consumerProps) {
final Consumer<String, Foo> myConsumer = new KafkaConsumer<>(consumerProps);
final Thread separateThread = new Thread(() -> {
try {
myConsumer.subscribe(Collections.singletonList(consumerProps.getProperty("topic")));
while (continueToRunFlag) {
final StringBuilder sb = new StringBuilder();
final ConsumerRecords<String, Foo> consumerRecords = myConsumer.poll(Duration.ofMillis(10));
if (consumerRecords.count() > 0) {
for (ConsumerRecord<String, Foo> cRec : consumerRecords) {
sb.append( cRec.key() ).append("<<").append(cRec.value().getItem() + ",").append(cRec.value().getQuantity() + ",").append(cRec.value().getPrice()).append("|");
}
}
if (sb.length() > 0) { System.out.println(sb.toString()); }
}
}
finally {
myConsumer.close();
}
});
separateThread.start();
}
============================================所以,实际上是通过运行“consumefoofromkafka”,当它触发“foodeserializer”。。。。。。在那里,我总是有相同的错误(不管选项1或选项2):
异常:方法引发了“com.fasterxml.jackson.core.jsonparseexception”异常。detailedmessage:意外的字符('¬' (代码172):应为有效值(json字符串、数字、数组、对象或标记“null”、“true”或“false”)
非常感谢您的帮助。。。。。。。提前谢谢你,史蒂夫
2条答案
按热度按时间iecba09b1#
如果您想从json反序列化,那么您需要将其序列化为json,也可以在您的序列化程序中使用jackson,一切都应该正常
pqwbnv8z2#
我不知道为什么要使用bytearray outputstream,但尝试在反序列化程序中读取json,但这解释了错误。您甚至可以通过直接调用serialize/deserialize方法来测试它,而不必使用kafka
在提供的链接中,序列化程序使用
objectMapper.writeValueAsString
,它返回json文本,而不是特定于java的outputstream。如果您想在不同的编程语言之间使用和生成数据(大多数公司都是这样),那么您应该避免使用这种特定的序列化格式注意:confluent为kafka提供了avro、protobuf和json序列化程序,因此如果您想使用其中一种格式,就不需要编写自己的序列化程序