We are running a Cascading job whose sink Tap is configured to store into Amazon S3, and we were facing some FileAlreadyExistsException (see [1]). This only happened from time to time (roughly 1 run in about 100) and was not reproducible.
Digging into the Cascading code, we found that Hfs.deleteResource() is called (among others) by BaseFlow.deleteSinksIfNotUpdate(). As an aside, we were quite intrigued by the silently swallowed NPE (commented with "hack to get around the NPE thrown when fs reaches the root directory").
From there, we extended the Hfs tap with our own Tap to add more action in the deleteResource() method (see [2]): a retry mechanism that calls getFileSystem(conf).delete directly.
The retry mechanism seemed to bring an improvement, but we still occasionally face failures (see the example in [3]): it looks as if HDFS returns isDeleted=true, yet when we ask immediately afterwards whether the folder exists we get exists=true, which should not happen. The logs also randomly show isDeleted as true or false when the flow succeeds, which suggests the returned value is irrelevant or cannot be trusted.
Can anyone share their own S3 experience with this kind of behaviour, i.e. "the folder should have been deleted, but it is not"? We suspect an S3 issue, but could it also be in Cascading or HDFS?
We run on Hadoop (Cloudera cdh3u5) and Cascading 2.0.1-wip-dev.
[1]
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3n://... already exists
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
at com.twitter.elephantbird.mapred.output.DeprecatedOutputFormatWrapper.checkOutputSpecs(DeprecatedOutputFormatWrapper.java:75)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:923)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:882)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1278)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:882)
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:856)
at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:104)
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:174)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:137)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:122)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.j
[2]
@Override
public boolean deleteResource(JobConf conf) throws IOException {
    LOGGER.info("Deleting resource {}", getIdentifier());
    boolean isDeleted = super.deleteResource(conf);
    LOGGER.info("Hfs Sink Tap isDeleted is {} for {}", isDeleted, getIdentifier());

    Path path = new Path(getIdentifier());
    int retryCount = 0;
    int cumulativeSleepTime = 0;
    int sleepTime = 1000;

    while (getFileSystem(conf).exists(path)) {
        LOGGER.info("Resource {} still exists, it should not... - I will continue to wait patiently...",
            getIdentifier());
        try {
            LOGGER.info("Now I will sleep " + sleepTime / 1000
                + " seconds while trying to delete {} - attempt: {}",
                getIdentifier(), retryCount + 1);
            Thread.sleep(sleepTime);
            cumulativeSleepTime += sleepTime;
            sleepTime *= 2;
        } catch (InterruptedException e) {
            e.printStackTrace();
            LOGGER.error("Interrupted while sleeping trying to delete {} with message {}...",
                getIdentifier(), e.getMessage());
            throw new RuntimeException(e);
        }

        if (retryCount == 0) {
            getFileSystem(conf).delete(getPath(), true);
        }
        retryCount++;

        if (cumulativeSleepTime > MAXIMUM_TIME_TO_WAIT_TO_DELETE_MS) {
            break;
        }
    }

    if (getFileSystem(conf).exists(path)) {
        LOGGER.error("We didn't succeed to delete the resource {}. Throwing now a runtime exception.",
            getIdentifier());
        throw new RuntimeException("Although we waited to delete the resource for "
            + getIdentifier() + ' ' + retryCount
            + " iterations, it still exists - This must be an issue in the underlying storage system.");
    }

    return isDeleted;
}
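For reference, a hardened variant of the method above would re-issue the delete on every attempt and only trust exists() once it has returned false several times in a row. This is a sketch only, not the tap we actually run; FileSystem is assumed to be imported, and MAX_WAIT_MS, REQUIRED_NEGATIVE_CHECKS and the backoff values are illustrative:

@Override
public boolean deleteResource(JobConf conf) throws IOException {
    boolean isDeleted = super.deleteResource(conf);
    Path path = new Path(getIdentifier());
    FileSystem fs = getFileSystem(conf);

    long sleepTime = 1000;
    long waited = 0;
    int negativeChecks = 0;

    // Re-issue the delete on every attempt and only trust "does not exist"
    // after several consecutive negative answers from the filesystem.
    while (negativeChecks < REQUIRED_NEGATIVE_CHECKS && waited <= MAX_WAIT_MS) {
        if (fs.exists(path)) {
            negativeChecks = 0;
            fs.delete(path, true);
        } else {
            negativeChecks++;
        }
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        waited += sleepTime;
        sleepTime *= 2;
    }

    if (negativeChecks < REQUIRED_NEGATIVE_CHECKS) {
        throw new RuntimeException("Resource " + getIdentifier()
            + " was still visible after waiting " + waited + " ms");
    }
    return isDeleted;
}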
[3]
INFO [pool-2-thread-15] (BaseFlow.java:1287) - [...] at least one sink is marked for delete
INFO [pool-2-thread-15] (BaseFlow.java:1287) - [...] sink oldest modified date: Wed Dec 31 23:59:59 UTC 1969
INFO [pool-2-thread-15] (HiveSinkTap.java:148) - Now I will sleep 1 seconds while trying to delete s3n://... - attempt: 1
INFO [pool-2-thread-15] (HiveSinkTap.java:130) - Deleting resource s3n://...
INFO [pool-2-thread-15] (HiveSinkTap.java:133) - Hfs Sink Tap isDeleted is true for s3n://...
ERROR [pool-2-thread-15] (HiveSinkTap.java:175) - We didn't succeed to delete the resource s3n://... Throwing now a runtime exception.
WARN [pool-2-thread-15] (Cascade.java:706) - [...] flow failed: ...
java.lang.RuntimeException: Although we waited to delete the resource for s3n://... 0 iterations, it still exists - This must be an issue in the underlying storage system.
at com.qubit.hive.tap.HiveSinkTap.deleteResource(HiveSinkTap.java:179)
at com.qubit.hive.tap.HiveSinkTap.deleteResource(HiveSinkTap.java:40)
at cascading.flow.BaseFlow.deleteSinksIfNotUpdate(BaseFlow.java:971)
at cascading.flow.BaseFlow.prepare(BaseFlow.java:733)
at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:761)
at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:710)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)
2 Answers
Answer 1:
First, double-check the Cascading compatibility page for supported distributions:
http://www.cascading.org/support/compatibility/
Note that Amazon EMR is listed there, as they run the compatibility tests regularly and report the results.
Second, S3 is an eventually consistent filesystem; HDFS is not. So assumptions about HDFS behaviour do not carry over to storing data on S3. For example, a rename is really a copy followed by a delete, and the copy can take hours. Amazon has patched their internal distribution to accommodate many of the differences.
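To make this concrete, here is a minimal sketch of the kind of polling that eventual consistency forces on callers: a delete() that returns true does not guarantee that a subsequent exists() returns false. The helper name, bucket, and timeouts below are made up, and the s3n credentials are assumed to be present in the Hadoop configuration.

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class EventualConsistencyCheck {
    // delete() may report success while a later exists() still sees the old
    // listing for a while; poll with backoff instead of trusting a single call.
    static boolean deleteAndWaitUntilGone(FileSystem fs, Path path, long maxWaitMs)
            throws IOException, InterruptedException {
        fs.delete(path, true);
        long waited = 0;
        long sleep = 500;
        while (fs.exists(path)) {
            if (waited >= maxWaitMs) {
                return false; // gave up; the listing still shows the path
            }
            Thread.sleep(sleep);
            waited += sleep;
            sleep = Math.min(sleep * 2, 10000L);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder bucket and path, for illustration only.
        FileSystem fs = FileSystem.get(URI.create("s3n://example-bucket/"), conf);
        boolean gone = deleteAndWaitUntilGone(fs, new Path("s3n://example-bucket/tmp/output"), 60000L);
        System.out.println("path no longer visible: " + gone);
    }
}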
Third, there are no directories in S3. Directories are a hack, and they are supported differently by the different S3 interfaces (jets3t vs s3cmd vs ...). Given the previous point, this is bound to be problematic.
Fourth, network latency and reliability matter a great deal, especially when talking to S3. In my experience, the Amazon network behaves better when manipulating large datasets on S3 from EMR than from standard EC2 instances. I also believe there is a patch in EMR that improves matters here as well.
So I would suggest trying the EMR Apache Hadoop distribution and seeing whether your issues go away.
Answer 2:
When running any job on Hadoop that uses files in S3, the nuances of eventual consistency have to be kept in mind.
I have helped troubleshoot many applications whose root problem turned out to be a similar race condition around deletes, whether they were written in Cascading, in Hadoop Streaming, or directly in Java.
There was some discussion at one point about getting notifications from S3 once a given key/value pair has been fully deleted; I have not kept up with where that feature stands. Otherwise, it is probably best to design the system (again, whether in Cascading or in any other application using S3) so that the data consumed or produced by a batch workflow is managed in HDFS, HBase, or a key/value framework (Redis has been used for this, for example). S3 is then used for durable storage, but not for intermediate data, as in the sketch below.
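As a rough illustration of that split, the sketch below keeps the workflow output on HDFS and pushes only the finished result to S3 after the flow has completed. The class name, paths, and bucket are placeholders, and s3n credentials are assumed to be configured.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class PublishToS3 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Intermediate and final job output stays on HDFS, where listings and
        // deletes are consistent. Assumes HDFS is the cluster's default filesystem.
        FileSystem hdfs = FileSystem.get(conf);
        Path finalOutput = new Path("/workflow/output/run-0001"); // placeholder path

        // S3 is only used as the durable store for the finished result.
        FileSystem s3 = FileSystem.get(URI.create("s3n://example-bucket/"), conf); // placeholder bucket
        Path published = new Path("s3n://example-bucket/published/run-0001");

        // Copy (not move) the completed output; the HDFS copy is kept until
        // the upload has been verified.
        boolean copied = FileUtil.copy(hdfs, finalOutput, s3, published, false, conf);
        System.out.println("published to S3: " + copied);
    }
}

The HDFS copy can be cleaned up later, once the upload has been verified, so no step of the workflow ever depends on an S3 listing being up to date.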