简介

            在复杂的软件系统中,不同组件之间需要进行通信和协作。传统的方法是直接调用函数或使用RPC(远程过程调用),但随着系统规模的增大和服务的分布式部署,这些方法往往会引入耦合性和可伸缩性的问题。消息队列和事件总线作为解决方案,提供了一种异步、松耦合的通信机制,帮助系统更好地实现解耦和水平扩展。

1. 消息队列

消息队列是一种基于生产者-消费者模型的通信机制,主要用于服务间解耦和异步处理。生产者将消息发送到队列中,消费者从队列中获取消息并处理。消息队列的核心优势包括:

  • 解耦:生产者和消费者之间不需要直接通信,通过队列来传递消息,降低了组件之间的依赖关系。
  • 异步处理:生产者可以快速生成消息并将其放入队列中,而消费者可以按照自身的处理能力和速度来消费消息,实现解耦和异步处理。
  • 可靠性:大多数消息队列系统保证消息的可靠性传递,通过持久化、复制和确认机制来确保消息不会丢失。

常见的Java消息队列实现包括Apache Kafka、RabbitMQ、ActiveMQ等,它们都提供了丰富的特性来支持不同的应用场景和需求。

2. 事件总线

       事件总线是一种更高层次的抽象,它提供了一种在应用内部中发布和订阅事件的机制。事件总线允许应用内的不同部分(组件、服务)通过事件进行通信,从而实现解耦和灵活的组件间通信。

与消息队列相比,事件总线更加关注于领域内部事件的传递和处理,强调事件驱动的架构设计。

Guava EventBus介绍

事件总线模式:组件间通过发布/订阅事件进行通信,实现解耦

Guava EventBus:Google 提供的轻量级事件总线库

关键特性:

任意 POJO 都可作为事件对象

同步/异步事件处理

基于注解的订阅机制

线程安全

使用步骤


1. 添加依赖

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>32.1.3-jre</version>
</dependency>

2. 配置 EventBus Bean

import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.concurrent.Executors;
 
@Configuration
public class EventBusConfig {
 
    // 同步事件总线(立即处理)
    @Bean
    public EventBus eventBus() {
        return new EventBus();
    }
 
    // 异步事件总线(后台线程处理)
    @Bean
    public AsyncEventBus asyncEventBus() {
        return new AsyncEventBus(
            "async-eventbus", 
            Executors.newFixedThreadPool(10)
        );
    }
}

3. 定义事件类(任意 POJO)

// 订单创建事件
public class OrderCreatedEvent {
    private Long orderId;
    private String customerName;
    private double amount;
    
    public OrderCreatedEvent(Long orderId, String customerName, double amount) {
        this.orderId = orderId;
        this.customerName = customerName;
        this.amount = amount;
    }
    
    // Getters...
}
 
// 支付完成事件
public class PaymentCompletedEvent {
    private Long orderId;
    private double amount;
    private boolean success;
    
    public PaymentCompletedEvent(Long orderId, double amount, boolean success) {
        this.orderId = orderId;
        this.amount = amount;
        this.success = success;
    }
    
    // Getters...
}

4. 创建事件监听器

import com.google.common.eventbus.Subscribe;
import org.springframework.stereotype.Component;
 
@Component
public class OrderEventListener {
 
    // 处理订单创建事件
    @Subscribe
    public void handleOrderCreated(OrderCreatedEvent event) {
        System.out.println("[订单处理] 创建订单 #" + event.getOrderId());
        // 业务逻辑:保存订单到数据库、更新库存等
    }
 
    // 处理支付完成事件
    @Subscribe
    public void handlePaymentCompleted(PaymentCompletedEvent event) {
        if (event.isSuccess()) {
            System.out.println("[支付处理] 订单 #" + event.getOrderId() + " 支付成功");
            // 业务逻辑:更新订单状态、发货等
        } else {
            System.out.println("[支付处理] 订单 #" + event.getOrderId() + " 支付失败");
            // 业务逻辑:发送通知、重试机制等
        }
    }
}
 
@Component
public class NotificationListener {
 
    // 处理订单创建事件(多个监听器可订阅同一事件)
    @Subscribe
    public void sendOrderCreatedNotification(OrderCreatedEvent event) {
        System.out.println("[通知] 新订单创建: #" + event.getOrderId());
        // 业务逻辑:发送邮件/SMS通知
    }
 
    // 处理支付完成事件
    @Subscribe
    public void sendPaymentNotification(PaymentCompletedEvent event) {
        if (event.isSuccess()) {
            System.out.println("[通知] 订单 #" + event.getOrderId() + " 支付成功");
        } else {
            System.out.println("[通知] 订单 #" + event.getOrderId() + " 支付失败");
        }
    }
}

5. 注册监听器

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.AsyncEventBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
 
import java.util.List;
 
@Component
public class EventBusRegister implements ApplicationListener<ApplicationReadyEvent> {
 
    private final EventBus eventBus;
    private final AsyncEventBus asyncEventBus;
    private final List<Object> eventListeners; // 自动注入所有监听器
 
    @Autowired
    public EventBusRegister(
        EventBus eventBus, 
        AsyncEventBus asyncEventBus,
        List<Object> eventListeners
    ) {
        this.eventBus = eventBus;
        this.asyncEventBus = asyncEventBus;
        this.eventListeners = eventListeners;
    }
 
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // 注册所有监听器
        for (Object listener : eventListeners) {
            eventBus.register(listener);
            asyncEventBus.register(listener);
            System.out.println("注册事件监听器: " + listener.getClass().getSimpleName());
        }
    }
}

6. 发布事件

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.AsyncEventBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
@Service
public class OrderService {
 
    private final EventBus eventBus;
    private final AsyncEventBus asyncEventBus;
 
    @Autowired
    public OrderService(EventBus eventBus, AsyncEventBus asyncEventBus) {
        this.eventBus = eventBus;
        this.asyncEventBus = asyncEventBus;
    }
 
    public void createOrder(String customerName, double amount) {
        // 1. 创建订单(业务逻辑)
        Long orderId = saveOrderToDatabase(customerName, amount);
        
        // 2. 发布同步事件(立即处理)
        eventBus.post(new OrderCreatedEvent(orderId, customerName, amount));
        
        System.out.println("订单创建完成,开始处理支付...");
        
        // 3. 模拟支付处理
        boolean paymentSuccess = processPayment(orderId, amount);
        
        // 4. 发布异步事件(后台处理)
        asyncEventBus.post(new PaymentCompletedEvent(orderId, amount, paymentSuccess));
    }
    
    private Long saveOrderToDatabase(String customerName, double amount) {
        // 数据库操作...
        return System.currentTimeMillis(); // 模拟返回订单ID
    }
    
    private boolean processPayment(Long orderId, double amount) {
        // 支付处理逻辑...
        return Math.random() > 0.3; // 70%成功率
    }
}

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