http://showmecode.cn/links/book
RocketMQ 原理解析
斩秋
博客:http://blog.csdn.net/quhongwei_zhanqiu
http://showmecode.cn/links/book
前言
目录
此文档是从学习 rocketmq 源码过程中的笔记中整理出来的,由于时间及能力原因,理
解有误之处还请谅解,希望对大家学习使用 rocketmq 有所帮助。
Rocketmq 是阿里基于开源思想做的一款产品,代码托管于 github 上,要想学好用好
rocketmq 请从 https://github.com/alibaba/RocketMQ 获取最权威的文档、问题解答、原理介
绍等。
http://showmecode.cn/links/book
第一章: producer
一:Producer 启动流程
Producer 如何感知要发送消息的 broker 即 brokerAddrTable 中的值是怎么获得的,
1. 发送消息的时候指定会指定 topic,如果 producer 集合中没有会根据指定 topic 到 namesrv
获取 topic 发布信息 TopicPublishInfo,并放入本地集合
2. 定时从 namesrv 更新 topic 路由信息,
Producer 与 broker 间的心跳
Producer 定时发送心跳将 producer 信息(其实就是 procduer 的 group)定时发送到,
brokerAddrTable 集合中列出的 broker 上去
Producer 发送消息只发送到 master 的 broker 机器,在通过 broker 的主从复制机制拷贝到
broker 的 slave 上去
http://showmecode.cn/links/book
二:Producer 如何发送消息
Producer 轮询某 topic 下的所有队列的方式来实现发送方的负载均衡
1) Topic 下的所有队列如何理解:
比如 broker1, broker2, borker3 三台 broker 机器都配置了 Topic_A
Broker1 的队列为 queue0 , queue1
Broker2 的队列为 queue0, queue2, queue3,
Broker3 的队列为 queue0
当然一般情况下的 broker 的配置都是一样的
以上当 broker 启动的时候注册到 namesrv 的 Topic_A 队列为共 6 个分别为:
broker1_queue0, broker1_queue1,
broker2_queue0, broker2_queue1, broker2_queue2,
broker3_queue0,
2) Producer 如何实现轮询队列:
--List messageQueueList //Topic_A 的所有的队列
--AtomicInteger sendWhichQueue //自增整型
Producer 从 namesrv 获取的到 Topic_A 路由信息 TopicPublishInfo
方法 selectOneMessageQueue 方法用来选择一个发送队列
(++sendWitchQueue) % messageQueueList.size 为队列集合的下标
每次获取 queue 都会通过 sendWhichQueue 加一来实现对所有 queue 的轮询
如果入参 lastBrokerName 不为空,代表上次选择的 queue 发送失败,这次选
择应该避开同一个 queue
3) Producer 发消息系统重试:
发送失败后,重试几次 retryTimesWhenSendFailed = 2
发送消息超时 sendMsgTimeout = 3000
http://showmecode.cn/links/book
--topic //Topic_A
--brokerName
--queueId
Producer 通过 selectOneMessageQueue 方法获取一个 MessagQueue 对象
向指定 broker 的指定 topic 的指定 queue 发送消息
发送失败(1)重试次数不到两次(2)发送此条消息花费时间还没有到 3000(毫秒), 换
个队列继续发送。
//代表发送消息到达的 broker
//代表发送消息的在指定 broker 上指定 topic 下的队列编号
2.1 producer 发送普通消息
2.2 顺序消息发送
Rocketmq 能够保证消息严格顺序,但是 Rocketmq 需要 producer 保证顺序消息按顺序发
送到同一个 queue 中, 比如购买流程(1)下单(2)支付(3)支付成功,这三个消息需要根据
特定规则将这个三个消息按顺序发送到一个 queue
如何实现把顺序消息发送到同一个 queue:
http://showmecode.cn/links/book
一般消息是通过轮询所有队列发送的,顺序消息可以根据业务比如说订单号
orderId 相同的消息发送到同一个队列, 或者同一用户 userId 发送到同一队列等等
messageQueueList [orderId%messageQueueList.size()]
messageQueueList [userId%messageQueueList.size()]
http://showmecode.cn/links/book
2.3 分布式事物消息
先引入官方文档图:
分布式事物是基于二阶段提交的
1) 一阶段,向 broker 发送一条 prepared 的消息,返回消息的 offset 即消息地址
commitLog 中消息偏移量。Prepared 状态消息不被消费
发送消息 ok,执行本地事物分支, 本地事物方法需要实现 rocketmq 的回调接口 2)2)
2) LocalTransactionExecuter , 处 理 本 地 事 物 逻 辑 返 回 处 理 的 事 物 状 态
LocalTransactionState
3) 二阶段,处理完本地事物中业务得到事物状态, 根据 offset 查找到 commitLog 中
的 prepared 消息,设置消息状态 commitType 或者 rollbackType , 让后将信息添加到
commitLog 中, 其实二阶段生成了两条消息
http://showmecode.cn/links/book
事物消息发送