基于spark 3.0的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html. 应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,我希望避免流式spark作业失败并重新启动时可能的数据丢失。基于我之前的问题,我觉得spark 3.0中的kafka.group.id会有所帮助。
如何为spark structured streaming指定kafka consumer的组id?
如何确保通过spark结构化流媒体接收kafka数据时不丢失数据?
不过,我在spark 3.0中尝试了如下设置。
package com.example
/**
* @author ${user.name}
*/
import scala.math.random
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement
//import org.apache.spark.sql.hive.HiveContext
import scala.io.Source
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder}
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding
object App {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("MY-APP")
.getOrCreate()
import spark.sqlContext.implicits._
spark.catalog.clearCache()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.sparkContext.setLogLevel("ERROR")
spark.sparkContext.setCheckpointDir("/home/ec2-user/environment/spark/spark-local/checkpoint")
System.gc()
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "mybroker.io:6667")
.option("subscribe", "mytopic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", "/home/ec2-user/environment/spark/spark-local/creds/cacerts")
.option("kafka.ssl.truststore.password", "changeit")
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.group.id","MYID")
.load()
df.printSchema()
val schema = new StructType()
.add("id", StringType)
.add("x", StringType)
.add("eventtime", StringType)
val idservice = df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).as("data"))
.select("data.*")
val monitoring_df = idservice
.selectExpr("cast(id as string) id",
"cast(x as string) x",
"cast(eventtime as string) eventtime")
val monitoring_stream = monitoring_df.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
if(!batchDF.isEmpty)
{
batchDF.persist()
printf("At %d, the %dth microbatch has %d records and %d partitions \n", Instant.now.getEpochSecond, batchId, batchDF.count(), batchDF.rdd.partitions.size)
batchDF.show()
batchDF.write.mode(SaveMode.Overwrite).option("path", "/home/ec2-user/environment/spark/spark-local/tmp").saveAsTable("mytable")
spark.catalog.refreshTable("mytable")
batchDF.unpersist()
spark.catalog.clearCache()
}
}
.start()
.awaitTermination()
}
}
spark作业通过使用下面的spark submit命令在独立模式下进行测试,但是当我在aws emr中以集群模式部署时也存在同样的问题。
spark-submit --master local[1] --files /home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf,/home/ec2-user/environment/spark/spark-localreds/cacerts,/home/ec2-user/environment/spark/spark-local/creds/krb5.conf,/home/ec2-user/environment/spark/spark-local/creds/my.keytab --driver-java-options "-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.dynamicAllocation.enabled=false --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.yarn.maxAppAttempts=1000 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --class com.example.App ./target/sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar
然后,我开始了流式处理工作,读取Kafka主题中的流式处理数据。过了一段时间,我干掉了那份工作。然后,我等了1个小时才重新开始工作。如果我理解正确,新的流数据应该从我终止spark作业时的偏移量开始。但是,它仍然以最新的偏移量开始,这在我停止作业期间导致了数据丢失。
我是否需要配置更多选项以避免数据丢失?还是我对spark 3.0有些误解?谢谢!
问题已解决
这里的关键问题是必须将检查点专门添加到查询中。仅仅为sparkcontext添加检查点是不够的。在添加检查点之后,它正在工作。在checkpoint文件夹中,它将创建一个偏移子文件夹,其中包含偏移文件0、1、2、3。。。。对于每个文件,它将显示不同分区的偏移量信息。
{"8":109904920,"2":109905750,"5":109905789,"4":109905621,"7":109905330,"1":109905746,"9":109905750,"3":109905936,"6":109905531,"0":109905583}}
一个建议是将检查点放在一些外部存储中,例如s3。即使您需要重建emr集群本身,它也可以帮助恢复偏移量,以备不时之需。
1条答案
按热度按时间knpiaxh11#
根据spark结构化集成指南,spark本身正在跟踪补偿,没有补偿提交给kafka。这意味着,如果spark流式处理作业失败并重新启动,所有有关偏移量的必要信息都将存储在spark的检查点文件中。
即使将consumergroup名称设置为
kafka.group.id
,应用程序仍不会将消息提交回kafka。有关要读取的下一个偏移量的信息仅在spark应用程序的检查点文件中可用。如果在不重新部署的情况下停止并重新启动应用程序,并确保不删除旧的检查点文件,则应用程序将继续从停止的位置读取。
spark structured streaming文档中关于使用检查点从故障中恢复的内容如下:
如果出现故障或有意关闭,可以恢复上一个查询的上一个进度和状态,并在停止的地方继续。这是使用检查点和预写日志完成的。您可以使用检查点位置配置查询,该查询将保存所有进度信息(即每个触发器中处理的偏移量范围)[…]”
这可以通过在应用程序中设置以下选项来实现
writeStream
查询(仅在sparkcontext配置中设置检查点目录是不够的):文档中还指出,“此检查点位置必须是hdfs兼容文件系统中的路径,并且可以在启动查询时在datastreamwriter中设置为选项。”
此外,spark结构化流的容错能力还取决于输出接收器,如“输出接收器”一节所述。
因为您当前正在使用
ForeachBatch
接收器,则应用程序中可能没有重新启动功能。