Seata 是一款开源的分布式事务解决方案,提供高性能和简单易用的分布式事务服务。Seata为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案

1.分布式环境准备

在这里插入图片描述
开启本地事务

使用注解:@EnableTransactionManagement

@EnableTransactionManagement
@EnableFeignClients
@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.wemedia.user.mapper")
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }

在业务service实现方法上使用注解:@Transactional

@Transactional

public void add(User user){

orderMapper.add(user);

}

分布式事务架构原理
在这里插入图片描述

2.启动seata server

  • 配置数据库
    Seata 支持不同的应用使用完全不相干的数据库

    -- Seata AT 模式需要使用到 undo_log 表; 注意此处0.3.0+ 增加唯一索引 ux_undo_log
    CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      `ext` varchar(100) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    
    
    -- 导入 seata-server db 模式所需要的数据库表
    -- -------------------------------- The script used when storeMode is 'db' --------------------------------
    -- the table to store GlobalSession data
    CREATE TABLE IF NOT EXISTS `global_table`
    (
        `xid`                       VARCHAR(128) NOT NULL,
        `transaction_id`            BIGINT,
        `status`                    TINYINT      NOT NULL,
        `application_id`            VARCHAR(32),
        `transaction_service_group` VARCHAR(32),
        `transaction_name`          VARCHAR(128),
        `timeout`                   INT,
        `begin_time`                BIGINT,
        `application_data`          VARCHAR(2000),
        `gmt_create`                DATETIME,
        `gmt_modified`              DATETIME,
        PRIMARY KEY (`xid`),
        KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
        KEY `idx_transaction_id` (`transaction_id`)
    ) ENGINE = InnoDB
      DEFAULT CHARSET = utf8mb4;
    
    -- the table to store BranchSession data
    CREATE TABLE IF NOT EXISTS `branch_table`
    (
        `branch_id`         BIGINT       NOT NULL,
        `xid`               VARCHAR(128) NOT NULL,
        `transaction_id`    BIGINT,
        `resource_group_id` VARCHAR(32),
        `resource_id`       VARCHAR(256),
        `branch_type`       VARCHAR(8),
        `status`            TINYINT,
        `client_id`         VARCHAR(64),
        `application_data`  VARCHAR(2000),
        `gmt_create`        DATETIME(6),
        `gmt_modified`      DATETIME(6),
        PRIMARY KEY (`branch_id`),
        KEY `idx_xid` (`xid`)
    ) ENGINE = InnoDB
      DEFAULT CHARSET = utf8mb4;
    
    -- the table to store lock data
    CREATE TABLE IF NOT EXISTS `lock_table`
    (
        `row_key`        VARCHAR(128) NOT NULL,
        `xid`            VARCHAR(128),
        `transaction_id` BIGINT,
        `branch_id`      BIGINT       NOT NULL,
        `resource_id`    VARCHAR(256),
        `table_name`     VARCHAR(32),
        `pk`             VARCHAR(36),
        `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
        `gmt_create`     DATETIME,
        `gmt_modified`   DATETIME,
        PRIMARY KEY (`row_key`),
        KEY `idx_status` (`status`),
        KEY `idx_branch_id` (`branch_id`),
        KEY `idx_xid_and_branch_id` (`xid` , `branch_id`)
    ) ENGINE = InnoDB
      DEFAULT CHARSET = utf8mb4;
    
    CREATE TABLE IF NOT EXISTS `distributed_lock`
    (
        `lock_key`       CHAR(20) NOT NULL,
        `lock_value`     VARCHAR(20) NOT NULL,
        `expire`         BIGINT,
        primary key (`lock_key`)
    ) ENGINE = InnoDB
      DEFAULT CHARSET = utf8mb4;
    
    INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
    INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
    INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
    INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
    
  • 启动 seata server应用
    采用 nacos 作为配置,注册中心存储模式为:db 采用 mysql。
    在这里插入图片描述

双击启动

  • 访问Seata Dashboard: http://127.0.0.1:7091/#/login

    账户密码默认为: seata seata

    在这里插入图片描述

3. 微服务引入依赖seata客户端:

使用 spring-cloud-starter-alibaba-seata 完成 Spring Cloud 应用的分布式事务接入

<!--引入分布式事务组件seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        </dependency>

4. 给需要控制分布式事务的微服务, 增加分布式事务配置

file.conf文件

#分布式事务seata配置
service {
  #vgroup映射,格式为:TC唯一名称->SEATA的组ID
  vgroupMapping.default_tx_group = "default"
  #设置seata server注册地址
  default.grouplist = "127.0.0.1:8091"
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

给每个微服务对应的数据库增加undo_log表,进行分布式事务日志的记录,方便回滚

备注说明:当然也可以使用nacos配置中心管理分布式事务配置

5. 给全局业务方法上标注全局事务@GlobalTransactonal

@GlobalTransactional //标注全局事务
    @Override
    public CommonResult<String> vipReCharge(String num) {
        //在代码中使用opentracing代码自定义链路追踪
        Tracer tracer=new SkywalkingTracer();
        Tracer.SpanBuilder spanBuilder=tracer.buildSpan("UserServiceImpl.vipRecharge(String num)方法");
        Span span=spanBuilder.withTag("startTag",false).startManual();

        List<TUser> userList=tUserMapper.selectAll();
        System.out.println("userList:"+userList.size());
        CommonResult<String> result=payFeignApi.callWechatPay();
        System.out.println("调用第三方payFeignApi.callWechatPay 返回结果:"+result);

        span.log("调用第三方payFeignApi.callWechatPay 返回结果:"+result);
        span.finish();
        return CommonResult.success("调用支付服务成功!");
    }

6. seata原理详解

二阶提交协议

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

seata 事务模式

  1. AT模式

    默认模式,

  2. TCC模式:全手动提交的二阶段协议

    在这里插入图片描述
    在这里插入图片描述

  3. Saga模式:长事务的模式

    在这里插入图片描述

适应场景:

  • 业务流程长、业务流程多
  • 参与者包含其他公司或遗留系统服务,无法提供TCC模式要求的三个接口
  • 一般采用消息队列实现分布式事务最终一致性
  1. XA模式:利用事物资源(数据库、消息服务)对xa协议的支持,以XA协议的机制来管理分支事务的一种模式
    在这里插入图片描述
    XA模式,性能低效,TA模式性能高

7. 补充openfeign使用

1.声明式rest客户端

在这里插入图片描述

2.远程调用-业务API

在这里插入图片描述

  1. 微服务引入springcloud-started-openfeign依赖
  2. 定义服务消费方feign接口,保持和服务提供者定义的controller中接口参数都一致
@FeignClient(value = "wemedia-oss-pay",fallback = PayFeignApiFallback.class)
public interface PayFeignApi {

   @GetMapping("/api/pay/callWechatPay")
    CommonResult<String> callWechatPay();
}

  1. 开启服务发现客户端@EnableDiscoveryClient 和开启扫描feignclient包@EnableFeignClients
@EnableTransactionManagement
@EnableFeignClients(basePackages = "com.wemedia.user.feign") //开启feign远程调用
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }
  1. 在业务方法中使用注解,引入feign api,进行调用
 @Resource
   PayFeignApi payFeignApi;

3.拦截器 RequestInterceptor

自定义token拦截器实现

package com.wemedia.user.interceptor;

import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.UUID;

@Slf4j
@Component
public class XTokenRequestInterceptor implements RequestInterceptor {

    /**
     * 请求拦截器
     * @param template
     */
    @Override
    public void apply(RequestTemplate template) {
        log.info("XTokenRequestInterceptor start up!!!!");
        template.header("X-Token", UUID.randomUUID().toString());
    }
}

4.重试 Retryer

配置重试

@Configuration
public class FeignConfig {

     //重试机制
    @Bean
    public Retryer myRetryer() {
       // return new Retryer.Default();
        // 初次间隔 最大间隔 最大请求次数(1+2) = 3
        return new Retryer.Default(100, 1, 3);
    }
}    

5.日志feign.Logger

配置feign日志

@Configuration
public class FeignConfig {

   // 日志记录级别
    @Bean
    public Logger.Level feignLoggerLevel() {
        return Logger.Level.FULL;
    }
}    

yml中配置如下:

feign:
  sentinel:
    enabled: true

#配置日志
logging:logging:
  level:
    com.wemedia.user.feign: info    #指定feign api所在包
  

6.超时配置

spring
   cloud:
	  openfeign:
	      client:
	        config:
	          default:
	            connect-timeout: 20000
	            read-timeout: 20000

7.兜底返回Fallback

在这里插入图片描述
1.定义fallback

package com.wemedia.user.feign.fallback;

import com.wemedia.common.api.CommonResult;
import com.wemedia.user.feign.PayFeignApi;
import lombok.extern.slf4j.Slf4j;

import org.springframework.stereotype.Component;

@Component
@Slf4j
public class PayFeignApiFallback implements PayFeignApi {

    @Override
    public CommonResult<String> callWechatPay() {
        log.info("兜底回调。。。。");
        Integer result=100;
        return CommonResult.failed("调用pay服务失败,返回兜底结果:"+result);
    }
}

2.feign api引入fallback类
在这里插入图片描述
3. 增加配置

feign:
  sentinel:
    enabled: true

4.自定义sentinel异常处理器,处理返回结果

package com.wemedia.user.exception;

import com.alibaba.csp.sentinel.adapter.spring.webmvc_v6x.callback.BlockExceptionHandler;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wemedia.common.api.CommonResult;
import com.wemedia.common.api.ResultCode;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;

import java.io.PrintWriter;

/**
 * 自定义sentinel异常处理器
 */
@Component
public class MyBlockExceptionHandler implements BlockExceptionHandler {
    private ObjectMapper objectMapper=new ObjectMapper();//创建对象转json字符串

    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response, String resourceName, BlockException e) throws Exception {
        response.setContentType("application/json;charset=utf-8");
        PrintWriter writer = response.getWriter();
        CommonResult<Object> failed = CommonResult.failed(ResultCode.RC202, resourceName + "被sentinel限制了,原因msg:" + e.getMessage()+";异常类:"+e.getClass());
        String json = objectMapper.writeValueAsString(failed);
        writer.write(json);
        writer.flush();
        writer.close();
    }
}

8. 参考资料

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