Kafka(二) Producer与Consumer开发

Posted by ZhouJ000 on February 24, 2019

Kafka(一) 入门
Kafka(二) Producer与Consumer开发
Kafka(三) 简略集群管理
Kafka(四) 简略设计原理

Producer开发

Kafka producer在设计上比consumer简单点,因为它不涉及复杂的组管理操作,即每个producer都是独立进行工作的,与其他producer实例之间没有关联。目前producer的首要功能就是向某个topic的某个分区发送一条消息,所以要首先确认向topic的哪个分区写入消息 —— 这就是分区器(partitioner)要做的事情。Kafka producer提供了一个默认的分区器。对于每条发送的消息,如果该消息指定了key,那么该partitioner会根据key的哈希值来选择目标分区;如果没有指定key,则partitioner使用轮循的方式指定目标,最大限度保证消息在所有分区上的均匀性。当然用户也可以直接指定目标分区发送,也可以使用自定义的partitioner。确定目标分区后,producer要做的第二件事就是要寻找这个分区对应的leader,也就是该分区leader副本所在的Kafka broker,因为只有leader才能响应clients发送过来的请求,ISR中的其他副本会与leader副本保持同步。producer也有了多种选择来实现消息发送,比如不等待任何副本的响应便返回成功,或者只等leader副本响应写入操作后返回成功等 kafka-producer-java producer首先用一个线程(用户主线程,即启动producer的线程)将待发送的消息封装进一个ProducerRecord类实例,然后将其序列化之后发送给partitioner,再由后者确定了目标分区后一同发送到位于producer程序中一块内存缓冲区中。而producer的另一个工作线程(I/O发送线程,也称Sender线程)则负责实时地从该缓冲区中提取出准备就绪的消息封装进一个批次(batch),统一发送给对应broker

先引入maven:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.1.1</version>
</dependency>

创建producer:

Properties props = new Properties();
// 必须配置
props.put("bootstrap.servers", "localhost:9092");
// 对key和消息体做序列化,其中就算发送时key没有指定,也需要指定key的序列化对象
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 选择参数,有默认值,可以使用ProducerConfig定义的key
props.put(ProducerConfig.ACKS_CONFIG, "-1");
props.put("retries", "3");
props.put("batch.size", "323840");
props.put("linger.ms", "10");
props.put("buffer.memory", "33554432");
props.put("max.block.ms", "3000");

// serializer可以在构造方法中定义,也可以定义在Properties中
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i< 20; i++) {
	// topic + (key) + value
	ProducerRecord record = new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i));
	// 3种发送方式:同步、异步发送+回调、不关心结果,这里用的是第三种,真实场景不会用:
	producer.send(record);
	// 所有写入操作默认都是异步的,返回一个java Future对象,第一种方式:
	producer.send(record, new Callback() {
		// RecordMetadata与Exception不会同时非空,至少有一个是null
		@Override
		public void onCompletion(RecordMetadata recordMetadata, Exception e) {
			if (e == null) {
				// 发送成功
			} else {
				// 执行错误逻辑处理
			}
		}
	});
	// 同步其实就是通过java的Future来区分的,调用get无限等待:
	try {
		// 成功返回对应RecordMetadata实例(包含已发送信息的所有元数据)
		RecordMetadata data = (RecordMetadata) producer.send(record).get();
	// 失败抛出异常
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	}
	// 无论同步还是异步,发送都可能失败,Kafka的错误类型包含两类;可重试异常和不可重试异常
}
// 应放在finally里,会优雅关闭退出,也可调用带timeout参数的close方法,超时后强制结束(注意)
producer.close();

