Kafka(四) 简略设计原理

Posted by ZhouJ000 on March 4, 2019

Kafka(一) 入门
Kafka(二) Producer与Consumer开发
Kafka(三) 简略集群管理
Kafka(四) 简略设计原理

设计原理

broker端

broker是Kafka的重要组件,本质上它是一个功能载体(或服务载体),承担了绝大多数的Kafka服务。实际上大多数的消息队列框架都有broker或与之类似的角色。一个broker通常是以服务器的形式出现的。对用户而言,broker主要的功能就是持久化消息以及将消息队列中的消息从发送端传输到消费端

消息设计

消息格式:所谓消息引擎,定义消息格式必然是首当其冲的工作,使用什么数据格式来保存消息和消息队列是第一个要解决的问题。之前的文章说过,使用Java类来定义消息,必然受到Java对象开销,在Java内存模型(JMM)中保存对象开销其实相当大,可能会花费比消息大小大2倍的空间来保存数据。为了降低这种开销,JMM通常会对用户自定义的类进行字段重排(JMM要求Java对象必须按照8字节对齐,未对齐的部分会填充空白字节进行补齐,若随意定义字段顺序,那么由于每个字段类型占用字节数各异,会造成不必要的补齐),以试图减少内存占用。另外随着Java堆上数据越来越多,GC性能下降很快,从整体上会拖累应用程序的吞吐量

因此Kafka的实现方式本质上是使用Java NIO的ByteBuffer来保存消息的,同时依赖文件系统提供的页缓存机制,而非依靠Java的堆缓存。另外ByteBuffer是紧凑的二进制字节结构,而不需要padding重排操作,从而省去很多不必要的对象开销

扩展:
一文看懂Kafka消息格式的演变
Kafka的消息格式

集群管理

Kafka依赖ZooKeeper实现自动化的服务发现与成员管理。当每个broker启动时,会将自己注册到Zookeeper下的一个节点。每个broker再ZooKeeper下注册节点的路径是/brokers/ids/,broker向ZooKeeper中注册的信息以JSON格式保存。ZooKeeper临时节点的生命周期与客户端绑定,如果客户端会话失效就会清除临时节点,同时broker在临时节点上还会创建监听器来监听该临时节点的状态

brokers:保存了Kafka集群的所有信息,包括ids、topices、seqid
config:保存集群下各种资源的定制化配置信息,比如每个topic有自己的专属配置,配置在/config/topics/下 isr_change_notification:保存ISR列表发生变化的分区列表 admin:保存管理脚本的输出结果,比如delete_topics controller:保存了controller组件(负责集群的领导者选举)的注册信息 controller_epoch:保存了controller组件的版本号,用来隔离无效的controller请求 cluster:保存集群的简要信息,包括id等信息 其他还有consumers、log_dir_event_not、ification、latest_producer_id_block

kafka-zookeeper

副本与ISR设计

一个Kafka分区的本质就是一个备份日志,即利用多份相同的备份共同提供冗余机制来保持系统高可用性。这些备份在Kafka中被称为副本(replica)。Kafka把分区的所有副本均匀地分配到所有broker上,并从这些副本中挑选一个作为leader副本对外提供服务,其他副本称为follower副本,只能被动向leader副本请求数据,从而保持与leader同步

当leader副本所在broker宕机,follower副本会竞相争夺成为新leader的权力,显然不是所有follower都有资格去竞选的,那些落后的follower是没有资格的。Kafka引入了ISR的概念,一组同步副本集合,每个分区都有自己的ISR列表,ISR中所有副本都与leader保持同步状态,而且leader也是包含在ISR中的,只有ISR中的副本才有资格被选举为leader,而producer写入一条消息只有被ISR所有副本都接收,才被视为已提交状态。因此,若ISR中有N个副本,那么该分区最多可以忍受N-1个副本崩溃而不丢失已提交的消息

