I am running MapReduce code on a multi-node Hadoop cluster (2.4.1). When I try to run it with two input files of 200 MB each, I get a "GC overhead limit exceeded" error. With very small files it runs fine and produces the correct output.
My goal is to compare every flow record in the first file against every flow record in the second file, compute the distance between them, keep the 10 largest values, and emit output to the reducer based on those 10 values.
A sample flow record from both files (13 pipe-delimited fields):
194.144.0.27 | 192.168.1.5 | 0.0.0 | 0 | 0 | 2 | 104 | 1410985350 | 1410985350 | 51915 | 51413 | 6 | 6
A few screenshots: http://goo.gl/5tuhjj and http://goo.gl/lh1qvm
Below is the mapper class:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class mapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
    private final static IntWritable five = new IntWritable(5);
    private Text counter1;

    // Every line of the cached comparison file is held in memory for the
    // lifetime of the mapper; with a 200 MB file this alone is a large heap load.
    ArrayList<String> lines = new ArrayList<String>();
    String str;
    int ddos_line = 0;
    int normal_line = 0, total_testing_records = 4000;
    int K = 10;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException
    {
        // Read the entire cached comparison file into the in-memory list of lines.
        Configuration conf = context.getConfiguration();
        URI[] cachefiles = context.getCacheFiles();
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] status = fs.listStatus(new Path(cachefiles[0].toString()));
        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(status[0].getPath())));
        while ((str = in.readLine()) != null)
        {
            lines.add(str);
        }
        in.close();
    }
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
    {
        String line1 = value.toString();
        ddos_line++;
        normal_line = 0;

        // Top-K (K = 10) largest distances seen so far, and the line index
        // of the cached record that produced each one.
        double[] count = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
        int[] lineIndex = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0};

        String[] parts = line1.split("\\|");
        // Note: a fresh array is allocated from the list on every map() call,
        // which adds significant garbage-collection pressure on large inputs.
        String[] linesArray = lines.toArray(new String[lines.size()]);

        boolean bool = true;
        int t1 = 0;
        double sum = 0;
        while (bool)
        {
            for (int i = 0; i < K; i++)
            {
                if (!bool) break;
                sum = 0;
                String[] parts2 = linesArray[normal_line].split("\\|");
                // "Distance" = square root of the number of matching fields
                // between the input record and the cached record.
                for (int x = 0; x < 13; x++)
                {
                    if (parts[x].equals(parts2[x]))
                        t1 = 1;
                    else
                        t1 = 0;
                    sum += t1;
                }
                sum = Math.sqrt(sum);

                // Replace the smallest of the current top-K if this distance is larger...
                if (count[K - 1] <= sum)
                {
                    count[K - 1] = sum;
                    lineIndex[K - 1] = normal_line;
                }
                // ...then re-sort the top-K in descending order (bubble sort).
                for (int k = 0; k < K; k++)
                {
                    for (int j = 0; j < K - 1; j++)
                    {
                        if (count[j] < count[j + 1])
                        {
                            double temp2 = count[j + 1];
                            count[j + 1] = count[j];
                            count[j] = temp2;
                            int temp3 = lineIndex[j + 1];
                            lineIndex[j + 1] = lineIndex[j];
                            lineIndex[j] = temp3;
                        }
                    }
                }

                if (normal_line + 1 < linesArray.length)
                {
                    normal_line++;
                    continue;
                }
                else bool = false;
            }
        } // while end

        // Label each of the K nearest records: 'n' (normal) if it came from the
        // first half of the cached file, 'd' (ddos) otherwise.
        char[] t = {'d', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd', 'd'};
        for (int i = 0; i < K; i++)
        {
            if (lineIndex[i] <= total_testing_records / 2) t[i] = 'n';
        }
        int counter_normal = 0, counter_ddos = 0;
        for (int i = 0; i < K; i++)
        {
            if (t[i] == 'n')
                counter_normal++;
            else
                counter_ddos++;
        }

        // Majority vote over the K labels decides the class of this record.
        if (counter_normal <= K / 2)
        {
            counter1 = new Text(ddos_line + " : d : " + counter_ddos);
        }
        else
        {
            counter1 = new Text(ddos_line + " : n : " + counter_normal);
        }
        context.write(counter1, five);
    }
    @Override
    public void run(Context context) throws IOException, InterruptedException
    {
        // Mirrors the default Mapper.run() implementation.
        setup(context);
        while (context.nextKeyValue())
        {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}
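For reference, setup() above assumes the comparison file was registered as a cache file by the driver. A minimal driver-side sketch, where the HDFS path and job name are assumptions rather than details from the original post:

// Driver-side sketch: register the comparison file so that
// context.getCacheFiles() in the mapper's setup() can locate it.
// The path "hdfs:///user/hduser/normal" is a hypothetical placeholder.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "flow-record-knn");
job.setJarByClass(mapper.class);
job.addCacheFile(new URI("hdfs:///user/hduser/normal"));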
1 answer
Simply give your tasks more memory: set a larger memory allocation for the map tasks in the job configuration, as large as you need to read and process the file; see the sketch below.
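A minimal sketch of the usual Hadoop 2.x settings, placed in the driver (the concrete values are assumptions; size them to your cached file):

// Raise the YARN container size and the JVM heap for map tasks.
// The values below are illustrative, not prescriptive.
Configuration conf = new Configuration();
conf.set("mapreduce.map.memory.mb", "3072");      // container memory for each map task
conf.set("mapreduce.map.java.opts", "-Xmx2560m"); // map JVM heap, kept below the container size
Job job = Job.getInstance(conf, "flow-record-knn");

Keeping -Xmx a few hundred megabytes below mapreduce.map.memory.mb leaves headroom for non-heap JVM overhead. Independent of the memory settings, moving the lines.toArray(...) call from map() into setup() would also cut per-record allocations and reduce GC pressure.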