nominals
是我的参考数据集,包含每天在某台机器上测量的所有点。它有以下简化的结构
nominals
+------------+----+----+-----+----+
| point_name | x| y| z|side|
+------------+----+----+-----+----+
|str1 |. |. |. | . |
|str2 |. |. |. | . |
|str3 |. |. |. | . |
+------------+----+----+-----+----+
``` `x,y,z` 是标称点坐标。这个 `point_name` 是测量点的唯一标识符,包含 `side` . 因此,每边都有一个需要测量的点的列表,例如伪代码 `point_name[point_name.side=="L"].unique()` .
存储实际测量值的数据集具有类似的结构,但有一个附加列 `id` 用来识别被测量的机器。每 `id` 测量左右两侧。
actuals
+------------+----+----+-----+----+----+
| point_name | x| y| z|side| id |
+------------+----+----+-----+----+----+
|str4 |. |. |. | . |. |
|str5 |. |. |. | . |. |
|str6 |. |. |. | . |. |
+------------+----+----+-----+----+----+
现在,我想创建一个新的数据集,其中包含一个额外的列,该列列出了每个 `id-side` 一对。换句话说,对于每一个 `id-side` 我要查一下 `point_name` 将实际测量值与标称值进行比较,找出两者之间的差异。预期结果将是
+-----------------------+-----------+
| id | missing_L | missing_R |
+-----------------------+-----------+
|9433| point_1, point_12| point_14 |
|9512| null | point_15 |
+-----------------------+-----------+
在当前的解决方案中,我围绕 `point_name` 并检查 `null` 价值观。
nominals_left = (
nominals
.filter(nominals.side == "L")
.select("point_name")
)
nominals_left_list = nominals_left.rdd.flatMap(lambda x: x).collect()
actuals_left = (
actuals
.filter(actuals.side == "L")
.groupBy("id")
.pivot("point_name", values=nominals_left_list)
.sum("x")
)
actuals_left_final = (
actuals_left
.withColumn("missing_L", F.array(*[F.when(F.isnull(c), F.lit(c)) for c in actuals_left.columns]))
.withColumn("missing_L", F.expr("array_join(missing_L, ', ')"))
.select("id", "missing_L")
)
有没有更简单或更有效的方法?而且似乎我必须明确地对每个 `side` 以避免由于属于另一侧的点而创建假阳性空值。大家一起聚在一起会很好 `id` 以及 `side` .
1条答案
按热度按时间ccgok5k51#
我想我在研究了一点之后找到了一个方法。我只需从名义表中创建一个所有必需点的列表,并将其与上的测量表连接起来
side
,然后在这两列数组之间进行比较。似乎还不错。下面是一个例子nominals
table和tableactuals
table。首先创建一个列表,列出每个id
以及side
```nominal_list = (
nominals
.groupby("side")
.agg(F.collect_list(F.col("point_name")))
.withColumnRenamed("collect_list(point_name)", "nominal_points")
)
missing = (
actuals
.groupby(["id", "side"])
.agg(F.collect_list(F.col("point_name")))
.withColumnRenamed("collect_list(point_name)", "measured_points")
.join(nominal_list, "side")
.withColumn('missing', F.array_except('nominal_points', 'measured_points'))
)