I have a process running that uses the following pseudocode to read from Kafka and publish to Elasticsearch:
try {
  val outStreamES = spark.readStream
    .format("kafka")
    .option("subscribe", topics.keys.mkString(","))
    .options(kafkaConfig)
    .load()
    .select($"key".cast(StringType), $"value".cast(StringType), $"topic")
    // Convert untyped DataFrame to Dataset
    .as[(String, String, String)]
    // Merge all manifests for a vehicle in the minibatch
    .groupByKey(_._1)
    // Start of merge
    .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(mergeGroup)
    // .select($"key".cast(StringType), from_json($"value", schema).as("manifest"))
    .select($"_1".alias("key"), $"_2".alias("manifest"))

  val inStreamManifestMain = outStreamES
  inStreamManifestMain
    .select("key", "manifest.*")
    // Convert timestamp columns to strings - avoids conversion to longs otherwise
    .writeStream
    .outputMode("append")
    .format("org.elasticsearch.spark.sql")
    .trigger(Trigger.ProcessingTime(conf.getString("spark.trigger")))
    .option("mode", "DROPMALFORMED")
    .options(configToMap(conf.getObject("esConf")))
    .start()
Inside mergeGroup I have a try/catch that finds any bad record that doesn't match the schema. Is there a way to reject records that don't match the schema instead of killing the entire Spark stream?
Pseudocode of the try/catch I'm using. A single bad record causes the stream to fail continuously, and the only way to clear it is to purge the entire topic:
val manifests = rows.map(r => (
  try {
    read[ProductManifestDocument](r._2)
  } catch {
    case ex: MappingException =>
      throw new MappingException(ex.msg + "\n" + r._2 + " vRECORD FAILED TO MAPv ", ex)
  },
  // all topics
  topics(r._3)
))
  .toList
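For illustration only (this is not from the question): one common way to drop a malformed record instead of rethrowing is to wrap the parse in scala.util.Try and flatMap, so failures are logged and skipped while good records pass through. A minimal Spark-free sketch, where the Manifest case class and parse function are hypothetical stand-ins for ProductManifestDocument and read[...]:

```scala
import scala.util.{Failure, Success, Try}

object SkipBadRecords {
  // Hypothetical stand-in for ProductManifestDocument
  final case class Manifest(raw: String)

  // Hypothetical stand-in for read[ProductManifestDocument]: rejects
  // anything that doesn't look like a JSON object
  def parse(json: String): Manifest =
    if (json.startsWith("{")) Manifest(json)
    else throw new IllegalArgumentException(s"bad record: $json")

  // Wrap the parse in Try and flatMap: failures are logged and dropped,
  // successes pass through, so one bad record never fails the whole batch
  def parseAll(rows: Iterator[String]): List[Manifest] =
    rows.flatMap { raw =>
      Try(parse(raw)) match {
        case Success(m) => Some(m)
        case Failure(ex) =>
          Console.err.println(s"Skipping malformed record: ${ex.getMessage}")
          None
      }
    }.toList

  def main(args: Array[String]): Unit =
    println(parseAll(Iterator("""{"id":1}""", "oops", """{"id":2}""")).size)  // prints 2
}
```

The same shape fits inside mergeGroup: instead of Console.err, the Failure branch could write the raw record to a dead-letter topic so nothing is silently lost.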