Kafka(一): 入门

Posted by ZhouJ000 on February 22, 2019

Kafka(一) 入门
Kafka(二) Producer与Consumer开发
Kafka(三) 简略集群管理
Kafka(四) 简略设计原理

对比

之前在工作的项目中用过ActiveMQ、RabbitMQ、RocketMQ,现在学习一下Kafka,先来对比看下:

  • ActiveMQ:
    • 完全支持JMS1.1,协议支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP
    • 对队列数较多的情况支持不好,不适用于上千个队列的应用场景
  • RabbitMQ:
    • 高级消息队列协议(AMQP)领先的一个实现,它实现了代理(Broker)架构
    • 以Broker为中心,Broker由Exchange、Binding、Queue组成,支持持久化消息,支持事务,有消息的确认机制
    • 结合erlang语言本身的并发优势,性能较好,但是不利于做二次开发和维护
  • RocketMQ:
    • 阿里开源的分布式消息系统,并没有实现JMS规范
    • 部署由一个命名服务(nameserver)和一个代理(broker)组成。能保证消息的顺序,提供丰富的消息拉取模式,支持持久化消息
  • Kafka:
    • 不完全符合JMS规范,吞吐量大,轻松支持每秒数百万的消息,内部采用消息的批量处理,零拷贝机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高
特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比RocketMQ、Kafka低一个数量级 同ActiveMQ 10万级,支撑高吞吐 10万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic数量对吞吐量的影响 - - topic可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是RocketMQ的一大优势,在同等机器下可以支撑大量的topic topic从几十到几百个时候,吞吐量会大幅度下降,在同等机器下Kafka尽量保证topic数量不要过多,如果要支撑大规模的topic,需要增加更多的机器资源
时效性 ms级 微秒级,这是RabbitMQ的一大特点,延迟最低 ms级 延迟在ms级以内
可用性 高,基于主从架构实现高可用 同ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到0丢失 同RocketMQ
功能支持 MQ领域的功能极其完备 基于erlang开发,并发能力很强,性能极好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用

mq-compare 总的来说按目前个人理解,现在ActiveMQ基本不太有人用了。剩下的RabbitMQ和RocketMQ,因为RabbitMQ是主从,单台并发量大,适用于中型公司中型项目。RocketMQ支持分布式,java开源方便扩展,适用于中大型公司中大型项目。Kafka现在还不太熟悉,目前看还是较多用于数据采集和大数据领域

其他:Apollo在14年底用过,说是ActiveMQ的下一代,但是国内没什么人用;ZeroMQ是一个socket库的重新封装,需要开发大量的代码;Redis提供了消息订阅的服务可以当作MQ来使用,但一般不这么做,而且不方便扩展

入门

现在最新的版本为2.1.1,可以从官网上下载

启动:
1、运行Zookeeper
2、server.properties文件,修改log.dirs路径,zookeeper.connect连接地址
3、带上config\server.properties配置,使用kafka-server-start.sh或windows\kafka-server-start.bat运行Kafka

测试:
1、创建topic:kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2、创建成功后可以使用kafka-topics.sh --list --zookeeper localhost:2181命令来查看所有,或kafka-topics.sh --describe --zookeeper localhost:2181 --topic test查看当前topic状态
3、开启生产者:kafka-console-producer.sh --broker-list localhost:9092 --topic test进入控制台,可以输入文本发送
4、开启消费者:kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning进入控制台,可以接收到上面发送的消息

介绍

对于这类的MQ,一般称为消息队列、消息中间件等。其实更适合称为消息引擎,毕竟”消息队列”给出一个不准确的暗示,仿佛它是以队列的方式实现的,而”消息中间件”又似乎夸张强调中间件。由于这类系统英文名是Messaging system,也可简单译为消息系统,但是只片面强调了消息主体的作用,而忽视了这类系统天然就具备的且很重要的传递属性(就像引擎一样,具备某种能量转换、传输的能力),因此更适合称为消息引擎消息传输系统

消息引擎带来了:
1、生产者消费者解耦
2、可集成进任何系统
3、异步消息传输
设计一个引擎系统时需要考虑的两个重要因素:
1、消息设计(XML、JSON、二进制等结构化数据)
2、传输协议设计(AMQP等)