主要步骤都在注释里写明了。下面看一下Producer其他参数:
1、acks:
控制producer生产消息的持久性,对于Kafka来说,只在乎”已提交”消息的持久性。一旦有任何一个保存了该消息的副本”存活”,这条消息就视为”不会丢失的”,所以所谓丢失的消息其实没被成功写入Kafka,Kafka保证Consumer永远不会读取到尚未提交完成的消息。显然leader broker何时发送写入结果给Producer是一个需要考虑的问题,它直接影响消息的持久性甚至Producer端的吞吐量。而acks参数就是用来控制这个的。当前acks有三个取值:0(producer完全不理睬leader broker端处理结果)、-1或all(leader broker不仅将消息写入本地日志,同时还会等ISR其他所有副本都成功写入后才会发送结果)、1(默认的折中方案,仅当leader broker存入本地日志后就发送响应结果)。因此设为0时吞吐量最大,消息持久性最差,允许消息丢失;-1或all吞吐量最差,持久性最高,不能容忍消息丢失;而1则为都适中的折中方案
2、buffer.memory:
指定了Producer端用于缓存消息的缓冲区大小,单位字节,默认33554432即32MB。由于采用异步发送消息的框架,java版Producer启动时会首先创建一块内存缓冲区用于保存待发送的消息,然后由另一个线程负责从缓冲区中读取消息执行真正的发送。如果写入速度超过发送速度,必然造成缓冲区增加,这时Producer会放弃手头工作等待I/O线程追上来,若一段时间后还是无法赶上进度,那么Producer会抛出异常并期望用户接入处理。我们几乎可以认为该参数指定的内存大小就是Producer程序使用的内存大小,若Producer需要很多分区发送消息,那么就要仔细设置这个参数防止过小的缓冲区降低了Producer的吞吐量
3、compression.type:
设置Producer端是否压缩消息,默认为none,即不压缩消息。和任何系统相同,Kafka的Producer端引入压缩后可以显著降低网络I/O传输开销并从而提高整体吞吐量,但是会增加Producer端的机器CPU开销。另外如果broker端的压缩参数与Producer设置的不同,还会消耗额外CPU资源对消息进行相应的解压-重压操作。当前Kafka支持GZIP、Snappy、LZ4和Zstandard
4、retries:
重试次数,默认为0。其中有两点需要注意:第一,重试可能造成消息的重复发送;第二,重试可能造成消息的乱序。不过Kafka有了”精确一次”语义,构建于幂等性和原子性之上,设置配置”processing.guarantee = exact_once”。这可以保证所有处理恰好发生一次,包括处理和由写回Kafka的处理作业创建的所有具体状态的精确一次,具体怎样未来遇到后更新
5、batch.size:
重要参数,对于调优吞吐量和延时性能指标非常重要,默认为16384即16KB。batch.size的大小是一种空间和时间权衡的体现
6、linger.ms:
如果之前的batch.size设置太大,那么就会出现消息没有填满batch也可以被发送的情况。这个参数就是控制消息发送延时行为的,默认为0,表示消息需要立即发送,无论batch是否填满,这在大部分情况下是合理的,但是会拉低吞吐量
7、max.request.size:
控制Producer发送请求的大小,可以认为是最大消息大小,由于请求有一些头部数据结构,因此包含一条消息的请求大小比消息本身大,所以这样看是安全的
8、request.timeout.ms
Producer发送请求给broker后,broker需要在规定时间内返回结果,默认是30秒

消息分区机制

Kafka Producer发送过程中一个很重要的步骤就是确定将消息发送到指定的topic的哪个分区中。Producer提供了分区策略以及对应的分区器(partitoner),会尽力确保具有相同key的所有消息都在相同的分区上,若没有指定key,则用轮循来确保消息在topic的所有分区上均匀分配

对于有key的消息,java版本的partitioner会根据surmur2算法计算消息Key的哈希值,然后对总分区数求模得到消息要被发送到的目标分区号。我们也可以自定义分区策略,要使用自定义分区机制,只需要完成两件事情:
1、实现Partitioner接口
2、在用于构造KafkaProducer的Properties对象中设置partitioner.class参数

拦截器

需要实现ProducerInterceptor接口,实现接口方法:
1、onSend:该方法封装进KafkaProducer.send方法中,即运行在主线程中。Producer确保消息被序列化以及分区计算前调用该方法,所以最好不要修改消息所属的topic和分区,否则会影响目标分区的计算
2、onAcknowledgement:会在消息被应答之前或消息发送失败时调用,并且通常都在Producer回调逻辑触发之前。运行在I/O线程中,因此不要放入很”重”的逻辑,否则会拖慢消息发送效率
3、close:关闭interceptor,一些资源清理工作

