如何将星星之火整合到Kafka的阵列中

mgdq6dx1  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(332)

目前,我有以下几个方面

+-------+--------------------+-----+
|    key|          created_at|count|
+-------+--------------------+-----+
|Bullish|[2017-08-06 08:00...|   12|
|Bearish|[2017-08-06 08:00...|    1|
+-------+--------------------+-----+

我使用以下方法将数据流传输到Kafka

df.selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092").option("topic","chart3").option("checkpointLocation", "/tmp/checkpoints2")
  .outputMode("complete")
  .start()

这里的问题是,对于dataframe中的每一行,它都会逐一写入kafka。我的消费者会一个接一个地得到信息。
是否有任何方法可以将所有行合并到一个数组中并流式传输到kafka,这样我的消费者就可以一次性获得整个数据。
谢谢你的建议。

kmynzznz

kmynzznz1#

我的消费者会一个接一个地得到信息。
不完全是。这可能取决于Kafka的财产。您可以指定自己的属性并使用,例如:

props.put("batch.size", 16384);

在后台,spark使用普通缓存的kafkaproducer。它将使用您在提交查询时在选项中提供的属性。
另请参见java文档。请注意,它可能无法正确缩放

相关问题