org.apache.kafka.common.utils.Utils类的使用及代码示例

x33g5p2x  于2022-02-01 转载在 其他  
字(9.4k)|赞(0)|评价(0)|浏览(177)

本文整理了Java中org.apache.kafka.common.utils.Utils类的一些代码示例,展示了Utils类的具体用法。这些代码示例主要来源于Github/Stackoverflow/Maven等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度上帮助到你。Utils类的具体详情如下:
包路径:org.apache.kafka.common.utils.Utils
类名称:Utils

Utils介绍

暂无

代码示例

代码示例来源:origin: apache/kafka

/**
 * Selects the partition number for a record.
 *
 * <p>Records with serialized key bytes are mapped by the murmur2 hash of those bytes over all
 * partitions of the topic. Records without key bytes use the value returned by
 * {@code nextValue(topic)}, spread over the currently available partitions when there are any,
 * and over all partitions of the topic otherwise.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 * @return the chosen partition number
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  final int numPartitions = cluster.partitionsForTopic(topic).size();

  // Keyed record: the hash of the key bytes alone decides the partition.
  if (keyBytes != null) {
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }

  // Un-keyed record: derive the partition from the per-topic value.
  final int positiveValue = Utils.toPositive(nextValue(topic));
  final List<PartitionInfo> candidates = cluster.availablePartitionsForTopic(topic);
  if (candidates.isEmpty()) {
    // no partitions are available, give a non-available partition
    return positiveValue % numPartitions;
  }
  return candidates.get(positiveValue % candidates.size()).partition();
}

代码示例来源:origin: apache/kafka

/**
 * Instantiates the client callback handler and registers it under the client SASL mechanism.
 *
 * <p>The handler class is read from {@code configs} under
 * {@code SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS}; when that entry is absent the class
 * returned by {@code clientCallbackHandlerClass()} is used instead.
 *
 * @param configs configuration map that may carry the handler class
 */
private void createClientCallbackHandler(Map<String, ?> configs) {
  @SuppressWarnings("unchecked")
  Class<? extends AuthenticateCallbackHandler> handlerClass =
      (Class<? extends AuthenticateCallbackHandler>) configs.get(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS);
  if (handlerClass == null) {
    // Nothing configured explicitly: fall back to the default handler class.
    handlerClass = clientCallbackHandlerClass();
  }
  final AuthenticateCallbackHandler handler = Utils.newInstance(handlerClass);
  saslCallbackHandlers.put(clientSaslMechanism, handler);
}

代码示例来源:origin: apache/kafka

@Override
  /**
   * Renders the name, the internal flag and the comma-joined partitions as a single string.
   */
  public String toString() {
    final String joinedPartitions = Utils.join(partitions, ",");
    return "(name=" + name + ", internal=" + internal + ", partitions=" + joinedPartitions + ")";
  }
}

代码示例来源:origin: apache/kafka

/**
 * Resolve a class from its name and create an instance of it.
 *
 * @param klass name of the class to load
 * @param base super class that the loaded class is resolved against
 * @param <T> the type of the base class
 * @return the freshly created instance
 * @throws ClassNotFoundException declared by this method; presumably raised while loading {@code klass}
 */
public static <T> T newInstance(String klass, Class<T> base) throws ClassNotFoundException {
  final Class<? extends T> loaded = loadClass(klass, base);
  return Utils.newInstance(loaded);
}

代码示例来源:origin: apache/kafka

/**
 * Decode a UTF-8 string out of a byte buffer, starting at an offset from the buffer's current
 * position. The method only reads from the buffer; it performs no relative get that would move
 * the buffer's position.
 *
 * @param buffer The buffer to read from
 * @param offset The offset relative to the current position in the buffer
 * @param length The length of the string in bytes
 * @return The UTF8 string
 */
