优势

应用解藕

异步提速

削峰填谷

劣势

RocketMQ

解决缺点

工作流程

  • 期间三个集群会持续向命名服务器发送心跳,确保自己状态更新

  • 命名服务器类似于Nacos,负载均衡指挥部

安装

官网快速上手

集群部署

若您直接下载 RocketMQ 的二进制包,则可以直接进入二进制包的目录中:

Terminal window

$ cd rocketmq-all-5.2.0-bin-release

若您选择从源码开始体验,且已经在本地自行编译完成了二进制文件,则可进入源码目录下的 distribution/target 中的二进制文件目录:

Terminal window

$ cd distribution/target/rocketmq-5.2.0/rocketmq-5.2.0

后续体验中的所有指令均在上述目录下执行。

启动 NameServer

安装完 RocketMQ 包后,我们执行下面的指令启动 NameServer:

Terminal window

### 启动namesrv$ 
nohup sh bin/mqnamesrv &
### 验证namesrv是否启动成功$ cat ~/logs/rocketmqlogs/namesrv.log

若一切正常,则会在执行完上述命令后,输出如下内容:

Terminal window

The Name Server boot success...

启动 Broker + Proxy

NameServer 成功启动后,我们启动 Broker 和 Proxy。这里我们使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考部署教程

Terminal window

### 先启动broker$ 
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a$ 
cat ~/logs/rocketmqlogs/proxy.log

我们可以在执行完上述命令后看到 proxy.log 中的内容,若看到如下信息,则表明 broker 已成功启动:

Terminal window

The broker[broker-a,192.169.1.2:10911] boot success...

至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们可以利用脚本进行简单的消息收发。

消息收发

工具测试消息收发

在进行工具测试消息收发之前,我们需要告诉客户端 NameServer 的地址,RocketMQ 有多种方式在客户端中设置 NameServer 地址,这里我们利用环境变量 NAMESRV_ADDR。

Terminal window

$ export NAMESRV_ADDR=localhost:9876

完成环境变量配置后,可以在命令行输入如下指令,启动生产者:

Terminal window

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

若生产成功,会输出如下内容:

Terminal window

