我是 Camel 和Kafka的新手。我正在使用camel 2.18.2和kafka 0.10.1.1
我得到这个错误,不明白为什么:
org.apache.kafka.common.errors.serializationexception:无法将类[b]的值转换为value.serializer中指定的类org.apache.kafka.common.serialization.stringserializer
我有以下路线:
from("direct://toEnrichEmail")
.routeId(routeId).marshal().json(JsonLibrary.Jackson, Map.class)
.log(LoggingLevel.INFO, "Sending to Kafka: ${body}")
.to("kafka:localhost:9092?topic=enrich-email&requestRequiredAcks=-1");
实际发送到路由的代码:
ProducerTemplate template = kafkaProducerFactory.getProducerTemplate();
logger.debug("Sending message type: {}, to uri: {}, route: {}", wimsConfiguration.getMessageType(),
wimsConfiguration.getDirectUri(), wimsConfiguration.getRouteName());
Map<String,Object>headers = new HashMap<>(); // added because the examples do
headers.put(KafkaConstants.PARTITION_KEY, 0);
headers.put(KafkaConstants.KEY, "1");
template.sendBodyAndHeaders(wimsConfiguration.getDirectUri(), wimsConfiguration.getWimsMessage(), headers);
路由中的日志消息显示消息是一个正确的json字符串,默认的序列化程序是string,那么为什么它抱怨它不能序列化呢?
我查看了camel-kafka组件测试用例,这看起来应该是可行的
下面是camel/kafka日志的输出示例:
16:32:20967 info[toenrichemail](默认任务-12)发送到kafka:{“messagetype”:“ordercreate”,“regioncode”:“in”,“regionlanguage”:[“en”],“orderheader”:{“order”:“3001357952”,“salesorg”:“2123”,“soldto”:“2035266752”,“currency”:“inr”,“documenttype”:“ta”,“validfrom”:null,“validto”:null,“contactname”:“ui-in”,“contactphone”:“,“contactemail”:“fu。bar@baz.com“},”shipto“{”customernumber“:”2035266752”,“companyname“:”eou的互联网价格,“cityname“:”班加罗尔“,”地区“,”邮政编码“,”街道名称“,”房屋号码“,”建筑“,”楼层“,”房间号码“,”countryname“:”印度“,”地区名称“:”卡纳塔克邦“},”billto“{”customernumber“:”2035266752”,“companyname”:“eou的互联网价格”,“cityname”:“indi”,“district”:“,”postalcode”:“,”streetname”:“,”housenumber“,”building“,”floor“,”roomnumber“,”countryname“:”india“,”regionname“:”karnataka“},“paymentinfo”:{“paymentmethod”:“po”,“purchaseorder”:“1234567”,“requisitionnumber”:“,”creditcardnumber“,”paymentterms“,”incoterms“},“ordersummary”:{“ordersubtotal”:“6479.52”,“折扣”:“0”,“shippingtransportation”:“0”,“salestax”:“1187.92”,“dutytotal”:“1712.56”,“ordertotal”:“9380”},“orderitems”:[{“quotenumber”:null,“quoteitemnumber”:“000010”,“totalreservedqunatity”:null,“remainingreservedquantity”:null,“lineitemno”:“000010”,“itemcategory”:“tan”,“Quantity”:“1”,“material”:“t6066-1kg”,“product”:“t6066”,“brandid”:“sigma”,“description”:“trizma(r)base,bioperformance certif&”,“yourref”:“,”yourprice”:“8192.08”,“listprice”:“8192.08”}]}16:32:21,008错误[org.apache.camel.processor.defaulterrorhandler](默认任务-12)传递失败(exchangeid:id-stldepx06-sial-com-49604-1486506685312-1-2上的messageid:id-stldepx06-sial-com-49604-1486506685312-1-2)。传递尝试后耗尽:1捕获到:org.apache.kafka.common.errors.serializationexception:无法将类[b]的值转换为value.serializer中指定的类org.apache.kafka.common.serialization.stringserializer
块引用
消息历史记录
routeid processorid处理器已用时间(毫秒)[toenrichemail][toenrichemail][direct://toenrichemail ][57]toenrichemail][marshall 1][marshall[org.apache.camel.model.dataformat。jsondataformat@142a2fec]][10][toenrichemail][log4][log][15][toenrichemail][to3][kafka:localhost:9092?topic=enrich email&requestrequiredacks=-1][25]
stacktrace
3条答案
按热度按时间djmepvbi1#
添加
serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer
到生产者配置或者
添加
.convertBodyTo(String.class)
如您所说将对象封送到json之后我更喜欢前者,因为它少了一次手术。
pu3pd22g2#
解决办法是
.convertBodyTo(String.class)
将类封送到json之后。这应该没必要,但这样做让Kafka很高兴。r7xajy2e3#
--------:org.apache.kafka.common.errors.serializationexception:无法将类[b]的值转换为value.serializer中指定的类org.apache.kafka.common.serialization.stringserializer
16:32:21011 info[stdout](默认任务-12)2017-02-07t16:32:21010 info com.sial.notifications.common.rest.notificationexceptionresponsemapper-mapping exception org.apache.camel.cameleexecutionexception:在exchange上执行时发生异常:exchange[id-stldepx06-sial-com-49604-1486506685312-1-2]16:32:21,021 info[stdout](默认任务-12)2017-02-07t16:32:21014 error com.sial.notifications.common.rest.notificationexceptionresponsemapper-mapped exception org.apache.camel.cameleexecutionexception:在exchange上执行时发生异常:exchange[id-stldepx06-sial-com-49604-1486506685312-1-2]16:32:21,021 info[stdout](默认任务-12)org.apache.camel.cameleExecutionException:在exchange上执行时发生异常:exchange[id-stldepx06-sial-com-49604-1486506685312-1-2]16:32:21,021 info[stdout](默认任务-12)位于org.apache.camel.util.objecthelper.wrapcamelexecutionexception(objecthelper)。java:1779)~[ Camel 核-2.18.2。jar:2.18.2]16:32:21021信息[stdout](默认任务-12)位于org.apache.camel.util.exchangehelper.extractresultbody(exchangehelper。java:677)~[ Camel 核-2.18.2。jar:2.18.2] 16:32:21,021 info[stdout](默认任务-12)位于org.apache.camel.impl.defaultproducertemplate.extractresultbody(defaultproducertemplate)。java:515)~[ Camel 核-2.18.2。jar:2.18.2] 16:32:21,021 info[stdout](默认任务-12)位于org.apache.camel.impl.defaultproducertemplate.extractresultbody(defaultproducertemplate)。java:511)~[ Camel 核-2.18.2。jar:2.18.2] 16:32:21,021 info[stdout](default task-12)位于org.apache.camel.impl.defaultproducertemplate.sendboyandheaders(defaultproducertemplate)。java:259)~[ Camel 核-2.18.2。jar:2.18.2] 16:32:21,021 info[stdout](default task-12)位于org.apache.camel.impl.defaultproducertemplate.sendboyandheaders(defaultproducertemplate)。java:253)~[ Camel 核-2.18.2。jar:2.18.2] 16:32:21,021 info[stdout](默认任务-12)位于com.sial.notifications.messages.rest.notificationmessageserviceimpl.submitmessagetoroute(notificationmessageserviceimpl。java:66)~[类:?]