使用CosmosDBMongoDB链接服务,从Azure Cosmos DB for MongoDB到Synapse Analytics工作区。
编辑:我可以确认Cosmos/ Mongo DB中的objectID
值是有效的,因为它们在应用程序中使用。
文档片段,如CosmosDB数据资源管理器中所示
{
"_id" : ObjectId("623a3902764504df1bc51620"),
"name": "Dummy data"
}
字符串
在CosmosDB集合中,“Analytical Storage Time to Live”(分析存储生存时间)选项为“On”(打开),这意味着它们将显示在Analytics studio的“Linked”(链接)部分中。
在Synapse笔记本电脑上运行
spark.sql("select unhex('623a3902764504df1bc51620') as bytes").show(truncate=False)
型
返回
+-------------------------------------+
|bytes |
+-------------------------------------+
|[62 3A 39 02 76 45 04 DF 1B C5 16 20]|
+-------------------------------------+
型
这表明PySpark环境正在工作。但是
df = spark.read\
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
.option("spark.cosmos.container", "TABLENAME")\
.load()
display(df.limit(10))
型
返回
"{"objectId":"b:9\u0002vE\u0004�\u001b�\u0016 "}"
objectId: ""b:9\u0002vE\u0004�\u001b�\u0016 ""
型
为_id
列中的所有值。
编辑:df.printSchema()
返回值
root
|-- _rid: string (nullable = true)
|-- _ts: long (nullable = true)
|-- id: string (nullable = true)
|-- _etag: string (nullable = true)
|-- _id: struct (nullable = true)
| |-- objectId: string (nullable = true)
|-- name: struct (nullable = true)
| |-- string: string (nullable = true)
..... snip ...
型
运行df3 = df.select(unhex("_id.objectId"))
返回
+-------------------+
|unhex(_id.objectId)|
+-------------------+
| null|
| null|
+-------------------+
型
运行df.select(unhex('_id.objectId'))
返回
DataFrame[unhex(_id.objectId): binary]
型
运行
SELECT TOP (100) JSON_VALUE([_id], '$.objectId') AS _id
FROM [DB].[dbo].[TABLENAME]
型
内建SQL集区中的,会传回相同的乱码b:9vE��
值。
试图
%%sql
create table TABLENAME using cosmos.olap options (
spark.synapse.linkedService 'CosmosDbMongoDb1',
spark.cosmos.container 'TABLENAME'
)
SELECT name, _id
FROM TABLENAME;
型
返回_id
列的值
"{"schema":[{"name":"objectId","dataType":{},"nulla..."
schema: "[{"name":"objectId","dataType":{},"nullable":true,..."
0: "{"name":"objectId","dataType":{},"nullable":true,"..."
name: ""objectId""
dataType: "{}"
nullable: "true"
metadata: "{"map":{}}"
map: "{}"
values: "["b:9\u0002vE\u0004�\u001b�\u0016 "]"
0: ""b:9\u0002vE\u0004�\u001
型
对不起,复制粘贴额外的乱码,列值中有可扩展的字段。
2条答案
按热度按时间hgtggwj01#
根据这个documentation,
Cosmos DB自动将BSON数据(二进制JSON)转换为列格式。
但是,由于ObjectId类型,您仍然会收到错误。
在Spark中,没有ObjectId类型,所以它无法识别类型并给出这样的错误。These是Spark支持的数据类型。
为了避免这种情况,您需要使用提供的
_id
创建或摄取数据,因为如果您不提供_id
,它将自动添加,如下所示:ObjectId("623a3902764504df1bc51620")
。下面是创建的两个记录:一个是自动生成的
_id
,另一个是手动创建的_id
。自动生成
的数据
手动添加
的
输出:
下面是用于为容器HTAP2向Cosmos MongoDB添加数据的代码。
字符串
输出量:
的
因此,请确保在向Cosmos DB添加数据时添加
_id
。请参阅此documentation了解更多信息。
kq4fsx7k2#
我在Azure文档中找到了Scala代码示例,它可以将
ObjectId
字段转换为字符串:字符串