我的工作流程如下:
我正在处理大量的数据。我有一个 MapFile
需要缓存。这个文件的大小现在是1 gb,但我希望它最终会增长。
Map文件的内容如下所示:
12345,45464 192.34.23.1
33214,45321 123.45.32.1
在 map-phase
,我处理中的输入文件中的每条记录 TextInputFormat
. 我解析这行(按记号分割)并检索前两个记号,记号1和记号2。
如果这对(token1,token2)不在缓存文件中,那么我执行一个api调用,获取信息,在缓存中保持(如果可能的话)并继续处理。
private Parser parser = new customParser();
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
parser.parse(value);
Pair pair = new Pair();
pair.setFirst(parser.getFirst());
pair.setSecond(parser.getSecond());
IP ip = null;
//here is the catch
//check if pair exists in cache
if cache.contains(pair){
ip=cache.get(pair);
}
else {
ip=getFromAPI(pair);//This does API call outside network.
cache.put(pair,ip);
}
context.write(pair,ip);
}
}
我在这里看到的主要问题是
如何跨所有节点获取缓存中的大文件。distributedcache通过将文件复制到本地节点来工作。但由于这个文件更大,这里涉及到网络流量,对于我的日常工作,我不想继续分发它。
如何高效地查找Map文件(缓存),整个Map文件将不在内存中。
如何写入我的缓存Map文件。
谢谢
1条答案
按热度按时间btxsgosb1#
在我看来,有三种方法可以处理这个问题,最好的方法取决于缓存文件的增长方式。
如果您不希望缓存文件增长太大,并且它总是能够在不妨碍其他应用程序或mapreduce作业的情况下放入内存,那么可以将其放入hdfs缓存中。从hadoop 2.3.0开始就支持此功能:
hdfs缓存允许用户在hdfs中显式缓存某些文件或目录。然后,datanodes将通过使用mmap和mlock在堆外内存中缓存相应的块。一旦缓存,hadoop应用程序可以查询缓存块的位置,并将其任务放置在内存位置。最后,当内存为本地时,应用程序可以使用新的零拷贝读取api读取缓存数据,而不需要额外的开销。
如果在缓存文件增长时无法安全地将其保留在内存中,则最后两个选项更合适:
thomasjungblut的回答建议将缓存文件放入hdfs中,增加复制计数并使用文件系统api读取它。这仍然会导致非本地副本的网络通信,但希望少于对分布式缓存中所有节点的传输。文件系统api还允许您附加到现有文件,从而更新该文件。
如果缓存文件增长过快,可能会在存储额外的复制时遇到问题,则可以考虑将其作为第一个Map步骤的一部分进行检索。
例如,可以将缓存文件和要处理的文件都作为Map器的输入,并为这两个输入Map令牌对。在reduce步骤中,如果a令牌对具有来自缓存文件和处理文件的行,则不输出任何内容,并在其他两种可能的情况下输出相应的缓存行,从而构建新的缓存文件。