public static String utf8(ByteBuffer buffer, int offset, int length) {
  if (!buffer.hasArray()) {
    // No accessible backing array: copy the requested bytes out and decode the copy.
    return utf8(toArray(buffer, offset, length));
  }
  // Translate the position-relative offset into an index into the backing array.
  final int start = buffer.arrayOffset() + buffer.position() + offset;
  return new String(buffer.array(), start, length, StandardCharsets.UTF_8);
}

代码示例来源:origin: salesforce/mirus

/**
 * Command-line entry point: parses the arguments, loads the optional worker properties file,
 * applies the overrides and starts Connect.
 *
 * @param argv raw command-line arguments
 * @throws Exception rethrown when argument parsing fails (after printing usage), or propagated
 *     from the later steps
 */
public static void main(String[] argv) throws Exception {
  final Mirus.Args args = new Mirus.Args();
  final JCommander jCommander = JCommander.newBuilder()
      .programName(OffsetStatus.class.getSimpleName())
      .addObject(args)
      .build();

  try {
    jCommander.parse(argv);
  } catch (Exception parseFailure) {
    // Show the usage text before surfacing the parse error to the caller.
    jCommander.usage();
    throw parseFailure;
  }

  if (args.help) {
    jCommander.usage();
    System.exit(1);
  }

  // NOTE(review): Collections.emptyMap() is immutable - confirm applyOverrides does not
  // try to put entries into the map when no properties file is given.
  final Map<String, String> workerProps;
  if (args.workerPropertiesFile.isEmpty()) {
    workerProps = Collections.emptyMap();
  } else {
    workerProps = Utils.propsToStringMap(Utils.loadProps(args.workerPropertiesFile));
  }

  applyOverrides(args.overrides, workerProps);
  startConnect(workerProps);
}

代码示例来源:origin: apache/kafka

/**
 * Copy {@code size} bytes of the buffer into a new array by delegating to the three-argument
 * overload with an offset of zero.
 *
 * @param buffer The buffer to read from
 * @param size The number of bytes to read into the array
 * @return the array produced by the three-argument overload
 */
public static byte[] toArray(ByteBuffer buffer, int size) {
  final int noOffset = 0;
  return toArray(buffer, noOffset, size);
}

代码示例来源:origin: apache/incubator-pinot

/**
 * Maps a value to a partition: the value's string form is UTF-8 encoded, hashed with murmur2,
 * stripped of its sign bit and reduced modulo {@code _numPartitions}.
 *
 * @param valueIn the value to partition; non-String values are converted with {@code toString()}
 * @return the partition index
 */
@Override
public int getPartition(Object valueIn) {
  final String text;
  if (valueIn instanceof String) {
    text = (String) valueIn;
  } else {
    text = valueIn.toString();
  }
  final int hash = Utils.murmur2(StringUtil.encodeUtf8(text));
  // Masking the sign bit keeps the modulo result non-negative.
  final int nonNegativeHash = hash & 0x7fffffff;
  return nonNegativeHash % _numPartitions;
}

代码示例来源:origin: apache/kafka

if (url != null && !url.isEmpty()) {
  try {
    String host = getHost(url);
    Integer port = getPort(url);
    if (host == null || port == null)
      throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);

代码示例来源:origin: linkedin/kafka-monitor

// The record size is always copied from the parsed arguments.
props.put(ProduceServiceConfig.PRODUCE_RECORD_SIZE_BYTE_CONFIG, res.getString("recordSize"));
// Optional settings below are copied only when the corresponding argument was supplied.
// Braces make each guard's scope explicit; previously the consumerConfig load sat indented
// under the producerConfig check without braces and therefore ran unconditionally.
if (res.getString("producerConfig") != null) {
 props.put(ProduceServiceConfig.PRODUCER_PROPS_CONFIG, Utils.loadProps(res.getString("producerConfig")));
}
if (res.getString("consumerConfig") != null) {
 props.put(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG, Utils.loadProps(res.getString("consumerConfig")));
}
if (res.getString("consumerClassName") != null) {
 props.put(ConsumeServiceConfig.CONSUMER_CLASS_CONFIG, res.getString("consumerClassName"));
}

代码示例来源:origin: apache/kafka

@Override
  /**
   * Deletes {@code file}; an I/O failure is logged with the file's absolute path rather than
   * propagated.
   */
  public void run() {
    try {
      Utils.delete(file);
    } catch (IOException deleteFailure) {
      // Log and continue instead of letting the failure escape run().
      log.error("Error deleting {}", file.getAbsolutePath(), deleteFailure);
    }
  }
});

