我想为我的应用程序spark&kafka添加一些参数,以便将Dataframe写入主题kafka。
我在spark kafka文档中没有找到acks和compression.codec
.write
.format("kafka")
.option("kafka.sasl.mechanism", Config.KAFKA_SASL_MECHANISM)
.option("kafka.security.protocol", Config.KAFKA_SECURITY_PROTOCOL)
.option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
.option("fetchOffset.numRetries", 6)
.option("acks","all")
.option("compression.codec","lz4")
.option("kafka.request.timeout.ms", 120000)
.option("topic", topic)
.save()```
2条答案
按热度按时间jdzmm42g1#
可以使用此特定属性定义序列化程序:default.value.serde
9gm1akwq2#
对于序列化程序,创建一个case类或一到三列的dataframe
Array[Byte]
的字段key
以及value
(字符串也会起作用)。那么topic
字符串字段。如果只需要kafka值,那么只需要一列Dataframe在写入kafka之前,您需要Map当前数据以序列化所有数据。
然后,文档确实说任何其他生产者财产只是前缀
kafka.
更多信息请点击此处https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-Kafka的资料对于sasl属性,我认为您需要使用
spark.executor.options
并通过--files
不过,在提交过程中