起始水位(base offset):表示该副本所含第一条消息的offset
高水位值(high watermark, WH):副本高水位值,保存该副本最新一条已提交消息的位移。leader分区的HW值决定了副本中已提交消息的范围,也确定了consumer能够获取的消息上限,超过HW值的所有消息都被视为未提交成功的,因此consumer是看不见的。除了leader副本,每个follower副本都有HW值,只不过只有leader副本的HW值才能决定clients能看到的消息
日志末端位移(log end offset, LEO):副本日志中下一条待写入消息的offset。所有副本都需要维护自己的LEO信息。leader副本收到producer推送的消息,更新自己的LEO(通常+1),同样follower副本向leader副本请求到数据后也增加自己的LEO。事实上只有ISR中所有副本都更新了对应的LEO后,leader副本才会向右移动HW值表示消息写入成功

对于ack=-1的producer而言,只有follower接收到消息更新LEO后,leader副本接收其他follower副本的数据请求响应(response)之后,更新HW值让这条消息可以被consumer消费。这时producer才能正常返回,也标志着这条消息发送成功

follower跟不上leader的原因:
1、请求速度跟不上,比如网络I/O开销过大等
2、进程卡住,比如频繁GC或程序BUG等
3、新创建的副本,新副本在追赶上leader前,都是不同步的

在ISR中,跟不上leader副本LEO的follower将会被踢出ISR,跟上后又会被加回ISR。Kafka使用replica.lag.time.max.ms来检测由于慢以及进程卡壳导致的滞后(lagging),即follower副本落后leader副本的时间间隔,默认是10秒。对于producer顺时峰值流量,只要follower不是持续性落后,就不会反复地在ISR中移进移出,只要不是持续性超过这个参数,后面就会加入进来

水印(watermark)和leader epoch

水印也称为高水印或高水位,通常被用在流式处理领域(比如Storm、Flink、Spark等),以表征元素或事件在基于时间层面上的进度。一个比较经典的表述为:流式系统保证在水印t时刻,创建时间(event time) = t’且 t’ <= t的所有事件已经到达或被检测到。而在Kafka中,水印与时间无关,而与位置信息有关,即表示位移(offset)

一个Kafka分区存在多个副本(replica)用于实现数据冗余,可以按角色分为:
1、leader副本:响应clients端的读/写请求的副本
2、follower副本:被动地备份leader副本上的数据,不能响应clients端的读/写请求
3、ISR副本集合:敖汉leader副本和与其保持同步的follower副本
而每个Kafka副本对象都有两个重要的属性,即LEO和HW,如果把它们看成两个指针,那么它们的定位机制是不同的:任意时刻,HW指向的是实实在在的消息,而LEO总是指向下一条待写入的消息,也就是LEO指向的位置上是没有消息的

LEO更新机制:follower不停向leader副本所在broker发送FETCH请求,一旦获取消息,变写入自己的日志中进行备份。Kafka设计了两套follower副本LEO属性,一套LEO保存在follower所在的broker上,另一套LEO保存在leader副本所在broker上,即leader副本所在broker不仅保存自己的LEO,还保存了该分区下所有follower副本的LEO值

HW更新机制:follower将在更新LEO之后,尝试更新HW值,即比较当前LEO值与FETCH响应中leader的HW值取最小值。而leader更新HW值会在4种情况下尝试更新:1.新副本加入时,2某个follower被踢出时,3.producer向leader写入消息时,4.leader处理follower的FETCH请求时。由于leader保存了一套follower副本的LEO以及自己的LEO,因此当尝试更新分区HW时,会选出满足条件的副本(处于ISR中,且LEO落后时间不大于设定时间)比较它们的LEO,选择最小的LEO作为HW值

