mapreduce作业由于全局状态而不产生输出

np8igboo  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(319)

我不知道为什么我的Map器和减速机没有输出。代码背后的逻辑是,给定一个uuid文件(新行分隔),我想使用 globStatus 显示uuid可能所在的所有潜在文件的所有路径。打开并读取文件。每个文件包含1-n行json。uuid在 event_header.event_id 在json中。
现在mapreduce作业运行时没有错误。然而,有些事情是错误的,因为我没有任何输出。我也不知道如何调试mapreduce作业。如果有人能给我提供一个来源,这将是可怕的!此程序的预期输出应为

fee90c3f-e832-4267-aa9b-250f53kc06d3 1
914938ae-eed6-4dfa-81bf-71e67m42d93a 1
bbge6012-9c51-4ae1-9242-a4aaf08bfb36 1
e5a12493-gtrf-4ar4-9235-02fd3h580970 1
3b054300-09ba-4d59-a6ac-a0975ca74ed5 1
6fbb1c5g-15ce-4e6f-9236-55a9d9d6e2c6 1
ab4677a3-0f58-428c-8h58-5fe3dfe528dc 1
caaa011d-ahba-4ne7-9h05-3872f3k1854c 1

json示例:

{"event_header":{"version":"1.0","event_id":"fdk32k23-f7f6-412d-879d-f79b4c3b0d55","server_timestamp":1427734304673,"client_ip_address":"10.144.28.48","server_ip_address":"10.129.67.0"},"data_version":"1.0","application":{"properties":{}},"session":{"test":false,"user_id":"1121057496"},"event":{"timestamp":"1427734304577","event_category":"User","traffic":{"priority_code":"1728300000"},"event_id":"9ad26251-b940-408a-b6a9-0a825be1fd38","event_name":"Create"}}

在我的逻辑中,输出文件应该是UUID,旁边有一个1,因为一旦找到,1就会被写入,如果没有找到,0就会被写入。它们应该都是1,因为我从源代码中提取了uuid。
我加了一行 context.write(new Text("None"), new Text("blank")) 在for循环中,我发现没有向输出写入任何内容。所以我想我可以有把握地得出结论,我正在使用 globStatus() 链接不正确。
我的减速机目前没有做任何事情,除了我只是想看看我是否可以得到一些简单的逻辑工作。我的代码中很可能有bug,因为我不知道有什么简单的方法来调试mapreduce作业。
司机:

public class SearchUUID {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "UUID Search");
        job.getConfiguration().set("mapred.job.queue.name", "exp_dsa");
        job.setJarByClass(SearchUUID.class);
        job.setMapperClass(UUIDMapper.class);
        job.setReducerClass(UUIDReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

UUIDMap程序:

public class UUIDMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        try {
            Text one = new Text("1");
            Text zero = new Text("0");

            FileSystem fs = FileSystem.get(new Configuration());
            FileStatus[] paths = fs.globStatus(new Path("/data/path/to/file/d_20150330-1650"));
            for (FileStatus path : paths) {
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path.getPath())));
                String json_string = br.readLine();
                while (json_string != null) {
                    JsonElement jelement = new JsonParser().parse(json_string);
                    JsonObject jsonObject = jelement.getAsJsonObject();
                    jsonObject = jsonObject.getAsJsonObject("event_header");
                    jsonObject = jsonObject.getAsJsonObject("event_id");

                    if (value.toString().equals(jsonObject.getAsString())) {
                        System.out.println(value.toString() + "slkdjfksajflkjsfdkljsadfk;ljasklfjklasjfklsadl;sjdf");
                        context.write(value, one);
                    } else {
                        context.write(value, zero);
                    }

                    json_string = br.readLine();
                }
            }
        } catch (IOException failed) {
        }
    }
}

减速器:

public class UUIDReducer extends Reducer<Text, Text, Text, Text>{

    public void reduce(Text key, Text value, Context context) throws IOException, InterruptedException{
        context.write(key, value);
    }
}
xmjla07d

xmjla07d1#

你检查过日志文件夹里的用户日志了吗?下面的代码可以正常工作
jsonobject=jsonobject.getasjsonobject(“事件头”);jsonobject=jsonobject.getasjsonobject(“事件标识”);此行不正确,请使用jsonobject.get(“event_header”).getasjsonobject();jsonobject.get(“event_id”).getasjsonobject();问题在获取事件标题、事件id jsonobject中。

public class UUIDMapper extends Mapper < Object, Text, Text, Text > {
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        try {
            Text one = new Text("1");
            Text zero = new Text("0");
            String json_string[] = {
                "your data", "your data", "your data", "your data "
            };
            int i = 0;
            while (i < json_string.length) {
                if (value.toString().equals(json_string[i])) {
                    context.write(value, one);
                } else {
                    context.write(value, zero);
                }
            }
        } catch (Exception t) {
            t.printStackTrace();
        }
    }
}

相关问题