ACID transactions do not work on data added from Spark

slwdgvem asked on 2021-06-02 in Hadoop

I am trying to use ACID transactions in Hive, but I run into a problem when the data has been added from Spark.
First, I created a table with the following statement:

CREATE TABLE testdb.test(id string, col1 string)
CLUSTERED BY (id) INTO 4 BUCKETS
STORED AS ORC TBLPROPERTIES('transactional'='true');

Then I added data with these queries:

INSERT INTO testdb.test VALUES("1", "A");
INSERT INTO testdb.test VALUES("2", "B");
INSERT INTO testdb.test VALUES("3", "C");

I can delete rows with this query:

DELETE FROM testdb.test WHERE id="1";

All of this works fine, but the problem appears when I try to delete rows that were added from Spark. My Spark job (in IPython):

from pyspark.sql import HiveContext

hc = HiveContext(sc)  # sc is the existing SparkContext
data = sc.parallelize([["1", "A"], ["2", "B"], ["3", "C"]])
data_df = hc.createDataFrame(data)
data_df.registerTempTable("data_df")  # register under the name "data_df"
hc.sql("INSERT INTO testdb.test SELECT * FROM data_df")

Then, back in Hive, I can run SELECT queries on this "test" table without any problem. But when I try to run exactly the same DELETE query as before, I get the following error (it occurs after the reduce phase):

Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:265)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:253)
... 7 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:723)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:244)
... 7 more

I have no idea where this comes from, which is why I am looking for ideas.
I am using the Cloudera Quickstart VM (5.4.2), with Hive 1.1.0 and Spark 1.3.0.
Here is the full output of the Hive DELETE command:

hive> delete from testdb.test where id="1";

Query ID = cloudera_20160914090303_795e40b7-ab6a-45b0-8391-6d41d1cfe7bd
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 4
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1473858545651_0036, Tracking URL =http://quickstart.cloudera:8088/proxy/application_1473858545651_0036/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1473858545651_0036
Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 4
2016-09-14 09:03:55,571 Stage-1 map = 0%,  reduce = 0%
2016-09-14 09:04:14,898 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 1.66 sec
2016-09-14 09:04:15,944 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 3.33 sec
2016-09-14 09:04:44,101 Stage-1 map = 100%,  reduce = 17%, Cumulative CPU 4.21 sec
2016-09-14 09:04:46,523 Stage-1 map = 100%,  reduce = 25%, Cumulative CPU 4.79 sec
2016-09-14 09:04:47,673 Stage-1 map = 100%,  reduce = 42%, Cumulative CPU 5.8 sec
2016-09-14 09:04:50,041 Stage-1 map = 100%,  reduce = 75%, Cumulative CPU 7.45 sec
2016-09-14 09:05:18,486 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 7.69 sec
MapReduce Total cumulative CPU time: 7 seconds 690 msec
Ended Job = job_1473858545651_0036 with errors
Error during job, obtaining debugging information...
Job Tracking URL: http://quickstart.cloudera:8088/proxy/application_1473858545651_0036/
Examining task ID: task_1473858545651_0036_m_000000 (and more) from job job_1473858545651_0036

Task with the most failures(4): 
-----
Task ID:
  task_1473858545651_0036_r_000001

URL:
http://0.0.0.0:8088/taskdetails.jsp?jobid=job_1473858545651_0036&tipid=task_1473858545651_0036_r_000001
-----
Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:265)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"reducesinkkey0":{"transactionid":0,"bucketid":-1,"rowid":0}},"value":null}
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:253)
... 7 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:723)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:815)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.mr.ExecReducer.reduce(ExecReducer.java:244)
... 7 more

FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 2  Reduce: 4   Cumulative CPU: 7.69 sec   HDFS Read: 21558 HDFS Write: 114 FAIL
Total MapReduce CPU Time Spent: 7 seconds 690 msec

Thanks!


6l7fqoea #1

Hive ACID tables use a different directory structure and file format than regular tables, so CRUD operations need to be issued from Hive itself.
As for Spark, a normal table read is not compatible with a Hive ACID table read, so the table cannot be read through the native Spark APIs.
Also, UPDATE, DELETE and INSERT from Spark are currently not supported. For reading the data you can use the connector: http://github.com/qubole/spark-acid
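Since CRUD has to go through Hive, a common workaround is to land the Spark output in a plain (non-transactional) staging table and then copy it into the ACID table from Hive, so the rows are written as proper ACID deltas with valid row IDs. A minimal PySpark sketch under the question's setup; the staging table name testdb.test_staging is hypothetical:

from pyspark.sql import HiveContext

hc = HiveContext(sc)  # sc is the existing SparkContext
new_rows = hc.createDataFrame(sc.parallelize([["4", "D"], ["5", "E"]]))
new_rows.registerTempTable("new_rows")

# Land the data in a plain ORC staging table (hypothetical name).
hc.sql("CREATE TABLE IF NOT EXISTS testdb.test_staging (id string, col1 string) STORED AS ORC")
hc.sql("INSERT OVERWRITE TABLE testdb.test_staging SELECT * FROM new_rows")

# Then, from the Hive CLI rather than Spark, move the rows into the ACID
# table so they are written as transactional deltas:
#   INSERT INTO testdb.test SELECT * FROM testdb.test_staging;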


nvbavucw #2

I ran into the same problem on Hue, but it started working after I set these parameters from the Hive CLI:

set hive.support.concurrency=true;
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
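These are per-session settings, so they must be applied in the same session that runs the DML. If you issue Hive SQL through Spark's HiveContext rather than the CLI, the same statements can be passed through hc.sql; a hypothetical sketch, with no guarantee that Spark 1.3 actually honors every one of these properties:

# Apply the transactional session settings before issuing any DML.
# Whether Spark's Hive integration respects all of them is an assumption.
for stmt in [
    "set hive.support.concurrency=true",
    "set hive.enforce.bucketing=true",
    "set hive.exec.dynamic.partition.mode=nonstrict",
    "set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager",
    "set hive.compactor.initiator.on=true",
]:
    hc.sql(stmt)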

00jrzges #3

Use the Spark HiveAcid data source: http://github.com/qubole/spark-acid

val df = spark.read.format("HiveAcid").option("table", "testdb.test").load()
df.collect()

Spark needs to run against HMS (Hive Metastore) 3.1.1 so that the underlying data source can take the necessary locks, etc.
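For completeness, the PySpark equivalent of the Scala snippet above (this assumes the spark-acid package is already on the Spark classpath, e.g. added via --packages):

# Read the Hive ACID table through the qubole/spark-acid data source.
df = spark.read.format("HiveAcid").option("table", "testdb.test").load()
df.collect()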
