Phoenix dynamic columns in the Spark DataFrame API

pu3pd22g · posted 2021-06-08 in HBase

I am trying to fill an HBase table with dynamic content using Phoenix.
So I experimented a bit with dynamic columns (https://phoenix.apache.org/dynamic_columns.html) in the sqlline tool.
For example, I ran the following queries one after another:

CREATE TABLE t1 (a VARCHAR, b INTEGER PRIMARY KEY);

UPSERT INTO t1 VALUES ('test1', 1);

-- Insert a new row with a third column 'c'
UPSERT INTO t1 (a, b, c INTEGER) VALUES('test2', 2, 3);

SELECT * FROM t1;
+--------+----+
|   A    | B  |
+--------+----+
| test1  | 1  |
| test2  | 2  |
+--------+----+

-- Add the new column
ALTER TABLE t1 ADD IF NOT EXISTS c INTEGER;

SELECT * FROM t1;  
+--------+----+-------+
|   A    | B  |   C   |
+--------+----+-------+
| test1  | 1  | null  |
| test2  | 2  | 3     |
+--------+----+-------+
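
(Side note: as far as I understand the dynamic columns page linked above, the dynamic column could also have been read before the ALTER TABLE by declaring it directly in the query. This is just a sketch of the documented syntax, which I have not verified myself:)

-- Declare the dynamic column in the query to read it without altering the table
SELECT a, b, c FROM t1 (c INTEGER);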

This looks like a good way to add different data to an existing table, for example when new columns are needed because of a new data schema.
It works fine for me in the sqlline tool, but now I want to use the Phoenix API in my Spark application to save different DataFrames to my HBase table.
Saving a DataFrame that contains the same columns as the table works fine. But when I try to save another DataFrame that has some new columns, I get an exception.
My commands are:

val readDF = sqlContext.read.format("org.apache.phoenix.spark").option("table", "T1").option("zkUrl", "jdbc:phoenix:zim1ext-vm.et-it.hs-offenburg.de:2181:/hbase-unsecure").load()
readDF.printSchema

readDF: org.apache.spark.sql.DataFrame = [A: string, B: int, C: int]
root
 |-- A: string (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: integer (nullable = true)

val newDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", ";").load("/public/test4.csv")
newDF.printSchema

newDF: org.apache.spark.sql.DataFrame = [B: int, C: int, D: string]
root
 |-- B: integer (nullable = true)
 |-- C: integer (nullable = true)
 |-- D: string (nullable = true)      // This is a new column!!!

newDF.write.format("org.apache.phoenix.spark")
  .mode("overwrite")
  .option("table", "T1")
  .option("zkUrl", "jdbc:phoenix:server-url:2181:/hbase-unsecure")
  .save()

I assumed that Spark would simply UPSERT the new column of the DataFrame, but instead I get this exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 4 times, most recent failure: Lost task 0.3 in stage 14.0 (TID 31, server-url): java.sql.SQLException: Unable to resolve these column names:
D
Available columns with column families:
0.A,B,0.C
    at org.apache.phoenix.util.PhoenixRuntime.generateColumnInfo(PhoenixRuntime.java:475)
    at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getUpsertColumnMetadataList(PhoenixConfigurationUtil.java:252)
    at org.apache.phoenix.spark.DataFrameFunctions$$anonfun$1.apply(DataFrameFunctions.scala:48)
    at org.apache.phoenix.spark.DataFrameFunctions$$anonfun$1.apply(DataFrameFunctions.scala:44)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1144)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1074)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:994)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:985)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:985)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopFile(PairRDDFunctions.scala:985)
    at org.apache.phoenix.spark.DataFrameFunctions.saveToPhoenix(DataFrameFunctions.scala:58)
    at org.apache.phoenix.spark.DataFrameFunctions.saveToPhoenix(DataFrameFunctions.scala:27)
    at org.apache.phoenix.spark.DefaultSource.createRelation(DefaultSource.scala:47)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
    at $iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC.<init>(<console>:47)
    at $iwC.<init>(<console>:49)
    at <init>(<console>:51)
    at .<init>(<console>:55)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at sun.reflect.GeneratedMethodAccessor35.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.zeppelin.spark.Utils.invokeMethod(Utils.java:38)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:991)
    at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:1197)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:1164)
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:1157)
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:101)
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:502)
    at org.apache.zeppelin.scheduler.Job.run(Job.java:175)
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:139)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Unable to resolve these column names:
D
Available columns with column families:
0.A,B,0.C
    at org.apache.phoenix.util.PhoenixRuntime.generateColumnInfo(PhoenixRuntime.java:475)
    at org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getUpsertColumnMetadataList(PhoenixConfigurationUtil.java:252)
    at org.apache.phoenix.spark.DataFrameFunctions$$anonfun$1.apply(DataFrameFunctions.scala:48)
    at org.apache.phoenix.spark.DataFrameFunctions$$anonfun$1.apply(DataFrameFunctions.scala:44)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    ... 3 more

My question now is: is it possible to add new columns to an HBase table with DataFrames through the Phoenix Spark API? Maybe something like .option("dynamic-columns", "true") or similar?
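
If there is no built-in option, the only workaround I can think of would be to add the missing columns to the Phoenix table over a plain JDBC connection before calling save(). This is an untested sketch; the type mapping and the column-name handling are simplified assumptions of mine, not part of the phoenix-spark API:

import java.sql.DriverManager
import org.apache.spark.sql.types.{IntegerType, LongType}

// Columns Phoenix already knows about (taken from the DataFrame read above)
val existingCols = readDF.columns.map(_.toUpperCase).toSet

// Add every column of newDF that the table does not have yet
val conn = DriverManager.getConnection("jdbc:phoenix:server-url:2181:/hbase-unsecure")
try {
  newDF.schema
    .filterNot(f => existingCols.contains(f.name.toUpperCase))
    .foreach { f =>
      val sqlType = f.dataType match {   // naive type mapping, for illustration only
        case IntegerType => "INTEGER"
        case LongType    => "BIGINT"
        case _           => "VARCHAR"
      }
      conn.createStatement().execute(s"ALTER TABLE T1 ADD IF NOT EXISTS ${f.name} $sqlType")
    }
} finally {
  conn.close()
}

// Now the table contains all columns of newDF, so the normal save should resolve them
newDF.write.format("org.apache.phoenix.spark")
  .mode("overwrite")
  .option("table", "T1")
  .option("zkUrl", "jdbc:phoenix:server-url:2181:/hbase-unsecure")
  .save()

But I would prefer a native option of the phoenix-spark connector if one exists.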
