这是输入Dataframe:
df1_input = spark.createDataFrame([ \
("P1","A","B","C"), \
("P1","D","E","F"), \
("P1","G","H","I"), \
("P1","J","K","L") ], ["Person","L1B","B2E","J3A"])
df1_input.show()
+------+---+---+---+
|Person|L1B|B2E|J3A|
+------+---+---+---+
| P1| A| B| C|
| P1| D| E| F|
| P1| G| H| I|
| P1| J| K| L|
+------+---+---+---+
下面给出了相应的说明:
df1_item_details = spark.createDataFrame([ \
("L1B","item Desc1","A","Detail Desc1"), \
("L1B","item Desc1","D","Detail Desc2"), \
("L1B","item Desc1","G","Detail Desc3"), \
("L1B","item Desc1","J","Detail Desc4"), \
("B2E","item Desc2","B","Detail Desc5"), \
("B2E","item Desc2","E","Detail Desc6"), \
("B2E","item Desc2","H","Detail Desc7"), \
("B2E","item Desc2","K","Detail Desc8"), \
("J3A","item Desc3","C","Detail Desc9"), \
("J3A","item Desc3","F","Detail Desc10"), \
("J3A","item Desc3","I","Detail Desc11"), \
("J3A","item Desc3","L","Detail Desc12")], ["Item","Item Desc","Detail","Detail Desc"])
df1_item_details.show()
+----+----------+------+-------------+
|Item| Item Desc|Detail| Detail Desc|
+----+----------+------+-------------+
| L1B|item Desc1| A| Detail Desc1|
| L1B|item Desc1| D| Detail Desc2|
| L1B|item Desc1| G| Detail Desc3|
| L1B|item Desc1| J| Detail Desc4|
| B2E|item Desc2| B| Detail Desc5|
| B2E|item Desc2| E| Detail Desc6|
| B2E|item Desc2| H| Detail Desc7|
| B2E|item Desc2| K| Detail Desc8|
| J3A|item Desc3| C| Detail Desc9|
| J3A|item Desc3| F|Detail Desc10|
| J3A|item Desc3| I|Detail Desc11|
| J3A|item Desc3| L|Detail Desc12|
+----+----------+------+-------------+
以下是一些需要在最终输出上粘贴的标准信息:
df1_stdColumns = spark.createDataFrame([ \
("School","BMM"), \
("College","MSRIT"), \
("Workplace1","Blr"), \
("Workplace2","Chn")], ["StdKey","StdVal"])
df1_stdColumns.show()
+----------+------+
| StdKey|StdVal|
+----------+------+
| School| BMM|
| College| MSRIT|
|Workplace1| Blr|
|Workplace2| Chn|
+----------+------+
预期输出如下所示:
+--------+-----+---------------+-----+---------------+-----+----------------+--------+---------+------------+------------+
| Person | L1B | Item Desc1 | B2E | Item Desc2 | J3A | Item Desc3 | School | College | WorkPlace1 | WorkPlace2 |
+--------+-----+---------------+-----+---------------+-----+----------------+--------+---------+------------+------------+
| P1 | A | Detail Desc 1 | B | Detail Desc 5 | C | Detail Desc 9 | Bmm | MSRIT | Blr | Chn |
| P1 | D | Detail Desc 2 | E | Detail Desc 6 | F | Detail Desc 10 | Bmm | MSRIT | Blr | Chn |
| P1 | G | Detail Desc 3 | H | Detail Desc 7 | I | Detail Desc 11 | Bmm | MSRIT | Blr | Chn |
| P1 | J | Detail Desc 4 | K | Detail Desc 8 | L | Detail Desc 12 | Bmm | MSRIT | Blr | Chn |
+--------+-----+---------------+-----+---------------+-----+----------------+--------+---------+------------+------------+
有人能提出一个最佳的方法吗?输入数据集的大小以百万为单位。目前的代码,我已经运行了约10小时,它不是最佳的。。如果可能的话,寻找一些性能良好的spark(python\scala\sql)代码
编辑:下面是代码,我有工作,但需要永远完成时,输入量以百万计
from pyspark.sql.functions import monotonically_increasing_id
import pyspark.sql.functions as F
from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql.types import StructType, StructField, LongType
from pyspark.sql import DataFrame
from typing import Iterable
# Databricks runtime 7.3 on spark 3.0.1 which supports AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "10000")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum","1")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "1")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "10KB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1B")
df1_input=df1_input.withColumn("RecordId", monotonically_increasing_id())
df1_input_2=df1_input
# Custom function to do transpose
def melt(df: DataFrame, id_vars: Iterable[str], value_vars: Iterable[str], var_name: str="variable", value_name: str="Value") -> DataFrame:
# Create array<struct<variable: str, value: ...>>
_vars_and_vals = array(*(
struct(lit(c).alias(var_name), col(c).alias(value_name))
for c in value_vars))
# Add to the DataFrame and explode
_tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
cols = id_vars + [
col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
return _tmp.select(*cols)
df9_ColsList=melt(df1_stdColumns, id_vars=['StdKey'], value_vars=df1_stdColumns.columns).filter("variable <>'StdKey'")
df9_ColsList=df9_ColsList.groupBy("variable").pivot("StdKey").agg(F.first("value")).drop("variable")
df1_input_2=melt(df1_input_2, id_vars=['RecordId','Person'], value_vars=df1_input_2.columns).filter("variable != 'Person'").filter("variable != 'RecordId'").withColumnRenamed('variable','Name')
df_prevStepInputItemDets=(df1_input_2.join(df1_item_details,(df1_input_2.Name == df1_item_details.Item) & (df1_input_2.Value == df1_item_details.Detail)))
# Since pivot performs better if the columns are know in advance, sacrificing a collect to do it. (Since Pivot without providing this was performing worse)
CurrStagePivotCols_tmp = df1_item_details.select("Item Desc").rdd.flatMap(lambda x: x).collect()
CurrStagePivotCols = []
[CurrStagePivotCols.append(x) for x in CurrStagePivotCols_tmp if x not in CurrStagePivotCols]
df_prevStepInputItemDets=(df_prevStepInputItemDets \
.groupBy('RecordId',"Person") \
.pivot("Item Desc",CurrStagePivotCols) \
#.pivot("Item Desc") \
.agg(F.first("Detail Desc"))).drop("RecordId")
# combine codes and descriptions
# Add rowNumber to both dataframes so that they can be merged side-by-side
def add_rowNum(sdf):
new_schema = StructType(sdf.schema.fields + [StructField("RowNum", LongType(), False),])
return sdf.rdd.zipWithIndex().map(lambda row: row[0] + (row[1],)).toDF(schema=new_schema)
ta = df1_input.alias('ta')
tb = df_prevStepInputItemDets.alias('tb')
ta = add_rowNum(ta)
tb = add_rowNum(tb)
df9_code_desc = tb.join(ta.drop("Katashiki"), on="RowNum",how='inner').drop("RowNum")
# CrossJoin to plaster standard columns
df9_final=df9_code_desc.crossJoin(df9_ColsList).drop("RecordId")
display(df9_final)
暂无答案!
目前还没有任何答案,快来回答吧!