在Apache beam中使用MongoDB聚合查询

mspsb9vt  于 2023-05-06  发布在  Go
关注(0)|答案(1)|浏览(116)

我尝试使用聚合查询作为Apache Beam MongoDBIO QueryFn的一部分。但我没有得到任何结果。

List<BsonDocument> documents = new ArrayList<>();
documents.add(
       new BsonDocument(
              "$group",
               new BsonDocument("_id", new BsonString("$field_name1"))
                   .append("count", new BsonDocument("$sum", new BsonString("field_name2")))
         ));

 pipeline.apply(MongoDbIO.read()
          .withUri("mongodb://localhost:27017")
          .withDatabase("databaseName")
          .withCollection("collectionName")
          .withQueryFn(AggregationQuery.create().withMongoDbPipeline(documents)));

查询结构是否正确?

jdg4fx2g

jdg4fx2g1#

不应使用BsonString,而应为管道中的每个字段使用适当的BsonValue子类。例如,要指定字符串值,应使用BsonString。若要指定数值,应根据字段的数据类型使用BsonInt32、BsonInt64或其他数值类型。

List<BsonDocument> documents = new ArrayList<>();
documents.add(
    new BsonDocument(
        "$group",
        new BsonDocument("_id", new BsonString("$field_name1"))
            .append("count", new BsonDocument("$sum", new BsonInt32(1))) // use BsonInt32 instead of BsonString
    )
);

pipeline.apply(MongoDbIO.read()
    .withUri("mongodb://localhost:27017")
    .withDatabase("databaseName")
    .withCollection("collectionName")
    .withQueryFn(AggregationQuery.create().withMongoDbPipeline(documents))
);

相关问题