RocketMQ(二) NameServer与Broker

Posted by ZhouJ000 on January 7, 2019

RocketMQ(一) 入门
RocketMQ(二) NameServer与Broker
RocketMQ(三) Producer与Consumer

NameServer

namesrv nameServer设计比较轻量级的,其中几个主要类的功能为:
NamesrvStartup: 为NameServer的启动类,在执行mqnamesrv.cmd时候会调用runserver为jvm设置参数,然后调用NamesrvStartup类
NamesrvController: NameServer控制类,管控NameServer的启动、初始化、停止等生命周期
RouteInfoManager: 路由信息的管理类,就是存放Broker的状态信息及Topic与Broker的关联关系
DefaultRequestProcessor: 处理请求的类,里面封装了对broker发来的各种请求的响应

NamesrvStartup:

/**
 * Boots the name server: builds the controller from the command-line
 * arguments, starts it, and reports success on both the logger and stdout.
 *
 * @param args command-line arguments forwarded to {@code createNamesrvController}
 * @return the started controller; {@code null} is only reachable if
 *         {@code System.exit(-1)} returns after a failure
 */
public static NamesrvController main0(String[] args) {
	try {
		// Step 1: create the NamesrvController.
		final NamesrvController namesrvController = createNamesrvController(args);
		// Step 2: start the NamesrvController.
		start(namesrvController);

		final String bootMessage = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
		log.info(bootMessage);
		System.out.printf("%s%n", bootMessage);
		return namesrvController;
	} catch (Throwable e) {
		// Any failure during boot terminates the process with exit code -1.
		e.printStackTrace();
		System.exit(-1);
	}
	return null;
}

1、创建NamesrvController

/**
 * Builds a {@link NamesrvController} from the command-line arguments.
 *
 * <p>Steps, in order: parse the command line; create the name server and
 * Netty server configs (listen port defaults to 9876); if {@code -c} is
 * given, load that properties file into both configs; if {@code -p} is
 * given, print both configs and exit with code 0; overlay command-line
 * properties onto the name server config; configure logback from
 * {@code <rocketmqHome>/conf/logback_namesrv.xml}; create the controller.
 *
 * @param args command-line arguments
 * @return the new, not yet initialized controller
 * @throws IOException    if the {@code -c} config file cannot be read
 * @throws JoranException if the logback configuration fails
 */
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
	System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
	//PackageConflictDetect.detectFastjson();

	Options options = ServerUtil.buildCommandlineOptions(new Options());
	commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
	if (null == commandLine) {
		System.exit(-1);
		return null;
	}

	// Name server configuration.
	final NamesrvConfig namesrvConfig = new NamesrvConfig();
	// Netty server configuration; 9876 is the default listen port.
	final NettyServerConfig nettyServerConfig = new NettyServerConfig();
	nettyServerConfig.setListenPort(9876);
	// -c <file>: load both configs from a properties file.
	if (commandLine.hasOption('c')) {
		String file = commandLine.getOptionValue('c');
		if (file != null) {
			InputStream in = new BufferedInputStream(new FileInputStream(file));
			// Close the stream even when loading or mapping the properties throws.
			try {
				properties = new Properties();
				properties.load(in);
				MixAll.properties2Object(properties, namesrvConfig);
				MixAll.properties2Object(properties, nettyServerConfig);

				namesrvConfig.setConfigStorePath(file);

				System.out.printf("load config properties file OK, %s%n", file);
			} finally {
				in.close();
			}
		}
	}
	// -p: print the effective configuration and exit.
	if (commandLine.hasOption('p')) {
		InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
		MixAll.printObjectProperties(console, namesrvConfig);
		MixAll.printObjectProperties(console, nettyServerConfig);
		System.exit(0);
	}

	// Command-line properties are applied after the file-based ones.
	MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

	if (null == namesrvConfig.getRocketmqHome()) {
		System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
		System.exit(-2);
	}

	LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
	JoranConfigurator configurator = new JoranConfigurator();
	configurator.setContext(lc);
	lc.reset();
	// Configure logback from the installation directory.
	configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

	log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

	MixAll.printObjectProperties(log, namesrvConfig);
	MixAll.printObjectProperties(log, nettyServerConfig);

	// Create the NamesrvController.
	final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

	// remember all configs to prevent discard
	controller.getConfiguration().registerConfig(properties);

	return controller;
}

2、启动NamesrvController

/**
 * Initializes and starts the given controller, and registers a JVM shutdown
 * hook that shuts it down.
 *
 * @param controller the controller to start; must not be {@code null}
 * @return the same controller, once started
 * @throws IllegalArgumentException if {@code controller} is {@code null}
 * @throws Exception                if {@code controller.start()} fails
 */
public static NamesrvController start(final NamesrvController controller) throws Exception {
	if (controller == null) {
		throw new IllegalArgumentException("NamesrvController is null");
	}

	// Initialize the controller; a failed initialization exits with code -3.
	if (!controller.initialize()) {
		controller.shutdown();
		System.exit(-3);
	}

	// Register the shutdown hook.
	final Callable<Void> shutdownTask = new Callable<Void>() {
		@Override
		public Void call() throws Exception {
			controller.shutdown();
			return null;
		}
	};
	Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, shutdownTask));

	// Start the controller.
	controller.start();
	return controller;
}

