java—将1gb数据加载到hbase需要1小时

vzgqcmou  于 2021-06-03  发布在  Hadoop
关注(0)|答案(3)|浏览(316)

我想把1GB(1000万条记录)的CSV文件加载到HBase中。我为此编写了MapReduce程序。我的代码工作正常,但需要1小时才能完成。最后一个reducer需要半个多小时的时间。谁能帮我一下吗?
我的代码如下:
Driver.java

package com.cloudera.examples.hbase.bulkimport;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * HBase bulk import example: driver for the data-preparation MapReduce job.
     *
     * <p>The job reads text input, runs {@link HBaseKVMapper} over it and writes
     * HFiles for the target table. It does not load the HFiles into the table;
     * see the note at the end of {@link #main(String[])}.
     *
     * <p>Arguments:
     * <ul>
     *   <li>args[0]: HDFS input path</li>
     *   <li>args[1]: HDFS output path</li>
     *   <li>args[2]: HBase table name</li>
     * </ul>
     */
    public class Driver {
      /**
       * Configures and runs the HFile-generation job and waits for it to finish.
       *
       * @param args input path, output path and table name (see class comment)
       * @throws IllegalArgumentException if fewer than three arguments are given
       * @throws IllegalStateException if the MapReduce job reports failure
       * @throws Exception on any HBase, HDFS or job-submission error
       */
      public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 3) {
          throw new IllegalArgumentException(
              "Usage: Driver <hdfs-input-path> <hdfs-output-path> <hbase-table-name>");
        }
        final Path inputPath = new Path(args[0]);
        final Path outputPath = new Path(args[1]);
        final String tableName = args[2];

        Configuration conf = new Configuration();
        conf.set("hbase.table.name", tableName);

        // Load hbase-site.xml
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "HBase Bulk Import Example");
        job.setJarByClass(HBaseKVMapper.class);

        job.setMapperClass(HBaseKVMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        job.setInputFormatClass(TextInputFormat.class);

        HTable hTable = new HTable(conf, tableName);
        boolean succeeded;
        try {
          // Auto configure partitioner and reducer.
          // NOTE(review): per the HBase API docs this also sets the number of
          // reduce tasks to the table's current region count, so a table with a
          // single region is processed by a single reducer - confirm for the
          // HBase version in use and pre-split the table if that is the bottleneck.
          HFileOutputFormat.configureIncrementalLoad(job, hTable);

          FileInputFormat.addInputPath(job, inputPath);
          FileOutputFormat.setOutputPath(job, outputPath);

          succeeded = job.waitForCompletion(true);

          // Load generated HFiles into table (intentionally left disabled here):
          //    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
          //    loader.doBulkLoad(outputPath, hTable);
        } finally {
          // Release the table's client resources whether or not the job succeeded.
          hTable.close();
        }

        // Surface job failure to the caller instead of exiting as if it succeeded.
        if (!succeeded) {
          throw new IllegalStateException(
              "MapReduce job '" + job.getJobName() + "' failed; no usable HFiles in " + outputPath);
        }
      }
    }

HColumnEnum.java文件

package com.cloudera.examples.hbase.bulkimport;

    /**
     * HBase table columns for the 'srv' column family.
     *
     * <p>Each constant carries its column qualifier as bytes. The qualifier is
     * encoded as UTF-8 explicitly so the stored bytes do not depend on the
     * platform default charset of the JVM that happens to run the job.
     */
    public enum HColumnEnum {
      SRV_COL_employeeid ("employeeid"),
      SRV_COL_eventdesc ("eventdesc"),
      SRV_COL_eventdate ("eventdate"),
      SRV_COL_objectname ("objectname"),
      SRV_COL_objectfolder ("objectfolder"),
      SRV_COL_ipaddress ("ipaddress");

      private final byte[] columnName;

      /**
       * @param column the column qualifier as text; encoded once, at class load
       */
      HColumnEnum (String column) {
        // Charset.forName is used inline because an enum constructor may not
        // reference the enum's own non-constant static fields.
        this.columnName = column.getBytes(java.nio.charset.Charset.forName("UTF-8"));
      }

      /**
       * Returns the UTF-8 encoded column qualifier.
       *
       * <p>The same array instance is returned on every call (no copy is made),
       * so callers must treat it as read-only.
       *
       * @return the column qualifier bytes
       */
      public byte[] getColumnName() {
        return this.columnName;
      }
    }

