kafka中为什么 ZooKeeper 被 KRaft 取代——所有日志的日志

肖钟城
  • 大数据技术栈
  • Kafka
大约 19 分钟

kafka中为什么 ZooKeeper 被 KRaft 取代——所有日志的日志

为什么要用 Apache Kafka® 元数据管理的内部日志替换 ZooKeeper? 这篇文章探讨了替代背后的基本原理,研究了为什么使用像 Raft 这样的基于群体的共识协议并将其更改为 KRaft,并描述了构建在 KRaft 协议之上的新的群体控制器。

为什么替代Zookeeper

2012 年,开始实施当前现有的用于集群内复制的 Kafka 控制器的工作正在进行中。 从那时起,控制器设计基本保持不变:每个集群都有一个节点充当控制器,由 ZooKeeper 观察者选举产生。 它不仅存储主题分区日志并像其他代理一样处理消费/生产请求,而且还维护集群元数据,如代理 ID 和机架、主题、分区、领导者和 ISR 信息,以及集群范围和每个主题的配置,以及安全凭证。 它将此信息作为真实来源保存在 ZooKeeper 中,因此 ZooKeeper 的大部分读写流量都由控制器完成。

非控制器代理也会不时直接与 ZooKeeper 对话,例如当领导者更新 ISR 信息时。 因此,控制器会在 ZooKeeper 上注册观察者以获取任何元数据更改。 元数据更改可以由控制器本身、其他代理或客户端进行,客户端也可以直接写入 ZooKeeper。

在大多数情况下,当此类观察者触发时,控制器会使用单线程循环处理它,并将更新后的元数据传播到所有其他代理。 如果你熟悉Kafka的发展历史,你可能会知道,过去消费者等其他客户端也可以直接与ZooKeeper对话。 如今,这种访问已被与经纪人交谈所取代。 这样做的主要原因是为了减少 ZooKeeper 服务器上的读/写负载。 然而,即使今天大多数 ZooKeeper 访问仅由单个控制器完成,随着我们希望在集群中托管的代理数量和主题分区数量的增加,仍然存在一些与读写相关的可扩展性瓶颈 ZooKeeper 上的流量——因为它仍然用作 Kafka 元数据的真实来源。

(旧)控制器可扩展性限制:代理关闭

为了说明此类可伸缩性限制,请考虑使用旧控制器关闭代理。 假设只有一个主题分区,在代理一、二和三上有三个副本。 所有三个副本都同步,因此都在 ISR 列表中。 左侧经纪人是当前领导者,但想要关闭。 为此,它需要向控制器发送请求。 控制器将找出代理当前托管的主题分区,然后尝试更新元数据。 它还需要为旧领导者上的那些托管主题分区选择一个新的领导者。 然后将更新的 ISR 信息写入 ZooKeeper,之后,控制器会将新的元数据传播到所有剩余的代理。 因此,我们有两种类型的请求从控制器发送:UpdateMetadata(更新所有代理的本地元数据缓存)和 LeaderAndISR(针对相应分区的所有副本,为了更新它们的新领导者和 ISR 列表)。

因此,在控制器从它当前托管的所有主题分区中删除代理一后,它可以允许代理一关闭。 在此示例中,关闭的代理仅托管一个分区,但实际上它可以托管数千个分区,并且控制器需要写入 ZooKeeper 以更新每个托管分区的元数据。 这可能需要几秒钟甚至更长时间。 此外,控制器需要将更改后的元数据传播到所有其他代理,一次一个。 最后,如果客户试图通过随机咨询brokers来寻找新的领导者,他们可能会成功,也可能不会成功,这取决于他们咨询的brokers是否收到了最新信息。 这可能会导致客户端的请求超时。

(旧)控制器可扩展性限制:控制器故障转移

对于另一个可伸缩性限制,请考虑旧控制器意外崩溃的场景。 发生这种情况时,已注册的 ZooKeeper 观察者将触发,所有代理将收到通知。 收到通知后,其他代理将尝试向 ZooKeeper 注册自己,谁先到达那里,谁就会成为新的控制器。 新控制器要做的第一件事是从 ZooKeeper 获取元数据,包括跨所有 ZooKeeper 路径的所有主题分区信息。 之后,它将更新崩溃控制器用于托管的所有主题分区元数据,并将新元数据写回 ZooKeeper。 然后它将新的元数据传播到其他代理。