代码示例来源:origin: apache/kafka

/**
 * Reads a file into a string using the JVM's default charset, by delegating to the
 * two-argument overload.
 *
 * @param path path of the file to read
 * @return the value returned by the two-argument overload
 * @throws IOException propagated from the two-argument overload
 */
public static String readFileAsString(String path) throws IOException {
  final Charset platformCharset = Charset.defaultCharset();
  return Utils.readFileAsString(path, platformCharset);
}

代码示例来源:origin: me.jeffshaw.kafka/kafka-clients

List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
if (availablePartitions.size() > 0) {
  int part = Utils.abs(nextValue) % availablePartitions.size();
  return availablePartitions.get(part).partition();
} else {
  return Utils.abs(nextValue) % numPartitions;
return Utils.abs(Utils.murmur2(record.key())) % numPartitions;

代码示例来源:origin: sixt/ja-micro

/**
 * Picks a partition for a record: keyed records are placed by hashing the key bytes with
 * {@code xxHasher}, un-keyed records by the {@code roundRobin} counter. In both cases the
 * number is made positive and reduced modulo the topic's partition count.
 *
 * @param topic the topic name
 * @param key the record key (not used by this method)
 * @param keyBytes serialized key, or null when the record has no key
 * @param value the record value (not used by this method)
 * @param valueBytes serialized value (not used by this method)
 * @param cluster cluster metadata used to look up the topic's partitions
 * @return the chosen partition number
 */
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  final int numPartitions = cluster.partitionsForTopic(topic).size();

  final int selector;
  if (keyBytes != null) {
    // hash the keyBytes to choose a partition
    selector = xxHasher.hash(keyBytes, 0, keyBytes.length, SEED);
  } else {
    selector = roundRobin.getAndIncrement();
  }
  return Utils.toPositive(selector) % numPartitions;
}

代码示例来源:origin: apache/kafka

/**
 * Checks that getPort extracts the port from IPv4, host-name and bracketed IPv6 addresses.
 * Each address is paired with its expected port at the same index.
 */
@Test
public void testGetPort() {
  final String[] addresses = {
    "127.0.0.1:8000",
    "mydomain.com:8080",
    "MyDomain.com:8080",
    "[::1]:1234",
    "[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678",
    "[2001:DB8:85A3:8D3:1319:8A2E:370:7348]:5678",
    "[fe80::b1da:69ca:57f7:63d8%3]:5678"
  };
  final int[] expectedPorts = {8000, 8080, 8080, 1234, 5678, 5678, 5678};

  for (int i = 0; i < addresses.length; i++) {
    assertEquals(expectedPorts[i], getPort(addresses[i]).intValue());
  }
}

代码示例来源:origin: apache/kafka

/**
 * Checks that getHost extracts the host part from plain and protocol-prefixed addresses,
 * including bracketed IPv6 literals. Each row holds {expected host, input address}.
 */
@Test
public void testGetHost() {
  final String[][] cases = {
    {"127.0.0.1", "127.0.0.1:8000"},
    {"mydomain.com", "PLAINTEXT://mydomain.com:8080"},
    {"MyDomain.com", "PLAINTEXT://MyDomain.com:8080"},
    {"My_Domain.com", "PLAINTEXT://My_Domain.com:8080"},
    {"::1", "[::1]:1234"},
    {"2001:db8:85a3:8d3:1319:8a2e:370:7348", "PLAINTEXT://[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5678"},
    {"2001:DB8:85A3:8D3:1319:8A2E:370:7348", "PLAINTEXT://[2001:DB8:85A3:8D3:1319:8A2E:370:7348]:5678"},
    {"fe80::b1da:69ca:57f7:63d8%3", "PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:5678"}
  };

  for (String[] testCase : cases) {
    assertEquals(testCase[0], getHost(testCase[1]));
  }
}

