RocketMQ(三) Producer与Consumer

Posted by ZhouJ000 on January 28, 2019

RocketMQ(一) 入门
RocketMQ(二) NameServer与Broker
RocketMQ(三) Producer与Consumer

Producer

DefaultMQProducerImpl:启动

/**
 * Starts this producer implementation.
 *
 * <p>Only a producer whose state is {@code CREATE_JUST} is actually started; the states
 * {@code RUNNING}, {@code START_FAILED} and {@code SHUTDOWN_ALREADY} are rejected with an
 * exception. Once the state switch completes, a heartbeat is sent to all brokers.
 *
 * @param startFactory when {@code true}, the client instance obtained during start-up is started too
 * @throws MQClientException if the producer group is already registered on the client instance,
 *         if the current service state does not allow starting, or if one of the invoked steps fails
 */
public void start(final boolean startFactory) throws MQClientException {
	switch (serviceState) {
		case CREATE_JUST: {
			// Pessimistically mark the start as failed; the state only becomes RUNNING at the very end.
			serviceState = ServiceState.START_FAILED;
			// Configuration check (the original note says this validates the group name).
			checkConfig();

			final String group = defaultMQProducer.getProducerGroup();
			// Every group except the client-internal one gets its instance name switched to the PID.
			if (!group.equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
				defaultMQProducer.changeInstanceNameToPID();
			}

			// Obtain (or create) the client instance for this producer configuration.
			mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(defaultMQProducer, rpcHook);

			// Register this producer on the client instance under its group name.
			if (!mQClientFactory.registerProducer(group, this)) {
				// Roll the state back so the caller sees a producer that was never started.
				serviceState = ServiceState.CREATE_JUST;
				throw new MQClientException("The producer group[" + group
					+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
					null);
			}

			// Seed the publish-info table with an empty entry for the create-topic key.
			topicPublishInfoTable.put(defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

			if (startFactory) {
				// Start the client instance itself.
				mQClientFactory.start();
			}

			log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", group,
				defaultMQProducer.isSendMessageWithVIPChannel());
			serviceState = ServiceState.RUNNING;
			break;
		}
		case RUNNING:
		case START_FAILED:
		case SHUTDOWN_ALREADY:
			throw new MQClientException("The producer service state not OK, maybe started once, "
				+ serviceState
				+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
				null);
		default:
			break;
	}

	// On start-up, send a heartbeat to every broker.
	mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

消息发送过程:先由producer封装消息并通过netty发送到broker,然后由broker进行保存
DefaultMQProducerImpl:发送

/**
 * Sends {@code msg} to one message queue of its topic, retrying on failure.
 *
 * <p>The number of attempts is {@code 1 + retryTimesWhenSendFailed} for {@code SYNC} and 1 for
 * every other mode. Each attempt picks a queue via {@code selectOneMessageQueue}, passing the
 * broker name of the previous attempt, and records latency/fault information for that broker.
 *
 * @param msg               message to send
 * @param communicationMode SYNC, ASYNC or ONEWAY
 * @param sendCallback      callback handed through to {@code sendKernelImpl}
 * @param timeout           overall budget in milliseconds, shared by all attempts
 * @return the send result for SYNC; {@code null} for ASYNC and ONEWAY
 * @throws MQClientException   if no route info / name server is available, or all attempts failed
 * @throws RemotingException   if the time budget was exhausted before an attempt could be made
 * @throws MQBrokerException   if the broker answered with a response code that is not retried
 * @throws InterruptedException if the sending thread was interrupted
 */
private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
	this.makeSureStateOK();
	Validators.checkMessage(msg, this.defaultMQProducer);

	final long invokeID = random.nextLong();
	long beginTimestampFirst = System.currentTimeMillis();
	long beginTimestampPrev = beginTimestampFirst;
	long endTimestamp = beginTimestampFirst;
	// Look up the publish (route) info of the topic.
	TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
	if (topicPublishInfo != null && topicPublishInfo.ok()) {
		boolean callTimeout = false;
		MessageQueue mq = null;
		Exception exception = null;
		SendResult sendResult = null;
		int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
		int times = 0;
		String[] brokersSent = new String[timesTotal];
		for (; times < timesTotal; times++) {
			String lastBrokerName = null == mq ? null : mq.getBrokerName();
			// Pick one MessageQueue, telling the selector which broker was used last time.
			MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
			if (mqSelected != null) {
				mq = mqSelected;
				brokersSent[times] = mq.getBrokerName();
				try {
					beginTimestampPrev = System.currentTimeMillis();
					long costTime = beginTimestampPrev - beginTimestampFirst;
					if (timeout < costTime) {
						// Budget already spent by earlier attempts: stop retrying.
						callTimeout = true;
						break;
					}
					// Perform the actual send (over netty, per the article) with the remaining budget.
					sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
					endTimestamp = System.currentTimeMillis();
					this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
					switch (communicationMode) {
						case ASYNC:
							return null;
						case ONEWAY:
							return null;
						case SYNC:
							if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
								if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
									continue;
								}
							}

							return sendResult;
						default:
							break;
					}
				} catch (RemotingException e) {
					endTimestamp = System.currentTimeMillis();
					this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
					log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
					log.warn(msg.toString());
					exception = e;
					continue;
				} catch (MQClientException e) {
					endTimestamp = System.currentTimeMillis();
					this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
					log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
					log.warn(msg.toString());
					exception = e;
					continue;
				} catch (MQBrokerException e) {
					endTimestamp = System.currentTimeMillis();
					this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
					log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
					log.warn(msg.toString());
					exception = e;
					switch (e.getResponseCode()) {
						// These broker response codes are retried on the next attempt.
						case ResponseCode.TOPIC_NOT_EXIST:
						case ResponseCode.SERVICE_NOT_AVAILABLE:
						case ResponseCode.SYSTEM_ERROR:
						case ResponseCode.NO_PERMISSION:
						case ResponseCode.NO_BUYER_ID:
						case ResponseCode.NOT_IN_CURRENT_UNIT:
							continue;
						default:
							// Any other code ends the retry loop: hand back an earlier result if there is one.
							if (sendResult != null) {
								return sendResult;
							}

							throw e;
					}
				} catch (InterruptedException e) {
					endTimestamp = System.currentTimeMillis();
					this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
					// Logged once; the original logged the same exception and message a second time.
					log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
					log.warn(msg.toString());
					throw e;
				}
			} else {
				break;
			}
		}

		if (sendResult != null) {
			return sendResult;
		}

		// Check the timeout first so the failure message below is not built only to be discarded.
		if (callTimeout) {
			throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
		}

		String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
			times,
			System.currentTimeMillis() - beginTimestampFirst,
			msg.getTopic(),
			Arrays.toString(brokersSent));

		info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

		MQClientException mqClientException = new MQClientException(info, exception);

		// Map the last recorded failure onto a response code of the wrapping exception.
		if (exception instanceof MQBrokerException) {
			mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
		} else if (exception instanceof RemotingConnectException) {
			mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
		} else if (exception instanceof RemotingTimeoutException) {
			mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
		} else if (exception instanceof MQClientException) {
			mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
		}

		throw mqClientException;
	}

	// No usable publish info: distinguish "no name server configured" from "topic has no route".
	List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
	if (null == nsList || nsList.isEmpty()) {
		throw new MQClientException(
			"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
	}

	throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
		null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

同样,因为使用netty作为网络通信工具,broker也是先使用netty接收到信息,然后调用注册的processor处理(BrokerController的initialize中进行了注册Processor)
SendMessageProcessor.processRequest

/**
 * Entry point for send-message requests arriving at the broker.
 *
 * <p>{@code CONSUMER_SEND_MSG_BACK} requests are delegated to {@code consumerSendMsgBack};
 * every other request code is treated as a (single or batch) message send wrapped by the
 * before/after send hooks.
 *
 * @param ctx     channel context of the request
 * @param request incoming remoting command
 * @return the response command, or {@code null} when the request header could not be parsed
 * @throws RemotingCommandException propagated from the invoked handlers
 */
public RemotingCommand processRequest(ChannelHandlerContext ctx,
									  RemotingCommand request) throws RemotingCommandException {
	if (request.getCode() == RequestCode.CONSUMER_SEND_MSG_BACK) {
		return this.consumerSendMsgBack(ctx, request);
	}

	// Build the request header; without it there is nothing to process.
	final SendMessageRequestHeader header = parseRequestHeader(request);
	if (header == null) {
		return null;
	}

	// Build the send-message context and run the "before" hooks.
	final SendMessageContext traceContext = buildMsgContext(ctx, header);
	this.executeSendMessageHookBefore(ctx, request, traceContext);

	// Dispatch to the batch or the single-message path.
	final RemotingCommand response = header.isBatch()
		? this.sendBatchMessage(ctx, request, traceContext, header)
		: this.sendMessage(ctx, request, traceContext, header);

	// Run the "after" hooks on the produced response.
	this.executeSendMessageHookAfter(response, traceContext);
	return response;
}

发送消息

/**
 * Handles a single (non-batch) send request: validates it, converts it into a
 * {@code MessageExtBrokerInner} and hands it to the message store (or to the transactional
 * message service when the message carries the transaction-prepared flag).
 *
 * @param ctx                channel context of the request
 * @param request            incoming remoting command carrying the message body
 * @param sendMessageContext context passed on to {@code handlePutMessageResult}
 * @param requestHeader      parsed request header
 * @return the response command to write back to the producer
 * @throws RemotingCommandException propagated from the invoked helpers
 */
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
									final RemotingCommand request,
									final SendMessageContext sendMessageContext,
									final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

	final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
	final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

	response.setOpaque(request.getOpaque());
	response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
	response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

	log.debug("receive SendMessage request command, {}", request);

	// Reject requests that arrive before the configured "start accepting" timestamp.
	final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
	if (this.brokerController.getMessageStore().now() < startTimestamp) {
		response.setCode(ResponseCode.SYSTEM_ERROR);
		response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
		return response;
	}

	// -1 is used as a sentinel: msgCheck only overwrites the code when it rejects the request.
	response.setCode(-1);
	super.msgCheck(ctx, requestHeader, response);
	if (response.getCode() != -1) {
		return response;
	}

	final byte[] body = request.getBody();

	int queueIdInt = requestHeader.getQueueId();
	TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

	// A negative queue id means "no preference": pick a random write queue.
	if (queueIdInt < 0) {
		queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
	}

	// Build the broker-internal message.
	MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
	msgInner.setTopic(requestHeader.getTopic());
	msgInner.setQueueId(queueIdInt);

	if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
		return response;
	}

	// Parse the properties string once and reuse the map for both the message and the
	// transaction-flag lookup (the original parsed the same string twice).
	final Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());

	msgInner.setBody(body);
	msgInner.setFlag(requestHeader.getFlag());
	MessageAccessor.setProperties(msgInner, oriProps);
	msgInner.setPropertiesString(requestHeader.getProperties());
	msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
	msgInner.setBornHost(ctx.channel().remoteAddress());
	msgInner.setStoreHost(this.getStoreHost());
	msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

	final PutMessageResult putMessageResult;
	final String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
	// Boolean.parseBoolean(null) is false, so no separate null check is needed.
	if (Boolean.parseBoolean(traFlag)) {
		if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
			response.setCode(ResponseCode.NO_PERMISSION);
			response.setRemark(
				"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
					+ "] sending transaction message is forbidden");
			return response;
		}
		putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
	} else {
		// Regular message: persist it through the message store's putMessage.
		putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
	}
	return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}

