Kafka(三) 简略集群管理

Posted by ZhouJ000 on February 25, 2019

Kafka(一) 入门
Kafka(二) Producer与Consumer开发
Kafka(三) 简略集群管理
Kafka(四) 简略设计原理

集群部署

操作系统选型
Linux比Windows更适合,主要因为I/O模型的使用和数据网络传输效率。关于I/O模型,主流5种:阻塞、非阻塞IO、IO多路复用、信号驱动IO和异步IO。比如Socket的阻塞与非阻塞模式对应前两种,而Linux的select就属于第三种,epoll属于兼具第三和第四种,至于异步IO,很少有支持,Windows的IOCP属于这个模型。Kafka client的底层网络库设计采用了Java的Selector机制,在Linux上的实现就是epoll,但是在Windows上使用select模型而非IOCP,只有Java NIO2才是使用IOCP实现的,因此在这点上Linux上部署有更高效的I/O处理性能。对于Kafka需要大量地通过网络与磁盘进行数据传输,大部分是通过Java的FileChannel.transferTo方法实现的,在Linux平台上该方法底层会调用sendfile系统调用,即采用了Linux提供的零拷贝技术,可以有效改善数据传输的性能,对于Windows平台需要Java 8u60版本后才让其调用TransmitFile函数支持零拷贝技术

硬盘规划
由于Kafka是顺序写磁盘的,所以SSD对HDD的优势不大。另一个比较是JBOD与磁盘阵列(RAID)的区别,由于Kafka提供了通过副本机制的冗余和高可靠性,因此RAID的优势也不是很大。对于硬盘大小的规划,与以下因素有关:新增消息数、消息留存时间、平均消息大小、副本数和是否启用压缩

内存优化
由于Kafka采用操作系统的页缓存来缓存消息,无论缓冲已发送还是带读取,操作系统都要开辟一块内存区域用于存放接收的Kafka消息,因此这块区域大小的设置对Kafka的性能尤为关键。而Kafka对Java堆内存的使用反而不是很多,因为消息都属于朝生夕灭的对象实例,很快被GC,所以一般broker所需堆内存不会超过6GB,而剩余的大部分内存都给了文件系统page cache。另外还需要对page cache大小与实际线上环境设置的日志段大小相比较,假设单个日志段文件大小设为10GB,那么至少给page cache 10GB以上的空间,这样待消费的消息很大概率会保存在页缓存中,故consumer能直接命中缓存而无需执行磁盘IO读操作

CPU规划
Kafka不属于计算密集型系统,只需要记住一点,追求多核而非高时钟频率。然而对于client端启用了消息压缩的情况,那么就需要大量的CPU资源

带宽规划
对于Kafka需要在网络间传输大量数据的分布式数据管道而言,带宽资源尤为重要,并且特别容易成为系统的瓶颈。因此尽量使用高速网络,根据自身网络条件和带宽来评估Kafka集群机器数量(带宽不够机器来凑),避免使用跨机房网络

与Hadoop、HBase这样大多数分布式框架类似,Kafka集群搭建也分为伪分布式集群、多节点的分布式集群。部署完后可以使用Kafka的脚本来进行测试。在保证集群能正常工作后,可以对Kafka集群进行参数调优:broker端参数、topic级别参数、GC配置参数、JVM参数、OS参数

集群管理

脚本方式

topic

严格来说,创建topic有4种途径:
1、执行kafka-topics.sh(bat)命令行工具
2、显式发送CreateTopicsReuqest请求创建
3、发送MetadataRequest请求且broker端设置了auto.create.topics.enable=true
4、向ZooKeeper的/brokers/topics路径下写入以topic名称命名的znode
官方社区推荐使用前两种方式来创建topic,删除与创建类似

Kafka常用命令之kafka-topics.sh

Consumer管理

Kafka 消费者、偏移量与积压、查询脚本:kafka-consumer-groups.sh

常见脚本工具

kafka的相关操作脚本

API方式

除了使用Kafka自带的工具脚本,还可以使用Kafka开放的API来实现,统一由KafkaAdminClient来提供,先引入maven:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.1</version>
</dependency>

比如创建topic:

Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(properties);

// name, numPartitions, replicationFactor
NewTopic newTopic = new NewTopic("new-topic",4, (short) 1);
List<NewTopic> newTopicList = new ArrayList<>();
newTopicList.add(newTopic);
adminClient.createTopics(newTopicList);

MirrorMaker

对于多个数据中心部署Kafka集群,数据需要能够从一个Kafka集群被拷贝到另一个集群,而且还必须支持双向拷贝(某次传输的源集群可能是下次传输的目标集群),为了实现这样的需求,Kafka默认提供了工具MirrorMaker,用来帮助实现数据在两个Kafka集群间的拷贝。就实现而言,MirrorMaker仅仅是一个consumer+producer的混合体,对于源集群来说是consumer,而对于目标集群来说它是producer,它读取源集群指定topic的数据,写入目标集群同名topic下

监控集群

