Guava EventBus:事件驱动框架
本文介绍了消息队列和事件总线在软件系统中的异步通信作用。消息队列(如Kafka、RabbitMQ)通过生产者-消费者模型实现解耦和可靠传递,而事件总线(如Guava EventBus)提供了更高层次的组件间事件发布/订阅机制。重点演示了Guava EventBus在Spring Boot中的实现方式:包括事件定义、监听器注册、同步/异步事件处理等。通过订单创建和支付处理的示例,展示了如何利用事件驱
简介
在复杂的软件系统中,不同组件之间需要进行通信和协作。传统的方法是直接调用函数或使用RPC(远程过程调用),但随着系统规模的增大和服务的分布式部署,这些方法往往会引入耦合性和可伸缩性的问题。消息队列和事件总线作为解决方案,提供了一种异步、松耦合的通信机制,帮助系统更好地实现解耦和水平扩展。
1. 消息队列
消息队列是一种基于生产者-消费者模型的通信机制,主要用于服务间解耦和异步处理。生产者将消息发送到队列中,消费者从队列中获取消息并处理。消息队列的核心优势包括:
- 解耦:生产者和消费者之间不需要直接通信,通过队列来传递消息,降低了组件之间的依赖关系。
- 异步处理:生产者可以快速生成消息并将其放入队列中,而消费者可以按照自身的处理能力和速度来消费消息,实现解耦和异步处理。
- 可靠性:大多数消息队列系统保证消息的可靠性传递,通过持久化、复制和确认机制来确保消息不会丢失。
常见的Java消息队列实现包括Apache Kafka、RabbitMQ、ActiveMQ等,它们都提供了丰富的特性来支持不同的应用场景和需求。
2. 事件总线
事件总线是一种更高层次的抽象,它提供了一种在应用内部中发布和订阅事件的机制。事件总线允许应用内的不同部分(组件、服务)通过事件进行通信,从而实现解耦和灵活的组件间通信。
与消息队列相比,事件总线更加关注于领域内部事件的传递和处理,强调事件驱动的架构设计。
Guava EventBus介绍
事件总线模式:组件间通过发布/订阅事件进行通信,实现解耦
Guava EventBus:Google 提供的轻量级事件总线库
关键特性:
任意 POJO 都可作为事件对象
同步/异步事件处理
基于注解的订阅机制
线程安全
使用步骤
1. 添加依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>
2. 配置 EventBus Bean
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
@Configuration
public class EventBusConfig {
// 同步事件总线(立即处理)
@Bean
public EventBus eventBus() {
return new EventBus();
}
// 异步事件总线(后台线程处理)
@Bean
public AsyncEventBus asyncEventBus() {
return new AsyncEventBus(
"async-eventbus",
Executors.newFixedThreadPool(10)
);
}
}
3. 定义事件类(任意 POJO)
// 订单创建事件
public class OrderCreatedEvent {
private Long orderId;
private String customerName;
private double amount;
public OrderCreatedEvent(Long orderId, String customerName, double amount) {
this.orderId = orderId;
this.customerName = customerName;
this.amount = amount;
}
// Getters...
}
// 支付完成事件
public class PaymentCompletedEvent {
private Long orderId;
private double amount;
private boolean success;
public PaymentCompletedEvent(Long orderId, double amount, boolean success) {
this.orderId = orderId;
this.amount = amount;
this.success = success;
}
// Getters...
}
4. 创建事件监听器
import com.google.common.eventbus.Subscribe;
import org.springframework.stereotype.Component;
@Component
public class OrderEventListener {
// 处理订单创建事件
@Subscribe
public void handleOrderCreated(OrderCreatedEvent event) {
System.out.println("[订单处理] 创建订单 #" + event.getOrderId());
// 业务逻辑:保存订单到数据库、更新库存等
}
// 处理支付完成事件
@Subscribe
public void handlePaymentCompleted(PaymentCompletedEvent event) {
if (event.isSuccess()) {
System.out.println("[支付处理] 订单 #" + event.getOrderId() + " 支付成功");
// 业务逻辑:更新订单状态、发货等
} else {
System.out.println("[支付处理] 订单 #" + event.getOrderId() + " 支付失败");
// 业务逻辑:发送通知、重试机制等
}
}
}
@Component
public class NotificationListener {
// 处理订单创建事件(多个监听器可订阅同一事件)
@Subscribe
public void sendOrderCreatedNotification(OrderCreatedEvent event) {
System.out.println("[通知] 新订单创建: #" + event.getOrderId());
// 业务逻辑:发送邮件/SMS通知
}
// 处理支付完成事件
@Subscribe
public void sendPaymentNotification(PaymentCompletedEvent event) {
if (event.isSuccess()) {
System.out.println("[通知] 订单 #" + event.getOrderId() + " 支付成功");
} else {
System.out.println("[通知] 订单 #" + event.getOrderId() + " 支付失败");
}
}
}
5. 注册监听器
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.AsyncEventBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class EventBusRegister implements ApplicationListener<ApplicationReadyEvent> {
private final EventBus eventBus;
private final AsyncEventBus asyncEventBus;
private final List<Object> eventListeners; // 自动注入所有监听器
@Autowired
public EventBusRegister(
EventBus eventBus,
AsyncEventBus asyncEventBus,
List<Object> eventListeners
) {
this.eventBus = eventBus;
this.asyncEventBus = asyncEventBus;
this.eventListeners = eventListeners;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
// 注册所有监听器
for (Object listener : eventListeners) {
eventBus.register(listener);
asyncEventBus.register(listener);
System.out.println("注册事件监听器: " + listener.getClass().getSimpleName());
}
}
}
6. 发布事件
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.AsyncEventBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderService {
private final EventBus eventBus;
private final AsyncEventBus asyncEventBus;
@Autowired
public OrderService(EventBus eventBus, AsyncEventBus asyncEventBus) {
this.eventBus = eventBus;
this.asyncEventBus = asyncEventBus;
}
public void createOrder(String customerName, double amount) {
// 1. 创建订单(业务逻辑)
Long orderId = saveOrderToDatabase(customerName, amount);
// 2. 发布同步事件(立即处理)
eventBus.post(new OrderCreatedEvent(orderId, customerName, amount));
System.out.println("订单创建完成,开始处理支付...");
// 3. 模拟支付处理
boolean paymentSuccess = processPayment(orderId, amount);
// 4. 发布异步事件(后台处理)
asyncEventBus.post(new PaymentCompletedEvent(orderId, amount, paymentSuccess));
}
private Long saveOrderToDatabase(String customerName, double amount) {
// 数据库操作...
return System.currentTimeMillis(); // 模拟返回订单ID
}
private boolean processPayment(Long orderId, double amount) {
// 支付处理逻辑...
return Math.random() > 0.3; // 70%成功率
}
}
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)