保存消息

/**
 * Stores a message after a series of admission checks.
 *
 * <p>The message is refused when the store is shut down, runs as a slave, is not writeable,
 * the topic or properties are too long, or the OS page cache is reported busy. Otherwise it is
 * appended via the commit log and the elapsed time is recorded in the store statistics.
 *
 * @param msg broker-internal message to store
 * @return the result of the put operation; a non-OK status when one of the checks rejects the message
 */
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
	if (this.shutdown) {
		log.warn("message store has shutdown, so putMessage is forbidden");
		return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
	}

	if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
		// Rate-limit the warning: only one in every 50000 rejections is logged.
		if ((this.printTimes.getAndIncrement() % 50000) == 0) {
			log.warn("message store is slave mode, so putMessage is forbidden ");
		}
		return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
	}

	if (!this.runningFlags.isWriteable()) {
		if ((this.printTimes.getAndIncrement() % 50000) == 0) {
			log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
		}
		return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
	}
	// Writeable again: reset the counter used for rate-limiting the warnings above.
	this.printTimes.set(0);

	final int topicLength = msg.getTopic().length();
	if (topicLength > Byte.MAX_VALUE) {
		log.warn("putMessage message topic length too long " + topicLength);
		return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
	}

	final String properties = msg.getPropertiesString();
	if (properties != null && properties.length() > Short.MAX_VALUE) {
		log.warn("putMessage message properties length too long " + properties.length());
		return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
	}

	if (this.isOSPageCacheBusy()) {
		return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
	}

	final long startedAt = this.getSystemClock().now();
	// Per the article: the commit log takes a mapped file from its queue and appends the message
	// through a direct buffer, then performs sync/async flush and the synchronous double write.
	final PutMessageResult result = this.commitLog.putMessage(msg);
	final long elapsed = this.getSystemClock().now() - startedAt;

	if (elapsed > 500) {
		log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", elapsed, msg.getBody().length);
	}
	this.storeStatsService.setPutMessageEntireTimeMax(elapsed);

	final boolean failed = null == result || !result.isOk();
	if (failed) {
		this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
	}
	return result;
}

