spark流:在spark结构化流中不允许使用kafka组id

5us2dqdw  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(375)

我正在用pyspark编写一个spark结构化流应用程序来读取kafka的数据。
但是,spark的当前版本是2.1.0,它不允许我将group id设置为参数,并且将为每个查询生成唯一的id。但是kafka连接是基于组的授权,需要预先设置组id。
因此,是否有任何解决方法来建立连接,而不需要将spark更新到2.2,因为我的团队不需要它。
我的代码:

if __name__ == "__main__":
    spark = SparkSession.builder.appName("DNS").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # Subscribe to 1 topic
    lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
    print(lines.isStreaming) #print TRUE
    lines.selectExpr("CAST(value AS STRING)")
    # Split the lines into words
    words = lines.select(
    explode(
        split(lines.value, " ")
        ).alias("word")
    )
    # Generate running word count
    wordCounts = words.groupBy("word").count()

    # Start running the query that prints the running counts to the console
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()
rsaldnfx

rsaldnfx1#

KafkaUtils 类将重写的参数值 "group.id" . 它会凝结的 "spark-executor-" 在原始组id的中。
下面是kafkautils的代码,他正在执行此操作:

// driver and executor should be in different consumer groups
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    if (null == originalGroupId) {
      logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
    }
    val groupId = "spark-executor-" + originalGroupId
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

我们面临同样的问题。kafka是基于acl的,具有预设的组id,所以惟一的事情就是在kafka配置中更改组id。除了我们原来的组id "spark-executor-" + originalGroupId

kx1ctssn

kx1ctssn2#

现在可以使用spark 3.x设置group.id。请参阅《结构化流媒体+Kafka集成指南》,其中写道:
kafka.group.id:从kafka读取时在kafka使用者中使用的kafka组id。小心使用。默认情况下,每个查询为读取数据生成一个唯一的组id。这样可以确保每个kafka源都有自己的使用者组,该使用者组不会受到任何其他使用者的干扰,因此可以读取其订阅主题的所有分区。在某些情况下(例如,基于kafka组的授权),您可能希望使用特定的授权组id来读取数据。您可以选择设置组id。但是,请非常小心,因为这可能会导致意外行为。同时运行的查询(批处理和流式处理)或具有相同组id的源可能相互干扰,导致每个查询只读取部分数据。当连续快速启动/重新启动查询时,也可能发生这种情况。要最小化此类问题,请将kafka使用者会话超时设置为非常小(通过设置选项“kafka.session.timeout.ms”)。设置此选项后,将忽略选项“groupidprefix”。
但是,这个group.id仍然不用于将偏移提交回kafka,并且偏移管理仍保留在spark的检查点文件中。在我的回答中,我给出了更多的细节(同样适用于spark<3.x):
如何在spark结构化流媒体中手动设置group.id并提交kafka偏移量?
如何在spark 3.0中使用kafka.group.id

相关问题