NamesrvController:

/**
 * Creates the controller and its collaborators: the KV config manager, the
 * route info manager, the broker housekeeping service and the
 * {@code Configuration} holder for both config objects.
 *
 * @param namesrvConfig     name server configuration
 * @param nettyServerConfig Netty server configuration
 */
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
	this.namesrvConfig = namesrvConfig;
	this.nettyServerConfig = nettyServerConfig;

	this.kvConfigManager = new KVConfigManager(this);
	this.routeInfoManager = new RouteInfoManager();
	this.brokerHousekeepingService = new BrokerHousekeepingService(this);

	this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
	// The store path is read from the "configStorePath" field of namesrvConfig.
	this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}

1、初始化

/**
 * Prepares the name server for start-up: loads the KV configuration, creates
 * the Netty remoting server and its worker pool, registers the request
 * processor, schedules the periodic maintenance tasks and, when TLS is not
 * disabled, sets up a file watcher that reloads the SSL context.
 *
 * @return always {@code true}
 */
public boolean initialize() {
	// Load the key-value configuration (original note: reads the persisted
	// key-value file into the in-memory configTable).
	this.kvConfigManager.load();
	// Create the NettyRemotingServer.
	this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

	this.remotingExecutor =
		Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

	// (a) Register the request processor (original note: DefaultRequestProcessor
	// by default) that handles the messages received through Netty.
	this.registerProcessor();
	// Scan for inactive brokers every 10 seconds, starting after 5 seconds.
	this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
		@Override
		public void run() {
			NamesrvController.this.routeInfoManager.scanNotActiveBroker();
		}
	}, 5, 10, TimeUnit.SECONDS);
	// Print all key-value entries every 10 minutes, starting after 1 minute.
	this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
		@Override
		public void run() {
			NamesrvController.this.kvConfigManager.printAllPeriodically();
		}
	}, 1, 10, TimeUnit.MINUTES);

	// Only watch the certificate files when TLS is not disabled.
	if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
		// Register a listener to reload SslContext
		try {
			fileWatchService = new FileWatchService(
				new String[] {
					TlsSystemConfig.tlsServerCertPath,
					TlsSystemConfig.tlsServerKeyPath,
					TlsSystemConfig.tlsServerTrustCertPath
				},
				new FileWatchService.Listener() {
					boolean certChanged, keyChanged = false;
					@Override
					public void onChanged(String path) {
						// A trust-certificate change reloads immediately.
						if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
							log.info("The trust certificate changed, reload the ssl context");
							reloadServerSslContext();
						}
						if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
							certChanged = true;
						}
						if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
							keyChanged = true;
						}
						// Certificate and key are reloaded only once both have changed.
						if (certChanged && keyChanged) {
							log.info("The certificate and private key changed, reload the ssl context");
							certChanged = keyChanged = false;
							reloadServerSslContext();
						}
					}
					private void reloadServerSslContext() {
						((NettyRemotingServer) remotingServer).loadSslContext();
					}
				});
		} catch (Exception e) {
			// Best effort: the server still starts without dynamic reload, but
			// the cause is logged so the failure can be diagnosed.
			log.warn("FileWatchService created error, can't load the certificate dynamically", e);
		}
	}
	return true;
}

2、启动

/**
 * Starts the remoting server and, if one was created during initialization,
 * the certificate file watcher.
 *
 * @throws Exception if either service fails to start
 */
public void start() throws Exception {
	// Original note: once started, NettyServerHandler does the reads and writes,
	// and requests end up in DefaultRequestProcessor.processRequest (registered
	// in step (a) of initialize).
	this.remotingServer.start();

	if (null == this.fileWatchService) {
		return;
	}
	this.fileWatchService.start();
}

请求处理

请求的KEY值 作用
PUT_KV_CONFIG 向Namesrv追加KV配置
GET_KV_CONFIG 从Namesrv获取KV配置
DELETE_KV_CONFIG 从Namesrv删除KV配置
REGISTER_BROKER 注册一个Broker,数据都是持久化的,如果存在则覆盖配置
UNREGISTER_BROKER 卸载一个Broker,数据都是持久化的
GET_ROUTEINTO_BY_TOPIC 根据Topic获取Broker Name、topic配置信息
GET_BROKER_CLUSTER_INFO 获取注册到Name Server的所有Broker集群信息
WIPE_WRITE_PERM_OF_BROKER 去掉BrokerName的写权限
GET_ALL_TOPIC_LIST_FROM_NAMESERVER 从Name Server获取完整Topic列表
DELETE_TOPIC_IN_NAMESRV 从Namesrv删除Topic配置
GET_KV_CONFIG_BY_VALUE 通过 project 获取所有的 server ip 信息
DELETE_KV_CONFIG_BY_VALUE 删除指定 project group 下的所有 server ip 信息
GET_KVLIST_BY_NAMESPACE 通过NameSpace获取所有的KV List
GET_KVLIST_BY_NAMESPACE 通过NameSpace获取所有的KV List
GET_TOPICS_BY_CLUSTER 获取指定集群下的所有 topic
GET_SYSTEM_TOPIC_LIST_FROM_NS 获取所有系统内置 Topic 列表
GET_UNIT_TOPIC_LIST 单元化相关 topic
GET_HAS_UNIT_SUB_TOPIC_LIST 获取含有单元化订阅组的 Topic 列表
GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST 获取含有单元化订阅组的非单元化 Topic 列表

