A Spark job-end listener moving source files from an HDFS path causes a "file not found" exception

qybjjes1 · posted 2021-05-29 in Hadoop

Spark version: 2.3
A Spark Structured Streaming application streams from an HDFS path:

    Dataset<Row> lines = spark
        .readStream()
        .format("text")
        .load("path");

After some data transformations, the job for a given file should reach the succeeded state.
A job listener is registered for job end; it moves the file that triggered the job:

    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        // Move the source file, whose processing is finished, to another location.
    }
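The race can be sketched on a local filesystem (a hypothetical stand-alone demo with made-up names like `MoveRaceDemo`, not Spark code): the listener's move succeeds, but a straggling read that still resolves the old path then fails, mirroring the HDFS `FileNotFoundException` below.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

public class MoveRaceDemo {
    // Returns true if the old path can still be read after the move.
    static boolean readableAfterMove(Path src, Path dst) throws IOException {
        Files.move(src, dst);              // what the onJobEnd listener does
        try {
            Files.readAllBytes(src);       // a straggling read by the old path
            return true;
        } catch (NoSuchFileException e) {  // local analogue of HDFS's
            return false;                  // "File does not exist"
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("move-race");
        Path src = Files.write(dir.resolve("part-0000.txt"), "data".getBytes());
        System.out.println("readable after move: "
                + readableAfterMove(src, dir.resolve("archived.txt")));
    }
}
```

In the real application the "straggling read" is Spark itself still holding a reference to the file, which is why the exception appears at the same timestamp as the move.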

The file is moved successfully to the other location, but at the same moment (the exact same timestamp) Spark throws the file-not-found exception below. This happens randomly and cannot be reproduced on demand, but it occurs frequently.
Even though the particular job has ended, Spark is somehow still referencing the file.
How can I make sure Spark no longer references a file once its job has ended, so that this file-not-found problem is avoided?
All I could find about SparkListenerJobEnd is this:

    DAGScheduler does cleanUpAfterSchedulerStop, handleTaskCompletion, failJobAndIndependentStages, and markMapStageJobAsFinished.

Same problem, different approach.
The exception:

    java.io.FileNotFoundException: File does not exist: <filename>
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1932)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1873)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1853)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1825)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:559)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:87)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:363)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteExc

vxf3dgd4 1#

This is a bug in Spark: https://issues.apache.org/jira/browse/spark-24364
Mail thread: http://mail-archives.apache.org/mod_mbox/spark-issues/201805.mbox/%3cjira.13161366.1527070555000.13725.1527070560256@atlassian.jira%3e
Fix: https://github.com/apache/spark/pull/21408/commits/c52d972e4ca09e0ede1bb9e60d3c07f80f605f88
Fixed versions: 2.3.1 / 2.4.0
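Besides upgrading past the bug, a listener-free alternative (my suggestion, not part of the answer above): from Spark 3.0 on, the file stream source can clean up inputs it has finished reading via the cleanSource / sourceArchiveDir options, so nothing external races the scheduler. A minimal configuration sketch, assuming Spark 3.0+ and placeholder paths:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ArchiveSourceSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("archive-source-sketch") // hypothetical app name
                .getOrCreate();

        // Spark 3.0+ only: the source itself archives files once they are
        // committed, replacing the job-end listener's move.
        Dataset<Row> lines = spark
                .readStream()
                .format("text")
                .option("cleanSource", "archive")            // or "delete"
                .option("sourceArchiveDir", "/data/archive") // placeholder path
                .load("/data/in");                           // placeholder path
    }
}
```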
