How can I make reads and writes in MapReduce run faster?

5m1hhzi4 · posted 2021-05-29 · in Hadoop

I have a program that has only a driver class and a mapper class; I am not using a reducer.
The driver class reads the input from an S3 bucket, and the mapper class writes files back to the S3 bucket using plain Java code (the AWS Java SDK) rather than context.write.
I have 1000 JSON files. When I run the program, the driver picks up the files and the mapper writes each one to the S3 bucket. Right now writing a single file takes up to 2 seconds, but I would like to write at least 100 files within 2 seconds.
How can I achieve this? Please suggest some solutions.
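
For reference, the write path described above amounts to one blocking S3 PUT per JSON file. A minimal sketch of that write path with the AWS SDK for Java v1 (the class, bucket, and key names are placeholders, not values from the actual job):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;

public class SingleJsonUpload {

    // Uploads one JSON document as its own S3 object. Each call is a
    // blocking HTTP PUT, which is why a single file can take up to ~2 seconds.
    public static void writeJson(AmazonS3 s3, String bucket, String key, String json) {
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
        ObjectMetadata meta = new ObjectMetadata();
        meta.setContentLength(bytes.length);
        InputStream in = new ByteArrayInputStream(bytes);
        s3.putObject(bucket, key, in, meta);
    }
}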

goucqfw6 1#

Service call class:

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.methods.GetMethod;
import org.elasticsearch.client.Client;

public class ClcConstants {

public static Client client = null;

// Variable Declarations for mapping the field in input file
public static String ContactId = "contactid";
public static String PostalCode = "PostalCode";
public static String Email = "Email";
public static String DateFormat = "dd/MM/yyyy HH:mm";
public static String Date = "dd/MM/yyyy";

// Executes an HTTP GET against the given web-service URL and returns the response body
public static String executePost(String url) {

    System.out.println("Started to connect webservices...");

    HttpClient httpClient = new HttpClient();
    /*
     * Credentials credentials = new NTCredentials("myusername",
     * "mypassword", "myhost", "mydomain");
     * httpClient.getState().setCredentials(AuthScope.ANY, credentials);
     */

    System.out.println("Accessing webservices...");
    HttpMethodBase method = new GetMethod(url);

    try {
        int returnCode = httpClient.executeMethod(method);
        System.out.println("returnCode: " + returnCode);
        String response = method.getResponseBodyAsString();
        // response
        // ="{\"GetAllSourceWeightageResult\":[{\"ClientKey\":\"L4CTRsto\",\"Weightages\":[{\"Source\":\"DI\",\"Weight\":1},{\"Source\":\"ER\",\"Weight\":2},{\"Source\":\"IM\",\"Weight\":3},{\"Source\":\"CU\",\"Weight\":4}]}]}";
        System.out.println("Response: " + response);
        return response;
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Always release the connection back to HttpClient's connection manager
        method.releaseConnection();
    }
    return "";
}

}

Driver Class:

import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.json.JSONArray;
import org.json.JSONObject;

public class ClcDriver {

// Held in static fields so ClcMapper can reference them directly. Note that on a
// real cluster the mapper runs in a separate JVM, so these statics are not populated
// there; the values should also be passed through the job Configuration
// (as is already done with conf.set below).
public static String client_key;
public static String folder_name;

public static void main(String[] args) throws Exception {

    // Arguments to be passed dynamically

    String dbType = args[0];
    String serviceUrl = args[1];
    String input_path = args[2];
    client_key = args[3];
    folder_name = args[4];

    Date date = new Date();

    String jobstarttime = String.format("Current Date/Time : %tc", date);

    System.out.println(jobstarttime);

    String url = serviceUrl + "/GetCLCConfiguration/?clientKey="
            + client_key;
    System.out.println("GetCLCConfiguration from Service");
    String responseText = ClcConstants.executePost(url);
    System.out.println("End GetCLCConfiguration from Service");

    // Convert the accessed string to JsonObject.
    JSONObject json_data = new JSONObject(responseText);

    // Converting into JsonArray in order to process in the loop
    JSONArray array = (JSONArray) json_data
            .get("GetCLCConfigurationResult");

    // If the argument passed as "amazonS3", the below method gets executed
    if (dbType.equals("amazonS3")) {
        amazonS3(array, input_path);
    }
}

// Passing the GetCLCConfigurationResults and input path of S3 Bucket
public static void amazonS3(JSONArray array, String input_path)
        throws Exception {

    for (int i = 0; i < array.length(); i++) {
        System.out.println("***********");

        JSONObject innerObj = (JSONObject) array.get(i);

        String clientName = innerObj.get("ClientKey").toString();
        String data = innerObj.get("Configurations").toString();

        System.out.println("Setting Configuration...");
        Configuration conf = new Configuration();
        System.out.println("Configuration done");

        System.out.println("Accessing s3bucket...");
        conf.set("Configurations", data);
        conf.set("ClientKey", clientName);

        conf.set("fs.s3n.awsAccessKeyId", "myaccesskey");
        conf.set("fs.s3n.awsSecretAccessKey",
                "mysecret access key");
        System.out.println("Accessed.");

        System.out.println("Setting Job...");
        Job job = Job.getInstance(conf, "JobName");
        System.out.println("Job assigned");

        job.setJarByClass(ClcDriver.class);
        job.setMapperClass(ClcMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(input_path + client_key
                + "/" + folder_name + "/"));

        FileOutputFormat.setOutputPath(job, new Path(input_path + client_key
                + "/" + folder_name + "/" + "output" + "/"));

        if (!job.waitForCompletion(true))
            return;

        // Calculating Job Completed Time
        Date date = new Date();

        String jobcompletedtime = String
                .format("Current Date/Time : %tc", date);

        System.out.println(jobcompletedtime);
    }
}
}
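
The driver hands the per-client values to the job with conf.set("Configurations", data) and conf.set("ClientKey", clientName). On a remote node the mapper can read them back from the job configuration in setup(); a minimal sketch of that retrieval (the class and field names are illustrative; the commented-out lines in ClcMapper's setup() below do the same thing):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative fragment: reads the values the driver stored in the job
// Configuration instead of relying on ClcDriver's static fields, which are
// not populated in the mapper's JVM on a distributed cluster.
public class ConfigReadingMapper extends Mapper<Object, Text, Text, Text> {

    private String configurations;
    private String clientKey;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        configurations = conf.get("Configurations");
        clientKey = conf.get("ClientKey");
    }
}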

Mapper Class:

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.json.JSONObject;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

public class ClcMapper extends Mapper<Object, Text, Text, Text> {

// Target S3 bucket ("BucketName" appears to be a redacted placeholder)
private static String bucketName = "BucketName";

// Built from ClcDriver's static fields; only valid when driver and mapper
// share a JVM (e.g. local mode)
private static String keyName = ClcDriver.client_key + "/"
        + ClcDriver.folder_name + "/";

// S3 client shared by all map() calls of this task, created in setup()
public static AmazonS3 s3client;

public String jsonobjectconvertstring;

public InputStream writecontentins3;

// CLC value written into each contact; never assigned here, so it defaults to 0
int val;

// Called once at the beginning of the task
protected void setup(Context context) throws IOException,
        InterruptedException {
    System.out.println("Mapper Started");

    System.out.println("Accessing s3bucket once again...");
    s3client = new AmazonS3Client(new BasicAWSCredentials(
            "Myaccesskey",
            "mysecretaccesskey"));
    System.out.println("Accessed.");

    System.out.println("Setting Region...");
    Region region = Region.getRegion(Regions.US_WEST_2);
    s3client.setRegion(region);
    s3client.setEndpoint("s3-us-west-2.amazonaws.com");
    System.out.println("Region was successfully set.");

    // GetCLCConfiguration results from Driver class
    //Configuration conf = context.getConfiguration();
    //String data = conf.get("Configurations");

}

// Processing Mapper for each contact...
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {

    boolean MemberOfAnyContactGroup = true;

    String line = value.toString();
    try {

        JSONObject contacts = new JSONObject(line);         

        // Updating the CLC field
        System.out.println(contacts.put("CLC", val).toString());

        context.write(new Text(contacts.toString()),new Text());

    } catch (Exception e) {
        System.out.println(e);
    }
}
}
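
The question asks how to go from one S3 write every ~2 seconds to about 100 writes in the same window. With the setup described above, every upload is issued sequentially from the mapper, so each file pays the full S3 round-trip latency. One way to overlap those round trips is to hand each upload to a small thread pool from map() and drain the pool in cleanup(). The sketch below is only an illustration of that idea with the same AWS SDK v1 client used above; the pool size, bucket, key pattern, and credentials are placeholders to tune, not values from this job:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;

public class ParallelS3WriteMapper extends Mapper<Object, Text, Text, Text> {

    private AmazonS3 s3client;
    private ExecutorService uploadPool;
    private long fileCounter = 0;

    @Override
    protected void setup(Context context) {
        // Credentials and pool size are placeholders to adjust.
        s3client = new AmazonS3Client(new BasicAWSCredentials("myaccesskey", "mysecretkey"));
        uploadPool = Executors.newFixedThreadPool(20);
    }

    @Override
    public void map(Object key, Text value, Context context) {
        final String json = value.toString();
        final String objectKey = "output/contact-" + (fileCounter++) + ".json";

        // The PUT runs on a pool thread, so map() is not blocked by S3 latency
        // and many uploads are in flight at the same time.
        uploadPool.submit(new Runnable() {
            public void run() {
                byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
                ObjectMetadata meta = new ObjectMetadata();
                meta.setContentLength(bytes.length);
                s3client.putObject("my-bucket", objectKey, new ByteArrayInputStream(bytes), meta);
            }
        });
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Wait for every queued upload to finish before the task reports success.
        uploadPool.shutdown();
        uploadPool.awaitTermination(1, TimeUnit.HOURS);
    }
}

Raising the number of concurrent map tasks (more, smaller input splits) works on the same principle: more uploads in flight at once.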