DefaultRequestProcessor:

/**
 * Dispatches an incoming request to the handler that matches its request code.
 *
 * @param ctx     channel context of the sender; only used for debug logging
 *                here, then passed on to the handler
 * @param request the request to process
 * @return the handler's response, or {@code null} for an unrecognized code
 * @throws RemotingCommandException propagated from the selected handler
 */
public RemotingCommand processRequest(ChannelHandlerContext ctx,
	RemotingCommand request) throws RemotingCommandException {
	if (null != ctx) {
		log.debug("receive request, {} {} {}",
			request.getCode(),
			RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
			request);
	}

	final int requestCode = request.getCode();
	switch (requestCode) {
		// ---- KV configuration ----
		case RequestCode.PUT_KV_CONFIG:
			return this.putKVConfig(ctx, request);
		case RequestCode.GET_KV_CONFIG:
			return this.getKVConfig(ctx, request);
		case RequestCode.DELETE_KV_CONFIG:
			return this.deleteKVConfig(ctx, request);
		case RequestCode.QUERY_DATA_VERSION:
			return queryBrokerTopicConfig(ctx, request);

		// ---- Broker registration ----
		// (a) Register broker information; senders at version V3_0_11 or
		// later use the filter-server aware variant.
		case RequestCode.REGISTER_BROKER: {
			final Version senderVersion = MQVersion.value2Version(request.getVersion());
			final boolean atLeastV3011 = senderVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal();
			return atLeastV3011
				? this.registerBrokerWithFilterServer(ctx, request)
				: this.registerBroker(ctx, request);
		}
		// Unregister a broker.
		case RequestCode.UNREGISTER_BROKER:
			return this.unregisterBroker(ctx, request);

		// ---- Routing and cluster queries ----
		case RequestCode.GET_ROUTEINTO_BY_TOPIC:
			// Look up routing information by topic. Original note: producers
			// (when sending) and consumers (when pulling) fetch this from the
			// name server; the result holds orderTopicConf, QueueData,
			// BrokerData and the brokerAddr -> filter server list table.
			return this.getRouteInfoByTopic(ctx, request);
		case RequestCode.GET_BROKER_CLUSTER_INFO:
			return this.getBrokerClusterInfo(ctx, request);
		case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
			return this.wipeWritePermOfBroker(ctx, request);
		case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
			return getAllTopicListFromNameserver(ctx, request);
		case RequestCode.DELETE_TOPIC_IN_NAMESRV:
			return deleteTopicInNamesrv(ctx, request);
		case RequestCode.GET_KVLIST_BY_NAMESPACE:
			return this.getKVListByNamespace(ctx, request);
		case RequestCode.GET_TOPICS_BY_CLUSTER:
			return this.getTopicsByCluster(ctx, request);
		case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
			return this.getSystemTopicListFromNs(ctx, request);
		case RequestCode.GET_UNIT_TOPIC_LIST:
			return this.getUnitTopicList(ctx, request);
		case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
			return this.getHasUnitSubTopicList(ctx, request);
		case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
			return this.getHasUnitSubUnUnitTopicList(ctx, request);

		// ---- Name server configuration ----
		case RequestCode.UPDATE_NAMESRV_CONFIG:
			return this.updateConfig(ctx, request);
		case RequestCode.GET_NAMESRV_CONFIG:
			return this.getConfig(ctx, request);

		// Unknown request codes produce no response.
		default:
			return null;
	}
}

a、注册broker,调用到RouteInfoManager.registerBroker,进行的主要操作是解析request,依次填充以下集合,供producer和consumer获取

