如何在sparksql(pyspark)中实现自动增量

vecaoik1  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(523)

我需要在sparksql表中实现一个自动递增列,我怎么能做到呢。请引导我。我正在使用pyspark 2.0
谢谢你kalyan

kmpatx3s

kmpatx3s1#

我会编写/重用有状态的hive udf,并向pyspark注册,因为spark sql对hive有很好的支持。
检查这条线 @UDFType(deterministic = false, stateful = true) 在下面的代码中,以确保它是有状态的自定义项。

package org.apache.hadoop.hive.contrib.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

/**
 * UDFRowSequence.
 */
@Description(name = "row_sequence",
    value = "_FUNC_() - Returns a generated row sequence number starting from 1")
@UDFType(deterministic = false, stateful = true)
public class UDFRowSequence extends UDF
{
  private LongWritable result = new LongWritable();

  public UDFRowSequence() {
    result.set(0);
  }

  public LongWritable evaluate() {
    result.set(result.get() + 1);
    return result;
  }
}

// End UDFRowSequence.java

现在构建jar并在pyspark启动时添加位置。

$ pyspark --jars your_jar_name.jar

然后注册 sqlContext .

sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")

现在使用 row_seq() 在选择查询中

sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")

在pyspark中使用配置单元自定义项的项目

相关问题