把几个事件类型放在同一个Kafka主题中有什么更好的方法?

biswetbf  于 2021-06-04  发布在  Kafka
关注(0)|答案(4)|浏览(384)

假设有两种类型t1&t2和一个主题t。t1和t2都必须进入主题t(出于某种原因)。有什么方法可以做到这一点?哪一个更好?
一种方法是利用继承,我们可以定义一个基类,然后子类可以扩展它。在我们的例子中,我们可以定义一个基类tb,然后t1和t2可以扩展tb。
基类(tb)

package poc.kafka.domain;

    import java.io.Externalizable;
    import java.io.IOException;
    import java.io.ObjectInput;
    import java.io.ObjectOutput;

    import lombok.AllArgsConstructor;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    import lombok.extern.java.Log;

    @ToString
    @AllArgsConstructor
    @NoArgsConstructor
    @Log
    public class Animal implements Externalizable {
        public String name;

        public void whoAmI() {
            log.info("I am an Animal");
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            name = (String) in.readObject();
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeObject(name);
        }
    }

派生类(t1)

package poc.kafka.domain;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;

@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Cat extends Animal implements Externalizable {
    private int legs;

    public void whoAmI() {
        log.info("I am a Cat");
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);
        legs = in.readInt();
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);
        out.writeInt(legs);
    }
}

派生类(t2)

package poc.kafka.domain;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;

@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Dog extends Animal implements Externalizable {
    private int legs;

    public void whoAmI() {
        log.info("I am a Dog");
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);
        legs = in.readInt();
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);
        out.writeInt(legs);
    }
}

反序列化程序

package poc.kafka.domain.serialization;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

import poc.kafka.domain.Animal;

public class AnimalDeserializer implements Deserializer<Animal> {

    @Override
    public Animal deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

}

序列化程序

package poc.kafka.domain.serialization;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

import poc.kafka.domain.Animal;

public class AnimalSerializer implements Serializer<Animal> {

    @Override
    public byte[] serialize(String topic, Animal data) {
        return SerializationUtils.serialize(data);
    }

}

然后我们可以发送t1和t2如下

IntStream.iterate(0, i -> i + 1).limit(10).forEach(i -> {
            if (i % 2 == 0)
                producer.send(new ProducerRecord<Integer, Animal>("T", i, new Dog(i)));
            else
                producer.send(new ProducerRecord<Integer, Animal>("gs3", i, new Cat(i)));
        });
c0vxltue

c0vxltue1#

最好的方法是创建自定义分区。
通过partitionkey将每条消息生成到不同的分区
这是默认实现,您需要实现分区逻辑。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

查看本教程了解更多示例。
这是Kafka关于何时选择服装分区的权威指南中的一段。
到目前为止,在实现自定义分区策略时,我们已经讨论了最常用的默认分区器的特性。然而,kafka并没有限制您只对分区进行散列,有时也有很好的理由对数据进行不同的分区。例如,假设您是一家b2b供应商,您最大的客户是一家生产称为“香蕉”的手持设备的公司。假设您与客户“香蕉”有如此多的业务往来,以至于您每天超过10%的交易都是与该客户进行的。如果您使用默认的散列分区,香蕉记录将被分配到与其他帐户相同的分区,从而导致一个分区的大小大约是其余分区的两倍。这可能会导致服务器空间不足、处理速度减慢等。我们真正想要的是为banana提供自己的分区,然后使用哈希分区将其余帐户Map到分区。

ntjbwcob

ntjbwcob2#

这也许不是对问题的直接回答,而是重新考虑某些方面的建议,也许可以解决原来的问题。
首先,尽管kafka能够支持任何数据格式,但对于可序列化的二进制格式,我建议使用 Apache Avro ,而不是序列化的java对象。
使用avro,您将获得紧凑的二进制、与语言无关的数据类型和广泛的工具集的所有好处。例如,有cli工具可以读取avro中包含内容的kafka主题,但我不知道有哪一个工具能够反序列化那里的java对象。
你可以在这里读到关于avro本身的资料
关于为什么要使用avro的一些好的见解也可以在这个问题中找到
第二。您的问题标题是关于事件类型的,但判断描述可能意味着“如何通过单个kafka主题处理不同的数据类型”。如果事件之间的区别只是事件类型(例如,单击、提交、登录、注销等等),那么您可以保留 enum 字段,否则使用泛型容器对象。
如果这些事件应该承载的数据有效负载的结构存在差异,那么,同样,使用avro,您可以用 Union 类型。
最后,如果数据差异如此之大,以至于这些事件基本上都是不同的数据结构,没有什么显著的共同点,那么就用不同的Kafka主题。
尽管能够在同一主题中使用不同的分区来发送不同的数据类型,但这实际上只会在将来引起维护方面的麻烦,并且会限制扩展,正如在这里的其他响应中正确指出的那样。所以在这种情况下,如果有一个选择去与不同的主题-最好这样做。

cetgtptt

cetgtptt3#

如果没有继承的概念,例如数据不是

Animal -> Cat
Animal -> Dog

另一种方法是使用 Package 器。

public class DataWrapper
{
private Object data;
private EventType type;
       // getter and setters omitted for the sake of brevity
}

将所有事件放在 Package 器对象中,并用它们的属性区分每个事件 EventType 这可能是一个 enum 例如。
然后你可以用正常的方式序列化它(就像你在问题中发布的那样),在反序列化的同时你可以检查 EventType 然后基于 EventType 此外,为了确保您的datawrapper不会 Package 所有类型的数据,即应该仅用于特定类型的数据,那么您可以使用 Marker 接口,并使所有类的对象推送到主题以实现此接口。
例如,

interface MyCategory {
}

然后你的自定义类可以有,

class MyEvent implements MyCategory {
}

而在 DataWrapper 你可以。。

public class DataWrapper<T extends MyCategory> {
private T data;
private EventType type;
            // getters and setters omitted for the sake of brevity
}
5m1hhzi4

5m1hhzi44#

最简单的方法是使用您的自定义 org.apache.kafka.common.serialization.Serializer ,它将能够处理这两种类型的事件。这两种类型的事件都应该从同一个基于类型/的类继承。
示例代码可能如下所示:

public class CustomSerializer implements Serializer<T> {

    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    public byte[] serialize(String topic, T data) {
        // serialization
        return null;
    }

    public void close() {
        // nothing to do
    }
}

相关问题