PySpark / Elasticsearch中的聚合

wn9m85ua  于 2023-11-16  发布在  Spark
关注(0)|答案(1)|浏览(101)

我可以查询Elasticsearch,并聚合结果,使用:

spark.read.format("org.elasticsearch.spark.sql").load('my_index').selectExpr('max(my_field)').show()

字符串
然而,这是低效的,因为PySpark检索所有数据,然后执行聚合。我更喜欢弹性来做聚合,只手动触发结果。如何才能做到这一点?
我知道我可以让弹性为我做过滤,只交出过滤后的结果,使用:

myquery = '''{ 
   "range": {
      "my_field": {
        "gt": 1,
        "boost": 1
      }
    }
  }'''
spark.read.format("org.elasticsearch.spark.sql").option("es.query", myquery).load('my_index').show()


有趣的是,当我让Spark过滤器,像:

spark.read.format("org.elasticsearch.spark.sql").load('my_index').filter('my_field>1').show()


它似乎让弹性做过滤。无论如何,我不知道如何在弹性中做聚合。

q35jwt9p

q35jwt9p1#

与其依赖于Spark SQL接口到Elasticsearch,不如直接查询Elasticsearch API。这确保了繁重的工作由Elasticsearch完成。一旦你得到了聚合结果,你可以在Spark中使用它来进行进一步的分析或处理。假设你想获取my_field的最大值。使用Elasticsearch的最大聚合:

{
  "size": 0, 
  "aggs": {
    "max_value": {
      "max": {
        "field": "my_field"
      }
    }
  }
}

字符串
要执行此查询,请使用Python中的工具或库(如请求)向Elasticsearch端点(http://YOUR_ELASTICSEARCH_ENDPOINT/my_index/_search)发送POST请求。
从Elasticsearch获得聚合结果后,您可以解析相关字段并在Spark中创建DataFrame。

import requests
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("ElasticAgg").getOrCreate()

# Elasticsearch endpoint and query
es_endpoint = "http://YOUR_ELASTICSEARCH_ENDPOINT/my_index/_search"
query = {
    "size": 0, 
    "aggs": {
        "max_value": {
            "max": {
                "field": "my_field"
            }
        }
    }
}

response = requests.post(es_endpoint, json=query).json()
max_val = response['aggregations']['max_value']['value']

df = spark.createDataFrame([(max_val,)], ["max_my_field"])
df.show()

相关问题