Convert 132K to 132000 and 224.4M to 224400000 in a PySpark DataFrame

bzzcjhmw · published 2021-05-26 in Spark

I am working with a PySpark DataFrame:

MINFLT  MAJFLT  VSTEXT  VSIZE   RSIZE   VGROW   RGROW
132K    224.4M  11160K  0   224.4M  11160K  0K
134M    224.9K  12260K  0   224.4M  11160K  0K
132K    225.5M  11160K  0   224.4M  11160K  0K

I am trying to get the following output:

MINFLT  MAJFLT     VSTEXT    VSIZE  RSIZE      VGROW     RGROW
132000  224400000  11160000  0      224400000  11160000  0

I tried the following code:

df.Val = (df.RSIZE.replace(r'[KM]+$', '', regex=True).astype(float) * \
      df.RSIZE.str.extract(r'[\d\.]+([KM]+)', expand=False)
      .fillna(1)
      .replace(['K','M'], [10**3, 10**6]).astype(int))

However, I get the following error:

TypeError                                 Traceback (most recent call last)
<ipython-input-206-489237518a0c> in <module>
----> 1 df.Val = (df.RSIZE.replace(r'[KM]+$', '', regex=True).astype(float) * \
      2           df.RSIZE.str.extract(r'[\d\.]+([KM]+)', expand=False)
      3           .fillna(1)
      4           .replace(['K','M'], [10**3, 10**6]).astype(int))

TypeError: 'Column' object is not callable

How can I fix this?

fumotvh3 #1

Could you check again whether you want regexp_replace or plain replace? I could not find a replace method on the Column class; you can use regexp_replace from the pyspark.sql.functions module instead.
I have implemented the following code for your requirement:

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.types import DecimalType

spark = SparkSession.builder.getOrCreate()

sample_src = spark.read.csv("sample_src.txt", header=True)

# strip the K/M suffix, cast to a decimal, and multiply by the given base;
# use different DecimalType precision and M/K bases as per your need
converter = lambda clm, base_value: sf.regexp_replace(sf.col(clm), r'[KM]', '').cast(DecimalType(20, 8)) * base_value

sample_src.withColumn(
    "RSIZE",
    sf.when(sf.col("RSIZE").endswith("M"), converter("RSIZE", 1000000))
      .otherwise(converter("RSIZE", 1000))
).show()
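
One caveat: the otherwise branch above multiplies every value that does not end in M by 1000, including values with no suffix at all. That is harmless for this data, where the only suffix-less values are 0, but a bare number such as 134 would be inflated. A more defensive sketch, reusing converter, sf, and DecimalType from the block above:

sample_src.withColumn(
    "RSIZE",
    sf.when(sf.col("RSIZE").endswith("M"), converter("RSIZE", 1000000))
      .when(sf.col("RSIZE").endswith("K"), converter("RSIZE", 1000))
      # no suffix: just cast the plain number, without scaling
      .otherwise(sf.col("RSIZE").cast(DecimalType(20, 8)))
).show()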

Input:

+------+------+------+-----+------+------+-----+
|MINFLT|MAJFLT|VSTEXT|VSIZE| RSIZE| VGROW|RGROW|
+------+------+------+-----+------+------+-----+
|  132K|224.4M|11160K|    0|224.4M|11160K|   0K|
|  134M|224.9K|12260K|    0|224.4M|11160K|   0K|
|  132K|225.5M|11160K|    0|224.4M|11160K|   0K|
+------+------+------+-----+------+------+-----+

Output:

+------+------+------+-----+------------------+------+-----+
|MINFLT|MAJFLT|VSTEXT|VSIZE|             RSIZE| VGROW|RGROW|
+------+------+------+-----+------------------+------+-----+
|  132K|224.4M|11160K|    0|224400000.00000000|11160K|   0K|
|  134M|224.9K|12260K|    0|224400000.00000000|11160K|   0K|
|  132K|225.5M|11160K|    0|224400000.00000000|11160K|   0K|
+------+------+------+-----+------------------+------+-----+
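
If every suffixed column needs converting, as in the desired output, the same converter can be wrapped per column; a minimal sketch (my own convert_col helper, reusing the names defined above):

# chain the K/M branches and fall back to a plain cast for suffix-less values
convert_col = lambda c: (
    sf.when(sf.col(c).endswith("M"), converter(c, 1000000))
      .when(sf.col(c).endswith("K"), converter(c, 1000))
      .otherwise(sf.col(c).cast(DecimalType(20, 8)))
      .alias(c)
)

sample_src.select([convert_col(c) for c in sample_src.columns]).show()
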
lrl1mhuk #2

You can use a map, do some string manipulation, and then perform the final calculation.

from pyspark.sql.functions import translate, coalesce, lit, substring, expr

df = spark.createDataFrame([
      ('132K', '224.4M', '11160K', '0', '224.4M', '11160K', '0K')
    , ('134M', '224.9K', '12260K', '0', '224.4M', '11160K', '0K')
    , ('132K', '225.5M', '11160K', '0', '224.4M', '11160K', '0K')
], ['MINFLT', 'MAJFLT', 'VSTEXT', 'VSIZE', 'RSIZE', 'VGROW', 'RGROW'])

# create the Map

scale_map = expr("map('K',1000, 'M',1000000, 'G', 1000000000)")

# specify column names you want to process

cols_included = {'MAJFLT', 'RSIZE'}

# define a function to do the conversion

my_convert = lambda c: (
    translate(c, 'KMG', '') *
    coalesce(scale_map[substring(c, -1, 1)], lit(1))
).astype('bigint').alias(c)

df_new = df.select([my_convert(c) if c in cols_included else c for c in df.columns])
df_new.show()

+------+---------+------+-----+---------+------+-----+
|MINFLT|   MAJFLT|VSTEXT|VSIZE|    RSIZE| VGROW|RGROW|
+------+---------+------+-----+---------+------+-----+
|  132K|224400000|11160K|    0|224400000|11160K|   0K|
|  134M|   224900|12260K|    0|224400000|11160K|   0K|
|  132K|225500000|11160K|    0|224400000|11160K|   0K|
+------+---------+------+-----+---------+------+-----+

Here, in the function my_convert, translate(c, 'KMG', '') removes the characters K, M, and G (you could do the same with regexp_replace, as sketched below). substring(c, -1, 1) takes the last character of the string and uses it as the key into scale_map[..], and coalesce(.., lit(1)) falls back to a scale of 1 when no such key exists in scale_map.
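
As noted, the translate step could equally be written with regexp_replace; a minimal sketch of that variant (my own my_convert_re, reusing scale_map, cols_included, and the imports from the answer above), with the pattern anchored on $ so only a trailing suffix is stripped:

from pyspark.sql.functions import regexp_replace

# strip a trailing K/M/G and scale by the map lookup, defaulting to 1
my_convert_re = lambda c: (
    regexp_replace(c, '[KMG]$', '') *
    coalesce(scale_map[substring(c, -1, 1)], lit(1))
).astype('bigint').alias(c)

df.select([my_convert_re(c) if c in cols_included else c for c in df.columns]).show()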