参考:
RocketMQ原理解析-Producer
RocketMQ源码 — 三、 Producer消息发送过程

Consumer

有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,RocketMQ是基于拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息

Consumer消费拉取的消息的方式有两种:
1、Push方式:rocketmq已经提供了很全面的实现,consumer通过长轮询拉取消息后回调MessageListener接口实现完成消费,应用系统只要实现MessageListener完成业务逻辑即可
2、Pull方式:完全由业务系统去控制,定时拉取消息,指定队列消费等等,当然这里需要业务系统去根据自己的业务需求去实现

// DefaultMQPushConsumerImpl
/**
 * Starts the push consumer.
 *
 * <p>Only the {@code CREATE_JUST} state is startable; {@code RUNNING}, {@code START_FAILED} and
 * {@code SHUTDOWN_ALREADY} raise an exception. After the state switch the topic subscription
 * info is refreshed, the client is checked against the broker, a heartbeat is sent and a
 * rebalance is triggered immediately.
 *
 * @throws MQClientException if the consumer group is already registered, the state does not
 *         allow starting, or one of the invoked steps fails
 */
public synchronized void start() throws MQClientException {
	switch (this.serviceState) {
		case CREATE_JUST: {
			log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
				this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
			// Assume failure until every step below has succeeded.
			this.serviceState = ServiceState.START_FAILED;

			// Validate the consumer configuration (per the article: group name, message model,
			// orderly consumption, queue allocation strategy, ...), then copy the subscriptions.
			this.checkConfig();
			this.copySubscription();

			final String group = this.defaultMQPushConsumer.getConsumerGroup();
			final MessageModel model = this.defaultMQPushConsumer.getMessageModel();

			if (model == MessageModel.CLUSTERING) {
				this.defaultMQPushConsumer.changeInstanceNameToPID();
			}

			this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

			// Initialise the rebalance component.
			this.rebalanceImpl.setConsumerGroup(group);
			this.rebalanceImpl.setMessageModel(model);
			this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
			this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

			this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, group, isUnitMode());
			this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

			// Choose the offset store that keeps the consume progress.
			if (this.defaultMQPushConsumer.getOffsetStore() != null) {
				this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
			} else {
				switch (model) {
					case BROADCASTING:
						// Broadcasting: progress is kept in a local file.
						this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, group);
						break;
					case CLUSTERING:
						// Clustering: progress is kept remotely on the broker.
						this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, group);
						break;
					default:
						break;
				}
				this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
			}
			// Load the stored progress.
			this.offsetStore.load();

			// Pick the consume service matching the registered listener type (orderly vs concurrent).
			if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
				this.consumeOrderly = true;
				this.consumeMessageService =
					new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
			} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
				this.consumeOrderly = false;
				this.consumeMessageService =
					new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
			}

			// Start the consume service. Per the article: the concurrent variant only schedules the
			// expired-message clean-up, the orderly variant schedules periodic queue locking on the brokers.
			this.consumeMessageService.start();

			// Register this consumer on the client instance under its group name.
			if (!mQClientFactory.registerConsumer(group, this)) {
				this.serviceState = ServiceState.CREATE_JUST;
				this.consumeMessageService.shutdown();
				throw new MQClientException("The consumer group[" + group
					+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
					null);
			}

			// Start the client instance.
			mQClientFactory.start();
			log.info("the consumer [{}] start OK.", group);
			this.serviceState = ServiceState.RUNNING;
			break;
		}
		case RUNNING:
		case START_FAILED:
		case SHUTDOWN_ALREADY:
			throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
				+ this.serviceState
				+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
				null);
		default:
			break;
	}

	// Refresh topic subscription info for the current subscriptions, then announce this client.
	this.updateTopicSubscribeInfoWhenSubscriptionChanged();
	this.mQClientFactory.checkClientInBroker();
	this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
	this.mQClientFactory.rebalanceImmediately();
}

// DefaultMQPullConsumerImpl  类似,且完成更少的步骤
/**
 * Starts the pull consumer.
 *
 * <p>Only the {@code CREATE_JUST} state is startable; {@code RUNNING}, {@code START_FAILED} and
 * {@code SHUTDOWN_ALREADY} raise an exception. Compared with the push consumer, no consume
 * service is created and nothing further happens after the state switch.
 *
 * @throws MQClientException if the consumer group is already registered, the state does not
 *         allow starting, or one of the invoked steps fails
 */