/**
 * Handles a broker registration request: checks the checksum, decodes the
 * topic configuration from the request body, registers the broker with the
 * route info manager and answers with the HA server address, the master
 * address and the KV list of the order-topic-config namespace.
 *
 * @param ctx     channel context of the registering broker
 * @param request the registration request
 * @return the response; code {@code SYSTEM_ERROR} with remark
 *         "crc32 not match" when the checksum check fails, otherwise
 *         {@code SUCCESS}
 * @throws RemotingCommandException if the request header cannot be decoded
 */
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
	RemotingCommand request) throws RemotingCommandException {
	final RemotingCommand reply = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
	final RegisterBrokerResponseHeader replyHeader = (RegisterBrokerResponseHeader) reply.readCustomHeader();
	final RegisterBrokerRequestHeader header =
		(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

	final boolean checksumOk = checksum(ctx, request, header);
	if (!checksumOk) {
		reply.setCode(ResponseCode.SYSTEM_ERROR);
		reply.setRemark("crc32 not match");
		return reply;
	}

	// Build the topic config wrapper: decode it from the body when present,
	// otherwise use an empty wrapper whose data version is zeroed.
	final TopicConfigSerializeWrapper topicConfigs;
	if (null == request.getBody()) {
		topicConfigs = new TopicConfigSerializeWrapper();
		topicConfigs.getDataVersion().setCounter(new AtomicLong(0));
		topicConfigs.getDataVersion().setTimestamp(0);
	} else {
		topicConfigs = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
	}

	// Register the broker; this variant passes no filter server list.
	final RegisterBrokerResult registration = this.namesrvController.getRouteInfoManager().registerBroker(
		header.getClusterName(),
		header.getBrokerAddr(),
		header.getBrokerName(),
		header.getBrokerId(),
		header.getHaServerAddr(),
		topicConfigs,
		null,
		ctx.channel()
	);

	replyHeader.setHaServerAddr(registration.getHaServerAddr());
	replyHeader.setMasterAddr(registration.getMasterAddr());

	final byte[] orderTopicKvList =
		this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
	reply.setBody(orderTopicKvList);
	reply.setCode(ResponseCode.SUCCESS);
	reply.setRemark(null);
	return reply;
}

RouteInfoManager:

// topic队列表,存储了每个topic包含的队列数据
// 主题与topic配置的对应关系,topics.json的topicConfigTable数据,在QueueData对象中记录了该topic的BrokerName
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// broker地址表,Broker名称与broker属性的map
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 集群主备信息表,集群与broker集合的对应关系
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// broker存活状态信息表,其中的BrokerLiveInfo存储了broker的版本号,channel,和最近心跳时间等信息
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;\
// 记录了每个broker的filter信息,Borker地址与过滤器的集合
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;



/**
 * Registers (or refreshes) a broker and updates the routing tables while
 * holding the write lock.
 *
 * <p>Tables touched, in order: {@code clusterAddrTable},
 * {@code brokerAddrTable}, {@code topicQueueTable} (only for the master, when
 * its topic config changed or this is a first registration),
 * {@code brokerLiveTable} and {@code filterServerTable}.
 *
 * <p>Any exception, including an interrupt while waiting for the lock, is
 * logged and not rethrown; the result is then returned as filled so far.
 *
 * @param clusterName        cluster the broker belongs to
 * @param brokerAddr         address of the broker
 * @param brokerName         name of the broker
 * @param brokerId           broker id; {@code MixAll.MASTER_ID} marks the master
 * @param haServerAddr       HA server address reported by the broker
 * @param topicConfigWrapper topic configuration and data version of the broker
 * @param filterServerList   filter servers of the broker; {@code null} leaves
 *                           the filter server table untouched, an empty list
 *                           removes the broker's entry
 * @param channel            channel the broker registered through
 * @return the result; for a non-master broker whose master is live it carries
 *         the master address and the master's HA server address
 */
public RegisterBrokerResult registerBroker(
	final String clusterName,
	final String brokerAddr,
	final String brokerName,
	final long brokerId,
	final String haServerAddr,
	final TopicConfigSerializeWrapper topicConfigWrapper,
	final List<String> filterServerList,
	final Channel channel) {
	RegisterBrokerResult result = new RegisterBrokerResult();
	try {
		// Acquire the lock BEFORE entering the try/finally: if acquisition is
		// interrupted, the finally block must not unlock a lock we do not hold.
		this.lock.writeLock().lockInterruptibly();
		try {
			Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
			if (null == brokerNames) {
				brokerNames = new HashSet<String>();
				// clusterAddrTable
				this.clusterAddrTable.put(clusterName, brokerNames);
			}
			brokerNames.add(brokerName);

			boolean registerFirst = false;

			BrokerData brokerData = this.brokerAddrTable.get(brokerName);
			if (null == brokerData) {
				registerFirst = true;
				brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
				// brokerAddrTable
				this.brokerAddrTable.put(brokerName, brokerData);
			}
			// A broker id without a previous address also counts as a first registration.
			String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
			registerFirst = registerFirst || (null == oldAddr);

			// Only the master's topic configuration updates the queue table.
			if (null != topicConfigWrapper
				&& MixAll.MASTER_ID == brokerId) {
				if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
					|| registerFirst) {
					ConcurrentMap<String, TopicConfig> tcTable =
						topicConfigWrapper.getTopicConfigTable();
					if (tcTable != null) {
						for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
							// topicQueueTable
							this.createAndUpdateQueueData(brokerName, entry.getValue());
						}
					}
				}
			}

			// brokerLiveTable: record the time of this registration.
			BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
				new BrokerLiveInfo(
					System.currentTimeMillis(),
					topicConfigWrapper.getDataVersion(),
					channel,
					haServerAddr));
			if (null == prevBrokerLiveInfo) {
				log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
			}

			if (filterServerList != null) {
				if (filterServerList.isEmpty()) {
					this.filterServerTable.remove(brokerAddr);
				} else {
					// filterServerTable
					this.filterServerTable.put(brokerAddr, filterServerList);
				}
			}

			// A non-master broker is told where its master and the master's HA server are.
			if (MixAll.MASTER_ID != brokerId) {
				String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
				if (masterAddr != null) {
					BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
					if (brokerLiveInfo != null) {
						result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
						result.setMasterAddr(masterAddr);
					}
				}
			}
		} finally {
			this.lock.writeLock().unlock();
		}
	} catch (Exception e) {
		log.error("registerBroker Exception", e);
	}

	return result;
}

