提前感谢你的帮助。这是第二天,我一直在尝试所有的排列和组合,找不到任何解决办法。
我在azurehdinsight上安装了4个节点的hadoop/spark
我有一个简单的scala代码,它从hdfs读取数据,并做一些简单的操作,比如将列转换成小写,等等。小文件434kb,有2300条记录,有14列。
代码可以在我的本地笔记本电脑(win10)上运行,也可以在azureubuntuvm独立集群上运行。
问题-在写入csv时,它抛出java.util.concurrent.timeoutexception:futures在[100000毫秒]注意到输出位置上几乎没有部分文件后超时,即。 /data/8/1/data.csv
与 /data/8/1/data.csv/_temporary
重要代码段: def get_spark(): SparkSession = { val spark = org.apache.spark.sql.SparkSession.builder .appName("My App").enableHiveSupport().getOrCreate; return spark }
```
val spark = get_spark()
val o_file = "/data/8/1/data.csv"
val loader = new Loader(spark)
var df = loader.load(i_file)
logger.info("Writing output to: {}", o_file)
df.write.mode("overwrite").option("header", "true").csv(o_file)<--throwing error here
例外情况是: `18/11/29 19:06:05 INFO LoaderProcessor: Writing output to: /data/8/1/data.csv 18/11/29 19:07:37 ERROR ApplicationMaster: Uncaught exception: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201) at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:498) at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$runImpl(ApplicationMaster.scala:345) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anon$5.run(ApplicationMaster.scala:815) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869) at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:814) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:839) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) 18/11/29 19:07:37 ERROR FileFormatWriter: Aborting job null. java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225) at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:642) at zettasense.LoaderProcessor.execute(LoaderProcessor.scala:81) at zettasense.Processor$.main(Processor.scala:47) at zettasense.Processor.main(Processor.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$anon$4.run(ApplicationMaster.scala:721` 更多信息
还注意到“write”操作继续在输出文件夹上写入,如下所示:
part-00000-3e9566ae-c13c-468a-8732-e7b8a8df5335-c000.csv
part-00000-3e9566ae-c13c-468a-8732-e7b8a8df5335-c001.csv
几秒钟后:
part-00000-4f4979a0-d9f9-481b-aac4-115e63b9f59c-c000.csv
part-00000-4f4979a0-d9f9-481b-aac4-115e63b9f59c-c001.csv
为了验证它,我做了如下重新分区: `df = df.repartition(1)` 注意到了同样的事情。它一次又一次地写文件。
我现在也有个例外: `java.lang.IllegalStateException: User did not initialize spark context! at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:510) at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$runImpl(ApplicationMaster.scala:345) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anon$5.run(ApplicationMaster.scala:815) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869) at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:814) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:839) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)`
暂无答案!
目前还没有任何答案,快来回答吧!