我正在使用Kafka Consumer接收图像。以下是片段:
byte[][] message_send = new byte[size_array] [] ; //create matrix
System.out.println("Starting Consuming");
while ((dispatcher.AcceptedNumberJobs > 0) || (dispatcher.queue_size > 0) ) {
dispatcher.consumer.subscribe(Collections.singletonList(Topic));
System.out.println("Polling");
ConsumerRecords<String,byte[]> records = dispatcher.consumer.poll(Duration.ofMillis(10));
int i = 0;
for (ConsumerRecord record : records) {
dispatcher.AcceptedNumberJobs -= 1;
dispatcher.queue_size -= 1;
System.out.println(record.getClass());
System.out.println(record.value().getClass());
message_send[i]= java.util.Arrays.copyOf((byte[])record.value(), ((byte[])record.value()).length);
使用者的创建过程如下:
Properties prop = new Properties();
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG , BootstrapServer);
prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, ConsumerID);
prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG , "earliest");
prop.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
KafkaConsumer<String , byte[]> consumer = new KafkaConsumer <String , byte[]>(prop);
consumer.subscribe(Arrays.asList(Topic));
当我将记录复制到message_send时,收到以下错误:
Exception in thread "main" java.lang.ClassCastException: class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
你能帮我一下吗?
更新:我更改了消费者配置:
prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
但是我得到了另一个奇怪的行为,第一个图像被正确接收,数组中的其他元素为空。你能帮助我吗?还有,在Java中有没有引用不是apache kafka的引用?
因此:
message_send[0]有一个值,其他没有。
1条答案
按热度按时间uubf1zoe1#
您已设置
ConsumerRecords<String,byte[]>
但这与
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
不匹配,它应该是ByteArrayDeserializer.class.getName())
因此,错误是String(来自您的消费者配置)无法转换为字节数组(来自您的构造函数和轮询语句)