认识Kafka
date
May 18, 2024
slug
learning-kafka
status
Published
tags
编程开发
summary
在如今设计分布式系统的时候,我们通常会采用消息队列来作为请求的中间件,削峰填谷、异步处理和服务解耦是其核心作用,而消息队列又有点对点模型和发布与订阅模型,这里就不对这些基础概念不做过多解释,可以自行google。而在消息队列的选型上,目前大多都偏向于Kafka。
type
Post
在如今设计分布式系统的时候,我们通常会采用消息队列来作为请求的中间件,削峰填谷、异步处理和服务解耦是其核心作用,而消息队列又有点对点模型和发布与订阅模型,这里就不对这些基础概念不做过多解释,可以自行google。而在消息队列的选型上,目前大多都偏向于Kafka。
严谨来说,我们不能把kafka认为是一个消息队列系统,它和rabbitMQ、activeMQ这些传统的消息队列是不同的。我们可以看下Wikipedia是如何介绍kafka的:Apache Kafka 是一个分布式事件存储和流处理平台。它是由 Apache 软件基金会开发的一个开源系统,用 Java 和 Scala 编写。该项目旨在提供一个统一、高吞吐量、低延迟的平台来处理实时数据源。
虽然如此,多数情况我们还是把kafka作为消息队里来使用,因其具有的高度可扩展性、高性能、高可靠等特点,让其在消息队列里长期属于第一梯队。因此本文还是想将其以消息队列的方式来进行介绍,从一个整体的方式来认识Kafka。
Kafka基础概念
kafka是通过“主题+分区+消息”的方式,提供了消息的发布与订阅。
生产者(Producer)向指定主题(Topic)发送消息,消费者(Consumer)负责订阅这个主题并对收到的消息进行消费,生产者和消费者统称为客户端(Client)。而kafka的服务端则负责主题的管理、消息的处理和分发等操作,这个服务端就叫做Broker。
在kafka中,会将一个主题划分成多个分区(Partition),以此来实现并行消费,当然也可以只有一个分区。生产者发送到Topic的消息只会被分配到其中一个分区,而且每一个分区的消息都是有序的。每一个分区中的消息会被分配一个偏移量(offset),表示它在分区中的位置。
为了提高消费者的吞吐能力,kafka引入了消费者组(Consumer group)的概念,即一组消费者组合成一个组来消费一组主题,消费者组与分区一起实现了消息的高效的并行处理能力。同一个消费者组中的消费者只能消费主题下的其中一个分区的消息,不同消费者组的消费者,则可以消费某一主题的同一个分区。由此可知道,同一个消费者组下的消费者数量不能超过分区的数量,多余的消费者是无法消费到消息的。而当消费者组下的消费者数量小于分区的数量时,则一个消费者会收到多个分区的消息。
- 当我们想实现发布/订阅的消息模型时,可以让消费者加入不同的消费者组中,这样多个生产者发送到主题的消息,这些消费者都能消费到。
- 当我们想实现点对点消息模型时,则可以让消费者加入到同一个消费者组中,这样生产者发送到这个主题的消息,只会被多个消费者中的其中一个消费到。
为了避免消息丢失,则需要对消息进行持久化,即写入磁盘存储。由于消息是分布在各个主题的分区下的,因此持久化实际上是针对分区进行的。kafka会对每个分区创建若干个副本(Replica),其中一个副本负责用来对外与客户端交互,它被称作领导者副本(Leader Replica)。其余的副本则用来提供数据冗余,不对外提供服务,它被称作追随者副本(Follower Replica)。
为了满足高可用性,kafka可以创建多个Broker实例,将这些实例分布在不同的服务器中,这样即使其中一台Broker进程挂了,其他服务器上的Broker依然能够对外提供服务。而为了提供可扩展性,我们自然也不能将所有副本都放在一个服务器节点(Broker)上,而是应该将其分散在各个服务器节点中。
这里我用一张图将这些概念串起来。

