I have a pandas DataFrame saved in Parquet format. Since it is huge and I need to run gradient-boosted classification on it, I want to use PySpark to speed the process up. My pandas DataFrame looks like this:

Y    X
a  3.0
b  3.5
c  4.9
d  6.8
All my X columns are of type int64 or float64, and Y is object. So I saved the data to Parquet (df.to_parquet('DF.parquet')) and then followed this documentation: https://spark.apache.org/docs/2.3.0/ml-classification-regression.html#gradient-boosted-tree-classifier and got:
In: data = spark.read.load("DF.parquet")
Out: DataFrame[X: double, Y: string]
In: labelIndexer = StringIndexer(inputCol="Y", outputCol="indexedLabel").fit(data)
Out: Py4JJavaError: An error occurred while calling o492.fit.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ObjectHashAggregate(keys=[], functions=[stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[StringIndexerAggregator(org.apache.spark.sql.Row)#1256])
+- Exchange SinglePartition, true, [id=#279]
+- ObjectHashAggregate(keys=[], functions=[partial_stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[buf#1261])
+- *(1) ColumnarToRow
+- FileScan parquet [Y#1137] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/k93947/DF training], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Y:string>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.doExecute(ObjectHashAggregateExec.scala:102)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:382)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2938)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2938)
at org.apache.spark.ml.feature.StringIndexer.countByValue(StringIndexer.scala:204)
at org.apache.spark.ml.feature.StringIndexer.sortByFreq(StringIndexer.scala:212)
at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:241)
at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:144)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition, true, [id=#279]
+- ObjectHashAggregate(keys=[], functions=[partial_stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[buf#1261])
+- *(1) ColumnarToRow
+- FileScan parquet [Y#1137] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/k93947/DF training], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Y:string>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:95)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:107)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 33 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
ObjectHashAggregate(keys=[], functions=[partial_stringindexeraggregator(org.apache.spark.ml.feature.StringIndexerAggregator@378fc5, Some(createexternalrow(Y#1137.toString, StructField(Y,StringType,true))), Some(interface org.apache.spark.sql.Row), Some(StructType(StructField(Y,StringType,true))), encodeusingserializer(input[0, java.lang.Object, true], true), decodeusingserializer(input[0, binary, true], Array[org.apache.spark.util.collection.OpenHashMap], true), encodeusingserializer(input[0, java.lang.Object, true], true), BinaryType, true, 0, 0)], output=[buf#1261])
+- *(1) ColumnarToRow
+- FileScan parquet [Y#1137] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/k93947/DF training], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Y:string>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.doExecute(ObjectHashAggregateExec.scala:102)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:64)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:64)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:83)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:81)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:98)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 41 more
Caused by: org.apache.spark.sql.AnalysisException: Attribute name "Y" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:583)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldName(ParquetSchemaConverter.scala:574)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.$anonfun$setSchema$2(ParquetWriteSupport.scala:472)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.$anonfun$setSchema$2$adapted(ParquetWriteSupport.scala:472)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$.setSchema(ParquetWriteSupport.scala:472)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:221)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:398)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:389)
at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:484)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:519)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:202)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:198)
at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:196)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:107)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 53 more
2 Answers

alen0pnh:
It looks like your column name Y contains a space or a \t.
Please check and remove it.
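A minimal sketch of that fix on the pandas side, before the file ever reaches Spark (the file name is the asker's own; overwriting it in place is assumed to be acceptable):

import pandas as pd

df = pd.read_parquet('DF.parquet')

# repr() exposes invisible characters in the names, e.g. 'Y ' or 'Y\t'
print([repr(c) for c in df.columns])

# strip leading/trailing whitespace from every column name and rewrite the file
df.columns = df.columns.str.strip()
df.to_parquet('DF.parquet')

Renaming on the Spark side after spark.read.load is usually not enough here: as the last "Caused by" above shows, the name check fires inside the Parquet file scan itself, before any alias or withColumnRenamed could take effect, so the names have to be fixed in the file.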
m0rkklqb:

This should work:
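A minimal sketch, assuming pyarrow is available (it normally is wherever pandas can write Parquet) — read only the schema and print the exact stored field names, since the stray whitespace is invisible in Spark's DataFrame[X: double, Y: string] output:

import pyarrow.parquet as pq

# read just the footer schema, not the data
schema = pq.read_schema('DF.parquet')

# repr() makes hidden whitespace visible, e.g. ['Y\t', 'X'] instead of ['Y', 'X']
print([repr(name) for name in schema.names])

Once the offending name is confirmed, strip it and rewrite the file as in the first answer; the StringIndexer fit should then go through.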
Not sure whether the accepted answer is useful.