代码示例来源:origin: salesforce/mirus

/**
 * Assembles a MirusOffsetTool from the parsed command-line arguments.
 *
 * @param args parsed arguments; supplies the optional properties file and the offset format
 * @return the configured offset tool
 * @throws IOException declared by this method; presumably raised while loading the properties file
 */
private static MirusOffsetTool newOffsetTool(Args args) throws IOException {
  // These must be the admin topic properties. By default they live in the worker properties
  // file, because that file has the admin producer and consumer settings. Separating them
  // might be wise - it would also be useful for storing state in the source cluster should
  // that prove necessary.
  final Map<String, String> properties;
  if (args.propertiesFile.isEmpty()) {
    properties = Collections.emptyMap();
  } else {
    properties = Utils.propsToStringMap(Utils.loadProps(args.propertiesFile));
  }
  final DistributedConfig config = new DistributedConfig(properties);

  final KafkaOffsetBackingStore backingStore = new KafkaOffsetBackingStore();
  backingStore.configure(config);

  // Avoid initializing the entire Kafka Connect plugin system by assuming the
  // internal.[key|value].converter is org.apache.kafka.connect.json.JsonConverter
  final Converter converter = new JsonConverter();
  converter.configure(config.originalsWithPrefix("internal.key.converter."), true);

  final OffsetSetter setter = new OffsetSetter(converter, backingStore);
  final OffsetFetcher fetcher = new OffsetFetcher(config, converter);
  final OffsetSerDe serDe = OffsetSerDeFactory.create(args.format);
  return new MirusOffsetTool(args, fetcher, setter, serDe);
}

代码示例来源:origin: apache/kafka

/**
 * Null-tolerant conversion of a ByteBuffer to a byte array.
 *
 * @param buffer The buffer to convert, may be null
 * @return null when {@code buffer} is null, otherwise the result of {@code toArray(buffer)}
 */
public static byte[] toNullableArray(ByteBuffer buffer) {
  if (buffer == null) {
    return null;
  }
  return toArray(buffer);
}

代码示例来源:origin: linkedin/kafka-monitor

/**
 * Computes the partition for a string key: the key is encoded as UTF-8, hashed with murmur2,
 * made positive and reduced modulo the partition count.
 *
 * <p>The charset is fixed to UTF-8 so the result does not depend on the JVM's platform default
 * charset; the no-argument {@code String.getBytes()} would yield different bytes, and hence
 * different partitions, for non-ASCII keys on differently configured hosts.
 *
 * @param key the record key; must not be null
 * @param partitionNum the number of partitions; must be positive
 * @return the partition index
 */
public int partition(String key, int partitionNum) {
 byte[] keyBytes = key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
 return toPositive(murmur2(keyBytes)) % partitionNum;
}

代码示例来源:origin: org.apache.kafka/kafka-streams

/**
 * Parses an endpoint string into a HostInfo.
 *
 * @param endPoint the endpoint to parse; may be null or blank
 * @return {@code StreamsMetadataState.UNKNOWN_HOST} when the endpoint is null or blank,
 *     otherwise a HostInfo built from the parsed host and port
 * @throws ConfigException when either the host or the port cannot be extracted
 */
private static HostInfo parseHostInfo(final String endPoint) {
  final boolean unspecified = endPoint == null || endPoint.trim().isEmpty();
  if (unspecified) {
    return StreamsMetadataState.UNKNOWN_HOST;
  }

  final String host = getHost(endPoint);
  final Integer port = getPort(endPoint);
  if (host != null && port != null) {
    return new HostInfo(host, port);
  }
  throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
}

相关文章