我正在运行结构化的流媒体管道,使用hdfs作为源,hive作为接收器。我使用的spark版本是spark2.4.0-cdh6.2.1。在运行应用程序时,我通过定期将新文件放入input hdfs目录来测试管道。在正在运行的流媒体应用程序中,经过一段时间,在放置新文件时,我得到以下错误-
2020-08-09 07:48:25,094 WARN org.apache.spark.sql.execution.streaming.CheckpointFileManager Failed to rename temp file hdfs://name
service1/user/zsamol1u/sachin_phase2/chkpoint-hive_avro1_amo_1h_aprimo_hdfs_hive_streaming_avro1_checkpoint/sources/0/.6.0398918f-
8f4c-4d79-ab5c-e8c87781ac86.tmp to hdfs://nameservice1/user/zsamol1u/sachin_phase2/chkpoint-hive_avro1_amo_1h_aprimo_hdfs_hive_str
eaming_avro1_checkpoint/sources/0/6 because file exists
org.apache.hadoop.fs.FileAlreadyExistsException: rename destination /user/zsamol1u/sachin_phase2/chkpoint-hive_avro1_amo_1h_aprimo
_hdfs_hive_streaming_avro1_checkpoint/sources/0/6 already exists
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.validateOverwrite(FSDirRenameOp.java:542)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:383)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:296)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:246)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:2926)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename2(NameNodeRpcServer.java:1052)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename2(ClientNamenodeProtocolServerSide
TranslatorPB.java:657)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNa
menodeProtocolProtos.java)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): rename destination /user/zsamol1u/sachin_phase2/chkpoint-hive_avro1_amo_1h_aprimo_hdfs_hive_streaming_avro1_checkpoint/sources/0/6 already exists
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.validateOverwrite(FSDirRenameOp.java:542)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:383)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:296)
at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:246)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:2926)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename2(NameNodeRpcServer.java:1052)
我们在piepline使用hdfs检查点位置。在上面的例子中,流应用程序为前6个文件运行,每次插入新文件时,都会在检查点位置下的sources文件夹中创建名为0到6的文件。但是,一旦插入第7个文件,就会发生此错误并停止应用程序。
从源代码(hdfs)读入Dataframe的代码-
Dataset<Row> inputDataFrame = spark.readStream().schema(inputSourceStructType).format("avro").option("maxFilesPerTrigger","50")).load(this.path);
将Dataframe写入接收器(配置单元)的代码-
inputDataFrame.writeStream().format("parquet").option("checkpointLocation", this.checkPoint).option("path",this.externalTablePath).partitionBy("file_recived_date").trigger(Trigger.ProcessingTime("5 seconds")).queryName("queryName").start();
未诊断出发生此错误的根本原因。我了解到,在sources目录下的checkpoint编写新文件时会出现重命名然后替换的策略。此问题间歇性出现,在运行应用程序的任何时候,此问题都会出现。另外,在停止运行应用程序并重新启动它时,同样的问题也会发生。
为什么将文件从.6.0398918f-8f4c-4d79-ab5c-e8c87781ac86.tmp重命名为名为6的文件的操作没有成功?是什么导致了这个问题?我们怎样才能阻止这个问题的发生?
暂无答案!
目前还没有任何答案,快来回答吧!