使用MongoDB Spark连接器基于时间戳进行过滤

s5a0g9ez  于 2022-12-03  发布在  Go
关注(0)|答案(3)|浏览(173)

我正在使用Spark MongoDB connector从mongodb中获取数据。但是我无法了解如何使用Spark和聚合管道(rdd.withPipeline)查询Mongo。以下是我的代码,我希望根据时间戳获取记录并存储在 Dataframe 中:

val appData=MongoSpark.load(spark.sparkContext,readConfig)
val df=appData.withPipeline(Seq(Document.parse("{ $match: { createdAt : { $gt : 2017-01-01 00:00:00 } } }"))).toDF()

使用spark在mongodb上查询时间戳值是否正确?

zpgglvta

zpgglvta1#

正如注解中提到的,您可以使用Extended JSON的日期过滤器。

val appDataRDD  = MongoSpark.load(sc)
val filteredRDD = appDataRDD.withPipeline(Seq(Document.parse("{$match:{timestamp:{$gt:{$date:'2017-01-01T00:00:00.000'}}}}")))
filteredRDD.foreach(println)

另请参阅MongoDB Spark Connector: Filters and Aggregation以查看替代筛选器。

lp0sw83n

lp0sw83n2#

试试这个:

val pipeline = "{'$match': {'CreationDate':{$gt: {$date:'2020-08-26T00:00:00.000Z'}}}}"

val sourceDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri", "mongodb://administrator:password@10.XXXXX:27017/?authSource=admin").option("database","_poc").option("collection", "activity").option("pipeline", pipeline).load()
nvbavucw

nvbavucw3#

试试这个(但是它有一些限制,比如mongo日期和ISODate只能采用TZ格式时间戳。

option("pipeline", s"""[{ $$match: { "updatedAt" : { $$gte : new ISODate("2022-11-29T15:26:21.556Z") } } }]""").mongo[DeltaComments]

相关问题