我使用的是ApacheStormCore0.9.6(它已经过时了,由于遗留问题而无能为力) KafkaSpout
. 如果我可以记录元组偏移量,那么调试我所面临的反序列化问题将非常有帮助。
到目前为止我已经看到了 storm.kafka.KafkaUtils
有两种方法:
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config)
public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime)
第一个是第二个的 Package 。在我看来,我唯一不知道如何调用这个函数的是 consumer
. 我读过这本书 KafkaSpout
代码并没有弄清楚如何从中获得Kafka消费者。
1条答案
按热度按时间8ljdwjyq1#
如果您 checkout storm kafka代码,那么在获取消息之后,您可以执行任何需要的日志记录https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/partitionmanager.java#l162. 您可以自己构建修改后的storm kafka版本,并将构建工具指向您自己的副本。