添加配置参数-spark&kafka:acks和compression

kdfy810k  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(695)

我想为我的应用程序spark&kafka添加一些参数,以便将Dataframe写入主题kafka。
我在spark kafka文档中没有找到acks和compression.codec

.write
   .format("kafka")
   .option("kafka.sasl.mechanism", Config.KAFKA_SASL_MECHANISM)
   .option("kafka.security.protocol", Config.KAFKA_SECURITY_PROTOCOL)
   .option("kafka.sasl.jaas.config", KAFKA_JAAS_CONFIG)
   .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
   .option("fetchOffset.numRetries", 6)
   .option("acks","all")
   .option("compression.codec","lz4")
   .option("kafka.request.timeout.ms", 120000)
   .option("topic", topic)
   .save()```
jdzmm42g

jdzmm42g1#

可以使用此特定属性定义序列化程序:default.value.serde

9gm1akwq

9gm1akwq2#

对于序列化程序,创建一个case类或一到三列的dataframe Array[Byte] 的字段 key 以及 value (字符串也会起作用)。那么 topic 字符串字段。如果只需要kafka值,那么只需要一列Dataframe
在写入kafka之前,您需要Map当前数据以序列化所有数据。
然后,文档确实说任何其他生产者财产只是前缀 kafka. 更多信息请点击此处https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-Kafka的资料
对于sasl属性,我认为您需要使用 spark.executor.options 并通过 --files 不过,在提交过程中

相关问题