Kafka使用HW值来决定副本备份的进度,而HW值的更新通常需要另一轮FETCH请求才能完成,因此这个机制存在缺陷,它们可能引起:
1、备份数据丢失:在发起第二轮FETCH请求给leader前,如果follower崩溃,那么恢复后会自动将LEO调整为之前的HW,做日志截断,而此时向leader发送FETCH请求如果leader正好崩溃了,那么follower就成为新leader,当原leader恢复后也会进行日志截断,这样消息就在两个副本的log中都被删除,即永远消失
2、备份数据不一致:崩溃恢复时,follower成为新leader接收到了新的消息,导致HW和原leader一致,而原leader恢复后由于HW一致所以不会日志截断,但是底层log中两边保存的内容并不一致

由于这两个问题的存在,Kafka引入了leader epoch值来彻底解决基于水印备份机制的问题。上面两个问题的根本原因是HW值被用来衡量副本本非成功与否,以及在出现崩溃时作为日志截断的依据,但是HW值是异步延迟的,特别是需要额外的下一次FETCH请求处理流程才会更新,所以这期间发生任何崩溃都可能导致HW值过期。因此在leader端多开辟一段内存区域专门用来保存leader的epoch信息,出现这两个问题时可以规避。所谓领导者epoch,实际上是一对值(epoch,offset),epoch表示leader的版本号,从0开始,当leader变更过1次epoch就加1,而offset则对应该epoch版本的leader写入第一条消息的位移。每个leader broker中会保存这样一个缓存,并定期写入一个检查点文件中。当leader写底层log时,它会尝试更新整个缓存 —— 如果这个leader首次写消息,则会在缓存中增加一个条目,否则就不更新。每次副本重新成为leader后会查询这部分缓存,获取对应leader版本的位移,这样就不会发生数据不一致和丢失的问题了

扩展:
Kafka水位(high watermark)与leader epoch的讨论

日志存储设计

按时间顺序在日志尾部追加写入记录(record),Kafka不会写入原生消息,而是会将消息和一些必要的元数据打包一起封装成一个record写入日志,且每条记录都会分配一个唯一的顺序递增的记录号作为这条记录的位移标识,然后Kafka使用自定义的消息格式并且在写入日志前序列化为紧凑的二进制字节数组来保存日志

Kafka的日志设计都是以分区为单位的,每个分区都有自己的日志,该日志被称为分区日志(partition log),具体对于每个日志而言,Kafka又将其进一步细分为日志段文件(log segment file)以及日志索引文件。因此可以这么说,每个分区日志都是由若干组日志段文件+索引文件构成的

创建topic时,Kafka为该topic的每个分区在文件系统中创建一个对应的子目录,其中每个.log文件都包含了一段位移范围的Kafka记录,当日志段文件写满后,会自动创建一组新的日志段文件和索引文件,这就是日志切分。.index和.timeindex都是索引文件,分别称为位移索引文件和时间戳索引文件,前者帮助broker更快地定位记录所在的物理文件位置,后者根据给定的时间戳查找对应的位移信息

Kafka是会定期清除日志的,清除的单位是日志段文件,即删除符合清除策略的日志段文件和对应的两个索引文件

扩展:
Kafka日志清理之Log Compaction
Kafka日志清理之Log Deletion
Kafka技术内幕-日志压缩

通信协议

Kafka的通信协议(wire protocol)是基于TCP之上的二进制协议,该协议提供的API表现为服务于不同功能的多种请求(request)类型以及对应的响应(response)

扩展:
kafka协议

controller设计

在一个Kafka集群中,某个broker会被选举出来承担特殊的角色,即控制器。引入controller就是用来管理和协调Kafka集群的,管理集群中所有分区的状态并执行相应的管理操作。每个Kafka集群在任何时刻都只能有一个controller,当集群启动时,所有broker都会参与竞选controller,一旦controller在某个时间点崩溃,其他broker立即会收到通知,开启新一轮的竞选

controller维护的状态分为两类:每台broker上的分区副本和每个分区的leader副本信息。从维度上看,这些状态又可分为副本状态和分区状态。controller为了维护这两个状态专门引入了两个状态机:副本状态机(Replica State Machine)和分区状态机(Partition State Machine),来管理副本状态和分区状态

