无法为大型数据集运行spark作业

krugob8w  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(451)

我编写了一个spark任务,从s3中的配置单元数据读取并生成hfiles。
当只读取一个orc文件(约190 mb)时,此作业可以正常工作,但是,当我使用它读取整个s3目录时,约400个orc文件,因此约400*190 mb=76 gb数据,它会不断抛出以下错误/堆栈跟踪:

17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39149; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)
17/06/12 01:48:03 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, ip-10-211-127-63.ap-northeast-2.compute.internal, executor 9): java.nio.channels.ClosedChannelException
    at org.apache.spark.network.client.StreamInterceptor.channelInactive(StreamInterceptor.java:60)
    at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:179)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:230)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1289)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:237)
    at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:893)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:691)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:455)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    at java.lang.Thread.run(Thread.java:745)

17/06/12 01:48:03 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 0.0 (TID 541, ip-10-211-126-250.ap-northeast-2.compute.internal, executor 72, partition 6, PROCESS_LOCAL, 6680 bytes)
17/06/12 01:48:03 ERROR server.TransportRequestHandler: Error sending result StreamResponse{streamId=/jars/importer-all.jar, byteCount=194727686, body=FileSegmentManagedBuffer{file=/tmp/importer-all.jar, offset=0, length=194727686}} to /10.211.XX.XX:39151; closing connection
java.nio.channels.ClosedChannelException
    at io.netty.channel.AbstractChannel$AbstractUnsafe.close(...)(Unknown Source)

我的集群足够大,可以处理它:(已经验证过了)
它有40个节点,超过800 gb的可用内存,320个vCore。
下面是我的java代码:

protected void sparkGenerateHFiles(JavaRDD<Row> rdd) throws IOException {
        System.out.println("In sparkGenerateHFiles....");
        JavaPairRDD<ImmutableBytesWritable, KeyValue> javaPairRDD = rdd.mapToPair(
            new PairFunction<Row, ImmutableBytesWritable, KeyValue>() {
            public Tuple2<ImmutableBytesWritable, KeyValue> call(Row row) throws Exception {
                System.out.println("running call now ....");
                String key = (String) row.get(0);
                String value = (String) row.get(1);

                ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
                byte[] rowKeyBytes = Bytes.toBytes(key);
                rowKey.set(rowKeyBytes);

                KeyValue keyValue = new KeyValue(rowKeyBytes,
                    Bytes.toBytes("fam"),
                    Bytes.toBytes("qualifier"),
                    ProductJoin.newBuilder()
                        .setId(key)
                        .setSolrJson(value)
                        .build().toByteArray());

                return new Tuple2<ImmutableBytesWritable, KeyValue>(rowKey, keyValue);
            }
        });
        Partitioner partitioner = new IntPartitioner(2);
        // repartition and sort the data - HFiles want sorted data
        JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRDD =
            javaPairRDD.repartitionAndSortWithinPartitions(partitioner);

        Configuration baseConf = HBaseConfiguration.create();
        Configuration conf = new Configuration();
        conf.set(HBASE_ZOOKEEPER_QUORUM, importerParams.zkQuorum);
        Job job = new Job(baseConf, "map data");
        HTable table = new HTable(conf, importerParams.hbaseTargetTable);
        System.out.println("gpt table: " + table.getName());
        HFileOutputFormat2.configureIncrementalLoad(job, table);
        System.out.println("Done configuring incremental load....");

        Configuration config = job.getConfiguration();

        repartitionedRDD.saveAsNewAPIHadoopFile(
            "HFILE_OUTPUT_PATH",
            ImmutableBytesWritable.class,
            KeyValue.class,
            HFileOutputFormat2.class,
            config
            );
        System.out.println("Saved to HFILE_OUTPUT_PATH: " + HFILE_OUTPUT_PATH);
    }

protected JavaRDD<Row> readJsonTable() {
        System.out.println("In readJsonTable.....");
        SparkSession.Builder builder = SparkSession.builder().appName("Importer");
        String hiveTable = "";
        if (importerParams.local) {
            builder.master("local");
            hiveTable = HIVE_TABLE_S3A_DEV_SAMPLE;
        } else {
            hiveTable = importerParams.hiveSourceTable;
        }
        SparkSession spark = builder.getOrCreate();

        SparkContext sparkContext = spark.sparkContext();

        // this is important. need to set the endpoint to ap-northeast-2
        sparkContext.hadoopConfiguration()
            .set("fs.s3a.endpoint", "s3.ap-northeast-2.amazonaws.com");

        Dataset<Row> rows = null;
        if (importerParams.local) {
            rows = spark.read().format("orc").load(hiveTable);
        } else {
            rows = spark.read().format("orc").load(hiveTable);//use this one temporarily
//          rows = spark.read().format("orc").load(HIVE_TABLE_S3A_PREFIX
            // + importerParams.latestDateHour);
        }
        System.out.println("Finished loading hive table from S3, rows.count() = "
            + (rows != null ? rows.count() : 0));

        return rows.toJavaRDD();
    }

主程序:

long startTime = System.currentTimeMillis();
        JavaRDD<Row> rdd = readJsonTable();

        sparkGenerateHFiles(rdd);
        System.out.println("it took " + (System.currentTimeMillis() - startTime)/1000 + " seconds to generate HFiles...\n\n\n\n");

我试过的:
我在stackoverflow上看到最近的一个帖子。那我就定了 builder.config("spark.shuffle.blockTransferService", "nio"); 但还是没有运气。
非常感谢您的帮助!

1bqhqjot

1bqhqjot1#

正如@wang指出的,这确实是由于我的数据倾斜问题。
为了解决这个问题,我所做的是:
我重新创建了hbase表,但这次 SPLITS ,并将我的hbase表拆分为80个区域。然后在我的spark代码中,我编写了一个定制的分区器,根据每个条目的键对其进行重新分区,这样,就没有了 HOTSPOTTING 问题,即一个区域服务器过载,而其他服务器处于空闲状态。
在使用时还学到了一些其他技巧 SPLITS 要创建hbase表,默认情况下 startkey 第一区域和 endkey 最后一个区域的值是空字符串 "" ,一定要在那里做正确的事情,以避免热点太多。
这是一个我的分区工作的例子。
谢谢!

相关问题