spark获取列中数组中具有相同值的所有行

ttisahbt  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(538)

我有一个带列的sparkDataframe id 以及 hashes ,其中列 hashes 包含 Seq 长度整数值 n . 例子:

  1. +----+--------------------+
  2. + id| hashes|
  3. +----+--------------------+
  4. |0 | [1, 2, 3, 4, 5]|
  5. |1 | [1, 5, 3, 7, 9]|
  6. |2 | [9, 3, 6, 8, 0]|
  7. +-------------------------+

我想得到一个Dataframe,其中包含数组所在的所有行 hashes 至少在一个位置上匹配。更正式地说,我想要一个带有附加列的Dataframe matches 每排都是这样 r 包含 Seqid 一排排的 hashes[r][i] == hashes[k][i]k 至少有一个值的任何其他行 i .
对于我的示例数据,结果是:

  1. +---+---------------+-------+
  2. |id |hashes |matches|
  3. +---+---------------+-------+
  4. |0 |[1, 2, 3, 4, 5]|[1] |
  5. |1 |[1, 5, 3, 7, 9]|[0] |
  6. |2 |[9, 3, 6, 8, 0]|[] |
  7. +---+---------------+-------+
zbdgwd5y

zbdgwd5y1#

在spark3中,下面的代码比较行之间的数组,只保留两个数组在同一位置共享至少一个元素的行。 df 是您的输入Dataframe:

  1. df.join(
  2. df.withColumnRenamed("id", "id2").withColumnRenamed("hashes", "hashes2"),
  3. exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))
  4. )
  5. .groupBy("id")
  6. .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matched"))
  7. .withColumn("matched", filter(col("matched"), x => x.notEqual(col("id"))))

详细说明

首先,我们执行一个自动交叉联接,根据两个哈希数组中至少有一个元素位于同一位置的条件进行过滤。
为了构建这个条件,我们压缩了两个哈希数组,一个来自第一个Dataframe,一个用于第二个连接的Dataframe,也就是重命名了列的第一个Dataframe。通过压缩,我们得到一个数组 {"hashes":x, "hashes2":y} 接下来我们只需要检查数组中是否存在一个元素 x = y . 完整条件如下:

  1. exists(arrays_zip(col("hashes"), col("hashes2")), x => x("hashes") === x("hashes2"))

然后,我们将按列进行聚合 id 收集所有 id2 保留的行数,表示与您的条件匹配的行数
为了保持“hashes”列,对于具有相同“id”的两行,列“hashes”相等,我们得到每个“id”的第一个“hashes”。我们使用collect\u list收集所有“id2”:

  1. .agg(first(col("hashes")).as("hashes"), collect_list("id2").as("matches"))

最后,我们从列“matches”中筛选出当前行的id

  1. .withColumn("matches", filter(col("matches"), x => x.notEqual(col("id"))))

如果您需要“id”按顺序排列,可以添加 orderBy 条款:

  1. .orderBy("id")

使用Dataframe df 包含以下值:

  1. +---+---------------+
  2. |id |hashes |
  3. +---+---------------+
  4. |0 |[1, 2, 3, 4, 5]|
  5. |1 |[1, 5, 3, 7, 9]|
  6. |2 |[9, 3, 6, 8, 0]|
  7. +---+---------------+

您将获得以下输出:

  1. +---+---------------+-------+
  2. |id |hashes |matches|
  3. +---+---------------+-------+
  4. |0 |[1, 2, 3, 4, 5]|[1] |
  5. |1 |[1, 5, 3, 7, 9]|[0] |
  6. |2 |[9, 3, 6, 8, 0]|[] |
  7. +---+---------------+-------+

极限

join是笛卡尔积,非常昂贵。虽然条件过滤结果,但在大数据集上可能会导致大量计算/洗牌,并且性能可能非常差。
如果您使用的spark版本在3.0之前,则必须用用户定义的函数替换一些内置spark函数

展开查看全部

相关问题