优势

应用解藕


异步提速

削峰填谷

劣势

RocketMQ
解决缺点
工作流程

-
期间三个集群会持续向命名服务器发送心跳,确保自己状态更新
-
命名服务器类似于Nacos,负载均衡指挥部

安装
官网快速上手
集群部署
若您直接下载 RocketMQ 的二进制包,则可以直接进入二进制包的目录中:
Terminal window
$ cd rocketmq-all-5.2.0-bin-release
若您选择从源码开始体验,且已经在本地自行编译完成了二进制文件,则可进入源码目录下的 distribution/target 中的二进制文件目录:
Terminal window
$ cd distribution/target/rocketmq-5.2.0/rocketmq-5.2.0
后续体验中的所有指令均在上述目录下执行。
启动 NameServer
安装完 RocketMQ 包后,我们执行下面的指令启动 NameServer:
Terminal window
### 启动namesrv$
nohup sh bin/mqnamesrv &
### 验证namesrv是否启动成功$ cat ~/logs/rocketmqlogs/namesrv.log
若一切正常,则会在执行完上述命令后,输出如下内容:
Terminal window
The Name Server boot success...
启动 Broker + Proxy
NameServer 成功启动后,我们启动 Broker 和 Proxy。这里我们使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。详情参考部署教程。
Terminal window
### 先启动broker$
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a$
cat ~/logs/rocketmqlogs/proxy.log
我们可以在执行完上述命令后看到 proxy.log 中的内容,若看到如下信息,则表明 broker 已成功启动:
Terminal window
The broker[broker-a,192.169.1.2:10911] boot success...
至此,一个单节点副本的 RocketMQ 集群已经部署起来了,我们可以利用脚本进行简单的消息收发。
消息收发
工具测试消息收发
在进行工具测试消息收发之前,我们需要告诉客户端 NameServer 的地址,RocketMQ 有多种方式在客户端中设置 NameServer 地址,这里我们利用环境变量 NAMESRV_ADDR。
Terminal window
$ export NAMESRV_ADDR=localhost:9876
完成环境变量配置后,可以在命令行输入如下指令,启动生产者:
Terminal window
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
若生产成功,会输出如下内容:
Terminal window
SendResult [sendStatus=SEND_OK, msgId= ...
消息生产完成后,该消息便已经保存在本地 Broker 的存储中了。接下去再输入命令,启动消费者:
Terminal window
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
若消费成功,则会出现如下的输出内容:
Terminal window
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
SDK 测试消息收发
工具测试完成后,我们可以尝试使用 SDK 收发消息。使用 SDK 进行消息收发的教程较为复杂,若有一定工程代码编写、运行经验,可以参考该教程自行尝试,本文不再赘述。
配套运维能力
mqadmin 工具介绍
mqadmin 是 RocketMQ 配套的运维工具,能够非常简便的查看集群状态,创建、修改 topic 等元数据。
该工具的使用方式可以参考该说明文档。本文档仅举例如何使用该工具进行集群状态查看。
查看集群状态
对于刚刚启动的 Broker,我们可以尝试使用 mqadmin 工具对它状态进行查看,在二进制包目录下输入如下命令: Terminal window
sh bin/mqadmin clusterlist -n localhost:9876
若集群运行正常,则输出如下:

在该输出中,您可以看到该 NameServer 下的集群名称、Broker 名称、对应 IP 地址、Broker 代码版本、消息生产速度、消息消费速度、定时消息总数、刷盘等待时长、消息保留时长、磁盘使用率等信息。
善用 mqadmin 工具,将能在集群故障时快速定位问题所在,并有能力人工介入作恢复。
关闭集群
当上述测试均完成后,您需要将集群进程(NameServer, Proxy, Broker)进行关闭,关闭方法如下:
Terminal window
# 关闭Broker$
sh bin/mqshutdown broker
# 若一切正常,则会输出如下内容:
# The mqbroker(36695) is running...# Send shutdown request to mqbroker with proxy enable OK(36695)
# 关闭NameServer
$
sh bin/mqshutdown namesrv
# 若一切正常,则会输出如下内容:# The mqnamesrv(36664) is running...# Send shutdown request to mqnamesrv(36664) OK
控制台
cd rocketmq-dashboard
//(非必要指令)
mvn clean package -DskipTests
java -jar target/rocketmq-dashboard-2.1.1-SNAPSHOT.jar --rocketmq.config.namesrvAddr=localhost:9876
消息发送
One2One
Producer
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 谁来发?
DefaultMQProducer producer = new DefaultMQProducer("group01");
// 发给谁?
producer.setNamesrvAddr("localhost:9876");
// 增加发送超时时间
producer.setSendMsgTimeout(10000);
producer.start();
// 怎么发?
// 发什么?
String msg = "hello world";
Message message = new Message("topic1", "tag1", msg.getBytes());
SendResult sendResult = producer.send(message);
// 发的结果是什么?
System.out.println(sendResult);
// 打扫战场
producer.shutdown();
}
}Consumer
public class Consumer {
public static void main(String[] args) throws MQClientException {
//谁来收
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group01");
//从哪收
consumer.setNamesrvAddr("localhost:9876");
//监听哪个消息队列
consumer.subscribe("topic1","*");
//处理业务流程
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : msgs) {
System.out.println(msg);
byte[] body = msg.getBody();
System.out.println(new String(body));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者消费起来了");
//千万不要关掉消费者
}
}
One2Many
同Topic消费者中 同组平摊,异组广播
若想同组内广播,改变消费模式(不用负载均衡模式)
consumer.setMessageModel(MessageModel.BROADCASTING
消息类别
同步消息
特征:即时性较强,重要的消息必须立刻有回执
异步消息
特征:即时性较弱,但需要回执消息
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 谁来发?
DefaultMQProducer producer = new DefaultMQProducer("group01");
// 发给谁?
producer.setNamesrvAddr("localhost:9876");
// 增加发送超时时间
producer.setSendMsgTimeout(10000);
producer.start();
// 怎么发?
// 发什么?
for (int i = 0; i < 10; i++) {
String msg = "hello world yibu"+i;
Message message = new Message("topic6", "tag1", msg.getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功: " + sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送异常: " + throwable);
}
});
// 发的结果是什么?
System.out.println("异步发送完成");
}
// // 等待发送完成后再关闭生产者
// Thread.sleep(5000);
// // 打扫战场
// producer.shutdown();
}
}单向消息
特征:不需要有回执的消息,例如日志类消息
producer.sendOneway(msessage)
延时消息
message.setDelayTimeLevel(3);
SendResult sendResult = producer.send(message);批量消息

