I recently ran into a problem where I want to map many columns of one DataFrame using the columns of another DataFrame, which is effectively a lookup table that lets me replace one set of IDs with another. This could be done with a few simple joins, one per column to be mapped. In Scala Spark it can be done with a Map. Can something similar be done in PySpark? (A rough sketch of the join-per-column approach I mean is shown below.)
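For context, a minimal sketch of that join-per-column approach, assuming hypothetical lookup and input DataFrames and made-up column names:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical lookup table: original_id -> new_id
lookup = spark.createDataFrame([(1, 100), (2, 200)], ["original_id", "new_id"])
# hypothetical input with several id columns to be remapped
input_df = spark.createDataFrame([(1, 2), (2, 1)], ["id_col_1", "id_col_2"])

# one left join per id column to remap, e.g. for id_col_1:
remapped = (
    input_df
    .join(lookup.withColumnRenamed("original_id", "id_col_1"), on="id_col_1", how="left")
    .drop("id_col_1")
    .withColumnRenamed("new_id", "id_col_1")
)
# ...and the same join repeated for id_col_2, id_col_3, and so on,
# which is what I would like to avoid.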
1 Answer

f5emj3cl1#
This can be achieved by combining pyspark.sql.functions.map_from_entries, pyspark.sql.functions.collect_list, pyspark.sql.functions.struct, and crossJoin, in the following way:
from pyspark.sql import functions as f

# original_and_new_df is a 2-column table with rows (original_id, new_id)
# input_df is a table containing columns with original_ids,
# such as f.col(id_col_1), f.col(id_col_2)
# original_id, new_id, id_col_1, id_col_2 and mapping_column_name are
# column-name strings defined elsewhere
input_df_with_mapping_col = input_df.crossJoin(
    original_and_new_df.select(
        f.map_from_entries(
            f.collect_list(
                f.struct(f.col(original_id), f.col(new_id)))
        ).alias(mapping_column_name)
    )
)
# apply the mappings by looking up each id column in the map
input_df_with_mapping_col.select(
    f.col(mapping_column_name)[f.col(id_col_1)].alias(id_col_1),
    f.col(mapping_column_name)[f.col(id_col_2)].alias(id_col_2),
)
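For illustration, here is a self-contained sketch of the same approach with made-up sample data; the SparkSession setup, the sample rows, and the concrete column-name values are assumptions, not part of the answer above:

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

original_id, new_id = "original_id", "new_id"
id_col_1, id_col_2 = "id_col_1", "id_col_2"
mapping_column_name = "id_mapping"

original_and_new_df = spark.createDataFrame([(1, 100), (2, 200)], [original_id, new_id])
input_df = spark.createDataFrame([(1, 2), (2, 1)], [id_col_1, id_col_2])

# collapse the lookup table into a single map column and attach it to every row
with_map = input_df.crossJoin(
    original_and_new_df.select(
        f.map_from_entries(
            f.collect_list(f.struct(f.col(original_id), f.col(new_id)))
        ).alias(mapping_column_name)
    )
)

# look up each id column in the map
result = with_map.select(
    f.col(mapping_column_name)[f.col(id_col_1)].alias(id_col_1),
    f.col(mapping_column_name)[f.col(id_col_2)].alias(id_col_2),
)
result.show()
# expected rows (order may vary): (100, 200) and (200, 100)

Note that the crossJoin is cheap here because the aggregated side is a single row containing only the map column.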