我正在创建一个简单的kafka spark流程序,在这个程序中,我从topic读取数据,使用udf进行处理,并在console中打印键和处理后的值。udf中的processig是这样的:1.我们得到ptyid和date作为输入。2.如果我们得到新的pty id,我们返回一个值作为i+date(insert with date)到udf,并将其存储在hashmap中以供进一步比较。3.如果我们得到现有的ptyid,我们将当前记录的日期与hashmap中记录的日期进行比较。3a)如果当前记录的日期小于hashmap中可用的日期,我们将值作为“d”(discard)返回给udf。3b)如果当前记录的日期大于hashmap中可用的日期,我们将值作为u+date(update)返回到udf,并将其存储在hashmap中以供进一步比较。
问题:当我在本地模式下运行应用程序时,没有问题,并且每个ptyid的i,u,d的行为与预期一致。
当我在客户机模式下运行同一个应用程序时,如果我在输入主题中多次发布相同的数据,它的行为会不一致,并且对于已经使用的同一ptyid,会多次生成i+日期。期望是如果我们发布相同的数据,它会第一次给出i(insert),u/d比较。但我多次得到i+日期
下面是代码片段。任何帮助都将不胜感激。
object teststr{
var hashMap = new ConcurrenthashMap[String,String]()
def main(args: Array[Sttring]):Unit = {
val spark = SparkSession.builder.appname("test").getOrCreate()
import spark.impicits._
spark.readstream.format("kafka")
.option ......(all properties)
.load()
.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value",offset","partition","timestamp")
.withColumn("value", processRecordsUDF(col("key"),col("value")))
.select("key","value")
.writestream
.format("console")
.outputMode("append")
.start().awaitTermination()
}
def processRecords(partyId: String,value:String):String = {
if (hashMap.containsKey(partyId))
{
if (value > hashMap.get(partyId))
{
hashMap.put(partyId,value)
return ("U"+value)
}
else
{
return ("D")
}}
else
{
hashMap.put(partyId,value)
return ("I"+value)
}}
def processRecordsUDF = udf(processRecords(_:String,_:String):String)
}
暂无答案!
目前还没有任何答案,快来回答吧!