Fill in index numbers based on the split word count in PySpark

z18hc3ub posted on 2021-07-09 in Spark

DataFrame (PySpark):

+-----------+---+-----------+
|       Name|Age|word_count |
+-----------+---+-----------+
|      John | 23|         1 |
|Paul Din O.| 45|         3 |        
|Kelvin Tino| 12|         2 |
+-----------+---+-----------+

Expected output:

+-----------+---+-----------+
|       Name|Age|word_index |
+-----------+---+-----------+
|      John | 23|         0 |
|      Paul | 45|         0 |
|       Din | 45|         1 |
|        O. | 45|         2 |        
|    Kelvin | 12|         0 |
|      Tino | 12|         1 |
+-----------+---+-----------+

Goal:
Split the Name field.
Re-index each chunk of the split Name (i.e. the index restarts at 0 for each Name).
The count was calculated with the following code:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

def countc(inp='', search_chars='', modifier=None):
    """
    Counts the occurrences in inp of the characters listed in search_chars.
    """
    # Modifier(s)
    if modifier is not None:
        modifier = modifier.lower()
        if modifier == 'i':
            # Ignore case
            inp = inp.lower()

    # With search_chars=' ' this counts spaces, i.e. one fewer than the number of words
    count = 0
    for c in search_chars:
        count += inp.count(c)
    return count

udf_countc = F.udf(lambda x, y: countc(x, y), IntegerType())

# spark.udf.register('udf_countc', udf_countc)

df = df.withColumn('word_count', udf_countc(F.col('Name'), F.lit(' ')))
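As a quick check, the counting logic of countc can be run in plain Python without Spark. Searching for ' ' counts spaces, which is one fewer than the number of words, so the UDF as written returns 0/2/1 rather than the 1/3/2 shown in the first table. The names list is just the sample data from the question; the sum-based body is an equivalent rewrite of the loop above.

```python
def countc(inp='', search_chars='', modifier=None):
    """Count occurrences in inp of the characters listed in search_chars."""
    if modifier is not None and modifier.lower() == 'i':
        inp = inp.lower()  # ignore case
    return sum(inp.count(c) for c in search_chars)

names = ['John', 'Paul Din O.', 'Kelvin Tino']
space_counts = [countc(n, ' ') for n in names]
# words = spaces + 1, assuming single spaces between words
word_counts = [c + 1 for c in space_counts]
print(space_counts)  # [0, 2, 1]
print(word_counts)   # [1, 3, 2]
```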

I generated duplicated rows with the following code:

df.withColumn('DuplicatedRow', F.expr('explode(array_repeat(Name, word_count))')).show()
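To see why this only duplicates rows, here is a plain-Python model (not Spark code) of explode(array_repeat(Name, word_count)): each row is repeated word_count times with the full name, and nothing records a position. One consequence worth noting: if word_count comes from the countc UDF as written (a space count), John gets 0, array_repeat returns an empty array, and explode drops that row entirely (explode_outer would keep it with a null instead). The helper name explode_array_repeat is hypothetical.

```python
def explode_array_repeat(rows):
    """Model explode(array_repeat(Name, word_count)) over (Name, Age, word_count) rows."""
    out = []
    for name, age, n in rows:
        # array_repeat(name, n) -> [name] * n; explode emits one row per element,
        # so n == 0 gives an empty array and the row disappears
        for value in [name] * n:
            out.append((name, age, value))
    return out

table_counts = [('John', 23, 1), ('Paul Din O.', 45, 3), ('Kelvin Tino', 12, 2)]
space_counts = [('John', 23, 0), ('Paul Din O.', 45, 2), ('Kelvin Tino', 12, 1)]
print(len(explode_array_repeat(table_counts)))  # 6
print(len(explode_array_repeat(space_counts)))  # 3, the John row is gone
```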

How do I put each split word into its own cell and fill in the index within each block produced by splitting the Name field?

Answer by vptzau2j:


posexplode does what you want:

import pyspark.sql.functions as F

df2 = df.select(F.posexplode(F.split('Name', ' ')).alias('word_index', 'Name'), 'Age')

df2.show()
+----------+------+---+
|word_index|  Name|Age|
+----------+------+---+
|         0|  John| 23|
|         0|  Paul| 45|
|         1|   Din| 45|
|         2|    O.| 45|
|         0|Kelvin| 12|
|         1|  Tino| 12|
+----------+------+---+
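For readers without a Spark session at hand, the semantics of posexplode(split(Name, ' ')) can be modelled in plain Python: split turns each name into an array, and posexplode emits one row per element together with its 0-based position, so the index restarts at 0 for every input row. This is only a model; the function name posexplode_split is hypothetical.

```python
def posexplode_split(rows, sep=' '):
    """Model posexplode(split(Name, sep)) over (Name, Age) rows -> (word_index, Name, Age)."""
    out = []
    for name, age in rows:
        # enumerate gives the 0-based position, like posexplode's `pos` column
        for pos, word in enumerate(name.split(sep)):
            out.append((pos, word, age))
    return out

rows = [('John', 23), ('Paul Din O.', 45), ('Kelvin Tino', 12)]
for r in posexplode_split(rows):
    print(r)
```

One caveat that carries over from Spark: the second argument of F.split is a Java regular expression, so a name with two consecutive spaces split on ' ' yields an empty word in between (the model's str.split(' ') behaves the same way). Splitting on r'\s+' instead avoids that.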

For your edited question and comment:

def countc(inp='', search_chars='', modifier=None):
    """
    Returns [0, 1, ..., count], where count is the number of occurrences in inp
    of the characters in search_chars (with ' ' this gives one index per word).
    """
    # Modifier(s)
    if modifier is not None:
        modifier = modifier.lower()
        if modifier == 'i':
            # Ignore case
            inp = inp.lower()
    count = 0
    for c in search_chars:
        count += inp.count(c)
    return list(range(count+1))

udf_countc = F.udf(lambda x, y: countc(x, y), 'array<int>')
df2 = df.withColumn('word_count', F.explode(udf_countc(F.col('Name'), F.lit(' '))))

df2.show()
+-----------+---+----------+
|       Name|Age|word_count|
+-----------+---+----------+
|       John| 23|         0|
|Paul Din O.| 45|         0|
|Paul Din O.| 45|         1|
|Paul Din O.| 45|         2|
|Kelvin Tino| 12|         0|
|Kelvin Tino| 12|         1|
+-----------+---+----------+
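The edited approach works because the UDF now returns [0, 1, ..., count], i.e. one index per word (spaces + 1), and explode turns that array into rows while Name and Age are copied unchanged. A plain-Python model of that step (not Spark code; index_list and explode_indices are hypothetical names):

```python
def index_list(inp, search_chars=' '):
    """Same result as the edited countc: list(range(count + 1))."""
    count = sum(inp.count(c) for c in search_chars)
    return list(range(count + 1))

def explode_indices(rows):
    """Model withColumn('word_count', explode(udf(Name))) over (Name, Age) rows."""
    return [(name, age, i) for name, age in rows for i in index_list(name)]

rows = [('John', 23), ('Paul Din O.', 45), ('Kelvin Tino', 12)]
for r in explode_indices(rows):
    print(r)
```

Unlike posexplode, this keeps the full Name in every row; if the individual words are needed as well, the posexplode(F.split(...)) approach from the first part of the answer is the simpler route.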