注意点:
- 尽量相同topic
- 相同的waitStoreMsgOK
- 不能包括延时消息
- 消息总长度不超过4M
消息过滤
tag过滤
按照tag过滤信息。
生产者
Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
消费者
//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");
sql过滤
给消息设置属性(像写sql语句一样过滤)
基本语法
- 数值比较,比如:>,>=,<,⇐,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
生产者
//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");
消费者
//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));
注意:SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能
enablePropertyFilter=true
重启broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
SpringBoot整合
配置pom.xml文件 配置yml文件
生产者
@RestController
@RequestMapping("/demo")
public class SendController {
@Autowired
RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public String send(){
//发送逻辑
String msg = "hello world springMQ!";
rocketMQTemplate.convertAndSend("topic",msg);
return "success!";
}
}
消费者 (使用Service注解)
@Service
@RocketMQMessageListener(topic = "topic10",consumerGroup = "group01")
public class DemoConsumer implements RocketMQListener<String> {
//业务逻辑
@Override
public void onMessage(String s) {
System.out.println(s);
}
}Spring中的实现
消息类别
rocketMQTemplate.syncSend("topic10",msg);
- 异步消息
rocketMQTemplate.asyncSend("topic10",msg,new SendCallback(){...},1000(timeout));
- 单向消息
rocketMQTemplate.sendOneWay("topic10",msg);
- 延时消息
rocketMQTemplate.syncSend("topic10",msg,delayLevel);
- 批量消息

消息过滤
Tag过滤
消费者
@RocketMQMessageListener(topic = "topic9",consumerGroup = "group1",selectorExpression = "tag1")
Sql过滤
@RocketMQMessageListener(topic = "topic9",consumerGroup = "group1",selectorExpression = "age>18"
,selectorType= SelectorType.SQL92)
改消息模式
@RocketMQMessageListener(topic = "topic9",consumerGroup = "group1",messageModel = MessageModel.BROADCASTING)
消息顺序
消息错乱


Consumer
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : msgs) {
System.out.println(msg);
byte[] body = msg.getBody();
System.out.println(new String(body));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});Producer
...
...
orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(3L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(1L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
for (OrderStep orderStep : orderList) {
Message message = new Message("topic12","tag1",orderStep.toString().getBytes());
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//orderID对应一个确定的队列 ,1,2,4,1000,3000
//队列数
int size = list.size();
//取模
int orderId = (int) (orderStep.getOrderId());
int i = orderId%size;
MessageQueue messageQueue = list.get(i);
return messageQueue;
}
}, null);
}事务消息


成功下的消费者
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
// 谁来发?
TransactionMQProducer producer = new TransactionMQProducer("group01");
// 发给谁?
producer.setNamesrvAddr("localhost:9876");
//设置事务的监听
producer.setTransactionListener(new TransactionListener() {
//正常事务过程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//吧消息保存在,mysql中
//根据数据库返回的状态
System.out.println("执行正常事务过程");
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
// 增加发送超时时间
producer.setSendMsgTimeout(10000);
producer.start();
// 怎么发?
// 发什么?
String msg = "hello world transaction";
Message message = new Message("topic13", "tag1", msg.getBytes());
// 发的结果是什么?
//发送事务消息
TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
System.out.println(sendResult);
}回滚事务下的消费者
return LocalTransactionState.ROLLBACK_MESSAGE;事务补偿过程
producer.setTransactionListener(new TransactionListener() {
//正常事务过程
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//吧消息保存在,mysql中
//根据数据库返回的状态
System.out.println("执行正常事务过程");
// return LocalTransactionState.COMMIT_MESSAGE;
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("执行事务补偿过程");
return LocalTransactionState.COMMIT_MESSAGE;
}
});集群搭建




