postgresql PySpark - Is it possible to insert an ARRAY type into Postgres?

t1qtbnec  asked on 2023-11-18  in PostgreSQL

I have data in PySpark where one column is an array of strings:

+----------+----------+
|   user_id|   actions|
+----------+----------+
|         1|     [a,b]|
|         2|       [b]|
|         3|     [a,b]|
+----------+----------+

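Roughly, the DataFrame is built with an explicit array&lt;string&gt; schema. A minimal sketch (only the column names and values above are real; the rest is just how one might construct it):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, StringType

spark = SparkSession.builder.getOrCreate()

# user_id is an integer, actions is an array of strings
schema = StructType([
    StructField("user_id", IntegerType(), nullable=False),
    StructField("actions", ArrayType(StringType()), nullable=True),
])

df = spark.createDataFrame(
    [(1, ["a", "b"]), (2, ["b"]), (3, ["a", "b"])],
    schema=schema,
)
df.printSchema()
# root
#  |-- user_id: integer (nullable = false)
#  |-- actions: array (nullable = true)
#  |    |-- element: string (containsNull = true)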
I want to insert it into Postgres:

# remember to use full URL, i.e. jdbc:postgresql://
db_url = "jdbc:postgresql://localhost:5433/test_db"
table = "user_actions"

(
    df
    .write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .option("user", "postgres")
    .option("password", "postgres")
    .mode("overwrite")
    .save()
)


I get an error:

Py4JJavaError: An error occurred while calling o365.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8) (192.168.0.80 executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, member_id), LongType) AS member_id#374L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, community_id), LongType) AS community_id#375L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, added), StringType), true, false) AS added#376
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, modified), StringType), true, false) AS modified#377
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, removed), StringType), true, false) AS removed#378
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, event_date), TimestampType), true, false) AS event_date#379
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, _processable), BooleanType) AS _processable#380
    at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:708)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:890)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:888)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)
    ... 21 more


So it does not like the array. I tried adding this explicitly:

.option("createTableColumnTypes", "user_id INTEGER, actions ARRAY")


With that, I get:

ParseException: 
DataType array is not supported.(line 1, pos 47)

== SQL ==
user_id INTEGER, actions ARRAY
-------------------------^^^


I have looked at a similar question, but the answer there does not work (I am already using the Postgres driver).
Is it possible to insert an array of strings into Postgres from PySpark? If so, how do I do it?


soat7uwm1#

I ran into a similar problem while trying to create a table with custom column types for Postgres; one of the columns has type BIGINT[]. I have not found a workaround.

df.write
    .mode(SaveMode.Overwrite)
    .option("numPartitions", 4)
    .option("driver", "org.postgresql.Driver")
    .option("createTableColumnTypes", "array_col ARRAY<BIGINT>, id_column int")
    .jdbc(...)

If I put BIGINT[] in the column definition, I get an org.apache.spark.sql.catalyst.parser.ParseException: [PARSE_SYNTAX_ERROR] at or near 'BIGINT'.
If I use an ARRAY spelling instead (as in the snippet above), the Spark parser seems to accept it, but I then get an error from Postgres: org.postgresql.util.PSQLException: ERROR: syntax error at or near "array".
However, if I do not specify createTableColumnTypes at all, the table can be created with the array type.
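In PySpark terms, that last variant looks roughly like this: drop createTableColumnTypes and let the JDBC writer derive the DDL from the DataFrame schema, so the Postgres dialect maps array&lt;bigint&gt; to bigint[]. A minimal sketch (the DataFrame, URL, table name and credentials below are placeholders):

# No createTableColumnTypes: Spark derives the CREATE TABLE statement from the
# DataFrame schema, and the Postgres dialect maps array<bigint> to bigint[].
(
    df_with_bigint_array  # placeholder DataFrame with an array<bigint> column
    .write
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/test_db")  # placeholder URL
    .option("dbtable", "custom_type_table")                     # placeholder table name
    .option("driver", "org.postgresql.Driver")
    .option("user", "postgres")
    .option("password", "postgres")
    .option("numPartitions", 4)
    .mode("overwrite")
    .save()
)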


fzsnzjdm2#

Actually I made a rookie mistake: I was reusing an old variable for the df and had overlooked it. After fixing that (basically renaming the variable), the text[] column was created just fine.
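For completeness, a quick way to confirm the result is to read the table back over the same JDBC connection; the actions column should come back as array&lt;string&gt;, which corresponds to text[] on the Postgres side. A minimal sketch reusing the connection values from the question:

# Read the table back and check that the array column round-trips.
df_check = (
    spark.read
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .option("user", "postgres")
    .option("password", "postgres")
    .load()
)
df_check.printSchema()
# actions should show up as array<string>, i.e. a text[] column in Postgres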
