pyspark 使用Spark Dataframe API计算列中的特定字符

0vvn1miw  于 2023-06-28  发布在  Spark
关注(0)|答案(4)|浏览(126)

我在Spark Dataframe df中有一个包含位的列。列为字符串格式:

10001010000000100000000000000000
10001010000000100000000100000000

有没有一种简单有效的方法来创建一个新的列"no_of_ones",使用DataFrame计算1的频率?使用RDD,我可以map(lambda x:x.count('1'))(PySpark)。
另外,我如何检索一个列表的位置?

e5nqia27

e5nqia271#

我能想到的一种方法是删除所有的零,然后计算字段的长度。

df.show
+--------------------+
|          bytestring|
+--------------------+
|10001010000000100...|
|10001010000000100...|
+--------------------+

df.withColumn("no_of_ones" , length(regexp_replace($"bytestring", "0", "")) ).show
+--------------------+----------+
|          bytestring|no_of_ones|
+--------------------+----------+
|10001010000000100...|         4|
|10001010000000100...|         5|
+--------------------+----------+
idfiyjo8

idfiyjo82#

一般来说,当你在(py)spark SQL的预定义函数中找不到你需要的东西时,你可以写一个用户定义函数(UDF),它可以做任何你想做的事情(参见UDF)。
注意,在你的例子中,一个好的udf可能比scala或java中的正则表达式解决方案更快,因为你不需要示例化一个新的字符串和编译一个正则表达式(一个for循环就可以)。然而,在pyspark中可能会慢得多,因为在executor上执行python代码总是会严重损害性能。

7gcisfzg

7gcisfzg3#

因为它是二进制(0/1),所以上面的答案将起作用。感谢Oli的回答。
但是以防万一你需要在一个字符串中找到一个字符/数字/符号的出现次数。
例如:

Find '~' in a string "ABCDE~FGH~IJAK~123~$$$"

选择以下解决方案。

df.withColumn("no_of_ones" , length($"bytestring") - length(regexp_replace($"bytestring", "~", "")) ).show
qmb5sa22

qmb5sa224#

**Spark 3.4+**的出现次数为regexp_count

F.expr(r"regexp_count(col_name, '1')")

注意:特殊字符需要使用\\进行转义,例如F.expr(r"regexp_count(col_name, '\\+')")

  • the count* 的完整示例:
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [("10001010000000100000000000000000",),
     ("10001010000000100000000100000000",)],
    ["bytestring"])

df.withColumn('no_of_ones', F.expr(r"regexp_count(bytestring, '1')")).show()
# +--------------------+----------+
# |          bytestring|no_of_ones|
# +--------------------+----------+
# |10001010000000100...|         4|
# |10001010000000100...|         5|
# +--------------------+----------+

对于 * 位置 *,我可以建议更详细的方法,涉及高阶函数transformfilter

seq = "sequence(1, length(bytestring))"
located = F.expr(f"transform({seq}, x -> locate('1', bytestring, x))")
cleaned = F.filter(F.array_distinct(located), lambda x: x != 0)

df.withColumn('pos_of_ones', cleaned).show(truncate=0)
# +--------------------------------+-----------------+
# |bytestring                      |pos_of_ones      |
# +--------------------------------+-----------------+
# |10001010000000100000000000000000|[1, 5, 7, 15]    |
# |10001010000000100000000100000000|[1, 5, 7, 15, 24]|
# +--------------------------------+-----------------+

相关问题