你知道这是可能的吗?如果是的话,使用kafka connect和kafka来确保一次交付给hdfs的最佳方法是什么?我知道kafka connect试图在“\uu consumer\u offset”中为其消费者组查找偏移量,但我需要额外检查,因为不接受重复项
wgx48brx1#
hdfsconnect已经声称通过在hdfs中使用预写日志来支持一次。当connect重新启动时,它实际上会检查该日志,除非最近更改了逻辑,而不是偏移量主题
dsekswqp2#
当连接器将文件写入hdfs时,它首先将文件写入临时文件,然后将临时文件重命名为最终文件。此最终文件的命名具有该文件中存在的偏移量。因此,当connect启动时,它会在hdfs上查找并找到最新提交的偏移量,该偏移量应保证一次性交付。如果在hdfs中找不到偏移量,则它允许使用者偏移重置策略https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/datawriter.java 以及https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/topicpartitionwriter.java 至了解更多
2条答案
按热度按时间wgx48brx1#
hdfsconnect已经声称通过在hdfs中使用预写日志来支持一次。当connect重新启动时,它实际上会检查该日志,除非最近更改了逻辑,而不是偏移量主题
dsekswqp2#
当连接器将文件写入hdfs时,它首先将文件写入临时文件,然后将临时文件重命名为最终文件。此最终文件的命名具有该文件中存在的偏移量。因此,当connect启动时,它会在hdfs上查找并找到最新提交的偏移量,该偏移量应保证一次性交付。如果在hdfs中找不到偏移量,则它允许使用者偏移重置策略https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/datawriter.java 以及https://github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/java/io/confluent/connect/hdfs/topicpartitionwriter.java 至了解更多