Unable to import Parquet data in pyspark

de90aj5v  posted on 2021-05-27  in Spark

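I saved a pandas DataFrame in Parquet format. Because it is huge and I need to run gradient boosting classification on it, I want to use pyspark to speed up the process. My pandas DataFrame looks like this: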

Y     X

a     3.0
b     3.5
c     4.9
d     6.8

All of my X columns are of type int64 or float64, and Y is object. So I saved the data as Parquet ( df.to_parquet('DF.parquet') ) and then followed this documentation https://spark.apache.org/docs/2.3.0/ml-classification-regression.html#gradient- and this is what I get:

In: data = spark.read.load("DF.parquet")
Out: DataFrame[X: double, Y: string]
In: labelIndexer = StringIndexer(inputCol="Y", outputCol="indexedLabel").fit(data)
Out: Py4JJavaError: An error occurred while calling o492.fit.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ObjectHashAggregate(keys=[], functions=[stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[StringIndexerAggregator(org.apache.spark.sql.Row)#1256])
+- Exchange SinglePartition, true, [id=#279]
   +- ObjectHashAggregate(keys=[], functions=[partial_stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[buf#1261])
      +- *(1) ColumnarToRow
         +- FileScan parquet [Y#1137] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/k93947/DF training], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Y:string>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.doExecute(ObjectHashAggregateExec.scala:102)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:382)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2938)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2938)
    at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
    at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
    at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:241)
    at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:144)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition, true, [id=#279]
+- ObjectHashAggregate(keys=[], functions=[partial_stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[buf#1261])
   +- *(1) ColumnarToRow
      +- FileScan parquet [Y#1137] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/k93947/DF training], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Y:string>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:95)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 33 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ObjectHashAggregate(keys=[], functions=[partial_stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[buf#1261])
+- *(1) ColumnarToRow
   +- FileScan parquet [Y#1137] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/k93947/DF training], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Y:string>

    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.doExecute(ObjectHashAggregateExec.scala:102)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:64)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:64)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:83)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:81)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:98)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 41 more
Caused by: org.apache.spark.sql.AnalysisException: Attribute name "Y" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
    at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:583)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldName(ParquetSchemaConverter.scala:574)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.$anonfun$setSchema$2(ParquetWriteSupport.scala:472)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.$anonfun$setSchema$2$adapted(ParquetWriteSupport.scala:472)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.setSchema(ParquetWriteSupport.scala:472)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:221)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:398)
    at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:389)
    at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:484)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:519)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
    at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:196)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:107)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 53 more
alen0pnh  #1

Your column name Y seems to contain a space or a \t character.
Please check for it and remove it.
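One way to run that check is sketched below in plain Python. The set of rejected characters is copied from the AnalysisException message in the question; the helper names `find_invalid_columns` and `sanitize_column`, and the example column list with a trailing space after Y, are hypothetical and only illustrate the idea.

```python
# Characters that Spark's Parquet support rejects in column names,
# copied from the AnalysisException message in the question.
INVALID_CHARS = " ,;{}()\n\t="


def find_invalid_columns(names):
    """Return the column names that contain at least one rejected character."""
    return [name for name in names if any(ch in INVALID_CHARS for ch in name)]


def sanitize_column(name, replacement="_"):
    """Strip surrounding whitespace, then replace any remaining rejected character."""
    stripped = name.strip()
    return "".join(replacement if ch in INVALID_CHARS else ch for ch in stripped)


if __name__ == "__main__":
    # Hypothetical column list: "Y" followed by a trailing space.
    columns = ["Y ", "X"]
    for name in find_invalid_columns(columns):
        # repr() makes otherwise invisible whitespace visible.
        print(repr(name), "->", repr(sanitize_column(name)))
```

Printing the names with repr() is what exposes a stray space or tab that the normal DataFrame display hides. The stack trace suggests the name check fires while Spark scans the file, so it is probably safer to rename in pandas before writing, for example `df.columns = [sanitize_column(c) for c in df.columns]` followed by `df.to_parquet('DF.parquet')`, than to rename after the read.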

m0rkklqb  #2

This should work:

data = spark.read.parquet("DF.parquet")

I am not sure whether the accepted answer is helpful.
