所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:
Kafka 默认提供的交付可靠性保障是即至少一次,因为kafka 的producer 在消息发送失败(没有接收到kafka broker 的ACK信息)的时候则会进行重试,这就是kafka 为什么默认提供的是至少一次的交付语义,但是这样可能导致消息重复
Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。
无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条,而这就是我们今天要介绍的幂等性。
对于每个PID,该Producer发送数据的每个<Topic,Partition>都对应一个从0开始单调递增的Sequence Number,Broker端在缓存中保存了这seq number
对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃,这样就可以实现了消息重复提交了
但是只能保证单个Producer对于同一个<Topic,Partition>的Exactly Once语义
new_seq=old_seq+1: 正常消息;
new_seq<=old_seq : 重复消息;
new_seq>old_seq+1: 消息丢失;
其实前面我们也说过一次,可以看看kafka的client 的源码,看client 的源码其实可以让你快速了解kafka 都有哪些东西可以用,server 端的源码可以让你了解原理,其实很多时候知道有什么样的工具,就能帮你解决很多问题了。
其实prodcuer 端的很多东西我们都介绍过了,这里面的几个类我们都学过了
名称 | 作用 |
---|---|
Producer | 这是一个接口,我们使用的KafkaProducer就是继承自这个接口 |
KafkaProducer | 我们发送数据的时候使用的对象 |
Partitioner | 分区器的接口,我们也看过它的几个默认实现(DefaultPartitioner,RoundRobinPartitioner,UniformStickyPartitioner),你也可以自己继承这个接口写一个 |
ProducerConfig | 客户端配置类,我们在创建Properties的时候都是直接写的字符串,也可以使用这个配置类提供的常量,ProducerConfig.BOOTSTRAP_SERVERS_CONFIG和“bootstrap.servers”等价的 |
ProducerRecord | 我们发送的消息对象 |
RecordMetadata | 发送消息之后返回的对象,记录了消息发送的元信息 |
Callback | 回调接口,主要用在异步发送,这个接口kafka 也提供了一个默认实现ErrorLoggingCallback |
ProducerInterceptor | 生产者拦截器 |
接下来我们看一下今天的几个主角,也就是internals 包下的几个类
首先说一下为什么要说这个类呢,大家看一下下面的截图就知道了
这是KafkaProducer里面doSend方法的段,也就是我们客户端调用的send方法,这个说明了一个问题,什么问题呢,那就是KafkaProducer的send 方法并不是直接将消息发送出去的,而是将消息追加到缓存区。
如果当前缓存区已写满或创建了一个新的缓存区,则唤醒 Sender(消息发送线程),将缓存区中的消息发送到 broker 服务器,最终返回 future。从这里也能得知,doSend 方法执行完成后,此时消息还不一定成功发送到 broker,因为还没有发送呢。
这里当缓存区满了之后则将sender 唤醒,进行消息发送,其实到这里我们应该能猜到sender 是个什么了,是个线程public class Sender implements Runnable
前面我们说PID是实现幂等的关键元素,我们下面看一下PID是怎么获得的,就是在Sender 的run 方法里面
void run(long now) {
if (transactionManager != null) {
try {
if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
// Check if the previous run expired batches which requires a reset of the producer state.
transactionManager.resetProducerId();
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id 获取ProducerId
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
transactionManager.transitionToFatalError(
new KafkaException("The client hasn't received acknowledgment for " +
"some previously sent messages and can no longer retry them. It isn't safe to continue."));
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
// as long as there are outstanding transactional requests, we simply wait for them to return
client.poll(retryBackoffMs, now);
return;
}
// do not continue sending if the transaction manager is in a failed state or if there
// is no producer id (for the idempotent case).
if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, now);
return;
} else if (transactionManager.hasAbortableError()) {
accumulator.abortUndrainedBatches(transactionManager.lastError());
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request: {}", e);
transactionManager.authenticationFailed(e);
}
}
long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);
}
run 方法里面有一个maybeWaitForProducerId
方法就是用来获取ProducerId的,这个名称真的是见名知意,我们也简单看一下这个方法
private void maybeWaitForProducerId() {
while (!forceClose && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
Node node = null;
try {
node = awaitLeastLoadedNodeReady(requestTimeoutMs);
if (node != null) {
ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
Errors error = initProducerIdResponse.error();
if (error == Errors.NONE) {
// 这个 ProducerId 是通过网络请求之后获得的
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
return;
} else if (error.exception() instanceof RetriableException) {
log.debug("Retriable error from InitProducerId response", error.message());
} else {
transactionManager.transitionToFatalError(error.exception());
break;
}
} else {
log.debug("Could not find an available broker to send InitProducerIdRequest to. Will back off and retry.");
}
} catch (UnsupportedVersionException e) {
transactionManager.transitionToFatalError(e);
break;
} catch (IOException e) {
log.debug("Broker {} disconnected while awaiting InitProducerId response", node, e);
}
log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
}
当transactionManager 没有ProducerId()的时候才执行的
而且我们看到这InitProducerIdResponse使用response上获得的,所以我们认为这个ProducerId其实不是客户端生成的,而是服务端生成的。接下来我们看一下这个请求的方法sendAndAwaitInitProducerIdRequest
private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException {
String nodeId = node.idString();
InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, requestTimeoutMs, null);
return NetworkClientUtils.sendAndReceive(client, request, time);
}
可以看出它是通过node的信息,创建了一个ClientRequest
发送出去,更准确的是InitProducerIdRequest
然后获取返回后的response 中的ProducerId,如果你感兴趣的话,也可以看一下服务端是怎么处理这个请求的,下面是服务端的代码,scala 写的
def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
val initProducerIdRequest = request.body[InitProducerIdRequest]
val transactionalId = initProducerIdRequest.transactionalId
if (transactionalId != null) {
// 权限校验
if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) {
sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
return
}
} else if (!authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
return
}
def sendResponseCallback(result: InitProducerIdResult): Unit = {
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
// 权限校验通过 成相应的了 pid,返回给 producer
txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
}
这里我们看到最终是通过Coordinator
生成的
可以看到Server 在给一个 client 初始化 PID 时,实际上是通过 ProducerIdManager 的 generateProducerId()
方法产生一个 PID。
接下来其实就到了ZK 了,你要是感兴趣可以接着往下走走
def generateProducerId(): Long = {
this synchronized {
// grab a new block of producerIds if this block has been exhausted
if (nextProducerId > currentProducerIdBlock.blockEndId) {
getNewProducerIdBlock()
nextProducerId = currentProducerIdBlock.blockStartId + 1
} else {
nextProducerId += 1
}
nextProducerId - 1
}
}
为什么突然又冒出来这么一个类呢,前面我们提到KafkaProducer的send 方法其实只是将消息添加到了缓存之中,并没有真正的发送,我们知道发送是在Sender 的run 方法里面完成的,我们的Producer Id也是在run 方法的里面获取的
我们这里将Sender 的run 方法的内容分为了两块,第一块执行一些初始化的操作,第二部分发送数据,我们接下来看一下这个方法的实现,做了一定的删减
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// 获取准备好可以接受数据的partition 集合
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// 从集合中去掉不能接收消息的节点
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// 创建发送数据的批量请求ProducerBatch
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
sendProduceRequests(batches, now);
return pollTimeout;
}
accumulator 就是我们缓存消息的地方,这里我们看到,数据最终是封装到ProducerBatch里面进去发送的,接下来到了我们的主角了sequence numbe
,我们知道sequence numbe
应该是在ProducerBatch里面的,那应该就是在accumulator 的drain
方法里面实现的了,那我们就看一下这个代码的实现,代码有点长,我们只选取部分
这个代码首先获取了ProducerIdAndEpoch
这个类的对象,这个对象就封装了ProducerId,然后我们就关注一下sequence number,我们看到这里有一个判断那就是!batch.hasSequence()
,下面也写了一段注释,就是Sequence一旦生成不可改变。
接下来到看了关键之处了batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
我们知道batch
对象应该有sequence number这样的一个属性,所以我们猜测这个属性就是在batch.setProducerState
方法中完成赋值的。
注意一下第二个参数transactionManager.sequenceNumber(batch.topicPartition)
,根据Partition 获取到了sequenceNumber,我们也可以看一下这个代码
/** * Returns the next sequence number to be written to the given TopicPartition. */
synchronized Integer sequenceNumber(TopicPartition topicPartition) {
Integer currentSequenceNumber = nextSequence.get(topicPartition);
if (currentSequenceNumber == null) {
currentSequenceNumber = 0;
nextSequence.put(topicPartition, currentSequenceNumber);
}
return currentSequenceNumber;
}
这里我们就完成了我们的获取,你看一下下面截图的地方,获取完成之后会调用自增的操作,来维持SequenceNumber的自增特性
最后我们再看一下setProducerState的内容,我们看到setProducerState
方法之后,我们的批量数据就有了一个属性baseSequence,可以用于服务器端进行判断。
public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
if (isClosed()) {
// Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
// If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
// be re queued. In this case, we should not attempt to set the state again, since changing the producerId and sequence
// once a batch has been sent to the broker risks introducing duplicates.
throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
}
this.producerId = producerId;
this.producerEpoch = producerEpoch;
this.baseSequence = baseSequence;
this.isTransactional = isTransactional;
}
有了 PID 之后,在 PID + Topic-Partition 级别上添加一个 sequence numbers 信息,就可以实现 Producer 的幂等性了。
ProducerBatch 也提供了一个 setProducerState()
方法,它可以给一个 batch 添加一些 meta 信息(pid、baseSequence、isTransactional),这些信息是会伴随着 ProduceRequest 发到 Server 端,Server 端也正是通过这些 meta 来做相应的判断,接下来我们看一下服务器端的处理,代码入口在KafkaApis
在这个类中(scala),我们找到对应处理请求的方法handleProduceRequest
,然后我们看一下这个代码的实现。我们这里只截取了部分
可以看到后面前面主要还是权限校验,后面则开始遍历处理数据
跨会话
不支持跨会话的原因是重启之后标识producer的PID就变化了,这就导致broker无法根据这个<PID,TP,SEQNUM>条件去去判断是否重复。
跨分区
我们知道在某一个partition 上判断是否重复是通过一个递增的sequence number,也就是说这个递增是针对当前特定分区的,如果你要是发送到其他分区上去了,那么递增关系就不存在了。
思考
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/king14bhhb/article/details/114742402
内容来源于网络,如有侵权,请联系作者删除!