b、根据topic获取路由

/**
 * Collects the routing data of a topic while holding the read lock: its queue
 * data, a copy of the data of every broker serving it, and the filter servers
 * of those brokers.
 *
 * <p>Any exception, including an interrupt while waiting for the lock, is
 * logged and not rethrown.
 *
 * @param topic the topic to look up
 * @return the route data, or {@code null} unless both queue data and at least
 *         one broker were found
 */
public TopicRouteData pickupTopicRouteData(final String topic) {
	TopicRouteData topicRouteData = new TopicRouteData();
	boolean foundQueueData = false;
	boolean foundBrokerData = false;
	Set<String> brokerNameSet = new HashSet<String>();
	// Broker data list, filled in below.
	List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
	topicRouteData.setBrokerDatas(brokerDataList);
	// Filter server map, filled in below.
	HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
	topicRouteData.setFilterServerTable(filterServerMap);

	try {
		// Acquire the lock BEFORE entering the try/finally: if acquisition is
		// interrupted, the finally block must not unlock a lock we do not hold.
		this.lock.readLock().lockInterruptibly();
		try {
			// Look up the topic's queue data in topicQueueTable.
			List<QueueData> queueDataList = this.topicQueueTable.get(topic);
			if (queueDataList != null) {
				topicRouteData.setQueueDatas(queueDataList);
				foundQueueData = true;

				for (QueueData qd : queueDataList) {
					brokerNameSet.add(qd.getBrokerName());
				}
				// Visit every broker that serves the topic.
				for (String brokerName : brokerNameSet) {
					BrokerData brokerData = this.brokerAddrTable.get(brokerName);
					if (null != brokerData) {
						// Hand out a copy of the address map, not the shared one.
						BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
							.getBrokerAddrs().clone());
						// Fill brokerDataList.
						brokerDataList.add(brokerDataClone);
						foundBrokerData = true;
						for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
							List<String> filterServerList = this.filterServerTable.get(brokerAddr);
							// Fill filterServerMap; the value is null for a broker without filter servers.
							filterServerMap.put(brokerAddr, filterServerList);
						}
					}
				}
			}
		} finally {
			this.lock.readLock().unlock();
		}
	} catch (Exception e) {
		log.error("pickupTopicRouteData Exception", e);
	}

	log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

	if (foundBrokerData && foundQueueData) {
		return topicRouteData;
	}

	return null;
}

RocketMQ原理解析-NameServer
RocketMQ观后感–NameServer

Broker

broker

BrokerStartup:

/**
 * Broker entry point: creates the broker controller from the command-line
 * arguments and starts it.
 *
 * @param args command-line arguments
 */
public static void main(String[] args) {
	final BrokerController brokerController = createBrokerController(args);
	start(brokerController);
}

1、创建BrokerController

public static BrokerController createBrokerController(String[] args) {
	// ...
	commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
	// ...
	// 各种配置config
	// ...
	final BrokerController controller = new BrokerController(
		brokerConfig,
		nettyServerConfig,
		nettyClientConfig,
		messageStoreConfig);
	// remember all configs to prevent discard
	controller.getConfiguration().registerConfig(properties);
	// 初始化
	boolean initResult = controller.initialize();
	if (!initResult) {
		controller.shutdown();
		System.exit(-3);
	}

	Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
		private volatile boolean hasShutdown = false;
		private AtomicInteger shutdownTimes = new AtomicInteger(0);

		@Override
		public void run() {
			synchronized (this) {
				log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
				if (!this.hasShutdown) {
					this.hasShutdown = true;
					long beginTime = System.currentTimeMillis();
					controller.shutdown();
					long consumingTimeTotal = System.currentTimeMillis() - beginTime;
					log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
				}
			}
		}
	}, "ShutdownHook"));
	
	return controller;
1

2、初始化