public synchronized void start() throws MQClientException {
	switch (this.serviceState) {
		case CREATE_JUST: {
			// Assume failure until every step below has succeeded.
			this.serviceState = ServiceState.START_FAILED;

			this.checkConfig();
			this.copySubscription();

			final String group = this.defaultMQPullConsumer.getConsumerGroup();
			final MessageModel model = this.defaultMQPullConsumer.getMessageModel();

			if (model == MessageModel.CLUSTERING) {
				this.defaultMQPullConsumer.changeInstanceNameToPID();
			}

			this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);

			// Initialise the rebalance component.
			this.rebalanceImpl.setConsumerGroup(group);
			this.rebalanceImpl.setMessageModel(model);
			this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
			this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

			this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, group, isUnitMode());
			this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

			// Use the configured offset store, or create one that matches the message model.
			if (this.defaultMQPullConsumer.getOffsetStore() != null) {
				this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
			} else {
				switch (model) {
					case BROADCASTING:
						this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, group);
						break;
					case CLUSTERING:
						this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, group);
						break;
					default:
						break;
				}
				this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
			}

			this.offsetStore.load();

			// Register this consumer on the client instance under its group name.
			if (!mQClientFactory.registerConsumer(group, this)) {
				this.serviceState = ServiceState.CREATE_JUST;

				throw new MQClientException("The consumer group[" + group
					+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
					null);
			}

			mQClientFactory.start();
			log.info("the consumer [{}] start OK", group);
			this.serviceState = ServiceState.RUNNING;
			break;
		}
		case RUNNING:
		case START_FAILED:
		case SHUTDOWN_ALREADY:
			throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
				+ this.serviceState
				+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
				null);
		default:
			break;
	}
}

可以发现,2种方式代码大致相同,Push实现比Pull实现多了一些内容,主要是多设置了ConsumeMessageService并启动,与最后对mQClientFactory的设置

其中主要对象为:
1、RebalanceImpl:主要实现consumer的负载均衡,但是并不会直接发送获取消息的请求,负责决定当前的consumer应该从哪些Queue中消费消息
2、PullAPIWrapper:长连接,负责从broker处拉取消息,然后利用ConsumeMessageService回调用户的Listener执行消息消费逻辑
3、ConsumeMessageService(PUSH下):实现所谓的“Push-被动”消费机制;从Broker拉取消息后,封装成ConsumeRequest提交给ConsumeMessageService,此service负责回调用户的Listener消费消息
4、OffsetStore:维护当前consumer的消费记录(offset)。有两种实现:Local和Remote。Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式
5、MQClientFactory:负责管理client(consumer、producer),并提供多种功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成

通过启动client更能看出主要对象和步骤:

// MQClientInstance
/**
 * Starts the client instance and its background services.
 *
 * <p>Runs under the instance monitor. Only {@code CREATE_JUST} performs work; {@code RUNNING}
 * and {@code SHUTDOWN_ALREADY} are no-ops, and {@code START_FAILED} raises an exception.
 *
 * @throws MQClientException if a previous start attempt failed or one of the started parts fails
 */
public void start() throws MQClientException {
	synchronized (this) {
		switch (this.serviceState) {
			case START_FAILED:
				throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
			case CREATE_JUST:
				// Assume failure until everything below has been started.
				this.serviceState = ServiceState.START_FAILED;
				// No name server address configured: go and fetch it.
				if (null == this.clientConfig.getNamesrvAddr()) {
					this.mQClientAPIImpl.fetchNameServerAddr();
				}
				// Start the request-response channel (client-side remoting).
				this.mQClientAPIImpl.start();
				// Start the scheduled tasks. Per the article these cover: re-fetching the name server
				// address, refreshing route info from the name server, cleaning offline brokers and
				// sending heartbeats, persisting consume offsets, and adjusting thread pool sizes.
				this.startScheduledTask();
				// a) Start the pull-message service.
				this.pullMessageService.start();
				// b) Start the consumer-side rebalance service.
				this.rebalanceService.start();
				// Start the internal producer without starting this factory again.
				this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
				log.info("the client factory [{}] start OK", this.clientId);
				this.serviceState = ServiceState.RUNNING;
				break;
			case RUNNING:
			case SHUTDOWN_ALREADY:
			default:
				// Nothing to do in these states.
				break;
		}
	}
}

a、拉消息服务PullMessageService

/**
 * Service loop: keeps taking pull requests from the blocking queue and executes them
 * until the service is stopped.
 */
public void run() {
	log.info(this.getServiceName() + " service started");

	while (!this.isStopped()) {
		try {
			// Blocks until a pull request becomes available.
			this.pullMessage(this.pullRequestQueue.take());
		} catch (InterruptedException ignored) {
			// Deliberately ignored; the loop condition decides whether the service keeps running.
		} catch (Exception e) {
			log.error("Pull Message Service Run Method exception", e);
		}
	}

	log.info(this.getServiceName() + " service end");
}

/**
 * Routes a pull request to the consumer registered for the request's consumer group.
 * Requests without a matching consumer are logged and dropped.
 *
 * @param pullRequest the request to execute
 */
private void pullMessage(final PullRequest pullRequest) {
	final MQConsumerInner target = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
	if (target == null) {
		log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
		return;
	}
	// The selected consumer is cast to the push implementation, as in the original.
	((DefaultMQPushConsumerImpl) target).pullMessage(pullRequest);
}

b、负载均衡服务RebalanceService

/**
 * Service loop: waits for the configured interval (or an earlier wake-up) and then asks the
 * client instance to rebalance, until the service is stopped.
 */
public void run() {
	log.info(getServiceName() + " service started");

	while (!isStopped()) {
		waitForRunning(waitInterval);
		mqClientFactory.doRebalance();
	}

	log.info(getServiceName() + " service end");
}

// MQClientInstance
/**
 * Triggers a rebalance on every registered consumer. A failure in one consumer is logged
 * and does not stop the remaining consumers from being rebalanced.
 */
public void doRebalance() {
	for (MQConsumerInner consumer : this.consumerTable.values()) {
		if (consumer == null) {
			continue;
		}
		try {
			consumer.doRebalance();
		} catch (Throwable e) {
			log.error("doRebalance exception", e);
		}
	}
}

// 回到DefaultMQPushConsumerImpl
/**
 * Delegates to the rebalance implementation unless this consumer is paused.
 */
public void doRebalance() {
	if (this.pause) {
		return;
	}
	this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}

// RebalanceImpl
/**
 * Rebalances every subscribed topic, then truncates message queues of topics that are not
 * subscribed by this consumer.
 *
 * @param isOrder whether the consumer consumes orderly; passed through to the per-topic rebalance
 */
