Text data is not written correctly to a MarkLogic database using the Hadoop connector

lh80um4z posted on 2021-05-29 in Hadoop

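I am reading sample CSV data and writing it to a MarkLogic database as text using the Hadoop connector API. The problem is that only part of the data gets written to the database, seemingly at random.
For example, if I store 10 records, the MarkLogic database should end up with 10 inserts. What I get instead is that only a few records are written, some of them several times, at random. Can anyone explain why this happens?
Here is the mapper code: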

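import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.fasterxml.jackson.databind.ObjectMapper; // assumed Jackson 2.x; the post does not say which version
import com.marklogic.mapreduce.DocumentURI;

public static class CSVMapper extends Mapper<LongWritable, Text, DocumentURI, Text> {
    // Counter used to number the output URIs; a static field has one copy per JVM.
    static int i = 1;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        ObjectMapper mapper = new ObjectMapper();
        String line = value.toString();   // line contains one line of the CSV file
        System.out.println("line value is - " + line);

        // With the default TextInputFormat, value is already a single line,
        // so this split yields exactly one element.
        String[] singleData = line.split("\n");
        for (String lineData : singleData) {
            String[] fields = lineData.split(",");
            Sample sd = new Sample(fields[0], fields[1], fields[2].trim(), fields[3]);

            // Serialize the record to JSON and wrap it in a Text value
            String jsonInString = mapper.writeValueAsString(sd);
            Text txt = new Text(jsonInString);
            System.out.println("line Data is    - " + lineData);
            System.out.println("jsonInString is -  " + jsonInString);

            final DocumentURI outputURI1 = new DocumentURI("HadoopMarklogicNPPES-" + i + ".json");
            i++;

            context.write(outputURI1, txt);
        }
    }
}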
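One possible cause worth ruling out is the `static int i` URI counter. On a cluster, each map task typically runs in its own JVM with its own copy of the static field, so every copy starts at 1 and different lines can be given the same URI; MarkLogic then keeps only the last document written to each URI. The plain-Java sketch below simulates this without Hadoop or MarkLogic: two hypothetical splits stand in for two map tasks, and a `TreeMap` stands in for the database. It contrasts the per-task counter with URIs built from each line's byte offset, which is what the `LongWritable` key holds under `TextInputFormat`. `UriCollisionDemo` and its methods are illustrative names, not connector API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

public class UriCollisionDemo {

    // Hypothetical input: two splits, each processed by a separate map task.
    static final String[][] SPLITS = {
        {"row-a", "row-b", "row-c"},
        {"row-d", "row-e", "row-f"}
    };

    // Each task gets a fresh counter starting at 1, like a static field in a separate JVM.
    static Map<String, String> writeWithPerTaskCounter(String[][] splits) {
        Map<String, String> db = new TreeMap<>(); // stands in for the database: same URI overwrites
        for (String[] split : splits) {
            int i = 1; // this task's own copy of the "static" counter
            for (String row : split) {
                db.put("HadoopMarklogicNPPES-" + i + ".json", row);
                i++;
            }
        }
        return db;
    }

    // URI built from the line's byte offset in the file, as TextInputFormat supplies it as the key.
    static Map<String, String> writeWithOffsetUris(String[][] splits) {
        Map<String, String> db = new TreeMap<>();
        long offset = 0;
        for (String[] split : splits) {
            for (String row : split) {
                db.put("HadoopMarklogicNPPES-" + offset + ".json", row);
                offset += row.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for the '\n'
            }
        }
        return db;
    }

    public static void main(String[] args) {
        System.out.println("per-task counter: " + writeWithPerTaskCounter(SPLITS).size() + " documents");
        System.out.println("offset-based URI: " + writeWithOffsetUris(SPLITS).size() + " documents");
    }
}
```

Byte offsets are only unique within one file; with several input files, the file name (for example `((FileSplit) context.getInputSplit()).getPath().getName()`, with `FileSplit` from `org.apache.hadoop.mapreduce.lib.input`) would also have to be part of the URI.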

The main method is as follows:

Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf, "Hadoop Marklogic MarklogicHadoopCSVDataDump");
job.setJarByClass(MarklogicHadoopCSVDataDump.class);

// Map related configuration
job.setMapperClass(CSVMapper.class);
job.setMapOutputKeyClass(DocumentURI.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(ContentOutputFormat.class);
// No reducer class is set, so the default identity Reducer passes the map output through.

// setInputPaths appears to be the static method inherited from FileInputFormat.
// No input format class is set, so the job uses the default TextInputFormat
// (one line per map() call, keyed by its byte offset).
ContentInputFormatTest.setInputPaths(job, new Path("/marklogic/sampleData.csv"));

// Job.getInstance copied conf, so the MarkLogic settings are added
// to the job's own Configuration.
conf = job.getConfiguration();
conf.addResource("hadoopMarklogic.xml");

try {
    System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (ClassNotFoundException | InterruptedException e) {
    e.printStackTrace();
}

Here is the sample CSV data:

"Complaint ID "," Product "," Sub-product "," Issue 
"1350210 "," Bank account or service "," Other bank product/service "," Account opening  closing  or management "
"1348006 "," Debt collection "," Other (phone  health club  etc.) "," Improper contact or sharing of info "
"1351347 "," Bank account or service "," Checking account "," Problems caused by my funds being low"
"1347916 "," Debt collection "," Payday loan "," Communication tactics"
"1348296 "," Credit card ","  "," Identity theft / Fraud / Embezzlement"
"1348136 "," Money transfers "," International money transfer "," Money was not available when promised"
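Note that `lineData.split(",")` on rows like these keeps the surrounding double quotes and padding spaces in each field (only `fields[2]` is trimmed), so the JSON values would contain strings such as `"\"1350210 \""`. The first row is also a header and would be stored as a record. Below is a minimal sketch of cleaning the fields of one row, assuming, as in this sample, that no field contains an embedded comma or an escaped quote; a proper CSV parser would be needed otherwise. `CsvFieldDemo` and `parseFields` are hypothetical names.

```java
import java.util.Arrays;

public class CsvFieldDemo {

    // Splits one row on commas and strips the surrounding quotes and padding from each field.
    // Assumes no field contains an embedded comma or an escaped quote.
    static String[] parseFields(String line) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        for (int k = 0; k < fields.length; k++) {
            String f = fields[k].trim();
            if (f.startsWith("\"")) {
                f = f.substring(1);
            }
            if (f.endsWith("\"")) {
                f = f.substring(0, f.length() - 1);
            }
            fields[k] = f.trim(); // remove padding that was inside the quotes
        }
        return fields;
    }

    public static void main(String[] args) {
        String row = "\"1348296 \",\" Credit card \",\"  \",\" Identity theft / Fraud / Embezzlement\"";
        System.out.println(Arrays.toString(parseFields(row)));
    }
}
```

The unterminated quote on the header's last field (`" Issue `) is tolerated because the leading and trailing quotes are stripped independently.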
