kafka流:混合匹配papi和dsl ktable而不是共分区

ncgqoxb0  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(414)

我有一个混合匹配的scala拓扑,其中主要工作是一个papi处理器,其他部分通过dsl连接。

  1. EventsProcessor:
  2. INPUT: eventsTopic
  3. OUTPUT: visitorsTopic (and others)

整个主题的数据(包括原始数据) eventsTopic )通过一个,我们称之为 DoubleKey 它有两个字段。访客被送到 visitorsTopic 通过Flume:

  1. .addSink(VISITOR_SINK_NAME, visitorTopicName,
  2. DoubleKey.getSerializer(), Visitor.getSerializer(), visitorSinkPartitioner, EVENT_PROCESSOR_NAME)

在dsl中,我在这个主题上创建了一个表:

  1. val visitorTable = builder.table(
  2. visitorTopicName,
  3. Consumed.`with`(DoubleKey.getKafkaSerde(),
  4. Visitor.getKafkaSerde()),
  5. Materialized.as(visitorStoreName))

我后来连接到 EventProcessor :

  1. topology.connectProcessorAndStateStores(EVENT_PROCESSOR_NAME, visitorStoreName)

所有内容都是共同分区的(通过双键)。 visitorSinkPartitioner 执行典型的模运算:

  1. Math.abs(partitionKey.hashCode % numPartitions)

在papi处理器eventsprocessor中,我查询这个表以查看是否已经存在访问者。
但是,在我的测试中(使用embeddedkafka,但这不会有什么区别),如果我用一个分区运行它们,一切都很好(eventsprocessor在同一个分区上的两个事件上检查ktable) DoubleKey ,在第二个事件中,经过一些延迟,它可以看到存在的 Visitor 但是如果我用一个更高的数字运行它,eventprocessor将永远看不到存储中的值。
但是如果我通过api检查存储(迭代 store.all() ),记录就在那里。所以我知道它一定是去了不同的分区。
由于ktable应该处理其分区上的数据,并且所有内容都发送到同一分区(使用显式分区器调用同一代码),因此ktable应该在同一分区上获取该数据。
我的假设正确吗?会发生什么?
kafkastreams 1.0.0,scala 2.12.4。
附言:当然,这样做是可行的 put 在papi上通过papi而不是 StreamsBuilder.table() ,因为这肯定会使用运行代码的同一分区,但这是不可能的。

ippsafx7

ippsafx71#

是的,假设是正确的。
以防对任何人有帮助:
我在将分区器传递给scala嵌入的kafka库时遇到了一个问题。在其中一个测试套件中,它没有做对。现在,遵循重构的健康实践,我在这个拓扑的所有套件中都使用了这个方法。

  1. def getEmbeddedKafkaTestConfig(zkPort: Int, kafkaPort: Int) :
  2. EmbeddedKafkaConfig = {
  3. val producerProperties = Map(ProducerConfig.PARTITIONER_CLASS_CONFIG ->
  4. classOf[DoubleKeyPartitioner].getCanonicalName)
  5. EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort,
  6. customProducerProperties = producerProperties)
  7. }

相关问题