如何从json文件创建sparksqlDataframe,其中列出了数据和模式

cgh8pdjw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(240)
conf = SparkConf().setAppName("PySpark").setMaster("local")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

file = sqlContext.read.json(json_file_path)
file.show()

输出:

+--------------------+--------------------+
|                data|              schema|
+--------------------+--------------------+
|[[The battery is ...|[[[index, integer...|
+--------------------+--------------------+

如何使用自己创建的模式提取数据。我的架构代码是:

from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
schema = StructType([
    StructField('index', IntegerType(), True),
    StructField('content', StringType(), True),
    StructField('label', IntegerType(), True),
    StructField('label_1', StringType(), True ),
    StructField('label_2', StringType(), True ),
    StructField('label_3', IntegerType(), True ),
    StructField('label_4', IntegerType(), True )])

我试过:

file.withColumn("data", from_json("data", schema))\
    .show()

但我收到以下错误:

cannot resolve 'from_json(`data`)' due to data type mismatch: argument 1 requires string type, however, '`data`' is of array<struct<content:string,index:bigint,label:bigint,label_1:string,label_2:string,label_3:double,label_4:timestamp>> type.;;
rfbsl7qr

rfbsl7qr1#

这个 read 方法已识别后面的架构。
试着跑步 file.printSchema() 而且它应该显示更多更少的模式,你想要的。
打开 Package 的方法 data 要运行:

file = file.select(explode("data").as("exploded_data"))

如果需要,您可以通过以下方式将其提升到下一个级别:

file.select(file.col("exploded_data.*"))

这将使模式变平。
免责声明:这是scala代码,python可能需要微小的调整

相关问题