消息引擎范型:最常见的两种消息引擎范型是消息队列模型发布/订阅模型。消息队列(message queue)模型是基于队列提供消息传输服务的,多用于进程间通信以及线程间通信。模型定义了消息队列(queue)、发送者(sender)和接收者(receiver),提供了一种点对点(P2P)的消息传递方式,发送者和消费者是一对一的关系。另一种发布/订阅(pub/sub)模型,与前一种模型不同,它有主题(topic)的概念:一个topic可以理解为逻辑语义相近的消息的容器。这种模型也定义了类似生产者/消费者的角色,即发布者(publisher)和订阅者(subscriber)。发布者将消息生产出来放到指定的topic中,所有订阅了该topic的订阅者都可以时接收到该topic下的所有消息。Kafka同时支持这两种消息引擎模型(引入消息组)

JMS:java消息服务,是一套API规范,提供了很多接口用于实现分布式系统间的消息传递。JMS同时支持上述两种模型,像ActiveMQ、RabbitMQ(通过RabbitMQ JMS Client)、IBM WebSphere MQ和Kafka(并没有完全遵照JMS规范)等

概要设计

吞吐量/延时

吞吐量对于任何消息引擎而言都是至关重要的指标,因为其代表着某种处理能力的最大值。对于Kafka而言,它的吞吐量就是每秒能处理的消息数或每秒能处理的字节数,显然希望越大越好。但是消息引擎还有一个延时的性能指标,衡量一段时间间隔,可能是发出某个操作与接收到操作响应之间的时间,或者是在系统中导致某些物理变更的起始时间与变更正式生效之间的间隔。对于Kafka而言,延时表示客户端发起请求和服务器处理请求并发送响应给客户端之间的这一段时间,显然希望延时越短越好。然而在实际场景中,这两个指标是一对矛盾体,即调优一个指标通常会牺牲另一个指标,不过不是等比例此消彼长的关系

比如Kafka处理一条消息需要2毫秒,那么吞吐量不会超过500条/秒(1000/2)。但是如果采用批处理(batching)的思想,不是一条一条发,而是一小批一小批(micro-batch)地发送,假设发送前需要先等待一段时间(比如8毫秒),那么发送延时变为10毫秒,即延时增加4倍,然后在这8毫秒中累计了1000条消息,那么系统的整体吞吐量变为了100000条/秒(1000*1000/10),吞吐量提高了200倍。因此批处理也是目前诸如Storm Trident和Spark Streaming等消息处理平台采用的处理语义思路

Kafka依靠了以下四点来提高吞吐量、低延时的设计目的:
1、大量使用操作系统页缓存(在内存分配的),内存操作速度快且命中高(读取消息时大部分消息可能依旧保存在页缓存上,不用穿透到硬盘获取)
2、Kafka不直接参与物理I/O操作,而是交由最擅长此事的操作系统来完成
3、采用追加(append)写入方式(磁盘顺序读/写是非常快的),摒弃了缓慢的随机读/写操作
4、使用以sendfile为代表的零拷贝技术加强网络间的数据传输效率

消息持久化

Kafka会持久化消息到硬盘上,这样做的好处如下:
1、解耦消息发送与消息消费:本质上Kafka最核心功能就是提供生产者-消费者模式的完整解决方案。通过消息持久化可以使生产者不在需要直接和消费者耦合,只是简单把消息生产出来并放到Kafka服务器保存即可,因此提升整体的吞吐量
2、实现灵活的消息处理:一些接收Kafka消息的系统可能会需要对已处理过的消息在未来某个时间点再重新处理一次,即消息重演。消息持久化便可以很方便实现这样的需求

Kafka实现持久化的设计也与众不同,大多数会在实现持久化时先尽可能使用内存,当内存资源耗尽后,在一次性把数据”刷盘”。而Kafka反其道而行,所有数据都会立即被写入文件系统的持久化日志中,之后Kafka服务器才会返回结果给客户端通知它们消息已被成功写入。这样做即实时保存了数据,又减少Kafka程序对内存的消耗,从而节省出的内存留给页缓存使用,进一步提升整体性能

Kafka采用的是独立型的存储结构,每个队列一个文件。而其他比如RocketMQ用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据(CommitLog)文件来存储

