I have a custom Flink sink that writes to HDFS, using the following code to instantiate the FileSystem object.
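import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job

val path = new Path("/path/to/one/hdfs/dir")
val hadoopJob = Job.getInstance
val hadoopConf = hadoopJob.getConfiguration
// Returns the file system named by fs.defaultFS in hadoopConf
val fs = FileSystem.get(hadoopConf)
val os = fs.create(path)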
I have set the property fs.hdfs.hadoopconf in the Flink configuration file so that it points to the directory containing the Hadoop configuration files.
In core-site.xml I have defined the property fs.defaultFS as shown below.
<property>
<name>fs.defaultFS</name>
<value>hdfs://hostname:port</value>
</property>
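A minimal sketch for checking whether this value is actually visible to the Hadoop Configuration used by the sink. It rests on an assumption: a plain Hadoop Configuration reads core-site.xml from the classpath rather than from the directory named by fs.hdfs.hadoopconf, so the file may have to be added explicitly. The object name DefaultFsCheck and the parameter confDir (the Hadoop configuration directory) are hypothetical and not part of the original setup.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object DefaultFsCheck {
  // Builds a Configuration and explicitly adds core-site.xml from confDir.
  // confDir is a hypothetical parameter: the directory holding the Hadoop config files.
  def loadConf(confDir: String): Configuration = {
    val conf = new Configuration()
    conf.addResource(new Path(confDir, "core-site.xml"))
    conf
  }

  // Returns the effective default file system URI, falling back to the
  // local file system URI when fs.defaultFS is not set at all.
  def defaultFs(conf: Configuration): String =
    conf.get("fs.defaultFS", "file:///")
}
```

If defaultFs reports file:/// for the configuration used by the sink, the hdfs:// value from core-site.xml was most likely never loaded.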
It fails because an object of type LocalFileSystem is instantiated instead of DistributedFileSystem. This is the exception I get.
java.lang.IllegalArgumentException: Wrong FS: hdfs://compute-0-0:9000/esteban.collado/kmers, expected: file:///
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:665)
    at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:86)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:542)
    at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:528)
Could anyone give me a clue about the possible cause?
Thanks,
1 Answer
Maybe you could take a look at the FileSystem.get(path) method, which identifies the final file system from the path.
[1]: https://i.stack.imgur.com/puPzC.png
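A minimal sketch of that idea, assuming the Hadoop FileSystem API is the one meant here. In that API the path-based lookups are Path.getFileSystem(Configuration) and FileSystem.get(URI, Configuration). Both pick the implementation from the scheme and authority of the path itself, so a fully qualified hdfs:// path does not depend on fs.defaultFS. The object name FsFromPath and its method names are hypothetical, and resolving an hdfs:// path assumes the HDFS client classes are on the classpath.

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FsFromPath {
  // Resolves the FileSystem from the path's own scheme and authority.
  // A path without a scheme still falls back to fs.defaultFS from conf.
  def fileSystemFor(path: Path, conf: Configuration): FileSystem =
    path.getFileSystem(conf)

  // Same lookup, starting from a URI instead of a Path.
  def fileSystemForUri(uri: URI, conf: Configuration): FileSystem =
    FileSystem.get(uri, conf)
}
```

In the sink, this would mean building the Path from the full hdfs:// URI (host and port as in core-site.xml) and calling FsFromPath.fileSystemFor(path, hadoopConf) instead of FileSystem.get(hadoopConf), then calling create(path) on the result.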