hadoop - Memory issues when running a Spark job on a relatively large input

6yoyoihd  posted on 2021-06-02  in  Hadoop

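I'm running a Spark cluster of 50 machines. Each machine is a VM with 8 cores and 50 GB of memory (about 41 GB of which seems to be available to Spark).
I'm running over several input folders; I estimate the input size at roughly 250 GB, gz-compressed.
Although the number and configuration of the machines seem sufficient to me, the job fails after about 40 minutes of running, and I can see the following errors in the logs: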

2558733 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 345.0 in stage 1.0 (TID 345, hadoop-w-3.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: Java heap space

java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
java.lang.StringCoding.decode(StringCoding.java:193)
java.lang.String.<init>(String.java:416)
java.lang.String.<init>(String.java:481)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

And also:

2653545 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 122.1 in stage 1.0 (TID 392, hadoop-w-22.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: GC overhead limit exceeded

java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
java.lang.StringCoding.decode(StringCoding.java:193)
java.lang.String.<init>(String.java:416)
java.lang.String.<init>(String.java:481)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

How do I go about debugging a problem like this?
EDIT: I found the root cause of the problem. It is this piece of code:

private static final int MAX_FILE_SIZE = 40194304;
    ....
    ....
        JavaPairRDD<String, List<String>> typedData = filePaths.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, List<String>>() {
            @Override
            public Iterable<Tuple2<String, List<String>>> call(Iterator<String> filesIterator) throws Exception {
                List<Tuple2<String, List<String>>> res = new ArrayList<>();
                String fileType = null;
                List<String> linesList = null;
                if (filesIterator != null) {
                    while (filesIterator.hasNext()) {
                        try {
                            Path file = new Path(filesIterator.next());
                            // filter non-trc files
                            if (!file.getName().startsWith("1")) {
                                continue;
                            }
                            fileType = getType(file.getName());
                            Configuration conf = new Configuration();
                            CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                            CompressionCodec codec = compressionCodecs.getCodec(file);
                            FileSystem fs = file.getFileSystem(conf);
                            ContentSummary contentSummary = fs.getContentSummary(file);
                            long fileSize = contentSummary.getLength();
                            InputStream in = fs.open(file);
                            if (codec != null) {
                                in = codec.createInputStream(in);
                            } else {
                                throw new IOException();
                            }

                            byte[] buffer = new byte[MAX_FILE_SIZE];

                            BufferedInputStream bis = new BufferedInputStream(in, BUFFER_SIZE);
                            int count = 0;
                            int bytesRead = 0;
                            try {
                                while ((bytesRead = bis.read(buffer, count, BUFFER_SIZE)) != -1) {
                                    count += bytesRead;
                                }
                            } catch (Exception e) {
                                log.error("Error reading file: " + file.getName() + ", trying to read " + BUFFER_SIZE + " bytes at offset: " + count);
                                throw e;
                            }

                            Iterable<String> lines = Splitter.on("\n").split(new String(buffer, "UTF-8").trim());
                            linesList = Lists.newArrayList(lines);

                            // get rid of first line in file

                            Iterator<String> it = linesList.iterator();
                            if (it.hasNext()) {
                                it.next();
                                it.remove();
                            }
                            //res.add(new Tuple2<>(fileType,linesList));
                        } finally {
                            res.add(new Tuple2<>(fileType, linesList));
                        }

                    }

                }
                return res;
            }

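Specifically, a 40 MB buffer is allocated for each file in order to read the file's content through a BufferedInputStream. At some point this exhausts the heap.
The thing is:
If I read line by line (which doesn't require a buffer), the reads will be very inefficient.
If I allocate one buffer and reuse it for every file read, is that safe with respect to parallelism? Or will it get overwritten by several threads?
Any suggestions are welcome...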
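On the buffer-reuse question: a buffer held in a local variable inside `call` belongs to that one invocation, and the stack traces above show each task running on a single `ThreadPoolExecutor` worker via `Executor$TaskRunner.run`. So concurrent tasks should not overwrite each other's buffers, whereas a `static` or otherwise shared field would be a different matter. A minimal plain-Java sketch of that distinction, with no Spark involved; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo: each "task" allocates one buffer and reuses it for all of its "files".
class PerTaskBufferDemo {

    // Stands in for one mapPartitions call: one buffer, reused across the partition's files.
    static boolean processPartition(byte taskId, int files, int bufferSize) {
        byte[] buffer = new byte[bufferSize]; // local variable, reachable from this call only
        for (int f = 0; f < files; f++) {
            Arrays.fill(buffer, taskId);      // pretend to read a file into the reused buffer
            for (byte b : buffer) {
                if (b != taskId) {
                    return false;             // would mean another thread wrote into our buffer
                }
            }
        }
        return true;
    }

    // Runs several tasks concurrently and reports whether every buffer stayed intact.
    static boolean runTasks(int tasks, final int files, final int bufferSize) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                final byte id = (byte) (t + 1);
                results.add(pool.submit(new Callable<Boolean>() {
                    @Override
                    public Boolean call() {
                        return processPartition(id, files, bufferSize);
                    }
                }));
            }
            boolean allIntact = true;
            for (Future<Boolean> result : results) {
                allIntact &= result.get();
            }
            return allIntact;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The trade-off is one buffer per running task instead of one per file, so the peak cost scales with the number of concurrent tasks on an executor rather than with the number of files in a partition.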
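EDIT 2: I fixed the first memory issue by moving the byte array allocation outside the iteration, so it is reused by all elements of the partition. But there is still the new String(buffer, "UTF-8").trim(), which is created for splitting and is a new object every time. I could use a StringBuffer/StringBuilder, but how would I set the charset encoding without a String object?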
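One detail worth noting in the code above: `new String(buffer, "UTF-8")` decodes all `MAX_FILE_SIZE` bytes, including the zero padding after the last byte read, so each file costs a full-size `char[]` on top of the `byte[]` before `trim()` throws the padding away. Decoding only the `count` bytes actually read avoids that, and a `Reader` can carry the charset so that no `String` of the whole file is needed at all. A minimal sketch on in-memory data; the class and method names are hypothetical:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helpers for decoding part of a larger, reused buffer.
class DecodeOnlyWhatWasRead {

    // Decodes just the first `count` bytes instead of the whole buffer.
    static String decode(byte[] buffer, int count) {
        return new String(buffer, 0, count, StandardCharsets.UTF_8);
    }

    // Lets the Reader apply the charset and splits into lines while streaming,
    // so no String holding the entire file content is created.
    static List<String> lines(byte[] buffer, int count) {
        List<String> result = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(buffer, 0, count), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                result.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }
}
```

The same `InputStreamReader` can wrap the decompressed file stream directly, which is where the charset goes when there is no intermediate `String`.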

cbjzeqam1#

In the end I changed the code as follows:

// Transform list of files to list of all files' content in lines grouped by type
        JavaPairRDD<String,List<String>> typedData = filePaths.mapToPair(new PairFunction<String, String, List<String>>() {
            @Override
            public Tuple2<String, List<String>> call(String filePath) throws Exception {
                Tuple2<String, List<String>> tuple = null;
                try {
                    String fileType = null;
                    List<String> linesList = new ArrayList<String>();
                    Configuration conf = new Configuration();
                    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                    Path path = new Path(filePath);
                    fileType = getType(path.getName());
                    tuple = new Tuple2<String, List<String>>(fileType, linesList);

                    // filter non-trc files
                    if (!path.getName().startsWith("1")) {
                        return tuple;
                    }

                    CompressionCodec codec = compressionCodecs.getCodec(path);
                    FileSystem fs = path.getFileSystem(conf);
                    InputStream in = fs.open(path);
                    if (codec != null) {
                        in = codec.createInputStream(in);
                    } else {
                        throw new IOException();
                    }

                    BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"), BUFFER_SIZE);
                    // Get rid of the first line in the file
                    r.readLine();

                    // Read all lines
                    String line;
                    while ((line = r.readLine()) != null) {
                        linesList.add(line);
                    }
                } catch (IOException e) { // Filtering of files whose reading went wrong
                    log.error("Reading of the file " + filePath + " went wrong: " + e.getMessage());
                } finally {
                    return tuple;
                }
            }

        });

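So now I no longer use a 40 MB buffer; instead I build the list of lines dynamically with an ArrayList. This solved my current memory problem, but now other strange errors are failing the job. I'll report those in a separate question...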
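Two things in the version above may be worth a second look: the reader is never closed, and `return` inside `finally` discards any exception, not only the `IOException` that is logged. A minimal self-contained sketch of the same line-by-line read with try-with-resources follows. It assumes `java.util.zip.GZIPInputStream` as a stand-in for the Hadoop codec stream, and the class and method names are hypothetical:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: GZIPInputStream stands in for codec.createInputStream(in).
class GzipLineReader {

    // Reads a gzip stream line by line, dropping the first (header) line.
    static List<String> readLinesSkippingHeader(InputStream compressed, int bufferSize) {
        List<String> lines = new ArrayList<>();
        // try-with-resources closes the reader, which in turn closes the streams it wraps
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(compressed), StandardCharsets.UTF_8),
                bufferSize)) {
            reader.readLine(); // header line, discarded
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Helper for the demo: gzip a string in memory.
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = gzip("header\nrow1\nrow2\n");
        // prints [row1, row2]
        System.out.println(readLinesSkippingHeader(new ByteArrayInputStream(data), 8192));
    }
}
```

Unlike the version above, this sketch rethrows read failures instead of returning a partial list, which leaves the decision to skip a broken file to the caller.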
