微服务项目中MQ实战:解耦异步处理
本文介绍了消息队列(MQ)在在线教育平台中的应用实践。项目采用RabbitMQ实现系统解耦和异步处理,以点赞功能为例详细说明了MQ配置、常量定义、生产者和消费者实现。通过定义统一交换机和路由键,点赞服务将点赞事件异步发送至MQ,学习服务消费消息批量更新点赞数。这种方案实现了异步处理、服务解耦、消息可靠性和批量操作等优势,有效提升了系统性能和扩展性,同时保持了数据一致性。
在微服务中,消息队列(Message Queue,简称MQ)扮演着非常重要的角色。它能够实现系统间的解耦、异步处理、流量削峰等功能。本文将详细介绍在项目开发中如何使用MQ。
1. 项目背景介绍
我的项目是一个在线教育平台,其中包含学习中心服务(learning-service)。在这个服务中,我们需要处理各种业务事件,如用户签到、课程购买、点赞等。这些操作往往需要触发其他服务的处理逻辑,如果采用同步调用的方式,会降低系统性能并增加耦合度。
2. MQ在项目中的配置
首先,让我们看一下项目中的MQ配置。在pom.xml中添加了RabbitMQ的依赖:
<!--mq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
shared-configs:
- data-id: shared-mq.yaml # 共享mq配置
refresh: false
3. MQ常量定义
项目中统一定义了MQ相关的常量,便于维护和管理:
public interface MqConstants {
interface Exchange{
String COURSE_EXCHANGE = "course.topic";
String ORDER_EXCHANGE = "order.topic";
String LEARNING_EXCHANGE = "learning.topic";
String LIKE_RECORD_EXCHANGE = "like.record.topic";
}
interface Key{
// 点赞相关
String LIKED_TIMES_KEY_TEMPLATE = "{}.times.changed";
String QA_LIKED_TIMES_KEY = "QA.times.changed";
// 积分相关
String WRITE_REPLY = "reply.new";
String SIGN_IN = "sign.in";
String LEARN_SECTION = "section.learned";
String WRITE_NOTE = "note.new";
String NOTE_GATHERED = "note.gathered";
// 订单相关
String ORDER_PAY_KEY = "order.pay";
String ORDER_REFUND_KEY = "order.refund";
}
}
4. 生产者端的实现
点赞功能中的MQ使用
在点赞功能中,当用户点赞或取消点赞时,我们需要更新相关内容的点赞数。为了实现这个功能,项目使用了MQ来异步通知其他服务:
// 在点赞服务中发送MQ消息
mqHelper.send(
LIKE_RECORD_EXCHANGE,
StringUtils.format(LIKED_TIMES_KEY_TEMPLATE, recordDTO.getBizType()),
LikedTimesDTO.of(recordDTO.getBizId(), count));
这里:
LIKE_RECORD_EXCHANGE 是交换机名称
LIKED_TIMES_KEY_TEMPLATE 是路由键模板,根据业务类型格式化
LikedTimesDTO 是消息内容,包含业务ID和点赞次数
5. 消费者端的实现
点赞消息消费者
在学习服务中,我们有一个专门处理点赞消息的消费者:
@Slf4j
@Component
@RequiredArgsConstructor
public class LikeTimesChangeListener {
private final IInteractionReplyService replyService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "qa.liked.times.queue", durable = "true"),
exchange = @Exchange(name = LIKE_RECORD_EXCHANGE, type = ExchangeTypes.TOPIC),
key = QA_LIKED_TIMES_KEY
))
public void listenReplyLikedTimesChange(List<LikedTimesDTO> likedTimesDTOs){
log.debug("监听到回答或评论的点赞数变更");
List<InteractionReply> list = new ArrayList<>(likedTimesDTOs.size());
for (LikedTimesDTO dto : likedTimesDTOs) {
InteractionReply r = new InteractionReply();
r.setId(dto.getBizId());
r.setLikedTimes(dto.getLikedTimes());
list.add(r);
}
replyService.updateBatchById(list);
}
}
6. 完整的MQ处理流程示例(点赞场景)
让我们通过一个完整的用户点赞问答的例子来说明MQ的完整流程:
步骤1:用户发起点赞请求
用户在前端点击对某个问答的点赞按钮,发送点赞请求到后端。
步骤2:点赞服务处理点赞逻辑
@Override
public void addLikeRecord(LikeRecordFormDTO recordDTO) {
// 1.基于前端的参数,判断是执行点赞还是取消点赞
boolean success = recordDTO.getLiked() ? like(recordDTO) : unlike(recordDTO);
// 2.判断是否执行成功,如果失败,则直接结束
if (!success) {
return;
}
// 3.如果执行成功,统计点赞总数
Integer likedTimes = lambdaQuery()
.eq(LikedRecord::getBizId, recordDTO.getBizId())
.count();
// 4.发送MQ通知
mqHelper.send(
LIKE_RECORD_EXCHANGE, // 交换机
StringUtils.format(LIKED_TIMES_KEY_TEMPLATE, recordDTO.getBizType()), // 路由键
LikedTimesDTO.of(recordDTO.getBizId(), likedTimes)); // 消息内容
}
例如,如果用户对一个问答(bizId=1001)进行点赞,将会发送如下消息:
- 交换机:`like.record.topic`
- 路由键:`QA.times.changed`(因为bizType是"QA")
- 消息内容:`LikedTimesDTO{bizId=1001, likedTimes=5}`(假设这是第5个赞)
步骤3:MQ将消息路由到指定队列
RabbitMQ根据交换机和路由键将消息路由到`qa.liked.times.queue`队列。
步骤4:点赞消息消费者处理消息
在 `LikeTimesChangeListener` 类中监听并处理点赞消息:
@Slf4j
@Component
@RequiredArgsConstructor
public class LikeTimesChangeListener {
private final IInteractionReplyService replyService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "qa.liked.times.queue", durable = "true"),
exchange = @Exchange(name = LIKE_RECORD_EXCHANGE, type = ExchangeTypes.TOPIC),
key = QA_LIKED_TIMES_KEY
))
public void listenReplyLikedTimesChange(List<LikedTimesDTO> likedTimesDTOs){
log.debug("监听到回答或评论的点赞数变更");
List<InteractionReply> list = new ArrayList<>(likedTimesDTOs.size());
for (LikedTimesDTO dto : likedTimesDTOs) {
InteractionReply r = new InteractionReply();
r.setId(dto.getBizId()); // 设置回答ID
r.setLikedTimes(dto.getLikedTimes()); // 设置新的点赞数
list.add(r);
}
// 批量更新数据库中回答的点赞数
replyService.updateBatchById(list);
}
}
步骤5:问答服务更新点赞数
问答服务接收到消息后,批量更新问答表中的点赞数字段,确保数据一致性。
// InteractionReplyService 中的 updateBatchById 方法
@Override
@Transactional
public void updateBatchById(List<InteractionReply> list) {
// 批量更新问答的点赞数
updateBatchById(list);
}
流程图解:

优势体现
通过这个点赞例子,MQ的使用带来了以下优势:
1. 异步处理:用户点赞后不需要等待点赞数同步完成,提高响应速度
2. 解耦:点赞服务和问答服务相互独立,通过MQ进行通信
3. 可靠性:即使问答服务暂时不可用,消息也不会丢失,等服务恢复后继续处理
4. 批量处理:可以将多个点赞消息合并处理,提高处理效率
5. 扩展性:可以轻松添加更多的消费者来处理点赞事件,如通知服务、推荐服务等
这种方式确保了点赞功能的高性能和数据一致性,同时保持了系统各服务之间的松耦合。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)