Kafka使用基于yammer metrics的监控指标体系来统计broker端和clients端的各种将你指标(metric)。目前Kafka提供了很多监控指标,都需要使用JMX接口来访问,每一个指标都是以JMX MBean的形式定义的

JMX有许多指标,最简单可以通过JConsole连接后查看。市面上也有很多监控框架,比如JmxTool、kafka-manager、Kafka Monitor、Kafka Offset Monitor、CruiseControl等

JVM监控:Kafka是标准的JVM系框架,必须运行在JVM上,因此除了监控Kafka提供的JMX监控指标外,还需要实时监控Kafka broker运行环境的JVM,毕竟JVM运行状态以及性能决定broker的表现。对于JVM,有两个方面一定要特别实时关注:进程状态和GC性能

OS监控:Kafka broker所在的机器的系统状态,往往能反应出系统瓶颈,大多数操作系统都有很健全的监控工具帮助收集各种OS指标,比如CPU使用率、内存利用率、硬盘繁忙程度、磁盘使用率、磁盘I/O状况和网络利用率等

集群调优

常见非功能性需求:

  • 性能(performance)
    • 吞吐量(TPS)
    • 延时(latency)
  • 可用性(availability)
  • 持久性(durability) 虽然明确了这些目标,但是它们彼此之间有些是矛盾的,所以到底看重哪方面实际上是一种权衡

集群基础调优

Kafka经常碰到的操作系统级别的错误为:
1、connection refused
2、too many open file
3、address in use:connect
通过恰当的OS调优可以提交预防这些错误的发生

1、禁止atime更新:由于Kafka大量使用物理磁盘进行消息持久化,故文件系统的选择是重要调优步骤。对Linux上任何文件系统,都推荐在挂载文件系统(mount)时设置noatime,取消文件atime(最新访问时间)属性的更新,避免了inode访问时间的写入操作,极大减少文件系统写操作数
2、文件系统选择:对Linux平台来说最常见的是EXT4和XFS,这两种都能很好和Kafka集群进行适配
3、设置swapiness:设置限定一个很小的值,这样既保留了大部分的物理内存,同时也保证swap机制可以帮助用户及时发现并处理OOM
4、JVM设置
5、增加最大文件部署符上线

调优吞吐量

Kafka最基本的并行单元就是分区,producer在设计时就要求能同时向多个分区发送消息,这些消息也要能够被写入到多个broker中供多个consumer同时消费。因此通常来说分区数越多TPS越高,然而并不是每次创建topic时都需要创建大量的分区。首先server/clients端将占用更多的内存,producer默认使用缓冲区是为每个分区缓存消息的,在broker端也维护了很多分区级别的元数据;其次每个分区在底层文件系统都有专属目录,会占用许多文件句柄,而且Kafka打开文件后便不会显式关闭文件,故句柄不会被释放,会随着分区数上升;最后每个分区通常都有若干个副本保存在不同broker上,当leader挂了后,需要进行leader选举的分区数会很多,由于controller是单线程处理事件的,所以只能一个个处理broker变更请求,会拉大系统恢复时间

  • broker端
    • 适当增加num.replica.fetchers,但不要超过CPU核数
    • 调优GC避免经常性的Full GC
  • producer端
    • producer是批量发送消息的,适当增加batch.size,比如100~512KB
    • 同样适当增加linger.ms,比如10~100毫秒
    • 压缩,比如设置compression.type=lz4
    • acks=0或1
    • retries=0
    • 如果多线程共享producer或分区数很多,增加buffer.memory
  • consumer端
    • 采用多consumer实例
    • 增加fetch.min.bytes,比如100000

调优延时

部分和TPS类似,但是调优延时与TPS相反的是,要求尽量不要缓存消息,而是尽快将消息发送出去,另外也尽量不要压缩

  • broker端
    • 适度增加num.replica.fetchers
    • 避免创建过多topic分区
  • producer端
    • 设置linger.ms=0
    • 设置compression.type=none
    • 设置acks=1或0
  • consumer端
    • 设置fetch.min.bytes=1

调优持久性

持久性通常靠冗余手段,而Kafka实现的手段就是备份机制(replication)

  • broker端
    • 设置auto.create.topics.enable=fasle
    • 设置replication.factor=3, min.insync.replicas=replication.factor-1
    • 设置default.replication.factor=3
    • 设置broker.rack属性分散分区数据到不同机架
    • 设置log.flush.interval.message和log.flush.interval.ms为一个较小的值
  • producer端
    • 设置acks=all
    • 设置retries为一个较大的值,比如10~30
    • 设置max.in.flight.requests.per.connection=1
    • 设置enable.idempotence=true启用幂等性
  • consumer端
    • 设置auto.commit.enable=false
    • 消息消费成功后调用commitSync提交位移

调优可用性

  • broker端
    • 避免创建过多分区
    • 设置min.insync.replicas=1
    • 设置num.recovery.threads.per.data.dir=broker端参数log.dirs中设置的目录数
  • producer端
    • 设置acks=1
  • consumer端
    • 设置session.timeout.ms为较低的值,比如10000
    • 设置max.poll.interval.ms比消息平均处理时间稍大的值