// BrokerController
public boolean initialize() throws CloneNotSupportedException {
	boolean result = this.topicConfigManager.load();

	result = result && this.consumerOffsetManager.load();
	result = result && this.subscriptionGroupManager.load();
	result = result && this.consumerFilterManager.load();

	if (result) {
		try {
			this.messageStore =
				new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
					this.brokerConfig);
			this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
			//load plugin
			MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
			this.messageStore = MessageStoreFactory.build(context, this.messageStore);
			this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
		} catch (IOException e) {
			result = false;
			log.error("Failed to initialize", e);
		}
	}
	result = result && this.messageStore.load();
	
	if (result) {
		this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
		NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
		fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
		this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
	
		// 创建各种ExecutorService
		this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
		this.brokerConfig.getSendMessageThreadPoolNums(),
		this.brokerConfig.getSendMessageThreadPoolNums(),
		1000 * 60,
		TimeUnit.MILLISECONDS,
		this.sendThreadPoolQueue,
		new ThreadFactoryImpl("SendMessageThread_"));
		// ...
	
		// 注册处理类
		this.registerProcessor();
		
		// 创建各种定时任务
		this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
			@Override
			public void run() {
				try {
					BrokerController.this.getBrokerStats().record();
				} catch (Throwable e) {
					log.error("schedule record error.", e);
				}
			}
		}, initialDelay, period, TimeUnit.MILLISECONDS);
		// ...
		
		if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
			// Register a listener to reload SslContext
			// ...
			initialTransaction();
	}
	return result;
}	

3、启动

/**
 * Starts the broker controller and reports success on both the logger and
 * stdout.
 *
 * @param controller the initialized controller to start
 * @return the started controller; {@code null} is only reachable if
 *         {@code System.exit(-1)} returns after a failure
 */
public static BrokerController start(BrokerController controller) {
	try {
		// Start the controller.
		controller.start();

		final StringBuilder banner = new StringBuilder("The broker[")
			.append(controller.getBrokerConfig().getBrokerName())
			.append(", ")
			.append(controller.getBrokerAddr())
			.append("] boot success. serializeType=")
			.append(RemotingCommand.getSerializeTypeConfigInThisServer());
		// Mention the name server only when one is configured.
		final String namesrvAddr = controller.getBrokerConfig().getNamesrvAddr();
		if (namesrvAddr != null) {
			banner.append(" and name server is ").append(namesrvAddr);
		}

		final String bootMessage = banner.toString();
		log.info(bootMessage);
		System.out.printf("%s%n", bootMessage);
		return controller;
	} catch (Throwable e) {
		// Any failure during start-up terminates the process with exit code -1.
		e.printStackTrace();
		System.exit(-1);
	}
	return null;
}

BrokerController:

/**
 * Starts every broker service that has been created, registers the broker
 * with the name servers, and schedules the periodic re-registration.
 *
 * @throws Exception if any of the services fails to start
 */
public void start() throws Exception {
	// Storage and network services; each is started only if it exists.
	if (null != this.messageStore) {
		this.messageStore.start();
	}
	if (null != this.remotingServer) {
		this.remotingServer.start();
	}
	if (null != this.fastRemotingServer) {
		this.fastRemotingServer.start();
	}
	if (null != this.fileWatchService) {
		this.fileWatchService.start();
	}
	if (null != this.brokerOuterAPI) {
		this.brokerOuterAPI.start();
	}
	if (null != this.pullRequestHoldService) {
		this.pullRequestHoldService.start();
	}
	if (null != this.clientHousekeepingService) {
		this.clientHousekeepingService.start();
	}
	if (null != this.filterServerManager) {
		this.filterServerManager.start();
	}

	// Register the broker with the name servers. Original note: this goes
	// through BrokerOuterAPI.registerBrokerAll, which takes the name server
	// address list and sends the topic configuration to each of them.
	this.registerBrokerAll(true, false, true);

	// Re-register periodically: first run after 10 seconds, then at the
	// configured period clamped to the range [10000, 60000] milliseconds.
	final long registerPeriodMillis =
		Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000));
	final Runnable reRegisterTask = new Runnable() {
		@Override
		public void run() {
			try {
				BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
			} catch (Throwable e) {
				log.error("registerBrokerAll Exception", e);
			}
		}
	};
	this.scheduledExecutorService.scheduleAtFixedRate(reRegisterTask, 1000 * 10, registerPeriodMillis, TimeUnit.MILLISECONDS);

	if (null != this.brokerStatsManager) {
		this.brokerStatsManager.start();
	}
	if (null != this.brokerFastFailure) {
		this.brokerFastFailure.start();
	}

	// The transactional message check service is not started on a slave.
	if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()
		&& null != this.transactionalMessageCheckService) {
		log.info("Start transaction service!");
		this.transactionalMessageCheckService.start();
	}
}

broker存储

rocketMq的broker消息存储主要包括3个部分,分别是commitLog的存储,consumeQueue的存储,index的存储:
1、commitLog的存储是producer发送消息给broker端broker同步处理的
2、consumeQueue和index两者存储其实是一个定时任务从commitLog中获取偏移量然后存储过去的
3、consumeQueue和index的存储与commitLog的存储是隔离开的,非同步的

consume queue: 消息的逻辑队列,相当于字典的目录用来指定消息在消息的真正的物理文件commitLog上的位置,
每个topic下的每个queue都有一个对应的consumequeue文件。
文件地址:${user.home} \store\consumequeue\${topicName}\${queueId}\${fileName}

consume queue中存储单元是一个20字节定长的数据,是顺序写顺序读
1. commitLogOffset是指这条消息在commitLog文件实际偏移量
2. size就是指消息大小
3. 消息tag的哈希值