在实现好拦截器后,需要在Producer中指定它们:

List<String> interceptors = new Arraylist<>();
interceptors.add("com.test.Interceptors1");
interceptors.add("com.test.Interceptors2");
// ..
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

无消息丢失配置

主要是缓冲区消息丢失和消息乱序,直接看配置:

Producer端配置:
// 旧版本使用,使内存缓冲区被填满后producer处于阻塞状态停止接收新消息,而不是抛出异常,现在被max.black.ms代替
1、block.on.buffer.full=true
// 最强的持久化保证
2、acks=all or -1
// 无限重试
3、retries = Integer.MAX_VALUE
// 防止topic同分区下消息乱序,实际上限制了producer在单个broker连接上能发送的未响应请求的数量
4、max.in.flight.requests.per.connection=1
5、使用带回调机制的send发送消息,即异步版本
6、callback逻辑中显示地立即关闭producer,使用close(0),是为了处理消息乱序的问题,非优雅关闭

broker端配置:
// 关闭unclean leader选举,即不允许非ISR中副本被选举为leader
1、unclean.leader.election.enable=false
// 常用的3备份原则
2、replication.factor=3
// 必须>1,控制某条消息至少被写入到ISR中多少副本才算成功,只有在acks被设置为all或-1时才有意义
3、min.insync.replicas=2
4、replication.factor>min.insync.replicas
// consumer手动提交位移
5、enable.auto.commit=false

Consumer开发

虽然consumer分类不是kafka社区已有概念,但是理解它们的含义和区别,可以更好地在实际使用场景中选择合理的consumer分类完成目标,可以把consumer分为两类:
1、消费者组(consumer group)
2、独立消费者(standalone consumer)

消费者组

消费者使用一个消费者组名(即group.id)来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上。这就意味着一个consumer group可能有若干个consumer实例(也可以只有一个);对于同一个group而言,topic的每条消息只能发送到group下的一个consumer实例上;topic消息可以发送到多个group中。对于之前提到的基于队列和基于发布/订阅模型,kafka就是通过consumer group的实现来支持的:
1、所有consumer实例都属于相同group —— 实现基于队列的模型,每条消息只会被一个consumer实例处理
2、consumer实例都属于不同的group —— 实现基于发布/订阅的模型,极端的例子就是每个实例都在不同的group中,这样就会广播到所有consumer上 kafka-consumer-poll

consumer group是用于实现高伸缩性、高容错性的consumer机制。组内多个consumer实例可以同时读取kafka消息,而且一旦有某个consumer挂了,consumer group会立即将已崩溃consumer负责的分区转交给其他consumer负责,从而保证整个group可以继续工作,不会丢失数据 —— 这个过程就是重平衡(rebalance)。另外由于kafka目前只提供单个分区内的消息顺序,而不保证全局的消息顺序,因此如果要实现topic全局的消息读取顺序,就只能通过让每个consumer group下只包含一个consumer实例的方式来实现

创建Consumer

创建Consumer,Kafka consumer是单线程设计理念,但是一个双线程java进程,一个用户主线程,一个后台心跳线程(辅助),且非线程安全:

String topicName = "my-topic";
String groupID = "my-group";

Properties props = new Properties();
// 必须配置,地址多组间用,分隔,可以只指定部分broker,启动后能通过它们找到完整的broker列表
props.put("bootstrap.servers", "localhost:9092");
// 指定consumer group名,唯一标识,默认是空字符串,但是必须配置
props.put("group.id", groupID);
// 解序列化,与序列化互逆,同样也可以使用自定义解序列化器(要与序列化对应)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 可选配置
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// 从最早的消息开始读取
props.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅topic列表,还支持正则:Pattern.compile("kafka.test.*")。所有订阅都是延迟生效的,需要在下次poll中生效,且是覆盖的
consumer.subscribe(Arrays.asList(topicName));
try {
	while(true) { // isRunning
		// poll类似于Linux的poll、select I/O机制,所有相关事件(包括rebalance、获取消息等)都发生在一个事件循环中,可以使用一个线程完成所有类型I/O操作
		// 需要定期执行其他子任务:poll(较小时间) + runningFlag; 不需要定期执行:pll(MAX_VALUE) + 捕获WakeupException
		// 对于Kafka consumer来说,poll方法返回即认为consumer成功消费了消息
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
		for (ConsumerRecord<String, String> record : records) {
			System.out.printf("offset = %d, value = %s", record.offset(), record.value());
		}
	}
} finally {
	// 关闭且最多等待30秒,也可自己设置timeout
	consumer.close();
}

