7、启动RocketMQ
启动就比较简单了,直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默认的配置都比较高。
1、先启动nameServer。
修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
复制代码
直接在三个节点上启动nameServer。
nohup bin/mqadminsrv &
复制代码
启动完成后,在nohup.out里看到这一条关键日志就是启动成功了。
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
使用jps指令可以看到一个NamesrvStartup进程。
这里也看到,RocketMQ在runserver.sh中是使用的CMS垃圾回收期,而在runbroker.sh中使用的是G1垃圾回收期。
2、再启动broker
启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件。
在worker2上启动broker-a的master节点和broker-b的slave节点
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties &
复制代码
在work3上启动broker-b的master节点和broker-a的slave节点
nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties &
nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties &
复制代码
启动slave时,如果遇到报错 Lock failed,MQ already started ,那是因为有多个实例共用了同一个storePath造成的,这时就需要调整store的路径。
3、启动状态检查
使用jps指令,能看到一个NameSrvStartup进程和两个BrokerStartup进程。
nohup.out中也有启动成功的日志。
对应的日志文件:
# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log
复制代码
4、测试mqadmin管理工具
RocketMQ的源代码中并没有为我们提供类似于Nacos或者RabbitMQ那样的控制台,只提供了一个mqadmin指令来管理RocketMQ,命令在bin目录下。使用方式是 ./mqadmin {command} {args}
所有指令如下:
Topic相关:
| 名称 | 含义 | 命令选项 | 说明 |
| updateTopic | 创建更新Topic配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port |
| -c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询) |
||
| -h- | 打印帮助 | ||
| -n | NameServer服务地址,格式 ip:port | ||
| -p | 指定新topic的读写权限( W=2|R=4|WR=6 ) | ||
| -r | 可读队列数(默认为 8) | ||
| -w | 可写队列数(默认为 8) | ||
| -t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) |
||
| deleteTopic | 删除Topic | -c | cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询) |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) |
||
| topicList | 查看 Topic 列表信息 | -h | 打印帮助 |
| -c | 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数 |
||
| -n | NameServer 服务地址,格式 ip:port | ||
| topicRoute | 查看 Topic 路由信息 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| topicStatus | 查看 Topic 消息队列offset | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| topicClusterList | 查看 Topic 所在集群列表 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| updateTopicPerm | 更新 Topic 读写权限 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port |
||
| -p | 指定新 topic 的读写权限( W=2|R=4|WR=6 ) | ||
| -c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令 |
||
| updateOrderConf | 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic,键 | ||
| -v | orderConf,值 | ||
| -m | method,可选get、put、delete | ||
| allocateMQ | 以平均负载算法计算消费者列表负载消息队列的负载结果 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -i | ipList,用逗号分隔,计算这些ip去负载Topic的消息队列 | ||
| statsAll | 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -a | 是否只打印活跃topic | ||
| -t | 指定topic |
集群相关
| 名称 | 含义 | 命令选项 | 说明 |
| clusterList | 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 | -m | 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday) |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -i | 打印间隔,单位秒 | ||
| clusterRT | 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 | -a | amount,每次探测的总数,RT = 总时间 / amount |
| -s | 消息大小,单位B | ||
| -c | 探测哪个集群 | ||
| -p | 是否打印格式化日志,以|分割,默认不打印 | ||
| -h | 打印帮助 | ||
| -m | 所属机房,打印使用 | ||
| -i | 发送间隔,单位秒 | ||
| -n | NameServer 服务地址,格式 ip:port |
Broker相关
| 名称 | 含义 | 命令选项 | 说明 |
| updateBrokerConfig | 更新 Broker 配置文件,会修改Broker.conf | -b | Broker 地址,格式为ip:port |
| -c | cluster 名称 | ||
| -k | key 值 | ||
| -v | value 值 | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| brokerStatus | 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) | -b | Broker 地址,地址为ip:port |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| brokerConsumeStats | Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 |
-b | Broker 地址,地址为ip:port |
| -t | 请求超时时间 | ||
| -l | diff阈值,超过阈值才打印 | ||
| -o | 是否为顺序topic,一般为false | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| getBrokerConfig | 获取Broker配置 | -b | Broker 地址,地址为ip:port |
| -n | NameServer 服务地址,格式 ip:port | ||
| wipeWritePerm | 从NameServer上清除 Broker写权限 | -b | Broker 地址,地址为ip:port |
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| cleanExpiredCQ | 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker 地址,地址为ip:port | ||
| -c | 集群名称 | ||
| cleanUnusedTopic | 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic |
-n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker 地址,地址为ip:port | ||
| -c | 集群名称 | ||
| sendMsgStatus | 向Broker发消息,返回发送状态和RT | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | BrokerName,注意不同于Broker地址 | ||
| -s | 消息大小,单位B | ||
| -c | 发送次数 |
消息相关
| 名称 | 含义 | 命令选项 | 说明 |
| queryMsgById | 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 | -i | msgId |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| queryMsgByKey | 根据消息 Key 查询消息 | -k | msgKey |
| -t | Topic 名称 | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| queryMsgByOffset | 根据 Offset 查询消息 | -b | Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到) |
| -i | query 队列 id | ||
| -o | offset 值 | ||
| -t | topic 名称 | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| queryMsgByUniqueKey | 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -i | uniqe msg id | ||
| -g | consumerGroup | ||
| -d | clientId | ||
| -t | topic名称 | ||
| checkMsgSendRT | 检测向topic发消息的RT,功能类似clusterRT | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -a | 探测次数 | ||
| -s | 消息大小 | ||
| sendMessage | 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -p | body,消息体 | ||
| -k | keys | ||
| -c | tags | ||
| -b | BrokerName | ||
| -i | queueId | ||
| consumeMessage | 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -b | BrokerName | ||
| -o | 从offset开始消费 | ||
| -i | queueId | ||
| -g | 消费者分组 | ||
| -s | 开始时间戳,格式详见-h | ||
| -d | 结束时间戳 | ||
| -c | 消费多少条消息 | ||
| printMsg | 从Broker消费消息并打印,可选时间段 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -c | 字符集,例如UTF-8 | ||
| -s | subExpress,过滤表达式 | ||
| -b | 开始时间戳,格式参见-h | ||
| -e | 结束时间戳 | ||
| -d | 是否打印消息体 | ||
| printMsgByQueue | 类似printMsg,但指定Message Queue | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -i | queueId | ||
| -a | BrokerName | ||
| -c | 字符集,例如UTF-8 | ||
| -s | subExpress,过滤表达式 | ||
| -b | 开始时间戳,格式参见-h | ||
| -e | 结束时间戳 | ||
| -p | 是否打印消息 | ||
| -d | 是否打印消息体 | ||
| -f | 是否统计tag数量并打印 | ||
| resetOffsetByTime | 按时间戳重置offset,Broker和consumer都会重置 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -g | 消费者分组 | ||
| -t | topic名称 | ||
| -s | 重置为此时间戳对应的offset | ||
| -f | 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系 | ||
| -c | 是否重置c++客户端offset |
消费者和消费者组相关
| 名称 | 含义 | 命令选项 | 说明 |
| consumerProgress | 查看订阅组消费状态,可以查看具体的client IP的消息积累量 | -g | 消费者所属组名 |
| -s | 是否打印client IP | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| consumerStatus | 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand |
-h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -g | consumer group | ||
| -i | clientId | ||
| -s | 是否执行jstack | ||
| getConsumerStatus | 获取 Consumer 消费进度 | -g | 消费者所属组名 |
| -t | 查询主题 | ||
| -i | Consumer 客户端 ip | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| updateSubGroup | 更新或创建订阅关系 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker地址 | ||
| -c | 集群名称 | ||
| -g | 消费者分组名称 | ||
| -s | 分组是否允许消费 | ||
| -m | 是否从最小offset开始消费 | ||
| -d | 是否是广播模式 | ||
| -q | 重试队列数量 | ||
| -r | 最大重试次数 | ||
| -i | 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费 | ||
| -w | 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1 | ||
| -a | 当消费者数量变化时是否通知其他消费者负载均衡 | ||
| deleteSubGroup | 从Broker删除订阅关系 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker地址 | ||
| -c | 集群名称 | ||
| -g | 消费者分组名称 | ||
| cloneGroupOffset | 在目标群组中使用源群组的offset | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -s | 源消费者组 | ||
| -d | 目标消费者组 | ||
| -t | topic名称 | ||
| -o | 暂未使用 |
连接相关
| 名称 | 含义 | 命令选项 | 说明 |
| consumerConnec tion | 查询 Consumer 的网络连接 | -g | 消费者所属组名 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| producerConnec tion | 查询 Producer 的网络连接 | -g | 生产者所属组名 |
| -t | 主题名称 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 |
NameServer相关
| 名称 | 含义 | 命令选项 | 说明 |
| updateKvConfig | 更新NameServer的kv配置,目前还未使用 | -s | 命名空间 |
| -k | key | ||
| -v | value | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| deleteKvConfig | 删除NameServer的kv配置 | -s | 命名空间 |
| -k | key | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| getNamesrvConfig | 获取NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| updateNamesrvConfig | 修改NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -k | key | ||
| -v | value |
其他
| 名称 | 含义 | 命令选项 | 说明 |
| startMonitoring | 开启监控进程,监控消息误删、重试队列消息数等 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 |
注意:
1、几乎所有指令都需要通过-n参数配置nameServer地址,格式为ip:port
2、几乎所有执行都可以通过-h参数获得帮助
3、当既有Broker地址(-b)又有集群名称clustername(-c)配合项,则优先以Broker地址执行指令。如果不配置Broker地址,则对集群中所有主机执行指令。
5、命令行快速验证
在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。我们在worker2上进入RocketMQ的安装目录:
发送消息:默认会发1000条消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
复制代码
接收消息:
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
复制代码
注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。
export NAMESRV_ADDR='worker1:9876;worker2:9876;worker3:9876'
复制代码
然后就可以正常执行了。
这个NameServer地址的读取方式见源码中org.apache.rocketmq.common.utils.NameServerAddressUtils
public static String getNameServerAddresses() {
return System.getProperty("rocketmq.namesrv.addr", System.getenv("NAMESRV_ADDR"));
}
复制代码
这个方法就是在DefaultMQProducer中默认的设置NameServer地址的方式,这个rokcetmq.namesrv.addr属性可以在java中使用System.setproperties指定,也可以在SpringBoot中配到配置文件里。
这个tools.sh就封装了一个简单的运行RocketMQ的环境,可以运行源码中的其他示例,然后自己的例子也可以放到RocketMQ的lib目录下去执行。























![[桜井宁宁]COS和泉纱雾超可爱写真福利集-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/4d3cf227a85d7e79f5d6b4efb6bde3e8.jpg)

![[桜井宁宁] 爆乳奶牛少女cos写真-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/d40483e126fcf567894e89c65eaca655.jpg)