An analysis of Kafka topic deletion

When we run the topic deletion command in a shell, kafka-topics --delete --topic xxxx --zookeeper xxx, the tool reports that xxxx has been marked for deletion. Yet much later, listing the topics again shows the topic still merely marked for deletion; clearly the deletion was never actually performed. Why is that? Let's take a close look at Kafka's topic deletion flow.

The conclusion first

The delete.topic.enable config defaults to false. It controls whether the Kafka cluster is allowed to delete topics: only when it is true will Kafka actually delete topics that have been marked for deletion. Otherwise a topic is never deleted, only marked, where "marked" means the topics to delete are recorded in ZooKeeper. Note that the cluster must be restarted after changing this setting. If you want to delete a topic by hand, you need to do two things

  1. Delete the topic's data in ZooKeeper
    /brokers/topics/xxx
    /config/topics/xxx
  2. Delete the data of all the topic's partitions and replicas
    The data lives under the log.dirs directory configured on each broker, in folders named topic-partition; simply delete the topic's entire folders
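For reference, the switch discussed above is a per-broker setting; a minimal server.properties fragment (every broker needs it, followed by a restart):

```properties
# server.properties on each broker; restart the broker for this to take effect
delete.topic.enable=true
```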

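The two manual steps can be sketched in shell. This is only an illustration: zk-host:2181, topic xxx and the log directory are placeholders, zookeeper-shell.sh is the CLI that ships with Kafka of that era, and the log.dirs cleanup below is simulated on a temp directory rather than run against a real broker:

```shell
# Step 1 (requires a live ZooKeeper; shown as the commands you would run):
#   zookeeper-shell.sh zk-host:2181 rmr /brokers/topics/xxx
#   zookeeper-shell.sh zk-host:2181 rmr /config/topics/xxx

# Step 2: remove the topic-partition folders under each broker's log.dirs.
# Simulated here with a scratch directory standing in for log.dirs:
LOG_DIR=$(mktemp -d)
mkdir -p "$LOG_DIR/xxx-0" "$LOG_DIR/xxx-1" "$LOG_DIR/other-topic-0"
rm -rf "$LOG_DIR"/xxx-[0-9]*   # delete every partition folder of topic xxx
ls "$LOG_DIR"                  # only other-topic-0 remains
```

The xxx-[0-9]* pattern avoids accidentally matching another topic whose name merely starts with xxx-.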
Marking a topic for deletion

From the shell command we can trace to TopicCommand, the class that operates on topics. Its deletion logic does only three things

  1. Check whether the topic exists
  2. Check whether the topic is a Kafka internal topic (those may not be deleted)
  3. Create a node in ZooKeeper (/admin/delete_topics/xxx) to record the topic to delete. The detailed code follows
    def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
      val topics = getTopics(zkUtils, opts)
      val ifExists = opts.options.has(opts.ifExistsOpt)
      if (topics.isEmpty && !ifExists) {
        throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
          opts.options.valueOf(opts.zkConnectOpt)))
      }
      topics.foreach { topic =>
        try {
          if (Topic.isInternal(topic)) {
            throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
          } else {
            zkUtils.createPersistentPath(getDeleteTopicPath(topic))
            println("Topic %s is marked for deletion.".format(topic))
            println("Note: This will have no impact if delete.topic.enable is not set to true.")
          }
        } catch {
          case _: ZkNodeExistsException =>
            println("Topic %s is already marked for deletion.".format(topic))
          case e: AdminOperationException =>
            throw e
          case _: Throwable =>
            throw new AdminOperationException("Error while deleting topic %s".format(topic))
        }
      }
    }

That is the whole story of the delete command. So when does a topic that has been marked for deletion actually get deleted? The analysis continues below.

Deleting the topic

It again starts from the deletion markers in ZooKeeper. The KafkaController subscribes to changes on the deletion node in ZooKeeper to learn about topics that need to be deleted, and registers a TopicDeletionListener to handle the deletion events it observes. Below is a snippet from TopicDeletion, the class that processes a deletion event. Its main logic has four parts

  1. Check whether the topics to delete exist
  2. Check whether the delete.topic.enable feature is turned on
  3. Check whether any of the topics are being reassigned; reassignment moves a topic's partition data between brokers, so the controller cannot precisely locate which brokers host the topic, and a topic under reassignment therefore must not be deleted until the reassignment finishes
  4. If all conditions are met, run the deletion logic
    override def process(): Unit = {
      if (!isActive) return
      debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
      val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
      if (nonExistentTopics.nonEmpty) {
        warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
        nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
      }
      topicsToBeDeleted --= nonExistentTopics
      if (config.deleteTopicEnable) {
        if (topicsToBeDeleted.nonEmpty) {
          info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
          // mark topic ineligible for deletion if other state changes are in progress
          topicsToBeDeleted.foreach { topic =>
            val partitionReassignmentInProgress =
              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
            if (partitionReassignmentInProgress)
              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
          }
          // add topic to deletion list
          topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
        }
      } else {
        // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
        for (topic <- topicsToBeDeleted) {
          info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
          zkUtils.zkClient.delete(getDeleteTopicPath(topic))
        }
      }
    }

Next comes the protagonist of topic deletion, TopicDeletionManager; this class drives the deletion logic. Relating it back to the delete.topic.enable setting, nearly every one of its methods states explicitly that a topic is deleted only when delete.topic.enable is true. The method names describe their function well, so let's look directly at onTopicDeletion(): it calls onPartitionDeletion(), which in turn calls onReplicaDeletion(). Evidently, deleting a topic really means deleting all of its partitions, and since a partition is made up of several replicas, that in turn means deleting each partition's replicas.

/**
 * Invoked with the list of topics to be deleted
 * It invokes onPartitionDeletion for all partitions of a topic.
 * The updateMetadataRequest is also going to set the leader for the topics being deleted to
 * {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted and can be
 * removed from their caches.
 */
private def onTopicDeletion(topics: Set[String]) {
  info("Topic deletion callback for %s".format(topics.mkString(",")))
  // send update metadata so that brokers stop serving data for topics to be deleted
  val partitions = topics.flatMap(controllerContext.partitionsForTopic)
  controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
  val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
  topics.foreach { topic =>
    onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
  }
}
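The call chain onTopicDeletion() -> onPartitionDeletion() -> onReplicaDeletion() boils down to expanding a topic into all of its replicas. A minimal Python sketch of that expansion follows; the function name and the assignment map are invented for illustration and are not Kafka's actual API:

```python
def replicas_to_delete(assignment, topic):
    """Expand a topic into the (topic, partition, broker) replicas that
    must ultimately be deleted on the brokers.
    assignment maps topic -> {partition -> [replica broker ids]}."""
    targets = []
    for partition, replicas in sorted(assignment[topic].items()):
        # onPartitionDeletion -> onReplicaDeletion: every replica of
        # every partition is a deletion target
        for broker in replicas:
            targets.append((topic, partition, broker))
    return targets

# topic "xxx" with 2 partitions and replication factor 2
assignment = {"xxx": {0: [1, 2], 1: [2, 3]}}
print(replicas_to_delete(assignment, "xxx"))
# -> [('xxx', 0, 1), ('xxx', 0, 2), ('xxx', 1, 2), ('xxx', 1, 3)]
```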

TopicDeletionManager sends the replicas to delete to each broker over RPC; here TopicDeletionManager acts more like a manager of metadata deletion, because actually deleting the data is the brokers' job, the data being distributed after all. See the ReplicaManager class: upon receiving the StopReplica RPC, ReplicaManager deletes the local data (see its stopReplica() method) and finally removes the relevant folders through the LogManager. That call chain is fairly long, so the code is omitted here.
Once the topic's data has been deleted, TopicDeletionManager removes the topic's metadata; see completeDeleteTopic(). It deletes the topic's data from the controller's memory and from ZooKeeper, covering three paths: /brokers/topics/, /config/topics/ and /admin/delete_topics.

private def completeDeleteTopic(topic: String) {
  // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
  // firing before the new topic listener when a deleted topic gets auto created
  controller.deregisterPartitionModificationsListener(topic)
  val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
  // controller will remove this replica from the state machine as well as its partition assignment cache
  replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
  val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
  // move respective partition to OfflinePartition and NonExistentPartition state
  partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
  partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
  topicsToBeDeleted -= topic
  partitionsToBeDeleted.retain(_.topic != topic)
  val zkUtils = controllerContext.zkUtils
  zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
  zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
  zkUtils.zkClient.delete(getDeleteTopicPath(topic))
  controllerContext.removeTopic(topic)
}

Summary

Kafka relies on ZooKeeper to store its metadata and has its own controller manage all of it; clients trigger Kafka events by modifying ZooKeeper node data, and those events carry out the corresponding operations. For topic deletion in particular, Kafka combines partition and replica state machines with this event mechanism to delete topics reliably in a complex distributed environment.

References

Kafka 0.11.0.2 source code

Kafka official documentation
