我正在尝试创建一个kafka消费者,它正在侦听一个特定的主题,并将消费的消息作为json进行处理。我在这里尝试了遵循spring文档中给出的方法,但无法将消息作为json获取。
这是我的接收器配置代码:
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public Map consumerConfigs() {
Map props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections
// to the Kakfa cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// consumer groups allow a pool of processes to divide the work of
// consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "Waitlist");
return props;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Receiver receiver() {
return new Receiver();
}
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
}
消费者:
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(topics = "Reservation",
containerFactory = "kafkaJsonListenerContainerFactory")
public void receiveMessage(Message<?> message) {
LOGGER.info("received message='{}'", message);
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
尝试在远程服务器上发布主题时,出现以下错误:
2017-02-09 13:42:49.122 [1;31mERROR[0;39m [36mo.s.k.listener.LoggingErrorHandler[0;39m Error while processing: ConsumerRecord(topic = Reservation, partition = 0, offset = 3394, CreateTime = 1486626082480, checksum = 1777660938, serialized key size = -1, serialized value size = 2, key = null, value = hi)
org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null')
at [Source: hi; line: 1, column: 5]
at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:81)
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:82)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:157)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:68)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:975)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null')
at [Source: hi; line: 1, column: 5]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2835)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2880)
at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:78)
... 11 common frames omitted
但是,如果我从侦听器中删除containerfactory,我可以接收消息,但它们不是json格式,而是字符串:
2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='{'
2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_eventType":"Reservation",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_timestamp":"2017-01-23T09:19:35Z",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_operation":"create",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "type":"excursion",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "reservationId":"46d353ac_9575_492a_9291_98d15bf4cc82",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "eventReservationLinkId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "master":true,'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "partySize":2,'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "startTime":"2017-01-27T08:30:00Z",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "endTime":"2017-01-27T10:00:00Z",'
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "timeslotId":"c2304a34_b9ba_4f3c_8e45_3e3c7677d6c2",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "variantSku":"ocean_polar_1606_FLL-640B",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "guestId":"378741",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "createdBy":"149673",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "purchaser":"143679",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "eventId":"ocean_polar_1606_FLL-640",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "scheduledEventId":"02c95434_3a99_452e_a2a8_51712683926c",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "resourceId":"",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "surpriseFlag":false,'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "venueId":"FLL001",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "status":"CONFIRMED",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "primaryId":"378741",'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "partyId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2"'
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='}'
1条答案
按热度按时间9avjhtql1#
您的消息是json文档的单个片段
为了从json转换,需要将其封装在单个消息中。