scala spark databricks joblog试图找出如何优雅地提取非标准格式的数据

kd3sttzy  于 2023-03-30  发布在  Scala
关注(0)|答案(1)|浏览(107)

我正在努力从Azure数据块发出的作业日志json中提取数据。json中的一个条目包含类似于字典的东西,但不完全是,我无法找到一种优雅的-内联-提取此数据的方法。我正在使用scala并在scala中创建一个结构,以反映作业日志的层次结构。有三个数据元素包含这种格式的字符串,一个例子是:

"response": "{\\"statusCode\\":200}"

如果不是因为反斜杠转义了引号和字符串开头和结尾的双引号,它将成为一个很好的JSON结构。
我做了很多互联网搜索,但没有找到描述这种字符串格式的单词,所以搜索产生了大量不相关的结果。
我尝试了一些正则表达式来预处理它,使其看起来像json结构,但它不起作用。这是我尝试的:

case class cls_logJSON(value:String)
var df2 = df.withColumn("value",regexp_replace(col("value"), "\\\\", "")).as[cls_logJSON]
    df2 = df.withColumn("value",regexp_replace(col("value"), " \"{", " {")).as[cls_logJSON]
    df2 = df.withColumn("value",regexp_replace(col("value"), "}\",", "},")).as[cls_logJSON]

这不是我喜欢看到的代码,我希望这个字符串格式是某种标准,可以用适当的数据类型或类似的东西访问。我从这些JSON行创建的dataframe中提取数据,并可以使用点表示法(如properties.sourceIPAddress)访问其他数据,当然,我可以将此数据作为字符串获取,但是我试图避免在 Dataframe 的每一行上出现某种rbar循环。任何帮助都非常感谢。

oxcyiej7

oxcyiej71#

我不是100%我想去的地方,但我确实找到了一种方法来提取这些数据到dataframe中。在我看来,奇怪的是,数据被格式化为外部json对象中的json对象,并且可以如下访问。我想把一些处理串或链在一起,但我不知道如何做到这一点,所以我有三个步骤来获取我需要的数据。下面是代码。'from_json'函数能够提取这个奇怪格式的列数据:

val reqParamStruct = 
StructType(
StructField("runId",StringType,true) ::
StructField("idInJob",StringType,true) ::
StructField("jobId",StringType,true) ::
StructField("jobTerminalState",StringType,true) ::
StructField("notebookPath",StringType,true) ::
StructField("jobTriggerType",StringType,true) ::
StructField("jobTaskType",StringType,true) ::
StructField("jobClusterType",StringType,true) :: 
StructField("multitaskParentRunId",StringType,true) :: 
StructField("taskDependencyType",StringType,true) :: 
StructField("run_name",StringType,true) :: 
StructField("taskKey",StringType,true) :: 
Nil)

val respParamStruct = 
StructType(StructField("statusCode",StringType,true) :: 
StructField("result",StringType,true) :: Nil)

val identityStruct = 
StructType(StructField("email",StringType,true) :: 
StructField("subjectName",StringType,true) :: Nil)

val resultStruct = 
StructType(StructField("email",StringType,true) :: 
StructField("run_id",StringType,true) :: Nil)

val jobDf_2 = jobDf.select(
$"resourceId",
$"identity",
$"operationName",
$"time",
$"properties.response",
$"properties.logId",
from_json($"identity",identityStruct).as("reqId"),
from_json($"properties.requestParams",reqParamStruct).as("reqParm"),
from_json($"properties.response",respParamStruct).as("respParm")
)

val jobDf_3 = jobDf_2.select(
$"resourceId",
$"reqId.email",
$"operationName",
$"time",
$"logId",
expr("reqParm.runId as reqRunId"),
$"reqParm.runId",
$"reqParm.idInJob",
$"reqParm.jobId",
$"reqParm.notebookPath",
$"reqParm.jobTriggerType",
$"reqParm.jobTaskType",
$"reqParm.jobClusterType",
$"reqParm.multitaskParentRunId",
$"reqParm.taskDependencyType",
$"reqParm.run_name",
$"reqParm.taskKey",

from_json($"respParm.result",resultStruct).as("result")
)

val jobDf_4 = jobDf_3.select(
$"resourceId",
$"email",
$"operationName",
$"time",
$"logId",
$"reqRunId",
$"idInJob",
$"jobId",
$"notebookPath",
$"jobTriggerType",
$"jobTaskType",
$"jobClusterType",
$"multitaskParentRunId",
$"taskDependencyType",
$"taskKey",
$"run_name",
expr("result.run_id as respRunId")
)
jobDf_4.createOrReplaceTempView("v_logEntries")

相关问题