使用Python中的Spark Core合并JSON和CSV文件中的数据

xa9qqrwz  于 2022-12-13  发布在  Apache
关注(0)|答案(1)|浏览(128)

尝试编写一个Python脚本,该脚本从Google Drive文件中获取一个JSON文件和一些CSV文件,并使用ONLY Spark Core分析和操作其数据。
此代码的功能是使用JSON和CSV文件中的数据创建元组。这两个文件共享一个共同的信息项,这是每个文件(json和csv)中对应于字符串的第一项。在上一个示例中,此项对应于对象“1111”。此项是一个“代码”(例如:1111,123,12345...),存在于csv文件的每一行中,与json文件中的“item 1”相关联。csv文件中可能有许多行在相同的“code”下,所以脚本处理这些信息,如您前面所见。
json文件中的数据采用以下格式:

[{"item1":"1111","item2":"aaaaaa","item3":"bbbbbbb"},{"item1":"123",...},...

和csv:

1111;SDS111;99.999;2.222
123;SDS222;333.111;4.111
...

打印的元组将是json和csv文件中的数据的组合,由这个“code”统一。目标是编写一些东西,对于json文件中的每个元素,标识“code”,如果“code”等于csv文件中的第一项,则打印出一个元组,其中包含两个文件中的数据,如下所示:

('aaaaaa', 'bbbbbbb', '1111', 'SDS111', (99.999, 2.222))
('ccccccccc', 'ddddd', '123', 'SDS222', (333.111, 4.111))
...

目前,我所拥有的代码能够读取json文件并从中创建元组,然后处理存储在csv:

import json
import pyspark

sc = pyspark.SparkContext('local[*]')
try:
  with open("/content/drive/../file.json") as f:
    data = json.load(f)
    rdd = sc.parallelize(data)
    json_data = rdd.map(lambda item: (item["item1"], item["item2"], item["item3"]))

  lines = sc.textFile('/content/drive/../file*.csv', 5) \
          .map(lambda line: line.strip()) \
          .filter(lambda line: len(line.split(';')) == 4)

  csv_data = lines.map(lambda line: line.split(';')) \
          .map(lambda values: (values[0], float(values[2]), float(values[3]))) \
          .map(lambda kv: (kv[0], (kv[1], kv[2]))) \
          .reduceByKey(lambda a, b: (max(a[0] + b[0]), max(a[1] + b[1]))) \
          .map(lambda kv: (kv[0], (kv[1][1], kv[1][2]))) \
          .sortByKey()

  #combination part???

  sc.stop()
except Exception as e: 
  print(e)
  sc.stop()

我真的很难与脚本的这一部分,并会非常感谢一些帮助!随时问我更多的信息,如果需要的话

s5a0g9ez

s5a0g9ez1#

为什么要使用低级命令。除非你不得不使用rdd调用。我建议用PySpark代替。解决方案非常简单。
此代码位于Azure数据块中。请编写示例json文件。

#
#  1 - Create sample json file
#

# some text
data = """
[
{"item1":"111","item2":"aaaaaa","item3":"bbbbbbb"},
{"item1":"123","item2":"cccccc","item3":"ddddddd"}
]
"""

# write file
path = "/tmp/stackoverflow.json"
dbutils.fs.put(path, data, True)

创建示例csv文件。

#
#  2 - Create sample csv file
#

# some text
data = """
111;SDS111;99.999;2.222
123;SDS222;333.111;4.111
"""

# write file
path = "/tmp/stackoverflow.csv"
dbutils.fs.put(path, data, True)

读取一个或多个csv文件并创建名为“tmp_csv_data”的临时配置单元视图。

#
#  3 - Read csv data
#

# file location
path = "/tmp/*.csv"

# make dataframe
df1 = spark.read.format("csv") \
  .option("header", "false") \
  .option("sep", ";") \
  .schema("item1 string, label1 string, value1 float, value2 float") \
  .load(path)

# make temp hive view
df1.createOrReplaceTempView("tmp_csv_data")

读取一个或多个json文件并创建一个名为“tmp_json_data”的临时配置单元视图。

#
#  4 - Read json data
#

# file location
path = "/tmp/*.json"

# make dataframe
df1 = spark.read.format("json") \
  .schema("item1 string, item2 string, item3 string") \
  .load(path)

# make temp hive view
df1.createOrReplaceTempView("tmp_json_data")

让我们快速查看一下csv视图。

让我们快速查看一下json视图。请注意,格式错误的JSON文件创建了两个空白记录。

下图显示了组合 Dataframe 中的两个数据集,只需使用spark.write()创建最终文件即可。

这段代码是直接的,并利用了大多数人已经掌握的ANSI SQL技能。

相关问题