ConsumeQueue文件组织:
1. topic + queueId来组织的,比如TopicA配了读写队列0,1,那么TopicA和Queue=0组成一个ConsumeQueue,TopicA和Queue=1组成一个另一个ConsumeQueue.
2. 按消费端group分组重试队列,如果消费端消费失败,发送到retry消费队列中
3. 按消费端group分组死信队列,如果消费端重试超过指定次数,发送死信队列
4. 每个ConsumeQueue可以由多个文件组成无限队列被MapedFileQueue对象管理

CommitLog:消息写入内存,存放物理文件,每台broker上的commitLog被本机器所有queue共享不做区分。一个消息存储单元长度是不定的,顺序写但是随机读。有两层,其中MappedFileQueue是逻辑的存储队列概念,里面保存着顺序增长的MappedFile文件

MapedFileQueue:存储队列,管理mapedFile,新建、获取、删除mapedFile将消息写入文件。队列有多个文件(MapedFile)组成,由集合对象List表示升序排列

MapedFile:PageCache文件封装,是真正存储实际数据的文件。操作物理文件在内存中的映射以及将内存数据持久化到物理文件中,代码中写死了要求os系统的页大小为4k, 消息刷盘根据参数(commitLog默认至少刷4页, consumeQueue默认至少刷2页)才会去刷。在整个broker的存储体系中,MappedFile文件保存了commitLog、consumeQueue、Index等,是核心的数据结构

DefaultMessageStore:消息存储层实现 broker-msg-store

/**
 * Validates the store state and the message, then hands the message to the
 * commit log and records timing and failure statistics.
 *
 * @param msg the message to store
 * @return the commit log's result, or an early rejection:
 *         {@code SERVICE_NOT_AVAILABLE} (store shut down, slave role, or not
 *         writeable), {@code MESSAGE_ILLEGAL} (topic too long),
 *         {@code PROPERTIES_SIZE_EXCEEDED} or {@code OS_PAGECACHE_BUSY}
 */
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
	// ---- State checks ----
	if (this.shutdown) {
		log.warn("message store has shutdown, so putMessage is forbidden");
		return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
	}

	if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
		// Warn only on every 50000th rejection to keep the log quiet.
		final long rejections = this.printTimes.getAndIncrement();
		if (0 == rejections % 50000) {
			log.warn("message store is slave mode, so putMessage is forbidden ");
		}
		return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
	}

	if (!this.runningFlags.isWriteable()) {
		final long rejections = this.printTimes.getAndIncrement();
		if (0 == rejections % 50000) {
			log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
		}
		return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
	}
	// The store is writeable: reset the rejection counter.
	this.printTimes.set(0);

	// ---- Message checks ----
	// The topic length must not exceed Byte.MAX_VALUE.
	final int topicLength = msg.getTopic().length();
	if (topicLength > Byte.MAX_VALUE) {
		log.warn("putMessage message topic length too long " + topicLength);
		return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
	}
	// The properties string length must not exceed Short.MAX_VALUE.
	final String propertiesString = msg.getPropertiesString();
	if (propertiesString != null && propertiesString.length() > Short.MAX_VALUE) {
		log.warn("putMessage message properties length too long " + propertiesString.length());
		return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
	}

	if (this.isOSPageCacheBusy()) {
		return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
	}

	// ---- Store the message ----
	final long startedAt = this.getSystemClock().now();
	final PutMessageResult result = this.commitLog.putMessage(msg);

	// ---- Statistics ----
	final long elapsedMillis = this.getSystemClock().now() - startedAt;
	if (elapsedMillis > 1000) {
		log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", elapsedMillis, msg.getBody().length);
	}
	this.storeStatsService.setPutMessageEntireTimeMax(elapsedMillis);

	final boolean stored = null != result && result.isOk();
	if (!stored) {
		this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
	}
	return result;
}

commitLog.putMessage(final MessageExtBrokerInner msg)

