I'm trying to append new records to an existing CSV file using Spark SQL, but I get
Exception in thread "main" org.apache.hadoop.fs.ParentNotDirectoryException: /tmp/spark/input/csv/orders.csv (is not a directory)
since the input is a single file rather than a directory.
The Spark (Java) code and the exception stack trace are shown below.
How can I fix this?
Environment info - macOS
MyiCloud@MyMC someDir % uname -a
Darwin MyMC.local 21.6.0 Darwin Kernel Version 21.6.0: Fri Sep 15 16:17:23 PDT 2023; root:xnu-8020.240.18.703.5~1/RELEASE_X86_64 x86_64
Hadoop environment info
MyiCloud@MyMC someDir % hadoop version
Hadoop 3.3.6
Source code repository https://github.com/apache/hadoop.git -r 1be78238728da9266a4f88195058f08fd012bf9c
Compiled by ubuntu on 2023-06-18T08:22Z
Compiled on platform linux-x86_64
Compiled with protoc 3.7.1
From source with checksum 5652179ad55f76cb287d9c633bb53bbd
This command was run using /usr/local/Cellar/hadoop/3.3.6/libexec/share/hadoop/common/hadoop-common-3.3.6.jar
File permissions - HDFS
MyiCloud@MyMC sbin % hdfs dfs -ls /tmp/spark/input/csv/
2023-11-20 01:35:51,752 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rwxr-xr-x 1 MyiCloud supergroup 268 2023-11-20 00:58 /tmp/spark/input/csv/orders.csv
Input file - orders.csv
firstName,lastName,state,quantity,revenue,timestamp
Jean Georges,Perrin,NC,1,300,1551903533
Jean Georges,Perrin,NC,2,120,1551903567
Jean Georges,Perrin,CA,4,75,1551903599
Holden,Karau,CA,6,37,1551904299
Ginni,Rometty,NY,7,91,1551916792
Holden,Karau,CA,4,153,1552876129
Apache Spark code (Java)
package org.example.ch11;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class InsertApp {

    public static void main(String[] args) {
        InsertApp inApp = new InsertApp();
        inApp.append();
    }

    private void append() {
        SparkSession spark = SparkSession.builder()
                .appName("InsertProcessor")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read().format("csv")
                .option("inferSchema", true)
                .option("header", true)
                .load("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");

        System.out.println("Printing csv content..");
        df.show();
        df.createOrReplaceTempView("orders_view");

        try (JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
            Dataset<Row> newDf = spark.createDataFrame(
                    jsc.parallelize(
                            Collections.singletonList(RowFactory.create("V", "S", "TN", 10, 20, 1551904299))
                    ),
                    df.schema()
            );
            newDf.createOrReplaceTempView("new_order_view");

            System.out.println("Inserting data into orders_view view from new_order_view view...");
            Dataset<Row> mergedDf = spark.sql("INSERT INTO orders_view SELECT * FROM new_order_view");
            mergedDf.show();
            System.out.println("Completed...");
        }
    }
}
Exception - stack trace
Printing csv content..
23/11/20 01:38:10 INFO FileSourceStrategy: Pushed Filters:
23/11/20 01:38:10 INFO FileSourceStrategy: Post-Scan Filters:
23/11/20 01:38:10 INFO FileSourceStrategy: Output Data Schema: struct<firstName: string, lastName: string, state: string, quantity: int, revenue: int ... 1 more field>
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 22.283375 ms
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 302.5 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 27.3 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.0.6:49627 (size: 27.3 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO SparkContext: Created broadcast 4 from show at InsertApp.java:27
23/11/20 01:38:10 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/20 01:38:10 INFO SparkContext: Starting job: show at InsertApp.java:27
23/11/20 01:38:10 INFO DAGScheduler: Got job 2 (show at InsertApp.java:27) with 1 output partitions
23/11/20 01:38:10 INFO DAGScheduler: Final stage: ResultStage 2 (show at InsertApp.java:27)
23/11/20 01:38:10 INFO DAGScheduler: Parents of final stage: List()
23/11/20 01:38:10 INFO DAGScheduler: Missing parents: List()
23/11/20 01:38:10 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at show at InsertApp.java:27), which has no missing parents
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 14.4 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 6.8 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.0.6:49627 (size: 6.8 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1388
23/11/20 01:38:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at show at InsertApp.java:27) (first 15 tasks are for partitions Vector(0))
23/11/20 01:38:10 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks resource profile 0
23/11/20 01:38:10 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2) (192.168.0.6, executor driver, partition 0, NODE_LOCAL, 4877 bytes) taskResourceAssignments Map()
23/11/20 01:38:10 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
23/11/20 01:38:10 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.0.6:49627 in memory (size: 7.7 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO FileScanRDD: Reading File path: hdfs://localhost:8020/tmp/spark/input/csv/orders.csv, range: 0-268, partition values: [empty row]
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 19.008744 ms
23/11/20 01:38:10 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1789 bytes result sent to driver
23/11/20 01:38:10 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 115 ms on 192.168.0.6 (executor driver) (1/1)
23/11/20 01:38:10 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
23/11/20 01:38:10 INFO DAGScheduler: ResultStage 2 (show at InsertApp.java:27) finished in 0.141 s
23/11/20 01:38:10 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
23/11/20 01:38:10 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
23/11/20 01:38:10 INFO DAGScheduler: Job 2 finished: show at InsertApp.java:27, took 0.150319 s
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 40.368014 ms
+------------+--------+-----+--------+-------+----------+
| firstName|lastName|state|quantity|revenue| timestamp|
+------------+--------+-----+--------+-------+----------+
|Jean Georges| Perrin| NC| 1| 300|1551903533|
|Jean Georges| Perrin| NC| 2| 120|1551903567|
|Jean Georges| Perrin| CA| 4| 75|1551903599|
| Holden| Karau| CA| 6| 37|1551904299|
| Ginni| Rometty| NY| 7| 91|1551916792|
| Holden| Karau| CA| 4| 153|1552876129|
+------------+--------+-----+--------+-------+----------+
Inserting data into orders_view view from new_order_view view...
23/11/20 01:38:10 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/11/20 01:38:10 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/11/20 01:38:10 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
23/11/20 01:38:10 INFO SparkUI: Stopped Spark web UI at http://192.168.0.6:4040
23/11/20 01:38:10 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/20 01:38:10 INFO MemoryStore: MemoryStore cleared
23/11/20 01:38:10 INFO BlockManager: BlockManager stopped
23/11/20 01:38:10 INFO BlockManagerMaster: BlockManagerMaster stopped
23/11/20 01:38:10 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/20 01:38:10 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hadoop.fs.ParentNotDirectoryException: /tmp/spark/input/csv/orders.csv (is not a directory)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:745)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:736)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:711)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1892)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1910)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.resolvePath(FSDirectory.java:727)
at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:51)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3441)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1167)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:742)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2426)
at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2400)
at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1324)
at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1321)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:1338)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:1313)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:178)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
at org.example.ch11.InsertApp.append(InsertApp.java:40)
at org.example.ch11.InsertApp.main(InsertApp.java:15)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.ParentNotDirectoryException): /tmp/spark/input/csv/orders.csv (is not a directory)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:745)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:736)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:711)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1892)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1910)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.resolvePath(FSDirectory.java:727)
at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:51)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3441)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1167)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:742)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)
at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1511)
at org.apache.hadoop.ipc.Client.call(Client.java:1457)
at org.apache.hadoop.ipc.Client.call(Client.java:1367)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
at com.sun.proxy.$Proxy18.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:656)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
at com.sun.proxy.$Proxy19.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2424)
... 31 more
23/11/20 01:38:11 INFO ShutdownHookManager: Shutdown hook called
23/11/20 01:38:11 INFO ShutdownHookManager: Deleting directory /private/var/folders/6w/v2bcqfkd6kl5ylt9k5_q0lb80000gn/T/spark-cf226489-3921-439a-9653-b33159d3a230
Process finished with exit code 1
1 Answer
oymdgrw71#
Update your logic:
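A minimal sketch of that logic, based on the points explained below; the output directory hdfs://localhost:8020/tmp/spark/output/csv/orders is an assumption, chosen because Spark cannot overwrite the exact file it is still reading from in the same job:

package org.example.ch11;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class InsertApp {

    public static void main(String[] args) {
        new InsertApp().append();
    }

    private void append() {
        SparkSession spark = SparkSession.builder()
                .appName("InsertProcessor")
                .master("local[*]")
                .getOrCreate();

        // csv() reads the single file directly; no format("csv").load() needed
        Dataset<Row> df = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");

        try (JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
            Dataset<Row> newDf = spark.createDataFrame(
                    jsc.parallelize(Collections.singletonList(
                            RowFactory.create("V", "S", "TN", 10, 20, 1551904299))),
                    df.schema());

            // union merges the two row sets; distinct drops any duplicates
            Dataset<Row> mergedDf = df.union(newDf).distinct();
            mergedDf.show();

            // Write the merged data out as CSV. The target is a directory,
            // so the FileOutputCommitter no longer trips over a plain file.
            // (Output path is an assumption; adjust it for your setup.)
            mergedDf.write()
                    .mode(SaveMode.Overwrite)
                    .option("header", true)
                    .csv("hdfs://localhost:8020/tmp/spark/output/csv/orders");
        }
    }
}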
A few points to mention:
- The csv method is used to load the .csv file directly, rather than the load method.
- The union method merges the rows from the two table views, and the distinct method removes any duplicates; the newly merged data then replaces the existing orders_view data, which completes the job.
The Hadoop UI then shows the output with the new merged data.
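If you'd rather verify from code than from the HDFS web UI, read the output directory back (same hypothetical path as in the sketch above):

// Expect the original six rows plus the new (V, S, TN, 10, 20, 1551904299) row.
spark.read()
        .option("header", true)
        .csv("hdfs://localhost:8020/tmp/spark/output/csv/orders")
        .show();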
I think this is a more approachable way. See if it helps.