对集群状态的维护只是controller保持运行状态一致性的一个基本要素,但却不是controller的职责所在。如果保持controller持续稳定对外服务,就必须要求controller妥善地保存这些状态:更新集群元数据信息、创建topic、删除topic、分区重分配、preferred leader副本选举、topic分区扩展、broker加入集群、broker崩溃、关闭控制和controller选举

controller启动时会为集群中所有broker创建一个专属的Socket连接,也包括controller所在的broker

扩展:
Kafka controller重设计

broker请求处理

Reactor模式:
Reactor设计模式是一种事件处理模式,旨在处理多个输入源同时发送过来的请求。Reactor模式中的服务处理器(service handler)或分发器(dispatcher)将入站请求(inbound request)按照多路复用的方式分发到对应的请求处理器(request handler)中。外部的输入源将生产事件发送到dispatcher,将事件放入dispatcher中的队列上,而Reactor通常会创建多个request handler线程专门消费dispatcher分发过来的事件。broker上每当有新的Socket连接通道被创建,dispatcher都会将该连接分配给下面某个request handler来消费。除此之外还有两个重要的组件acceptor线程和processor线程,前者实时监听外部数据源发送过来的事件并分发任务,后者执行事件处理逻辑并将处理结果发送给client

client -> 【acceptor线程】 -> 【processor线程数组】 -> 请求队列 -> 请求处理线程池 -> 响应队列 -> 【processor线程数组】

producer端

基本数据结构

ProducerRecord:封装了一条待发送的消息(记录),允许用户在创建消息对象的时候直接指定要发送的分区,页可以指定消息的时间戳

topic          所属topic
partition      所属分区
key            键值
value          消息体
timestamp      时间戳

RecordMetadata:Kafka服务器端返回给客户端的消息的元数据信息。前3项相对比较重要,Producer端可以使用这些消息做一些消息发送成功之后的处理,比如写入日志等

offset                   该条消息的位移
timestamp                消息时间戳
topic + partition        所属topic的分区
checksum                 消息CRC32码
serializedKeySize        序列化后的消息键字节数
serializedValueSize      序列化后的消息体字节数

工作流程

kafka-producer-flow 1、序列化+计算目标分区
2、追加写入消息缓冲区(accumulator)
3、Sender线程预处理及消息发送
4、Sender线程处理response

引用:
Kafka producer介绍

consumer端

Kafka为每个consumer group定义了5个状态:
1、Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response:UNKNOWN_MEMBER_ID
2、Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
3、PreparingRebalance:组准备开启新的rebalance,等待成员加入
4、AwaitingSync:正在等待leader consumer将分配方案传给各个成员
5、Stable:rebalance完成可以开始消费
kafka-consumer-states

扩展:
Kafka消费组(consumer group)

精确语义

前面的博客讲到clients端有3种常见的消息交付语义:最多一次、至少一次、精确一次。这3种类型并没有还坏之分,具体使用哪种语义要结合实际业务需求而定,不必要过分追求精确一次语义。最典型的例子就是统计页面的PV和UV,这种场景下结果的不完全准确不影响利用它们做决策,所以没必要引入复杂的数据结构或功能来实现EOS(exactly-once semantics)

对于producer端而言,Kafka引入已提交信息(committed message)的概念,一旦消息被成功地提交到日志文件,只要至少存在一个可用的包含该消息的副本,那么这条消息就永远不会丢失,因此Kafka producer端提供的不是at most once语义。在老的版本默认提供的是at least once语义,当分区副本成功写入本地磁盘,返回响应时由于网络故障导致没有发送成功,producer会认为该消息失败而重试,此时同一条消息将会被写入两次。而Kafka在0.11.0.0版本推出了幂等性producer和对事务的支持,从而完美解决了消息重发发送的问题