/**
 * Appends a message to the commit log and then drives flushing and, on a
 * synchronous master, the hand-off to the HA service.
 *
 * <p>Phases: stamp the message (store time, body CRC); redirect delayed
 * messages to the schedule topic; append to the current mapped file inside a
 * synchronized section, rolling over to a new file at end of file; update
 * statistics; request a flush according to the flush disk type; on a
 * SYNC_MASTER, wait for the HA service.
 *
 * @param msg the message to append; its topic, queue id, properties and store
 *            timestamp may be modified by this method
 * @return a result whose status starts as {@code PUT_OK} and may be changed to
 *         {@code FLUSH_DISK_TIMEOUT}, {@code FLUSH_SLAVE_TIMEOUT} or
 *         {@code SLAVE_NOT_AVAILABLE}; append failures return
 *         {@code CREATE_MAPEDFILE_FAILED}, {@code MESSAGE_ILLEGAL} or
 *         {@code UNKNOWN_ERROR}
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
	// Set the storage time
	msg.setStoreTimestamp(System.currentTimeMillis());
	// Set the message body BODY CRC (consider the most appropriate setting
	// on the client)
	msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
	// Back to Results
	AppendMessageResult result = null;

	StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

	String topic = msg.getTopic();
	int queueId = msg.getQueueId();
	
	// Transaction handling: only non-transactional and commit messages may be delayed.
	final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
	if (tranType == MessageSysFlag.TransactionNotType//
			|| tranType == MessageSysFlag.TransactionCommitType) {
		// Delay Delivery
		if (msg.getDelayTimeLevel() > 0) {
			// Cap the delay level at the maximum the schedule service supports.
			if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
				msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
			}

			topic = ScheduleMessageService.SCHEDULE_TOPIC;
			queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

			// Backup real topic, queueId
			MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
			MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
			msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

			// From here on the message carries the schedule topic and queue id.
			msg.setTopic(topic);
			msg.setQueueId(queueId);
		}
	}

	long eclipseTimeInLock = 0;
	// Get the mapped file (MapedFile) to write into.
	MapedFile unlockMapedFile = null;
	MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
	synchronized (this) {
		long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
		this.beginTimeInLock = beginLockTimestamp;

		// Here settings are stored timestamp, in order to ensure an orderly
		// global
		msg.setStoreTimestamp(beginLockTimestamp);
		// When there is no mapped file yet, or the current one is full, ask the queue for the last one again
		// (original note: this creates the file when none exists).
		if (null == mapedFile || mapedFile.isFull()) {
			mapedFile = this.mapedFileQueue.getLastMapedFile();
		}
		if (null == mapedFile) {
			log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
			beginTimeInLock = 0;
			return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
		}
		// ----> Append the message to the mapped file and keep the append result.
		// Original note: the message goes into msgStoreItemMemory, a memory-mapped MappedByteBuffer,
		// i.e. the file's buffer rather than the disk.
		result = mapedFile.appendMessage(msg, this.appendMessageCallback);
		switch (result.getStatus()) {
			case PUT_OK:
				break;
			// At end of file: get a new mapped file and append again.
			case END_OF_FILE:	
				unlockMapedFile = mapedFile;
				// Create a new file, re-write the message
				mapedFile = this.mapedFileQueue.getLastMapedFile();
				if (null == mapedFile) {
					// XXX: warn and notify me
					log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
					beginTimeInLock = 0;
					return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
				}
				result = mapedFile.appendMessage(msg, this.appendMessageCallback);
				break;
			// Both size violations are reported to the caller as MESSAGE_ILLEGAL.
			case MESSAGE_SIZE_EXCEEDED:
			case PROPERTIES_SIZE_EXCEEDED:
				beginTimeInLock = 0;
				return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
			case UNKNOWN_ERROR:
				beginTimeInLock = 0;
				return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
			default:
				beginTimeInLock = 0;
				return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
		}

		eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
		beginTimeInLock = 0;
	} // end of synchronized

	// Warn when the synchronized section took longer than one second.
	if (eclipseTimeInLock > 1000) {
		log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
	}
	// The file that hit END_OF_FILE is handed to unlockMapedFile outside the synchronized section.
	if (null != unlockMapedFile) {
		this.defaultMessageStore.unlockMapedFile(unlockMapedFile);
	}

	PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

	// Statistics
	// NOTE(review): the first counter is keyed by msg.getTopic() and the second by the local `topic`;
	// both hold the same value on every path visible here — confirm the mixed usage is intentional.
	storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
	storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

	// Flush to disk, i.e. persist to the file. Original note: the append above has not reached the disk yet;
	// what happens here depends on the flush strategy (see FlushCommitLogService).
	// Synchronous or asynchronous flush/commit.
	GroupCommitRequest request = null;

	// Synchronization flush
	if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
		// Original note: this service thread runs the flush logic and finally calls
		// CommitLog.this.mapedFileQueue.commit(0) to write to disk.
		GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
		if (msg.isWaitStoreMsgOK()) {
			// Wait until the flush reaches the end offset of this message, or the timeout expires.
			request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
			service.putRequest(request);
			boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
			if (!flushOK) {
				log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
						+ " client address: " + msg.getBornHostString());
				putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
			}
		} else {
			// The caller does not wait for the store result: just wake the service.
			service.wakeup();
		}
	}
	// Asynchronous flush
	// Wake up the commit log flush service to perform the flush.
	else {
		this.flushCommitLogService.wakeup();
	}

	// Synchronous write double: on a synchronous master, replicate to the slave.
	if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
		HAService service = this.defaultMessageStore.getHaService();
		if (msg.isWaitStoreMsgOK()) {
			// Determine whether to wait
			if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
				// Reuse the flush request if one was created above.
				if (null == request) {
					request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
				}
				service.putRequest(request);

				service.getWaitNotifyObject().wakeupAll();

				// The wait uses the sync flush timeout setting.
				boolean flushOK =
						// TODO
						request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
				if (!flushOK) {
					log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
							+ msg.getTags() + " client address: " + msg.getBornHostString());
					putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
				}
			}
			// Slave problem
			else {
				// Tell the producer, slave not available
				putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
			}
		}
	}

	return putMessageResult;
}

RocketMQ原理解析-Broker
再说rocketmq消息存储
YunaiV/Blog/RocketMQ