主要步骤都在注释里写明了。下面看一下Consumer其他参数:
1、session.timeout.ms:
非常重要的参数,它是consumer group检测组内成员发送崩溃的时间。显然我们想缩短这个时间,让coordinator能快速检测到consumer的失败
2、max.poll.interval.ms
consumer消息处理逻辑的最大时间,即若consumer两次poll的间隔超过这个阈值,那么coordinator就会认为这个consumer已经追不上组内其他成员的消费进度了,该consumer实例就会被踢出组,该consumer负责的分区也会被分配给其他consumer,好一点的情况下进行一次rebalance,差一点无法提交位移,导致rebalance后这些消息会被重新消费一遍。因此该参数需要设置为实际的逻辑处理时间
3、auto.offset.reset:
指定了无位移信息或位移越界,即consumer要消费的消息的位移不在当前消息日志的合理区间范围时kafka的应对策略。需要注意的是只有满足这2点任何1点的时候该参数才有效,比如首次运行一个consumer group并指定从头开始消费,一旦该group成功提交位移后,我们重启了group,再次指定从头开始消费,这时发现并不会是真的从头开始消费,因为kafka已经保存了该group的位移信息,因此会无视auto.offset.reset的设置。该设置有3个取值:earliest,指定从最早的位移开始消费,并不一定是0;latest,指定从最新处开始消费;none,指定如果未发现位移信息或位移越界,则抛出异常
4、enable.auto.commit:
指定consumer是否自动提交位移。若设为true,则consumer在后台自动提交位移;否则需要用用户手动提交。对于有较强”精确处理一次”语义要求的用户来说,最好设为false,由用户自行提交
5、fetch.max.bytes:
指定consumer端单次获取数据的最大字节数,若实际业务消息很大,需要设置一个较大的参数,否则consumer将无法消费这些消息
6、max.poll.records
控制单次poll调用返回的最大消息数,默认为500条
7、heartbeat.interval.ms:
虽然有了session.timeout.ms参数,但consumer group的其他成员如何得知要开启新一轮的rebalance,当coordinator决定开启新一轮rebalance时,它会将这个决定以REBALANCE_IN_PROGRESS异常的形式塞入consumer心跳请求的response中,这样其他成员拿到response后才知道它需要重新加入group。显然这个过程越快越好,即设一个比较低的值,且必须小于session.timeout.ms
8、connections.max.idle.ms:
kafka会定期关闭空闲socket连接,导致下次consumer处理请求时需要重新创建向broker的socket连接,默认为9分钟,如果实际环境不在乎这些socket资源开销,可以设为-1表示不要关闭这些空闲连接

位移(offset)

这里指的是consumer端的offset,与分区日志中的offset不同。每个consumer实例都会为它消费的分区维护属于自己的位置信息来记录当前消费了多少消息,即消费进度,分区中当前最新消费信息的位置,这在kafka中称为位移(offset)。很多消息引擎都把消费端的offset保存在服务器端(broker),比如rabbitmq,这样做好处是简单,但是会带来一下3个问题:
1、broker从此变为有状态的,增加同步成本,影响伸缩性
2、需要引入应答机制(acknowledgement)来确认消费成功
3、由于要保存很多consumer的offset,必然引入复杂的数据结构,从而造成不必要的资源浪费
Kafka则选择了不同的方式,让consumer group保存offset,那么只要简单的保存一个长整型数据就可以了,同时kafka consumer还引入了检查点机制(checkpointing)定期对offset进行持久化,从而简化了应答机制的实现。kafka consumer在内部使用了一个map来保存其订阅topic所属分区的offset

