我想做一个kstream来加入ktable。使用ktable作为查找表。下面的步骤显示了执行代码的顺序
构造ktable
密钥表
构造kstream
密钥流
加入kstream-ktable
假设kstream中有8000条记录,ktable中有14条记录,并假设kstream中的每个键在ktable中都有一条记录。因此,预期的输出将是8000条记录。
每次我第一次加入或启动应用程序时。预期的输出是8000条记录,但我有时只看到6200条记录,有时看到8000条完整的记录(两次),有时没有记录,等等。
问题1:为什么每次运行应用程序时记录都不一致?
在构造ktable(construct+rekey)之前,先构造kstream,并且数据可以从kstream端进行连接,然后连接从out ktable开始,这样在构造ktable之前,在最终连接中不会看到数据。构造完ktable之后,我们就可以看到剩余记录的连接发生了。
问题2:如何解决加入记录不一致的问题?
我尝试了使用嵌入式kafka for kstream和ktable join的测试用例。有10条记录来自kstreams,3条记录来自ktable。当我第一次运行测试用例时,没有连接,连接之后也没有看到任何数据。当第二次跑的时候,它跑得很完美。如果我清除状态存储,然后返回到零。
问题3:为什么会发生这种行为?
我试过使用ksql,连接工作得很好,我得到了8000条记录,然后我进入ksql源代码,我注意到ksql也在做同样的连接函数。
问题4:ksql如何解决这个问题?
我看到了一些例子建议的答案
使用不起作用的globalktable。我得到了同样不一致的加入。
使用自定义连接符https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/customstreamtablejoinintegrationtest.java 但没用
我使用springcloudstreams作为依赖项。
我还看到了一个开放的问题,关于这个地方的吉拉。
1条答案
按热度按时间ut6juiuv1#
下面的步骤显示了执行代码的顺序
请注意,构建拓扑只是提供数据流程序的逻辑描述,没有不同运算符的“执行顺序”。程序将被翻译,所有操作员将同时执行。因此,来自所有主题的数据将被并行读取。
这种并行处理是您观察的根本原因,即在处理开始之前没有先加载表(至少在默认情况下没有保证),因此即使表没有完全加载,也可以处理流端数据。
不同主题之间的处理顺序取决于记录时间戳:首先处理时间戳较小的记录。因此,如果要确保先处理ktable数据,则必须确保记录时间戳小于流侧记录时间戳。当您将输入数据生成到输入主题中时,或者通过使用自定义时间戳提取器,可以确保这一点。
第二,从主题获取数据是不确定的,因此,如果仅返回流侧的数据(而不是表侧数据),则无法进行时间戳比较,因此流侧数据将在表侧数据之前进行处理。为了解决这个问题,您可以增加配置参数
max.task.idle.ms
(默认为0ms
). 如果您增加这个配置(我相信ksql在默认情况下也会这样做),如果一个输入没有数据,任务将阻塞并尝试获取空输入的数据(只有在空闲时间过后,即使一方是空的,处理也会继续)。为了一个
GlobalKTable
行为不同。此表将在启动时加载,然后再开始任何处理。因此,我不知道为什么这对你不起作用。