我需要在sparksql表中实现一个自动递增列,我怎么能做到呢。请引导我。我正在使用pyspark 2.0谢谢你kalyan
kmpatx3s1#
我会编写/重用有状态的hive udf,并向pyspark注册,因为spark sql对hive有很好的支持。检查这条线 @UDFType(deterministic = false, stateful = true) 在下面的代码中,以确保它是有状态的自定义项。
@UDFType(deterministic = false, stateful = true)
package org.apache.hadoop.hive.contrib.udf; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.hive.ql.udf.UDFType; import org.apache.hadoop.io.LongWritable; /** * UDFRowSequence. */ @Description(name = "row_sequence", value = "_FUNC_() - Returns a generated row sequence number starting from 1") @UDFType(deterministic = false, stateful = true) public class UDFRowSequence extends UDF { private LongWritable result = new LongWritable(); public UDFRowSequence() { result.set(0); } public LongWritable evaluate() { result.set(result.get() + 1); return result; } } // End UDFRowSequence.java
现在构建jar并在pyspark启动时添加位置。
$ pyspark --jars your_jar_name.jar
然后注册 sqlContext .
sqlContext
sqlContext.sql("CREATE TEMPORARY FUNCTION row_seq AS 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")
现在使用 row_seq() 在选择查询中
row_seq()
sqlContext.sql("SELECT row_seq(), col1, col2 FROM table_name")
在pyspark中使用配置单元自定义项的项目
1条答案
按热度按时间kmpatx3s1#
我会编写/重用有状态的hive udf,并向pyspark注册,因为spark sql对hive有很好的支持。
检查这条线
@UDFType(deterministic = false, stateful = true)
在下面的代码中,以确保它是有状态的自定义项。现在构建jar并在pyspark启动时添加位置。
然后注册
sqlContext
.现在使用
row_seq()
在选择查询中在pyspark中使用配置单元自定义项的项目