Amazon Web Services — writing from Spark to Elasticsearch is very slow, and scaling out nodes throws errors

6qfn3psc posted on 2021-05-27 in Spark

I have an input file of about 300 million records that I need to store into Amazon Elasticsearch, and I am using Amazon EMR (Spark) to load it into Amazon Elasticsearch.
My EMR configuration is master: r5.4xlarge (16 vCore, 128 GiB memory) and slaves: r5.8xlarge (10 nodes, 32 vCore and 256 GiB memory each).
My ES configuration is instance type (data) i3.large.elasticsearch, 5 data nodes, and 3 dedicated master instances.
The input I am reading comes from S3; here is my code:

// Spark / es-hadoop configuration (the Elasticsearch endpoint and port were redacted in the original post)
SparkConf sparkConf = new SparkConf()
        .setAppName("SparkLoadToES");
sparkConf.set("spark.es.nodes", "<es-endpoint>");   // value removed in the original post
sparkConf.set("spark.es.port", "<es-port>");        // value removed in the original post
sparkConf.set("spark.es.nodes.wan.only", "true");
sparkConf.set("spark.es.batch.write.refresh", "false");
sparkConf.set("spark.es.index.auto.create", "true");
sparkConf.set("spark.es.resource", "test-index");
sparkConf.set("spark.es.batch.size.entries", "25000");
sparkConf.set("spark.es.batch.write.retry.wait", "30s");
sparkConf.set("spark.es.batch.write.retry.count", "-1");
sparkConf.set("spark.es.batch.size.bytes", "15mb");

SparkSession sparkSession = SparkSession
        .builder()
        .config(sparkConf)
        .getOrCreate();

// Read the input from S3 and spread it across 320 partitions, then cache it in executor memory
Dataset<Row> datasetToLoad = sparkSession.read().parquet("s3a://databucket/temp/").repartition(320);
datasetToLoad.cache();

// Bulk-write the dataset into the "test-index" Elasticsearch index
datasetToLoad.write().format("org.elasticsearch.spark.sql").mode(SaveMode.Append).save("test-index");
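
For context on the write path: with es-hadoop, each Spark partition is written by its own task, so repartition(320) means up to 320 concurrent bulk-indexing streams against the cluster, bounded by the available executor cores. A minimal sketch to confirm that count; it only inspects the datasetToLoad variable from the code above and adds nothing else from the post:

// The partition count equals the maximum number of concurrent es-hadoop bulk writers
int concurrentWriters = datasetToLoad.javaRDD().getNumPartitions();
System.out.println("Concurrent Elasticsearch bulk writers: " + concurrentWriters);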

Loading the data into Elasticsearch is very slow: writing 200 million records takes 2 hours, and I want to bring that down to about 30 to 40 minutes for all 300 million records.
My Amazon Elasticsearch index has 9 primary shards and 0 replicas.
I want the write to Elasticsearch to finish within 30 minutes, but it is extremely slow, and there clearly seems to be some bottleneck on the Elasticsearch side.
Increasing the shard count further turns the Elasticsearch cluster red, and adding more nodes to the Elasticsearch cluster does not help either. Are there ideal parameters for tuning EMR and Elasticsearch so that the job finishes all 300 million records in under an hour?
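
For reference, this is roughly how the index described above (9 primary shards, 0 replicas) could be created up front; a minimal sketch using the JDK 11 HttpClient, assuming the domain accepts plain HTTPS requests. The endpoint placeholder stands in for the value redacted in the post, and pausing refresh_interval during the bulk load is an extra assumption, not something stated in the question:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateIndexSketch {
    public static void main(String[] args) throws Exception {
        // "<es-endpoint>" is a placeholder; the real endpoint was redacted in the original post
        String url = "https://<es-endpoint>/test-index";

        // 9 primary shards and 0 replicas, as described in the question;
        // refresh_interval = -1 (pause refreshes during the bulk load) is an assumption
        String settings = "{ \"settings\": { \"number_of_shards\": 9, "
                + "\"number_of_replicas\": 0, \"refresh_interval\": \"-1\" } }";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(settings))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}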
Adding too many shards to the Elasticsearch index causes this error:

executor 62: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest ([HEAD] on [test-index] failed; server[endpoint.us-east-1.es.amazonaws.com:443] returned [429|Too Many Requests:]) [duplicate 2]

Adding too many nodes on the Spark side produces the following error:

org.elasticsearch.hadoop.rest.EsHadoopRemoteException: es_rejected_execution_exception: rejected execution of processing of [76498][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[test-index][7]] containing [3] requests, target allocation id: a02WQn91TdqUU7n3NSWufw, primary term: 1 on EsThreadPoolExecutor[name = 9e97ab8ff87aeecaf789b7203d64a894/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7ca1f1c1[Running, pool size = 2, active threads = 2, queued tasks = 200, completed tasks = 22084]]
    {"index":{}}

Bailing out...
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:519)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:113)
    at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

These are the Spark tuning parameters in my spark-submit command: --conf spark.default.parallelism=400 --conf spark.dynamicAllocation.enabled=true --conf maximizeResourceAllocation=true --conf spark.shuffle.service.enabled=true
How do I find the right balance between the Spark nodes and the AWS Elasticsearch nodes? Adding more nodes to Elasticsearch should provide more cores, and more cores should provide more write capacity, but scaling out the Amazon Elasticsearch cluster does not seem to help.
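
One way to explore that balance, shown here as a minimal sketch rather than a verified fix: cap the number of concurrent writers with coalesce and shrink each bulk request, so that the in-flight bulk requests stay within what the five i3.large data nodes can queue. The partition count of 40 and the smaller batch sizes are illustrative assumptions, not values from the post:

// Fewer partitions -> fewer concurrent bulk requests against the ES write thread pools
Dataset<Row> throttled = datasetToLoad.coalesce(40);   // 40 is an illustrative assumption

throttled.write()
        .format("org.elasticsearch.spark.sql")
        .option("es.batch.size.entries", "10000")       // smaller bulks per flush (assumption)
        .option("es.batch.size.bytes", "10mb")          // smaller payload per flush (assumption)
        .mode(SaveMode.Append)
        .save("test-index");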
Am I doing something wrong?

No answers yet.
