我可以查询Elasticsearch,并聚合结果,使用:
spark.read.format("org.elasticsearch.spark.sql").load('my_index').selectExpr('max(my_field)').show()
字符串
然而,这是低效的,因为PySpark检索所有数据,然后执行聚合。我更喜欢弹性来做聚合,只手动触发结果。如何才能做到这一点?
我知道我可以让弹性为我做过滤,只交出过滤后的结果,使用:
myquery = '''{
"range": {
"my_field": {
"gt": 1,
"boost": 1
}
}
}'''
spark.read.format("org.elasticsearch.spark.sql").option("es.query", myquery).load('my_index').show()
型
有趣的是,当我让Spark过滤器,像:
spark.read.format("org.elasticsearch.spark.sql").load('my_index').filter('my_field>1').show()
型
它似乎让弹性做过滤。无论如何,我不知道如何在弹性中做聚合。
1条答案
按热度按时间q35jwt9p1#
与其依赖于Spark SQL接口到Elasticsearch,不如直接查询Elasticsearch API。这确保了繁重的工作由Elasticsearch完成。一旦你得到了聚合结果,你可以在Spark中使用它来进行进一步的分析或处理。假设你想获取my_field的最大值。使用Elasticsearch的最大聚合:
字符串
要执行此查询,请使用Python中的工具或库(如请求)向Elasticsearch端点(
http://YOUR_ELASTICSEARCH_ENDPOINT/my_index/_search
)发送POST请求。从Elasticsearch获得聚合结果后,您可以解析相关字段并在Spark中创建DataFrame。
型