位移提交:consumer客户端需要定期向kafka集群回报自己消费数据的进度,这一过程被称为位移提交(offset commit)。位移提交对于consumer非常重要,它不仅表示了consumer端的消费进度,同时也决定了consumer端的消费语义保证。位移提交将被consumer提交到kafka的一个内部topic(consumer_offsets)上,通常不能直接操作该topic,而且不能擅自删除或搬移该topic的日志文件,它是kafka自行创建的。在kafka的日志目录下能找到它,它就是一个正常的topic日志文件目录,里面至少有一个日志文件(.log)和两个索引文件(.index和.timeindex),只不过保存的都是kafka集群上consumer的位移信息。可以把__consumer_offsets__的每条消息想象成KV格式的消息,key是三元组:group.id+topic+分区号,value就是offset的值。。同时kafka会定期对该topic执行压实操作(compact),即为每个消息Key只保存含有最新offset的消息,控制日志容量,也能实时反映最新的消费进度。由于考虑生产环境可能有多个consumer或consumer group,因此为了减轻负担,kafka为该topic创建了50个分区,并对每个group.id做哈希求模运算

消费者组重平衡(consumer group rebalance):只对consumer group有效,它本质上是一个协议,规定一个consumer group下所有consumer如何达成一致来分配订阅topic的所有分区。比如有一个consumer group,它有20个consumer实例,该group订阅了一个具有100个分区的topic,那么正常情况下consumer group平均为每个consumer分配5个分区,即consumer负责读取5个分区的数据。这个分配过程就是rebalance

位移管理

假设consumer已经读取了某个分区中第N条消息,那么它应该提交位移值为N,因为位移是从0开始的,位移为N的消息时第N+1条消息,这样下次consumer重启后会从第N+1条消息开始消费。offset对于consumer非常重要,因为它是实现消息交付语义保证(message delivery semantic)的基石。常见的3种消息交付语义保证如下:
1、最多一次(at most once)处理语义:消息可能丢失,但不会被重复处理
2、最少一次(at least once)处理语义:消息不会丢失,但可能被处理多次
3、精确一次(exactly once)处理语义:消息一定会被处理,且只会被处理一次
显然如果consumer在消息消费之前就提交位移,那么就实现at most once,因为如果consumer在提交位移与消息消费之间崩溃,则重启后会从新的offset位置开始消费,前面那条消息就丢失了。相反,如果在消息消费之后提交,则可实现at least once语义,默认提供的就是这种语义,无法保证两步操作在同一个事务中完成。对于精确一次需要通过事务来处理

与consumer相关的多个位置信息:
1、上次提交位移(last committed offset):consumer最近一次提交的offset值
2、当前位置(current position):consuemr已读取但尚未提交时的位置
3、水位(watermark):也称为最高水位(high watermark),严格讲不属于consumer管理范畴,而是属于分区日志的概念。对于水位之下的所有消息,consumer都是可以读取的,consumer无法读取水位以上的消息
4、日志终端位移(Log End Offset, LEO):也称为日志最新位移,同样不属于consumer范畴,而是分区日志管辖。表示了某个分区副本当前保存消息对应的最大位移值。正常情况下LEO不会比水位值小,因为只有分区所有副本都保存了某条消息,该分区的leader副本才会向上移动水位值

默认情况下自动提交位移,间隔5秒,可以通过auto.commit.interval.ms参数控制自动提交间隔。但是自动提交不能细粒度地处理位移提交,特别是有较强的精确一次处理语义时,这种情况下需要手动位移提交,首先设置enable.auto.commit=false,然后调用commitSync或commitAsync方法:

final int minBatchSize = 500;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
try {
	while(true) {
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
		for (ConsumerRecord<String, String> record : records) {
			buffer.add(record);
		}
		if (buffer.size() > minBatchSize) {
			// 批量处理业务逻辑...
			// sync方法同步阻塞,async方法异步非阻塞。consumer将在后续poll调用时轮循该位移提交的结果
			consumer.commitSync();
			buffer.clear();
		}
	}
} finally {
	consumer.close();
}

手动提交还有有参版本,能指定一个map告诉kafka为哪些分区提交位移,因为consumer只对它所拥有的的分区做提交是更合理的行为:

