我们正在使用kafka来存储由集群中的一个节点生成的消息,并将其分发到集群中的所有节点,我让它主要用于akka流,但有几个问题我必须解决。这有一些限制。
首先,消息必须由集群中的每个节点使用,但只能由一个节点生成。我知道我可以给每个节点分配一个组id,这个组id可能是它的节点id,这意味着每个节点都将获得消息。这样就好了。但问题是。
数据是非常短暂和相当大的(略低于兆欧),不能进一步压缩或打破。如果在这个主题上有一条新的信息,那么旧的信息就是垃圾。如何将主题限制为当前最多一条消息?
考虑到数据是节点启动所必需的,我需要使用主题上的最新消息,不管我以前是否使用过它,希望每次启动服务器时都不要创建唯一的组id。这有可能吗?如果有,怎么做。
最后,数据通常在主题上,但有时它不在那里,理想情况下,我需要能够检查那里是否有消息,如果没有,请生产者创建消息。这可能吗?
这是我当前用于启动使用者的代码:
private Control startMatrixConsumer() {
final ConsumerSettings<Long, byte[]> matrixConsumerSettings = ConsumerSettings
.create(services.actorSystem(), new LongDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers(services.config().getString("kafka.bootstrapServers"))
.withGroupId("group1") // todo put in the conf ??
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
final String topicName = Matrix.class.getSimpleName() + '-' + eventId;
final AutoSubscription subscription = Subscriptions.topics(topicName);
return Consumer.plainSource(MatrixConsumerSettings, subscription)
.named(Matrix.class.getSimpleName() + "-Kafka-Consumer-" + eventId)
.map(data -> {
final Matrix matrix = services.kryoDeserialize(data.value(), Matrix.class);
log.debug(format("Received %s for event %d from Kafka", Matrix.class.getSimpleName(), matrix.getEventId()));
return matrix;
})
.filter(Objects::nonNull)
.to(Sink.actorRef(getSelf(), NotUsed.getInstance()))
.run(ActorMaterializer.create(getContext()));
}
多谢了。
1条答案
按热度按时间hec6srdp1#
所有消息都必须由集群中的每个节点使用,但只能由一个节点生成。
您是正确的,您可以通过为每个节点提供唯一的组id来实现这一点。
如何将主题限制为当前最多一条消息?
Kafka提供了紧凑的主题。压缩主题仅维护给定密钥的最新消息。例如,Kafka消费者将其偏移量存储在压缩主题中。在您的情况下,使用相同的密钥生成每条消息,kafka log cleaner将删除旧消息。请注意,压缩是定期执行的,因此您可能会在短时间内收到两个(或更多)具有相同密钥的消息(取决于日志清理器配置)。
我需要消费这个主题的最新信息,不管我以前是否消费过它。
您可以通过不提交消费者补偿来实现这一点(
enable.auto.commit
设置为false
)和设置auto.offset.reset
至earliest
. 通过在压缩的主题和使用者中有一条从主题开头开始的消息,该消息总是在节点启动后使用。我需要能够检查是否有一个消息,如果没有要求生产者创建的消息。
不幸的是,我不知道Kafka的任何功能,可以帮助你。大多数时候,Kafka被用来分离生产者和消费者。