我尝试在Synapse Spark笔记本中使用Mongo DB aggregation pipeline。用例是将ObjectID
类型字段(如_id
字段)转换为$addFields字符串。
然而,我的尝试
IllegalArgumentException: Unrecognized configuration specified: (pipeline,[{ $addFields: { _id: { '$toString': '$_id' } } } ])
字符串
我一直在尝试蛮力引用标记组合,从Copilot和文档的例子看.这里是一个例子:
pipeline = "[{ '$addFields': { '_id': { '$toString': $_id } } } ]"
df = spark.read\
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
.option("spark.cosmos.container", "<COLLECTION NAME>")\
.option("pipeline", pipeline)\
.load()
display(df.limit(10))
型
是我在文字格式上犯了一些低级错误,还是这是Spark连接器中缺少支持的情况?
编辑:connector docs中的这一段可以很好地为MongoDB经验丰富的人提供答案。
自定义聚合管道必须与分区器>策略兼容。例如,聚合阶段(如$group)不适用于创建多个分区的任何分区器。
2条答案
按热度按时间yrdbyhpb1#
要将
ObjectID
类型字段(如_id
字段)转换为字符串,请遵循以下方法:1.为将从Cosmos OLAP读取的数据定义Schema。
1.在架构中为
ObjectID
字段使用二进制类型。字符串
convert_object_id()
将字节数组作为输入,并将其转换为十六进制字符串。型
1.下面的行将UDF注册到Spark,Spark DataFrames中使用。
型
1.使用模式从CosmosOLAP读取数据,然后将数据读入DataFrame。
型
dfConverted
通过添加objectId
和convertedObjectId
创建新的DataFrame。型
我尝试的代码:
型
输出:
的数据
ljsrvy3e2#
我在Azure文档中找到了Scala代码示例,它可以将
ObjectId
字段转换为字符串:字符串
此解决方案还回答了问题here