RocketMQ(一) 入门

Posted by ZhouJ000 on December 6, 2018
最后更新于:2019-01-01

RocketMQ(一) 入门
RocketMQ(二) NameServer与Broker
RocketMQ(三) Producer与Consumer

准备

windows环境下,官网下载rocketmq-4.3.2
配置环境变量ROCKETMQ_HOME为解压缩目录

rocket-jg 包含的组件:
NameServer:供Producer和Consumer获取Broker地址
Producer:产生并发送消息
Consumer:接受并消费消息
Broker:消息暂存,消息转发

单master启动:
/bin路径下执行start mqnamesrv.cmd启动NameServer
然后执行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true启动Broker

插件部署:
1、rocketmq-externals去clone下来
2、进入rocketmq-externals\rocketmq-console\src\main\resources文件夹下修改application.properties配置文件
3、修改server.port,rocketmq.config.namesrvAddr,还可以修改rocketmq.config.dataPath
4、在rocketmq-externals\rocketmq-console文件夹下进行打包mvn clean package -Dmaven.test.skip=true
5、进入target文件夹下执行java -jar rocketmq-console-ng-1.0.0.jar启动

介绍

Name Server:Name Server为producer和consumer提供路由信息
Broker:Broker是RocketMQ系统的主要角色,其实就是MQ。Broker接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备

Producer:消息生产者,生产者的作用就是将消息发送到MQ
Producer Group:生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组

Consumer:消息消费者
Consumer Group:消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组

Topic:Topic是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类。topic只是消息的逻辑分类,内部实现其实是由queue组成
Tag:标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息
Message:Message是消息的载体。一个Message必须指定topic,相当于寄信的地址。Message还有一个可选的tag设置,以便消费端可以基于tag进行过滤消息,也可以添加额外的键值对

NameServer

作用:Producer和Consumer获取Broker的地址
目的:解耦Broker、Producer、Consumer
原理:使用netty作为通信工具,监听指定端口,如果是broker注册,将broker的信息保存在内存中并保存到文件中,producer和consumer的请求获取broker地址

当前版本的RocketMQ没有选择ZooKeeper,而是自己开发了NameServer。其在RocketMQ中起着中转承接的作用,是一个无状态的服务,多个NameServer之间不通信。任何Producer,Consumer,Broker与所有NameServer通信,向NameServer请求或者发送数据。而且都是单向的,Producer和Consumer请求数据,Broker发送数据。正是因为这种单向的通信,RocketMQ水平扩容变得很容易

1、NameServer互相独立,是一个几乎无状态节点,可集群部署,节点之间无任何信息同步,这样的设计直接降低了Broker实现的复查性,单台NameServer挂掉,不影响其他NameServer。NameServer不去连接别的机器,不会主动推消息
2、每个Broker启动的时候会向Namesrv发送注册请求,Namesrv接收Broker的请求注册路由信息,NameServer保存活跃的broker列表,包括Master和Slave
3、单个broker(Master、Slave)与所有NameServer进行定时注册,以便告知NameServer自己还活着。Broker每隔30秒向所有NameServer发送心跳,心跳包含了自身的topic配置信息。NameServer每隔10秒,扫描所有还存活的broker连接,如果某个连接的最后更新时间与当前时间差值超过2分钟,则断开此连接。此外NameServer也会断开此broker下所有与slave的连接。同时更新topic与队列的对应关系,但不会通知生产者和消费者
4、用来保存所有topic和该topic所有队列的列表
5、NameServer用来保存所有broker的Filter列表
6、接收client(Producer和Consumer)的请求根据某个topic获取所有到broker的路由信息
7、客户端client是先从NameServer寻址的,得到可用Broker的IP和端口信息,然后自己去连接broker
8、接收broker的请求注册broker路由信息(包括master和slave)。Producer随机与NameServer集群中的一个节点建立长连接,每隔30秒(此处时间可配置)从NameServer获取topic的最新信息,这就表示如果某个broker master宕机,producer最多30秒才能感知,在这个期间,发往该broker master的消息将会失败。Producer会向提供topic服务的broker master建立长连接,且定时向master发送心跳
9、接收client的请求根据某个topic获取所有到broker的路由信息。consumer随机与NameServer集群中的一个节点建立长连接,如果该NameServer断开,则从NameServer列表中查找下一个进行连接。consumer主要从NameServer中根据topic查询broker的地址,查到就会缓存到客户端,并向提供topic服务的master、slave建立长连接,且定时向master、slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。如果broker宕机,则NameServer会将其剔除,而consumer端的定时任务MQClientInstance.this.updateTopicRouteInfoFromNameServer每30秒执行一次,会将topic对应的broker地址拉取下来,此地址已经为slave地址了,故此时consumer会从slave上消费。 消费者与master和slave都建有连接,在不同场景有不同的消费规则

RocketMQ初探一:NameServer的作用
分布式消息队列RocketMQ与Kafka架构上的巨大差异之1 – 为什么RocketMQ要去除ZK依赖

Broker

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server

Broker slave同步或者异步从Broker master上拷贝数据

在执行mqbroker.cmd时会调用runbroker.cmd为jvm设置参数,然后调用BrokerStartup启动类

demo

org.apache.rocketmq.example.quickstart下有Producer与Consumer

Producer:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
		// 声明并初始化一个producer
        // 需要一个producer group名字作为构造方法的参数
        DefaultMQProducer producer = new DefaultMQProducer("producergroup1");

        //设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
        //NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 调用start()方法启动一个producer实例
        producer.start();

        // 发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
        for (int i = 0; i < 1000; i++) {
            try {
                 /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                // 调用producer的send()方法发送消息
                // 这里调用的是同步的方式,所以会有返回结果
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        // 发送完消息之后,调用shutdown()方法关闭producer
        producer.shutdown();
    }
}

Consumer:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumergroup1");
		consumer.setNamesrvAddr("127.0.0.1:9876");

        // 这里设置的是一个consumer的消费策略
        // CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
        // CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        // CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
        consumer.subscribe("TopicTest", "*");

        // 设置一个Listener,主要进行消息的逻辑处理
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // CONSUME_SUCCESS 消费成功  
                // RECONSUME_LATER 消费失败,需要稍后重新消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 调用start()方法启动consumer
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}