提交hadoop map reduce作业和辅助数据文件?

vuktfyat  于 2021-06-01  发布在  Hadoop
关注(0)|答案(0)|浏览(218)

我想在一个包含我的数据的大文本文件上运行一个 Map-Reduce 作业。为了处理这些数据,我还需要另一个数组(array),用来对正在处理的数据进行查找。
所以理想情况下,每个 reducer 会先加载这个辅助文件并填充数组,然后再开始 reducing。
我打算使用 Google Cloud Platform,因为我有 £250 的免费计算额度。这里不展开太多项目细节:我刚接触 map-reduce,只上过一门关于极限计算的大学小课程,希望有人能指出实现这一点的最佳方向。还有一些问题我尚未解决(例如输出类型),但目前最让我困扰的是 setup() 函数,它需要在 reducer 开始 reducing 之前完成查找表的加载。任何帮助都将不胜感激!

public static class MapForEquityCalculator extends Mapper<LongWritable, Text, Text, IntWritable> {
    public int GetIndex(int R1, int R2, boolean IsSuited) {
        if (IsSuited) {
            return (int)(((13 - Math.ceil((double)Math.max(R1, R2) / 4)) * 13) + (13 - Math.ceil((double)Math.min(R1, R2) / 4)));
        } else {
            return (int)(((13 - Math.ceil((double)Math.min(R1, R2) / 4)) * 13) + (13 - Math.ceil((double)Math.max(R1, R2) / 4)));
        }
    }

    public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            String line = value.toString();
            String[] cards=line.split(",");
            int ind_1 = GetIndex(Integer.parseInt(cards[0]),Integer.parseInt(cards[1]),Integer.parseInt(cards[0])%4 == Integer.parseInt(cards[1])%4);
            int ind_2 = GetIndex(Integer.parseInt(cards[2]),Integer.parseInt(cards[3]),Integer.parseInt(cards[2])%4 == Integer.parseInt(cards[3])%4);
            int ind_3 = GetIndex(Integer.parseInt(cards[4]),Integer.parseInt(cards[5]),Integer.parseInt(cards[4])%4 == Integer.parseInt(cards[4])%4);

            Text outputKey = new Text(Integer.toString(ind_1) +"," + Integer.toString(ind_2) +"," + Integer.toString(ind_3));
            Text outputValue = new Text(line);
            con.write(outputKey, outputValue);
    }

}

public static class ReduceForEquityCalculator extends Reducer<Text, IntWritable, Text, IntWritable> {

    public static final int HandRankSize = 32487834;
    public static int HR[] = new int[HandRankSize];

    protected static final int littleEndianByteArrayToInt(byte[] b, int offset) {
    return (b[offset + 3] << 24) + ((b[offset + 2] & 0xFF) << 16)
            + ((b[offset + 1] & 0xFF) << 8) + (b[offset] & 0xFF);
    }

    public void setup(Context context) throws IOException{
        Path pt=new Path("gs://mybucket/HandRanks.dat");
        Configuration conf = new Configuration();
        FileSystem fs = pt.getFileSystem(conf);
        int tableSize = HandRankSize*4;
        byte[] b = new byte[tableSize];
        try {
            FileInputStream fr = new FileInputStream(fs.open(pt));
            int bytesRead = fr.read(b, 0, tableSize);
            if (bytesRead != tableSize) {
            }
            fr.close();
        } catch (FileNotFoundException e) {
        } catch (IOException e) {
        }
        for (int i = 0; i < HandRankSize; i++) {
            HR[i] = littleEndianByteArrayToInt(b, i * 4);
        }
    }

