我在将数据从一个数据库传输到另一个数据库时遇到了一个挑战。当我使用Spark从源数据库读取表时,我注意到数据类型正在发生变化。例如,VARCHAR 2(900)/VARCHAR 2(4000)正在转换为str。当我试图将数据插入第二个数据库时,这种转换会产生问题。我尝试了各种方法来解决这个问题,但并没有取得令人满意的效果。
我的目标是自动创建表,使两个数据库中的两个表具有完全相同的数据类型。如果您对此问题有任何见解或解决方案,我将不胜感激。
数据中的结果类型:
x1c 0d1x的数据
插入数据的结果类型:
的
1-
from pyspark.sql import SparkSession # Create a Spark session spark = SparkSession.builder.appName("example").getOrCreate() # Assuming you have a DataFrame df df = spark.createDataFrame([(1, "text1"), (2, "text2")], ["id", "column1"]) # Specify Oracle connection properties oracle_properties = { "user": "user", "password": "password", "driver": "oracle.jdbc.driver.OracleDriver", "url": "jdbc:oracle:thin:@//ip:port/t", } # Specify the Oracle table name oracle_table_name = "your_table_name" # Specify the data type for the column (use STRING or VARCHAR) column_data_type = "STRING" # Save the DataFrame to Oracle df.write \ .format("jdbc") \ .option("url", oracle_properties["url"]) \ .option("dbtable", oracle_table_name) \ .option("user", oracle_properties["user"]) \ .option("password", oracle_properties["password"]) \ .option("driver", oracle_properties["driver"]) \ .option("createTableColumnTypes", f"column1 {column_data_type}(600)") \ .mode("overwrite") \ .save()
字符串
2-
from pyspark.sql import SparkSession # Create a Spark session spark = SparkSession.builder.appName("example").getOrCreate() # Assuming you have a DataFrame df df = spark.createDataFrame([(1, "text1"), (2, "text2")], ["id", "column1"]) # Specify Oracle connection properties oracle_properties = { "user": "user", "password": "password", "driver": "oracle.jdbc.driver.OracleDriver", "url": "jdbc:oracle:thin:@//ip:port/t", } # Specify the Oracle table name oracle_table_name = "VARCHAR2_600" # Specify the data type for the column column_data_type = "VARCHAR2(600)" # Create the table in Oracle with the specified data type df.write \ .format("jdbc") \ .option("url", oracle_properties["url"]) \ .option("dbtable", oracle_table_name) \ .option("user", oracle_properties["user"]) \ .option("password", oracle_properties["password"]) \ .option("driver", oracle_properties["driver"]) \ .option("createTableColumnTypes", f"column1 {column_data_type}") \ .mode("overwrite") \ .save()
型
3-
from pyspark.sql import SparkSession
driver = "oracle.jdbc.driver.OracleDriver"
url = "jdbc:oracle:thin:@//ip:port/O"
user = "user"
password = "password"
table = "table"
spark = SparkSession.builder.appName("YourAppName").getOrCreate()
df = spark.read.format('jdbc').option('driver', driver).option('url', url).option('user', user).option('dbtable', table).option('password', password).load()
oracle_url = "jdbc:oracle:thin:@//ip1:port2/O2"
oracle_properties = {
"user": "user",
"password": "password",
"driver": "oracle.jdbc.driver.OracleDriver"
}
spark = SparkSession.builder.appName("CreateTableInOracleExample").getOrCreate()
table_name = "tspcbcif_test_sample_new"
df.write.jdbc(url=oracle_url, table=table_name, mode="overwrite", properties=oracle_properties)
read_df = spark.read.jdbc(url=oracle_url, table=table_name, properties=oracle_properties)
型
1条答案
按热度按时间xzv2uavs1#
第一种方法:
字符串
在此方法中,您允许Oracle将大字符串值直接插入数据库。
第二种方法:
型
在此方法中,您需要访问数据库信息,以便将这些值正确插入数据库。
在使用Oracle数据库时,这些方法提供了两种不同的方法来处理大字符串值。第一种方法涉及一个特定的JDBC选项,允许Oracle处理大字符串,而第二种方法涉及在Spark中操作DataFrame以在插入数据库之前处理字符串长度约束。