Getting Spring XD and the hdfs sink to work with MapR FS

ckocjqey · asked 2021-05-30 · Hadoop

This is a question about getting Spring XD release 1.0.1 to work with MapR FS, which is not officially supported yet. Nevertheless, I would like to get it working.

Here is what we did:
1) Adjusted the xd-shell, xd-worker and xd-singlenode shell scripts to accept the parameter --hadoopDistro mapr
2) Added the following libraries to the new directory $XD_HOME/lib/mapr:

  avro-1.7.4.jar                          jersey-core-1.9.jar
  hadoop-annotations-2.2.0.jar            jersey-server-1.9.jar
  hadoop-core-1.0.3-mapr-3.0.2.jar        jetty-util-6.1.26.jar
  hadoop-distcp-2.2.0.jar                 maprfs-1.0.3-mapr-3.0.2.jar
  hadoop-hdfs-2.2.0.jar                   protobuf-java-2.5.0.jar
  hadoop-mapreduce-client-core-2.2.0.jar  spring-data-hadoop-2.0.2.RELEASE-hadoop24.jar
  hadoop-streaming-2.2.0.jar              spring-data-hadoop-batch-2.0.2.RELEASE-hadoop24.jar
  hadoop-yarn-api-2.2.0.jar               spring-data-hadoop-core-2.0.2.RELEASE-hadoop24.jar
  hadoop-yarn-common-2.2.0.jar            spring-data-hadoop-store-2.0.2.RELEASE-hadoop24.jar

3) Ran bin/xd-singlenode --hadoopDistro mapr and shell/bin/xd-shell --hadoopDistro mapr.
When creating and deploying a stream via stream create foo --definition "time | hdfs" --deploy, the data gets written to the file tmp/xd/foo/foo-1.txt.tmp on MapR FS. However, on undeploying the stream, the following exception appears:

  org.springframework.data.hadoop.store.StoreException: Failed renaming from /xd/foo/foo-1.txt.tmp to /xd/foo/foo-1.txt; nested exception is java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
      at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:261)
      at org.springframework.data.hadoop.store.output.TextFileWriter.close(TextFileWriter.java:92)
      at org.springframework.xd.integration.hadoop.outbound.HdfsDataStoreMessageHandler.doStop(HdfsDataStoreMessageHandler.java:58)
      at org.springframework.xd.integration.hadoop.outbound.HdfsStoreMessageHandler.stop(HdfsStoreMessageHandler.java:94)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:497)
      at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
      at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201)
      at com.sun.proxy.$Proxy120.stop(Unknown Source)
      at org.springframework.integration.endpoint.EventDrivenConsumer.doStop(EventDrivenConsumer.java:64)
      at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:100)
      at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:115)
      at org.springframework.integration.config.ConsumerEndpointFactoryBean.stop(ConsumerEndpointFactoryBean.java:303)
      at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
      at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:51)
      at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
      at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
      at org.springframework.context.support.DefaultLifecycleProcessor.stop(DefaultLifecycleProcessor.java:106)
      at org.springframework.context.support.AbstractApplicationContext.stop(AbstractApplicationContext.java:1186)
      at org.springframework.xd.module.core.SimpleModule.stop(SimpleModule.java:234)
      at org.springframework.xd.dirt.module.ModuleDeployer.destroyModule(ModuleDeployer.java:132)
      at org.springframework.xd.dirt.module.ModuleDeployer.handleUndeploy(ModuleDeployer.java:111)
      at org.springframework.xd.dirt.module.ModuleDeployer.undeploy(ModuleDeployer.java:83)
      at org.springframework.xd.dirt.server.ContainerRegistrar.undeployModule(ContainerRegistrar.java:261)
      at org.springframework.xd.dirt.server.ContainerRegistrar$StreamModuleWatcher.process(ContainerRegistrar.java:884)
      at org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67)
      at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
      at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
  Caused by: java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
      at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:805)
      at com.mapr.fs.MapRFileSystem.delete(MapRFileSystem.java:629)
      at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:258)
      ... 29 more

I took a look at the OutputStoreObjectSupport.renameFile() method. When a file on hdfs is finished, this method tries to rename the file /xd/foo/foo-1.txt.tmp to /xd/foo/foo-1.txt. This is the relevant code:

  try {
      FileSystem fs = path.getFileSystem(getConfiguration());
      boolean succeed;
      try {
          fs.delete(toPath, false);
          log.info("Renaming path=[" + path + "] toPath=[" + toPath + "]");
          succeed = fs.rename(path, toPath);
      } catch (Exception e) {
          throw new StoreException("Failed renaming from " + path + " to " + toPath, e);
      }
      if (!succeed) {
          throw new StoreException("Failed renaming from " + path + " to " + toPath + " because hdfs returned false");
      }
  }

It seems that MapR FS throws an exception when fs.delete(toPath, false) is called and the target file does not exist on the file system. Throwing an exception in this case does not make much sense, though. I assume other file system implementations behave differently, but I still have to verify this. Unfortunately I cannot find the sources of MapRFileSystem.java. Is it closed source? That would help me understand the issue better. Does anybody have experience writing to MapR FS from Spring XD, or renaming files on MapR FS with spring-data-hadoop?
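For context, the usual Hadoop FileSystem.delete(Path, boolean) behaviour is to return false rather than throw when the path does not exist, which is presumably what the Spring code above relies on. Below is a minimal sketch, not the actual Spring for Apache Hadoop fix and with illustrative class and method names, of a more defensive rename that guards the delete with an existence check so the MapR FS behaviour seen in the stack trace is never triggered:

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class DefensiveRename {

      // Illustrative only: delete the target only when it actually exists,
      // so a FileSystem implementation whose delete() throws
      // FileNotFoundException for missing paths (as MapR FS appears to do
      // in the stack trace above) is never exercised.
      public static void renameFile(Configuration conf, Path path, Path toPath) throws IOException {
          FileSystem fs = path.getFileSystem(conf);
          if (fs.exists(toPath)) {
              fs.delete(toPath, false);
          }
          if (!fs.rename(path, toPath)) {
              throw new IOException("Failed renaming from " + path + " to " + toPath);
          }
      }
  }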

EDIT

I managed to reproduce the problem outside of Spring XD with a simple test case (see below). Note that the exception is only thrown if an inWritingSuffix or inWritingPrefix is set; otherwise Spring for Apache Hadoop will not attempt to rename the file. So the workaround, which is still unsatisfying to me, is: do not use inWritingPrefixes and inWritingSuffixes (a minimal sketch of this workaround follows the test case below).

  @ContextConfiguration("context.xml")
  @RunWith(SpringJUnit4ClassRunner.class)
  public class MaprfsSinkTest {

      @Autowired
      Configuration configuration;

      @Autowired
      FileSystem filesystem;

      @Autowired
      DataStoreWriter<String> storeWriter;

      @Test
      public void testRenameOnMaprfs() throws IOException, InterruptedException {
          Path testPath = new Path("/tmp/foo.txt");
          filesystem.delete(testPath, true);
          TextFileWriter writer = new TextFileWriter(configuration, testPath, null);
          writer.setInWritingSuffix("tmp");
          writer.write("some entity");
          writer.close();
      }

      @Test
      public void testStoreWriter() throws IOException {
          this.storeWriter.write("something");
      }
  }
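For completeness, here is a minimal sketch of the workaround mentioned above: with no in-writing prefix or suffix configured, TextFileWriter writes straight to the target path and close() never goes through renameFile(), so the MapR FS delete() behaviour is not hit. The path and entity below are placeholders:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.springframework.data.hadoop.store.output.TextFileWriter;

  public class NoInWritingSuffixExample {

      public static void main(String[] args) throws Exception {
          Configuration configuration = new Configuration();
          // No setInWritingSuffix()/setInWritingPrefix(): the writer creates
          // /tmp/foo.txt directly, so there is nothing to rename on close().
          TextFileWriter writer = new TextFileWriter(configuration, new Path("/tmp/foo.txt"), null);
          writer.write("some entity");
          writer.close();
      }
  }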

kqlmhetl 1#

I created a new branch of Spring for Apache Hadoop that supports MapR FS:
https://github.com/blinse/spring-hadoop/tree/origin/2.0.2.release-mapr
Building this release and using the resulting jar works fine with the hdfs sink.
