pyspark: how to use a MongoDB $addFields aggregation pipeline with spark.read?

Asked by 5cnsuln7 on 2024-01-06 · Spark

I'm trying to use a MongoDB aggregation pipeline in a Synapse Spark notebook. The use case is converting ObjectID-type fields (such as the _id field) to strings via $addFields. However, my attempts fail with:

    IllegalArgumentException: Unrecognized configuration specified: (pipeline,[{ $addFields: { _id: { '$toString': '$_id' } } } ])

I've been brute-forcing combinations of quoting and quote marks, working from Copilot suggestions and documentation examples. Here is one attempt:

    pipeline = "[{ '$addFields': { '_id': { '$toString': $_id } } } ]"
    df = spark.read\
        .format("cosmos.olap")\
        .option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
        .option("spark.cosmos.container", "<COLLECTION NAME>")\
        .option("pipeline", pipeline)\
        .load()
    display(df.limit(10))


Am I making some basic mistake in the literal formatting, or is this a case of missing support in the Spark connector?
Edit: this passage from the connector docs may well hold the answer for someone experienced with MongoDB:

> Custom aggregation pipelines must be compatible with the partitioner strategy. For example, aggregation stages such as $group do not work with any partitioner that creates more than one partition.
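
For reference, the official MongoDB Spark Connector (v10+) does accept a custom pipeline, via the aggregation.pipeline read option rather than pipeline; whether the Synapse cosmos.olap linked-service path exposes anything equivalent is exactly what the error above suggests it does not. A minimal sketch against the MongoDB connector, with placeholder connection details:

    # Sketch only: official MongoDB Spark Connector (format "mongodb", v10+),
    # not the Synapse cosmos.olap format. Connection values are placeholders.
    # Note the '$_id' value is quoted; leaving it bare, as in the attempt
    # above, is not valid JSON.
    pipeline = '[{ "$addFields": { "_id": { "$toString": "$_id" } } }]'
    df = (spark.read.format("mongodb")
        .option("spark.mongodb.read.connection.uri", "<CONNECTION URI>")
        .option("spark.mongodb.read.database", "<DATABASE>")
        .option("spark.mongodb.read.collection", "<COLLECTION>")
        .option("aggregation.pipeline", pipeline)
        .load())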


yrdbyhpb1#

To convert ObjectID-type fields (such as the _id field) to strings, follow this approach:
1. Define a schema for the data that will be read from Cosmos OLAP.
2. In the schema, use a binary type for the ObjectID field.

    schema = StructType([
        StructField("_rid", StringType(), True),
        StructField("_ts", LongType(), True),
        StructField("id", StringType(), True),
        StructField("_etag", StringType(), True),
        StructField("_id", StructType([
            StructField("objectId", BinaryType(), True)  # Use BinaryType for objectId
        ]), True),
        StructField("name", StructType([
            StructField("string", StringType(), True)
        ]), True),
        StructField("age", StructType([
            StructField("int32", IntegerType(), True)
        ]), True),
        StructField("marks", StructType([
            StructField("array", ArrayType(StructType([
                StructField("int32", IntegerType(), True)
            ])), True)
        ]), True),
        StructField("newAge", StructType([
            StructField("string", StringType(), True)
        ]), True),
        StructField("_partitionKey", StructType([
            StructField("string", StringType(), True)
        ]), True)
    ])


3. convert_object_id() takes a byte array as input and converts it to a hex string:

    def convert_object_id(bytes_array):
        builder = []
        for b in bytes_array:
            builder.append(format(b, '02x'))
        return ''.join(builder)
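
As a quick sanity check, the function renders a 12-byte binary ObjectId in the familiar 24-character hex form; Python's built-in bytes.hex() gives the same result. The sample value below is hypothetical:

    # Hypothetical 12-byte ObjectId payload, for illustration only
    sample = bytes.fromhex("659924b1c2a4f30e8c1d2e3f")
    print(convert_object_id(sample))  # -> '659924b1c2a4f30e8c1d2e3f'
    print(sample.hex())               # built-in equivalent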


4. The following line registers the function as a Spark UDF so it can be used on Spark DataFrames:

    convert_object_id_udf = udf(convert_object_id)
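
With no return type given, udf() defaults to StringType, which happens to be what is wanted here; declaring it explicitly is slightly more robust. This is a minor refinement, not part of the original answer:

    from pyspark.sql.types import StringType
    convert_object_id_udf = udf(convert_object_id, StringType())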


5. Read the data from Cosmos OLAP into a DataFrame, applying the schema:

    df = spark.read\
        .format("cosmos.olap")\
        .option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
        .option("spark.cosmos.container", "secondColl")\
        .schema(schema)\
        .load()

6. dfConverted creates a new DataFrame by adding the objectId and convertedObjectId columns:

    dfConverted = (
        df.withColumn("objectId", col("_id.objectId"))
          .withColumn("convertedObjectId", convert_object_id_udf(col("_id.objectId")))
          .select("id", "objectId", "convertedObjectId")
    )

The complete code I tried:

    from pyspark.sql.functions import udf, col
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, ArrayType, BinaryType

    schema = StructType([
        StructField("_rid", StringType(), True),
        StructField("_ts", LongType(), True),
        StructField("id", StringType(), True),
        StructField("_etag", StringType(), True),
        StructField("_id", StructType([
            StructField("objectId", BinaryType(), True)  # Use BinaryType for objectId
        ]), True),
        StructField("name", StructType([
            StructField("string", StringType(), True)
        ]), True),
        StructField("age", StructType([
            StructField("int32", IntegerType(), True)
        ]), True),
        StructField("marks", StructType([
            StructField("array", ArrayType(StructType([
                StructField("int32", IntegerType(), True)
            ])), True)
        ]), True),
        StructField("newAge", StructType([
            StructField("string", StringType(), True)
        ]), True),
        StructField("_partitionKey", StructType([
            StructField("string", StringType(), True)
        ]), True)
    ])

    def convert_object_id(bytes_array):
        builder = []
        for b in bytes_array:
            builder.append(format(b, '02x'))
        return ''.join(builder)

    convert_object_id_udf = udf(convert_object_id)

    df = spark.read\
        .format("cosmos.olap")\
        .option("spark.synapse.linkedService", "CosmosDbMongoDb1")\
        .option("spark.cosmos.container", "secondColl")\
        .schema(schema)\
        .load()

    dfConverted = (
        df.withColumn("objectId", col("_id.objectId"))
          .withColumn("convertedObjectId", convert_object_id_udf(col("_id.objectId")))
          .select("id", "objectId", "convertedObjectId")
    )
    display(dfConverted)

Output: [screenshot of the resulting DataFrame with id, objectId, and convertedObjectId columns]


ljsrvy3e2#

I found a Scala code sample in the Azure documentation that converts the ObjectId field to a string:

    val df = spark.read.format("cosmos.olap")
        .option("spark.synapse.linkedService", "xxxx")
        .option("spark.cosmos.container", "xxxx")
        .load()

    val convertObjectId = udf((bytes: Array[Byte]) => {
        val builder = new StringBuilder
        for (b <- bytes) {
            builder.append(String.format("%02x", Byte.box(b)))
        }
        builder.toString
    })

    val dfConverted = df.withColumn("objectId", col("_id.objectId"))
        .withColumn("convertedObjectId", convertObjectId(col("_id.objectId")))
        .select("id", "objectId", "convertedObjectId")

    display(dfConverted)

This solution also answers the question here.
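
In PySpark, the same conversion can also be done without a UDF at all, using the built-in hex function (which returns uppercase hex, hence the lower). This is a sketch of the same idea, not code from the linked docs:

    from pyspark.sql.functions import col, hex, lower

    # Built-in hex() avoids UDF serialization overhead;
    # lower() matches the lowercase form ObjectId strings normally use.
    dfConverted = df.withColumn(
        "convertedObjectId", lower(hex(col("_id.objectId")))
    ).select("id", "convertedObjectId")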