在此过程中,主要瓶颈是新控制器需要从 ZooKeeper 获取元数据的时间。 这与集群当前拥有的主题分区总数成线性关系。 在此引导过程完成之前,新控制器无法处理任何管理请求,例如分区重新平衡。 这意味着很长的不可用窗口。

(旧)控制器可扩展性限制总结

控制器和代理之间的大多数元数据更改传播与涉及的主题分区数量成线性关系。

控制器需要将更新的元数据写入 ZooKeeper 以持久化它,这也与要持久化的元数据量成线性关系。

使用 ZooKeeper 作为元数据存储还有其他挑战,例如 Znode 的大小限制; 观察者的最大数量; 以及对代理的额外验证检查,因为每个代理都维护自己的元数据视图,当更新被延迟或重新排序时,这些视图可能会有所不同。

保护、升级和调试是一件痛苦的事情。 简化分布式系统可以增加它们的寿命和稳定性。

为什么选择KRaft

目标是构建一个能够承受数千个代理和数百万个分区的解决方案。 但首先,我们需要退后一步,看看我们实际用 ZooKeeper 存储了什么。

乍一看,我们似乎正在存储来自各种 ZooKeeper 路径的元数据的当前快照。 然而,对于所有观察者和路径版本,我们真正跟踪的是一系列元数据更改事件,也就是元数据日志。 事实上,在 ZooKeeper API 的背后,所有写入的数据也作为事务日志进行维护。

实现元数据日志

因此,与其将此元数据日志保存在 Zookeeper 的幕后,不如将日志存储在 Kafka 本身,以便我们可以直接访问它? 毕竟,日志是 Kafka 真正擅长的一件事。 让控制器直接将这个元数据日志作为另一个内部 Kafka 主题来维护怎么样? 这意味着链接到元数据的多个操作可以自然地按附加日志条目的偏移量排序,并与异步日志 I/O 一起批处理以获得更好的性能。

元数据更改传播将由代理复制元数据更改日志而不是通过 RPC 来完成。 这意味着您无需再担心分歧,因为每个代理的元数据本地物化视图最终将是一致的,因为它们来自同一日志,并且在给定时间由它们拥有的元数据日志的偏移量进行版本控制。

另一个好处是,这会将控制器的元数据日志管理与其他数据日志分开(将控制平面与数据路径隔离)——使用单独的端口、请求处理队列、指标、线程等。

最后,通过形成一小组 broker 来同步复制元数据日志,我们最终可以得到一组控制器而不是单个控制器。 在这个模型中,当当前的领导控制器故障转移到仲裁中的另一个控制器时,我们只需要非常短的新控制器引导时间,因为它也已经具有复制的元数据日志。 因此,这个由一定数量的控制器管理并由所有代理复制的元数据日志将成为所有其他数据日志的核心元数据日志。

主备份与仲裁复制

现在让我们考虑一下我们应该如何在副本中保持这个“所有日志的日志”同步。 Kafka 现有的数据日志复制利用“主备”复制算法,其中单个领导者副本获取所有传入的写入并尝试将它们复制到其他副本,作为其跟随者。 在追随者确认复制写入后,领导者认为它已提交并返回写入其客户端。 因此,一种选择是也遵循相同的想法来复制元日志,即等待所有副本在向写入者提交/确认之前获得写入。

文献中还有另一种常用的复制算法:quorum replication。 在这种情况下,仍然有一个领导者尝试写入,然后复制给追随者。 但是它不会等待所有的追随者确认复制,它只会等待大多数副本,包括它自己。 这被称为法定人数,在收到法定人数后,领导者将认为写入已提交,并将返回写入客户端。 分布式系统文献中有许多著名的共识算法,如 Paxos 和 Raft,都遵循这种机制。

与 Kafka 的主备复制算法相比,仲裁复制以可用性保证换取更好的复制延迟。 更具体地说,Kafka的故障模式是f+1,也就是说要容忍f次连续故障,至少需要f+1个副本,而quorum replication的故障模式是2f+1。

KRaft – Kafka Raft 实现

对于新的控制器元数据日志,将使用仲裁复制而不是主备算法。 动机是:

  • 在实践中,与其他数据日志相比,我们可以为这个核心元数据日志提供更多的同步副本,以实现更高的可用性。
  • 我们对元数据日志附加延迟更敏感,因为它会成为整个集群的热点。

因此,我们实现了一个新的复制模块 KRaft,它遵循 Raft 算法来实现仲裁复制,同时搭载了 Kafka 现有的日志实用程序,如节流和压缩。 这样我们就可以对新日志使用类似的工具和故障排除协议。

