使用分组查询(group by)时,Greenplum Spark Connector无法保存Greenplum表

2exbekwf  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(668)

我用Spark在Greenplum中创建表。用条件查询(where)的结果建表时,Greenplum表可以正常创建;但用分组查询(group by)的结果建表时,会出现下面的错误信息。请问如何才能创建这个表?
Spark版本:spark-2.4.4-bin-hadoop2.6;Greenplum版本:3.6;连接器:greenplum-connector-spark_2.11-2.1.0.jar / greenplum-spark_2.11-1.6.2.jar
Greenplum建表语句

DROP TABLE IF EXISTS test.sys_user;

-- Sample source table; the primary key column id is also the distribution key.
CREATE TABLE test.sys_user (
    id          BIGINT PRIMARY KEY,
    name        VARCHAR(50),
    age         INT,
    create_time TIMESTAMP,
    update_time TIMESTAMP
) DISTRIBUTED BY (id);

-- Six sample rows (one statement each); both timestamp columns are set to now().
INSERT INTO test.sys_user VALUES (1, 'hello1', 21, now(), now());
INSERT INTO test.sys_user VALUES (2, 'hello2', 22, now(), now());
INSERT INTO test.sys_user VALUES (3, 'hello3', 23, now(), now());
INSERT INTO test.sys_user VALUES (4, 'hello4', 24, now(), now());
INSERT INTO test.sys_user VALUES (5, 'hello5', 25, now(), now());
INSERT INTO test.sys_user VALUES (6, 'hello6', 26, now(), now());

Spark代码

// Repro: read p10000.sys_user through the Greenplum Spark connector, aggregate
// it with a GROUP BY, and write the result to p10000.sys_user3.
// Explicit import so the snippet also compiles outside spark-shell
// (spark-shell auto-imports org.apache.spark.sql.functions._, plain code does not).
import org.apache.spark.sql.functions.avg

// Connection settings shared by the read and the write.
// NOTE(review): the DDL above creates test.sys_user, but these options use
// schema p10000 -- confirm which schema actually holds the table.
val gpConnection = Map(
  "url" -> "jdbc:postgresql://127.0.0.1:5432/iot_data",
  "user" -> "gpuser",
  "password" -> "xxxx",
  "dbschema" -> "p10000"
)

// Source table; "partitionColumn" tells the connector how to split the read.
val gsom = gpConnection ++ Map(
  "dbtable" -> "sys_user",
  "partitionColumn" -> "id"
)

val gpdf = spark.read.format("greenplum").options(gsom).load()

// Target table.
// NOTE(review): "partitionColumn" is documented as a read option; when the 2.x
// connector creates the target table it takes the distribution key from
// "distributedBy" instead -- confirm for the connector version in use.
// NOTE(review): the question lists both connector jars (2.1.0 and 1.6.2); having
// both on the classpath at once may cause version conflicts -- keep only one.
val gsom2 = gpConnection ++ Map(
  "dbtable" -> "sys_user3",
  "partitionColumn" -> "age"
)

// Alias the aggregate: Spark's default name "avg(id)" is not a valid unquoted
// Greenplum column name. The original sort("age") is dropped because rows stored
// in a table have no guaranteed order, and the sort only adds another shuffle.
val gpdf2 = gpdf.groupBy("age").agg(avg("id").alias("avg_id"))

gpdf2.write.format("greenplum").options(gsom2).save()

错误日志如下:

20/12/07 16:38:15.361 WARN TaskSetManager: Lost task 0.0 in stage 103.0 (TID 120, 192.168.10.175, executor 0): org.postgresql.util.PSQLException: ERROR: http response code 500 from gpfdist (gpfdist://192.168.10.175:4305/spark_c860d531982ab11e_3d9d854163f8f07a_0_204): HTTP/1.1 500 Server Error  (seg7 slice1 192.168.10.175:6007 pid=2135)
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:307)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:293)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:270)
    at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:244)
    at com.zaxxer.hikari.pool.ProxyStatement.executeUpdate(ProxyStatement.java:117)
    at com.zaxxer.hikari.pool.HikariProxyStatement.executeUpdate(HikariProxyStatement.java)
    at io.pivotal.greenplum.spark.SqlExecutor$$anonfun$executeUpdate$2.apply(SqlExecutor.scala:38)
    at io.pivotal.greenplum.spark.SqlExecutor$$anonfun$executeUpdate$2.apply(SqlExecutor.scala:36)
    at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
    at resource.AbstractManagedResource$$anonfun$5.apply(AbstractManagedResource.scala:88)
    at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
    at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
    at scala.util.control.Exception$Catch.apply(Exception.scala:103)
    at scala.util.control.Exception$Catch.either(Exception.scala:125)
    at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
    at resource.DeferredExtractableManagedResource.acquireFor(AbstractManagedResource.scala:27)
    at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
    at resource.DeferredExtractableManagedResource.apply(AbstractManagedResource.scala:24)
    at resource.DeferredExtractableManagedResource$$anonfun$tried$1.apply(AbstractManagedResource.scala:33)
    at scala.util.Try$.apply(Try.scala:192)
    at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
    at io.pivotal.greenplum.spark.SqlExecutor.tryFromManaged(SqlExecutor.scala:66)
    at io.pivotal.greenplum.spark.SqlExecutor.executeUpdate(SqlExecutor.scala:35)
    at io.pivotal.greenplum.spark.externaltable.GreenplumTableManager.copyTableFromExternal(GreenplumTableManager.scala:136)
    at io.pivotal.greenplum.spark.externaltable.GreenplumDataMover$$anonfun$1.apply(GreenplumDataMover.scala:49)
    at io.pivotal.greenplum.spark.externaltable.GreenplumDataMover$$anonfun$1.apply(GreenplumDataMover.scala:46)
    at scala.util.Success.flatMap(Try.scala:231)
    at io.pivotal.greenplum.spark.externaltable.GreenplumDataMover.moveData(GreenplumDataMover.scala:46)
    at io.pivotal.greenplum.spark.externaltable.PartitionWriter$$anonfun$getClosure$1$$anonfun$2.apply(PartitionWriter.scala:39)
    at io.pivotal.greenplum.spark.externaltable.PartitionWriter$$anonfun$getClosure$1$$anonfun$2.apply(PartitionWriter.scala:37)
    at resource.AbstractManagedResource$$anonfun$5.apply(AbstractManagedResource.scala:88)
    at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
    at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
    at scala.util.control.Exception$Catch.apply(Exception.scala:103)
    at scala.util.control.Exception$Catch.either(Exception.scala:125)
    at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
    at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
    at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
    at resource.DeferredExtractableManagedResource$$anonfun$tried$1.apply(AbstractManagedResource.scala:33)
    at scala.util.Try$.apply(Try.scala:192)
    at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
    at io.pivotal.greenplum.spark.externaltable.PartitionWriter$$anonfun$getClosure$1.apply(PartitionWriter.scala:45)
    at io.pivotal.greenplum.spark.externaltable.PartitionWriter$$anonfun$getClosure$1.apply(PartitionWriter.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题