public void doRebalance(final boolean isOrder) {
	final Map<String, SubscriptionData> subscriptions = this.getSubscriptionInner();
	if (subscriptions != null) {
		// Rebalance topic by topic; one failing topic must not block the others.
		for (final String topic : subscriptions.keySet()) {
			try {
				this.rebalanceByTopic(topic, isOrder);
			} catch (Throwable e) {
				// Failures on retry topics are not logged.
				if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
					log.warn("rebalanceByTopic Exception", e);
				}
			}
		}
	}

	this.truncateMessageQueueNotMyTopic();
}

/**
 * Rebalances the message queues of a single topic according to the message model.
 *
 * <p>In broadcasting mode every queue of the topic is taken; in clustering mode the
 * allocation strategy decides which queues belong to this client. In both cases the
 * process-queue table is updated and {@code messageQueueChanged} is invoked when it changed.
 *
 * @param topic   topic to rebalance
 * @param isOrder whether the consumer consumes orderly
 */
private void rebalanceByTopic(final String topic, final boolean isOrder) {
	switch (messageModel) {
		// Broadcasting: every consumer receives all messages of the topic, so the full
		// queue set is used to update the process queues.
		case BROADCASTING: {
			final Set<MessageQueue> queues = this.topicSubscribeInfoTable.get(topic);
			if (queues == null) {
				log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
				break;
			}
			if (this.updateProcessQueueTableInRebalance(topic, queues, isOrder)) {
				this.messageQueueChanged(topic, queues, queues);
				log.info("messageQueueChanged {} {} {} {}",
					consumerGroup,
					topic,
					queues,
					queues);
			}
			break;
		}
		// Clustering: the queues are distributed among the consumers of the group.
		case CLUSTERING: {
			final Set<MessageQueue> queues = this.topicSubscribeInfoTable.get(topic);
			final List<String> consumerIds = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);

			if (queues == null && !topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
				log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
			}
			if (consumerIds == null) {
				log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
			}
			if (queues == null || consumerIds == null) {
				break;
			}

			// Both lists are sorted before being handed to the allocation strategy.
			final List<MessageQueue> sortedQueues = new ArrayList<MessageQueue>(queues);
			Collections.sort(sortedQueues);
			Collections.sort(consumerIds);

			final AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

			List<MessageQueue> assigned = null;
			try {
				assigned = strategy.allocate(
					this.consumerGroup,
					this.mQClientFactory.getClientId(),
					sortedQueues,
					consumerIds);
			} catch (Throwable e) {
				log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
					e);
				return;
			}

			final Set<MessageQueue> assignedSet = assigned != null
				? new HashSet<MessageQueue>(assigned)
				: new HashSet<MessageQueue>();

			// Update the process queues with the queues allocated to this client.
			if (this.updateProcessQueueTableInRebalance(topic, assignedSet, isOrder)) {
				log.info(
					"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
					strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), queues.size(), consumerIds.size(),
					assignedSet.size(), assignedSet);
				this.messageQueueChanged(topic, queues, assignedSet);
			}
			break;
		}
		default:
			break;
	}
}

/**
 * Synchronises the process-queue table of one topic with a newly allocated queue set.
 *
 * <p>First pass: entries of {@code topic} that are no longer allocated, or whose pull has
 * expired (passive consumption only), are marked dropped and removed. Second pass: allocated
 * queues missing from the table get a new process queue and a pull request, which are then
 * handed to {@code dispatchPullRequest}.
 *
 * @param topic   topic whose queues are being rebalanced
 * @param mqSet   queues allocated to this consumer for the topic
 * @param isOrder whether the consumer consumes orderly; if so a new queue must be locked before it is added
 * @return {@code true} if at least one queue was removed from or added to the table
 */
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
	final boolean isOrder) {
	boolean changed = false;
	Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
	// Update the existing process queues according to the newly allocated message queues.
	while (it.hasNext()) {
		Entry<MessageQueue, ProcessQueue> next = it.next();
		MessageQueue mq = next.getKey();
		ProcessQueue pq = next.getValue();
		if (mq.getTopic().equals(topic)) {
			// The queue is in the table but no longer part of the new allocation: drop and remove it.
			if (!mqSet.contains(mq)) {
				// Mark as dropped before attempting the removal.
				pq.setDropped(true);
				if (this.removeUnnecessaryMessageQueue(mq, pq)) {
					it.remove();
					changed = true;
					log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
				}
			// Still allocated, but the pull has expired.
			} else if (pq.isPullExpired()) {
				switch (this.consumeType()) {
					case CONSUME_ACTIVELY:
						break;
					case CONSUME_PASSIVELY:
						// Passive consumption only: drop the stalled queue; the second pass below re-adds it if still allocated.
						pq.setDropped(true);
						if (this.removeUnnecessaryMessageQueue(mq, pq)) {
							it.remove();
							changed = true;
							log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
								consumerGroup, mq);
						}
						break;
					default:
						break;
				}
			}
		}
	}

	List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
	for (MessageQueue mq : mqSet) {
		// The queue is allocated but missing from the table: add it and compute its start offset.
		if (!this.processQueueTable.containsKey(mq)) {
			// Orderly consumption requires the queue lock; skip the queue when locking fails.
			if (isOrder && !this.lock(mq)) {
				log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
				continue;
			}

			this.removeDirtyOffset(mq);
			ProcessQueue pq = new ProcessQueue();
			long nextOffset = this.computePullFromWhere(mq);
			// For each such queue, assemble a pull request holding the message queue, the consumer
			// group, the matching process queue and the next offset to pull from.
			if (nextOffset >= 0) {
				// putIfAbsent returns a non-null previous value when the queue is already in the table.
				ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
				if (pre != null) {
					log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
				} else {
					log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
					PullRequest pullRequest = new PullRequest();
					pullRequest.setConsumerGroup(consumerGroup);
					pullRequest.setNextOffset(nextOffset);
					pullRequest.setMessageQueue(mq);
					pullRequest.setProcessQueue(pq);
					pullRequestList.add(pullRequest);
					changed = true;
				}
			} else {
				log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
			}
		}
	}
	// Hand the new pull requests to dispatchPullRequest (per the article, the pull consumer
	// provides no concrete implementation of that method).
	this.dispatchPullRequest(pullRequestList);
	return changed;
}