SendResult [sendStatus=SEND_OK, msgId= ...

消息生产完成后,该消息便已经保存在本地 Broker 的存储中了。接下去再输入命令,启动消费者:

Terminal window

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

若消费成功,则会出现如下的输出内容:

Terminal window

ConsumeMessageThread_%d Receive New Messages: [MessageExt...

SDK 测试消息收发

工具测试完成后,我们可以尝试使用 SDK 收发消息。使用 SDK 进行消息收发的教程较为复杂,若有一定工程代码编写、运行经验,可以参考该教程自行尝试,本文不再赘述。

配套运维能力

mqadmin 工具介绍

mqadmin 是 RocketMQ 配套的运维工具,能够非常简便的查看集群状态,创建、修改 topic 等元数据。
该工具的使用方式可以参考该说明文档。本文档仅举例如何使用该工具进行集群状态查看。

查看集群状态

对于刚刚启动的 Broker,我们可以尝试使用 mqadmin 工具对它状态进行查看,在二进制包目录下输入如下命令: Terminal window

sh bin/mqadmin clusterlist -n localhost:9876

若集群运行正常,则输出如下:
image.png
在该输出中,您可以看到该 NameServer 下的集群名称、Broker 名称、对应 IP 地址、Broker 代码版本、消息生产速度、消息消费速度、定时消息总数、刷盘等待时长、消息保留时长、磁盘使用率等信息。
善用 mqadmin 工具,将能在集群故障时快速定位问题所在,并有能力人工介入作恢复。

关闭集群

当上述测试均完成后,您需要将集群进程(NameServer, Proxy, Broker)进行关闭,关闭方法如下:

Terminal window

# 关闭Broker$ 
sh bin/mqshutdown broker
# 若一切正常,则会输出如下内容:
# The mqbroker(36695) is running...# Send shutdown request to mqbroker with proxy enable OK(36695)
# 关闭NameServer
$ 
sh bin/mqshutdown namesrv
# 若一切正常,则会输出如下内容:# The mqnamesrv(36664) is running...# Send shutdown request to mqnamesrv(36664) OK

控制台

cd rocketmq-dashboard

//(非必要指令)
mvn clean package -DskipTests

java -jar target/rocketmq-dashboard-2.1.1-SNAPSHOT.jar --rocketmq.config.namesrvAddr=localhost:9876


消息发送

One2One

Producer

public class Producer {  
  
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {  
        // 谁来发?  
        DefaultMQProducer producer = new DefaultMQProducer("group01");  
//        发给谁?  
        producer.setNamesrvAddr("localhost:9876");  
        // 增加发送超时时间  
        producer.setSendMsgTimeout(10000);  
        producer.start();  
//        怎么发?  
  
//        发什么?  
        String msg = "hello world";  
        Message message = new Message("topic1", "tag1", msg.getBytes());  
        SendResult sendResult = producer.send(message);  
//        发的结果是什么?  
        System.out.println(sendResult);  
//        打扫战场  
        producer.shutdown();  
  
    }  
  
}

Consumer

public class Consumer {  
    public static void main(String[] args) throws MQClientException {  
        //谁来收  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group01");  
        //从哪收  
        consumer.setNamesrvAddr("localhost:9876");  
  
        //监听哪个消息队列  
        consumer.subscribe("topic1","*");  
        //处理业务流程  
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {  
  
                for (MessageExt msg : msgs) {  
                    System.out.println(msg);  
                    byte[] body = msg.getBody();  
                    System.out.println(new String(body));  
                }  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  
  
        consumer.start();  
        System.out.println("消费者消费起来了");  
        //千万不要关掉消费者  
    }  
}
 

One2Many

同Topic消费者中 同组平摊,异组广播

若想同组内广播,改变消费模式(不用负载均衡模式) consumer.setMessageModel(MessageModel.BROADCASTING

消息类别

同步消息

特征:即时性较强,重要的消息必须立刻有回执

异步消息

特征:即时性较弱,但需要回执消息

public class Producer {  
  
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {  
        // 谁来发?  
        DefaultMQProducer producer = new DefaultMQProducer("group01");  
//        发给谁?  
        producer.setNamesrvAddr("localhost:9876");  
        // 增加发送超时时间  
        producer.setSendMsgTimeout(10000);  
  
        producer.start();  
//        怎么发?  
  
//        发什么?  
        for (int i = 0; i < 10; i++) {  
            String msg = "hello world yibu"+i;  
            Message message = new Message("topic6", "tag1", msg.getBytes());  
            producer.send(message, new SendCallback() {  
                @Override  
                public void onSuccess(SendResult sendResult) {  
                    System.out.println("发送成功: " + sendResult);  
                }  
  
                @Override  
                public void onException(Throwable throwable) {  
                    System.out.println("发送异常: " + throwable);  
                }  
            });  
//        发的结果是什么?  
            System.out.println("异步发送完成");  
        }  
  
//        // 等待发送完成后再关闭生产者  
//        Thread.sleep(5000);  
//        // 打扫战场  
//        producer.shutdown();  
  
    }  
  
	}

单向消息

特征:不需要有回执的消息,例如日志类消息

producer.sendOneway(msessage)

延时消息

message.setDelayTimeLevel(3);
SendResult sendResult = producer.send(message);

批量消息

注意点:

  1. 尽量相同topic
  2. 相同的waitStoreMsgOK
  3. 不能包括延时消息
  4. 消息总长度不超过4M

消息过滤

tag过滤

按照tag过滤信息。

生产者

Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));

消费者

//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");

sql过滤

给消息设置属性(像写sql语句一样过滤)

基本语法

  • 数值比较,比如:>,>=,<,,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

生产者

//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

消费者

//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));

注意:SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能

enablePropertyFilter=true

重启broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

SpringBoot整合

配置pom.xml文件 配置yml文件

生产者

 
@RestController  
@RequestMapping("/demo")  
public class SendController {  
  
    @Autowired  
    RocketMQTemplate rocketMQTemplate;  
  
    @GetMapping("/send")  
    public String send(){  
  
        //发送逻辑  
        String msg = "hello world springMQ!";  
        rocketMQTemplate.convertAndSend("topic",msg);  
  
        return "success!";  
    }  
}
 

消费者 (使用Service注解)

@Service  
@RocketMQMessageListener(topic = "topic10",consumerGroup = "group01")  
public class DemoConsumer implements RocketMQListener<String> {  
    //业务逻辑  
    @Override  
    public void onMessage(String s) {  
        System.out.println(s);  
    }  
}

Spring中的实现

消息类别

rocketMQTemplate.syncSend("topic10",msg);

  • 异步消息

rocketMQTemplate.asyncSend("topic10",msg,new SendCallback(){...},1000(timeout));

  • 单向消息

rocketMQTemplate.sendOneWay("topic10",msg);

  • 延时消息

rocketMQTemplate.syncSend("topic10",msg,delayLevel);

  • 批量消息

消息过滤

Tag过滤

消费者

@RocketMQMessageListener(topic = "topic9",consumerGroup = "group1",selectorExpression = "tag1")

Sql过滤

@RocketMQMessageListener(topic = "topic9",consumerGroup = "group1",selectorExpression = "age>18"
        ,selectorType= SelectorType.SQL92)

改消息模式

@RocketMQMessageListener(topic = "topic9",consumerGroup = "group1",messageModel = MessageModel.BROADCASTING)

消息顺序

消息错乱

Consumer

consumer.registerMessageListener(new MessageListenerOrderly() {  
    @Override  
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {  
        for (MessageExt msg : msgs) {  
            System.out.println(msg);  
            byte[] body = msg.getBody();  
            System.out.println(new String(body));  
        }  
        return ConsumeOrderlyStatus.SUCCESS;  
  
    }  
});

Producer

...
...
orderDemo = new OrderStep();  
orderDemo.setOrderId(1L);  
orderDemo.setDesc("推送");  
orderList.add(orderDemo);  
  
orderDemo = new OrderStep();  
orderDemo.setOrderId(3L);  
orderDemo.setDesc("完成");  
orderList.add(orderDemo);  
  
orderDemo = new OrderStep();  
orderDemo.setOrderId(1L);  
orderDemo.setDesc("完成");  
orderList.add(orderDemo);  
  
for (OrderStep orderStep : orderList) {  
    Message message = new Message("topic12","tag1",orderStep.toString().getBytes());  
    SendResult sendResult = producer.send(message, new MessageQueueSelector() {  
        @Override  
        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {  
            //orderID对应一个确定的队列 ,1,2,4,1000,3000  
            //队列数  
            int size = list.size();  
            //取模  
            int orderId = (int) (orderStep.getOrderId());  
            int i = orderId%size;  
  
            MessageQueue messageQueue = list.get(i);  
            return messageQueue;  
        }  
    }, null);  
}

事务消息

成功下的消费者

public class Producer {  
  
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {  
        // 谁来发?  
        TransactionMQProducer producer = new TransactionMQProducer("group01");  
//        发给谁?  
        producer.setNamesrvAddr("localhost:9876");  
  
        //设置事务的监听  
        producer.setTransactionListener(new TransactionListener() {  
            //正常事务过程  
            @Override  
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {  
                //吧消息保存在,mysql中  
                //根据数据库返回的状态  
                System.out.println("执行正常事务过程");  
                return LocalTransactionState.COMMIT_MESSAGE;  
            }  
            @Override  
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {  
                return null;  
            }  
        });  
        // 增加发送超时时间  
        producer.setSendMsgTimeout(10000);  
        producer.start();  
//        怎么发?  
//        发什么?  
        String msg = "hello world transaction";  
        Message message = new Message("topic13", "tag1", msg.getBytes());  
//        发的结果是什么?  
        //发送事务消息  
        TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);  
        System.out.println(sendResult);  
    }

回滚事务下的消费者

return LocalTransactionState.ROLLBACK_MESSAGE;

事务补偿过程

        producer.setTransactionListener(new TransactionListener() {  
            //正常事务过程  
            @Override  
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {  
                //吧消息保存在,mysql中  
                //根据数据库返回的状态  
                System.out.println("执行正常事务过程");  
//                return LocalTransactionState.COMMIT_MESSAGE;  
                return LocalTransactionState.UNKNOW;  
            }  
  
            @Override  
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {  
                System.out.println("执行事务补偿过程");  
                return LocalTransactionState.COMMIT_MESSAGE;  
            }  
        });

集群搭建

操作步骤:注意两台机器同时操作

  1. 配置服务器环境:

    vim /etc/hosts
    
    # nameserver
    192.168.200.129 rocketmq-nameserver1
    192.168.200.130 rocketmq-nameserver2
    # broker
    192.168.200.129 rocketmq-master1
    192.168.200.129 rocketmq-slave2
    192.168.200.130 rocketmq-master2
    192.168.200.130 rocketmq-slave1
    
  2. 配置完毕后重启网卡,应用配置

systemctl restart network
  1. 关闭防火墙或者开发指定端口对外提供服务

    # 关闭防火墙
    systemctl stop firewalld.service
    # 查看防火墙的状态
    firewall-cmd --state
    # 禁止firewall开机启动
    systemctl disable firewalld.service
    
  2. 配置服务器环境

    vim /etc/profile
    
    #set rocketmq
    ROCKETMQ_HOME=/rocketmq
    PATH=$PATH:$ROCKETMQ_HOME/bin
    export ROCKETMQ_HOME PATH
    
  3. 配置完毕后重启网卡,应用配置

    source /etc/profile
    
  4. 将rocketmq解压到/rocketmq

  5. 创建集群服务器的数据存储目录

    #master 数据存储目录
    mkdir /rocketmq/store
    mkdir /rocketmq/store/commitlog
    mkdir /rocketmq/store/consumequeue
    mkdir /rocketmq/store/index
    
    #slave 数据存储目录
    mkdir /rocketmq/store-slave
    mkdir /rocketmq/store-slave/commitlog
    mkdir /rocketmq/store-slave/consumequeue
    mkdir /rocketmq/store-slave/index
    
  6. 注意master与slave如果在同一个虚拟机中部署,需要将存储目录区分开

  7. 第一台129机器上

    cd /rocketmq/conf/2m-2s-sync
    
    vim broker-a.properties
    
    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    vim broker-b-s.properties
    
    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-b
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=11011
    #删除文件时间点,默认凌晨 4
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/rocketmq/store-slave
    #commitLog 存储路径
    storePathCommitLog=/rocketmq/store-slave/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/rocketmq/store-slave/consumequeue
    #消息索引存储路径
    storePathIndex=/rocketmq/store-slave/index
    #checkpoint 文件存储路径
    storeCheckpoint=/rocketmq/store-slave/checkpoint
    #abort 文件存储路径
    abortFile=/rocketmq/store-slave/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    rm -rf broker-a-s.properties
    rm -rf broker-b.properties
    

    第二台130机器上

    cd /rocketmq/conf/2m-2s-sync
    
    vim broker-b.properties
    
    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-b
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/rocketmq/store
    #commitLog 存储路径
    storePathCommitLog=/rocketmq/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/rocketmq/store/consumequeue
    #消息索引存储路径
    storePathIndex=/rocketmq/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/rocketmq/store/checkpoint
    #abort 文件存储路径
    abortFile=/rocketmq/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    vim broker-a-s.properties
    
    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=11011
    #删除文件时间点,默认凌晨 4
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/rocketmq/store-slave
    #commitLog 存储路径
    storePathCommitLog=/rocketmq/store-slave/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/rocketmq/store-slave/consumequeue
    #消息索引存储路径
    storePathIndex=/rocketmq/store-slave/index
    #checkpoint 文件存储路径
    storeCheckpoint=/rocketmq/store-slave/checkpoint
    #abort 文件存储路径
    abortFile=/rocketmq/store-slave/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    rm -rf broker-a.properties
    rm -rf broker-b-s.properties
    
  8. 检查启动内存

    vim /rocketmq/bin/runbroker.sh
    
    # 开发环境配置 JVM Configuration
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
    
  9. 启动服务器(在bin目录下依次启动)

    129上

    nohup sh mqnamesrv &
    
    nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &
    
    nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
    

​ 130上

nohup sh mqnamesrv &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &

4.3 rocketmq-console集群监控平台搭建

  1. incubator-rocketmq-externals是一个基于rocketmq的基础之上扩展开发的开源项目
  2. 获取地址:https://github.com/apache/rocketmq-externals
  3. rocketmq-console是一款基于java环境开发的(springboot)的管理控制台工具

高级特性

消息存储

  • 存储的介质

  • 高效存储的第一个原因(磁盘读写方式)

  • 第二个原因(零拷贝)

存储的物理地址

刷盘机制

  • 同步刷盘

  • 异步刷盘(不第一时间写入硬盘)

高可用性

负载均衡

消息重发

  • 有序消息

  • 无序消息

死信消息

默认重发16次进入死信队列

  • 死信处理

在监控平台中,通过查找死信,获取死信的message ID,然后通过id对死信精准消费

重复消费

  • 消息幂等性