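This article collects code examples for the Java method org.apache.kafka.common.utils.Utils.murmur2() and shows how Utils.murmur2() is used in practice. The examples come mainly from GitHub, Stack Overflow, and Maven, were extracted from selected projects, and are intended as a practical reference. Details of the Utils.murmur2() method are as follows: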
Package path: org.apache.kafka.common.utils
Class name: Utils
Method name: murmur2
Description: Generates a 32-bit murmur2 hash from a byte array
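To make the one-line description more concrete, here is a standalone sketch of a 32-bit MurmurHash2 over a byte array. The class name Murmur2Sketch and the helper partitionFor are hypothetical, and the seed, mixing constants, and little-endian word order are assumptions modeled on the Kafka client, so check the result against Utils.murmur2 itself before relying on it to reproduce Kafka's partition assignments.

```java
import java.nio.charset.StandardCharsets;

public class Murmur2Sketch {

    // 32-bit MurmurHash2 over a byte array.
    // Assumption: seed and constants are modeled on the Kafka client;
    // this is an illustration, not the library source.
    public static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        int length4 = length / 4;

        // Mix the input four bytes at a time, read as little-endian words.
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1 to 3 trailing bytes, if any.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
                // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
                // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }

        // Final avalanche so the last bytes affect all bits of the hash.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Hypothetical helper: maps a string key to a partition index in [0, numPartitions).
    public static int partitionFor(String key, int numPartitions) {
        int hash = murmur2(key.getBytes(StandardCharsets.UTF_8));
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("order-42", 6));
    }
}
```

The hash can be negative, which is why every example on this page clears the sign bit (or otherwise forces a non-negative value) before taking the remainder.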
Code example source: apache/incubator-pinot
@Override
public int getPartition(Object valueIn) {
    String value = (valueIn instanceof String) ? (String) valueIn : valueIn.toString();
    // Clear the sign bit so the remainder is never negative
    return (Utils.murmur2(StringUtil.encodeUtf8(value)) & 0x7fffffff) % _numPartitions;
}
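The mask `& 0x7fffffff` in the example above deserves a short illustration. The sketch below uses a hypothetical class PositiveHash with a local toPositive helper written for this illustration (it is not the Kafka class) and compares the mask with Math.abs, which returns a negative number for Integer.MIN_VALUE.

```java
public class PositiveHash {

    // Clears the sign bit. Note this is not an absolute value:
    // negative inputs map to a different non-negative number.
    public static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Maps any int hash to an index in [0, numPartitions).
    public static int partitionOf(int hash, int numPartitions) {
        return toPositive(hash) % numPartitions;
    }

    public static void main(String[] args) {
        int worstCase = Integer.MIN_VALUE;

        // Math.abs overflows for Integer.MIN_VALUE and stays negative,
        // so the remainder would be a negative (invalid) partition index.
        System.out.println(Math.abs(worstCase) % 6);

        // The masked version always stays within range.
        System.out.println(partitionOf(worstCase, 6));
    }
}
```

Because the mask changes the value of negative hashes rather than negating them, two producers agree on a partition only if both use the same masking rule.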
Code example source: linkedin/kafka-monitor
public int partition(String key, int partitionNum) {
    // getBytes() without an argument uses the JVM default charset
    byte[] keyBytes = key.getBytes();
    return toPositive(murmur2(keyBytes)) % partitionNum;
}
Code example source: apache/kafka
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
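Code example source: vakinge/jeesuite-libs
public DefaultMessage partitionFactor(Serializable partitionFactor) {
    if (partitionFactor != null) {
        // The raw murmur2 result is stored as-is, so it may be negative;
        // getBytes() without an argument uses the JVM default charset
        partitionHash = Utils.murmur2(partitionFactor.toString().getBytes());
    }
    return this;
}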
Code example source: simplesteph/medium-blog-kafka-udemy
private boolean isValidReview(Review review) {
    try {
        int hash = Utils.toPositive(Utils.murmur2(review.toByteBuffer().array()));
        return (hash % 100) >= 5; // 95% of the reviews will be valid reviews
    } catch (IOException e) {
        return false;
    }
}
Code example source: rayokota/kafka-graphs
private static <K> int vertexToPartition(K vertex, Serializer<K> serializer, int numPartitions) {
    // TODO make configurable, currently this is tied to DefaultStreamPartitioner
    byte[] keyBytes = serializer.serialize(null, vertex);
    int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    return partition;
}
Code example source: Nepxion/Thunder
@SuppressWarnings("resource")
private int getPartitionIndex(Consumer<String, byte[]> consumer, String topic, String key) {
    int partitionNumber = consumer.partitionsFor(topic).size();
    StringSerializer keySerializer = new StringSerializer();
    byte[] serializedKey = keySerializer.serialize(topic, key);
    // Clear the sign bit so the remainder is never negative
    int positive = Utils.murmur2(serializedKey) & 0x7fffffff;
    return positive % partitionNumber;
}
Code example source: org.apache.kafka/kafka-streams
/**
 * WindowedStreamPartitioner determines the partition number for a record with the given windowed key and value
 * and the current number of partitions. The partition number is determined by the original key of the windowed key
 * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
 *
 * @param topic the topic name this record is sent to
 * @param windowedKey the key of the record
 * @param value the value of the record
 * @param numPartitions the total number of partitions
 * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
 */
@Override
public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {
    final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);
    // hash the keyBytes to choose a partition
    return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
Code example source: vakinge/jeesuite-libs
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
    int numPartitions = partitions.size();
    try {
        long partitionHash = ((DefaultMessage) value).partitionHash();
        // partition by hash; zero or negative hashes fall through to the default logic below
        if (partitionHash > 0) {
            long index = partitionHash % numPartitions;
            //System.out.println("numPartitions:"+numPartitions+",partitionHash:"+partitionHash + ",index:"+index);
            return (int) index;
        }
    } catch (ClassCastException e) {}
    if (keyBytes == null) {
        int nextValue = counter.getAndIncrement();
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return DefaultPartitioner.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
Code example source: me.jeffshaw.kafka/kafka-clients
return Utils.abs(Utils.murmur2(record.key())) % numPartitions;