I am creating a Greenplum table from Spark. Creating the table works when the DataFrame comes from a conditional query (where), but when I create it from a grouped query (groupBy) I get the error message below. How can I create the table?

Spark version: spark-2.4.4-bin-hadoop2.6
Greenplum version: 3.6
Connector: greenplum-connector-spark_2.11-2.1.0.jar / greenplum-spark_2.11-1.6.2.jar
Creating the Greenplum source table:
drop table if exists test.sys_user;
create table test.sys_user (
    id bigint primary key,
    name varchar(50),
    age int,
    create_time timestamp,
    update_time timestamp
) distributed by (id);
insert into test.sys_user values(1,'hello1',21,now(),now());
insert into test.sys_user values(2,'hello2',22,now(),now());
insert into test.sys_user values(3,'hello3',23,now(),now());
insert into test.sys_user values(4,'hello4',24,now(),now());
insert into test.sys_user values(5,'hello5',25,now(),now());
insert into test.sys_user values(6,'hello6',26,now(),now());
Spark code:
import org.apache.spark.sql.functions.avg

// Read options for the source table
val gsom = Map(
  "url" -> "jdbc:postgresql://127.0.0.1:5432/iot_data",
  "user" -> "gpuser",
  "password" -> "xxxx",
  "dbschema" -> "p10000",
  "dbtable" -> "sys_user",
  "partitionColumn" -> "id"
)
val gpdf = spark.read.format("greenplum").options(gsom).load()

// Write options for the target table
val gsom2 = Map(
  "url" -> "jdbc:postgresql://127.0.0.1:5432/iot_data",
  "user" -> "gpuser",
  "password" -> "xxxx",
  "dbschema" -> "p10000",
  "dbtable" -> "sys_user3",
  "partitionColumn" -> "age"
)

// This grouped write is the one that fails
val gpdf2 = gpdf.groupBy("age").agg(avg("id")).sort("age")
gpdf2.write.format("greenplum").options(gsom2).save()
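
For comparison, a filter-based write with the same options does succeed and creates the table. A minimal sketch of that working variant (the predicate and the name gpdfFiltered here are just illustrative, not from my actual job):

// Works: writing a filtered (where) DataFrame creates the target table
val gpdfFiltered = gpdf.where("age > 22") // example predicate
gpdfFiltered.write.format("greenplum").options(gsom2).save()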
The log from the failing grouped write is:
20/12/07 16:38:15.361 WARN TaskSetManager: Lost task 0.0 in stage 103.0 (TID 120, 192.168.10.175, executor 0): org.postgresql.util.PSQLException: ERROR: http response code 500 from gpfdist (gpfdist://192.168.10.175:4305/spark_c860d531982ab11e_3d9d854163f8f07a_0_204): HTTP/1.1 500 Server Error (seg7 slice1 192.168.10.175:6007 pid=2135)
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2440)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2183)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:308)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:441)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:365)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:307)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:293)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:270)
at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:244)
at com.zaxxer.hikari.pool.ProxyStatement.executeUpdate(ProxyStatement.java:117)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeUpdate(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.SqlExecutor$$anonfun$executeUpdate$2.apply(SqlExecutor.scala:38)
at io.pivotal.greenplum.spark.SqlExecutor$$anonfun$executeUpdate$2.apply(SqlExecutor.scala:36)
at scala.Function1$$anonfun$andThen$1.apply(Function1.scala:52)
at resource.AbstractManagedResource$$anonfun$5.apply(AbstractManagedResource.scala:88)
at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
at scala.util.control.Exception$Catch.apply(Exception.scala:103)
at scala.util.control.Exception$Catch.either(Exception.scala:125)
at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
at resource.DeferredExtractableManagedResource.acquireFor(AbstractManagedResource.scala:27)
at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
at resource.DeferredExtractableManagedResource.apply(AbstractManagedResource.scala:24)
at resource.DeferredExtractableManagedResource$$anonfun$tried$1.apply(AbstractManagedResource.scala:33)
at scala.util.Try$.apply(Try.scala:192)
at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
at io.pivotal.greenplum.spark.SqlExecutor.tryFromManaged(SqlExecutor.scala:66)
at io.pivotal.greenplum.spark.SqlExecutor.executeUpdate(SqlExecutor.scala:35)
at io.pivotal.greenplum.spark.externaltable.GreenplumTableManager.copyTableFromExternal(GreenplumTableManager.scala:136)
at io.pivotal.greenplum.spark.externaltable.GreenplumDataMover$$anonfun$1.apply(GreenplumDataMover.scala:49)
at io.pivotal.greenplum.spark.externaltable.GreenplumDataMover$$anonfun$1.apply(GreenplumDataMover.scala:46)
at scala.util.Success.flatMap(Try.scala:231)
at io.pivotal.greenplum.spark.externaltable.GreenplumDataMover.moveData(GreenplumDataMover.scala:46)
at io.pivotal.greenplum.spark.externaltable.PartitionWriter$$anonfun$getClosure$1$$anonfun$2.apply(PartitionWriter.scala:39)
at io.pivotal.greenplum.spark.externaltable.PartitionWriter$$anonfun$getClosure$1$$anonfun$2.apply(PartitionWriter.scala:37)
at resource.AbstractManagedResource$$anonfun$5.apply(AbstractManagedResource.scala:88)
at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
at scala.util.control.Exception$Catch.apply(Exception.scala:103)
at scala.util.control.Exception$Catch.either(Exception.scala:125)
at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
at resource.DeferredExtractableManagedResource$$anonfun$tried$1.apply(AbstractManagedResource.scala:33)
at scala.util.Try$.apply(Try.scala:192)
at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
at io.pivotal.greenplum.spark.externaltable.PartitionWriter$$anonfun$getClosure$1.apply(PartitionWriter.scala:45)
at io.pivotal.greenplum.spark.externaltable.PartitionWriter$$anonfun$getClosure$1.apply(PartitionWriter.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
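
One unverified guess: the aggregated DataFrame's auto-generated column name is avg(id), and the parentheses are not a plain Greenplum column identifier, so the connector's generated CREATE TABLE / external-table step might be what gpfdist rejects with the 500. A sketch of the variant I would test, aliasing the aggregate to a plain name before writing (avg_id is a hypothetical name):

// Sketch, assuming (unverified) that the generated column name avg(id)
// is what the write path rejects: alias it to a plain identifier first.
val gpdf3 = gpdf.groupBy("age")
  .agg(avg("id").alias("avg_id"))
  .sort("age")
gpdf3.write.format("greenplum").options(gsom2).save()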