while(true) {
	ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
	for (TopicPartition partition : records.partitions()) {
		List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
		for (ConsumerRecord<String, String> record : partitionRecords) {
			System.out.println(record.offset() + ": " + record.value());
		}
		long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
		consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
	}
}

重平衡

与旧版本依托于ZooKeeper不同,现在使用Kafka内置的一个全新的组协调协议(group corrdination protocol)。对于每个组而言,Kafka的某个broker会被选举为组协调者(group coordinator),coordinator负责对组的状态进行管理,它的主要职责就是当新成员到达时促成组内成员达成新的分区分配方案,即coordinator负责对组执行rebalance操作

rebalance触发条件:
1、组成员变更,比如新consumer加入组,或已有consumer主动离开组,或是已有consumer崩溃时触发rebalance
2、组订阅topic数发生变更,比如使用基于正则表达式的订阅,当匹配正则表达式的新topic被创建时触发rebalance
3、组订阅topic的分区数发生变更,比如使用命令行脚本增加了订阅topic的分区数

分配策略:
1、range策略,主要基于范围的思想,将单个topic的所有分区按照顺序排列,然后把这些分区划分成固定大小的分区段并依次分配给每个consumer
2、round-robin策略,把所有topic的所有分区顺序排开,然后轮循式地分配给各个consumer 3、sticky策略,解决前两种方法完全无视历史分配方案的缺陷,才有有黏性的策略对所有consumer实例进行分配,可以规避极端情况下的数据倾斜并且在两次rebalance间最大限度地维持之前的分配方案

rebalance generation:用于标识某次rebalance。JVM GC使用的分代就是generational的概念。kafka引入consumer generation主要是保护consumer group的,特备是防止无效offset的提交(上一代)

由于rebalance本质上是一种协议,group与coordinator共同使用这组协议完成group的rebalance:
1、JoinGroup请求:consumer请求加入组
2、SyncGroup请求:group leader把分配方案同步更新到组内所有成员中
3、Heartbeat请求:consumer定期向coordinator汇报心跳表明自己依旧存活
4、LeaveGroup请求:consumer主动通知coordinator该consumer即将离组
5、DescribeGroup请求:查看组的所有信息,包括成员信息、协议信息、分配方案以及订阅信息等。主要供管理员使用

rebalance监听器

之前说过consumer默认把位移提交到__consumer_offsets__中,其实kafka也支持用户把位移提交到外部存储中,比如数据库中。如果要实现这个功能,就必须使用rebalance监听器。使用rebalance监听器的前提就是用户使用consumer group,如果使用独立consumer或是直接手动分配分区,那么rebalance监听器是无效的

rebalance监听器有一个主要的接口回调类ConsumerRebalanceListener,里面有2个方法:onPartitionsRevoked和onPartitionAssigned。在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用,而rebalance完成后会调用onPartitionAssigned。rebalance监听器最常见的用法就是手动提交位移到第三方存储,以及在rebalance前后执行一些必要的审计操作:

// 订阅
consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
	@Override
	public void onPartitionsRevoked(Collection<TopicPartition> collection) {
		for (TopicPartition tp : collection) {
			// 该分区offset保存到外部
		}
	}

	@Override
	public void onPartitionsAssigned(Collection<TopicPartition> collection) {
		// 统计rebalance时长
		for (TopicPartition tp : collection) {
			// 读取外部存储该分区的offset
			long offset = 100;
			// 将consumer当前位移指定到读取的位移处并从该位移开始读取消息
			consumer.seek(tp, offset);
		}
	}
});

多线程消费

KafkaConsumer与KafkaProducer不同,它是线程非安全的,因此用户可以在多个线程中放心使用同一个KafkaProducer实例,而且通常比每个线程维护一个KafkaProducer实例效率更高。但是对于consumer来说,用户无法直接在多个线程中共享一个KafkaConsumer实例

1、每个线程维护一个KafkaConsumer:创建多个线程来消费topic数据,每个线程都会创建专属于自己线程的KafkaConsumer实例

2、单KafkaConsumer实例 + 多worker线程:将消息的获取和消息的处理解耦,把业务逻辑放到工作者线程中,同时在全局维护一个或若干个consumer实例执行消息获取任务

