Hadoop DistCp overwrite not working

mqxuamgl · posted 2021-05-29 in Hadoop

I am trying to copy some files to an S3 bucket using the org.apache.hadoop.tools.DistCp class, but the overwrite is not working even though the overwrite flag is explicitly set to true.
The copy itself works fine, but existing files at the destination are never overwritten: the copy mapper skips them. I have explicitly set the overwrite option to true.

  import com.typesafe.scalalogging.LazyLogging
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.tools.{DistCp, DistCpOptions}
  import org.apache.hadoop.util.ToolRunner
  import scala.collection.JavaConverters._

  object distcptest extends App with LazyLogging {

    def copytoS3(hdfsSrcFilePathStr: String, s3DestPathStr: String) = {
      val hdfsSrcPathList = List(new Path(hdfsSrcFilePathStr))
      val s3DestPath = new Path(s3DestPathStr)

      val distcpOpt = new DistCpOptions(hdfsSrcPathList.asJava, s3DestPath)
      // Overwriting is not working in spite of explicitly setting it to true.
      distcpOpt.setOverwrite(true)

      val conf: Configuration = new Configuration()
      conf.set("fs.s3n.awsSecretAccessKey", "secret key")
      conf.set("fs.s3n.awsAccessKeyId", "access key")
      conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

      val distCp: DistCp = new DistCp(conf, distcpOpt)
      val filepaths: Array[String] = Array(hdfsSrcFilePathStr, s3DestPathStr)

      try {
        val distCp_result = ToolRunner.run(distCp, filepaths)
        if (distCp_result != 0) {
          logger.error(s"DistCp has failed with - error code = $distCp_result")
        }
      } catch {
        case e: Exception =>
          e.printStackTrace()
      }
    }

    copytoS3("hdfs://abc/pqr", "s3n://xyz/wst")
  }

ykejflvf 1#

I think the problem is that you are calling ToolRunner.run(distCp, filepaths).
If you check the source code of DistCp, its run method overwrites inputOptions, so the DistCpOptions you passed to the constructor take no effect.

  @Override
  public int run(String[] argv) {
    ...
    try {
      inputOptions = (OptionsParser.parse(argv));
      ...
    } catch (Throwable e) {
      ...
    }
    ...
  }
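
If that is the cause, there are two ways around it. Here is a minimal sketch against the Hadoop 2.x DistCp API your code already uses (paths reused from the question for illustration): either pass -overwrite in the argument array so OptionsParser.parse picks it up, or skip ToolRunner and call distCp.execute(), which runs the job with the DistCpOptions given to the constructor.

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.tools.{DistCp, DistCpOptions}
  import org.apache.hadoop.util.ToolRunner
  import scala.collection.JavaConverters._

  object DistcpOverwriteFix extends App {
    val src = "hdfs://abc/pqr"  // paths reused from the question
    val dst = "s3n://xyz/wst"

    val opts = new DistCpOptions(List(new Path(src)).asJava, new Path(dst))
    opts.setOverwrite(true)
    val distCp = new DistCp(new Configuration(), opts)

    // Workaround 1: put -overwrite on argv, so run() sees it when it
    // rebuilds inputOptions via OptionsParser.parse(argv).
    val rc = ToolRunner.run(distCp, Array("-overwrite", src, dst))

    // Workaround 2: call execute() directly; it submits the job using the
    // DistCpOptions passed to the constructor instead of re-parsing argv.
    val job = distCp.execute()
  }

Either approach should stop the mapper from skipping existing files; use only one of them, since each triggers a full copy run.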
