Distributed cache not working

kiayqfof asked on 2021-06-03 in Hadoop

I am storing a small amount of data (a few MB) in the distributed cache and using it to perform an anti-join between two large files. With just a few lines of data in the cache it works fine, but in production, where the cache holds more data, it fails to do the job without raising any error: only a small fraction of the records (about 20%) get joined and the rest are simply ignored. Is there an upper limit on the number of records that can be stored in the distributed cache? Why does it work for some records and ignore the rest? Any suggestions would be really helpful. My mapper code is below, after a quick sketch of the driver-side cache setup for context.
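For context, a driver-side registration of the cache file typically looks like the sketch below (the paths, cache-file URI, and job name are placeholders, not from the actual job). One thing worth checking: if the deleted-id data is spread over several part-* files, every one of them has to be added to the cache, or only part of the records will be filtered.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class AntiJoinDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Register the small deleted-ids file BEFORE the Job is created,
            // so the setting is copied into the job's configuration.
            // If the data spans several part-* files, add every one of them.
            DistributedCache.addCacheFile(new URI("/user/me/deleted/part-m-00000"), conf);

            Job job = new Job(conf, "anti-join");
            job.setJarByClass(AntiJoinDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(TextPair.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }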

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

public class MyMapper extends Mapper<LongWritable, Text, Text, TextPair> {

    private static final Logger LOGGER = Logger.getLogger(MyMapper.class);

    Text albumKey = new Text();
    Text photoKey = new Text();
    Text interKey = new Text();
    private TextPair interValue = new TextPair();

    // Ids of deleted photos/albums, loaded from the distributed cache
    private HashSet<String> photoDeleted = new HashSet<String>();
    private HashSet<String> albDeleted = new HashSet<String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        int count = 0;
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        try {
            if (cacheFiles != null && cacheFiles.length > 0) {
                LOGGER.info(cacheFiles.length + " cache file(s) found");
                for (Path path : cacheFiles) {
                    BufferedReader joinReader = new BufferedReader(new FileReader(path.toString()));
                    try {
                        String line;
                        while ((line = joinReader.readLine()) != null) {
                            count++;
                            // Each cache line is "P<TAB>photoId" or "A<TAB>albumId"
                            String[] tokens = line.split(SSConstants.TAB, 2);
                            if (tokens.length < 2) {
                                LOGGER.warn("skipping malformed cache line: " + line);
                                continue;
                            }
                            if (tokens[0].equals("P")) {
                                photoDeleted.add(tokens[1]);
                            } else if (tokens[0].equals("A")) {
                                albDeleted.add(tokens[1]);
                            }
                        }
                    } finally {
                        joinReader.close();
                    }
                }
            }
        } catch (IOException e) {
            LOGGER.error("Exception reading DistributedCache: " + e);
        }
        LOGGER.info("cache lines read: " + count);
        LOGGER.info("albDeleted: " + albDeleted.size());
        LOGGER.info("photoDeleted: " + photoDeleted.size());
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // my mapper code
    }
}
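
Since the map body is elided above, here is roughly what the map step of such an anti-join looks like: emit only the records whose id is not in one of the deleted sets. This is a sketch, not the actual code; the input layout ("P"/"A" <TAB> id <TAB> payload) and a TextPair.set(String, String) helper are assumptions.

    // Hypothetical map body for MyMapper (assumed layout: type <TAB> id <TAB> payload)
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(SSConstants.TAB, 3);
        if (fields.length < 3) {
            return; // skip malformed records
        }
        String type = fields[0];
        String id = fields[1];
        // Anti-join: drop any record whose id appears in a deleted-id set
        if ("P".equals(type) && photoDeleted.contains(id)) {
            return;
        }
        if ("A".equals(type) && albDeleted.contains(id)) {
            return;
        }
        interKey.set(id);
        interValue.set(type, fields[2]); // assumes TextPair.set(String, String)
        context.write(interKey, interValue);
    }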

sr4lhrrt #1

According to this blog post, the local.cache.size parameter controls the size of the DistributedCache. It is set to 10 GB by default. So if you have more than 10 GB of data in the cache, that could be your problem.
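
If that is the case, the limit can be raised in the job configuration before the job is submitted. A sketch (local.cache.size takes a value in bytes; 20 GB is an arbitrary example):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CacheLimitExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Raise the per-node cache limit from the 10 GB default to 20 GB (bytes)
            conf.setLong("local.cache.size", 20L * 1024 * 1024 * 1024);
            Job job = new Job(conf, "anti-join");
            // ... remaining job setup ...
        }
    }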
