How do I perform this transformation?

ctrmrzij · posted 2021-05-29 in Spark
Follow (0) | Answers (3) | Views (358)

Environment: Spark 2.4.5

source.json file:

{
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9",
    ...
}

target.json format:

{
    "factors": [
        {
            "name": "a",
            "key": "1",
            "pros": "2",
            "cons": "3"
        },
        {
            "name": "b",
            "key": "4",
            "pros": "5",
            "cons": "6"
        },
        {
            "name": "c",
            "key": "7",
            "pros": "8",
            "cons": "9"
        },
        ...
    ]
}

As you can see, the target 'name' is part of each source key. For example, 'a' is the 'name' for 'a_key', 'a_pro', and 'a_con'. I really don't know how to extract the name from the keys and perform this 'grouping' transformation. Can anyone give me some advice?

lvmkulzt 1#

There is no need to involve DataFrames; some simple string and dictionary operations will do:

import json

source = {
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9",
}

factors = {}

# Prepare each factor dictionary

for k, v in source.items():
    factor, item = k.split('_')
    # Map source suffixes to the target attribute names ('pro' -> 'pros', 'con' -> 'cons')
    attr = {'key': 'key', 'pro': 'pros', 'con': 'cons'}[item]
    factors.setdefault(factor, {})[attr] = v

# Prepare result dictionary

target = {
    'factors': []
}

# Move name attribute into dictionary & append

for name, d in factors.items():
    d['name'] = name
    target['factors'].append(d)

result = json.dumps(target)
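
For the sample input above, printing the result should produce the requested structure. This relies on Python 3.7+ preserving dict insertion order, so the factors come out as a, b, c:

print(result)
# {"factors": [{"key": "1", "pros": "2", "cons": "3", "name": "a"}, {"key": "4", "pros": "5", "cons": "6", "name": "b"}, {"key": "7", "pros": "8", "cons": "9", "name": "c"}]}
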
yrefmtwq 2#

Your data layout is unusual, but the following code can solve the problem:
source.json file:

{
      "a_key": "1",
      "a_pro": "2",
      "a_con": "3",
      "b_key": "4",
      "b_pro": "5",
      "b_con": "6",
      "c_key": "7",
      "c_pro": "8",
      "c_con": "9"
}

Code:

import java.util

import com.google.gson.GsonBuilder
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// Case classes assumed by this answer (their definitions were not shown in the original post)
case class Person(name: String, key: String, pros: String, cons: String)
case class Factors(factors: util.List[Person])

val sparkSession = SparkSession.builder()
  .appName("readAndWriteJsonTest")
  .master("local[*]").getOrCreate()

// Read the file as plain text, one row per line such as `"a_key": "1",`
// (reading it with format("json") would parse the object into columns instead)
val dataFrame = sparkSession.read.format("text").load("R:\\data\\source.json")

// println(dataFrame.rdd.count())

// Turn each `"name_attr": "value"` line into (name, (attr, value));
// the `{` and `}` lines contain no colon and are filtered out
val mapRdd: RDD[(String, (String, String))] = dataFrame.rdd.map(_.getString(0))
 .filter(_.split(":").length == 2)
 .map(line => {
  val Array(key1, value1) = line.split(":")
  val Array(name, key2) = key1.replace("\"", "").trim.split("_")
  val value2 = value1.replace("\"", "").replace(",", "").trim
  (name, (key2, value2))
})

// mapRdd.collect().foreach(println)

// Collect all (attr, value) pairs that share a name into one buffer
val initValue = new ArrayBuffer[(String, String)]

val function1 = (buffer1: ArrayBuffer[(String, String)], t1: (String, String)) => buffer1 += t1
val function2 = (buffer1: ArrayBuffer[(String, String)], buffer2: ArrayBuffer[(String, String)]) => buffer1 ++ buffer2

val aggRdd: RDD[(String, ArrayBuffer[(String, String)])] = mapRdd.aggregateByKey(initValue)(function1, function2)

// aggRdd.collect().foreach(println)

// Look attributes up by name rather than by position, so buffer order does not matter
val persons: util.List[Person] = aggRdd.map(line => {
  val name = line._1
  val attrs = line._2.toMap

  Person(name, attrs("key"), attrs("pro"), attrs("con"))
}).collect().toList.asJava

val gson = new GsonBuilder().create

val factors = Factors(persons)

val targetJsonStr = gson.toJson(factors)

println(targetJsonStr)

target.json format:

{
  "factors": [
  {
    "name": "a",
    "key": "1",
    "pros": "2",
    "cons": "3"
  },
  {
    "name": "b",
    "key": "4",
    "pros": "5",
    "cons": "6"
  },
  {
    "name": "c",
    "key": "7",
    "pros": "8",
    "cons": "9"
  }
  ]
}

You can put the above code into a test method and run it to see the desired result.

@Test
  def readAndSaveJsonTest(): Unit = {
    // Paste the code above into this method body (assumes JUnit's org.junit.Test annotation)
  }

Hope this helps.

jc3wubiy 3#

IIUC, first create a DataFrame from the input JSON:

json_data = {
    "a_key": "1",
    "a_pro": "2",
    "a_con": "3",
    "b_key": "4",
    "b_pro": "5",
    "b_con": "6",
    "c_key": "7",
    "c_pro": "8",
    "c_con": "9"
}
df=spark.createDataFrame(list(map(list,json_data.items())),['key','value'])
df.show()

+-----+-----+
|  key|value|
+-----+-----+
|a_key|    1|
|a_pro|    2|
|a_con|    3|
|b_key|    4|
|b_pro|    5|
|b_con|    6|
|c_key|    7|
|c_pro|    8|
|c_con|    9|
+-----+-----+

Now derive the required columns from the existing ones (note that taking the name as the first character of the key assumes single-character names):

import pyspark.sql.functions as  f
df2 = df.withColumn('Name', f.substring('key',1,1)).\
         withColumn('Attributes', f.concat(f.split('key','_')[1],f.lit('s')))
df2.show()
+-----+-----+----+----------+
|  key|value|Name|Attributes|
+-----+-----+----+----------+
|a_key|    1|   a|      keys|
|a_pro|    2|   a|      pros|
|a_con|    3|   a|      cons|
|b_key|    4|   b|      keys|
|b_pro|    5|   b|      pros|
|b_con|    6|   b|      cons|
|c_key|    7|   c|      keys|
|c_pro|    8|   c|      pros|
|c_con|    9|   c|      cons|
+-----+-----+----+----------+

Now pivot the DataFrame and collect the result as a JSON object:

output_json = df2.groupBy('Name').\
                  pivot('Attributes').\
                  agg(f.min('value')).\
                  select(f.collect_list(f.struct('Name','keys','cons','pros')).alias('factors')).\
                  toJSON().collect()

import json
print(json.dumps(json.loads(output_json[0]),indent=4))

{
    "factors": [
        {
            "Name": "c",
            "keys": "7",
            "cons": "9",
            "pros": "8"
        },
        {
            "Name": "b",
            "keys": "4",
            "cons": "6",
            "pros": "5"
        },
        {
            "Name": "a",
            "keys": "1",
            "cons": "3",
            "pros": "2"
        }
    ]
}
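
Note that this output uses 'Name' and 'keys' where the target format has 'name' and 'key' (the pivot step appended an 's' to every attribute). If the field names must match the target exactly, a minimal sketch, assuming the same df2 as above, is to rename the pivoted columns before collecting:

output_json = df2.groupBy('Name').\
                  pivot('Attributes').\
                  agg(f.min('value')).\
                  withColumnRenamed('Name', 'name').\
                  withColumnRenamed('keys', 'key').\
                  select(f.collect_list(f.struct('name', 'key', 'pros', 'cons')).alias('factors')).\
                  toJSON().collect()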
