gcp dataproc cluster hadoop将数据从gs bucket移动到s3 amazon bucket的作业失败[控制台]

a8jjtwal  于 2021-07-13  发布在  Hadoop
关注(0)|答案(1)|浏览(489)

第一个问题是堆栈溢出,所以请原谅我的任何新手错误。
我目前正致力于移动一个非常大的数据量(700+gib),其中包括许多小文件,每个文件大约1-10mb,从gcs bucket中的一个文件夹移动到s3中的一个文件夹。
我做了几次尝试: gsutil -m rsync -r gs://<path> s3://<path> 由于大量数据导致超时 gsutil -m cp -r gs://<path> s3://<path> 时间太长了。即使有许多并行进程和/或线程,它的平均传输速度仍约为3.4mib/s。我已确保在这次尝试中升级vm示例。
使用 rclone 与cp相同的性能问题
最近我发现了另一种可能的方法。但是我不熟悉gcp,所以请容忍我,对不起。这是我找到的参考资料https://medium.com/swlh/transfer-data-from-gcs-to-s3-using-google-dataproc-with-airflow-aa49dc896dad 该方法涉及使用以下配置通过gcp控制台生成dataproc群集:

Name:
    <dataproc-cluster-name>
Region:
    asia-southeast1
Nodes configuration:
    1 main 2 worker @2vCPU & @3.75GBMemory & @30GBPersistentDisk
properties:
    core    fs.s3.awsAccessKeyId        <key>
    core    fs.s3.awsSecretAccessKey    <secret>
    core    fs.s3.impl                  org.apache.hadoop.fs.s3.S3FileSystem

然后我通过gcp网站的控制台菜单提交作业:
在这一刻,我开始注意到问题,我找不到 hadoop-mapreduce/hadoop-distcp.jar 任何地方。我只能找到 /usr/lib/hadoop/hadoop-distcp.jar 通过我的主dataproc集群vm示例浏览根文件
我提交的工作:

Start time:
31 Mar 2021, 16:00:25
Elapsed time:
3 sec
Status:
Failed
Region
asia-southeast1
Cluster
<cluster-name>
Job type
Hadoop
Main class or JAR
file://usr/lib/hadoop/hadoop-distcp.jar
Arguments
-update
gs://*
s3://*

返回一个错误

/usr/lib/hadoop/libexec//hadoop-functions.sh: line 2400: HADOOP_COM.GOOGLE.CLOUD.HADOOP.SERVICES.AGENT.JOB.SHIM.HADOOPRUNJARSHIM_USER: invalid variable name
/usr/lib/hadoop/libexec//hadoop-functions.sh: line 2365: HADOOP_COM.GOOGLE.CLOUD.HADOOP.SERVICES.AGENT.JOB.SHIM.HADOOPRUNJARSHIM_USER: invalid variable name
/usr/lib/hadoop/libexec//hadoop-functions.sh: line 2460: HADOOP_COM.GOOGLE.CLOUD.HADOOP.SERVICES.AGENT.JOB.SHIM.HADOOPRUNJARSHIM_OPTS: invalid variable name
2021-03-31 09:00:28,549 ERROR tools.DistCp: Invalid arguments: 
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3.S3FileSystem not found
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2638)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3342)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3374)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:126)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3425)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3393)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:486)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:240)
    at org.apache.hadoop.tools.DistCp.run(DistCp.java:143)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.tools.DistCp.main(DistCp.java:441)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
    at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3.S3FileSystem not found
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2542)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2636)
    ... 18 more