为什么Kafka具有高性能
Kafka具有的高性能一方面和前面提到结构有关。
- 分布式的Broker架构,让每个服务器节点都能够独立的处理数据,也让系统能够水平扩展,以便于提高消息的处理性能。
- 分区机制,让kafka能够在每个分区独立存储和处理消息,结合消费者组从而实现并行处理消息。
- 持久化和副本机制,Kafka 将消息持久化存储在磁盘上,而不是内存中,这使得它能够处理大量数据而不会消耗过多内存。
针对持久化方面,kafka采用的是和多数数据库同样的(如mysql binlog)顺序读写,相比随机写入磁盘,顺序读写节省了大部分的寻址时间,只需要寻址一次就可以连续地写下去。对于kafka来说,同样也是将消息写入到日志文件,当收到生产者的消息后,顺序的写入到日志文件中,消费者也是从某个位置(offset)开始顺序的读取消息进行消费。
为了提高磁盘读写的效率,kafka还利用了操作系统的pageCache和零拷贝技术,它允许数据在生产者、Broker 和消费者之间传输时避免多次内存拷贝,这显著提高了数据传输效率。关于这两个技术,我之前也写过一篇相关的博客《利用零拷贝技术优化IO操作》。
除了在磁盘方面读写的优化,kafka在处理消息的时候,还会采用批处理的方式来提高系统的吞吐性能。当我们在生产者发送消息时,即使采用的是同步发送,消息也不会立马发送出去,而是先在内存中缓存起来,再选择合适的时机组合成一批发送给Broker服务端。当服务端接收到这批消息后,也并不会将其解开,而是直接进行存储和发送给消费者。至于在消费者侧,拉取到的自然也是一批消息,这个时候便会把消息解开,逐步处理。通过消息批处理的方式减少了网络通信的开销,进一步提高了性能。
而消息的压缩也是提高带宽利用率进而提高IO性能的有效手段,通常会在生产者侧指定压缩算法,而且必须是Broker和消费者都支持的压缩算法。Broker接收到压缩后的消息后,多数情况下会直接转发给消费者,只有在压缩算法不支持的时候会解压缩并采用支持的算法再次压缩,这会有很大的性能损耗尤其需要注意。
Kafka中如何避免消息丢失
kafka具有消息持久化和副本备份的能力,但也不代表消息绝对不会丢失。然而在了解如何避免消息丢失之前,我们需要知道消息可能会在哪些阶段发生丢失,针对这些阶段kafka是否存在对应的解决方案。
一条消息从发送到消费会经过如下三个阶段:

