I have this file on HDFS, but apparently it can't be found there, and I don't know why.
My code is:
public static Schema getSchema() throws IOException {
    InputStream is = new FileInputStream("hdfs:///schema.movies");
    String ps = new String(is.readAllBytes());
    MessageType mt = MessageTypeParser.parseMessageType(ps);
    return new AvroSchemaConverter().convert(mt);
}
The schema.movies file can be seen here:
The error I get is:
Connected.
Configuring core
- Setting hadoop.proxyuser.hue.hosts=*
- Setting fs.defaultFS=hdfs://namenode:9000
- Setting hadoop.http.staticuser.user=root
- Setting io.compression.codecs=org.apache.hadoop.io.compress.SnappyCodec
- Setting hadoop.proxyuser.hue.groups=*
Configuring hdfs
- Setting dfs.namenode.datanode.registration.ip-hostname-check=false
- Setting dfs.webhdfs.enabled=true
- Setting dfs.permissions.enabled=false
Configuring yarn
- Setting yarn.timeline-service.enabled=true
- Setting yarn.scheduler.capacity.root.default.maximum-allocation-vcores=4
- Setting yarn.resourcemanager.system-metrics-publisher.enabled=true
- Setting yarn.resourcemanager.store.class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore
- Setting yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage=98.5
- Setting yarn.log.server.url=http://historyserver:8188/applicationhistory/logs/
- Setting yarn.resourcemanager.fs.state-store.uri=/rmstate
- Setting yarn.timeline-service.generic-application-history.enabled=true
- Setting yarn.log-aggregation-enable=true
- Setting yarn.resourcemanager.hostname=resourcemanager
- Setting yarn.scheduler.capacity.root.default.maximum-allocation-mb=8192
- Setting yarn.nodemanager.aux-services=mapreduce_shuffle
- Setting yarn.resourcemanager.resource_tracker.address=resourcemanager:8031
- Setting yarn.timeline-service.hostname=historyserver
- Setting yarn.resourcemanager.scheduler.address=resourcemanager:8030
- Setting yarn.resourcemanager.address=resourcemanager:8032
- Setting mapred.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
- Setting yarn.nodemanager.remote-app-log-dir=/app-logs
- Setting yarn.resourcemanager.scheduler.class=org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler
- Setting mapreduce.map.output.compress=true
- Setting yarn.nodemanager.resource.memory-mb=16384
- Setting yarn.resourcemanager.recovery.enabled=true
- Setting yarn.nodemanager.resource.cpu-vcores=8
Configuring httpfs
Configuring kms
Configuring mapred
- Setting mapreduce.map.java.opts=-Xmx3072m
- Setting mapreduce.reduce.java.opts=-Xmx6144m
- Setting mapreduce.reduce.memory.mb=8192
- Setting yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1/
- Setting mapreduce.map.memory.mb=4096
- Setting mapred.child.java.opts=-Xmx4096m
- Setting mapreduce.reduce.env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1/
- Setting mapreduce.framework.name=yarn
- Setting mapreduce.map.env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1/
Configuring for multihomed network
Exception in thread "main" java.io.FileNotFoundException: hdfs:/schema.movies (No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.<init>(FileInputStream.java:138)
at java.io.FileInputStream.<init>(FileInputStream.java:93)
at GGCD_Alinea1.ToParquet.getSchema(ToParquet.java:33)
at GGCD_Alinea1.ToParquet.main(ToParquet.java:214)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
Disconnected from container.
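As you can see, it says it can't find the schema.movies file, even though you can see that the file is stored in HDFS. Is this because I'm not using the Path class from org.apache.hadoop.fs.Path? When I run the program with the code below (which uses Path), it finds my data files, but in getSchema() I need the readAllBytes() method, which is why I don't use Path there.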
public static void main(String[] args) throws Exception {
    long startTime = System.nanoTime();
    Job job1 = Job.getInstance(new Configuration(), "ToParquetAlinea1");
    job1.setJarByClass(ToParquet.class);
    // input
    job1.setInputFormatClass(TextInputFormat.class);
    MultipleInputs.addInputPath(job1, new Path("hdfs:///title.basics.tsv.gz"),
            TextInputFormat.class, ToParquetMapperLeft.class);
    MultipleInputs.addInputPath(job1, new Path("hdfs:///title.ratings.tsv.gz"),
            TextInputFormat.class, ToParquetMapperRight.class);
    job1.setReducerClass(JoinReducer.class);
    // output
    job1.setOutputKeyClass(Void.class);
    job1.setOutputValueClass(GenericRecord.class);
    job1.setOutputFormatClass(AvroParquetOutputFormat.class);
    AvroParquetOutputFormat.setSchema(job1, getSchema());
    FileOutputFormat.setOutputPath(job1, new Path("hdfs:///resultado_parquet"));
    job1.setMapOutputKeyClass(Text.class);
    job1.setMapOutputValueClass(Text.class);
    job1.waitForCompletion(true);
    long endTime = System.nanoTime();
    long duration = (endTime - startTime) / 1000000; // milliseconds
    System.out.println("\n\nTIME: " + duration + "\n");
}
Why?
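One clue is already in the trace above: the exception is thrown from java.io.FileInputStream, and the name it reports is hdfs:/schema.movies rather than hdfs:///schema.movies. The sketch below is a small illustration of that, assuming a Unix-like system; the class and method names (LocalPathDemo, localView, opensLocally) are made up for this example. It shows that java.io treats the string as an ordinary relative path on the local disk, not as a URI pointing into HDFS.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;

public class LocalPathDemo {

    // The path string java.io actually works with. On Unix-like systems
    // repeated slashes are collapsed, so no "hdfs://" scheme survives.
    static String localView(String name) {
        return new File(name).getPath();
    }

    // Tries to open the name the same way the question's getSchema() does.
    static boolean opensLocally(String name) throws IOException {
        try (FileInputStream in = new FileInputStream(name)) {
            return true;
        } catch (FileNotFoundException e) {
            // No such file on the local disk (relative to the working directory).
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        String name = "hdfs:///schema.movies";
        System.out.println("java.io sees:  " + localView(name));
        System.out.println("absolute path: " + new File(name).isAbsolute());
        System.out.println("opens locally: " + opensLocally(name));
    }
}
```

Hadoop's Path, by contrast, is resolved through the file system configured in fs.defaultFS, which would explain why the input paths in main() are found while the FileInputStream call is not.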
1 Answer
The getSchema() method that works is:
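For reference, here is a minimal sketch of what such a method can look like when the file is opened through Hadoop's FileSystem API instead of java.io.FileInputStream. It is an illustration under assumptions, not necessarily the exact code from this answer: the class name SchemaLoader and the helper readText are hypothetical, and the cluster settings (fs.defaultFS and so on) are assumed to be picked up by new Configuration(). A manual copy loop stands in for readAllBytes() because the stack trace in the question appears to come from a Java 8 runtime, and InputStream.readAllBytes() was only added in Java 9.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class SchemaLoader {

    // Hypothetical helper: reads a whole file as UTF-8 text through whichever
    // Hadoop FileSystem the path resolves to (HDFS for hdfs:// paths).
    static String readText(Configuration conf, Path path) throws IOException {
        // The FileSystem instance is cached and shared by Hadoop, so it is not closed here.
        FileSystem fs = path.getFileSystem(conf);
        try (InputStream in = fs.open(path);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static Schema getSchema() throws IOException {
        // Same path as in the question, but resolved by Hadoop rather than java.io.
        String ps = readText(new Configuration(), new Path("hdfs:///schema.movies"));
        MessageType mt = MessageTypeParser.parseMessageType(ps);
        return new AvroSchemaConverter().convert(mt);
    }
}
```

With this shape, the call AvroParquetOutputFormat.setSchema(job1, getSchema()) in main() can stay as it is; only the way the schema text is read changes.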