pyspark 对框架列的动态架构计算运行时间过长

qrjkbowd  于 2023-11-16  发布在  Spark
关注(0)|答案(2)|浏览(152)

我正在使用spark jdbc从databricks向postgresql发送一个join查询。结果行计数是170k记录。我在结果框架中有一个名为"data"的列。它包含一个JSON字符串,其结构可能对所有行都相同,也可能不相同。因此,由于"column"的模式不是静态的,我没有预先定义任何结构。我使用以下方法动态派生模式。

json_schema = spark.read.json(df_code.select("data").rdd.map(lambda row: row.data)).schema
 df_json_data = df_code.withColumn('json_data', F.from_json(F.col('data'), json_schema)).drop('data')

字符串
这就是我将json的字符串表示转换为json列的方法。但是这需要花费很多时间。如果是少量的记录,比如几千条,没有问题。但是在这个例子中,我有170k条记录,工作似乎没有进展。
我使用的集群有4个工作站和1个驱动程序。每个工作站有56 GB内存和8个核心。虽然它是一个共享集群,但在我运行作业时,没有笔记本电脑在此集群上运行。我无法找出如何解决这个问题。任何解决此问题的意见都非常感谢。请求社区的帮助。

    • 样本数据:**

请注意,在示例数据中,我在第一条记录中提到了867506随机字符和116随机字符。我认为我们的源网页捕获的是jpg图像,其相关文本在我们的postgresql中捕获。这些随机字符也有字母数字和特殊字符。参考图像如下。jpg文件大小越大,我猜在这个json中,字符长度会更大。正如我提到的,这个"data"列没有正确的结构。这就是为什么你会看到每条记录的不同模式。

    • Record1:**{"recomend_portal ":2," upload ":[{" storage":"base64","name":"dbaver.exe","url":"data:application/x-msdownload; base64,867506随机字符","size":532488,"type":"application/x-msdownload","originalName":"dbeaver.exe","hash":"0fbee5a6f48b20225eb23bb59d870147 "," fileType ":" JPG "}],"secondbaseadPdf":[{"storage ":" base64","name":"我的HR工具-94a42a23 - 6239 - 42ca-b5d6-ea13c5969b24.url","url":"data:application/octet-stream; base64,116随机字符"," size":86," type":null," originalName":" My HR Tool. url"," hash":" 0fbee5a6f48b20225eb23bb59d870147"," fileType":" PDF "}]," aFieldWithInlineValidation":"df","thePortalsCapabilitiesMetMyNeeds":4}
    • Record2:**{"recomend_portal ":0," thePortalsCapabilitiesMetMyNeeds ":8," upload ":[]," seconderadPdf ":[]}
    • 记录3:**{"start_time ":" 2023 - 06 - 28 16:00:00 "," end_time ":" 2023 - 06 - 28 17:00:00 "}


的数据

dtcbnfnu

dtcbnfnu1#

与源代码团队进行了交谈,以了解在json列中包含jpg图像的必要性。他们提到他们代码的初始版本是将图像写入数据库,最新的版本只是将图像在blob上的存储位置保存到json列中。代码运行良好,没有直接将图像存储在json中的数据。但我仍然不明白为什么这项工作当图像存储在表中时,不会运行。但是现在,我的问题已经解决了,因为我们要使用一个新版本。

byqmnocz

byqmnocz2#

从您的框架中获取不同的模式,并执行spark.read.json
代码:

from pyspark.sql.functions import schema_of_json,col

tmp = df.withColumn("schema_t",schema_of_json(col('data'))).drop_duplicates(['schema_t'])

json_schema  = spark.read.json(tmp.select("data").rdd.map(lambda row: row.data)).schema
df_json_data = df.withColumn('json_data', from_json(col('data'), json_schema)).drop('data')
display(df_json_data)

字符串
输出量:
我已经尝试了超过20万条记录。之所以花很长时间是为了解释如此大量的模式并将其标准化为一个单一的模式。
因此,只从170k中获取不同的模式记录,并再次使用spark.read.json获取模式。


的数据

相关问题