对consumer端而言,相同日志下的所有副本有相同的内容以及相同的当前位移,consumer通过consumer位移自行控制和标记日志读取的进度。如果consumer程序崩溃,那么代替它的新程序实例就会接管这个consumer位移,即从崩溃时读取位置继续开始消费。如果要判断consumer到底支持什么语义,位移提交的时机显得至关重要。一种方式是consumer先获取若干消息,然后提交位移,再处理消息,这种方法下若consumer在提交位移后处理消息前崩溃,那么它实现的就是at most once语义;另一种方式是在处理后再提交位移,显然它实现的是at least once语义,因为消息处理过程中如果出错会引发重试,那么某些消息将会被消费两次

幂等性producer(idempotent producer)

如果一个操作执行多次的结果与只运行一次的结果是相同的,那么就称该操作为幂等操作。幂等性producer就表示它的发送操作是幂等的。顺时的发送错误可能导致producer端出现重试,同一条消息被发送多次,但是在broker端这条消息只会被写入日志一次。对于单个topic分区而言,这种producer提供的幂等性消除了各种错误导致的重复消息。如果要启用幂等性producer以及获取其提供的EOS语义,需要显示的设置producer端的参数enable.idempotence=true

幂等性producer的设计思路类似于TCP的工作方式,发送到broker端的每批消息都会被赋予一个序列号(sequence number)用于消息去重。但是与TCP不同,这个需利好不会被丢弃,相反Kafka会将它保存在底层日志中,这样即使分区的leader副本挂掉,新选出来的leader broker也能执行消息去重工作。报错序列号只需要额外几个字节,因此整体上对消息保存的开销并不大。除了序列号,Kafka还会为每个producer实例分配一个producer id(PID),producer在初始化的时候必须分配一个PID,对用户是完全透明的。消息要被发送到的每个分区都有对应的序列号值,它们总是从0开始严格单调递增,对于PID、分区和序列号的关系,就相当于一个Map,Key就是(PID,分区号),value就是序列号,即每对PID+分区号都有对应的序列号,若发送消息的序列号小于或等于broker端保存的序列号,那么broker会拒绝这条消息的写入操作

这种设计确保了即使出现重试操作,每条消息也只会被保存在日志一次,不过由于每个新的producer实例都会分配不同的PID,当前设计只能保证单个producer实例的EOS语义,而无法实现多个producer实例一起提供EOS语义

事务(transaction)

引入事务使得clients端程序(producer或consumer)能够将一组消息放入一个原子性单元中统一处理。处于事务中的这组消息能够从多个分区中消费,也能发送到多个分区中,重要的是不论发送还是消费,Kafka都能保证它们是原子的,即所有的写入操作要么全部成功,要么全部失败。当然对于consumer来说EOS语义的支持要弱一点,这是consuemr本身的特性决定的,也就是consumer有可能以原子性的方式消费这批消息,也可能是非原子性的。比如consumer总是需要replay某些消息,这种场景对EOS的支持就弱很多

Kafka为实现事务要求应用程序必须提供一个唯一的id来表征事务,这个id被称为事务id或TransactionId,它必须在应用程序的所有会话上是唯一的,它与PID不同,前者是用户提供的,后者是producer自行分配的。当提供了事务id后,Kafka就能确保:
1、跨应用程序会话间的幂等发送语义,使用具有版本含义的generation来隔离旧事务的操作
2、支持跨会话的事务恢复,如果某个producer实例挂了,Kafka能保证下一个实例首先完成之前未完成的事务,从而总保证状态的一致性
对于consumer端来说,事务支持就弱一点:
1、对于compacted的topic来说,事务中的消息可能已经被删除了
2、事务可能跨多个日志段,因此若老的日志段被删除,用户将丢失事务中的部分消息
3、consumer程序可能使用seek方法定位事务中的任意位置,也可能造成部分消息的丢失
4、consumer可能选择不消费事务中的所有消息,即无法保证读取事务的全部消息

在实际使用上,可以通过producer.initTransactions()初始化事务状态,然后通过producer.beginTransaction()等方法来使用事务