我正在用pyspark编写一个spark结构化流应用程序来读取kafka的数据。
但是,spark的当前版本是2.1.0,它不允许我将group id设置为参数,并且将为每个查询生成唯一的id。但是kafka连接是基于组的授权,需要预先设置组id。
因此,是否有任何解决方法来建立连接,而不需要将spark更新到2.2,因为我的团队不需要它。
我的代码:
if __name__ == "__main__":
spark = SparkSession.builder.appName("DNS").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# Subscribe to 1 topic
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
print(lines.isStreaming) #print TRUE
lines.selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
2条答案
按热度按时间rsaldnfx1#
KafkaUtils
类将重写的参数值"group.id"
. 它会凝结的"spark-executor-"
在原始组id的中。下面是kafkautils的代码,他正在执行此操作:
我们面临同样的问题。kafka是基于acl的,具有预设的组id,所以惟一的事情就是在kafka配置中更改组id。除了我们原来的组id
"spark-executor-" + originalGroupId
kx1ctssn2#
现在可以使用spark 3.x设置group.id。请参阅《结构化流媒体+Kafka集成指南》,其中写道:
kafka.group.id:从kafka读取时在kafka使用者中使用的kafka组id。小心使用。默认情况下,每个查询为读取数据生成一个唯一的组id。这样可以确保每个kafka源都有自己的使用者组,该使用者组不会受到任何其他使用者的干扰,因此可以读取其订阅主题的所有分区。在某些情况下(例如,基于kafka组的授权),您可能希望使用特定的授权组id来读取数据。您可以选择设置组id。但是,请非常小心,因为这可能会导致意外行为。同时运行的查询(批处理和流式处理)或具有相同组id的源可能相互干扰,导致每个查询只读取部分数据。当连续快速启动/重新启动查询时,也可能发生这种情况。要最小化此类问题,请将kafka使用者会话超时设置为非常小(通过设置选项“kafka.session.timeout.ms”)。设置此选项后,将忽略选项“groupidprefix”。
但是,这个group.id仍然不用于将偏移提交回kafka,并且偏移管理仍保留在spark的检查点文件中。在我的回答中,我给出了更多的细节(同样适用于spark<3.x):
如何在spark结构化流媒体中手动设置group.id并提交kafka偏移量?
如何在spark 3.0中使用kafka.group.id