I have an input file of about 300 million records that I need to load into Amazon Elasticsearch, and I am using Amazon EMR (Spark) to load it.
My EMR configuration is: master - r5.4xlarge (16 vCores, 128 GiB memory) and slaves - r5.8xlarge (10 nodes, 32 vCores and 256 GiB memory per node).
My ES configuration is: data instance type i3.large.elasticsearch, 5 data nodes and 3 dedicated master instances.
The input I am reading comes from S3, and this is my code:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

SparkConf sparkConf = new SparkConf()
        .setAppName("SparkLoadToES");
// es-hadoop connector settings; the endpoint and port values were redacted in the original post
sparkConf.set("spark.es.nodes", "<es-domain-endpoint>");
sparkConf.set("spark.es.port", "<es-port>");
sparkConf.set("spark.es.nodes.wan.only", "true");
sparkConf.set("spark.es.batch.write.refresh", "false");
sparkConf.set("spark.es.index.auto.create", "true");
sparkConf.set("spark.es.resource", "test-index");
sparkConf.set("spark.es.batch.size.entries", "25000");
sparkConf.set("spark.es.batch.write.retry.wait", "30s");
sparkConf.set("spark.es.batch.write.retry.count", "-1");
sparkConf.set("spark.es.batch.size.bytes", "15mb");

SparkSession sparkSession = SparkSession
        .builder()
        .config(sparkConf)
        .getOrCreate();

Dataset<Row> datasetToLoad = sparkSession.read().parquet("s3a://databucket/temp/").repartition(320);
datasetToLoad.cache();
datasetToLoad.write().format("org.elasticsearch.spark.sql").mode(SaveMode.Append).save("test-index");
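For reference, the same es-hadoop settings can also be supplied per write as options on the DataFrameWriter rather than through SparkConf; a minimal sketch reusing datasetToLoad from the snippet above (the endpoint placeholder is mine, the 443 port matches the 429 error further down):

// Equivalent write with the connector settings passed as writer options
datasetToLoad.write()
        .format("org.elasticsearch.spark.sql")
        .option("es.nodes", "<es-domain-endpoint>")   // placeholder
        .option("es.port", "443")
        .option("es.nodes.wan.only", "true")
        .option("es.batch.size.entries", "25000")
        .option("es.batch.size.bytes", "15mb")
        .mode(SaveMode.Append)
        .save("test-index");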
The load is very slow when writing to Elasticsearch: 200 million records take 2 hours, and I want to bring that down to roughly 30 to 40 minutes for loading all 300 million.
My Amazon Elasticsearch index has 9 shards and 0 replicas.
I want the write to Elasticsearch to finish within 30 minutes, but it is very slow, and there clearly seems to be some bottleneck on the Elasticsearch side.
Increasing the shard count further puts the Elasticsearch cluster into red status, and adding more nodes to the Elasticsearch cluster did not help either. Is there an ideal set of tuning parameters for EMR and Elasticsearch that would let the job finish all 300 million records in under an hour?
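Since the job relies on es.index.auto.create=true, one way to pin down the 9-shard / 0-replica layout described above is to create the index explicitly before the Spark job runs. A minimal, hypothetical sketch with a plain HTTPS PUT (the class name and endpoint are placeholders of mine, and it assumes the domain's access policy allows unsigned requests from this host):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CreateIndexBeforeLoad {
    public static void main(String[] args) throws Exception {
        // Create test-index up front with the shard/replica counts from the post
        // (9 primary shards, 0 replicas) instead of relying on es.index.auto.create.
        String endpoint = "<es-domain-endpoint>"; // placeholder
        URL url = new URL("https://" + endpoint + "/test-index");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        String settings = "{\"settings\":{\"number_of_shards\":9,\"number_of_replicas\":0}}";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(settings.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Index creation returned HTTP " + conn.getResponseCode());
        conn.disconnect();
    }
}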
Adding too many shards to the Elasticsearch index leads to this error:
executor 62: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest ([HEAD] on [test-index] failed; server[endpoint.us-east-1.es.amazonaws.com:443] returned [429|Too Many Requests:]) [duplicate 2]
Adding too many nodes on the Spark side produces the following error:
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: es_rejected_execution_exception: rejected execution of processing of [76498][indices:data/write/bulk[s][p]]: request: BulkShardRequest [[test-index][7]] containing [3] requests, target allocation id: a02WQn91TdqUU7n3NSWufw, primary term: 1 on EsThreadPoolExecutor[name = 9e97ab8ff87aeecaf789b7203d64a894/write, queue capacity = 200, org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor@7ca1f1c1[Running, pool size = 2, active threads = 2, queued tasks = 200, completed tasks = 22084]]
{"index":{}}
Bailing out...
at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:519)
at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:113)
at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:74)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
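The rejection in the trace above is the data node's write thread pool (pool size 2, queue capacity 200) being overrun by concurrent bulk requests. Since every Spark write task keeps bulk requests in flight, the number of partitions at write time effectively bounds that concurrency; a minimal sketch of capping it, where the factor of 2 writers per data node is purely an assumption of mine and not something from the post:

// Each Spark write task issues bulk requests concurrently, so the partition count
// at write time bounds how many bulks can hit the cluster at once.
int dataNodes = 5;            // from the ES cluster description above
int writersPerDataNode = 2;   // assumed factor, roughly matching the write pool size in the trace
int writePartitions = dataNodes * writersPerDataNode;

datasetToLoad
        .coalesce(writePartitions)   // far fewer concurrent writers than repartition(320)
        .write()
        .format("org.elasticsearch.spark.sql")
        .mode(SaveMode.Append)
        .save("test-index");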
These are the Spark tuning parameters on my spark-submit command: --conf spark.default.parallelism=400 --conf spark.dynamicAllocation.enabled=true --conf maximizeResourceAllocation=true --conf spark.shuffle.service.enabled=true
How do I find the right balance between the Spark nodes and the AWS Elasticsearch nodes? It seems that adding more nodes to Elasticsearch provides more cores, and more cores provide more write capacity, but scaling out the Amazon Elasticsearch cluster does not seem to be working.
Am I doing something wrong?