- 生产者创建消息,发送到Broker,这是第一阶段。
- Broker负责对消息持久化并发送到副本,这是第二阶段。
- 消费者从Broker拉取消息进行处理,这是第三阶段。
由此可见,这三个阶段都有可能出现消息丢失的可能,我们来看下kafka是如何解决的。
在第一阶段中,如果生产者采用的同步发送向Broker发送消息,此时则会等待Broker一个消息确认(ACK)的回复,收到这个回复则说明Broker已经将消息持久化,不会发生丢失。如果超过一定时间没有收到,则生产者可以进行消息重发,有效的杜绝消息在生产阶段丢失。
在第二阶段中,之前提到消息的处理会由主题的一个领导者副本来处理和持久化,同时也会将消息发送给各个追随者副本进行备份。这些副本通常都位于不同的broker当中,当这些broker完成消息的持久化后,kafka就会将消息的状态修改为“已提交”,进而回复给生产者ack。这里有个关键就是需要多少个副本确认持久化成功才算“已提交”,需要我们根据实际情况来配置。
在第三阶段中,消费者拉取到消息进行消费后,则需要在业务处理成功后给Broker回复消费确认响应(ACK),这样Broker就能知道消费者是否消费成功,没有成功的话下次消费者重新拉取消息的时候,则会拉取到同一条消息,同样也能避免消息的丢失。
如何处理重复的消息
在将消息丢失的时候提到生产者如果长时间没有收到Broker的ack回复,则可以触发消息的重发。但如果消息是已经提交的,但是由于网络波动等原因导致生产者没有及时收到回复触发的消息重发,那么就有可能会导致消费者收到重复的消息。为什么说有可能,因为这取决于我们采用哪种程度的消息服务质量。
消息服务质量按照低到高分为如下三种:
- 最多一次(At most once)。消息在传递时,最多会被送达一次。这种情况下,消息可能会发生丢失,但绝不会被重复发送。如果我们对消息的可靠性要求不高(例如一些机器的监控数据,cpu、内存等)则可以采用这种机制。
- 最少一次(At least once)。消息在传递时,最少会被送达一次。这种情况下,消息也不会发生丢失,但是可能会重复送达。
- 精确一次(Exactly once)。消息在传递时,只会被送达一次。这种情况下,消息既不会发生丢失,也不会重复送达。
对于kafka来说,它是支持精确一次这个级别的消息服务质量的,也就是说当生产者重复发送消息到Broker后,Broker会负责对这条消息进行去重,这样消费者就不会收到重复的消息了。要开启这种级别的消息质量只需要在生产者侧发送消息的时候配置一个幂等参数即可。
但是kafka对“精确一次”的支持是有前提的,就是这些消息需要被发到主题的同一个分区下,也就是说kafka只支持了单分区上的消息幂等性。如果生产者重发的消息被落在了另一个分区上,那么即使上一个分区的消息已经被持久化了,也可能导致这个分区上的消息被重复发送到消费者。
为了解决这个问题,kafka提供了事务的能力。通过在生产者侧开启消息事务,让多个消息能够原子的写入到多个分区中,这些消息要么全部写入成功要么全部写入失败。值得注意的是,当生产者开启消息事务时,消费者也需要配置消息的隔离级别为读提交,否则即使是被终止的事务消息,也会被消费者获取到。
因此严格来讲,kafka是不支持精确一次的,而是支持最少一次,或者说kafka的精确一次并不是消息队列里常说的精确一次。原因是文章开始提到的,kafka是一个分布式流处理系统,在kafka中的精确一次解决的是采用kafka作为数据源时,计算集群(如flink)从kafka拉取消息计算后再存储到kafka的其他主题下。这种情况为了保证计算的准确,则需要保证消息只能被计算一次,而不能重复计算。
这里我们采用go语言来给出事务性消息的示例,使用的kafka sdk是confluent-kafka-go。
如果我们不采用事务的方式来避免消息重复,毕竟开启事务必然对性能存在影响,那么则需要从业务代码侧来解决。我们需要先在生产者处为每条消息增加一个唯一标识(例如UUID)。消费者在处理消息时,将消息的唯一标识符存储在一个持久化存储中(如数据库或缓存)。在处理新消息前,先检查该唯一标识符是否已经处理过,如果已经处理过,则跳过该消息。
消费者组重平衡带来的性能影响
前文提到,多个消费者可以组成一个消费者组(Consumer Group)来消费主题下的多个分区,提高消息的消费效率。为了将主题下的分区分配给消费者组下的消费者,kafka引入了重平衡机制(Rebalance)。例如某个主题下有10个分区,消费者组下有5个消费者,此时kafka则会采用重平衡为每个消费者分配2个分区。
重平衡触发的条件有如下3个:
- 消费者组的成员发生了变化。
- 消费者组订阅的主题数发生变化。
- 主题的分区数发生变化。
然而一旦发生重平衡,会对消费者组的消费产生很大的影响。在这个过程中,所有消费者都会停止消费,直到完成重平衡。另一方面,由于涉及到所有的消费者负责的分区调整,整个过程是非常缓慢的。
既然重平衡会给消费带来影响,那有没什么办法来避免。从上面3个条件来看,第二和第三都是属于运维操作,通常无法避免。多数情况下,第一个条件才是触发重平衡的主要原因。当消费者加入消费者组后,会定期的向kafka集群中协调者发送心跳(Coordinator),协调者也会检测消费者是否还存在。其中有三个参数可以帮助调整重平衡的频率。
- session.timeout.ms 协调者多久没有收到消费者的心跳则认为消费者已经离线
- heartbeat.interval.ms 消费者多久发送一次心跳给协调者
- max.poll.interval.ms 用于说明消费者两次拉取消息的最大时间间隔,如果超过这个间隔,说明消费者消费消息的时间过长,此时消费者会主动离开消费者组。
通常来说,可以配置session.timeout.ms ≥ 3 * heartbeat.interval.ms,避免是因为心跳未及时送达导致的重平衡。而在max.poll.interval.ms的配置上,则需要我们对消费者的处理能力做合理的评估,设置一个合理的消费时间,例如一次消息的处理是2分钟,我们可以配置为4分钟。
再谈副本机制与ISR
前面提到kafka的副本分为领导者副本和追随者副本,只有领导者副本对外提供服务,而追随者副本只用于消息的备份,这与我们常见的主从结构中,从节点可以提供读操作不同。为什么kafka会这样做呢?
第一方面还是为了实现更高效和实时的消息读取。当消息写入到领导者副本后,消费者可以立即从领导者副本中读取到消息。如果从追随者副本拉取消息的话,由于追随者是异步从领导者副本同步消息的,可能会出现拉取不到或者延迟获取到消息的情况。
第二方面则是为了实现单调读。简单来说就是对于一个消费者来说,在多次消费消息的时候,不会出现消息一下子出现一下子不出现的情况。例如消费者第一次拉取消息时从领导者副本拉取,此时由于一些原因没有处理成功,便再次发起消息拉取。而第二次拉取消息时从追随者副本拉取,如果追随者此时还没同步到最新的消息,就会出现第二次没有拉取到的情况。
在kafka中还有个非常重要的概念叫ISR,即In-sync Replicas,它表示当前kafka集群中与领导者副本同步的副本集合。需要注意的是,领导者副本也是ISR中的,所以如果有1个领导者副本2个追随者副本,此时这两个追随者副本都与领导者副本同步的话,那么此时ISR就是这三个副本。
kafka对同步的定义是通过broker参数 replica.lag.time.max.ms 来定义的,也就是说当追随者副本落后领导者副本的时间超过这个参数值时,则会从ISR集合中移除。如果没有超过这个参数值,则会被加入到ISR集合中,即使这个追随者副本的消息并没有和领导者副本一致。
这里会出现一个极端情况,就是ISR为空,这说明了连领导者副本也不存在了。此时我们可以让kafka的协调者从“未同步”的追随者副本中挑选一个作为领导者副本,只需要配置Broker参数 unclean.leader.election.enable 即可,这个选举也被称为unclean选举。但是这就会出现消息丢失的风险,毕竟这些副本都是属于“未同步”的状态,与原始的领导者副本的消息存在一定的差距。如果不开启这个参数,禁止unclean选举,那么此时的kafka集群就会出现不可用的情况,但是保证了消息的一致性,不会存在消息丢失的风险。