// RebalancePushImpl
/**
 * Submits every pull request for immediate execution on the push consumer.
 *
 * @param pullRequestList pull requests created during rebalance
 */
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
	for (PullRequest request : pullRequestList) {
		// Per the article, this puts the request onto the blocking pull-request queue.
		this.defaultMQPushConsumerImpl.executePullRequestImmediately(request);
		log.info("doRebalance, {}, add a new pull request {}", consumerGroup, request);
	}
}

回到a、拉消息服务PullMessageService中发现pullMessage调用的就是DefaultMQPushConsumerImpl的pullMessage方法

/**
 * Issues one asynchronous pull for the given request, or reschedules the request when a
 * precondition is not met.
 *
 * Flow, as written below:
 * 1. Return immediately if the request's process queue is dropped.
 * 2. Reschedule (executePullRequestLater) when the consumer state is not OK, the consumer is
 *    paused, the cached message count / size exceeds its per-queue threshold, the max span is
 *    too large (concurrent mode), the process queue is not locked (orderly mode), or no
 *    subscription data is found for the topic.
 * 3. Otherwise build a PullCallback and sysFlag and call pullKernelImpl in ASYNC mode.
 *
 * @param pullRequest the request describing which message queue to pull and from which offset
 */
public void pullMessage(final PullRequest pullRequest) {
	// Process queue carried by the pull request
	final ProcessQueue processQueue = pullRequest.getProcessQueue();
	if (processQueue.isDropped()) {
		log.info("the pull request[{}] is dropped.", pullRequest.toString());
		return;
	}
	// Record "now" as the process queue's last pull timestamp
	pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

	try {
		// Verify the consumer is in a usable state; throws MQClientException otherwise
		this.makeSureStateOK();
	} catch (MQClientException e) {
		log.warn("pullMessage exception, consumer state not ok", e);
		this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
		return;
	}

	// Pause / flow-control checks: when any of them fails, the request is handed to
	// executePullRequestLater for a delayed retry (per the original author's note, that
	// later re-enqueues it via executePullRequestImmediately).
	if (this.isPause()) {
		log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
		this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
		return;
	}
	long cachedMessageCount = processQueue.getMsgCount().get();
	long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
	// Too many messages cached for this queue
	if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
		this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
		// Log only on every 1000th flow-control hit to avoid flooding the log
		if ((queueFlowControlTimes++ % 1000) == 0) {
			log.warn(
				"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
				this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
		}
		return;
	}
	// Too many MiB cached for this queue
	if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
		this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
		if ((queueFlowControlTimes++ % 1000) == 0) {
			log.warn(
				"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
				this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
		}
		return;
	}
	// Concurrent (non-orderly) consumption: retry later when the process queue's max span
	// exceeds the configured consumeConcurrentlyMaxSpan
	if (!this.consumeOrderly) {
		if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
			this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
			if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
				log.warn(
					"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
					processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
					pullRequest, queueMaxSpanFlowControlTimes);
			}
			return;
		}
	} else {
		// Orderly consumption
		if (processQueue.isLocked()) {
			// First pull after the lock was obtained: recompute the offset and store it as
			// the request's nextOffset
			if (!pullRequest.isLockedFirst()) {
				final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
				boolean brokerBusy = offset < pullRequest.getNextOffset();
				log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
					pullRequest, offset, brokerBusy);
				if (brokerBusy) {
					log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
						pullRequest, offset);
				}

				pullRequest.setLockedFirst(true);
				pullRequest.setNextOffset(offset);
			}
		// Process queue is not locked yet: retry later
		} else {
			this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
			log.info("pull message later because not locked in broker, {}", pullRequest);
			return;
		}
	}
	final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
	// Because of concurrency the subscription may be missing at this moment; retry later
	// instead of giving up, so the PullRequest is not lost
	if (null == subscriptionData) {
		this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
		log.warn("find the consumer's subscription failed, {}", pullRequest);
		return;
	}

	final long beginTimestamp = System.currentTimeMillis();
	// The pull below is sent in ASYNC mode with this callback; presumably onSuccess /
	// onException is invoked once the broker's reply (or a failure) arrives
	PullCallback pullCallback = new PullCallback() {
		@Override
		public void onSuccess(PullResult pullResult) {
			if (pullResult != null) {
				// Post-process the raw pull result (per the original author's note: decode and
				// filter the received messages)
				pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
					subscriptionData);

				switch (pullResult.getPullStatus()) {
					case FOUND:	// messages found
						long prevRequestOffset = pullRequest.getNextOffset();
						pullRequest.setNextOffset(pullResult.getNextBeginOffset());
						long pullRT = System.currentTimeMillis() - beginTimestamp;
						DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
							pullRequest.getMessageQueue().getTopic(), pullRT);

						long firstMsgOffset = Long.MAX_VALUE;
						// Nothing left after processing: pull again right away
						if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
							DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
						} else {
							firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

							DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
								pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
							// Store the found messages in the process queue (per the original
							// author's note: into its msgTreeMap)
							boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
							// ----> Submit for consumption (orderly or concurrent service)
							DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
								pullResult.getMsgFoundList(),
								processQueue,
								pullRequest.getMessageQueue(),
								dispatchToConsume);

							// Schedule the next pull: delayed by pullInterval when it is > 0,
							// otherwise immediately
							if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
								DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
									DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
							} else {
								DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
							}
						}

						// Sanity check: offsets should never move backwards relative to the request
						if (pullResult.getNextBeginOffset() < prevRequestOffset
							|| firstMsgOffset < prevRequestOffset) {
							log.warn(
								"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
								pullResult.getNextBeginOffset(),
								firstMsgOffset,
								prevRequestOffset);
						}

						break;
					case NO_NEW_MSG:	// no new messages
						pullRequest.setNextOffset(pullResult.getNextBeginOffset());

						DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

						DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
						break;
					case NO_MATCHED_MSG:	// no messages matched
						pullRequest.setNextOffset(pullResult.getNextBeginOffset());

						DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

						DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
						break;
					case OFFSET_ILLEGAL:	// requested offset is illegal
						log.warn("the pull request offset illegal, {} {}",
							pullRequest.toString(), pullResult.toString());
						pullRequest.setNextOffset(pullResult.getNextBeginOffset());

						// Drop the process queue now; fix the stored offset and remove the
						// queue in a task scheduled with a delay argument of 10000
						pullRequest.getProcessQueue().setDropped(true);
						DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

							@Override
							public void run() {
								try {
									DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
										pullRequest.getNextOffset(), false);

									DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

									DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

									log.warn("fix the pull request offset, {}", pullRequest);
								} catch (Throwable e) {
									log.error("executeTaskLater Exception", e);
								}
							}
						}, 10000);
						break;
					default:
						break;
				}
			}
		}

		// The pull failed
		@Override
		public void onException(Throwable e) {
			// Failures on retry-group topics are not logged
			if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
				log.warn("execute the pull request exception", e);
			}
			// Same as the other failure paths: reschedule the PullRequest for a later retry
			DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
		}
	};

	boolean commitOffsetEnable = false;
	long commitOffsetValue = 0L;
	// Clustering model: read the in-memory consume offset and enable committing it with
	// this pull only when it is > 0
	if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
		commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
		if (commitOffsetValue > 0) {
			commitOffsetEnable = true;
		}
	}

	String subExpression = null;
	boolean classFilter = false;
	// Look up the subscription data for this queue's topic
	SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
	if (sd != null) {
		// The sub expression is sent only when postSubscriptionWhenPull is on and the
		// subscription is not in class-filter mode
		if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
			subExpression = sd.getSubString();
		}

		classFilter = sd.isClassFilterMode();
	}
	// Build the sysFlag from the four options below
	int sysFlag = PullSysFlag.buildSysFlag(
		commitOffsetEnable, // commitOffset
		true, // suspend
		subExpression != null, // subscription
		classFilter // class filter
	);
	try {
		// Send the pull in ASYNC mode; the outcome is delivered to pullCallback
		// (original author's note: PullAPIWrapper -> MQClientAPIImpl)
		this.pullAPIWrapper.pullKernelImpl(
			pullRequest.getMessageQueue(),
			subExpression,
			subscriptionData.getExpressionType(),
			subscriptionData.getSubVersion(),
			pullRequest.getNextOffset(),
			this.defaultMQPushConsumer.getPullBatchSize(),
			sysFlag,
			commitOffsetValue,
			BROKER_SUSPEND_MAX_TIME_MILLIS,
			CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
			CommunicationMode.ASYNC,
			pullCallback
		);
	} catch (Exception e) {
		log.error("pullKernelImpl exception", e);
		this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
	}
}