Invalid arguments: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3.S3FileSystem not found
usage: distcp OPTIONS [source_path...] <target_path>
              OPTIONS
 -append                       Reuse existing data in target files and
                               append new data to them if possible
 -async                        Should distcp execution be blocking
 -atomic                       Commit all changes or none
 -bandwidth <arg>              Specify bandwidth per map in MB, accepts
                               bandwidth as a fraction.
 -blocksperchunk <arg>         If set to a positive value, fileswith more
                               blocks than this value will be split into
                               chunks of <blocksperchunk> blocks to be
                               transferred in parallel, and reassembled on
                               the destination. By default,
                               <blocksperchunk> is 0 and the files will be
                               transmitted in their entirety without
                               splitting. This switch is only applicable
                               when the source file system implements
                               getBlockLocations method and the target
                               file system implements concat method
 -copybuffersize <arg>         Size of the copy buffer to use. By default
                               <copybuffersize> is 8192B.
 -delete                       Delete from target, files missing in
                               source. Delete is applicable only with
                               update or overwrite options
 -diff <arg>                   Use snapshot diff report to identify the
                               difference between source and target
 -direct                       Write files directly to the target
                               location, avoiding temporary file rename.
 -f <arg>                      List of files that need to be copied
 -filelimit <arg>              (Deprecated!) Limit number of files copied
                               to <= n
 -filters <arg>                The path to a file containing a list of
                               strings for paths to be excluded from the
                               copy.
 -i                            Ignore failures during copy
 -log <arg>                    Folder on DFS where distcp execution logs
                               are saved
 -m <arg>                      Max number of concurrent maps to use for
                               copy
 -numListstatusThreads <arg>   Number of threads to use for building file
                               listing (max 40).
 -overwrite                    Choose to overwrite target files
                               unconditionally, even if they exist.
 -p <arg>                      preserve status (rbugpcaxt)(replication,
                               block-size, user, group, permission,
                               checksum-type, ACL, XATTR, timestamps). If
                               -p is specified with no <arg>, then
                               preserves replication, block size, user,
                               group, permission, checksum type and
                               timestamps. raw.* xattrs are preserved when
                               both the source and destination paths are
                               in the /.reserved/raw hierarchy (HDFS
                               only). raw.* xattrpreservation is
                               independent of the -p flag. Refer to the
                               DistCp documentation for more details.
 -rdiff <arg>                  Use target snapshot diff report to identify
                               changes made on target
 -sizelimit <arg>              (Deprecated!) Limit number of files copied
                               to <= n bytes
 -skipcrccheck                 Whether to skip CRC checks between source
                               and target paths.
 -strategy <arg>               Copy strategy to use. Default is dividing
                               work based on file sizes
 -tmp <arg>                    Intermediate work path to be used for
                               atomic commit
 -update                       Update target, copying only missing files
                               or directories
 -v                            Log additional info (path, size) in the
                               SKIP/COPY log
 -xtrack <arg>                 Save information about missing source files
                               to the specified directory

我怎样才能解决这个问题?我在网上找到的几个补丁都没有什么帮助。他们要么使用hadoop cli,要么像我一样有不同的jar文件。例如,这里有一个:使用dataproc hadoop集群和airflow将数据从google云存储移动到s3https://github.com/coorpacademy/docker-pyspark/issues/13
免责声明:我不使用hadoopcli或aiffort。我使用控制台来完成这个任务,通过dataproc集群主vm示例shell提交作业也会返回相同的错误。如有需要,请详细参考,谢谢!
更新:
修复了gsutil部件上的错误路径替换
问题是由于hadoop不再支持s3filesystem。所以我不得不用hadoop2.10降级到一个图像。不过,速度也不尽如人意

xiozqbni

xiozqbni1#

我认为dataproc的解决方案在你的情况下是过分的。如果您需要每天或每小时从gcs向s3拷贝一tb的数据,那么dataproc是有意义的。但听起来你的只是一个一次性拷贝,你可以让它运行数小时或数天。我建议在googlecloud(gcp)示例上运行gsutil。我已经为此尝试了一个awsec2示例,对于这个特定的操作,它总是非常慢。
在同一区域中创建源和目标存储桶。例如,us-east4(n。弗吉尼亚州)和us-east-1(n。(弗吉尼亚州)s3。然后将示例部署到同一gcp区域。

gsutil -m cp -r gs://* s3://*

. . . 可能行不通。它在dataproc中肯定不起作用,如果没有明确的文件位置或以/
相反,首先尝试显式地成功复制一个文件。然后尝试整个文件夹或桶。
你想复制多少文件?

相关问题