由于我们不再让 ZooKeeper 使用新控制器为元数据日志选举新领导者,因此需要一个单独的领导者选举协议。 这样的选举协议需要防止多个 broker 同时被识别为 leader,还需要防止由于某些条件导致长时间没有 broker 被识别为 leader 的情况。

Leader选举

在 KRaft 中,我们利用现有的 Kafka leader epochs 来保证在一个 epoch 中只选出一个 leader。 更具体地说,当前集群中的代理具有以下角色之一:领导者、投票者或观察者。 领导者和其他投票者共同构成法定人数,负责使复制的日志保持一致,并在需要时选举新的领导者。 集群中的所有其他代理都充当观察者,它们只被动地读取复制的日志以赶上法定人数。 添加到日志中的每条记录都与领导者纪元相关联。

启动时,预先配置的仲裁中的所有代理都初始化为投票者,并从本地日志中设置当前纪元。 在下图中,假设我们有三个选民的法定人数。 每个人在第一和第二纪元的本地日志中都有 6 条记录,颜色分别为绿色和黄色。

在经过一段时间没有找到领导者后,选民可能会跳到一个新的时代并过渡到作为领导者候选人的临时角色。 然后它会向法定人数中的所有其他经纪人发送请求,要求他们投票支持它作为这个时代的新领导者。

投票请求将包含两个关键信息:其他人投票的纪元和候选人本地日志的偏移量。收到请求后,每个选民将检查请求中提供的纪元是否不大于自己的纪元;如果它已经为提供的纪元投票;或者它自己的本地日志实际上比提供的偏移量长。如果这些都不是真的,它将投票给给定时期的候选人。投票保留在本地,因此仲裁代理不会忘记已授予的投票,即使在它开始之后也是如此。当候选人获得包括自己在内的大多数法定人数的足够多的选票时,就可以认为投票程序已经成功完成。

请注意,如果候选人无法在预先配置的投票超时内获得足够的选票,它将认为投票过程失败,并将尝试再次提高其纪元并重试。为了避免出现任何僵局情况,例如多个候选人同时要求投票,从而防止另一个候选人在一个颠簸的时期获得足够的选票,我们还在重试之前引入了随机备份时间。

结合所有这些条件检查和投票超时机制,我们可以保证在KRaft上的给定时期内,最多有一位领导人当选,而且这位当选领导人将拥有截至其当选时期的所有承诺记录。

日志复制

与 Kafka 一样,KRaft 采用基于拉的复制机制,而不是原始 Raft 论文引入的基于推的模型。 在下图中,假设 Leader-1 在 Epoch 3 中有两条记录(红色),Voter-2 正在从中获取。

与 Kafka 中现有的副本获取逻辑一样,Voter-2 将在其获取请求中编码两条信息:要获取的纪元及其日志和偏移量。收到请求后,Leader-1 将首先检查纪元,如果有效,将返回以给定偏移量开头的数据。提取的 Voter-2 会将返回的数据附加到其本地日志,然后使用新的偏移量再次开始提取。这里没有什么新东西,只是普通的副本获取协议。

但是假设另一个选民与日志条目不同。在我们的图表中,Voter-3,它是 Epoch 2 的旧领导者,在其本地日志中有一些附加记录尚未复制到大多数法定人数,因此未提交。当意识到新的 epoch 已经以 Leader-1 作为领导者开始时,它会向 Leader-1 发送一个 fetch 请求,其中包含 Epoch 2 以及日志和偏移量。 Leader-1 将验证并发现这个纪元和偏移量不匹配,因此将在响应中返回一个错误代码,告诉选民 3 纪元 2 只提交了直到偏移量 6 的记录。选民 3 然后将截断它的本地日志偏移量为 6。

然后 Voter-3 将再次重新发送提取,这次使用纪元 2 和偏移量 6。Leader-1 然后可以将新纪元的数据返回给 Voter-3,Voter-3 将从返回的数据中了解这个新纪元,同时附加到它的本地日志。

请注意,如果 Voter-2 和 Voter-3 无法在预定义的时间内成功从 Leader-1 获取响应,它可以提升其 epoch 并尝试选举为 Epoch 4 的新领导者。因此我们可以看到这个获取请求是也用作心跳来确定领导者的活跃度。

基于推或拉的复制

与 Raft 文献中基于推的模型相比,KRaft 中基于拉的日志复制在日志协调方面更有效,因为获取投票者能够在重新发送下一次获取之前直接截断到可行的偏移量。在基于推送的模型中,需要更多的“乒乓”往返,因为推送数据的领导者需要确定将数据发送到的正确日志位置。

