pyspark 如何使用spark mongo连接器在mongo查询管道中使用aggregate

68de4m5k  于 2023-04-29  发布在  Spark
关注(0)|答案(2)|浏览(115)

我使用下面的代码从mongo获取数据。

pipeline = [{'$match': {'createdDateTime': {'$gte': {'$date': f'{yesterday}T00:00:00Z', '$lte': f'{today}T00:00:00Z'}}},
             {'$project': { '_class' :  {'$ifNull' : ['$_class','']}}}
             }
    ]

df= spark.read.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.input.uri",uri).option("pipeline", pipeline).load()

我不明白这是怎么回事,我得到了下面的例外。

IllegalArgumentException: requirement failed: Invalid Aggregation map Map(uri -> mongodb://xxxx:yyyy@mongo.com:27017/DBReport.Application, pipeline -> [{'$match': {'createdDateTime': {'$gte': {'$date': '2021-08-24T00:00:00Z', '$lte': '2021-08-25T00:00:00Z'}}}}, {'$project': {'_class': {'$ifNull': ['$_class', '']}}
    ]

请解释

u7up0aaq

u7up0aaq1#

也许你只是漏掉了一些括号。
试试下面的代码

pipeline = [
             {
                 '$match': {
                     'createdDateTime': {
                         '$gte': {'$date': f'{yesterday}T00:00:00Z'}, 
                         '$lte': {'$date': f'{today}T00:00:00Z'}
                     }
                 }
             },
             {
                 '$project': { 
                      '_class' :  {
                          '$ifNull' : ['$_class','']
                      }
                 }
             }
]
nbysray5

nbysray52#

您可以通过以下方式对MongoDB Spark Connector V10.进行排列

val df = spark.read 
          .format("mongodb") 
          .option("spark.mongodb.connection.uri", "hostname") 
          .option("spark.mongodb.database", "databaseName") 
          .option("spark.mongodb.collection", "collectionName") 
          .option("partitioner","com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner")
          .option("aggregation.pipeline", """[
                          {
                            $match: {
                             startDt: {
                                $gte: ISODate("2022-04-02"),
                              },
                              endDt: {
                                $lte: ISODate("2022-04-03"),
                              },
                            },
                          },
                        ]""")
          .load()

相关问题