I have a mapper and a reducer class whose output key and value types are set as shown below.
//Reducer
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(MapperOutput.class);
//Mapper
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(MapperOutput.class);
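For context, here is a minimal sketch of the driver these calls would sit in (assuming the new org.apache.hadoop.mapreduce API; the driver, mapper, and reducer class names are placeholders, not from the original code). Note that setJarByClass() also avoids the "No job jar file set" warning that shows up in the log further below:

// Hypothetical driver sketch; FrequentPatternDriver, MyMapper and
// MyReducer are placeholder names, not from the original post.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FrequentPatternDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "frequent pattern mining");
        // Setting the jar by class avoids the "No job jar file set"
        // warning visible in the log below.
        job.setJarByClass(FrequentPatternDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(MapperOutput.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(MapperOutput.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}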
In these calls, MapperOutput is a custom class I defined, which implements the Writable interface.
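The post doesn't show MapperOutput itself, but any Writable used as a map output value needs a no-argument constructor (Hadoop instantiates it reflectively) and write()/readFields() methods that serialize and deserialize the fields in the same order. A minimal sketch, with fields guessed from the constructor call MapperOutput(transactionCount, bdelta) in the map function below:

// Hypothetical sketch of MapperOutput; the real class is not shown in
// the post, and the fields are guessed from its constructor call.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class MapperOutput implements Writable {
    private long transactionCount;
    // The Hsynopsis field would need its own (de)serialization logic.

    // Hadoop creates Writables reflectively, so a no-arg constructor
    // is mandatory; without it, deserialization fails between phases.
    public MapperOutput() {}

    public MapperOutput(long transactionCount /*, Hsynopsis bdelta */) {
        this.transactionCount = transactionCount;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(transactionCount);
        // ... serialize the synopsis fields in a fixed order ...
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        transactionCount = in.readLong();
        // ... deserialize in exactly the same order as write() ...
    }
}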
Part of my map function is shown below.
public void map(LongWritable arg0, Text arg1, Context context) throws IOException
{
    try
    {
        String tran = null;
        String ip = arg1.toString();
        System.out.println(ip);
        BufferedReader br = new BufferedReader(new StringReader(ip));
        Hsynopsis bdelta = null;
        Hsynopsis b = null, bnew = null;

        hashEntries = (int) Math.floor(calculateHashEntries()); // Hash table size
        System.out.println("Hash entries: " + hashEntries);

        // Initialize the main hash table and the delta hash table
        hashTable = new ArrayList<>(hashEntries);
        for (int i = 0; i < hashEntries; i++)
        {
            hashTable.add(i, null);
        }
        deltahashTable = new ArrayList<>(hashEntries);
        for (int i = 0; i < hashEntries; i++)
        {
            deltahashTable.add(i, null);
        }

        while ((tran = br.readLine()) != null)
        {
            createBinaryRep(tran);
            for (int i = 0; i < deltahashTable.size(); i++)
            {
                bdelta = deltahashTable.get(i);
                if (bdelta != null)
                {
                    if (bdelta.NLast_Access >= (alpha * transactionCount))
                    {
                        // Transmit bdelta to the coordinator
                        MapperOutput mp = new MapperOutput(transactionCount, bdelta);
                        context.write(new LongWritable(i), mp);

                        // Merge bdelta into b
                        b = hashTable.get(i);
                        bnew = merge(b, bdelta);
                        hashTable.set(i, bnew);

                        // Release bdelta
                        deltahashTable.set(i, null);
                    }
                }
            }
        }
    }
    catch (Exception e)
    {
        e.printStackTrace();
    }
}
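One detail worth noting about the code above: with the default TextInputFormat, map() is called once per input line, so the hash tables are re-initialized for every record and the BufferedReader loop runs at most once per call. If the tables are meant to persist across the whole split, a common pattern is to build them once in setup(); a sketch, reusing the question's user-defined Hsynopsis type and calculateHashEntries() helper:

// Sketch: one-time initialization in setup() instead of map(); field
// and helper names mirror the question's code and are user-defined.
public class MyMapper extends Mapper<LongWritable, Text, LongWritable, MapperOutput> {
    private int hashEntries;
    private ArrayList<Hsynopsis> hashTable;
    private ArrayList<Hsynopsis> deltahashTable;

    @Override
    protected void setup(Context context) {
        hashEntries = (int) Math.floor(calculateHashEntries());
        hashTable = new ArrayList<>(hashEntries);
        deltahashTable = new ArrayList<>(hashEntries);
        for (int i = 0; i < hashEntries; i++) {
            hashTable.add(null);
            deltahashTable.add(null);
        }
    }
    // map() can then operate on the already-initialized tables.
}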
My reduce function is as follows.
public void reduce(LongWritable index, Iterator<MapperOutput> mpValues, Context context)
{
    while (mpValues.hasNext())
    {
        /* Some code here */
    }
    context.write(index, mp);
}
As the mapper code shows, and as the algorithm requires, I am trying to write to the context whenever the condition is satisfied (inside the inner for loop), and then continue executing the loops after the write.
When I try to run this code on a single-node Hadoop cluster, I get the following log:
15/04/29 03:19:23 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/04/29 03:19:23 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/04/29 03:19:23 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/04/29 03:19:23 INFO input.FileInputFormat: Total input paths to process : 2
15/04/29 03:19:23 WARN snappy.LoadSnappy: Snappy native library not loaded
15/04/29 03:19:24 INFO mapred.JobClient: Running job: job_local599819429_0001
15/04/29 03:19:24 INFO mapred.LocalJobRunner: Waiting for map tasks
15/04/29 03:19:24 INFO mapred.LocalJobRunner: Starting task: attempt_local599819429_0001_m_000000_0
15/04/29 03:19:24 INFO util.ProcessTree: setsid exited with exit code 0
15/04/29 03:19:24 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@74ff364a
15/04/29 03:19:24 INFO mapred.MapTask: Processing split: file:/home/pooja/ADM/FrequentPatternMining/input/file.dat~:0+24
15/04/29 03:19:24 INFO mapred.MapTask: io.sort.mb = 100
15/04/29 03:19:24 INFO mapred.MapTask: data buffer = 79691776/99614720
15/04/29 03:19:24 INFO mapred.MapTask: record buffer = 262144/327680
15/04/29 03:19:24 INFO mapred.MapTask: Starting flush of map output
15/04/29 03:19:25 INFO mapred.JobClient: map 0% reduce 0%
15/04/29 03:19:30 INFO mapred.LocalJobRunner:
15/04/29 03:19:31 INFO mapred.JobClient: map 50% reduce 0%
The map task gets stuck at 50% and does not progress any further. When I run the map function on its own (outside Hadoop), I don't run into any infinite loop.
Can anyone please help me with this?
EDIT 1: My input files are only on the order of KBs. Could that cause problems with how the data is distributed to the mappers?
EDIT 2: As suggested in the answer below, I changed the Iterator to an Iterable. But the map task still gets stuck at 100% and restarts after a while.
I can see the following in the jobtracker log:
2015-04-29 13:26:28,026 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201504291300_0003_m_000000_0: Task attempt_201504291300_0003_m_000000_0 failed to report status for 600 seconds. Killing!
2015-04-29 13:26:28,026 INFO org.apache.hadoop.mapred.JobTracker: Removing task 'attempt_201504291300_0003_m_000000_0'
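The "failed to report status for 600 seconds" line means the framework killed the attempt because it saw no progress within the task timeout (mapred.task.timeout, 600 seconds by default), not that the task crashed. If a single map() call legitimately runs for a long time, it has to report progress periodically; a sketch of how the transaction loop above could do that:

// Sketch: reporting progress inside a long-running loop so the
// framework's 600-second task timeout does not kill the attempt.
// Variable and helper names are taken from the question's map code.
while ((tran = br.readLine()) != null)
{
    createBinaryRep(tran);
    // ... existing per-transaction work ...
    context.progress();                                        // tell the tracker we are alive
    context.setStatus("processed transaction " + transactionCount);
}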
2 Answers
Answer 1 (smtd7mpg):
You are incorrectly using an Iterator instead of an Iterable in your reduce function. With the new MapReduce API you have to use an Iterable, because the reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context) method is called once for each key in the sorted input, with that key's collection of values.
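Concretely, the fixed signature would look like the sketch below (the loop body is a placeholder). Adding @Override is a useful guard here: the compiler then rejects the method if its signature doesn't match the one the framework actually calls, which is exactly the silent failure mode with Iterator — the mismatched method is never invoked and the default identity reduce runs instead.

// Corrected signature for the new API: Iterable, not Iterator.
@Override
public void reduce(LongWritable index, Iterable<MapperOutput> mpValues, Context context)
        throws IOException, InterruptedException
{
    for (MapperOutput mp : mpValues)
    {
        /* Some code here */
        context.write(index, mp);
    }
}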
Answer 2 (6xfqseft):
This sometimes happens when the code gets stuck in an infinite loop. Try checking for that.
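If you suspect the loop, a cheap check is to log an iteration counter from inside it and watch the task logs (a debugging sketch, not part of the original code):

// Debugging sketch: count loop iterations to see whether the loop
// terminates; the surrounding names come from the question's map code.
long iterations = 0;
while ((tran = br.readLine()) != null)
{
    createBinaryRep(tran);
    // ... existing work ...
    if (++iterations % 10000 == 0)
    {
        System.err.println("map loop iteration " + iterations); // shows up in the task logs
    }
}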