HBaseKVMapper.java文件
`package com.cloudera.examples.hbase.bulkimport;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import au.com.bytecode.opencsv.CSVParser;
/**

  • HBase bulk import example
  • Parses Facebook and Twitter messages from CSV files and outputs
  • <ImmutableBytesWritable, KeyValue>.
  • The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
  • into the correct HBase table region.
  • The KeyValue value holds the HBase mutation information (column family,
  • column, and value)
    /
    public class HBaseKVMapper extends
    Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    final static byte[] SRV_COL_FAM = "srv".getBytes();
    final static int NUM_FIELDS = 6;
    CSVParser csvParser = new CSVParser();
    int tipOffSeconds = 0;
    String tableName = "";
    // DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
    // .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));
    ImmutableBytesWritable hKey = new ImmutableBytesWritable();
    KeyValue kv;
    /{@inheritDoc} */
    @Override
    protected void setup(Context context) throws IOException,
    InterruptedException {
    Configuration c = context.getConfiguration();
    // tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
    tableName = c.get("hbase.table.name");
    }
    /
    {@inheritDoc} /
    @Override
    protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    /if (value.find("Service,Term,") > -1) {
    // Skip header
    return;
    }
    /
    String[] fields = null;
    try {
    fields = value.toString().split(",");
    //csvParser.parseLine(value.toString());
    } catch (Exception ex) {
    context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
    return;
    }
    if (fields.length != NUM_FIELDS) {
    context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
    return;
    }
    // Get game offset in seconds from tip-off
    /
    DateTime dt = null;
    try {
    dt = p.parseDateTime(fields[9]);
    } catch (Exception ex) {
    context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
    return;
    }
    int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
    String offsetForKey = String.format("%04d", gameOffset);
    String username = fields[2];
    if (username.equals("")) {
    username = fields[3];
    }
    /
    // Key: e.g. "1200:twitter:jrkinley"
    hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1], fields[2],fields[3],fields[4],fields[5])
    .getBytes());
    // Service columns
    if (!fields[0].equals("")) {
    kv = new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[1].equals("")) {
    kv = new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[2].equals("")) {
    kv = new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[3].equals("")) {
    kv = new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[4].equals("")) {
    kv = new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
    context.write(hKey, kv);
    }
    if (!fields[5].equals("")) {
    kv = new KeyValue(hKey.get(), SRV_COL_FAM,
    HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
    context.write(hKey, kv);
    }
    context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);
    /*
    • Output number of messages per quarter and before/after game. This should
    • correspond to the number of messages per region in HBase
      /
      /
      if (gameOffset < 0) {
      context.getCounter("QStats", "BEFORE_GAME").increment(1);
      } else if (gameOffset < 900) {
      context.getCounter("QStats", "Q1").increment(1);
      } else if (gameOffset < 1800) {
      context.getCounter("QStats", "Q2").increment(1);
      } else if (gameOffset < 2700) {
      context.getCounter("QStats", "Q3").increment(1);
      } else if (gameOffset < 3600) {
      context.getCounter("QStats", "Q4").increment(1);
      } else {
      context.getCounter("QStats", "AFTER_GAME").increment(1);
      }*/
      }
      }`
      请帮助我提高性能;如果您有任何替代方案,也请附上示例代码告诉我。
      我的mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>

<property>
  <name>mapred.job.tracker</name>
    <value>namenode:54311</value>
    </property>

<property>
  <name>mapred.reduce.parallel.copies</name>
    <value>20</value>
    </property>

<property>
  <name>tasktracker.http.threads</name>
    <value>50</value>
    </property>

<property>
  <name>mapred.job.shuffle.input.buffer.percent</name>
    <value>0.70</value>
    </property>

<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>4</value>
    </property>

<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>4</value>
    </property>

<property>
  <name>mapred.map.tasks</name>
    <value>4</value>
    </property>

<property>
  <!-- Was "reduce.map.tasks", which is not a Hadoop property name and is
       therefore ignored. The default number of reduce tasks per job is set
       with mapred.reduce.tasks (the counterpart of mapred.map.tasks above). -->
  <name>mapred.reduce.tasks</name>
  <value>4</value>
</property>

<property>
  <name>mapred.job.shuffle.merge.percent</name>
    <value>0.65</value>
    </property>

<property>
  <name>mapred.task.timeout</name>
    <value>1200000</value>
    </property>

<property>
    <name>mapred.child.java.opts</name>
        <value>-Xms1024M -Xmx2048M</value>
        </property>

<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>-1</value>
    </property>

<property>
    <name>mapred.compress.map.output</name>
    <value>true</value>
</property>

<property>
    <name>mapred.map.output.compression.codec</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

<property>
    <name>io.sort.mb</name>
    <value>800</value>
</property>

<property>
  <name>mapred.child.ulimit</name>
    <value>unlimited</value>
    </property>

<property>
<name>io.sort.factor</name>
<value>100</value>
<description>More streams merged at once while sorting files.</description>
</property>  

 <property>
 <name>mapreduce.admin.map.child.java.opts</name>
 <value>-Djava.net.preferIPv4Stack=true</value>
 </property>
 <property>
 <name>mapreduce.admin.reduce.child.java.opts</name>
 <value>-Djava.net.preferIPv4Stack=true</value>
 </property>

