目前,我有以下几个方面
+-------+--------------------+-----+
| key| created_at|count|
+-------+--------------------+-----+
|Bullish|[2017-08-06 08:00...| 12|
|Bearish|[2017-08-06 08:00...| 1|
+-------+--------------------+-----+
我使用以下方法将数据流传输到Kafka
df.selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092").option("topic","chart3").option("checkpointLocation", "/tmp/checkpoints2")
.outputMode("complete")
.start()
这里的问题是,对于dataframe中的每一行,它都会逐一写入kafka。我的消费者会一个接一个地得到信息。
是否有任何方法可以将所有行合并到一个数组中并流式传输到kafka,这样我的消费者就可以一次性获得整个数据。
谢谢你的建议。
1条答案
按热度按时间kmynzznz1#
我的消费者会一个接一个地得到信息。
不完全是。这可能取决于Kafka的财产。您可以指定自己的属性并使用,例如:
在后台,spark使用普通缓存的kafkaproducer。它将使用您在提交查询时在选项中提供的属性。
另请参见java文档。请注意,它可能无法正确缩放