I used this Spring Boot Kafka setup for my project. But when I run it, it throws
org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.Integer to class org.apache.kafka.common.serialization.StringSerializer specified in key.serializer
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
The exception occurs when I send a record with the KafkaTemplate.
@EmbeddedKafka(partitions = 1)
@SpringBootTest
class KafkaApplicationTests {

    @Autowired
    private MyKafkaListener listener;

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    public void testSimple() throws Exception {
        template.send("annotated1", 0, "foo");
        template.flush();
        assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
    }

    @Configuration
    @EnableKafka
    public class Config {

        @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String>
                kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }

        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return props;
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            // props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            return props;
        }

        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            return new KafkaTemplate<Integer, String>(producerFactory());
        }
    }
}
But when I change the call to template.send("annotated1", "key-foo", "foo"); it works. I have also tried this configuration:
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return props;
}
But it still gives me Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
If anyone can help, please do. Thanks.
1 answer
7gcisfzg #1
You need to add both the main application class and the configuration class to @SpringBootTest:
```
@SpringBootTest(classes = { So61985794Application.class, So61985794ApplicationTests.Config.class })
```
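It may help to see why the exception names StringSerializer even though the template is declared as KafkaTemplate<Integer, String>. The key serializer is chosen by the key.serializer setting at runtime, and Java generics are erased, so the compiler cannot stop an Integer key from reaching a String serializer. The sketch below reproduces that kind of ClassCastException in plain Java. DemoSerializer, DemoStringSerializer, DemoIntegerSerializer and SerializerMismatchDemo are hypothetical stand-ins written for this illustration; they are not Kafka or Spring classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializer interface (not the real Kafka type).
interface DemoSerializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical stand-in for a String serializer.
class DemoStringSerializer implements DemoSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical stand-in for an Integer serializer.
class DemoIntegerSerializer implements DemoSerializer<Integer> {
    @Override
    public byte[] serialize(String topic, Integer data) {
        return ByteBuffer.allocate(4).putInt(data).array();
    }
}

class SerializerMismatchDemo {

    // The serializer arrives as a raw type, as it would when created from
    // configuration. The cast to the serializer's own type happens inside
    // the call, which is where a mismatched key fails.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String send(DemoSerializer configured, Object key) {
        try {
            byte[] bytes = configured.serialize("annotated1", key);
            return "ok: " + bytes.length + " bytes";
        } catch (ClassCastException e) {
            return "failed: key of " + key.getClass().getName()
                    + " does not fit " + configured.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Integer key with a String serializer: the case from the question.
        System.out.println(send(new DemoStringSerializer(), 0));
        // Integer key with an Integer serializer: what the Config class asks for.
        System.out.println(send(new DemoIntegerSerializer(), 0));
        // String key with a String serializer: why "key-foo" worked.
        System.out.println(send(new DemoStringSerializer(), "key-foo"));
    }
}
```

Read this way, the exception message suggests that a String key serializer was still in effect during the test, presumably because the nested Config class was not loaded, which is what listing it in @SpringBootTest(classes = ...) addresses. Note also that the question's producerConfigs sets IntegerSerializer for the value while the template sends String values, so a String value serializer would probably be needed once the configuration is picked up.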