<property>
   <name>mapred.min.split.size</name>
   <value>0</value>
</property>

<property>
   <name>mapred.job.map.memory.mb</name>
     <value>-1</value>
     </property>

<property>
   <name>mapred.jobtracker.maxtasks.per.job</name>
        <value>-1</value>
             </property>

</configuration>

hbase-site.xml文件

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
    <name>hbase.rootdir</name>
    <value>hdfs://namenode:54310/hbase</value>
    <description>The directory shared by RegionServers.
    </description>
</property>

<property>
    <name>hbase.master</name>
    <value>slave:60000</value>
    <description>The host and port that the HBase master runs at.
    A value of 'local' runs the master and a regionserver
    in a single process.
    </description>
</property>

<property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
    <description>The mode the cluster will be in. Possible values are
    false: standalone and pseudo-distributed setups with managed Zookeeper
    true: fully-distributed with unmanaged Zookeeper Quorum (see hbase-env.sh)
    </description>
</property>

<property>
    <name>hbase.zookeeper.quorum</name>
    <value>slave</value>
    <description>Comma separated list of servers in the ZooKeeper Quorum.
    For example, "host1.mydomain.com,host2.mydomain.com,host3.mydomain.com".
    By default this is set to localhost for local and pseudo-distributed modes
    of operation. For a fully-distributed setup, this should be set to a full
    list of ZooKeeper quorum servers. If HBASE_MANAGES_ZK is set in hbase-env.sh
    this is the list of servers which we will start/stop ZooKeeper on.
    </description>
</property>

<property>
       <name>hbase.zookeeper.property.clientPort</name>
       <value>2181</value>
</property>

<property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/home/hduser/work/zoo_data</value>
    <description>Property from ZooKeeper's config zoo.cfg.
    The directory where the snapshot is stored.
    </description>
</property>

</configuration>

请帮帮我,让我能够提高性能。

lskq00tm

lskq00tm1#

首先,为什么我们需要mapreduce程序来为这么小的文件(1gb)加载数据到hbase。
以我的经验,我使用jackson流处理了5gb的json(我不想把所有的json都放到内存中),并使用批处理技术在hbase中以8分钟的时间持久化。
我使用HBase的批量Put列表,每批写入10万条记录。
下面是我实现这一点的代码片段。(在解析其他格式时也可以做同样的事情)
可能你需要在两个地方调用这个方法
1) 一批10万条记录。
2) 用于处理最后不足100000条的剩余记录

/**
 * Writes one batch of puts to the given table, then empties the batch list.
 *
 * <p>This is best-effort, as in the original: a failed write is logged and not
 * rethrown, and the list is cleared whether or not the write succeeded. Callers
 * that must not lose a failed batch should copy the list before calling.
 *
 * <p>A {@code null} or empty list is a no-op.
 *
 * @param puts the batch to write; emptied before this method returns
 * @param tableName logical table name, resolved through {@code getTable}
 * @throws Exception declared for compatibility with existing callers
 */
public void addRecord(final ArrayList<Put> puts, final String tableName) throws Exception {
    // Nothing to write: avoid opening a table (and the NPE the old finally block hit on null).
    if (puts == null || puts.isEmpty()) {
        return;
    }
    final int batchSize = puts.size();
    HTable table = null;
    try {
        table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName));
        table.put(puts);
        LOG.info("INSERT record[s] " + batchSize + " to table " + tableName + " OK.");
    } catch (final Exception e) {
        // Log through the logger (with the cause) instead of printStackTrace().
        LOG.error("INSERT record[s] " + batchSize + " to table " + tableName + " FAILED.", e);
    } finally {
        // The table was previously never closed, leaking client resources on every batch.
        if (table != null) {
            try {
                table.close();
            } catch (final Exception e) {
                LOG.warn("Failed to close table " + tableName, e);
            }
        }
        LOG.info("Processed ---> " + batchSize);
        puts.clear();
    }
}
csbfibhn

csbfibhn2#

我只创建了mapper类,并使用了HBase的输出格式类。现在它需要10分钟。我的网络速度非常慢,这就是它仍然需要较长时间的原因。

o7jaxewo

o7jaxewo3#

在创建HBase表时,可以通过指定要预先拆分的区域(region)数目来进一步调优,因为批量加载的reducer实例数量也取决于区域数量。这可以通过以下命令完成

hbase org.apache.hadoop.hbase.util.RegionSplitter -c <number of regions> -f <column families> <New Hbase Table Name> <splitAlgorithm>

对于分割算法,可以指定:UniformSplit——将键视为任意字节;HexStringSplit——将键视为十六进制ASCII。

相关问题