PySpark: assign max+1 to unmatched records

9gm1akwq posted on 2022-12-11 in Spark

I have a DataFrame like the one below:

+-------+--------------+----+-------------+
|recType|registerNumber|mnId|     sequence|
+-------+--------------+----+-------------+
|     01|      13578000|   0|            1|
|     11|      13578000|   1|            1|
|     13|      13578000|   2|            1|
|     14|      13578000|   3|            1|
|     14|      13578000|   4|            1|
|     01|      11121000|   5|            2|
|     11|      11121000|   6|            2|
|     13|      11121000|   7|            2|
|     14|      11121000|   8|            2|
|     01|      OC387000|   9|            3|
|     11|      OC387000|  10|            3|
|     13|      OC387000|  11|            3|
|     01|      11121000|  12|            4|
|     11|      11121000|  13|            4|
|     13|      11121000|  14|            4|
|     14|      11121000|  15|            4|
|     11|      OC321000|  16|            4|
|     13|      OC321000|  17|            4|
|     01|      OC322000|  18|            5|
|     11|      OC322000|  19|            5|
|     13|      OC322000|  20|            5|
|     11|      SO352000|  21|            5|
|     13|      SO352000|  22|            5|
+-------+--------------+----+-------------+

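Notice that sequences 4 and 5 contain more than one registerNumber. This happens because some record sets have no recType 01 row, and a record set normally starts with recType 01.
registerNumbers starting with OC3 or SO3 may or may not have a recType 01 row, so the sequence column treats them as part of the previous record set.
I want to make sure that when two registerNumbers fall under the same sequence and the registerNumber starts with OC3 or SO3, those rows get a new sequence value, which should be max(sequence)+1.
So the resulting DataFrame should look like this: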

+-------+--------------+----+-------------+
|recType|registerNumber|mnId|     sequence|
+-------+--------------+----+-------------+
|     01|      13578000|   0|            1|
|     11|      13578000|   1|            1|
|     13|      13578000|   2|            1|
|     14|      13578000|   3|            1|
|     14|      13578000|   4|            1|
|     01|      11121000|   5|            2|
|     11|      11121000|   6|            2|
|     13|      11121000|   7|            2|
|     14|      11121000|   8|            2|
|     01|      OC387000|   9|            3|
|     11|      OC387000|  10|            3|
|     13|      OC387000|  11|            3|
|     01|      11121000|  12|            4|
|     11|      11121000|  13|            4|
|     13|      11121000|  14|            4|
|     14|      11121000|  15|            4|
|     11|      OC321000|  16|            6|
|     13|      OC321000|  17|            6|
|     01|      OC322000|  18|            5|
|     11|      OC322000|  19|            5|
|     13|      OC322000|  20|            5|
|     11|      SO352000|  21|            7|
|     13|      SO352000|  22|            7|
+-------+--------------+----+-------------+

Thanks for your support.

pftdvrlh #1

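Let's use the collect_set function to gather the set of registerNumbers seen so far within each sequence, and use its size to derive a "sequence within sequence" column (seqInSeq) that increases whenever the registerNumber changes.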

SELECT recType, registerNumber, mnId, sequence,
       collect_set(registerNumber) over (partition by sequence order by mnId) as registerSet,
       size(collect_set(registerNumber) over (partition by sequence order by mnId)) as seqInSeq
  FROM dataframe
 ORDER BY mnId;

+-------+--------------+----+--------+--------------------+--------+
|recType|registerNumber|mnId|sequence|registerSet         |seqInSeq|
+-------+--------------+----+--------+--------------------+--------+
|1      |13578000      |0   |1       |[13578000]          |1       |
|11     |13578000      |1   |1       |[13578000]          |1       |
|13     |13578000      |2   |1       |[13578000]          |1       |
|14     |13578000      |3   |1       |[13578000]          |1       |
|14     |13578000      |4   |1       |[13578000]          |1       |
|1      |11121000      |5   |2       |[11121000]          |1       |
|11     |11121000      |6   |2       |[11121000]          |1       |
|13     |11121000      |7   |2       |[11121000]          |1       |
|14     |11121000      |8   |2       |[11121000]          |1       |
|1      |OC387000      |9   |3       |[OC387000]          |1       |
|11     |OC387000      |10  |3       |[OC387000]          |1       |
|13     |OC387000      |11  |3       |[OC387000]          |1       |
|1      |11121000      |12  |4       |[11121000]          |1       |
|11     |11121000      |13  |4       |[11121000]          |1       |
|13     |11121000      |14  |4       |[11121000]          |1       |
|14     |11121000      |15  |4       |[11121000]          |1       |
|11     |OC321000      |16  |4       |[OC321000, 11121000]|2       |
|13     |OC321000      |17  |4       |[OC321000, 11121000]|2       |
|1      |OC322000      |18  |5       |[OC322000]          |1       |
|11     |OC322000      |19  |5       |[OC322000]          |1       |
|13     |OC322000      |20  |5       |[OC322000]          |1       |
|11     |SO352000      |21  |5       |[OC322000, SO352000]|2       |
|13     |SO352000      |22  |5       |[OC322000, SO352000]|2       |
+-------+--------------+----+--------+--------------------+--------+
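
To make the windowed collect_set step easier to follow, here is a minimal plain-Python sketch (no Spark involved) that emulates it on the sample data from the question. The function name `running_distinct_count` and the tuple layout are assumptions made for illustration only. It relies on mnId being unique, so the running window ends exactly at the current row.

```python
# Plain-Python emulation of:
#   size(collect_set(registerNumber) over (partition by sequence order by mnId))
# Rows are (recType, registerNumber, mnId, sequence), copied from the question.
rows = [
    ("01", "13578000", 0, 1), ("11", "13578000", 1, 1), ("13", "13578000", 2, 1),
    ("14", "13578000", 3, 1), ("14", "13578000", 4, 1), ("01", "11121000", 5, 2),
    ("11", "11121000", 6, 2), ("13", "11121000", 7, 2), ("14", "11121000", 8, 2),
    ("01", "OC387000", 9, 3), ("11", "OC387000", 10, 3), ("13", "OC387000", 11, 3),
    ("01", "11121000", 12, 4), ("11", "11121000", 13, 4), ("13", "11121000", 14, 4),
    ("14", "11121000", 15, 4), ("11", "OC321000", 16, 4), ("13", "OC321000", 17, 4),
    ("01", "OC322000", 18, 5), ("11", "OC322000", 19, 5), ("13", "OC322000", 20, 5),
    ("11", "SO352000", 21, 5), ("13", "SO352000", 22, 5),
]


def running_distinct_count(rows):
    """Return {mnId: seqInSeq}: distinct registerNumbers seen so far in the row's sequence."""
    seen = {}    # sequence -> set of registerNumbers encountered up to the current row
    result = {}
    for _rec_type, register, mn_id, sequence in sorted(rows, key=lambda r: r[2]):
        seen.setdefault(sequence, set()).add(register)
        result[mn_id] = len(seen[sequence])
    return result


seq_in_seq = running_distinct_count(rows)
print([seq_in_seq[m] for m in (15, 16, 20, 21)])  # [1, 2, 1, 2]
```

Note that this counts distinct registerNumbers only. It does not check the OC3/SO3 prefix, and if an earlier registerNumber reappeared later in the same sequence the count would not go back down, so the approach assumes each registerNumber forms one contiguous run per sequence.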

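From here, sequence and seqInSeq together identify each group, and renumbering the groups with dense_rank is fairly easy. We only need to be careful with the ordering: we are not numbering from scratch, but appending extra sequence numbers for the rows where seqInSeq > 1.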

SELECT recType, registerNumber, mnId, dense_rank() over (order by seqInSeq != 1, sequence, seqInSeq) as sequence
  FROM (SELECT recType, registerNumber, mnId, sequence,
            collect_set(registerNumber) over (partition by sequence order by mnId) as registerSet,
            size(collect_set(registerNumber) over (partition by sequence order by mnId)) as seqInSeq
        FROM dataframe)
 ORDER BY mnId;

+-------+--------------+----+--------+
|recType|registerNumber|mnId|sequence|
+-------+--------------+----+--------+
|1      |13578000      |0   |1       |
|11     |13578000      |1   |1       |
|13     |13578000      |2   |1       |
|14     |13578000      |3   |1       |
|14     |13578000      |4   |1       |
|1      |11121000      |5   |2       |
|11     |11121000      |6   |2       |
|13     |11121000      |7   |2       |
|14     |11121000      |8   |2       |
|1      |OC387000      |9   |3       |
|11     |OC387000      |10  |3       |
|13     |OC387000      |11  |3       |
|1      |11121000      |12  |4       |
|11     |11121000      |13  |4       |
|13     |11121000      |14  |4       |
|14     |11121000      |15  |4       |
|11     |OC321000      |16  |6       |
|13     |OC321000      |17  |6       |
|1      |OC322000      |18  |5       |
|11     |OC322000      |19  |5       |
|13     |OC322000      |20  |5       |
|11     |SO352000      |21  |7       |
|13     |SO352000      |22  |7       |
+-------+--------------+----+--------+
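
The ordering trick in the dense_rank call can be checked with a small plain-Python sketch (again without Spark, emulating the query rather than running it). The variable names are hypothetical. The sort key mirrors `order by seqInSeq != 1, sequence, seqInSeq`: Python sorts `False` before `True`, so every original group is ranked first and the split-off groups are numbered afterwards.

```python
# Plain-Python emulation of:
#   dense_rank() over (order by seqInSeq != 1, sequence, seqInSeq)
# Rows are (recType, registerNumber, mnId, sequence), copied from the question.
rows = [
    ("01", "13578000", 0, 1), ("11", "13578000", 1, 1), ("13", "13578000", 2, 1),
    ("14", "13578000", 3, 1), ("14", "13578000", 4, 1), ("01", "11121000", 5, 2),
    ("11", "11121000", 6, 2), ("13", "11121000", 7, 2), ("14", "11121000", 8, 2),
    ("01", "OC387000", 9, 3), ("11", "OC387000", 10, 3), ("13", "OC387000", 11, 3),
    ("01", "11121000", 12, 4), ("11", "11121000", 13, 4), ("13", "11121000", 14, 4),
    ("14", "11121000", 15, 4), ("11", "OC321000", 16, 4), ("13", "OC321000", 17, 4),
    ("01", "OC322000", 18, 5), ("11", "OC322000", 19, 5), ("13", "OC322000", 20, 5),
    ("11", "SO352000", 21, 5), ("13", "SO352000", 22, 5),
]

# Step 1: running distinct count of registerNumbers per sequence (seqInSeq),
# stored together with the sort key used by dense_rank.
seen = {}     # sequence -> set of registerNumbers seen so far
keyed = []    # (mnId, sort key) per row
for _rec_type, register, mn_id, sequence in sorted(rows, key=lambda r: r[2]):
    seen.setdefault(sequence, set()).add(register)
    seq_in_seq = len(seen[sequence])
    keyed.append((mn_id, (seq_in_seq != 1, sequence, seq_in_seq)))

# Step 2: dense rank = 1-based position of each distinct key in sorted order.
distinct_keys = sorted({key for _, key in keyed})
rank_of = {key: rank for rank, key in enumerate(distinct_keys, start=1)}
new_sequence = {mn_id: rank_of[key] for mn_id, key in keyed}

print(new_sequence[15], new_sequence[16], new_sequence[20], new_sequence[21])  # 4 6 5 7
```

Because dense_rank renumbers every group, the unchanged rows keep their values here only because the original sequence values are already the consecutive integers 1 to 5. With gaps in the original numbering, the output would be compacted rather than strictly max(sequence)+1.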
