pyspark 为什么Synapse spark.read返回乱码的MongoDB _id值?

webghufk  于 2024-01-06  发布在  Spark
关注(0)|答案(2)|浏览(162)

使用CosmosDBMongoDB链接服务,从Azure Cosmos DB for MongoDB到Synapse Analytics工作区。
编辑:我可以确认Cosmos/ Mongo DB中的objectID值是有效的,因为它们在应用程序中使用。
文档片段,如CosmosDB数据资源管理器中所示

  1. {
  2. "_id" : ObjectId("623a3902764504df1bc51620"),
  3. "name": "Dummy data"
  4. }

字符串
在CosmosDB集合中,“Analytical Storage Time to Live”(分析存储生存时间)选项为“On”(打开),这意味着它们将显示在Analytics studio的“Linked”(链接)部分中。
在Synapse笔记本电脑上运行

  1. spark.sql("select unhex('623a3902764504df1bc51620') as bytes").show(truncate=False)


返回

  1. +-------------------------------------+
  2. |bytes |
  3. +-------------------------------------+
  4. |[62 3A 39 02 76 45 04 DF 1B C5 16 20]|
  5. +-------------------------------------+


这表明PySpark环境正在工作。但是

  1. df = spark.read\
  2. .format("cosmos.olap")\
  3. .option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
  4. .option("spark.cosmos.container", "TABLENAME")\
  5. .load()
  6. display(df.limit(10))


返回

  1. "{"objectId":"b:9\u0002vE\u0004\u001b\u0016 "}"
  2. objectId: ""b:9\u0002vE\u0004\u001b\u0016 ""


_id列中的所有值。
编辑:df.printSchema()返回值

  1. root
  2. |-- _rid: string (nullable = true)
  3. |-- _ts: long (nullable = true)
  4. |-- id: string (nullable = true)
  5. |-- _etag: string (nullable = true)
  6. |-- _id: struct (nullable = true)
  7. | |-- objectId: string (nullable = true)
  8. |-- name: struct (nullable = true)
  9. | |-- string: string (nullable = true)
  10. ..... snip ...


运行df3 = df.select(unhex("_id.objectId"))返回

  1. +-------------------+
  2. |unhex(_id.objectId)|
  3. +-------------------+
  4. | null|
  5. | null|
  6. +-------------------+


运行df.select(unhex('_id.objectId'))返回

  1. DataFrame[unhex(_id.objectId): binary]


运行

  1. SELECT TOP (100) JSON_VALUE([_id], '$.objectId') AS _id
  2. FROM [DB].[dbo].[TABLENAME]


内建SQL集区中的,会传回相同的乱码b:9vE��值。
试图

  1. %%sql
  2. create table TABLENAME using cosmos.olap options (
  3. spark.synapse.linkedService 'CosmosDbMongoDb1',
  4. spark.cosmos.container 'TABLENAME'
  5. )
  6. SELECT name, _id
  7. FROM TABLENAME;


返回_id列的值

  1. "{"schema":[{"name":"objectId","dataType":{},"nulla..."
  2. schema: "[{"name":"objectId","dataType":{},"nullable":true,..."
  3. 0: "{"name":"objectId","dataType":{},"nullable":true,"..."
  4. name: ""objectId""
  5. dataType: "{}"
  6. nullable: "true"
  7. metadata: "{"map":{}}"
  8. map: "{}"
  9. values: "["b:9\u0002vE\u0004\u001b\u0016 "]"
  10. 0: ""b:9\u0002vE\u0004\u001


对不起,复制粘贴额外的乱码,列值中有可扩展的字段。

hgtggwj0

hgtggwj01#

根据这个documentation
Cosmos DB自动将BSON数据(二进制JSON)转换为列格式。
但是,由于ObjectId类型,您仍然会收到错误。
在Spark中,没有ObjectId类型,所以它无法识别类型并给出这样的错误。These是Spark支持的数据类型。
为了避免这种情况,您需要使用提供的_id创建或摄取数据,因为如果您不提供_id,它将自动添加,如下所示:ObjectId("623a3902764504df1bc51620")
下面是创建的两个记录:一个是自动生成的_id,另一个是手动创建的_id

自动生成


的数据

手动添加



输出:

下面是用于为容器HTAP2向Cosmos MongoDB添加数据的代码。

  1. from random import randint
  2. import time
  3. import uuid
  4. client = MongoClient(connection_string)
  5. db = client.samp # get database
  6. orders = db["HTAP2"] #get container
  7. items = ['Pizza','Sandwich','Soup', 'Salad', 'Tacos']
  8. prices = [2.99, 3.49, 5.49, 12.99, 54.49]
  9. for x in range(1, 501):
  10. random_uuid = uuid.uuid4()
  11. order = {
  12. '_id' : str(random_uuid),
  13. 'item' : items[randint(0, (len(items)-1))],
  14. 'price' : prices[randint(0, (len(prices)-1))],
  15. 'rating' : randint(1, 5),
  16. 'timestamp' : time.time()
  17. }
  18. result=orders.insert_one(order)
  19. print('finished creating 500 orders')

字符串
输出量:



因此,请确保在向Cosmos DB添加数据时添加_id
请参阅此documentation了解更多信息。

展开查看全部
kq4fsx7k

kq4fsx7k2#

我在Azure文档中找到了Scala代码示例,它可以将ObjectId字段转换为字符串:

  1. val df = spark.read.format("cosmos.olap")
  2. .option("spark.synapse.linkedService", "xxxx")
  3. .option("spark.cosmos.container", "xxxx")
  4. .load()
  5. val convertObjectId = udf((bytes: Array[Byte]) => {
  6. val builder = new StringBuilder
  7. for (b <- bytes) {
  8. builder.append(String.format("%02x", Byte.box(b)))
  9. }
  10. builder.toString
  11. }
  12. )
  13. val dfConverted = df.withColumn("objectId", col("_id.objectId")).withColumn("convertedObjectId", convertObjectId(col("_id.objectId"))).select("id", "objectId", "convertedObjectId")
  14. display(dfConverted)

字符串

展开查看全部

相关问题