我们有一个elasticsearch集群已经有近2年的历史了,我们希望对其中包含的日志数据以及其他不同的数据源进行更高级的分析。
我们的重点是elasticsearch中的系统日志。每天生成约100gb的系统日志数据—每天都是自己的索引。我们也有一些应用程序日志,但是如果我能为syslog解决这个问题,我就可以轻松地为其他数据移动问题解决它。
这就引出了我的问题。在我的分析中,我们将spark2.1.1与pythonapi结合使用。我想要所有的系统日志数据,比如说,2周的hdfs,这样我们可以做两件事:
通过spark/hadoop集群之间的通信避免延迟
加快机器学习工作的速度
接下来我想开始使用parquet作为我的数据,所以如果我有从es中提取的数据,我以后可以用它做任何我想做的事情。
现在,我的问题是-从es中提取如此大量的数据并将其放入hdfs的最佳方法是什么?我在pyspark中有一个做一些基本查询的例子,但是当我尝试将整个索引(每天生成100gb的索引)拉入rdd时,就会出现内存不足的错误。我已经联系了elasticsearch的支持,但是他们告诉我这是一个我需要在hadoop/spark方面解决的问题,他们不支持这个。
我们已经设置了“es hadoop connector”,它确实为我提供了一些框架,尽管理解文档确实是一个挑战。hadoop生态系统的几个组件(hive、spark、hadoop等)都有连接器。我不确定是否有解决办法,或者是否有更好的办法。我是新来的,所以请原谅任何有明显答案的问题。我正在寻找一些指导和一些具体的建议(如果可能的话,指向带有设置和代码的特定示例的指针将是惊人的)。我的目标是:
在hdfs中获得大约2周的系统日志(我希望这是一个滚动的2周)
在elasticsearch系统上创建最小负载
不管有什么方法,最好是自动执行,这样每天都会接收一个新索引并删除最旧的索引。这不是一个严格的要求,但只是一个很好的拥有。
谢谢你给我的任何帮助、建议或例子。
编辑/附加信息:
我想在这里添加一些代码来解释我要做的事情。这个过程需要非常长的时间才能完成,即使在几个小时之后,也没有显示出进展,所以我想知道我是否做错了什么。
以下是我如何启动py spark:
pyspark --jars=/sysadmin/hadoop/elasticsearch-hadoop-5.6.3/dist/elasticsearch-hadoop-5.6.3.jar --master yarn --deploy-mode client --num-executors 10 --executor-cores 4 --executor-memory 8G --driver-memory 50G
然后,我做了几件事,设置esconf,创建rdd,然后尝试将其作为文本保存到hdfs:
>>> esconf = {"es.net.http.auth.user":"XXXXX","es.net.http.auth.pass":"XXXXX","es.resource":"logstash-syslog-2017.10.11", "es.query":"?q=*","es.read.field.include":"message","es.nodes":"server0005","es.net.ssl":"true"}
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=esconf)
>>> rdd.saveAsTextFile("/user/spark/logstash-syslog-2017.10.11.txt")
现在,rdd回来了,如果我 take(1)
从rdd,这需要一段时间,但我可以得到前10名的结果。在那10张唱片上,我可以保存下来,效果很好。不过,在完整的rdd上,这只是花了很长时间。我真的不知道我应该期待什么,但我不能想象在一个10节点的集群上有64gb的ram和每个盒子8个内核,这应该需要几个小时。
1条答案
按热度按时间os8fio9y1#
我在pyspark中有一个做一些基本查询的例子,但是当我尝试将整个索引(每天生成100gb的索引)拉入rdd时,就会出现内存不足的错误
默认情况下,spark不会为作业分配太多内存,所以是的,当处理那么多数据时,会出现错误。
以下是您应该关注的关键属性及其默认值。
spark.dynamicAllocation.enabled
-false
spark.executor.instances
-2
spark.executor.memory
-1g
spark.driver.cores
-1
如果您的spark作业是在Yarn簇管理下运行的,那么您还需要考虑Yarn容器的大小。在集群模式下运行时,应用程序主机将是spark驱动程序容器。以我的经验,除非你的星火密码collect()
要通过驱动程序发回数据,它本身不需要那么多内存。我会先增加执行器内存,然后增加执行器的数量。如果启用动态分配,则可以考虑不指定执行器数量,但它确实设置了一个较低的边界。
es-hadoop提供了许多连接器来提取数据,但都归结为首选项。如果你懂sql,就用hive。清管器比Spark更容易操作。spark内存非常大,在某些集群中可能无法正常工作。
您在评论中提到了nifi,但这仍然是一个java进程,并且容易出错。除非您有一个nifi集群,否则在写入hdfs之前,您将有一个进程在某个地方通过磁盘上的流文件拉100gb。
如果您需要一个完整索引的快照,elasticsearch会为这种特性提供hdfs支持。我不确定那是什么数据格式,或者hadoop进程是否可以读取它。