对象的Kafka序列化

w1jd8yoj  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(440)

这个问题在这里已经有答案了

编写自定义Kafka序列化程序(3个答案)
9个月前关门了。
我开始和Kafka玩。我已经设置了一个zookeeper配置,并且成功地发送和使用了字符串消息。现在我尝试传递一个对象(在java中),但是由于某种原因,在消费者中解析消息时,我遇到了头问题。我尝试了几个序列化选项(使用解码器/编码器),所有这些都返回相同的头问题。
这是我的代码制作人:

Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("serializer.class", "com.inneractive.reporter.kafka.EventsDataSerializer");
        ProducerConfig config = new ProducerConfig(props);
        Producer<Long, EventDetails> producer = new Producer<Long, EventDetails>(config);
        ProducerData<Long, EventDetails> data = new ProducerData<Long, EventDetails>("test3", 1, Arrays.asList(new EventDetails());
        try {
           producer.send(data);
        } finally {
           producer.close();
        }

消费者:

Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("zk.connectiontimeout.ms", "1000000");
        props.put("groupid", "test_group");

        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

        // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
        Map<String, List<KafkaMessageStream<EventDetails>>> topicMessageStreams =
                consumerConnector.createMessageStreams(ImmutableMap.of("test3", 4), new EventsDataSerializer());
        List<KafkaMessageStream<EventDetails>> streams = topicMessageStreams.get("test3");

        // create list of 4 threads to consume from each of the partitions
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // consume the messages in the threads
        for (final KafkaMessageStream<EventDetails> stream: streams) {
            executor.submit(new Runnable() {
                public void run() {
                    for(EventDetails event: stream) {
                        System.err.println("**********Got message" + event.toString());        
                    }
                }
            });
        }

以及我的序列化程序:

public  class EventsDataSerializer implements Encoder<EventDetails>, Decoder<EventDetails> {
    public Message toMessage(EventDetails eventDetails) {
        try {
            ObjectMapper mapper = new ObjectMapper(new SmileFactory());
            byte[] serialized = mapper.writeValueAsBytes(eventDetails);
            return new Message(serialized);
} catch (IOException e) {
            e.printStackTrace();
            return null;   // TODO
        }
}
    public EventDetails toEvent(Message message) {
        EventDetails event = new EventDetails();

        ObjectMapper mapper = new ObjectMapper(new SmileFactory());
        try {
            //TODO handle error
            return mapper.readValue(message.payload().array(), EventDetails.class);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }

    }
}

这就是我得到的错误:

org.codehaus.jackson.JsonParseException: Input does not start with Smile format header (first byte = 0x0) and parser has REQUIRE_HEADER enabled: can not parse
 at [Source: N/A; line: -1, column: -1]

当我和 MessagePack 用简单的文字 ObjectOutputStream 我也有类似的头球问题。我还尝试将有效负载crc32添加到消息中,但也没有起到作用。
我做错什么了?

oknwwptz

oknwwptz1#

bytebuffers.array()方法不太可靠。这取决于具体的实施。你也许想试试

ByteBuffer bb = message.payload()

byte[] b = new byte[bb.remaining()]
bb.get(b, 0, b.length);
return mapper.readValue(b, EventDetails.class)
deyfvvtc

deyfvvtc2#

嗯,我还没有遇到同样的标题问题,你会遇到,但我的项目没有编译正确时,我没有提供一个 VerifiableProperties 我的编码器/解码器中的构造函数。奇怪的是,丢失的构造函数会破坏Jackson的反序列化。
或许可以尝试拆分编码器和解码器,并包含 VerifiableProperties 二者的构造器;你不需要实施 Decoder[T] 用于序列化。我能够使用 ObjectMapper 按照这篇文章的格式。
祝你好运!

相关问题