Following the Kafka integration guide, I am writing a batch query that uses Kafka as the source, and I want to run this batch periodically, say once a day, to process the records added since the last run. While testing with pyspark,
I noticed that every time the batch runs it reads all the records, not just the ones added since the previous run. My code looks roughly like this.
The question: what do I have to change so that each run processes only the new Kafka records?
builder = (pyspark.sql.SparkSession.builder.appName("MyApp")
.master("local[*]")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.execution.arrow.pyspark.enabled", "true")
.config("spark.hadoop.fs.s3a.access.key", s3a_access_key)
.config("spark.hadoop.fs.s3a.secret.key", s3a_secret_key)
.config("spark.hadoop.fs.s3a.endpoint", s3a_host_port)
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.driver.extraJavaOptions", "-Dlog4j.configuration=file:/data/custom-log4j.properties")
)
my_packages = [
# "io.delta:delta-spark_2.12:3.0.0", -> no need, since configure_spark_with_delta_pip below adds it
"org.apache.hadoop:hadoop-aws:3.3.4",
"org.apache.hadoop:hadoop-client-runtime:3.3.4",
"org.apache.hadoop:hadoop-client-api:3.3.4",
"io.delta:delta-contribs_2.12:3.0.0",
"io.delta:delta-hive_2.12:3.0.0",
"com.amazonaws:aws-java-sdk-bundle:1.12.603",
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
]
# Create a Spark instance with the builder
# As a result, you now can read and write Delta tables
spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()
kdf = (spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_servers)
.option("kafka.security.protocol", kafka_security_protocol)
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{kafka_username}\" password=\"{kafka_password}\";")
.option("includeHeaders", "true")
.option("subscribe", "filebeat")
.option("checkpointLocation", "s3a://checkpointlocation/")
.load())
kdf = kdf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers", "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)")
out = kdf...
(out.select(["message", "partition", "offset"])
.show(
truncate=False,
n=MAX_JAVA_INT
))
spark.stop()
This outputs a table in which I can see that every run processes the same offsets.
1 Answer
You are reading the topic in batch mode (the linked integration guide), where the default is startingOffsets = earliest. In addition, checkpointLocation has no effect in batch mode; you have to read in streaming mode (spark.readStream...), and the processed offsets will then be stored there. Example application:
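A minimal sketch of such an application, reusing the connection variables from the question; the security options are omitted for brevity and the Delta output path is a placeholder:

stream = (spark
    .readStream                       # streaming source instead of spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", "filebeat")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))
query = (stream
    .writeStream
    .format("delta")
    # the processed offsets are stored here, so the next run continues where this one stopped
    .option("checkpointLocation", "s3a://checkpointlocation/")
    # process everything currently available, then stop (a batch-like daily run)
    .trigger(availableNow=True)
    .start("s3a://output/kafka-table/"))  # placeholder output path
query.awaitTermination()
spark.stop()

Because the trigger is availableNow, the query drains whatever is currently in the topic and then terminates, so it can be scheduled once a day like a batch job.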
First iteration of the application:
1. checkpointLocation is empty, so Spark reads from the earliest offset up to the current offset.
2. The offsets reached are stored in checkpointLocation.
3. The application stops.
Second iteration of the application:
1. checkpointLocation is not empty, so Spark starts reading from the offsets stored there, up to the current offset.
2. The offsets reached are stored in checkpointLocation.
3. The application stops.
Note that
.option('checkpointLocation', '<some-location>')
must be set on the DataStreamWriter, not on the DataStreamReader.
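To illustrate that last point, a hedged sketch reusing the connection variables from the question:

kdf = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", "filebeat")
    .option("checkpointLocation", "s3a://checkpointlocation/")  # ignored on the reader
    .load())
query = (kdf.writeStream
    .format("console")
    .option("checkpointLocation", "s3a://checkpointlocation/")  # effective on the writer
    .trigger(availableNow=True)
    .start())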