比较:
1、方法1实现简单,速度较快,因为无线程间交互开销;方便位移管理;易于维护分区间的消息消费顺序。然而Socket连接开销大,consumer数受限于topic分区数,扩展性差,broker端负载高,因为发完broker的请求更多,而且rebalance可能性变大
2、方法2对消息获取与处理解耦,可独立扩展consumer数和worker数,伸缩性更好。然而难于维护分区内的消息顺序,处理链路变长,导致位移管理困难,worker线程异常可能导致消费数据丢失

独立consumer

之前说的consumer group,group自动帮助我们执行分区分配和rebalance,对于需要有多个consumer共同读取某个topic的需求来说,使用group是非常方便的。然而有时候要有精确控制消费的需求,比如严格控制某个consumer固定地消费哪些分区,比如:
1、如果进程自己维护分区的状态,那么它就可以固定消费某些分区而不用担心消费状态丢失的问题
2、如果进程本身已经是高可用并且能自动重启恢复错误(比如使用YARN和Mesos等容器调度框架),那么就不需要让Kafka来帮它完成错误检测和状态恢复
这样consumer group就无用武之地了,取而代之的是独立consumer角色(standalone consumer),它们consumer彼此间互不干扰,任何一个consumer崩溃都不影响其他独立consumer的工作

使用独立consumer的方法就是在订阅topic时,不采用subscribe方法,而是采用assign方法,它会接收一个分区列表,直接赋予该consumer访问这些分区的权力:

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> allPartitions = consumer.partitionsFor(topicName);
if (!CollectionUtils.isEmpty(allPartitions)) {
	for (PartitionInfo info : allPartitions) {
		partitions.add(new TopicPartition(info.topic(), info.partition()));
	}
	consumer.assign(partitions);
}

try {
	while(true) {
		ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
		for (ConsumerRecord<String, String> record : records) {
			System.out.printf("offset = %d, value = %s", record.offset(), record.value());
		}
		consumer.commitSync();
	}
} catch (Exception e) {
	// ...
} finally {
	consumer.commitSync();
	consumer.close();
}		

Spring boot整合

引入maven,需要注意版本对应的kafka-client的版本,我本地用的Kafka是最新的2.1.1,从Spring for Apache Kafka看应该至少要用2.2.x,但是貌似kafka与spring boot结合不同,用的2.0.2版本的spring boot -autoconfigure里kafka的ConcurrentKafkaListenerContainerFactoryConfigurer里用的还是老版本路径的类,所以改成了2.1.x后才可以用了。而spring-integration-kafka模块我没有加入:

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<!--<version>2.2.4.RELEASE</version>-->
	<version>2.1.12.RELEASE</version>
</dependency>

然后进行最简单的配置:

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-log
spring.kafka.consumer.auto-offset-reset=earliest

Producer最简单的实现:

@Autowired
private KafkaTemplate kafkaTemplate;

public void send() {
	String topicName = "app-log";
	String message = UUID.randomUUID().toString();
	// 也可以指定partition、(timestamp)、key
	ListenableFuture future = kafkaTemplate.send(topicName, message);
	future.addCallback(o -> System.out.println("send-消息发送成功:" + message),
			throwable -> System.out.println("消息发送失败:" + message));
}


// 也可以设置一个listener:
@Component
public class MyKafkaProducerListener implements ProducerListener {
    @Override
    public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
        System.out.printf("MyKafkaProducerListener-onSuccess: topic:%s ,partition:%s ,key:%s ,value:%s", topic, partition, key, value);
    }
    @Override
    public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
        System.out.println("MyKafkaProducerListener-onError:" + exception);
    }
}

Consumer也很简单:

@KafkaListener(topics = {"app-log"})
public void receive(String message) {
	System.out.println("接收到的消费消息: " + message);
}

// 也可以额外实现listener,这里略了

跑个junit测试一下:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {KafkaApplication.class})
public class KafkaApplicationTest {

    @Autowired
    private MyKafkaProducer producer;

    @Test
    public void toSend() {
        producer.send();
	// 等下consumer
        try {
            Thread.sleep(1000 * 2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}