消费

// ConsumeMessageConcurrentlyService非顺序,丢入线程池中进行消费
/**
 * Submits pulled messages to the consume executor, splitting them into batches of at most
 * {@code consumeMessageBatchMaxSize} messages.
 *
 * When the executor rejects a batch, that batch plus all messages not yet submitted are
 * merged into one request and handed to {@code submitConsumeRequestLater}.
 *
 * @param msgs              messages to consume
 * @param processQueue      process queue the messages belong to
 * @param messageQueue      message queue the messages were pulled from
 * @param dispatchToConsume not used by this implementation
 */
public void submitConsumeRequest(
	final List<MessageExt> msgs,
	final ProcessQueue processQueue,
	final MessageQueue messageQueue,
	final boolean dispatchToConsume) {
	final int batchLimit = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

	// Everything fits into a single request
	if (msgs.size() <= batchLimit) {
		ConsumeRequest wholeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
		try {
			// ----> runs the inner class ConsumeRequest.run
			this.consumeExecutor.submit(wholeRequest);
		} catch (RejectedExecutionException rejected) {
			this.submitConsumeRequestLater(wholeRequest);
		}
		return;
	}

	// Otherwise walk the list once, cutting it into batches of up to batchLimit messages
	int cursor = 0;
	while (cursor < msgs.size()) {
		final List<MessageExt> batch = new ArrayList<MessageExt>(batchLimit);
		final int batchLength = Math.min(batchLimit, msgs.size() - cursor);
		for (int copied = 0; copied < batchLength; copied++) {
			batch.add(msgs.get(cursor++));
		}

		ConsumeRequest batchRequest = new ConsumeRequest(batch, processQueue, messageQueue);
		try {
			this.consumeExecutor.submit(batchRequest);
		} catch (RejectedExecutionException rejected) {
			// Executor is saturated: fold every remaining message into this batch and
			// retry it later; this also ends the outer loop
			while (cursor < msgs.size()) {
				batch.add(msgs.get(cursor++));
			}

			this.submitConsumeRequestLater(batchRequest);
		}
	}
}

/**
 * Consumes this request's messages with the user-supplied concurrent listener.
 *
 * Steps: skip if the process queue is dropped; run the "before" hook when hooks exist;
 * invoke the listener (any Throwable is caught and logged); classify the outcome into a
 * ConsumeReturnType; treat a null status as RECONSUME_LATER; run the "after" hook; record
 * the consume round-trip time; finally pass the status to processConsumeResult unless the
 * process queue was dropped in the meantime.
 */
