pyspark 数据传输挑战:在Spark中保持数据库之间的数据类型一致性

hc2pp10m  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(141)

我在将数据从一个数据库传输到另一个数据库时遇到了一个挑战。当我使用Spark从源数据库读取表时,我注意到数据类型正在发生变化。例如,VARCHAR 2(900)/VARCHAR 2(4000)正在转换为str。当我试图将数据插入第二个数据库时,这种转换会产生问题。我尝试了各种方法来解决这个问题,但并没有取得令人满意的效果。
我的目标是自动创建表,使两个数据库中的两个表具有完全相同的数据类型。如果您对此问题有任何见解或解决方案,我将不胜感激。
数据中的结果类型:
x1c 0d1x的数据
插入数据的结果类型:



1-

from pyspark.sql import SparkSession  # Create a Spark session spark = SparkSession.builder.appName("example").getOrCreate()  # Assuming you have a DataFrame df df = spark.createDataFrame([(1, "text1"), (2, "text2")], ["id", "column1"])  # Specify Oracle connection properties oracle_properties = {     "user": "user",     "password": "password",     "driver": "oracle.jdbc.driver.OracleDriver",     "url": "jdbc:oracle:thin:@//ip:port/t", }  # Specify the Oracle table name oracle_table_name = "your_table_name"  # Specify the data type for the column (use STRING or VARCHAR) column_data_type = "STRING"  # Save the DataFrame to Oracle df.write \     .format("jdbc") \     .option("url", oracle_properties["url"]) \     .option("dbtable", oracle_table_name) \     .option("user", oracle_properties["user"]) \     .option("password", oracle_properties["password"]) \     .option("driver", oracle_properties["driver"]) \     .option("createTableColumnTypes", f"column1 {column_data_type}(600)") \     .mode("overwrite") \     .save()

字符串
2-

from pyspark.sql import SparkSession  # Create a Spark session spark = SparkSession.builder.appName("example").getOrCreate()  # Assuming you have a DataFrame df df = spark.createDataFrame([(1, "text1"), (2, "text2")], ["id", "column1"])  # Specify Oracle connection properties oracle_properties = {     "user": "user",     "password": "password",     "driver": "oracle.jdbc.driver.OracleDriver",     "url": "jdbc:oracle:thin:@//ip:port/t", }  # Specify the Oracle table name oracle_table_name = "VARCHAR2_600"  # Specify the data type for the column column_data_type = "VARCHAR2(600)"  # Create the table in Oracle with the specified data type df.write \     .format("jdbc") \     .option("url", oracle_properties["url"]) \     .option("dbtable", oracle_table_name) \     .option("user", oracle_properties["user"]) \     .option("password", oracle_properties["password"]) \     .option("driver", oracle_properties["driver"]) \     .option("createTableColumnTypes", f"column1 {column_data_type}") \     .mode("overwrite") \     .save()


3-

from pyspark.sql import SparkSession
driver = "oracle.jdbc.driver.OracleDriver"
url = "jdbc:oracle:thin:@//ip:port/O"
user = "user"
password = "password"
table = "table"
spark = SparkSession.builder.appName("YourAppName").getOrCreate()
df = spark.read.format('jdbc').option('driver', driver).option('url', url).option('user', user).option('dbtable', table).option('password', password).load()

oracle_url = "jdbc:oracle:thin:@//ip1:port2/O2"
oracle_properties = {
    "user": "user",
    "password": "password",
    "driver": "oracle.jdbc.driver.OracleDriver"
}

spark = SparkSession.builder.appName("CreateTableInOracleExample").getOrCreate()
table_name = "tspcbcif_test_sample_new"

df.write.jdbc(url=oracle_url, table=table_name, mode="overwrite", properties=oracle_properties)

read_df = spark.read.jdbc(url=oracle_url, table=table_name, properties=oracle_properties)

xzv2uavs

xzv2uavs1#

第一种方法:

.option("SetBigStringTryClob", "true")

字符串
在此方法中,您允许Oracle将大字符串值直接插入数据库。
第二种方法:

df_converted = df.withColumn("column1", col("column1").cast("string").substr(1, 4000))


在此方法中,您需要访问数据库信息,以便将这些值正确插入数据库。
在使用Oracle数据库时,这些方法提供了两种不同的方法来处理大字符串值。第一种方法涉及一个特定的JDBC选项,允许Oracle处理大字符串,而第二种方法涉及在Spark中操作DataFrame以在插入数据库之前处理字符串长度约束。

相关问题