假设有两种类型t1&t2和一个主题t。t1和t2都必须进入主题t(出于某种原因)。有什么方法可以做到这一点?哪一个更好?
一种方法是利用继承,我们可以定义一个基类,然后子类可以扩展它。在我们的例子中,我们可以定义一个基类tb,然后t1和t2可以扩展tb。
基类(tb)
package poc.kafka.domain;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.java.Log;
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Log
public class Animal implements Externalizable {
public String name;
public void whoAmI() {
log.info("I am an Animal");
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
name = (String) in.readObject();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(name);
}
}
派生类(t1)
package poc.kafka.domain;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;
@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Cat extends Animal implements Externalizable {
private int legs;
public void whoAmI() {
log.info("I am a Cat");
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
legs = in.readInt();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(legs);
}
}
派生类(t2)
package poc.kafka.domain;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;
@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Dog extends Animal implements Externalizable {
private int legs;
public void whoAmI() {
log.info("I am a Dog");
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
legs = in.readInt();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(legs);
}
}
反序列化程序
package poc.kafka.domain.serialization;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;
import poc.kafka.domain.Animal;
public class AnimalDeserializer implements Deserializer<Animal> {
@Override
public Animal deserialize(String topic, byte[] data) {
return SerializationUtils.deserialize(data);
}
}
序列化程序
package poc.kafka.domain.serialization;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;
import poc.kafka.domain.Animal;
public class AnimalSerializer implements Serializer<Animal> {
@Override
public byte[] serialize(String topic, Animal data) {
return SerializationUtils.serialize(data);
}
}
然后我们可以发送t1和t2如下
IntStream.iterate(0, i -> i + 1).limit(10).forEach(i -> {
if (i % 2 == 0)
producer.send(new ProducerRecord<Integer, Animal>("T", i, new Dog(i)));
else
producer.send(new ProducerRecord<Integer, Animal>("gs3", i, new Cat(i)));
});
4条答案
按热度按时间c0vxltue1#
最好的方法是创建自定义分区。
通过partitionkey将每条消息生成到不同的分区
这是默认实现,您需要实现分区逻辑。
查看本教程了解更多示例。
这是Kafka关于何时选择服装分区的权威指南中的一段。
到目前为止,在实现自定义分区策略时,我们已经讨论了最常用的默认分区器的特性。然而,kafka并没有限制您只对分区进行散列,有时也有很好的理由对数据进行不同的分区。例如,假设您是一家b2b供应商,您最大的客户是一家生产称为“香蕉”的手持设备的公司。假设您与客户“香蕉”有如此多的业务往来,以至于您每天超过10%的交易都是与该客户进行的。如果您使用默认的散列分区,香蕉记录将被分配到与其他帐户相同的分区,从而导致一个分区的大小大约是其余分区的两倍。这可能会导致服务器空间不足、处理速度减慢等。我们真正想要的是为banana提供自己的分区,然后使用哈希分区将其余帐户Map到分区。
ntjbwcob2#
这也许不是对问题的直接回答,而是重新考虑某些方面的建议,也许可以解决原来的问题。
首先,尽管kafka能够支持任何数据格式,但对于可序列化的二进制格式,我建议使用
Apache Avro
,而不是序列化的java对象。使用avro,您将获得紧凑的二进制、与语言无关的数据类型和广泛的工具集的所有好处。例如,有cli工具可以读取avro中包含内容的kafka主题,但我不知道有哪一个工具能够反序列化那里的java对象。
你可以在这里读到关于avro本身的资料
关于为什么要使用avro的一些好的见解也可以在这个问题中找到
第二。您的问题标题是关于事件类型的,但判断描述可能意味着“如何通过单个kafka主题处理不同的数据类型”。如果事件之间的区别只是事件类型(例如,单击、提交、登录、注销等等),那么您可以保留
enum
字段,否则使用泛型容器对象。如果这些事件应该承载的数据有效负载的结构存在差异,那么,同样,使用avro,您可以用
Union
类型。最后,如果数据差异如此之大,以至于这些事件基本上都是不同的数据结构,没有什么显著的共同点,那么就用不同的Kafka主题。
尽管能够在同一主题中使用不同的分区来发送不同的数据类型,但这实际上只会在将来引起维护方面的麻烦,并且会限制扩展,正如在这里的其他响应中正确指出的那样。所以在这种情况下,如果有一个选择去与不同的主题-最好这样做。
cetgtptt3#
如果没有继承的概念,例如数据不是
另一种方法是使用 Package 器。
将所有事件放在 Package 器对象中,并用它们的属性区分每个事件
EventType
这可能是一个enum
例如。然后你可以用正常的方式序列化它(就像你在问题中发布的那样),在反序列化的同时你可以检查
EventType
然后基于EventType
此外,为了确保您的datawrapper不会 Package 所有类型的数据,即应该仅用于特定类型的数据,那么您可以使用Marker
接口,并使所有类的对象推送到主题以实现此接口。例如,
然后你的自定义类可以有,
而在
DataWrapper
你可以。。5m1hhzi44#
最简单的方法是使用您的自定义
org.apache.kafka.common.serialization.Serializer
,它将能够处理这两种类型的事件。这两种类型的事件都应该从同一个基于类型/的类继承。示例代码可能如下所示: