使用kstream连接一个ktable,但输出主题中没有任何内容

wn9m85ua  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(464)

我用ktable leftjoin了一个kstream,但没有看到输出主题的任何输出:

val stringSerde: Serde[String] = Serdes.String()
  val longSerde: Serde[java.lang.Long] = Serdes.Long()
  val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()

  val builder = new KStreamBuilder()

  val networkImprStream: KStream[Long, GenericRecord] = builder
    .stream(dfpGcsNetworkImprEnhanced)

  // Create a global table for advertisers. The data from this global table
  // will be fully replicated on each instance of this application.
  val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord]= builder.globalTable(advertiserTopicName, "advertiser-store")

  // Join the network impr stream to the advertiser global table. As this is global table
  // we can use a non-key based join with out needing to repartition the input stream
  val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable,
    (_, networkImpr) => {
      println(networkImpr)
      networkImpr.get("advertiserId").asInstanceOf[java.lang.Long]
    },
    (networkImpr: GenericRecord, adertiserIdToName: GenericRecord) => {
      println(networkImpr)
      networkImpr.put("advertiserName", adertiserIdToName.get("name"))
      networkImpr
    }
  )

  networkImprWithAdvertiserNameKStream.to(networkImprProcessed)

  val streams = new KafkaStreams(builder, streamsConfiguration)
  streams.cleanUp()
  streams.start()
  // usually the stream application would be running forever,
  // in this example we just let it run for some time and stop since the input data is finite.
  Thread.sleep(15000L)

如果我绕过连接并直接将输入主题输出到输出,我会看到消息到达。我已经将连接更改为左连接,添加了一些println以查看何时提取密钥(不过控制台上没有打印任何内容)。而且我每次都使用Kafka流重置工具,所以从头开始。我的主意快用完了。此外,我还添加了一些对存储的测试访问,它可以工作并包含来自流的键(尽管这不应该因为左连接而禁止任何输出)。

vohkndzv

vohkndzv1#

在我的源流中,密钥为空。虽然我没有使用这个键加入表,但这个键不能为null。所以用一个伪键创建一个中间流是可行的。因此,即使我在这里有一个全局ktable,流消息的密钥限制也适用于这里:http://docs.confluent.io/current/streams/developer-guide.html#kstream-K表联接
具有null键或null值的流的输入记录将被忽略,并且不会触发联接。

相关问题