PySpark: forcing a schema when Spark reads from MySQL

jdzmm42g  posted on 2022-11-01  in  Spark

I have a Spark job that reads a table from MySQL, but for some reason Spark infers an int column as boolean. How can I force the data type when reading the table?
Spark session:

spark = (SparkSession.builder
.config("spark.sql.autoBroadcastJoinThreshold", -1)
.config("spark.sql.adaptive.enabled", "true") 
.config("spark.jars.packages", "mysql:mysql-connector-java:8.0.22")
.enableHiveSupport().getOrCreate()
)

Spark read:

spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT * FROM {table}") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load() \
    .registerTempTable(table_name)

zzlelutf1#

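There are a few ways to solve this.

Provide the customSchema option

The JDBC data source accepts several read options, one of which is customSchema. It lets you list column names (as they appear in the table) together with the type each one should be read as. Columns you do not list keep the default type mapping. In your example: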

spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT * FROM {table}") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("customSchema", "COLUMN_NAME INT") \
    .load() \
    .registerTempTable(table_name)
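
When several columns need an override, the customSchema string can be assembled from a mapping instead of being typed by hand. This is a minimal sketch: the helper name build_custom_schema and the column names is_active and retry_count are hypothetical and not taken from the question.

```python
def build_custom_schema(overrides):
    """Join {column_name: spark_sql_type} pairs into a customSchema string."""
    return ", ".join(f"{name} {dtype}" for name, dtype in overrides.items())


# Hypothetical column names, used only for illustration.
custom_schema = build_custom_schema({"is_active": "INT", "retry_count": "INT"})
print(custom_schema)  # is_active INT, retry_count INT
```

The result would then be passed as .option("customSchema", custom_schema) in the read shown above.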

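Cast (in the query)

In the query option you can cast the column directly in the SELECT statement, but this requires renaming some columns afterwards.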

import pyspark.sql.functions as F

# MySQL's CAST accepts SIGNED/UNSIGNED (not INT) as an integer target type;
# SIGNED produces a signed 64-bit integer.
df = spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT t.*, CAST(COLUMN_NAME AS SIGNED) AS COLUMN_NAME_INT FROM {table} t") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load()
# Drop the original column, then give the cast column its name.
df = df.drop("COLUMN_NAME")
df = df.withColumnRenamed("COLUMN_NAME_INT", "COLUMN_NAME")
df.registerTempTable(table)

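Cast (on the DataFrame)

Alternatively, you can cast after the data has been read into a DataFrame. Note that this casts the values Spark has already read, so a column that arrived as boolean becomes 0 or 1: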

from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F

df = spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://host:port/dbname?zeroDateTimeBehavior=convertToNull") \
    .option("query", f"SELECT * FROM {table}") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .load() 
df = df.withColumn("COLUMN_NAME", F.col("COLUMN_NAME").cast(IntegerType()))
df.registerTempTable(table)
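
If more than one column needs casting, the withColumn call above can be replaced by a single selectExpr. The sketch below only builds the SQL expression strings. The helper name cast_select_exprs and the column names are hypothetical, and it assumes column names contain no backticks.

```python
def cast_select_exprs(columns, overrides):
    """Return one Spark SQL expression per column, casting those in overrides."""
    exprs = []
    for name in columns:
        if name in overrides:
            # Cast the column and keep its original name.
            exprs.append(f"CAST(`{name}` AS {overrides[name]}) AS `{name}`")
        else:
            # Pass the column through unchanged.
            exprs.append(f"`{name}`")
    return exprs


# Hypothetical column names, used only for illustration.
exprs = cast_select_exprs(["id", "is_active"], {"is_active": "INT"})
print(exprs)
```

With a DataFrame in hand this would be applied as df = df.selectExpr(*cast_select_exprs(df.columns, {"is_active": "INT"})), which keeps the column order and leaves the other columns untouched.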
