Below is the code I am using to try to write to a Kafka topic:
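import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageSender {
    // Assumption: SLF4J is the logging facade; the original did not show how LOGGER is declared
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageSender.class);

    // Injected once through the constructor (the original also had field injection)
    private final KafkaTemplate<String, History> kafkaTemplate;

    @Value("${my.kafka.topic}")
    private String topicName;

    @Autowired
    public KafkaMessageSender(KafkaTemplate<String, History> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public History writeToTopic(final History entry) throws Exception {
        try {
            String key = entry.getValue();
            // get() blocks until the broker answers; a failed send is thrown as ExecutionException
            SendResult<String, History> result = kafkaTemplate.send(topicName, key, entry).get();
            RecordMetadata recordMetadata = result.getRecordMetadata();
            if (recordMetadata != null) {
                LOGGER.info("message successfully sent to kafka");
            }
        } catch (KafkaException e) {
            // Log and rethrow instead of silently swallowing the failure
            LOGGER.error("failed to send message to kafka", e);
            throw e;
        }
        return entry;
    }
}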
My connection to Kafka is not being established, and I don't understand what the problem is.
Below is the stack trace:
java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [kaas.my_topic]
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:?]
at java.util.concurrent.FutureTask.get(FutureTask.java:191) ~[?:?]
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:119) ~[spring-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at com.optum.chy.kafka.KafkaMessageSender.writeToTopic(KafkaMessageSender.java:42) ~[classes/:?]
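The trace above shows the failure reaching the caller as a java.util.concurrent.ExecutionException thrown by get(), with the TopicAuthorizationException nested inside it, so a catch (KafkaException e) block would not intercept it. The following is only a minimal sketch of walking the cause chain to surface the innermost error. It assumes nothing about the real configuration: FakeTopicAuthorizationException and CauseChainDemo are hypothetical stand-ins so the example runs without Kafka on the classpath, and a CompletableFuture simulates the failed send.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CauseChainDemo {

    // Hypothetical stand-in for org.apache.kafka.common.errors.TopicAuthorizationException,
    // used only so this sketch has no Kafka dependency.
    static class FakeTopicAuthorizationException extends RuntimeException {
        FakeTopicAuthorizationException(String message) {
            super(message);
        }
    }

    // Follows getCause() until the innermost exception is reached.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates a send whose future completes with a nested failure
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new RuntimeException("Failed to send",
                new FakeTopicAuthorizationException("Not authorized to access topics: [kaas.my_topic]")));
        try {
            future.get();
        } catch (ExecutionException e) {
            // The wrapper is ExecutionException; the useful information is at the bottom of the chain
            Throwable root = rootCause(e);
            System.out.println(root.getClass().getSimpleName() + ": " + root.getMessage());
        }
    }
}
```

In the real service, the same idea would mean catching ExecutionException around the get() call and inspecting its cause. The root cause in the trace is an authorization failure for the topic kaas.my_topic, which suggests checking whether the producer's principal has been granted write access to that topic.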