作者:微信小助手
发布时间:2025-01-05T17:17:30
大家好,我是田螺。 我们日常开发中,经常跟MQ(消息队列)打交道。本文田螺哥梳理了MQ的8种使用场景。 面试官在问我们MQ作用时,很多伙伴马上想到异步处理、解耦、流量削锋等等。 MQ 最常见的应用场景之一就是异步处理。 比如,在用户注册场景中,当用户信息保存成功后,系统需要发送一个短信、或者邮箱消息,通知用户注册成功。如果这个短信或者邮箱消息发送比较耗时,则可能拖垮注册接口。又或者如果调用第三方短信、邮件发送接口失败,也会影响注册接口。一般我们不希望一个通知类的功能,去影响注册主流程,这时候,则可以使用MQ来实现异步处理。 简要代码如下:先保存用户信息,然后发送注册成功的MQ消息 消费者从队列中读取消息并发送短信或邮件: 在微服务架构中,各个服务通常需要进行相互通信。使用 MQ 可以帮助解耦服务,避免直接调用导致的强耦合。 一个电商平台的库存服务和支付服务。支付服务在处理支付后,需要向库存服务发送扣库存的请求,但不直接调用 API,而是通过 MQ 发送消息,让库存服务异步处理。 支付服务在支付成功后将消息发送到 RocketMQ: 库存服务从 RocketMQ 中消费支付消息,并处理扣库存的逻辑: 在高并发的情况下,有些请求可能会产生瞬时流量峰值,直接处理可能会导致服务过载。比如: 这些场景,我们都可以使用MQ来进行流量的削峰填谷,确保系统平稳运行。 假设秒杀系统每秒最多可以处理2k个请求,每秒却有5k的请求过来,可以引入消息队列,秒杀系统每秒从消息队列拉2k请求处理得了。 在电商平台的订单处理中,如果用户下单后一定时间内未支付,需要自动取消订单。通过MQ的延时队列功能,可以设置消息延迟消费的时间,当消息到达延迟时间后,由消费者处理取消订单的逻辑。 当用户下单时,生成一个订单并发送一条延迟消息到RocketMQ。延迟时间可以根据订单的超时时间设置: 注意:RocketMQ的延迟级别是固定的,如1s、5s、10s等。如果订单的延迟时间不是RocketMQ支持的延迟级别的整数倍,那么消息将不会精确地在预期的延迟时间后被消费。为了解决这个问题,你可以选择最接近的延迟级别,或者根据业务需求进行适当的调整。 创建一个用来消费延迟消息的消费者,处理取消订单的逻辑。例如: 消息队列常用于日志系统中,将应用生成的日志异步地发送到日志处理系统,进行统一存储和分析。 生产者(发送日志到 Kafka) 消费者(收集日志信息) 业界经常使用 我举个下订单的场景,使用MQ实现分布式事务的例子吧。 我们先来看,一条普通的MQ消息,从产生到被消费,大概流程如下: 回到下订单这个例子,订单系统创建完订单后,再发送消息给下游系统。如果订单创建成功,然后消息没有成功发送出去,下游系统就无法感知这个事情,出导致数据不一致。 这时候就可以使用MQ实现分布式事务消息。大家看下这个流程: 我以前公司(微众)基于MQ(RocketMQ),自研了远程调用框架。 RocketMQ 作为远程调用框架,主要就是金融场景的适配性。 还有可以基于RocketMQ的定制开发:多中心多活、灰度发布、流量权重与消息去重、背压模式(能够根据后续服务的治理能力决定拉取的消息数量,确保系统的稳定运行。) 消息队列(MQ) 可以非常适合用于 广播通知。在广播通知场景中,消息队列可以将消息推送到多个订阅者,让不同的服务或者应用接收到通知。 针对事件驱动的消息通知,我们以 订单支付成功 事件为例,假设多个系统(如库存管理系统、用户积分系统、财务系统等)都需要监听这个事件来进行相应处理。 当订单支付成功 事件发生时,系统会通过消息队列广播一个事件通知(比如消息内容是订单ID、支付金额等),其他系统可以根据这个事件来执行相应的操作,如: 发送订单支付成功事件: 事件消费者(接收并处理订单支付成功事件): 最后,坚持创作不易,田螺哥工作七年多,整理了日常工作的踩坑专栏,已经更新到83篇啦~有需要的伙伴可以购买,购买了的伙伴,都说非常有用,很多坑自己也踩过,49.9永久买断前言
1. 异步处理
// 用户注册方法
public void registerUser(String username, String email, String phoneNumber) {
// 保存用户信息(简化版)
userService.add(buildUser(username,email,phoneNumber))
// 发送消息
String registrationMessage = "User " + username + " has registered successfully.";
// 发送消息到队列
rabbitTemplate.convertAndSend("registrationQueue", registrationMessage);
}@Service
public class NotificationService {
// 监听消息队列中的消息并发送短信/邮件
@RabbitListener(queues = "registrationQueue")
public void handleRegistrationNotification(String message) {
// 这里可以进行短信或邮件的发送操作
System.out.println("Sending registration notification: " + message);
// 假设这里是发送短信的操作
sendSms(message);
// 也可以做其他通知(比如发邮件等)
sendEmail(message);
}
}2. 解耦
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class PaymentService {
private DefaultMQProducer producer;
public PaymentService() throws Exception {
producer = new DefaultMQProducer("PaymentProducerGroup");
producer.setNamesrvAddr("localhost:9876"); // RocketMQ NameServer 地址
producer.start();
}
public void processPayment(String orderId, int quantity) throws Exception {
// 1. 模拟调用支付接口(例如:支付宝、微信支付等)
boolean paymentSuccessful = callPayment(orderId, quantity);
if (paymentSuccessful) {
// 2. 支付成功后,创建支付消息并发送到 RocketMQ
String messageBody = "OrderId: " + orderId + ", Quantity: " + quantity;
Message message = new Message("paymentTopic", "paymentTag", messageBody.getBytes());
producer.send(message);
}
}
}public class InventoryService {
private DefaultMQPushConsumer consumer;
public InventoryService() throws Exception {
consumer = new DefaultMQPushConsumer("InventoryConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("paymentTopic", "paymentTag");
// 消息监听器
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt msg : msgs) {
String messageBody = new String(msg.getBody());
// 执行扣库存操作
reduceStock(messageBody);
}
return null; // 返回消费成功
});
consumer.start();
System.out.println("InventoryService started...");
}
}3.流量削锋
4.延时任务
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void createOrder(Order order) {
// 保存订单逻辑(省略)
// 计算延迟时间(单位:毫秒)
long delay = order.getTimeout();
// 发送延迟消息
rocketMQTemplate.syncSend("orderCancelTopic:delay" + delay,
MessageBuilder.withPayload(order).build(),
10000, // 消息发送超时时间(单位:毫秒)
(int) (delay / 1000) // RocketMQ的延迟级别是以秒为单位的,因此需要转换为秒
);
}
}
@Component
@RocketMQMessageListener(topic = "orderCancelTopic", consumerGroup = "order-cancel-consumer-group")
public class OrderCancelListener implements RocketMQListener
@Override
public void onMessage(Order order) {
// 取消订单逻辑
// 检查订单状态,如果订单仍处于未支付状态则进行取消
System.out.println(
"Cancelling order: " + order.getOrderId());
// (省略实际的取消订单逻辑)
}
}
5. 日志收集
// 配置和发送日志到 Kafka 主题 "app-logs"
KafkaProducer
String logMessage =
"{\"level\": \"INFO\", \"message\": \"Application started\", \"timestamp\": \"2024-12-29T20:30:59\"}";
producer.send(new ProducerRecord<>(
"app-logs",
"log-key", logMessage));
@Service
public class LogConsumer {
// 使用 @KafkaListener 注解来消费 Kafka 中的日志
@KafkaListener(topics = "app-logs", groupId = "log-consumer-group")
public void consumeLog(String logMessage) {
// 打印或处理收到的日志
System.out.println("Received log: " + logMessage);
}
}6.分布式事务
MQ
来实现分布式事务。
7. 远程调用
8. 广播通知:事件驱动的消息通知
// 创建订单支付成功事件消息
String orderEventData = "{\"orderId\": 12345, \"userId\": 67890, \"amount\": 100.0, \"event\": \"ORDER_PAYMENT_SUCCESS\"}";
Message msg = new Message("order_event_topic", "order_payment_success", orderEventData.getBytes());
// 发送消息
producer.send(msg);
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (Message msg : msgs) {
String eventData = new String(msg.getBody());
System.out.println("Inventory system received: " + eventData);
// 处理库存减少逻辑
// 解析消息(假设是 JSON 格式)
// updateInventory(eventData); // 假设调用库存更新方法
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (Message msg : msgs) {
String eventData = new String(msg.getBody());
System.out.println("Points system received: " + eventData);
// 处理用户积分增加逻辑
// updateUserPoints(eventData); // 假设调用积分更新方法
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (Message msg : msgs) {
String eventData = new String(msg.getBody());
System.out.println("Finance system received: " + eventData);
// 处理财务记录逻辑
// recordPaymentTransaction(eventData); // 假设调用财务记录方法
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});