I have deployed Apache Zeppelin 0.10.1 on Kubernetes, using Spark 3.2.1.
My problem is that the executors cannot communicate with each other during a shuffle, although they can still exchange data with the driver. So the cause could be a network issue, a permissions issue, or a data serialization issue. I suspect serialization, because the driver reports a problem while serializing data in its log file a few seconds before the errors pop up in the executors' log files.
Spark runs in client mode and each executor runs in its own pod.
Here is the Spark driver log file:
Interpreter download command: /usr/lib/jvm/java-8-openjdk-amd64/bin/java
  -Dfile.encoding=UTF-8
  -Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties
  -Dlog4j.configurationFile=file:///opt/zeppelin/conf/log4j2.properties
  -Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin-interpreter-spark-shared_process--spark-gxaleg.log
  -cp :/opt/zeppelin/interpreter/spark/*:::/opt/zeppelin/interpreter/zeppelin-interpreter-shaded-0.10.1.jar:/opt/zeppelin/interpreter/spark/spark-interpreter-0.10.1.jar
  org.apache.zeppelin.interpreter.remote.RemoteInterpreterDownloader zeppelin-server.zeppelin.svc 12320 spark /tmp/local-repo/spark
... (lines deleted here because of Stack Overflow length restrictions) ...
ERROR [2022-06-10 10:01:56,657] ({ParallelScheduler-Worker-1} Logging.scala[logError]:94) - failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 163, Column 76: Cannot determine simple type name "$line34105858122"
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 163, Column 76: Cannot determine simple type name "$line34105858122"
at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12021)
... (some lines removed) ...
at java.lang.Thread.run(Thread.java:748)
INFO [2022-06-10 10:01:56,685] ({ParallelScheduler-Worker-1} Logging.scala[logInfo]:57) -
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private boolean agg_initAgg_0;
/* 010 */ private boolean agg_bufIsNull_0;
/* 011 */ private long agg_bufValue_0;
/* 012 */ private agg_FastHashMap_0 agg_fastHashMap_0;
/* 013 */ private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter_0;
/* 014 */ private org.apache.spark.unsafe.KVIterator agg_mapIter_0;
/* 015 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0;
/* 016 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter_0;
/* 017 */ private scala.collection.Iterator inputadapter_input_0;
/* 018 */ private boolean serializefromobject_resultIsNull_0;
/* 019 */ private boolean serializefromobject_resultIsNull_1;
/* 020 */ private boolean serializefromobject_resultIsNull_2;
/* 021 */ private java.lang.String[] serializefromobject_mutableStateArray_0 = new java.lang.String[3];
/* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] serializefromobject_mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[6];
/* 023 */
/* 024 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 025 */ this.references = references;
/* 026 */ }
/* 027 */
/* 028 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 029 */ partitionIndex = index;
/* 030 */ this.inputs = inputs;
/* 031 */
/* 032 */ inputadapter_input_0 = inputs[0];
/* 033 */ serializefromobject_mutableStateArray_1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(5, 96);
/* 034 */ serializefromobject_mutableStateArray_1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(5, 96);
/* 035 */ serializefromobject_mutableStateArray_1[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 036 */ serializefromobject_mutableStateArray_1[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 037 */ serializefromobject_mutableStateArray_1[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 038 */ serializefromobject_mutableStateArray_1[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 039 */
/* 040 */ }
/* 041 */
/* 042 */ public class agg_FastHashMap_0 {
/* 043 */ private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 044 */ private int[] buckets;
/* 045 */ private int capacity = 1 << 16;
/* 046 */ private double loadFactor = 0.5;
/* 047 */ private int numBuckets = (int) (capacity / loadFactor);
/* 048 */ private int maxSteps = 2;
/* 049 */ private int numRows = 0;
/* 050 */ private Object emptyVBase;
/* 051 */ private long emptyVOff;
/* 052 */ private int emptyVLen;
/* 053 */ private boolean isBatchFull = false;
/* 054 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
/* 055 */
/* 056 */ public agg_FastHashMap_0(
/* 057 */ org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
/* 058 */ InternalRow emptyAggregationBuffer) {
/* 059 */ batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 060 */ .allocate(((org.apache.spark.sql.types.StructType) references[1] /* keySchemaTerm */), ((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */), taskMemoryManager, capacity);
/* 061 */
/* 062 */ final UnsafeProjection valueProjection = UnsafeProjection.create(((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */));
/* 063 */ final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 064 */
/* 065 */ emptyVBase = emptyBuffer;
/* 066 */ emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 067 */ emptyVLen = emptyBuffer.length;
/* 068 */
/* 069 */ agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 070 */ 1, 0);
/* 071 */
/* 072 */ buckets = new int[numBuckets];
/* 073 */ java.util.Arrays.fill(buckets, -1);
/* 074 */ }
/* 075 */
/* 076 */ public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(int agg_key_0) {
/* 077 */ long h = hash(agg_key_0);
/* 078 */ int step = 0;
/* 079 */ int idx = (int) h & (numBuckets - 1);
/* 080 */ while (step < maxSteps) {
/* 081 */ // Return bucket index if it's either an empty slot or already contains the key
/* 082 */ if (buckets[idx] == -1) {
/* 083 */ if (numRows < capacity && !isBatchFull) {
/* 084 */ agg_rowWriter.reset();
/* 085 */ agg_rowWriter.zeroOutNullBytes();
/* 086 */ agg_rowWriter.write(0, agg_key_0);
/* 087 */ org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
/* 088 */ = agg_rowWriter.getRow();
/* 089 */ Object kbase = agg_result.getBaseObject();
/* 090 */ long koff = agg_result.getBaseOffset();
/* 091 */ int klen = agg_result.getSizeInBytes();
/* 092 */
/* 093 */ UnsafeRow vRow
/* 094 */ = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
/* 095 */ if (vRow == null) {
/* 096 */ isBatchFull = true;
/* 097 */ } else {
/* 098 */ buckets[idx] = numRows++;
/* 099 */ }
/* 100 */ return vRow;
/* 101 */ } else {
/* 102 */ // No more space
/* 103 */ return null;
/* 104 */ }
/* 105 */ } else if (equals(idx, agg_key_0)) {
/* 106 */ return batch.getValueRow(buckets[idx]);
/* 107 */ }
/* 108 */ idx = (idx + 1) & (numBuckets - 1);
/* 109 */ step++;
/* 110 */ }
/* 111 */ // Didn't find it
/* 112 */ return null;
/* 113 */ }
/* 114 */
/* 115 */ private boolean equals(int idx, int agg_key_0) {
/* 116 */ UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 117 */ return (row.getInt(0) == agg_key_0);
/* 118 */ }
/* 119 */
/* 120 */ private long hash(int agg_key_0) {
/* 121 */ long agg_hash_0 = 0;
/* 122 */
/* 123 */ int agg_result_0 = agg_key_0;
/* 124 */ agg_hash_0 = (agg_hash_0 ^ (0x9e3779b9)) + agg_result_0 + (agg_hash_0 << 6) + (agg_hash_0 >>> 2);
/* 125 */
/* 126 */ return agg_hash_0;
/* 127 */ }
/* 128 */
/* 129 */ public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
/* 130 */ return batch.rowIterator();
/* 131 */ }
/* 132 */
/* 133 */ public void close() {
/* 134 */ batch.close();
/* 135 */ }
/* 136 */
/* 137 */ }
/* 138 */
/* 139 */ private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
/* 140 */ throws java.io.IOException {
/* 141 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[13] /* numOutputRows */).add(1);
/* 142 */
/* 143 */ boolean agg_isNull_7 = agg_keyTerm_0.isNullAt(0);
/* 144 */ int agg_value_8 = agg_isNull_7 ?
/* 145 */ -1 : (agg_keyTerm_0.getInt(0));
/* 146 */ long agg_value_9 = agg_bufferTerm_0.getLong(0);
/* 147 */
/* 148 */ serializefromobject_mutableStateArray_1[5].reset();
/* 149 */
/* 150 */ serializefromobject_mutableStateArray_1[5].zeroOutNullBytes();
/* 151 */
/* 152 */ if (agg_isNull_7) {
/* 153 */ serializefromobject_mutableStateArray_1[5].setNullAt(0);
/* 154 */ } else {
/* 155 */ serializefromobject_mutableStateArray_1[5].write(0, agg_value_8);
/* 156 */ }
/* 157 */
/* 158 */ serializefromobject_mutableStateArray_1[5].write(1, agg_value_9);
/* 159 */ append((serializefromobject_mutableStateArray_1[5].getRow()));
/* 160 */
/* 161 */ }
/* 162 */
/* 163 */ private void serializefromobject_doConsume_0(InternalRow inputadapter_row_0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank serializefromobject_expr_0_0, boolean serializefromobject_exprIsNull_0_0) throws java.io.IOException {
/* 164 */ do {
/* 165 */ if (serializefromobject_exprIsNull_0_0) {
/* 166 */ throw new NullPointerException(((java.lang.String) references[7] /* errMsg */));
/* 167 */ }
/* 168 */ boolean serializefromobject_isNull_1 = true;
/* 169 */ java.lang.Integer serializefromobject_value_1 = null;
/* 170 */ serializefromobject_isNull_1 = false;
/* 171 */ if (!serializefromobject_isNull_1) {
/* 172 */ Object serializefromobject_funcResult_0 = null;
/* 173 */ serializefromobject_funcResult_0 = serializefromobject_expr_0_0.age();
/* 174 */
/* 175 */ if (serializefromobject_funcResult_0 != null) {
/* 176 */ serializefromobject_value_1 = (java.lang.Integer) serializefromobject_funcResult_0;
/* 177 */ } else {
/* 178 */ serializefromobject_isNull_1 = true;
/* 179 */ }
/* 180 */
/* 181 */ }
/* 182 */ boolean serializefromobject_isNull_0 = true;
/* 183 */ int serializefromobject_value_0 = -1;
/* 184 */ if (!serializefromobject_isNull_1) {
/* 185 */ serializefromobject_isNull_0 = false;
/* 186 */ if (!serializefromobject_isNull_0) {
/* 187 */ serializefromobject_value_0 = serializefromobject_value_1.intValue();
/* 188 */ }
/* 189 */ }
/* 190 */
/* 191 */ boolean filter_value_2 = !serializefromobject_isNull_0;
/* 192 */ if (!filter_value_2) continue;
/* 193 */
/* 194 */ boolean filter_value_3 = false;
/* 195 */ filter_value_3 = serializefromobject_value_0 < 30;
/* 196 */ if (!filter_value_3) continue;
/* 197 */
/* 198 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[12] /* numOutputRows */).add(1);
/* 199 */
/* 200 */ // common sub-expressions
/* 201 */
/* 202 */ agg_doConsume_0(serializefromobject_value_0, false);
/* 203 */
/* 204 */ } while(false);
/* 205 */
/* 206 */ }
/* 207 */
...
/* 330 */ }
/* 331 */
/* 332 */ }
WARN [2022-06-10 10:01:56,689] ({ParallelScheduler-Worker-1} Logging.scala[logWarning]:69) - Whole-stage codegen disabled for plan (id=1):
*(1) HashAggregate(keys=[age#6], functions=[partial_count(1)], output=[age#6, count#29L])
+- *(1) Project [age#6]
+- *(1) Filter (isnotnull(age#6) AND (age#6 < 30))
+- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).age.intValue AS age#6, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).job, true, false, true) AS job#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).marital, true, false, true) AS marital#8, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).education, true, false, true) AS education#9, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).balance.intValue AS balance#10]
+- Scan[obj#5]
Below is the log of the first executor:
22/06/10 10:01:57 INFO CoarseGrainedExecutorBackend: Got assigned task 0
22/06/10 10:01:57 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
22/06/10 10:01:57 INFO TorrentBroadcast: Started reading broadcast variable 0 with 1 pieces (estimated total size 4.0 MiB)
22/06/10 10:01:57 INFO TransportClientFactory: Successfully created connection to spark-gxaleg.zeppelin.svc/10.42.8.188:22322 after 3 ms (0 ms spent in bootstraps)
22/06/10 10:01:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.2 KiB, free 413.9 MiB)
22/06/10 10:01:57 INFO TorrentBroadcast: Reading broadcast variable 0 took 221 ms
22/06/10 10:01:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 28.3 KiB, free 413.9 MiB)
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 717.142754 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 52.025602 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 12.942418 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 14.006444 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 14.341434 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 13.481751 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 20.743977 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 76.48826 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 100.251714 ms
22/06/10 10:02:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2538 bytes result sent to driver
22/06/10 10:02:27 INFO Executor: Told to re-register on heartbeat
22/06/10 10:02:27 INFO BlockManager: BlockManager BlockManagerId(1, 10.42.9.230, 22322, None) re-registering with master
22/06/10 10:02:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 10.42.9.230, 22322, None)
22/06/10 10:02:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 10.42.9.230, 22322, None)
22/06/10 10:02:27 INFO BlockManager: Reporting 2 blocks to the master.
And here is the log of the second executor:
22/06/10 10:01:57 INFO CoarseGrainedExecutorBackend: Got assigned task 1
22/06/10 10:01:57 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
22/06/10 10:01:57 INFO TorrentBroadcast: Started reading broadcast variable 0 with 1 pieces (estimated total size 4.0 MiB)
22/06/10 10:01:57 INFO TransportClientFactory: Successfully created connection to spark-gxaleg.zeppelin.svc/10.42.8.188:22322 after 27 ms (0 ms spent in bootstraps)
22/06/10 10:01:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.2 KiB, free 413.9 MiB)
22/06/10 10:01:57 INFO TorrentBroadcast: Reading broadcast variable 0 took 186 ms
22/06/10 10:01:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 28.3 KiB, free 413.9 MiB)
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 707.100922 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 19.523219 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 81.661107 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 16.067472 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 79.90976 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 17.750182 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 24.625673 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 16.542486 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 94.069283 ms
22/06/10 10:02:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2538 bytes result sent to driver
22/06/10 10:02:02 INFO CoarseGrainedExecutorBackend: Got assigned task 2
22/06/10 10:02:02 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
22/06/10 10:02:02 INFO TorrentBroadcast: Started reading broadcast variable 1 with 1 pieces (estimated total size 4.0 MiB)
22/06/10 10:02:02 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 17.6 KiB, free 413.9 MiB)
22/06/10 10:02:02 INFO TorrentBroadcast: Reading broadcast variable 1 took 19 ms
22/06/10 10:02:02 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 38.6 KiB, free 413.8 MiB)
22/06/10 10:02:02 INFO CodeGenerator: Code generated in 25.099699 ms
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@spark-gxaleg.zeppelin.svc:22321)
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Got the map output locations
22/06/10 10:02:02 INFO ShuffleBlockFetcherIterator: Getting 2 (1282.0 B) non-empty blocks including 1 (608.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 1 (674.0 B) remote blocks
22/06/10 10:02:02 INFO TransportClientFactory: Successfully created connection to /10.42.9.230:22322 after 2 ms (0 ms spent in bootstraps)
22/06/10 10:02:02 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from <unknown remote> is closed
22/06/10 10:02:02 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 71 ms
22/06/10 10:02:02 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from <unknown remote> closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
... (some lines removed) ...
at java.base/java.lang.Thread.run(Unknown Source)
22/06/10 10:02:02 INFO RetryingBlockTransferor: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
22/06/10 10:02:02 INFO CodeGenerator: Code generated in 99.746798 ms
22/06/10 10:02:03 INFO CodeGenerator: Code generated in 12.970246 ms
22/06/10 10:02:07 INFO TransportClientFactory: Found inactive connection to /10.42.9.230:22322, creating a new one.
22/06/10 10:02:07 INFO TransportClientFactory: Successfully created connection to /10.42.9.230:22322 after 2 ms (0 ms spent in bootstraps)
22/06/10 10:02:07 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from <unknown remote> is closed
22/06/10 10:02:07 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from <unknown remote> closed
at
This is the code snippet I am trying to run:
import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset
// Zeppelin creates and injects sc (SparkContext) and sqlContext (HiveContext or SQLContext),
// so you don't need to create them manually.

// Load the bank data
val bankText = sc.parallelize(
  IOUtils.toString(
    new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"),
    Charset.forName("utf8")).split("\n"))

case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)

val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
  s => Bank(s(0).toInt,
    s(1).replaceAll("\"", ""),
    s(2).replaceAll("\"", ""),
    s(3).replaceAll("\"", ""),
    s(5).replaceAll("\"", "").toInt
  )
).toDF()
bank.registerTempTable("bank")
The error appears when I run the following paragraph:
%sql
select age, count(1) value
from bank
where age < 30
group by age
order by age
-> the GROUP BY clause is what makes Spark start a **shuffle**.
Does anyone know how to solve this?
1 Answer
First of all, I no longer have the code (I have since left the company I was working for), but we did find the solution.

It did not work because we needed a headless service. This requirement is buried in the Spark-on-Kubernetes documentation and nobody had noticed it. The error messages appeared because the workers (executors) could not communicate with each other: they did not have each other's addresses. With a headless service they can do a reverse lookup and find the IPs (or names) of all the workers.

Keep in mind that the headless service has to be given a specific name (I no longer remember which one) so that the workers can find it.

The headless service is not preconfigured in the manifest inside the Apache Zeppelin interpreter template file, so you have to add it yourself; a minimal sketch is shown below.
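For illustration only, here is a minimal sketch of what such a headless Service could look like. Only the zeppelin namespace and the ports 22321/22322 come from the logs in the question; the Service name, the selector label, and everything else are assumptions and have to be adapted to your own interpreter pod and to the spark.driver.port / spark.blockManager.port values you configure (see the client-mode networking section of the Spark-on-Kubernetes documentation).

# Hedged sketch of a headless Service for the Zeppelin Spark interpreter (driver) pod.
# Name and selector label below are placeholders, not values from the original setup.
apiVersion: v1
kind: Service
metadata:
  name: spark-driver-headless          # assumed name; see the note above about naming
  namespace: zeppelin                   # namespace used in the question's logs
spec:
  clusterIP: None                       # "None" is what makes the Service headless
  selector:
    app: zeppelin-spark-interpreter     # assumed label on the interpreter/driver pod
  ports:
    - name: driver-rpc
      port: 22321                       # driver RPC port seen in the logs above
      targetPort: 22321
    - name: blockmanager
      port: 22322                       # block manager port seen in the logs above
      targetPort: 22322

With such a Service in place, the driver (the Zeppelin Spark interpreter pod) would advertise itself under the Service's DNS name, for example by setting spark.driver.host to spark-driver-headless.zeppelin.svc.cluster.local and pinning spark.driver.port and spark.blockManager.port to the exposed ports. Those property names are standard Spark configuration keys; the concrete values here are only placeholders.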
This solved my problem. I hope it helps someone.