我用的是Kafka0.10.2 KafkaProducer
在主题中生成数据。下面是我的代码。
ProducerRecord record = new ProducerRecord(topic, (Integer)null, Long.valueOf(System.currentTimeMillis()), partitionKey.getBytes(), message.getBytes());
Future<RecordMetadata> future = this.producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null) {
LOGGER.error("Error producing to topic " + partitionKey, e.getCause());
} else {
LOGGER.info(" Successfully produced topic " + recordMetadata.topic() + " on partition " + recordMetadata.partition() + "at " + recordMetadata.timestamp());
}
}
});
此消息正在通过不同群集中的mirrormaker进行复制。我编写了一个mirrormaker处理程序来发送每个消息的延迟捕获。当我在mirrormaker上查看时间戳时。我看不到时间戳。下面是mirrormaker处理程序的代码。
@Override
public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
LOGGER.debug("Timestamp from dal producer "+record.timestamp());
return Collections.singletonList(new ProducerRecord<byte[], byte[]>(topicToSend,partitionToSend,timeStampAtMM, record.key(), record.value()));
}
暂无答案!
目前还没有任何答案,快来回答吧!