尝试编写一个Python脚本,该脚本从Google Drive文件中获取一个JSON文件和一些CSV文件,并使用ONLY Spark Core分析和操作其数据。
此代码的功能是使用JSON和CSV文件中的数据创建元组。这两个文件共享一个共同的信息项,这是每个文件(json和csv)中对应于字符串的第一项。在上一个示例中,此项对应于对象“1111”。此项是一个“代码”(例如:1111,123,12345...),存在于csv文件的每一行中,与json文件中的“item 1”相关联。csv文件中可能有许多行在相同的“code”下,所以脚本处理这些信息,如您前面所见。
json文件中的数据采用以下格式:
[{"item1":"1111","item2":"aaaaaa","item3":"bbbbbbb"},{"item1":"123",...},...
和csv:
1111;SDS111;99.999;2.222
123;SDS222;333.111;4.111
...
打印的元组将是json和csv文件中的数据的组合,由这个“code”统一。目标是编写一些东西,对于json文件中的每个元素,标识“code”,如果“code”等于csv文件中的第一项,则打印出一个元组,其中包含两个文件中的数据,如下所示:
('aaaaaa', 'bbbbbbb', '1111', 'SDS111', (99.999, 2.222))
('ccccccccc', 'ddddd', '123', 'SDS222', (333.111, 4.111))
...
目前,我所拥有的代码能够读取json文件并从中创建元组,然后处理存储在csv:
import json
import pyspark
sc = pyspark.SparkContext('local[*]')
try:
with open("/content/drive/../file.json") as f:
data = json.load(f)
rdd = sc.parallelize(data)
json_data = rdd.map(lambda item: (item["item1"], item["item2"], item["item3"]))
lines = sc.textFile('/content/drive/../file*.csv', 5) \
.map(lambda line: line.strip()) \
.filter(lambda line: len(line.split(';')) == 4)
csv_data = lines.map(lambda line: line.split(';')) \
.map(lambda values: (values[0], float(values[2]), float(values[3]))) \
.map(lambda kv: (kv[0], (kv[1], kv[2]))) \
.reduceByKey(lambda a, b: (max(a[0] + b[0]), max(a[1] + b[1]))) \
.map(lambda kv: (kv[0], (kv[1][1], kv[1][2]))) \
.sortByKey()
#combination part???
sc.stop()
except Exception as e:
print(e)
sc.stop()
我真的很难与脚本的这一部分,并会非常感谢一些帮助!随时问我更多的信息,如果需要的话
1条答案
按热度按时间s5a0g9ez1#
为什么要使用低级命令。除非你不得不使用rdd调用。我建议用PySpark代替。解决方案非常简单。
此代码位于Azure数据块中。请编写示例json文件。
创建示例csv文件。
读取一个或多个csv文件并创建名为“tmp_csv_data”的临时配置单元视图。
读取一个或多个json文件并创建一个名为“tmp_json_data”的临时配置单元视图。
让我们快速查看一下csv视图。
让我们快速查看一下json视图。请注意,格式错误的JSON文件创建了两个空白记录。
下图显示了组合 Dataframe 中的两个数据集,只需使用spark.write()创建最终文件即可。
这段代码是直接的,并利用了大多数人已经掌握的ANSI SQL技能。