我有一个混合匹配的scala拓扑,其中主要工作是一个papi处理器,其他部分通过dsl连接。
EventsProcessor:
INPUT: eventsTopic
OUTPUT: visitorsTopic (and others)
整个主题的数据(包括原始数据) eventsTopic
)通过一个,我们称之为 DoubleKey
它有两个字段。访客被送到 visitorsTopic
通过Flume:
.addSink(VISITOR_SINK_NAME, visitorTopicName,
DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)
在dsl中,我在这个主题上创建了一个表:
val visitorTable = builder.table(
visitorTopicName,
Consumed.`with`(DoubleKey.getKafkaSerde(),
Visitor.getKafkaSerde()),
Materialized.as(visitorStoreName))
我后来连接到 EventProcessor
:
topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)
所有内容都是共同分区的(通过双键)。 visitorSinkPartitioner
执行典型的模运算:
Math.abs(partitionKey.hashCode % numPartitions)
在papi处理器eventsprocessor中,我查询这个表以查看是否已经存在访问者。
但是,在我的测试中(使用embeddedkafka,但这不会有什么区别),如果我用一个分区运行它们,一切都很好(eventsprocessor在同一个分区上的两个事件上检查ktable) DoubleKey
,在第二个事件中,经过一些延迟,它可以看到存在的 Visitor
但是如果我用一个更高的数字运行它,eventprocessor将永远看不到存储中的值。
但是如果我通过api检查存储(迭代 store.all()
),记录就在那里。所以我知道它一定是去了不同的分区。
由于ktable应该处理其分区上的数据,并且所有内容都发送到同一分区(使用显式分区器调用同一代码),因此ktable应该在同一分区上获取该数据。
我的假设正确吗?会发生什么?
kafkastreams 1.0.0,scala 2.12.4。
附言:当然,这样做是可行的 put
在papi上通过papi而不是 StreamsBuilder.table()
,因为这肯定会使用运行代码的同一分区,但这是不可能的。
1条答案
按热度按时间ippsafx71#
是的,假设是正确的。
以防对任何人有帮助:
我在将分区器传递给scala嵌入的kafka库时遇到了一个问题。在其中一个测试套件中,它没有做对。现在,遵循重构的健康实践,我在这个拓扑的所有套件中都使用了这个方法。