详解微服务-分布式事务组件seata(启动seata server、客户端依赖、微服务增加分布式事务配置、全局业务方法注解GlobalTransactonal、seata原理)以及openfeign
详解springcloudalibaba-分布式事务组件seata(启动seata server、客户端依赖、微服务增加分布式事务配置、全局业务方法注解GlobalTransactonal、seata原理)以及openfeign
文章目录
Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务。Seata为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案
1.分布式环境准备

开启本地事务
使用注解:@EnableTransactionManagement
@EnableTransactionManagement
@EnableFeignClients
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.wemedia.user.mapper")
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class, args);
}
在业务service实现方法上使用注解:@Transactional
@Transactional
public void add(User user){
orderMapper.add(user);
}
分布式事务架构原理
2.启动seata server
-
配置数据库
Seata 支持不同的应用使用完全不相干的数据库-- Seata AT 模式需要使用到 undo_log 表; 注意此处0.3.0+ 增加唯一索引 ux_undo_log CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; -- 导入 seata-server db 模式所需要的数据库表 -- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_status_gmt_modified` (`status` , `gmt_modified`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; -- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` ( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME(6), `gmt_modified` DATETIME(6), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; -- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` ( `row_key` VARCHAR(128) NOT NULL, `xid` VARCHAR(128), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking', `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_status` (`status`), KEY `idx_branch_id` (`branch_id`), KEY `idx_xid_and_branch_id` (`xid` , `branch_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; CREATE TABLE IF NOT EXISTS `distributed_lock` ( `lock_key` CHAR(20) NOT NULL, `lock_value` VARCHAR(20) NOT NULL, `expire` BIGINT, primary key (`lock_key`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4; INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0); INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0); -
启动 seata server应用
采用 nacos 作为配置,注册中心存储模式为:db 采用 mysql。
双击启动
-
访问Seata Dashboard: http://127.0.0.1:7091/#/login
账户密码默认为: seata seata

3. 微服务引入依赖seata客户端:
使用 spring-cloud-starter-alibaba-seata 完成 Spring Cloud 应用的分布式事务接入
<!--引入分布式事务组件seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
4. 给需要控制分布式事务的微服务, 增加分布式事务配置
file.conf文件
#分布式事务seata配置
service {
#vgroup映射,格式为:TC唯一名称->SEATA的组ID
vgroupMapping.default_tx_group = "default"
#设置seata server注册地址
default.grouplist = "127.0.0.1:8091"
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
给每个微服务对应的数据库增加undo_log表,进行分布式事务日志的记录,方便回滚
备注说明:当然也可以使用nacos配置中心管理分布式事务配置
5. 给全局业务方法上标注全局事务@GlobalTransactonal
@GlobalTransactional //标注全局事务
@Override
public CommonResult<String> vipReCharge(String num) {
//在代码中使用opentracing代码自定义链路追踪
Tracer tracer=new SkywalkingTracer();
Tracer.SpanBuilder spanBuilder=tracer.buildSpan("UserServiceImpl.vipRecharge(String num)方法");
Span span=spanBuilder.withTag("startTag",false).startManual();
List<TUser> userList=tUserMapper.selectAll();
System.out.println("userList:"+userList.size());
CommonResult<String> result=payFeignApi.callWechatPay();
System.out.println("调用第三方payFeignApi.callWechatPay 返回结果:"+result);
span.log("调用第三方payFeignApi.callWechatPay 返回结果:"+result);
span.finish();
return CommonResult.success("调用支付服务成功!");
}
6. seata原理详解
二阶提交协议




seata 事务模式
-
AT模式
默认模式,
-
TCC模式:全手动提交的二阶段协议


-
Saga模式:长事务的模式

适应场景:
- 业务流程长、业务流程多
- 参与者包含其他公司或遗留系统服务,无法提供TCC模式要求的三个接口
- 一般采用消息队列实现分布式事务最终一致性
- XA模式:利用事物资源(数据库、消息服务)对xa协议的支持,以XA协议的机制来管理分支事务的一种模式

XA模式,性能低效,TA模式性能高
7. 补充openfeign使用
1.声明式rest客户端

2.远程调用-业务API

- 微服务引入springcloud-started-openfeign依赖
- 定义服务消费方feign接口,保持和服务提供者定义的controller中接口参数都一致
@FeignClient(value = "wemedia-oss-pay",fallback = PayFeignApiFallback.class)
public interface PayFeignApi {
@GetMapping("/api/pay/callWechatPay")
CommonResult<String> callWechatPay();
}
- 开启服务发现客户端@EnableDiscoveryClient 和开启扫描feignclient包@EnableFeignClients
@EnableTransactionManagement
@EnableFeignClients(basePackages = "com.wemedia.user.feign") //开启feign远程调用
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {
public static void main(String[] args) {
SpringApplication.run(UserApplication.class, args);
}
- 在业务方法中使用注解,引入feign api,进行调用
@Resource
PayFeignApi payFeignApi;
3.拦截器 RequestInterceptor
自定义token拦截器实现
package com.wemedia.user.interceptor;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Slf4j
@Component
public class XTokenRequestInterceptor implements RequestInterceptor {
/**
* 请求拦截器
* @param template
*/
@Override
public void apply(RequestTemplate template) {
log.info("XTokenRequestInterceptor start up!!!!");
template.header("X-Token", UUID.randomUUID().toString());
}
}
4.重试 Retryer
配置重试
@Configuration
public class FeignConfig {
//重试机制
@Bean
public Retryer myRetryer() {
// return new Retryer.Default();
// 初次间隔 最大间隔 最大请求次数(1+2) = 3
return new Retryer.Default(100, 1, 3);
}
}
5.日志feign.Logger
配置feign日志
@Configuration
public class FeignConfig {
// 日志记录级别
@Bean
public Logger.Level feignLoggerLevel() {
return Logger.Level.FULL;
}
}
yml中配置如下:
feign:
sentinel:
enabled: true
#配置日志
logging:logging:
level:
com.wemedia.user.feign: info #指定feign api所在包
6.超时配置
spring
cloud:
openfeign:
client:
config:
default:
connect-timeout: 20000
read-timeout: 20000
7.兜底返回Fallback

1.定义fallback
package com.wemedia.user.feign.fallback;
import com.wemedia.common.api.CommonResult;
import com.wemedia.user.feign.PayFeignApi;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class PayFeignApiFallback implements PayFeignApi {
@Override
public CommonResult<String> callWechatPay() {
log.info("兜底回调。。。。");
Integer result=100;
return CommonResult.failed("调用pay服务失败,返回兜底结果:"+result);
}
}
2.feign api引入fallback类
3. 增加配置
feign:
sentinel:
enabled: true
4.自定义sentinel异常处理器,处理返回结果
package com.wemedia.user.exception;
import com.alibaba.csp.sentinel.adapter.spring.webmvc_v6x.callback.BlockExceptionHandler;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wemedia.common.api.CommonResult;
import com.wemedia.common.api.ResultCode;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import java.io.PrintWriter;
/**
* 自定义sentinel异常处理器
*/
@Component
public class MyBlockExceptionHandler implements BlockExceptionHandler {
private ObjectMapper objectMapper=new ObjectMapper();//创建对象转json字符串
@Override
public void handle(HttpServletRequest request, HttpServletResponse response, String resourceName, BlockException e) throws Exception {
response.setContentType("application/json;charset=utf-8");
PrintWriter writer = response.getWriter();
CommonResult<Object> failed = CommonResult.failed(ResultCode.RC202, resourceName + "被sentinel限制了,原因msg:" + e.getMessage()+";异常类:"+e.getClass());
String json = objectMapper.writeValueAsString(failed);
writer.write(json);
writer.flush();
writer.close();
}
}
8. 参考资料
- https://sca.aliyun.com/docs/2023/user-guide/seata/quick-start/?spm=7145af80.671cd0fd.0.0.609a7021QbaNDm
- https://github.com/nacycher/study-cloud/tree/master/services/static/notes
- seata使用
https://seata.apache.org/zh-cn/docs/user/quickstart/
https://seata.apache.org/zh-cn/
https://github.com/alibaba/spring-cloud-alibaba/blob/2023.x/spring-cloud-alibaba-examples/seata-example/readme-zh.md - Spring Cloud Alibaba https://sca.aliyun.com/docs/2023/overview/what-is-sca/?spm=5176.29160081.0.0.689134bfhKhOsy
- springcloudalibaba demo: https://github.com/spring-cloud-alibaba-group/springboot-transfer-to-springcloud
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)