com.alibaba.datax.common.util.Configuration.getLong()方法的使用及代码示例

x33g5p2x  于2022-01-18 转载在 其他  
字(13.2k)|赞(0)|评价(0)|浏览(144)

本文整理了Java中com.alibaba.datax.common.util.Configuration.getLong()方法的一些代码示例,展示了Configuration.getLong()的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。Configuration.getLong()方法的具体详情如下:
包路径:com.alibaba.datax.common.util.Configuration
类名称:Configuration
方法名:getLong

Configuration.getLong介绍

[英]根据用户提供的json path,寻址Long对象
[中]根据用户提供的json路径寻址长的对象

代码示例

代码示例来源:origin: ECNU-1X/DataX-Masking

/**
 * 根据用户提供的json path,寻址Long对象,如果对象不存在,返回默认Long对象
 * 
 * @return Long对象,如果path不存在或者Integer不存在,返回默认Long对象
 */
public Long getLong(final String path, long defaultValue) {
  Long result = this.getLong(path);
  if (null == result) {
    return defaultValue;
  }
  return result;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public AbstractContainerCommunicator(Configuration configuration) {
  this.configuration = configuration;
  this.jobId = configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public ErrorRecordChecker(Configuration configuration) {
  this(configuration.getLong(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_RECORD),
      configuration.getDouble(CoreConstant.DATAX_JOB_SETTING_ERRORLIMIT_PERCENT));
}

代码示例来源:origin: ECNU-1X/DataX-Masking

long byteSpeed = configuration.getLong(
    CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_BYTE, 1024 * 1024);
long recordSpeed = configuration.getLong(
    CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_SPEED_RECORD, 10000);
this.byteSpeed = byteSpeed;
this.recordSpeed = recordSpeed;
this.flowControlInterval = configuration.getLong(
    CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_FLOWCONTROLINTERVAL, 1000);

代码示例来源:origin: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.writerSliceConfig = getPluginJobConf();
  this.print = this.writerSliceConfig.getBool(Key.PRINT, true);
  this.recordNumBeforSleep = this.writerSliceConfig.getLong(Key.RECORD_NUM_BEFORE_SLEEP, 0);
  this.sleepTime = this.writerSliceConfig.getLong(Key.SLEEP_TIME, 0);
  String servers = this.writerSliceConfig.getString(Key.SERVERS);
  Properties props = new Properties();
  props.put("bootstrap.servers", servers);
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  this.producer = new KafkaProducer<>(props);
  if(recordNumBeforSleep < 0) {
    throw DataXException.asDataXException(StreamKafkaWriterErrorCode.CONFIG_INVALID_EXCEPTION, "recordNumber 不能为负值");
  }
  if(sleepTime <0) {
    throw DataXException.asDataXException(StreamKafkaWriterErrorCode.CONFIG_INVALID_EXCEPTION, "sleep 不能为负值");
  }
}

代码示例来源:origin: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.readerSliceConfig = super.getPluginJobConf();
  this.columns = this.readerSliceConfig.getList(Key.COLUMN,
      String.class);
  this.sliceRecordCount = this.readerSliceConfig
      .getLong(Key.SLICE_RECORD_COUNT);
  this.haveMixupFunction = this.readerSliceConfig.getBool(
      Constant.HAVE_MIXUP_FUNCTION, false);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

private void preCheckInit() {
  this.jobId = this.configuration.getLong(
      CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);
  if (this.jobId < 0) {
    LOG.info("Set jobId = 0");
    this.jobId = 0;
    this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
        this.jobId);
  }
  Thread.currentThread().setName("job-" + this.jobId);
  JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
      this.getContainerCommunicator());
  this.jobReader = this.preCheckReaderInit(jobPluginCollector);
  this.jobWriter = this.preCheckWriterInit(jobPluginCollector);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.originalConfig = super.getPluginJobConf();
  // warn: 忽略大小写
  this.mixupFunctionPattern = Pattern.compile(Constant.MIXUP_FUNCTION_PATTERN, Pattern.CASE_INSENSITIVE);
  dealColumn(this.originalConfig);
  Long sliceRecordCount = this.originalConfig
      .getLong(Key.SLICE_RECORD_COUNT);
  if (null == sliceRecordCount) {
    throw DataXException.asDataXException(StreamReaderErrorCode.REQUIRED_VALUE,
        "没有设置参数[sliceRecordCount].");
  } else if (sliceRecordCount < 1) {
    throw DataXException.asDataXException(StreamReaderErrorCode.ILLEGAL_VALUE,
        "参数[sliceRecordCount]不能小于1.");
  }
}

代码示例来源:origin: ECNU-1X/DataX-Masking

/**
 * reader和writer的初始化
 */
