Consuming Avro messages from Kafka with Apache Camel

Posted by svdrlsy4 on 2021-06-08 in Kafka

I have an Apache Camel route that publishes Avro messages to an Apache Kafka topic. It only works when I set the producer property "serializerClass=kafka.serializer.StringEncoder". Otherwise I get:

    java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34)
        at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130)
        at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
        at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:52)
        at kafka.producer.Producer.send(Producer.scala:77)
        at kafka.javaapi.producer.Producer.send(Producer.scala:33)
        at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:84)

On the other side, I have a second Apache Camel route that is supposed to consume from the topic above, but it fails with:

    java.io.IOException: Invalid long encoding
        at org.apache.avro.io.BinaryDecoder.innerLongDecode(BinaryDecoder.java:217)
        at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:176)
        at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
        at org.apache.camel.dataformat.avro.AvroDataFormat.unmarshal(AvroDataFormat.java:133)
        at org.apache.camel.processor.UnmarshalProcessor.process(UnmarshalProcessor.java:67)

Here is the Apache Camel consumer code I am using:

<route id="cassandra.publisher">
    <from uri="{{kafka.base.uri}}&amp;topic=sensordata&amp;groupId=Cassandra_ConsumerGroup&amp;consumerId=CassandraConsumer_Instance_1&amp;clientId=adapter2" />
    <unmarshal>
        <custom ref="avroSensorData" />
    </unmarshal>

wlwcrazw1#

To fix this, you have to give the Camel Kafka consumer a keyDeserializer and a valueDeserializer, by appending the following to the endpoint URI:
&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&valueDeserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
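
A likely reason the byte-array deserializer matters (an assumption about the cause, not something confirmed in this thread): Avro's binary encoding is arbitrary bytes, not text, so any String-based encoder or decoder in the path can silently change the payload before it reaches the Avro unmarshaller. The sketch below uses only the JDK, with no Kafka or Camel involved, and shows a binary payload changing when pushed through a UTF-8 String while plain ASCII survives. The class and method names are hypothetical, chosen for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class: not part of Kafka, Camel, or Avro.
final class BinaryThroughString {

    // Simulates what a String-based serializer/deserializer pair does:
    // bytes -> String -> bytes, using UTF-8 in both directions.
    static byte[] viaString(byte[] payload) {
        String asText = new String(payload, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Bytes with the high bit set are common in binary encodings,
        // but 0x80 and 0xC3 0x02 are not valid UTF-8 sequences.
        byte[] binary = {(byte) 0x80, (byte) 0xC3, 0x02};
        byte[] ascii = "abc".getBytes(StandardCharsets.UTF_8);

        // Invalid sequences are replaced with U+FFFD, so the bytes change.
        System.out.println(Arrays.equals(binary, viaString(binary))); // false
        System.out.println(viaString(binary).length);                 // 7

        // Pure ASCII text survives the round trip unchanged.
        System.out.println(Arrays.equals(ascii, viaString(ascii)));   // true
    }
}
```

With a byte-array deserializer on the consumer side the value is handed to the unmarshaller untouched, which is what an Avro data format expects. The producer side has to send raw bytes as well for this to help.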


sulc1iza2#

http://camel.465427.n5.nabble.com/camel-kafka-component-td5749525.html#a5769561
The thread linked above says that Apache Camel 2.16.0/2.15.3 will support various data types, not just String messages.
As promised, this was fixed in Apache Camel 2.15.3 under CAMEL-8790 (https://issues.apache.org/jira/browse/camel-8790).