基于拉取的 KRaft 也不太容易受到破坏性服务器的影响,即老选民不知道他们已经从法定人数中删除,例如,由于成员重新配置。如果这些老选民继续向基于拉模型的领导者发送获取请求,领导者可以用一个特殊的错误代码来响应,告诉他们他们已经从法定人数中删除,并且他们可以过渡到观察者。相反,在原始的基于推送的 Raft 算法中,推送数据的领导者可能不知道哪些被移除的投票者会成为破坏服务器。 由于被移除的服务器不再从领导人那里获得推送数据,他们将试图当选为新领导人,从而扰乱了进程。

选择基于拉的 Raft 协议的另一个重要动机是 Kafka 的主干日志复制层已经处于基于拉的模型中,因此允许重用更多现有的实现。

不过,好处是有代价的:新领导者需要调用一个单独的“开始纪元”API 来通知法定人数。而在 Raft 模型中,此通知可以通过领导者推送数据 API 搭载。此外,要提交大多数法定人数的记录,领导者需要等待来自选民的下一个提取请求来推进其偏移量。这些都是解决破坏性服务器问题的值得权衡。此外,利用现有的 Kafka 拉式数据复制模型(又名“不重新发明轮子”)节省了数千行代码。

要了解更多关于 KRaft 实现设计的其他细节,例如元数据快照和构建在 KRaft 日志之上的状态机 API,请务必阅读 KIP-500、KIP-595 和 KIP-630 的参考文档。

现在我们可以谈谈没有 Zookeeper 依赖的新控制器(我们称之为 Quorum Controller)设计。 Quorum Controller 建立在上面的 KRaft 协议之上。 当使用新模型在集群中启动 Kafka 代理时,代理总数的一小部分被配置为仲裁。 法定人数中的经纪人遵循 KRaft 算法在他们之间选举一个领导者,作为法定人数的控制者。

控制器负责获取新的代理注册、检测代理故障以及获取所有会更改集群元数据的请求。 当它们相应的更改事件被附加到元数据日志时,所有这些操作都可以流水线化和排序。 法定人数中的其他选民主动复制元数据日志,以便提交新添加的记录。

因此,也可以更新表示集群当前元数据快照的建立在集群之上的状态机。其他未配置为仲裁的一部分的代理是元数据日志的观察者,仅从仲裁中获取已提交的记录以更新自己的元数据缓存。通过这种方式,所有本地元数据快照自然地由获取的日志偏移量进行版本控制,并且很容易推断出元数据过时并修复任何潜在的差异。

这里的 Kafka 集群可以小到单个 broker,它也可以充当 controller。当更多代理添加到该集群时,它们将发现控制器并向其注册。然后控制器可以将它们的条目相应地写入元数据日志。控制器还可以阻止新添加的代理被访问,直到它们完成移动分配的主题分区并准备好服务客户端请求。这会减少超时风险,例如,如果新加入的经纪人落后了。

(新)仲裁控制器:代理关闭和控制器故障转移

现在让我们重新审视前面讨论的两个场景——代理关闭和控制器故障转移——并考虑它们与新的仲裁控制器的关系。

仲裁控制器通过心跳接收所有已注册代理的活跃度。 当现有代理关闭时,它可以在心跳请求中搭载其意图,并且控制器可以像往常一样将其从所有分区中删除,但在将它们附加到元数据日志时也会批处理所有分区移动事件。 (这也大大减少了关闭延迟。)

类似地,对于控制器故障转移,其中一名投票者可以接任新的领导者并直接开始提交数据。

实验:ZooKeeper与Quorum Controller

为了说明 Quorum Controller 的优势,我们做了实验,将基于 ZooKeeper 的旧控制器与新的 Quorum Controller 进行了比较。 该实验是在单个 Kafka 集群上托管的 200 万个主题分区完成的。 正如您在下图中看到的,对于受控关闭和不受控故障转移,Quorum Controller 大大减少了延迟。

总结

总之,您可以从这篇文章中吸取两个要点:

  • 将元数据维护为事件日志会更有效,就像我们在 Kafka 中处理所有其他数据一样。
  • 当在此元数据日志之上构建新的 Quorum Controller 时,我们可以在很大程度上解除我们的可扩展性瓶颈,并超越单个 Kafka 集群中的数千个代理和数百万个分区。v
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.14.1