Apache Zeppelin and Spark on Kubernetes: executors cannot communicate with each other during shuffle

Asked by 6yjfywim on 2023-06-21 in Kubernetes

I have deployed Apache Zeppelin 0.10.1 on Kubernetes. It uses Spark 3.2.1.
My problem is that the executors cannot communicate with each other during a shuffle, although they can still exchange data with the driver. So the cause could be a network problem, a permissions problem, or a data-serialization problem. I suspect serialization because the driver reports a serialization problem in its log file a few seconds before the error shows up in the executors' log files.
Spark runs in client mode and each executor runs in its own pod.
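
For reference, the kind of Spark interpreter settings involved look roughly like this. This is a hypothetical sketch, not my actual configuration; the property names are standard Spark settings for client mode on Kubernetes, and the port values are only chosen to match the ones that appear in the logs below.

# Hypothetical sketch of the relevant interpreter properties (not the actual configuration)
spark.master                  k8s://https://kubernetes.default.svc
spark.submit.deployMode       client
spark.executor.instances      2
spark.kubernetes.namespace    zeppelin
# RPC port the executors use to reach the driver (22321 in the logs below)
spark.driver.port             22321
# Port each executor's block manager listens on for shuffle fetches (22322 in the logs below)
spark.blockManager.port       22322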

Here is the Spark driver log file:

Interpreter download command: /usr/lib/jvm/java-8-openjdk-amd64/bin/java 
-Dfile.encoding=UTF-8 -Dlog4j.configuration=file:///opt/zeppelin/conf/log4j.properties 
-Dlog4j.configurationFile=file:///opt/zeppelin/conf/log4j2.properties 
-Dzeppelin.log.file=/opt/zeppelin/logs/zeppelin-interpreter-spark-shared_process--spark-
gxaleg.log -cp :/opt/zeppelin/interpreter/spark/*:::/opt/zeppelin/interpreter/zeppelin-
interpreter-shaded-0.10.1.jar:/opt/zeppelin/interpreter/spark/spark-interpreter-0.10.1.jar 
org.apache.zeppelin.interpreter.remote.RemoteInterpreterDownloader zeppelin-
server.zeppelin.svc 12320 spark /tmp/local-repo/spark
... (deleted some lines because of Stack Overflow line restrictions)
    ERROR [2022-06-10 10:01:56,657] ({ParallelScheduler-Worker-1} Logging.scala[logError]:94) - failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 163, Column 76: Cannot determine simple type name "$line34105858122"
    org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 163, Column 76: Cannot determine simple type name "$line34105858122"
        at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12021)
        ... (some lines deleted)
        at java.lang.Thread.run(Thread.java:748)
     INFO [2022-06-10 10:01:56,685] ({ParallelScheduler-Worker-1} Logging.scala[logInfo]:57) - 
    /* 001 */ public Object generate(Object[] references) {
    /* 002 */   return new GeneratedIteratorForCodegenStage1(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ // codegenStageId=1
    /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
    /* 007 */   private Object[] references;
    /* 008 */   private scala.collection.Iterator[] inputs;
    /* 009 */   private boolean agg_initAgg_0;
    /* 010 */   private boolean agg_bufIsNull_0;
    /* 011 */   private long agg_bufValue_0;
    /* 012 */   private agg_FastHashMap_0 agg_fastHashMap_0;
    /* 013 */   private org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> agg_fastHashMapIter_0;
    /* 014 */   private org.apache.spark.unsafe.KVIterator agg_mapIter_0;
    /* 015 */   private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap_0;
    /* 016 */   private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter_0;
    /* 017 */   private scala.collection.Iterator inputadapter_input_0;
    /* 018 */   private boolean serializefromobject_resultIsNull_0;
    /* 019 */   private boolean serializefromobject_resultIsNull_1;
    /* 020 */   private boolean serializefromobject_resultIsNull_2;
    /* 021 */   private java.lang.String[] serializefromobject_mutableStateArray_0 = new java.lang.String[3];
    /* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] serializefromobject_mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[6];
    /* 023 */
    /* 024 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
    /* 025 */     this.references = references;
    /* 026 */   }
    /* 027 */
    /* 028 */   public void init(int index, scala.collection.Iterator[] inputs) {
    /* 029 */     partitionIndex = index;
    /* 030 */     this.inputs = inputs;
    /* 031 */
    /* 032 */     inputadapter_input_0 = inputs[0];
    /* 033 */     serializefromobject_mutableStateArray_1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(5, 96);
    /* 034 */     serializefromobject_mutableStateArray_1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(5, 96);
    /* 035 */     serializefromobject_mutableStateArray_1[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 036 */     serializefromobject_mutableStateArray_1[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 037 */     serializefromobject_mutableStateArray_1[4] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
    /* 038 */     serializefromobject_mutableStateArray_1[5] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
    /* 039 */
    /* 040 */   }
    /* 041 */
    /* 042 */   public class agg_FastHashMap_0 {
    /* 043 */     private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
    /* 044 */     private int[] buckets;
    /* 045 */     private int capacity = 1 << 16;
    /* 046 */     private double loadFactor = 0.5;
    /* 047 */     private int numBuckets = (int) (capacity / loadFactor);
    /* 048 */     private int maxSteps = 2;
    /* 049 */     private int numRows = 0;
    /* 050 */     private Object emptyVBase;
    /* 051 */     private long emptyVOff;
    /* 052 */     private int emptyVLen;
    /* 053 */     private boolean isBatchFull = false;
    /* 054 */     private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
    /* 055 */
    /* 056 */     public agg_FastHashMap_0(
    /* 057 */       org.apache.spark.memory.TaskMemoryManager taskMemoryManager,
    /* 058 */       InternalRow emptyAggregationBuffer) {
    /* 059 */       batch = org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
    /* 060 */       .allocate(((org.apache.spark.sql.types.StructType) references[1] /* keySchemaTerm */), ((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */), taskMemoryManager, capacity);
    /* 061 */
    /* 062 */       final UnsafeProjection valueProjection = UnsafeProjection.create(((org.apache.spark.sql.types.StructType) references[2] /* valueSchemaTerm */));
    /* 063 */       final byte[] emptyBuffer = valueProjection.apply(emptyAggregationBuffer).getBytes();
    /* 064 */
    /* 065 */       emptyVBase = emptyBuffer;
    /* 066 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
    /* 067 */       emptyVLen = emptyBuffer.length;
    /* 068 */
    /* 069 */       agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
    /* 070 */         1, 0);
    /* 071 */
    /* 072 */       buckets = new int[numBuckets];
    /* 073 */       java.util.Arrays.fill(buckets, -1);
    /* 074 */     }
    /* 075 */
    /* 076 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(int agg_key_0) {
    /* 077 */       long h = hash(agg_key_0);
    /* 078 */       int step = 0;
    /* 079 */       int idx = (int) h & (numBuckets - 1);
    /* 080 */       while (step < maxSteps) {
    /* 081 */         // Return bucket index if it's either an empty slot or already contains the key
    /* 082 */         if (buckets[idx] == -1) {
    /* 083 */           if (numRows < capacity && !isBatchFull) {
    /* 084 */             agg_rowWriter.reset();
    /* 085 */             agg_rowWriter.zeroOutNullBytes();
    /* 086 */             agg_rowWriter.write(0, agg_key_0);
    /* 087 */             org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
    /* 088 */             = agg_rowWriter.getRow();
    /* 089 */             Object kbase = agg_result.getBaseObject();
    /* 090 */             long koff = agg_result.getBaseOffset();
    /* 091 */             int klen = agg_result.getSizeInBytes();
    /* 092 */
    /* 093 */             UnsafeRow vRow
    /* 094 */             = batch.appendRow(kbase, koff, klen, emptyVBase, emptyVOff, emptyVLen);
    /* 095 */             if (vRow == null) {
    /* 096 */               isBatchFull = true;
    /* 097 */             } else {
    /* 098 */               buckets[idx] = numRows++;
    /* 099 */             }
    /* 100 */             return vRow;
    /* 101 */           } else {
    /* 102 */             // No more space
    /* 103 */             return null;
    /* 104 */           }
    /* 105 */         } else if (equals(idx, agg_key_0)) {
    /* 106 */           return batch.getValueRow(buckets[idx]);
    /* 107 */         }
    /* 108 */         idx = (idx + 1) & (numBuckets - 1);
    /* 109 */         step++;
    /* 110 */       }
    /* 111 */       // Didn't find it
    /* 112 */       return null;
    /* 113 */     }
    /* 114 */
    /* 115 */     private boolean equals(int idx, int agg_key_0) {
    /* 116 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
    /* 117 */       return (row.getInt(0) == agg_key_0);
    /* 118 */     }
    /* 119 */
    /* 120 */     private long hash(int agg_key_0) {
    /* 121 */       long agg_hash_0 = 0;
    /* 122 */
    /* 123 */       int agg_result_0 = agg_key_0;
    /* 124 */       agg_hash_0 = (agg_hash_0 ^ (0x9e3779b9)) + agg_result_0 + (agg_hash_0 << 6) + (agg_hash_0 >>> 2);
    /* 125 */
    /* 126 */       return agg_hash_0;
    /* 127 */     }
    /* 128 */
    /* 129 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
    /* 130 */       return batch.rowIterator();
    /* 131 */     }
    /* 132 */
    /* 133 */     public void close() {
    /* 134 */       batch.close();
    /* 135 */     }
    /* 136 */
    /* 137 */   }
    /* 138 */
    /* 139 */   private void agg_doAggregateWithKeysOutput_0(UnsafeRow agg_keyTerm_0, UnsafeRow agg_bufferTerm_0)
    /* 140 */   throws java.io.IOException {
    /* 141 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[13] /* numOutputRows */).add(1);
    /* 142 */
    /* 143 */     boolean agg_isNull_7 = agg_keyTerm_0.isNullAt(0);
    /* 144 */     int agg_value_8 = agg_isNull_7 ?
    /* 145 */     -1 : (agg_keyTerm_0.getInt(0));
    /* 146 */     long agg_value_9 = agg_bufferTerm_0.getLong(0);
    /* 147 */
    /* 148 */     serializefromobject_mutableStateArray_1[5].reset();
    /* 149 */
    /* 150 */     serializefromobject_mutableStateArray_1[5].zeroOutNullBytes();
    /* 151 */
    /* 152 */     if (agg_isNull_7) {
    /* 153 */       serializefromobject_mutableStateArray_1[5].setNullAt(0);
    /* 154 */     } else {
    /* 155 */       serializefromobject_mutableStateArray_1[5].write(0, agg_value_8);
    /* 156 */     }
    /* 157 */
    /* 158 */     serializefromobject_mutableStateArray_1[5].write(1, agg_value_9);
    /* 159 */     append((serializefromobject_mutableStateArray_1[5].getRow()));
    /* 160 */
    /* 161 */   }
    /* 162 */
    /* 163 */   private void serializefromobject_doConsume_0(InternalRow inputadapter_row_0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank serializefromobject_expr_0_0, boolean serializefromobject_exprIsNull_0_0) throws java.io.IOException {
    /* 164 */     do {
    /* 165 */       if (serializefromobject_exprIsNull_0_0) {
    /* 166 */         throw new NullPointerException(((java.lang.String) references[7] /* errMsg */));
    /* 167 */       }
    /* 168 */       boolean serializefromobject_isNull_1 = true;
    /* 169 */       java.lang.Integer serializefromobject_value_1 = null;
    /* 170 */       serializefromobject_isNull_1 = false;
    /* 171 */       if (!serializefromobject_isNull_1) {
    /* 172 */         Object serializefromobject_funcResult_0 = null;
    /* 173 */         serializefromobject_funcResult_0 = serializefromobject_expr_0_0.age();
    /* 174 */
    /* 175 */         if (serializefromobject_funcResult_0 != null) {
    /* 176 */           serializefromobject_value_1 = (java.lang.Integer) serializefromobject_funcResult_0;
    /* 177 */         } else {
    /* 178 */           serializefromobject_isNull_1 = true;
    /* 179 */         }
    /* 180 */
    /* 181 */       }
    /* 182 */       boolean serializefromobject_isNull_0 = true;
    /* 183 */       int serializefromobject_value_0 = -1;
    /* 184 */       if (!serializefromobject_isNull_1) {
    /* 185 */         serializefromobject_isNull_0 = false;
    /* 186 */         if (!serializefromobject_isNull_0) {
    /* 187 */           serializefromobject_value_0 = serializefromobject_value_1.intValue();
    /* 188 */         }
    /* 189 */       }
    /* 190 */
    /* 191 */       boolean filter_value_2 = !serializefromobject_isNull_0;
    /* 192 */       if (!filter_value_2) continue;
    /* 193 */
    /* 194 */       boolean filter_value_3 = false;
    /* 195 */       filter_value_3 = serializefromobject_value_0 < 30;
    /* 196 */       if (!filter_value_3) continue;
    /* 197 */
    /* 198 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[12] /* numOutputRows */).add(1);
    /* 199 */
    /* 200 */       // common sub-expressions
    /* 201 */
    /* 202 */       agg_doConsume_0(serializefromobject_value_0, false);
    /* 203 */
    /* 204 */     } while(false);
    /* 205 */
    /* 206 */   }
    /* 207 */
  ...
    /* 330 */   }
    /* 331 */
    /* 332 */ }
    
     WARN [2022-06-10 10:01:56,689] ({ParallelScheduler-Worker-1} Logging.scala[logWarning]:69) - Whole-stage codegen disabled for plan (id=1):
     *(1) HashAggregate(keys=[age#6], functions=[partial_count(1)], output=[age#6, count#29L])
    +- *(1) Project [age#6]
       +- *(1) Filter (isnotnull(age#6) AND (age#6 < 30))
          +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).age.intValue AS age#6, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).job, true, false, true) AS job#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).marital, true, false, true) AS marital#8, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).education, true, false, true) AS education#9, knownnotnull(assertnotnull(input[0, $line34105858122.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$Bank, true])).balance.intValue AS balance#10]
             +- Scan[obj#5]

Here is the log of the first executor:

22/06/10 10:01:57 INFO CoarseGrainedExecutorBackend: Got assigned task 0
22/06/10 10:01:57 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
22/06/10 10:01:57 INFO TorrentBroadcast: Started reading broadcast variable 0 with 1 pieces (estimated total size 4.0 MiB)
22/06/10 10:01:57 INFO TransportClientFactory: Successfully created connection to spark-gxaleg.zeppelin.svc/10.42.8.188:22322 after 3 ms (0 ms spent in bootstraps)
22/06/10 10:01:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.2 KiB, free 413.9 MiB)
22/06/10 10:01:57 INFO TorrentBroadcast: Reading broadcast variable 0 took 221 ms
22/06/10 10:01:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 28.3 KiB, free 413.9 MiB)
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 717.142754 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 52.025602 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 12.942418 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 14.006444 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 14.341434 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 13.481751 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 20.743977 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 76.48826 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 100.251714 ms
22/06/10 10:02:01 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2538 bytes result sent to driver
22/06/10 10:02:27 INFO Executor: Told to re-register on heartbeat
22/06/10 10:02:27 INFO BlockManager: BlockManager BlockManagerId(1, 10.42.9.230, 22322, None) re-registering with master
22/06/10 10:02:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 10.42.9.230, 22322, None)
22/06/10 10:02:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 10.42.9.230, 22322, None)
22/06/10 10:02:27 INFO BlockManager: Reporting 2 blocks to the master.

Here is the log of the second executor:

22/06/10 10:01:57 INFO CoarseGrainedExecutorBackend: Got assigned task 1
22/06/10 10:01:57 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
22/06/10 10:01:57 INFO TorrentBroadcast: Started reading broadcast variable 0 with 1 pieces (estimated total size 4.0 MiB)
22/06/10 10:01:57 INFO TransportClientFactory: Successfully created connection to spark-gxaleg.zeppelin.svc/10.42.8.188:22322 after 27 ms (0 ms spent in bootstraps)
22/06/10 10:01:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.2 KiB, free 413.9 MiB)
22/06/10 10:01:57 INFO TorrentBroadcast: Reading broadcast variable 0 took 186 ms
22/06/10 10:01:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 28.3 KiB, free 413.9 MiB)
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 707.100922 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 19.523219 ms
22/06/10 10:02:00 INFO CodeGenerator: Code generated in 81.661107 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 16.067472 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 79.90976 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 17.750182 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 24.625673 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 16.542486 ms
22/06/10 10:02:01 INFO CodeGenerator: Code generated in 94.069283 ms
22/06/10 10:02:02 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2538 bytes result sent to driver
22/06/10 10:02:02 INFO CoarseGrainedExecutorBackend: Got assigned task 2
22/06/10 10:02:02 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
22/06/10 10:02:02 INFO TorrentBroadcast: Started reading broadcast variable 1 with 1 pieces (estimated total size 4.0 MiB)
22/06/10 10:02:02 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 17.6 KiB, free 413.9 MiB)
22/06/10 10:02:02 INFO TorrentBroadcast: Reading broadcast variable 1 took 19 ms
22/06/10 10:02:02 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 38.6 KiB, free 413.8 MiB)
22/06/10 10:02:02 INFO CodeGenerator: Code generated in 25.099699 ms
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 0, fetching them
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@spark-gxaleg.zeppelin.svc:22321)
22/06/10 10:02:02 INFO MapOutputTrackerWorker: Got the map output locations
22/06/10 10:02:02 INFO ShuffleBlockFetcherIterator: Getting 2 (1282.0 B) non-empty blocks including 1 (608.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 1 (674.0 B) remote blocks
22/06/10 10:02:02 INFO TransportClientFactory: Successfully created connection to /10.42.9.230:22322 after 2 ms (0 ms spent in bootstraps)
22/06/10 10:02:02 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from <unknown remote> is closed
22/06/10 10:02:02 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 71 ms
22/06/10 10:02:02 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from <unknown remote> closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    ... (some lines deleted)
    at java.base/java.lang.Thread.run(Unknown Source)
22/06/10 10:02:02 INFO RetryingBlockTransferor: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
22/06/10 10:02:02 INFO CodeGenerator: Code generated in 99.746798 ms
22/06/10 10:02:03 INFO CodeGenerator: Code generated in 12.970246 ms
22/06/10 10:02:07 INFO TransportClientFactory: Found inactive connection to /10.42.9.230:22322, creating a new one.
22/06/10 10:02:07 INFO TransportClientFactory: Successfully created connection to /10.42.9.230:22322 after 2 ms (0 ms spent in bootstraps)
22/06/10 10:02:07 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from <unknown remote> is closed
22/06/10 10:02:07 ERROR OneForOneBlockFetcher: Failed while starting block fetches
java.io.IOException: Connection from <unknown remote> closed
    at

Here is the code snippet I am trying to run:

import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset

// Zeppelin creates and injects sc (SparkContext) and sqlContext (HiveContext or SqlContext),
// so you don't need to create them manually

// load bank data
val bankText = sc.parallelize(
    IOUtils.toString(
        new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"),
        Charset.forName("utf8")).split("\n"))

case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)

val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
    s => Bank(s(0).toInt, 
            s(1).replaceAll("\"", ""),
            s(2).replaceAll("\"", ""),
            s(3).replaceAll("\"", ""),
            s(5).replaceAll("\"", "").toInt
        )
).toDF()
bank.registerTempTable("bank")

The error pops up when I run the following snippet:

%sql 
select age, count(1) value
from bank 
where age < 30 
group by age 
order by age

-> Spark starts a **shuffle** here because of the `group by` clause.
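
For reference, the same aggregation can be expressed directly on the `bank` DataFrame from the snippet above; this is just a minimal sketch, and it triggers the same shuffle as the SQL query:

// Equivalent DataFrame aggregation: groupBy forces a shuffle, just like the SQL above.
bank.filter("age < 30")
    .groupBy("age")
    .count()
    .orderBy("age")
    .show()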
Does anyone know how to solve this?


Answer 1 (by 2vuwiymt):

First of all, I no longer have the code, because I have left the company I was working for at the time, but we did find the solution.
It did not work because we needed a headless service. This is buried in the Spark-on-Kubernetes documentation and nobody had spotted it. The error messages appear because the executors cannot communicate with each other: they do not have each other's addresses, and with a headless service they can do reverse lookups to find the IPs (or names) of all the executors.
Keep in mind that you have to give the headless service a specific name (I do not remember what it is) so that the executors can find it.

The headless service is not pre-configured in the manifests inside the Apache Zeppelin interpreter template file, so you have to add it yourself.
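
I no longer have the exact manifest, but a minimal sketch of such a headless service might look roughly like the following; the name, namespace, labels, and ports here are placeholders (the ports simply mirror the ones visible in the logs above), so adjust them to your own deployment.

apiVersion: v1
kind: Service
metadata:
  name: spark-driver-headless   # placeholder name -- Spark/Zeppelin expect a specific one
  namespace: zeppelin
spec:
  clusterIP: None               # "headless": DNS resolves directly to the pod IPs
  selector:
    app: spark-executor         # placeholder -- must match the labels on your Spark pods
  ports:
    - name: driver-rpc
      port: 22321
    - name: blockmanager
      port: 22322               # block-manager / shuffle port seen in the logs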

This solved my problem. I hope it helps someone.
