I have some data with an array-of-strings column in PySpark:
+----------+----------+
| user_id| actions|
+----------+----------+
| 1| [a,b]|
| 2| [b]|
| 3| [a,b]|
+----------+----------+
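For reference, a DataFrame with this shape could be built like so (a minimal sketch; the column names match the output above, the rest is illustrative):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, StringType

spark = SparkSession.builder.getOrCreate()

# "actions" is an array of strings, not a plain string
schema = StructType([
    StructField("user_id", IntegerType()),
    StructField("actions", ArrayType(StringType())),
])

df = spark.createDataFrame(
    [(1, ["a", "b"]), (2, ["b"]), (3, ["a", "b"])],
    schema,
)
df.show()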
I want to insert it into Postgres:
# remember to use full URL, i.e. jdbc:postgresql://
db_url = "jdbc:postgresql://localhost:5433/test_db"
table = "user_actions"
(
df
.write
.format("jdbc")
.option("url", db_url)
.option("dbtable", table)
.option("user", "postgres")
.option("password", "postgres")
.mode("overwrite")
.save()
)
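(This assumes the Postgres JDBC driver jar is already on Spark's classpath. If the write instead fails with a "No suitable driver" error, the jar and driver class can be supplied explicitly; the jar path below is just a placeholder:)

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # hypothetical local path to the Postgres JDBC driver jar
    .config("spark.jars", "/path/to/postgresql.jar")
    .getOrCreate()
)

# and/or pin the driver class on the write itself:
# .option("driver", "org.postgresql.Driver")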
I get an error:
Py4JJavaError: An error occurred while calling o365.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 8) (192.168.0.80 executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, member_id), LongType) AS member_id#374L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, community_id), LongType) AS community_id#375L
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, added), StringType), true, false) AS added#376
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, modified), StringType), true, false) AS modified#377
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, removed), StringType), true, false) AS removed#378
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, event_date), TimestampType), true, false) AS event_date#379
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, _processable), BooleanType) AS _processable#380
at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:708)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:890)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:888)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of string
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)
... 21 more
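The key line is "scala.collection.convert.Wrappers$JListWrapper is not a valid external type for schema of string": the rows being encoded carry list values where the encoder expects a plain string for that column. A quick diagnostic is to print the schema of the exact variable passed to .write (sketch):

df.printSchema()
# what I expect for the data above:
# root
#  |-- user_id: integer (nullable = true)
#  |-- actions: array (nullable = true)
#  |    |-- element: string (containsNull = true)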
So it doesn't like the array. I tried adding the column types explicitly:
.option("createTableColumnTypes", "user_id INTEGER, actions ARRAY")
With that, I get:
ParseException:
DataType array is not supported.(line 1, pos 47)
== SQL ==
user_id INTEGER, actions ARRAY
-------------------------^^^
I have looked at a similar question, but the answer there doesn't work for me (I'm using the Postgres driver).
Is it possible to insert an array of strings into Postgres from PySpark, and if so, how?
2 Answers
soat7uwm1#
I ran into a similar problem while trying to create a table for Postgres with createTableColumnTypes. There was a column of type BIGINT[], and I could not find a workaround.
If I put BIGINT[] in the definition, I get an org.apache.spark.sql.catalyst.parser.ParseException: [PARSE_SYNTAX_ERROR] at or near 'BIGINT'.
If I put ARRAY or ARRAY<BIGINT>, the parser seems to work, but I still get an error, because org.postgresql.util.PSQLException: ERROR: syntax error at or near "ARRAY".
However, if I don't specify createTableColumnTypes at all, the table is created with the array type just fine.
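In other words, for array columns it may be simplest to drop createTableColumnTypes and let Spark create the table itself: as far as I can tell, Spark's Postgres dialect maps an ArrayType column to the element type plus [], so array<string> becomes text[]. A sketch under that assumption, reusing the options from the question:

(
    df  # schema: user_id int, actions array<string>
    .write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .option("user", "postgres")
    .option("password", "postgres")
    # no createTableColumnTypes: Spark derives "actions text[]" on its own
    .mode("overwrite")
    .save()
)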
fzsnzjdm2#
Actually, I had made a newbie mistake: I was reusing an old variable for df without noticing. (In hindsight, the stack trace gave it away: the encoder fields are member_id, community_id, and so on, not user_id and actions.) After fixing that (basically, changing the variable name), the text[] type was created.
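A quick way to confirm the fix is to read the table back through the same JDBC options (a sketch, assuming the Postgres dialect maps the text[] column back to an array on read):

check = (
    spark.read
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .option("user", "postgres")
    .option("password", "postgres")
    .load()
)
check.printSchema()  # actions should come back as array<string>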