负载均和与故障转移

Kafka实现负载均衡(load balancing)实际上是通过智能化的分区领导选举(partition leader election)来实现的,可以在集群的所有机器上以均等机会分散各个partition的leader,从而整体上实现了负载均衡

故障转移(fail-over)通常是依靠心跳或会话的机制来实现的,Kafka服务器使用的方式是会话机制。每台Kafka服务器启动后以会话的形式将自己注册到ZooKeeper服务器上,当出现问题后与ZooKeeper的会话将不能维持而超时失效,此时Kafka集群会推选出另一台服务器来完全代替这台服务器提供服务

伸缩性

有了消息持久化,Kafka实现了高可靠性;有了负载均衡和使用文件系统的独特设计,Kafka实现了高吞吐量;有了故障转移,Kafka实现了高可用性。那么作为分布式系统中的高伸缩性(scalability),Kafka是采用的是这样的思想 ———— 每台Kafka服务器上的状态统一交由ZooKeeper保管。因为阻碍线性扩展的一个很常见因素就是状态的保存,而ZooKeeper是专门的协调服务,整个集群服务器之间就无需繁重的状态共享,只需要启动新的Kafka服务器就行了(Kafka服务器上也还是保存了很轻量级的内部状态,维护一致性的代价很低)

Kafka的术语

Kafka的标准定义为分布式流式处理平台,虽然其推出时是以消息引擎的身份出现的,其拥有强大的消息传输效率和完备的分布式解决方案。随着Kafka不断演进,发现Kafka交由下游数据处理平台做的事情自己也能做,因此推出了Kafka Streams,即流式处理组件,自此Kafka成为了一个流式处理框架,而不仅仅是一个消息引擎。但是不论怎么变化,Kafka的处理流程从未发生变化,即:生产者发送消息给Kafka服务器,消费者从Kafka服务器读取消息,Kafka服务器依托ZooKeeper集群进行服务的协调管理

broker:Kafka服务器的官方名字

消息:Kafka的消息格式由很多字段组成,使用紧凑的二进制字节数组来保存这些字段,也就是没有任何多余的比特位浪费。在JVM中,对象保存的开销很大,特别对小对象需要花费2倍甚至更多的空间来保存数据,而且随着堆上数据量增大,GC性能下降很多。因此Kafka的消息设计特意避开繁重的Java堆上内存分配,直接使用紧凑二进制字节数组ByteBuffer而不是独立的对象,因此至少可以访问至少多一倍的可用内存,同时采用也缓存而非堆,在Kafka broker崩溃后,内存上数据会消失,而页缓存的数据还存在,下次重启后可以继续服务,不需要再单独热缓存了

topic(主题)与partition(分区):topic只是一个逻辑概念,代表了一类消息,也可以认为是消息被发送到的地方。通常可以使用topic来区分实际业务。Kafka中的topic通常都会被多个消费者订阅,因此出于性能的考量,Kafka并不是topic-message的两级结构,而是采用topic-partition-message的三级结构来分散负载,从本质上讲,每个Kafka topic都有若干个partition组成。而Kafka的partition是不可修改的有序消息序列,也可以说是有序的消息日志,每个partition有自己专属的partition号,通常从0开始。用户对partition唯一能做的操作就是在消息序列的尾部追加写入消息。partition上的每条消息都会被分配一个唯一的序列号 ———— 该序列号被称为位移(offset)。该位移是从0开始顺序递增的整数,位移信息可以位移定位到某partition下的一条消息。partition的引入没有实际的业务含义,只是纯粹的为了提升系统的吞吐量,因此在创建Kafka topic的是时候可以根据集群实际配置设置具体的partition数,实现整体性能最大化

offset:前面说过,topic partition下的每条消息都被分配一个位移值。实际上,Kafka消费者端也有位移的概念,但是这两个offset属于不同的概念。显然每条消息在某个partition的位移是固定的,但消费该partition的消费者的位移会随着消费进度不断前移,但终究不可能超过该分区最新一条消息的位移。综合讲,Kafka中的一条消息其实就是一个<topic, partition, offset>三元组(tuple),通过该元组可以在Kafka集群中找到唯一对应的那条消息

