Kafka(一) 入门
Kafka(二) Producer与Consumer开发
Kafka(三) 简略集群管理
Kafka(四) 简略设计原理
集群部署
操作系统选型:
Linux比Windows更适合,主要因为I/O模型的使用和数据网络传输效率。关于I/O模型,主流5种:阻塞、非阻塞IO、IO多路复用、信号驱动IO和异步IO。比如Socket的阻塞与非阻塞模式对应前两种,而Linux的select就属于第三种,epoll属于兼具第三和第四种,至于异步IO,很少有支持,Windows的IOCP属于这个模型。Kafka client的底层网络库设计采用了Java的Selector机制,在Linux上的实现就是epoll,但是在Windows上使用select模型而非IOCP,只有Java NIO2才是使用IOCP实现的,因此在这点上Linux上部署有更高效的I/O处理性能。对于Kafka需要大量地通过网络与磁盘进行数据传输,大部分是通过Java的FileChannel.transferTo方法实现的,在Linux平台上该方法底层会调用sendfile系统调用,即采用了Linux提供的零拷贝技术,可以有效改善数据传输的性能,对于Windows平台需要Java 8u60版本后才让其调用TransmitFile函数支持零拷贝技术
硬盘规划:
由于Kafka是顺序写磁盘的,所以SSD对HDD的优势不大。另一个比较是JBOD与磁盘阵列(RAID)的区别,由于Kafka提供了通过副本机制的冗余和高可靠性,因此RAID的优势也不是很大。对于硬盘大小的规划,与以下因素有关:新增消息数、消息留存时间、平均消息大小、副本数和是否启用压缩
内存优化:
由于Kafka采用操作系统的页缓存来缓存消息,无论缓冲已发送还是带读取,操作系统都要开辟一块内存区域用于存放接收的Kafka消息,因此这块区域大小的设置对Kafka的性能尤为关键。而Kafka对Java堆内存的使用反而不是很多,因为消息都属于朝生夕灭的对象实例,很快被GC,所以一般broker所需堆内存不会超过6GB,而剩余的大部分内存都给了文件系统page cache。另外还需要对page cache大小与实际线上环境设置的日志段大小相比较,假设单个日志段文件大小设为10GB,那么至少给page cache 10GB以上的空间,这样待消费的消息很大概率会保存在页缓存中,故consumer能直接命中缓存而无需执行磁盘IO读操作
CPU规划:
Kafka不属于计算密集型系统,只需要记住一点,追求多核而非高时钟频率。然而对于client端启用了消息压缩的情况,那么就需要大量的CPU资源
带宽规划:
对于Kafka需要在网络间传输大量数据的分布式数据管道而言,带宽资源尤为重要,并且特别容易成为系统的瓶颈。因此尽量使用高速网络,根据自身网络条件和带宽来评估Kafka集群机器数量(带宽不够机器来凑),避免使用跨机房网络
与Hadoop、HBase这样大多数分布式框架类似,Kafka集群搭建也分为伪分布式集群、多节点的分布式集群。部署完后可以使用Kafka的脚本来进行测试。在保证集群能正常工作后,可以对Kafka集群进行参数调优:broker端参数、topic级别参数、GC配置参数、JVM参数、OS参数
集群管理
脚本方式
topic
严格来说,创建topic有4种途径:
1、执行kafka-topics.sh(bat)命令行工具
2、显式发送CreateTopicsReuqest请求创建
3、发送MetadataRequest请求且broker端设置了auto.create.topics.enable=true
4、向ZooKeeper的/brokers/topics路径下写入以topic名称命名的znode
官方社区推荐使用前两种方式来创建topic,删除与创建类似
Consumer管理
Kafka 消费者、偏移量与积压、查询脚本:kafka-consumer-groups.sh
常见脚本工具
API方式
除了使用Kafka自带的工具脚本,还可以使用Kafka开放的API来实现,统一由KafkaAdminClient来提供,先引入maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.1</version>
</dependency>
比如创建topic:
Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);
// name, numPartitions, replicationFactor
NewTopic newTopic = new NewTopic("new-topic",4, (short) 1);
List<NewTopic> newTopicList = new ArrayList<>();
newTopicList.add(newTopic);
adminClient.createTopics(newTopicList);
MirrorMaker
对于多个数据中心部署Kafka集群,数据需要能够从一个Kafka集群被拷贝到另一个集群,而且还必须支持双向拷贝(某次传输的源集群可能是下次传输的目标集群),为了实现这样的需求,Kafka默认提供了工具MirrorMaker,用来帮助实现数据在两个Kafka集群间的拷贝。就实现而言,MirrorMaker仅仅是一个consumer+producer的混合体,对于源集群来说是consumer,而对于目标集群来说它是producer,它读取源集群指定topic的数据,写入目标集群同名topic下
监控集群
Kafka使用基于yammer metrics的监控指标体系来统计broker端和clients端的各种将你指标(metric)。目前Kafka提供了很多监控指标,都需要使用JMX接口来访问,每一个指标都是以JMX MBean的形式定义的
JMX有许多指标,最简单可以通过JConsole连接后查看。市面上也有很多监控框架,比如JmxTool、kafka-manager、Kafka Monitor、Kafka Offset Monitor、CruiseControl等
JVM监控:Kafka是标准的JVM系框架,必须运行在JVM上,因此除了监控Kafka提供的JMX监控指标外,还需要实时监控Kafka broker运行环境的JVM,毕竟JVM运行状态以及性能决定broker的表现。对于JVM,有两个方面一定要特别实时关注:进程状态和GC性能
OS监控:Kafka broker所在的机器的系统状态,往往能反应出系统瓶颈,大多数操作系统都有很健全的监控工具帮助收集各种OS指标,比如CPU使用率、内存利用率、硬盘繁忙程度、磁盘使用率、磁盘I/O状况和网络利用率等
集群调优
常见非功能性需求:
- 性能(performance)
- 吞吐量(TPS)
- 延时(latency)
- 可用性(availability)
- 持久性(durability) 虽然明确了这些目标,但是它们彼此之间有些是矛盾的,所以到底看重哪方面实际上是一种权衡
集群基础调优
Kafka经常碰到的操作系统级别的错误为:
1、connection refused
2、too many open file
3、address in use:connect
通过恰当的OS调优可以提交预防这些错误的发生
1、禁止atime更新:由于Kafka大量使用物理磁盘进行消息持久化,故文件系统的选择是重要调优步骤。对Linux上任何文件系统,都推荐在挂载文件系统(mount)时设置noatime,取消文件atime(最新访问时间)属性的更新,避免了inode访问时间的写入操作,极大减少文件系统写操作数
2、文件系统选择:对Linux平台来说最常见的是EXT4和XFS,这两种都能很好和Kafka集群进行适配
3、设置swapiness:设置限定一个很小的值,这样既保留了大部分的物理内存,同时也保证swap机制可以帮助用户及时发现并处理OOM
4、JVM设置
5、增加最大文件部署符上线
调优吞吐量
Kafka最基本的并行单元就是分区,producer在设计时就要求能同时向多个分区发送消息,这些消息也要能够被写入到多个broker中供多个consumer同时消费。因此通常来说分区数越多TPS越高,然而并不是每次创建topic时都需要创建大量的分区。首先server/clients端将占用更多的内存,producer默认使用缓冲区是为每个分区缓存消息的,在broker端也维护了很多分区级别的元数据;其次每个分区在底层文件系统都有专属目录,会占用许多文件句柄,而且Kafka打开文件后便不会显式关闭文件,故句柄不会被释放,会随着分区数上升;最后每个分区通常都有若干个副本保存在不同broker上,当leader挂了后,需要进行leader选举的分区数会很多,由于controller是单线程处理事件的,所以只能一个个处理broker变更请求,会拉大系统恢复时间
- broker端
- 适当增加num.replica.fetchers,但不要超过CPU核数
- 调优GC避免经常性的Full GC
- producer端
- producer是批量发送消息的,适当增加batch.size,比如100~512KB
- 同样适当增加linger.ms,比如10~100毫秒
- 压缩,比如设置compression.type=lz4
- acks=0或1
- retries=0
- 如果多线程共享producer或分区数很多,增加buffer.memory
- consumer端
- 采用多consumer实例
- 增加fetch.min.bytes,比如100000
调优延时
部分和TPS类似,但是调优延时与TPS相反的是,要求尽量不要缓存消息,而是尽快将消息发送出去,另外也尽量不要压缩
- broker端
- 适度增加num.replica.fetchers
- 避免创建过多topic分区
- producer端
- 设置linger.ms=0
- 设置compression.type=none
- 设置acks=1或0
- consumer端
- 设置fetch.min.bytes=1
调优持久性
持久性通常靠冗余手段,而Kafka实现的手段就是备份机制(replication)
- broker端
- 设置auto.create.topics.enable=fasle
- 设置replication.factor=3, min.insync.replicas=replication.factor-1
- 设置default.replication.factor=3
- 设置broker.rack属性分散分区数据到不同机架
- 设置log.flush.interval.message和log.flush.interval.ms为一个较小的值
- producer端
- 设置acks=all
- 设置retries为一个较大的值,比如10~30
- 设置max.in.flight.requests.per.connection=1
- 设置enable.idempotence=true启用幂等性
- consumer端
- 设置auto.commit.enable=false
- 消息消费成功后调用commitSync提交位移
调优可用性
- broker端
- 避免创建过多分区
- 设置min.insync.replicas=1
- 设置num.recovery.threads.per.data.dir=broker端参数log.dirs中设置的目录数
- producer端
- 设置acks=1
- consumer端
- 设置session.timeout.ms为较低的值,比如10000
- 设置max.poll.interval.ms比消息平均处理时间稍大的值