pyspark Apache Spark 3.5结构化流在批处理模式下的Kafka偏移问题

7uhlpewt  于 2024-01-06  发布在  Spark
关注(0)|答案(1)|浏览(115)

根据Kafka集成指南,我正在编写一个使用Kafka作为源的批处理查询,并希望定期提交此批处理,比如每天一次,以处理自上次运行以来添加的记录。在运行pyspark的测试期间,我注意到每次运行批处理时,它都会读取所有记录,不只是上次运行后添加的那些。我的代码大致如下。
问题是:我必须改变什么,以便每次运行时,我只处理新的Kafka记录?

builder = (pyspark.sql.SparkSession.builder.appName("MyApp")
            .master("local[*]")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
            .config("spark.hadoop.fs.s3a.access.key", s3a_access_key)
            .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key)
            .config("spark.hadoop.fs.s3a.endpoint", s3a_host_port)
            .config("spark.hadoop.fs.s3a.path.style.access", "true")
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
            .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:/data/custom-log4j.properties")
           )

my_packages = [
                   # "io.delta:delta-spark_2.12:3.0.0", -> no need, since configure_spark_with_delta_pip below adds it
                   "org.apache.hadoop:hadoop-aws:3.3.4",
                   "org.apache.hadoop:hadoop-client-runtime:3.3.4",
                   "org.apache.hadoop:hadoop-client-api:3.3.4",
                   "io.delta:delta-contribs_2.12:3.0.0",
                   "io.delta:delta-hive_2.12:3.0.0",
                   "com.amazonaws:aws-java-sdk-bundle:1.12.603",
                   "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
               ]

# Create a Spark instance with the builder
# As a result, you now can read and write Delta tables
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

kdf = (spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("kafka.security.protocol", kafka_security_protocol)
        .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
        .option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{kafka_username}\" password=\"{kafka_password}\";")
        .option("includeHeaders", "true")
        .option("subscribe", "filebeat")
        .option("checkpointLocation", "s3a://checkpointlocation/")
        .load())

kdf = kdf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers", "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)")

out = kdf...

(out.select(["message", "partition", "offset"])
                     .show(
                        truncate=False,
                        n=MAX_JAVA_INT
                    ))

spark.stop()

字符串
这将输出一个表,我可以看到每次运行都处理相同的偏移量。

lx0bsm1f

lx0bsm1f1#

您正在阅读批量模式链接中的主题,默认设置为startingOffsets = earliest。此外,checkpointLocation在批量模式下没有效果,您必须在流式模式下读取spark.readStream...,处理后的偏移量将存储在那里。
示例应用:

source_df = (
    spark
    .readStream
    .format('kafka')
    .options(**{
        'subscribe': 'some_topic',
        'startingOffsets': 'earliest',
    })
    .load()
)
writer = (
    source_df
    .writeStream
    .format('parquet')
    .option('path', '/some_path')
    .outputMode('append')
    .option('checkpointLocation', '<some-location>')
    .trigger(availableNow=True)
)

streaming_query = writer.start()
streaming_query.awaitTermination()
spark.stop()

字符串
应用程序的第一次迭代:

  1. checkpointLocation是空的,所以Spark将从最早的偏移量读取,直到当前偏移量。
    1.达到的偏移量将存储在checkpointLocation
    1.应用程序停止。
    第二次迭代应用
  2. checkpointLocation不为空,所以Spark将从那里的偏移量开始阅读,直到当前偏移量。
    1.到达的偏移量将存储在checkpointLocation中。
    1.应用程序停止。
    请注意,.option('checkpointLocation', '<some-location>')必须在DataStreamWriter上调用,而不是在DataStreamReader上调用。

相关问题