pyspark Apache Spark 3.5 Structured Streaming: Kafka offset problem in batch mode

7uhlpewt · posted 2024-01-06 in Spark

Following the Kafka integration guide, I am writing a batch query that uses Kafka as the source. I want to run this batch periodically, say once a day, to process the records added since the previous run. While testing with pyspark I noticed that every time the batch runs it reads all records, not only those added after the last run. My code looks roughly like this.
The question: what do I have to change so that each run only processes the new Kafka records?

    import pyspark
    from delta import configure_spark_with_delta_pip

    builder = (pyspark.sql.SparkSession.builder.appName("MyApp")
        .master("local[*]")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.hadoop.fs.s3a.access.key", s3a_access_key)
        .config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key)
        .config("spark.hadoop.fs.s3a.endpoint", s3a_host_port)
        .config("spark.hadoop.fs.s3a.path.style.access", "true")
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
        .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:/data/custom-log4j.properties")
    )

    my_packages = [
        # "io.delta:delta-spark_2.12:3.0.0",  -> no need, since configure_spark_with_delta_pip below adds it
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "org.apache.hadoop:hadoop-client-runtime:3.3.4",
        "org.apache.hadoop:hadoop-client-api:3.3.4",
        "io.delta:delta-contribs_2.12:3.0.0",
        "io.delta:delta-hive_2.12:3.0.0",
        "com.amazonaws:aws-java-sdk-bundle:1.12.603",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    ]

    # Create a Spark instance with the builder
    # As a result, you now can read and write Delta tables
    spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

    kdf = (spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("kafka.security.protocol", kafka_security_protocol)
        .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
        .option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{kafka_username}\" password=\"{kafka_password}\";")
        .option("includeHeaders", "true")
        .option("subscribe", "filebeat")
        .option("checkpointLocation", "s3a://checkpointlocation/")
        .load())

    kdf = kdf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers", "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)")

    out = kdf...

    (out.select(["message", "partition", "offset"])
        .show(
            truncate=False,
            n=MAX_JAVA_INT
        ))

    spark.stop()

This outputs a table in which I can see that the same offsets are processed on every run.

lx0bsm1f


You are reading the topic in batch mode, which defaults to startingOffsets = earliest. Also, checkpointLocation has no effect in batch mode: you have to read in streaming mode with spark.readStream..., and the processed offsets are then stored at that location.
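If the query had to stay in batch mode, the only way to bound what is read would be to pass offsets explicitly through the startingOffsets / endingOffsets options and persist the last offsets reached yourself. A minimal sketch reusing the question's spark session and connection variables; the per-partition offset values are placeholders:

    # Batch read limited to an explicit offset range. The JSON offsets must come from
    # wherever the previous run stored them; they are not tracked by Spark in batch mode.
    batch_df = (spark
        .read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        .option("subscribe", "filebeat")
        # start after the offsets reached by the previous run, per topic and partition
        .option("startingOffsets", '{"filebeat": {"0": 1500, "1": 2300}}')
        # read up to the latest offsets available at the time of this run
        .option("endingOffsets", "latest")
        .load())

In practice the streaming approach below avoids this manual bookkeeping.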
Example app using streaming mode:

    source_df = (
        spark
        .readStream
        .format('kafka')
        .options(**{
            'subscribe': 'some_topic',
            'startingOffsets': 'earliest',
        })
        .load()
    )

    writer = (
        source_df
        .writeStream
        .format('parquet')
        .option('path', '/some_path')
        .outputMode('append')
        .option('checkpointLocation', '<some-location>')  # processed offsets are recorded here
        .trigger(availableNow=True)  # consume everything currently available, then stop
    )

    streaming_query = writer.start()
    streaming_query.awaitTermination()
    spark.stop()

First run of the application:

  1. checkpointLocation is empty, so Spark reads from the earliest offsets up to the current offsets.
  2. The offsets that were reached are stored in the checkpointLocation.
  3. The application stops.

Second run of the application:

  1. checkpointLocation is not empty, so Spark starts reading from the offsets stored there, up to the current offsets.
  2. The offsets that were reached are stored in the checkpointLocation.
  3. The application stops.

Note that .option('checkpointLocation', '<some-location>') must be called on the DataStreamWriter, not on the DataStreamReader.
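
If the batch-style transformations from the question should still run once a day, they can be kept inside a foreachBatch function combined with the availableNow trigger: the query processes whatever has arrived since the offsets recorded in the checkpoint, then terminates. A sketch assuming the question's spark session and Kafka connection variables; process_batch and the Delta output path are made-up names:

    from pyspark.sql import DataFrame

    def process_batch(batch_df: DataFrame, batch_id: int) -> None:
        # batch_df only contains the records added since the offsets in the checkpoint
        (batch_df
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)",
                        "CAST(partition AS STRING)", "CAST(offset AS STRING)")
            .write
            .format("delta")
            .mode("append")
            .save("s3a://my-bucket/kafka-output/"))  # placeholder output path

    query = (spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
        # ... the same kafka.security.protocol / SASL options as in the batch read ...
        .option("subscribe", "filebeat")
        .option("startingOffsets", "earliest")  # only used when the checkpoint is empty
        .load()
        .writeStream
        .foreachBatch(process_batch)
        .option("checkpointLocation", "s3a://checkpointlocation/")  # offsets tracked here
        .trigger(availableNow=True)             # drain what is available, then stop
        .start())
    query.awaitTermination()

Scheduling such a script once a day then gives the "process only what is new since the last run" behaviour asked for in the question.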