public void run() {
	final ConsumeMessageConcurrentlyService owner = ConsumeMessageConcurrentlyService.this;

	if (this.processQueue.isDropped()) {
		log.info("the message queue not be able to consume, because it's dropped. group={} {}", owner.consumerGroup, this.messageQueue);
		return;
	}

	MessageListenerConcurrently userListener = owner.messageListener;
	ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);

	// Hook context is only built when at least one hook is registered
	ConsumeMessageContext hookContext = null;
	if (owner.defaultMQPushConsumerImpl.hasHook()) {
		hookContext = new ConsumeMessageContext();
		hookContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
		hookContext.setProps(new HashMap<String, String>());
		hookContext.setMq(messageQueue);
		hookContext.setMsgList(msgs);
		hookContext.setSuccess(false);
		owner.defaultMQPushConsumerImpl.executeHookBefore(hookContext);
	}

	final long startedAt = System.currentTimeMillis();
	ConsumeConcurrentlyStatus status = null;
	boolean listenerThrew = false;
	try {
		owner.resetRetryTopic(msgs);
		if (msgs != null && !msgs.isEmpty()) {
			for (MessageExt each : msgs) {
				MessageAccessor.setConsumeStartTimeStamp(each, String.valueOf(System.currentTimeMillis()));
			}
		}
		// Invoke the user-configured listener to actually consume the messages
		status = userListener.consumeMessage(Collections.unmodifiableList(msgs), context);
	} catch (Throwable e) {
		log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
			RemotingHelper.exceptionSimpleDesc(e),
			owner.consumerGroup,
			msgs,
			messageQueue);
		listenerThrew = true;
	}
	final long consumeRT = System.currentTimeMillis() - startedAt;

	// Classify the outcome; any case not matched below stays SUCCESS
	ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
	if (status == null) {
		returnType = listenerThrew ? ConsumeReturnType.EXCEPTION : ConsumeReturnType.RETURNNULL;
	} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
		returnType = ConsumeReturnType.TIME_OUT;
	} else if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
		returnType = ConsumeReturnType.FAILED;
	}

	if (owner.defaultMQPushConsumerImpl.hasHook()) {
		hookContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
	}

	// A missing status (listener threw or returned null) is handled as "reconsume later"
	if (status == null) {
		log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
			owner.consumerGroup,
			msgs,
			messageQueue);
		status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
	}

	if (owner.defaultMQPushConsumerImpl.hasHook()) {
		hookContext.setStatus(status.toString());
		hookContext.setSuccess(status == ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
		owner.defaultMQPushConsumerImpl.executeHookAfter(hookContext);
	}

	owner.getConsumerStatsManager()
		.incConsumeRT(owner.consumerGroup, messageQueue.getTopic(), consumeRT);

	if (processQueue.isDropped()) {
		log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
	} else {
		// ----> Handle the consumption result
		owner.processConsumeResult(status, context, this);
	}
}

/**
 * Applies the outcome of one consume request: updates OK/failed statistics, deals with the
 * messages after the ack index according to the message model, then removes the request's
 * messages from the process queue and updates the consume offset.
 *
 * @param status         status returned by the listener
 * @param context        consume context; its ack index is the index of the last message
 *                       treated as acknowledged
 * @param consumeRequest the request whose messages were consumed
 */
public void processConsumeResult(
	final ConsumeConcurrentlyStatus status,
	final ConsumeConcurrentlyContext context,
	final ConsumeRequest consumeRequest) {
	int lastAcked = context.getAckIndex();

	final List<MessageExt> consumed = consumeRequest.getMsgs();
	if (consumed.isEmpty()) {
		return;
	}

	switch (status) {
		case CONSUME_SUCCESS: {
			// Success: clamp the ack index to the last message, then count ok / failed
			lastAcked = Math.min(lastAcked, consumed.size() - 1);
			final int okCount = lastAcked + 1;
			final int failedCount = consumed.size() - okCount;
			this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), okCount);
			this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failedCount);
			break;
		}
		case RECONSUME_LATER: {
			// Failure: ack index -1 marks every message as not acknowledged
			lastAcked = -1;
			this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
				consumed.size());
			break;
		}
		default:
			break;
	}

	switch (this.defaultMQPushConsumer.getMessageModel()) {
		case BROADCASTING: {
			// Broadcasting: unacknowledged messages are only logged, nothing else is done
			int idx = lastAcked + 1;
			while (idx < consumed.size()) {
				MessageExt dropped = consumed.get(idx);
				log.warn("BROADCASTING, the message consume failed, drop it, {}", dropped.toString());
				idx++;
			}
			break;
		}
		case CLUSTERING: {
			// Clustering: send each unacknowledged message back to the broker; those that
			// cannot be sent back are collected for a local retry
			List<MessageExt> sendBackFailures = new ArrayList<MessageExt>(consumed.size());
			int idx = lastAcked + 1;
			while (idx < consumed.size()) {
				MessageExt pending = consumed.get(idx);
				boolean sentBack = this.sendMessageBack(pending, context);
				if (!sentBack) {
					pending.setReconsumeTimes(pending.getReconsumeTimes() + 1);
					sendBackFailures.add(pending);
				}
				idx++;
			}

			if (!sendBackFailures.isEmpty()) {
				// Take them out of this request so they are not removed from the process
				// queue below, and resubmit them later
				consumed.removeAll(sendBackFailures);

				this.submitConsumeRequestLater(sendBackFailures, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
			}
			break;
		}
		default:
			break;
	}

	// Remove the handled messages from the process queue and, when it yields a
	// non-negative offset and the queue is not dropped, update the queue's offset
	final long newOffset = consumeRequest.getProcessQueue().removeMessage(consumed);
	if (newOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
		this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), newOffset, true);
	}
}

那么常用的PUSH模式就介绍完了,PULL模式比较麻烦,需要自己调用pull方法主动去拉取消息

// DefaultMQPullConsumer
/**
 * Pulls from the given message queue using a subscription expression string.
 * Delegates to {@code defaultMQPullConsumerImpl.pull} with the same arguments and returns
 * its result.
 *
 * @throws MQClientException    propagated from the delegate
 * @throws RemotingException    propagated from the delegate
 * @throws MQBrokerException    propagated from the delegate
 * @throws InterruptedException propagated from the delegate
 */
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
	throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
	return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
}
/**
 * Pulls from the given message queue using a {@code MessageSelector}.
 * Delegates to {@code defaultMQPullConsumerImpl.pull} with the same arguments and returns
 * its result.
 *
 * @throws MQClientException    propagated from the delegate
 * @throws RemotingException    propagated from the delegate
 * @throws MQBrokerException    propagated from the delegate
 * @throws InterruptedException propagated from the delegate
 */
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
	throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
	return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout);
}
/**
 * Callback-based variant: pulls from the given message queue using a
 * {@code MessageSelector} and passes {@code pullCallback} to the delegate instead of
 * returning a result. Delegates to {@code defaultMQPullConsumerImpl.pull} with the same
 * arguments.
 *
 * @throws MQClientException    propagated from the delegate
 * @throws RemotingException    propagated from the delegate
 * @throws InterruptedException propagated from the delegate
 */
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
	PullCallback pullCallback, long timeout)
	throws MQClientException, RemotingException, InterruptedException {
	this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout);
}

DefaultMQPullConsumer的pull方法直接调用了DefaultMQPullConsumerImpl的pull()方法,最后调用其pullSyncImpl方法

参考:
RocketMQ原理解析-Consumer
rocketmq3.26研究之六DefaultMQPushConsumer消费流程
RocketMQ之PushConsumer的启动
RocketMQ之PullConsumer消息拉取实现