How to cast all int columns to double at once in PySpark

n53p2ov0  asked on 2023-01-16  in Spark

This is my dataset:

DataFrame[column1: double, column2: double, column3: int, column4: int, column5: int, ... , column300: int]

What I want:

DataFrame[column1: double, column2: double, column3: double, column4: double, column5: double, ... , column300: double]

What I did
It was far too manual; can you show me a better way to do this?
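For reference, a minimal sketch of the kind of repetitive, per-column casting the question describes (the exact code was not included in the original post, so this is an assumed reconstruction):

from pyspark.sql.functions import col

# Casting one int column at a time -- this does not scale to ~300 columns
df = df.withColumn("column3", col("column3").cast("double")) \
       .withColumn("column4", col("column4").cast("double")) \
       .withColumn("column5", col("column5").cast("double"))
# ... repeated for every remaining int column up to column300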


q7solyqu1#

You can use a list comprehension to build the list of cast columns.

import pyspark.sql.functions as F
...
# Cast int columns to double and keep every other column unchanged
cols = [F.col(field[0]).cast('double') if field[1] == 'int' else F.col(field[0]) for field in df.dtypes]
df = df.select(cols)
df.printSchema()
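As a quick sanity check, here is a small self-contained run of this approach; the sample data and SparkSession setup are illustrative assumptions, not part of the original answer:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Small illustrative DataFrame: two double columns and one int column
df = spark.createDataFrame(
    [(1.0, 2.0, 3), (4.0, 5.0, 6)],
    schema="column1 double, column2 double, column3 int")

# Cast only the int columns to double, leaving the others untouched
cols = [F.col(name).cast('double') if dtype == 'int' else F.col(name)
        for name, dtype in df.dtypes]
df = df.select(cols)
df.printSchema()
# All three columns should now be reported as double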

i86rm4rw2#

First, filter the int column types out of the available schema.
Then, combined with reduce, you can iterate over the DataFrame and cast those columns to the type of your choice.
reduce is a very useful function and can generally handle any iterative use case in Spark.

Data preparation

from functools import reduce

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType

# Assumption: the answer uses a SparkSession named `sql`
sql = SparkSession.builder.getOrCreate()

df = pd.DataFrame({
    'id': [f'id{i}' for i in range(0, 10)],
    'col1': [i for i in range(80, 90)],
    'col2': [i for i in range(5, 15)],
    'col3': [6, 7, 5, 3, 4, 2, 9, 12, 4, 10]
})

sparkDF = sql.createDataFrame(df)

sparkDF.printSchema()

root
 |-- id: string (nullable = true)
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: long (nullable = true)

Identify the columns

sparkDF.dtypes

## [('id', 'string'), ('col1', 'bigint'), ('col2', 'bigint'), ('col3', 'bigint')]

long_double_list = [ col for col,dtyp in sparkDF.dtypes if dtyp == 'bigint' ]

long_double_list

## ['col1', 'col2', 'col3']

Reduce

sparkDF = reduce(lambda df,c: df.withColumn(c,F.col(c).cast(DoubleType()))
                ,long_double_list
                ,sparkDF
            )

sparkDF.printSchema()

root
 |-- id: string (nullable = true)
 |-- col1: double (nullable = true)
 |-- col2: double (nullable = true)
 |-- col3: double (nullable = true)

fd3cxomn3#

VectorAssembler converts the integer values in multiple columns to floating-point values. You can then split the vector column back into separate columns and rename them as shown below.

import numpy as np
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Create an input DataFrame
N_COLUMNS = 3
pdf_int = pd.DataFrame(
    data={f"column{x}": [x, x*2] for x in range(1, N_COLUMNS+1)},
    dtype=np.int64)
pdf_double = pd.DataFrame(
    data={f"column{x}": [x+0.5, x*2+0.5] for x in range(N_COLUMNS+1, N_COLUMNS*2+1)},
    dtype=np.float64)
pdf_input = pd.concat([pdf_int, pdf_double], axis=1)

df_input = spark.createDataFrame(pdf_input)
col_names = df_input.columns
df_input.show()

# Convert all integer columns to double and leave double values unchanged
df_output = VectorAssembler().setInputCols(col_names).setOutputCol("vector"). \
    transform(df_input).select("vector").\
    withColumn("array", vector_to_array("vector")).select("array"). \
    select([col("array")[i] for i in range(len(col_names))]). \
    toDF(*col_names)
df_output.show()
print(type(df_output))

# Verify all values are equal except types between the input and output
pdf_output = df_output.toPandas()
assert pdf_input.astype("float64").equals(pdf_output)
assert df_input.schema != df_output.schema
