【kafka】kafka /log_dir_event_notification的LogDir脱机事件通知

x33g5p2x  于2022-03-29 转载在 Kafka  
字(3.4k)|赞(0)|评价(0)|浏览(668)

1.概述

我们会看到zk的数据中有一个节点/log_dir_event_notification/,这是一个序列号持久节点
这个节点在kafka中承担的作用是: 当某个Broker上的LogDir出现异常时(比如磁盘损坏,文件读写失败,等等异常): 向zk中谢增一个子节点/log_dir_event_notification/log_dir_event_序列号 ;Controller监听到这个节点的变更之后,会向Brokers们发送LeaderAndIsrRequest请求; 然后做一些副本脱机的善后操作

2.源码

这里说的dirLog是 server.properties中配置的log.dir 例如

2.1 副本异常处理

首先我们找到有使用这个节点的源码;
kafka启动之初有调用

ReplicaManager.startup()

def startup(): Unit = {
  // 省略...
  
  //当inter-broker protocol (IBP) < 1.0的时候,如果存在logDir的一些异常则直接让整个Broker启动失败;
  val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
  logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
  logDirFailureHandler.start()
}
private class LogDirFailureHandler(name: String, haltBrokerOnDirFailure: Boolean) extends ShutdownableThread(name) {
  override def doWork(): Unit = {
    //从队列 offlineLogDirQueue 取数据
    val newOfflineLogDir = logDirFailureChannel.takeNextOfflineLogDir()
    if (haltBrokerOnDirFailure) {
      fatal(s"Halting broker because dir $newOfflineLogDir is offline")
      Exit.halt(1)
    }
    handleLogDirFailure(newOfflineLogDir)
  }
}
// logDir should be an absolute path
  // sendZkNotification is needed for unit test
  def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true): Unit = {
       // 省略...
    logManager.handleLogDirFailure(dir)

    if (sendZkNotification)
      zkClient.propagateLogDirEvent(localBrokerId)
    warn(s"Stopped serving replicas in dir $dir")
  }

代码比较长,就直接概况一下好了:
主要是当读取或操作LogDir的时候出现了异常就会执行到这里,有可能是磁盘脱机了,或者文件突然没有读取写入权限等等之类的一些IOException异常;那么 Broker就需要做一些处理;如下

  1. 做个判断inter.broker.protocol.version 协议版本 < 1.0 的时候 时候直接退出;那个时候还不支持单Broker上存在多个logDir;
  2. 副本停止fetche数据
  3. 标记分区下线
  4. 可能移除一些监控信息
  5. 如果当前的log_dir 都脱机(或者异常了), 那么久可以直接shutdown这台机器了
  6. 如果还有其他的log_dir 还有在线的, 那么继续做一些其他的清理操作;
  7. 创建持久序列节点/log_dir_event_notification/log_dir_event_+序列号;数据是 BrokerID;例如:
    `/log_dir_event_notification/log_dir_event_0000000003```

plaintext
``
| |

{"version":1,"broker":20003,"event":1}

|

PS: log_dir 是可以在一台Broker配置多个路径的 ,用逗号隔开

2.2 LogDir发生异常

比如说在 给文件加锁的时候lockLogDirs,磁盘损坏了就抛出异常IOException

/**
 * Lock all the given directories
 */
private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
  dirs.flatMap { dir =>
    try {
      val lock = new FileLock(new File(dir, LockFile))
      if (!lock.tryLock())
        throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
          ". A Kafka instance in another process or thread is using this directory.")
      Some(lock)
    } catch {
      case e: IOException =>
        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e)
        None
    }
  }
}
def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
  error(msg, e)
  if (offlineLogDirs.putIfAbsent(logDir, logDir) == null)
    offlineLogDirQueue.add(logDir)
}

offlineLogDirQueue添加了一个异常队列之后就回到上面的副本异常处理代码了, 上面可是一致在queue.take()的

2.3 Controller监听zk节点变更

KafkaController.processLogDirEventNotification

private def processLogDirEventNotification(): Unit = {
  if (!isActive) return
  val sequenceNumbers = zkClient.getAllLogDirEventNotifications
  try {
    val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
    //尝试将这台Broker上的所有副本 走一下状态流转 到 OnlineReplica
    onBrokerLogDirFailure(brokerIds)
  } finally {
    // delete processed children
    zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion)
  }
}

主要将从zk节点 /log_dir_event_notification/log_dir_event_序列号 中获取到的数据的Broker上的所有副本进行一个副本状态流转 ->OnlineReplica ;关于状态机的流转请看 【kafka源码】Controller中的状态机

  1. 给所有broker 发送LeaderAndIsrRequest请求,让brokers们去查询他们的副本的状态,如果副本logDir已经离线则返回KAFKA_STORAGE_ERROR异常;
  2. 完事之后会删除节点

M.参考

【kafka源码】/log_dir_event_notification的LogDir脱机事件通知

相关文章

最新文章

更多