如何使用spark 3.0结构化流媒体中的kafka.group.id和检查点继续读取kafka在重新启动后停止的内容?

jei2mxaa  于 2021-05-26  发布在  Spark
关注(0)|答案(1)|浏览(594)

基于spark 3.0的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html. 应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,我希望避免流式spark作业失败并重新启动时可能的数据丢失。基于我之前的问题,我觉得spark 3.0中的kafka.group.id会有所帮助。
如何为spark structured streaming指定kafka consumer的组id?
如何确保通过spark结构化流媒体接收kafka数据时不丢失数据?
不过,我在spark 3.0中尝试了如下设置。

package com.example

/**
 * @author ${user.name}
 */
import scala.math.random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement

//import org.apache.spark.sql.hive.HiveContext

import scala.io.Source

import java.nio.charset.StandardCharsets

import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder}
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding

object App {

    def main(args: Array[String]): Unit = {

      val spark: SparkSession = SparkSession.builder()
        .appName("MY-APP")
        .getOrCreate()

      import spark.sqlContext.implicits._

      spark.catalog.clearCache()
      spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
      spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

      spark.sparkContext.setLogLevel("ERROR")
      spark.sparkContext.setCheckpointDir("/home/ec2-user/environment/spark/spark-local/checkpoint")

      System.gc()

      val df = spark.readStream
        .format("kafka")
          .option("kafka.bootstrap.servers", "mybroker.io:6667")
          .option("subscribe", "mytopic")
          .option("kafka.security.protocol", "SASL_SSL")
          .option("kafka.ssl.truststore.location", "/home/ec2-user/environment/spark/spark-local/creds/cacerts")
          .option("kafka.ssl.truststore.password", "changeit")
          .option("kafka.ssl.truststore.type", "JKS")
          .option("kafka.sasl.kerberos.service.name", "kafka")
          .option("kafka.sasl.mechanism", "GSSAPI")
          .option("kafka.group.id","MYID")
          .load()

      df.printSchema()

      val schema = new StructType()
        .add("id", StringType)
        .add("x", StringType)
        .add("eventtime", StringType)

      val idservice = df.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), schema).as("data"))
        .select("data.*")

      val monitoring_df = idservice
                .selectExpr("cast(id as string) id", 
                            "cast(x as string) x",
                            "cast(eventtime as string) eventtime")              

      val monitoring_stream = monitoring_df.writeStream
                              .trigger(Trigger.ProcessingTime("120 seconds"))
                              .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
                                if(!batchDF.isEmpty) 
                                {
                                    batchDF.persist()
                                    printf("At %d, the %dth microbatch has %d records and %d partitions \n", Instant.now.getEpochSecond, batchId, batchDF.count(), batchDF.rdd.partitions.size)                                    
                                    batchDF.show()

                                    batchDF.write.mode(SaveMode.Overwrite).option("path", "/home/ec2-user/environment/spark/spark-local/tmp").saveAsTable("mytable")
                                    spark.catalog.refreshTable("mytable")

                                    batchDF.unpersist()
                                    spark.catalog.clearCache()
                                }
                            }
                            .start()
                            .awaitTermination()
    }

}

spark作业通过使用下面的spark submit命令在独立模式下进行测试,但是当我在aws emr中以集群模式部署时也存在同样的问题。

spark-submit --master local[1] --files /home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf,/home/ec2-user/environment/spark/spark-localreds/cacerts,/home/ec2-user/environment/spark/spark-local/creds/krb5.conf,/home/ec2-user/environment/spark/spark-local/creds/my.keytab --driver-java-options "-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.dynamicAllocation.enabled=false --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.yarn.maxAppAttempts=1000 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --class com.example.App ./target/sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar

然后,我开始了流式处理工作,读取Kafka主题中的流式处理数据。过了一段时间,我干掉了那份工作。然后,我等了1个小时才重新开始工作。如果我理解正确,新的流数据应该从我终止spark作业时的偏移量开始。但是,它仍然以最新的偏移量开始,这在我停止作业期间导致了数据丢失。
我是否需要配置更多选项以避免数据丢失?还是我对spark 3.0有些误解?谢谢!
问题已解决
这里的关键问题是必须将检查点专门添加到查询中。仅仅为sparkcontext添加检查点是不够的。在添加检查点之后,它正在工作。在checkpoint文件夹中,它将创建一个偏移子文件夹,其中包含偏移文件0、1、2、3。。。。对于每个文件,它将显示不同分区的偏移量信息。

{"8":109904920,"2":109905750,"5":109905789,"4":109905621,"7":109905330,"1":109905746,"9":109905750,"3":109905936,"6":109905531,"0":109905583}}

一个建议是将检查点放在一些外部存储中,例如s3。即使您需要重建emr集群本身,它也可以帮助恢复偏移量,以备不时之需。

knpiaxh1

knpiaxh11#

根据spark结构化集成指南,spark本身正在跟踪补偿,没有补偿提交给kafka。这意味着,如果spark流式处理作业失败并重新启动,所有有关偏移量的必要信息都将存储在spark的检查点文件中。
即使将consumergroup名称设置为 kafka.group.id ,应用程序仍不会将消息提交回kafka。有关要读取的下一个偏移量的信息仅在spark应用程序的检查点文件中可用。
如果在不重新部署的情况下停止并重新启动应用程序,并确保不删除旧的检查点文件,则应用程序将继续从停止的位置读取。
spark structured streaming文档中关于使用检查点从故障中恢复的内容如下:
如果出现故障或有意关闭,可以恢复上一个查询的上一个进度和状态,并在停止的地方继续。这是使用检查点和预写日志完成的。您可以使用检查点位置配置查询,该查询将保存所有进度信息(即每个触发器中处理的偏移量范围)[…]”
这可以通过在应用程序中设置以下选项来实现 writeStream 查询(仅在sparkcontext配置中设置检查点目录是不够的):

.option("checkpointLocation", "path/to/HDFS/dir")

文档中还指出,“此检查点位置必须是hdfs兼容文件系统中的路径,并且可以在启动查询时在datastreamwriter中设置为选项。”
此外,spark结构化流的容错能力还取决于输出接收器,如“输出接收器”一节所述。
因为您当前正在使用 ForeachBatch 接收器,则应用程序中可能没有重新启动功能。

相关问题