我正在使用publishkafka\u0\u10处理器将流文件从nifi发布到kafka。当通过代码从kafka读取数据时,数据的序列不被维护(根据时间戳排序)。我的数据集是:时间戳,通道,值。
为了调试,我使用putsql将相同的流文件发布到phoenix,我可以看到在phoenix表中,数据是连续的(按时间排序)。如果有人能解释我为什么不能按顺序读取Kafka的数据,那就太好了。Kafka的主题只有一个划分。提前谢谢。
我正在使用publishkafka\u0\u10处理器将流文件从nifi发布到kafka。当通过代码从kafka读取数据时,数据的序列不被维护(根据时间戳排序)。我的数据集是:时间戳,通道,值。
为了调试,我使用putsql将相同的流文件发布到phoenix,我可以看到在phoenix表中,数据是连续的(按时间排序)。如果有人能解释我为什么不能按顺序读取Kafka的数据,那就太好了。Kafka的主题只有一个划分。提前谢谢。
1条答案
按热度按时间rjjhvcjd1#
Kafka只保证分区内的秩序。既然你说这是一个分区,那好吧。
我的数据集是:时间戳,通道,值。
消息时间戳只是记录元数据(nifi不会将您自己的时间戳传递到kafka producerrecord类中)。而且,时间戳对排序没有影响。换句话说,如果一个“延迟时间戳”的消息在另一个“较早”的时间之前提交,那么是的,它的时间顺序是错误的,但是Kafka只看到偏移量已经移动了。
为什么我不能按顺序读取Kafka的数据
你是的,但是按照传递给Kafka的顺序。
您的消费者代码应该提取记录时间戳,并相应地对其重新排序。例如,kafka connect有一个记录时间戳提取器,它可以基于这个时间将数据写入分区目录。我假设您的putsql处理器正在读取按顺序排列的流文件(它们有自己的时间戳,而不是数据中的时间戳,除非您运行modifyattribute处理器),而不是使用consumekafka处理器?