private void init() {
  this.jobId = this.configuration.getLong(
      CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);
  if (this.jobId < 0) {
    LOG.info("Set jobId = 0");
    this.jobId = 0;
    this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
        this.jobId);
  }
  Thread.currentThread().setName("job-" + this.jobId);
  JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
      this.getContainerCommunicator());
  //必须先Reader ,后Writer
  this.jobReader = this.initJobReader(jobPluginCollector);
  this.jobWriter = this.initJobWriter(jobPluginCollector);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static String prepareJdbcUrl(Configuration conf) {
  String adsURL = conf.getString(Key.ADS_URL);
  String schema = conf.getString(Key.SCHEMA);
  Long socketTimeout = conf.getLong(Key.SOCKET_TIMEOUT,
      Constant.DEFAULT_SOCKET_TIMEOUT);
  String suffix = conf.getString(Key.JDBC_URL_SUFFIX, "");
  return AdsUtil.prepareJdbcUrl(adsURL, schema, socketTimeout, suffix);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public StandAloneJobContainerCommunicator(Configuration configuration) {
  super(configuration);
  super.setCollector(new ProcessInnerCollector(configuration.getLong(
      CoreConstant.DATAX_CORE_CONTAINER_JOB_ID)));
  super.setReporter(new ProcessInnerReporter());
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public TaskGroupContainer(Configuration configuration) {
  super(configuration);
  initCommunicator(configuration);
  this.jobId = this.configuration.getLong(
      CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
  this.taskGroupId = this.configuration.getInt(
      CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
  this.channelClazz = this.configuration.getString(
      CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
  this.taskCollectorClass = this.configuration.getString(
      CoreConstant.DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

@Override
public void init() {
  this.writerSliceConfig = getPluginJobConf();
  this.fieldDelimiter = this.writerSliceConfig.getString(
      Key.FIELD_DELIMITER, "\t");
  this.print = this.writerSliceConfig.getBool(Key.PRINT, true);
  this.path = this.writerSliceConfig.getString(Key.PATH, null);
  this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME, null);
  this.recordNumBeforSleep = this.writerSliceConfig.getLong(Key.RECORD_NUM_BEFORE_SLEEP, 0);
  this.sleepTime = this.writerSliceConfig.getLong(Key.SLEEP_TIME, 0);
  if(recordNumBeforSleep < 0) {
    throw DataXException.asDataXException(StreamWriterErrorCode.CONFIG_INVALID_EXCEPTION, "recordNumber 不能为负值");
  }
  if(sleepTime <0) {
    throw DataXException.asDataXException(StreamWriterErrorCode.CONFIG_INVALID_EXCEPTION, "sleep 不能为负值");
  }
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
  originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase11xWriterErrorCode.REQUIRED_VALUE);
  originalConfig.getNecessaryValue(Key.TABLE, Hbase11xWriterErrorCode.REQUIRED_VALUE);
  Hbase11xHelper.validateMode(originalConfig);
  String encoding = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
  if (!Charset.isSupported(encoding)) {
    throw DataXException.asDataXException(Hbase11xWriterErrorCode.ILLEGAL_VALUE, String.format("Hbasewriter 不支持您所配置的编码:[%s]", encoding));
  }
  originalConfig.set(Key.ENCODING, encoding);
  Boolean walFlag = originalConfig.getBool(Key.WAL_FLAG, false);
  originalConfig.set(Key.WAL_FLAG, walFlag);
  long writeBufferSize = originalConfig.getLong(Key.WRITE_BUFFER_SIZE,Constant.DEFAULT_WRITE_BUFFER_SIZE);
  originalConfig.set(Key.WRITE_BUFFER_SIZE, writeBufferSize);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static AdsHelper createAdsHelper(Configuration originalConfig){
  //Get adsUrl,userName,password,schema等参数,创建AdsHelp实例
  String adsUrl = originalConfig.getString(Key.ADS_URL);
  String userName = originalConfig.getString(Key.USERNAME);
  String password = originalConfig.getString(Key.PASSWORD);
  String schema = originalConfig.getString(Key.SCHEMA);
  Long socketTimeout = originalConfig.getLong(Key.SOCKET_TIMEOUT, Constant.DEFAULT_SOCKET_TIMEOUT);
  String suffix = originalConfig.getString(Key.JDBC_URL_SUFFIX, "");
  return new AdsHelper(adsUrl,userName,password,schema,socketTimeout,suffix);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static AdsHelper createAdsHelperWithOdpsAccount(Configuration originalConfig) {
  String adsUrl = originalConfig.getString(Key.ADS_URL);
  String userName = originalConfig.getString(TransferProjectConf.KEY_ACCESS_ID);
  String password = originalConfig.getString(TransferProjectConf.KEY_ACCESS_KEY);
  String schema = originalConfig.getString(Key.SCHEMA);
  Long socketTimeout = originalConfig.getLong(Key.SOCKET_TIMEOUT, Constant.DEFAULT_SOCKET_TIMEOUT);
  String suffix = originalConfig.getString(Key.JDBC_URL_SUFFIX, "");
  return new AdsHelper(adsUrl, userName, password, schema,socketTimeout,suffix);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
  originalConfig.getNecessaryValue(Key.HBASE_CONFIG, Hbase094xWriterErrorCode.REQUIRED_VALUE);
  originalConfig.getNecessaryValue(Key.TABLE, Hbase094xWriterErrorCode.REQUIRED_VALUE);
  Hbase094xHelper.validateMode(originalConfig);
  String encoding = originalConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
  if (!Charset.isSupported(encoding)) {
    throw DataXException.asDataXException(Hbase094xWriterErrorCode.ILLEGAL_VALUE, String.format("Hbasewriter 不支持您所配置的编码:[%s]", encoding));
  }
  originalConfig.set(Key.ENCODING, encoding);
  Boolean autoFlush = originalConfig.getBool(Key.AUTO_FLUSH, false);
  //本期设置autoflush 一定为flase,通过hbase writeBufferSize来控制每次flush大小
  originalConfig.set(Key.AUTO_FLUSH,false);
  Boolean walFlag = originalConfig.getBool(Key.WAL_FLAG, false);
  originalConfig.set(Key.WAL_FLAG, walFlag);
  long writeBufferSize = originalConfig.getLong(Key.WRITE_BUFFER_SIZE,Constant.DEFAULT_WRITE_BUFFER_SIZE);
  originalConfig.set(Key.WRITE_BUFFER_SIZE, writeBufferSize);
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static Table getTable(com.alibaba.datax.common.util.Configuration configuration){
  String hbaseConfig = configuration.getString(Key.HBASE_CONFIG);
  String userTable = configuration.getString(Key.TABLE);
  long writeBufferSize = configuration.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
  org.apache.hadoop.hbase.client.Connection hConnection = Hbase11xHelper.getHbaseConnection(hbaseConfig);
  TableName hTableName = TableName.valueOf(userTable);
  org.apache.hadoop.hbase.client.Admin admin = null;
  org.apache.hadoop.hbase.client.Table hTable = null;
  try {
    admin = hConnection.getAdmin();
    Hbase11xHelper.checkHbaseTable(admin,hTableName);
    hTable = hConnection.getTable(hTableName);
    BufferedMutatorParams bufferedMutatorParams =  new BufferedMutatorParams(hTableName);
    bufferedMutatorParams.writeBufferSize(writeBufferSize);
  } catch (Exception e) {
    Hbase11xHelper.closeTable(hTable);
    Hbase11xHelper.closeAdmin(admin);
    Hbase11xHelper.closeConnection(hConnection);
    throw DataXException.asDataXException(Hbase11xWriterErrorCode.GET_HBASE_TABLE_ERROR, e);
  }
  return hTable;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static BufferedMutator getBufferedMutator(com.alibaba.datax.common.util.Configuration configuration){
  String hbaseConfig = configuration.getString(Key.HBASE_CONFIG);
  String userTable = configuration.getString(Key.TABLE);
  long writeBufferSize = configuration.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
  org.apache.hadoop.conf.Configuration hConfiguration = Hbase11xHelper.getHbaseConfiguration(hbaseConfig);
  org.apache.hadoop.hbase.client.Connection hConnection = Hbase11xHelper.getHbaseConnection(hbaseConfig);
  TableName hTableName = TableName.valueOf(userTable);
  org.apache.hadoop.hbase.client.Admin admin = null;
  BufferedMutator bufferedMutator = null;
  try {
    admin = hConnection.getAdmin();
    Hbase11xHelper.checkHbaseTable(admin,hTableName);
    //参考HTable getBufferedMutator()
    bufferedMutator = hConnection.getBufferedMutator(
        new BufferedMutatorParams(hTableName)
        .pool(HTable.getDefaultExecutor(hConfiguration))
        .writeBufferSize(writeBufferSize));
  } catch (Exception e) {
    Hbase11xHelper.closeBufferedMutator(bufferedMutator);
    Hbase11xHelper.closeAdmin(admin);
    Hbase11xHelper.closeConnection(hConnection);
    throw DataXException.asDataXException(Hbase11xWriterErrorCode.GET_HBASE_BUFFEREDMUTATOR_ERROR, e);
  }
  return bufferedMutator;
}

代码示例来源:origin: ECNU-1X/DataX-Masking

public static HTable getTable(com.alibaba.datax.common.util.Configuration configuration){
  String hbaseConfig = configuration.getString(Key.HBASE_CONFIG);
  String userTable = configuration.getString(Key.TABLE);
  org.apache.hadoop.conf.Configuration hConfiguration = Hbase094xHelper.getHbaseConfiguration(hbaseConfig);
  Boolean autoFlush = configuration.getBool(Key.AUTO_FLUSH, false);
  long writeBufferSize = configuration.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
  HTable htable = null;
  HBaseAdmin admin = null;
  try {
    htable = new HTable(hConfiguration, userTable);
    admin = new HBaseAdmin(hConfiguration);
    Hbase094xHelper.checkHbaseTable(admin,htable);
    //本期设置autoflush 一定为flase,通过hbase writeBufferSize来控制每次flush大小
    htable.setAutoFlush(false);
    htable.setWriteBufferSize(writeBufferSize);
    return htable;
  } catch (Exception e) {
    Hbase094xHelper.closeTable(htable);
    throw DataXException.asDataXException(Hbase094xWriterErrorCode.GET_HBASE_TABLE_ERROR, e);
  }finally {
    Hbase094xHelper.closeAdmin(admin);
  }
}

相关文章