作者:微信小助手
发布时间:2021-12-16T11:22:34
大家好,我是不才陈某~ 为了系统间解耦,我们通常会引入 大致过程: 但现实往往不一样!MQ 架构设计要满足高并发、高性能、高可用等指标 单分区,达不到我们的吞吐量要求,我们考虑采用 两台 MQ机器,组成一个集群,原先一个分区存储 貌似可以满足我们的需求,但任何事情都有两面性! 我们看看下面业务场景: 一个用户在电商网站上下订单到交易完成,中间会经历一系列动作,订单的状态也会随之变化,一个订单会产生多条MQ消息, 我们发现,消息带上了状态,不再是一个个独立的个体,有了上下文依赖关系! 对于这个问题,突然想到 引入Cookie机制,每次请求客户端额外传输一些数据,来达到上下文关联。 回到MQ的消息顺序问题,我们要如何解决? 答案:各退一步,保证局部有序。 比如上面的电商例子,只要保证一个订单的多条状态消息在同一个分区,便可以满足业务需求,这个方案可以覆盖大部分的业务场景。 这里面只需要有一个 考虑到市面MQ开源框架很多,常见的如:Kafka、Pulsar、RabbitMQ、RocketMQ 等,API方法略有区别,但设计思路是相通的。 接下来,我们以 生产端提供了一个接口 接口内定义一个select方法,具体参数含义: 关于 arg参数的hashcode的绝对值,然后对mqs.size()取余,得到目标队列在mqs的下标 对mqs.size()值取随机数作为目标队列在mqs的下标 返回null 特别注意: 虽然保证了单个分片的消息有序,但每个分片的消费者只能是单线程处理,因为多线程无法控制消费顺序。这个可能会损失一些性能。 这里又引出另一个问题,如何保证一个队列只能有一个消费端呢? 1、org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance 2、将 3、org.apache.rocketmq.client.impl.consumer.PullMessageService#run 另一个问题,如何保证一个队列,只有一个线程在处理消息呢? 1、DefaultMQPushConsumerImpl#pullMessage 2、ConsumeMessageOrderlyService.ConsumeRequest 继续往下看,如果扩容了怎么办? 原来有6个分区, 我们能做的是,先将存量消息处理完,再扩容。如果是在线业务,可以搞个临时topic,先将消息暂时堆积,待扩容后,按新的路由规则重新发送。 顺序消息,如果某条失败了怎么办?会不会一直阻塞? 1、如果失败,不会提交消费位移,系统会自动重试(有重试上限),此时会阻塞后面的消息消费,直到这条消息处理完 2、如果这个消息达到重试上限,依然失败,会进入MQ
框架,大家各司其职共同完成上下游的业务流程。
多分区
架构设计,正所谓 ”三个臭皮匠赛过一个诸葛亮“,多分区可以有效分摊全局压力,提升整体系统性能。
6条消息
,现在分摊到两个分区,每个分区各存储3条消息
,性能比上面那个提升一倍。下单
、付款
、发货
、买家确认收货
,消费端需要严格按照业务状态机的顺序处理,否则,就会出现业务问题。HTTP协议
,其本身也是无状态的,也就是说前后两次请求没有关联,但有些业务功能有登录要求,那怎么解决?
“
“
路由策略
组件,由它决定消息该放到哪个分区中!RocketMQ
为例:MessageQueueSelector
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
MessageQueueSelector
接口,RocketMQ 框架提供了三个默认实现类:
“
“
“
MessageQueue
isOrder && !this.lock(mq)
尝试对它加锁,确保一个
MessageQueue
只能被一个消费者处理
PullRequest
对象放入PullMessageService
的pullRequestQueue
队列中public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
PullMessageService
是一个
Runnable
线程任务
ConsumeMessageService
中有两个实现类,因为我们有消费顺序要求,会选择
ConsumeMessageOrderlyService
来处理业务
ConcurrentMap
中获取
messageQueue
对应的锁对象
synchronized
关键字,线程来抢占锁,互斥关系,从而保证了一个
MessageQueue
只能有一个线程并发处理
order_id_1
的消息在MessageQueue6
中,此时扩容一倍,现在12个分区,order_id_1
订单后面产生的消息可能路由到了MessageQueue8
中,同一个订单的消息分布在两个分区中,无法保证顺序。死信队列
,可以继续处理后面的消息
求点赞、在看、分享三连