Spark Dataframe 过滤器功能不工作

c3frrgcw  于 2023-01-09  发布在  Apache
关注(0)|答案(1)|浏览(208)

我是spark的新手,我们有一个从hbase读取数据并保存到rdd的项目。 Dataframe 计数是5280000,下面是代码:

val df = spark.createDataFrame(rddDump, schema)

def sampledOrNot = udf((count: Int) => {
  if(count < TEN_K_SELLER_ITEM_BENCH){
    1
  }else{
    val randomId = random.nextLong(0, 1000000000000L)
    var targetValue = 10000/count.toDouble
    var base = 1
    while (targetValue < 1){
      targetValue = targetValue * base
      base = base * 10
    }
    if(randomId % base <= (targetValue.intValue() + 1)) 1 else 0
  }
})

val sampleBasedAll = df.withColumn("sampled", sampledOrNot(col("count")))
sampleBasedAll.repartition(10).write.option("header", value = true).option("compression", "gzip").csv("/sampleBasedAll")

val sampledDF = sampleBasedAll.repartition(100).filter("sampled = 1").select($"sellerId", $"siteId", $"count", $"desc")
scribe.info("sampledDF.count = " + sampledDF.count())

奇怪的是文件夹sampleBasedAll保存了有效的csv Dataframe 结果,但生产日志显示的sampledDF.count为零。
我从sampleBasedAll文件夹下载csv,然后重新运行

sampleBasedAll.repartition(100).filter("sampled = 1").select($"sellerId", $"siteId", $"count", $"desc").count()

有13500条记录显示...
我的问题是为什么

sampleBasedAll.filter("sampled = 1")

在本地运行时有记录,但生产运行未生成任何记录...

qco9c6ql

qco9c6ql1#

这篇文章Unexpected behavior of UDF for random integers with join operation给了我提示
“Spark关于UDF是确定性函数的假设”
udf可以被执行不止一次,通过添加.asNondeterministic()更新样本udf,如下所示

def sampledOrNot = udf((count: Int) => {
if(count < TEN_K_SELLER_ITEM_BENCH){
  1
}else{
  val randomId = random.nextLong(0, 1000000000000L)
  var targetValue = 10000/count.toDouble
  var base = 1
  while (targetValue < 1){
   targetValue = targetValue * base
   base = base * 10
  }
  if(randomId % base <= 10000/count.toDouble * base) 1 else 0
}.asNondeterministic()
})

解决了不一致问题

相关问题