When reading a Hive table with the HiveWarehouseConnector and using the same DataFrame several times in one computation, an exception is thrown.
Example:
val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()
hive.setDatabase("db")
val df_data = hive.table("table")
val df_one_col = df_data.select("col1")
val df_two_col = df_data.select("col1", "col2")
val df_res = df_two_col.join(df_one_col, "col1")
df_res.show()
We get an ArrayIndexOutOfBoundsException during task execution:
20/01/15 19:46:36 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 18, host, executor 1): java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:98)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.datasourcev2scan_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The exception is raised at line 039 of the generated code shown below:
/* 031 */ ...
/* 032 */ private void datasourcev2scan_nextBatch_0() throws java.io.IOException {
/* 033 */ long getBatchStart = System.nanoTime();
/* 034 */ if (datasourcev2scan_mutableStateArray_0[0].hasNext()) {
/* 035 */ datasourcev2scan_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)datasourcev2scan_mutableStateArray_0[0].next();
/* 036 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(datasourcev2scan_mutableStateArray_1[0].numRows());
/* 037 */ datasourcev2scan_batchIdx_0 = 0;
/* 038 */ datasourcev2scan_mutableStateArray_2[0] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(0);
/* 039 */ datasourcev2scan_mutableStateArray_2[1] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(1);
/* 040 */
/* 041 */ }
/* 042 */ datasourcev2scan_scanTime_0 += System.nanoTime() - getBatchStart;
/* 043 */ }
/* 044 */ ...
The ArrayIndexOutOfBoundsException is thrown when the second column is accessed.
The physical plan is as follows:
== Physical Plan ==
*(5) Project [col1#333, col2#334]
+- *(5) SortMergeJoin [col1#333], [col1#390], Inner
   :- *(2) Sort [col1#333 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col1#333, 200)
   :     +- *(1) DataSourceV2Scan [col1#333, col2#334], com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader@4527e4
   +- *(4) Sort [col1#390 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#390, 200)
         +- *(3) DataSourceV2Scan [col1#390], com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader@4527e4
We can see that both scans use the same HiveWarehouseDataSourceReader instance (@4527e4).
The Spark logs show that the two Hive queries that are triggered both request only the "col1" column:
20/01/15 19:46:32 INFO LlapBaseInputFormat: Handle ID c259528d-60ac-42d5-a201-9646335151dd: query=select `col1` from (SELECT * FROM table) as q_d89e3f8df0cd4ce3bdf4c7d938c006ad WHERE col1 IS NOT NULL
...
20/01/15 19:46:35 INFO LlapBaseInputFormat: Handle ID c259528d-60ac-42d5-a201-9646335151dd: query=select `col1` from (SELECT * FROM table) as q_76d9b5c5a5af482a8a3e671b6c8421a8 WHERE col1 IS NOT NULL
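During Spark's logical plan optimization, column pruning is applied twice to the same HiveWarehouseDataSourceReader instance, so the required columns end up being only "col1". Surprisingly, DataSourceV2Relation is mutable, because it depends on a mutable reader.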
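The failure mode described above can be reproduced without Spark in a few lines. The sketch below is a hypothetical, simplified model: MutableReader, its pruneColumns method and the Int "batches" are illustrative names only, loosely modelled on the pruneColumns hook of Spark 2.3's SupportsPushDownRequiredColumns, and are not the real HWC or Spark API. Two scans share one reader; the second pruning call overwrites the state the first scan was planned against, so the batch it reads has fewer columns than it expects.

```scala
// Hypothetical model of a DataSourceV2-style reader whose column pruning
// mutates shared state (NOT the real HiveWarehouseConnector or Spark API).
class MutableReader(fullSchema: Seq[String]) {
  private var requiredColumns: Seq[String] = fullSchema

  // Mimics a pruneColumns-style hook: overwrites the reader's state in place.
  def pruneColumns(required: Seq[String]): Unit = {
    requiredColumns = required
  }

  def readSchema: Seq[String] = requiredColumns

  // Each "batch" holds one array per column in the current read schema.
  def nextBatch(): Array[Array[Int]] =
    Array.fill(requiredColumns.size)(Array(1, 2, 3))
}

object SharedReaderDemo {
  def main(args: Array[String]): Unit = {
    val shared = new MutableReader(Seq("col1", "col2"))

    // The optimizer prunes the left scan (col1, col2), then the right scan
    // (col1), but both scans point at the same reader instance.
    shared.pruneColumns(Seq("col1", "col2"))
    val leftExpected = shared.readSchema // what the left scan was planned with
    shared.pruneColumns(Seq("col1"))     // last call wins for both scans

    val batch = shared.nextBatch()
    println(s"left scan expects ${leftExpected.size} columns, batch has ${batch.length}")

    // Reading column index 1 now fails, analogous to ColumnarBatch.column(1).
    try {
      batch(1)
      println("unexpected: column 1 present")
    } catch {
      case e: ArrayIndexOutOfBoundsException => println(s"caught: $e")
    }
  }
}
```

The point of the model is that pruning is a side effect on a shared object rather than a property of each plan node, so the order in which the optimizer visits the two scans decides what both of them read.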
I'm looking for a solution that still lets me use the HiveWarehouseConnector.
I'm using HDP 3.1.0 with the following components:
- Apache Spark 2.3.2
- Spark LLAP HiveWarehouseConnector 1.0.0
- Hive 3.1.0
Answer:
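Spark 2.4 reworked DataSourceV2 to address this, in particular SPARK-23203 (DataSource V2 should use immutable trees). For Spark 2.3, Hortonworks fixed the problem by disabling column pruning in the HiveWarehouseConnector data source reader, as stated in the HDP 3.1.5 release notes.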
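The idea behind "immutable trees" can be contrasted with the shared mutable reader in a small sketch. This is a hypothetical illustration, not Spark's actual implementation: ImmutableReader and its pruneColumns method are invented names. Pruning returns a new reader value instead of modifying the existing one, so two scans derived from the same relation each keep their own schema.

```scala
// Hypothetical sketch of the immutable approach: pruning returns a new
// reader instead of mutating a shared one (illustrative, not Spark's API).
final case class ImmutableReader(readSchema: Seq[String]) {
  // Returns a pruned copy; the original instance is left untouched.
  def pruneColumns(required: Seq[String]): ImmutableReader =
    copy(readSchema = readSchema.filter(required.contains))

  // Each "batch" holds one array per column in this reader's schema.
  def nextBatch(): Array[Array[Int]] =
    Array.fill(readSchema.size)(Array(1, 2, 3))
}

object ImmutableReaderDemo {
  def main(args: Array[String]): Unit = {
    val base  = ImmutableReader(Seq("col1", "col2"))
    val left  = base.pruneColumns(Seq("col1", "col2"))
    val right = base.pruneColumns(Seq("col1"))

    // Each scan keeps its own schema, so the left batch still has 2 columns.
    println(s"left: ${left.readSchema}, right: ${right.readSchema}")
    println(s"left batch columns: ${left.nextBatch().length}")
  }
}
```

Because every pruning step yields a fresh value, the result no longer depends on the order in which the optimizer visits the scans, which is the property the Spark 2.4 change aims for.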
The corresponding fix can be found in their HiveWarehouseConnector GitHub repository:
In addition, the HDP 3.1.5 Hive integration documentation states:
To prevent data correctness issues in this release, pruning and projection pushdown are disabled by default.
...
To prevent these issues and ensure correct results, do not enable pruning and pushdown.