操作步骤:注意两台机器同时操作
-
配置服务器环境:
vim /etc/hosts# nameserver 192.168.200.129 rocketmq-nameserver1 192.168.200.130 rocketmq-nameserver2 # broker 192.168.200.129 rocketmq-master1 192.168.200.129 rocketmq-slave2 192.168.200.130 rocketmq-master2 192.168.200.130 rocketmq-slave1 -
配置完毕后重启网卡,应用配置
systemctl restart network
-
关闭防火墙或者开发指定端口对外提供服务
# 关闭防火墙 systemctl stop firewalld.service # 查看防火墙的状态 firewall-cmd --state # 禁止firewall开机启动 systemctl disable firewalld.service -
配置服务器环境
vim /etc/profile#set rocketmq ROCKETMQ_HOME=/rocketmq PATH=$PATH:$ROCKETMQ_HOME/bin export ROCKETMQ_HOME PATH -
配置完毕后重启网卡,应用配置
source /etc/profile -
将rocketmq解压到/rocketmq
-
创建集群服务器的数据存储目录
#master 数据存储目录 mkdir /rocketmq/store mkdir /rocketmq/store/commitlog mkdir /rocketmq/store/consumequeue mkdir /rocketmq/store/index #slave 数据存储目录 mkdir /rocketmq/store-slave mkdir /rocketmq/store-slave/commitlog mkdir /rocketmq/store-slave/consumequeue mkdir /rocketmq/store-slave/index -
注意master与slave如果在同一个虚拟机中部署,需要将存储目录区分开
-
第一台129机器上
cd /rocketmq/conf/2m-2s-syncvim broker-a.properties#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/rocketmq/store #commitLog 存储路径 storePathCommitLog=/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128vim broker-b-s.properties#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-b #0 表示 Master,>0 表示 Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/rocketmq/store-slave #commitLog 存储路径 storePathCommitLog=/rocketmq/store-slave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/rocketmq/store-slave/consumequeue #消息索引存储路径 storePathIndex=/rocketmq/store-slave/index #checkpoint 文件存储路径 storeCheckpoint=/rocketmq/store-slave/checkpoint #abort 文件存储路径 abortFile=/rocketmq/store-slave/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128rm -rf broker-a-s.properties rm -rf broker-b.properties第二台130机器上
cd /rocketmq/conf/2m-2s-syncvim broker-b.properties#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-b #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/rocketmq/store #commitLog 存储路径 storePathCommitLog=/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/rocketmq/store/index #checkpoint 文件存储路径 storeCheckpoint=/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=SYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128vim broker-a-s.properties#所属集群名字 brokerClusterName=rocketmq-cluster #broker名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=1 #nameServer地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=11011 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/rocketmq/store-slave #commitLog 存储路径 storePathCommitLog=/rocketmq/store-slave/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/rocketmq/store-slave/consumequeue #消息索引存储路径 storePathIndex=/rocketmq/store-slave/index #checkpoint 文件存储路径 storeCheckpoint=/rocketmq/store-slave/checkpoint #abort 文件存储路径 abortFile=/rocketmq/store-slave/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制Master #- SYNC_MASTER 同步双写Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128rm -rf broker-a.properties rm -rf broker-b-s.properties -
检查启动内存
vim /rocketmq/bin/runbroker.sh# 开发环境配置 JVM Configuration JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m" -
启动服务器(在bin目录下依次启动)
129上
nohup sh mqnamesrv &nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties &nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
130上
nohup sh mqnamesrv &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties &
4.3 rocketmq-console集群监控平台搭建
- incubator-rocketmq-externals是一个基于rocketmq的基础之上扩展开发的开源项目
- 获取地址:https://github.com/apache/rocketmq-externals
- rocketmq-console是一款基于java环境开发的(springboot)的管理控制台工具
高级特性
消息存储

- 存储的介质

-
高效存储的第一个原因(磁盘读写方式)

-
第二个原因(零拷贝)

存储的物理地址

刷盘机制
-
同步刷盘

-
异步刷盘(不第一时间写入硬盘)


高可用性


负载均衡

消息重发
-
有序消息

-
无序消息

死信消息
默认重发16次进入死信队列

- 死信处理
在监控平台中,通过查找死信,获取死信的message ID,然后通过id对死信精准消费
重复消费

- 消息幂等性
