I want to build a DataFrame with pyspark where one column is the siphash of two other columns in the dataset. To do this, I created a function to be used in rdd.map(), as follows:
import siphash
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sqlContext = SQLContext( spark )

# Hashing function
def hash_two_columns( row ):
    # Transform row to a dict
    row_dict = row.asDict()
    # Concat col1 and col2
    concat_str = 'E'.join( [str(row_dict['col1']), str(row_dict['col2'])] )
    # Fill string with 0 to get 16 bytes (otherwise error is raised)
    sixteenBytes_str = concat_str.zfill(16)
    # Preserve concatenated value for testing (this can be removed later)
    row_dict["hashcols_str"] = sixteenBytes_str
    # Calculate siphash
    row_dict["hashcols_id"] = siphash.SipHash_2_4( sixteenBytes_str.encode('utf-8') ).hash()
    return Row(**row_dict)

# Create test dataframe
test_df = spark.createDataFrame([
    (1,"text1",58965,11111),
    (3,"text2",78652,888888),
    (4,"text3",78652,888888),
], ("id","item","col1","col2"))

# Build the schema
# Using this to avoid "ValueError: Some of types cannot be determined by the first 100 rows" when pyspark tries to deduce the schema by itself
test_df_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("item", StringType(), True),
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True),
    StructField("hashcols_str", StringType(), True),
    StructField("hashcols_id", LongType(), True)
])

# Create the final Dataframe
final_test_df = sqlContext \
    .createDataFrame(
        test_df.rdd.map(hash_two_columns).collect(),
        test_df_schema) \
    .toDF()

final_test_df.show(truncate=False)
Although the schema definition matches the structure of the final Dataframe, running this code fails with the following error:
IllegalArgumentException: requirement failed: The number of columns doesn't match. Old column names (6): id, item, col1, col2, hashcols_str, hashcols_id. New column names (0): (java.lang.RuntimeException)
Does anyone know how to implement this correctly? Thanks a lot for your support.
1 Answer
I found a solution based on this post:
Update the function this way:
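The answer's original snippet is not reproduced here; as a minimal sketch, assuming the function is reworked to take the two column values directly (instead of a Row) and to return the hash as a plain Python int, it could look like this:

import siphash

def hash_two_columns(col1, col2):
    # Concatenate the two values with the same 'E' separator used in the question
    concat_str = 'E'.join([str(col1), str(col2)])
    # Pad with zeros to get the 16 bytes the siphash key requires
    sixteenBytes_str = concat_str.zfill(16)
    # Return the siphash as an int
    return siphash.SipHash_2_4(sixteenBytes_str.encode('utf-8')).hash()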
Then register it as a udf (user-defined function) and add the new column to the dataframe with the withColumn function.
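The exact code from the linked post is not shown here; a rough sketch of that last step, assuming the udf is declared with a LongType return type to match the question's schema, might look like:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import LongType

# Wrap the reworked hashing function as a udf with an explicit return type
hash_udf = udf(hash_two_columns, LongType())

# Add the hash column to the original test dataframe
final_test_df = test_df.withColumn("hashcols_id", hash_udf(col("col1"), col("col2")))
final_test_df.show(truncate=False)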