    public void reduce(Text word, Text value, Context con) throws IOException, InterruptedException {
        String line = value.toString();
        String[] cards_string=line.split(",");
        Integer[] cards = new Integer[cards_string.length];
        int i=0;
        for(String str:cards_string){
            cards[i]=Integer.parseInt(str);//Exception in this line
            i++;
        }

        int c0, c1, c2, c3, c4, c5, c6;

        int u0, u1, u2, u3, u4, u5;
        int su0, su1, su2, su3, su4, su5;
        int tu0, tu1, tu2, tu3, tu4, tu5;

        u0 = HR[53+cards[0]];
        u1 = HR[u0+cards[1]];
        su0 = HR[53+cards[2]];
        su1 = HR[su0+cards[3]];
        tu0 = HR[53+cards[4]];
        tu1 = HR[su0+cards[5]];

        int A_B_C = 0;
        int A_C_B = 0;
        int A_BC = 0;
        int AB_C = 0;
        int AC_B = 0;
        int ABC = 0;
        int B_A_C = 0;
        int B_C_A = 0;
        int B_AC = 0;
        int BC_A = 0;
        int C_A_B = 0;
        int C_B_A = 0;
        int C_AB = 0;

        for (int com1 = 1; com1 < 49; com1++) {
            if (com1 != cards[0] && com1 != cards[1] && com1 != cards[2] && com1 != cards[3] && com1 != cards[4] && com1 != cards[5]) {
                u2 = HR[u1+com1];
                su2 = HR[su1+com1];
                tu2 = HR[tu1+com1];
                for (int com2 = com1 + 1; com2 < 50; com2++) {
                    if (com2 != cards[0] && com2 != cards[1] && com2 != cards[2] && com2 != cards[3] && com2 != cards[4] && com2 != cards[5]) {
                        u3 = HR[u2+com2];
                        su3 = HR[su2+com2];
                        tu3 = HR[tu2+com2];
                        for (int com3 = com2 + 1; com3 < 51; com3++) {
                            if (com3 != cards[0] && com3 != cards[1] && com3 != cards[2] && com3 != cards[3] && com3 != cards[4] && com3 != cards[5]) {
                                u4 = HR[u3+com3];
                                su4 = HR[su3+com3];
                                tu4 = HR[tu3+com3];
                                for (int com4 = com3 + 1; com4 < 52; com4++) {
                                    if (com4 != cards[0] && com4 != cards[1] && com4 != cards[2] && com4 != cards[3] && com4 != cards[4] && com4 != cards[5]) {
                                        u5 = HR[u4+com4];
                                        su5 = HR[su4+com4];
                                        tu5 = HR[tu4+com4];
                                        for (int com5 = com4 + 1; com5 < 53; com5++) {
                                            if (com5 != cards[0] && com5 != cards[1] && com5 != cards[2] && com5 != cards[3] && com5 != cards[4] && com5 != cards[5]) {
                                                int rank_a = HR[u5 + com5];
                                                int rank_b = HR[su5 + com5];
                                                int rank_c = HR[tu5 + com5];
                                                if (rank_a > rank_b && rank_b > rank_c) {
                                                    A_B_C++;
                                                } else if (rank_a > rank_c && rank_c > rank_b) {
                                                    A_C_B++;
                                                } else if (rank_a > rank_b && rank_b == rank_c) {
                                                    A_BC++;
                                                } else if (rank_a == rank_b && rank_b > rank_c) {
                                                    AB_C++;
                                                } else if (rank_a == rank_c && rank_c > rank_b) {
                                                    AC_B++;
                                                } else if (rank_a == rank_c && rank_a == rank_b) {
                                                    ABC++;  
                                                } else if (rank_b > rank_a && rank_a > rank_c) {
                                                    B_A_C++;
                                                } else if (rank_b > rank_c && rank_c > rank_a) {
                                                    B_C_A++;
                                                } else if (rank_b > rank_a && rank_a == rank_c) {
                                                    B_AC++;
                                                } else if (rank_b == rank_c && rank_c > rank_a) {
                                                    BC_A++;
                                                } else if (rank_c > rank_a && rank_a > rank_b) {
                                                    C_A_B++;
                                                } else if (rank_c > rank_b && rank_b > rank_a) {
                                                    C_B_A++;
                                                } else {
                                                    C_AB++;
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        Text outputValue = new Text(Integer.toString(A_B_C) +"," + Integer.toString(A_C_B) + "," + Integer.toString(A_BC)+"," + Integer.toString(A_BC) +","+ Integer.toString(AB_C)+","+ Integer.toString(AC_B) +","+ Integer.toString(ABC)+"," +Integer.toString(B_A_C) +","+Integer.toString(B_C_A)+","+Integer.toString(B_AC) +","+Integer.toString(BC_A)+","+Integer.toString(C_A_B) +","+Integer.toString(C_B_A)+","+Integer.toString(C_AB) + "\n");
        con.write(word, outputValue);
    }
}

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题