replica(副本):既然已知partition是有序消息日志,那么一定不能只保存这一份日志,否则一旦保存partition的Kafka服务器挂掉了,其上保存的消息也就都丢失了。分布式系统当然要实现高可靠性,目前实现的主要途径还是依靠冗余机制,简单讲就是备份多份日志。这些日志副本在Kafka中被称为副本(replica),它们存在的唯一目的就是防止数据丢失。副本分为两类:领导者副本(leader replica)和追随者副本(follower replica),追随者副本是不能提供服务给客户端的,也就是不负责响应客户端发来的消息写入和消息消费请求,它只是被动从领导者副本获取数据,而一旦领导者副本所在broker宕机,Kafka会从剩余replica中选举出新的leader继续提供服务

leader和follower:领导者和追随者已经完全取代了过去的主备的提法(Master-Slave)。和传统主备系统不同的是,这类leader-follower系统中通常只有leader对外提供服务,follower只是被动追随leader的状态,保持同步,一旦leader挂掉立即有一个follower被选举为新的leader接替它的工作,Kafka就是这样的设计。Kafka保证同一个partition的多个replica一定不会分配在同一台broker上

ISR:全称in-sync replica,就是与leader replica保持同步的replica集合。Kafka为partition动态维护一个replica集合,该集合中所有replica保存的消息日志都与leader replica保持同步状态。只有这个集合中的replica才能被选举为leader,也只有该集合中所有replica都接收到同一条消息,Kafka才会将该消息置于”已提交”状态,即认为这条消息发送成功。Kafka承诺只有这个集合中至少存在一个存活的replica,那些已提交状态的消息就不会丢失。正常情况下所有replica都应该与leader replica保持同步,但各种原因下可能一小部分replica落后了进度,当滞后到一定程度后,Kafka会将这部分踢出ISR,相反如果这些replica又追上了leader的进度时,Kafka会将它们加回到ISR中,一切都是自动维护的

使用场景

Kafka以消息引擎闻名,因此它特别适合处理生产环境中的那些流式数据

消息传输:Kafka非常适合代替传统的消息总线(message bus)或消息代理(message broker)。传统这类系统擅长解耦生产者消费者以及批量处理消息,这些Kafka都具备,除外还有更好的吞吐量特性,内置的分区机制和副本机制既实现了高性能的消息传输,还达到了高可靠性和高容错性。因此Kafka特别适合用于实现一个超大量级的消息处理应用

网站行为日志跟踪:Kafka最早就是用于重建用户行为数据追踪系统的,很多网站上用户的操作都会以消息的形式发送到Kafka的某个对应topic上,由于这些点击流数量很庞大,Kafka超强的吞吐量特性正好适合

审计数据收集:很多企业和组织都需要对关键的操作和运维进行监控和审计,这就需要从各个运维应用程序实时汇总操作步骤信息进行集中式管理。在这种场景下,Kafka可以便捷地对多路消息进行实时收集,同时由于其持久化特性,使后续离线审计成为可能

日志收集:Kafka最常见的使用方式,日志收集汇总解决方案。大量的服务日志散落在不同的机器上,使用Kafka对它们进行全量收集,并集中送往下游的分布式存储中(比如HDFS等)。比起其他主流日志抽取框架(比如apache flume),Kafka有更好的性能,还提供完备的可靠性解决方案,同时保持低延时

Event Sourcing:实际上是领域驱动设计(DDD)的名词,它使用事件序列来表示状态变更,这种思想和Kafka的设计特性不谋而合。Kafka也是用不可变更的消息序列来抽象化表示业务消息的,因此Kafka特别适合作为这种应用的后端存储

流式处理:由于在0.10.0.0版本开始Kafka社区推出全新的流式处理组件Kafka Streams,标志着Kafka进入流式处理俱乐部。相比老牌流式处理框架,比如apache storm、apache samza或spark streaming、apache flink,Kafka Streams竞争力不明

Google的Tyler Akidau曾在一篇文章中指出,流式处理只要能实现以下两个方面就能完全代替当前的离线批处理方式:
1、正确性:一旦流式处理实现了正确性,它足以匹敌批处理
2、时间推导工具:一旦流式处理提供了时间推导工具,它便完全超过了批处理