我使用. joinWindows函数连接了两个kakfa流,之后生成了两个变更日志主题
- {使用者组}--KSTREAM-JOINOTHER-000000005-存储更改日志
- {使用者组}--KSTREAM-JOINTHIS-000000004-存储更改日志
1.这两个专题的目的是什么?
2.它们存储的是什么数据,是键值对吗?
3.是否有办法查询这些内部主题并获得这些内部主题中出现的事件数量?
可以将内部存储传递给处理器,然后尝试使用windowstore访问它,但它没有一个函数来获取处理的事件数。代码:
Keyvalue stateStore = processorContext.getStateStore("KSTREAM-JOINTHIS-0000000005-store");
stateStore.approximateNumEntries();
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator cannot be cast to class org.apache.kafka.streams.state.KeyValueStore
我希望获得跨各个分区进入流的事件总数以及发生的连接数
1条答案
按热度按时间qij5mzcb1#
1.这两个专题的目的是什么?
这些是支持连接中所涉及的状态存储的变更日志主题,连接的每一"方"将在另一方的状态存储中查找键的匹配,即执行连接。
2.它们存储的是什么数据?是键值对吗?
是的,changelog主题是Kafka主题,它们将键-值对存储在状态存储中以保持持久性。
3.是否有办法查询这些内部主题并获得这些内部主题中出现的事件数量?
您可以将消费者指向这些主题以检查记录。但是这样做的用例是什么呢?此外,您还希望确保永远不会生成这些主题的记录。
连接的状态存储不能通过交互式查询来访问,如果你想了解连接的数量,你可以在连接后添加一个操作符,记录运行计数器和记录结果或类似的东西。