1. 前言

       分布式事务一直是微服务,分布式应用的难点,还好阿里发布的了不少开源项目,对小羊的云商这样的分布式应用来说帮助非常大。其中dubbo/nacos/seata都是来源于阿里的开源项目,这里要感谢阿里对社会的贡献了。

小羊云商是依赖spring-cloud-alibaba来建设,方便于兼容spring cloud的基础组件,其中最大的差别是在于将内部远程调用方式是采用dubbo的RPC调用,而不是Spring cloud feign client风格的REST调用。

2. 项目依赖包

本文涉及软件环境如下:


依赖包的版本

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.1.0.RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
 
<dependency>
    <groupId>org.apache.dubbo</groupId>
    <artifactId>dubbo</artifactId>
  <version>2.7.4.1</version>
</dependency>
 
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>1.0.0</version>
</dependency>
 
<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>1.1.4</version>
</dependency>

目前dubbo最新的版本为2.7.5, 到这边文章写出来为止, 官网还不建议将2.7.5用于生产环境,2.7.5有一个类似spring cloud一样的服务内省的功能还处于BETA版, 目前dubbo向注册中心nacos注册的单位还是service接口级别,性能还稍微差一些, dubbo下个版本将会支持应用级别。


在Idea中看到的依赖包如下:





3、业务场景

为了简化流程,我们只需要创建同一张表t_trans_test, 分布存在于不同的服务中,每个服务单独对应一个数据库。然后在web层调用其中一个服务shop-serivice,再由这个服务远程调用其他4个微服务。服务列表如下:



为了简单,这个业务表只是简单包含以下几个字段:


--测试业务表
CREATETABLE`t_trans_test` (
  `id`int(11)NOTNULLCOMMENT'主键',
  `name`varchar(255)DEFAULTNULLCOMMENT'名字',
  `rec_date` datetimeDEFAULTNULLCOMMENT'记录时间',
  PRIMARYKEY(`id`)
) ENGINE=InnoDBDEFAULTCHARSET=utf8 COMMENT='seata分布式事务测试表';
 
--事务回滚表
CREATETABLE`undo_log` (
  `id`bigint(20)NOTNULLAUTO_INCREMENT,
  `branch_id`bigint(20)NOTNULL,
  `xid`varchar(100)NOTNULL,
  `context`varchar(128)NOTNULL,
  `rollback_info` longblobNOTNULL,
  `log_status`int(11)NOTNULL,
  `log_created` datetimeNOTNULL,
  `log_modified` datetimeNOTNULL,
  PRIMARYKEY(`id`),
  UNIQUEKEY`ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=131DEFAULTCHARSET=utf8;



业务流程解析:

以保存一条记录为例,

用户通过访问web层,web层通过远程RPC调用shop-service的接口,存入一条记录,同时shop-service也会远程调用其他四个微服务, 如果5个微服务都同时成功那就会一起提交到数据库, 如果其中一个服务出现问题,则会出现全部回滚的现象。


4、软件安装

   seata的安装请参考seata官网或者其他文章,seata跟其他框架的整合请看:  https://github.com/seata/seata-samples


5、具体设计

  5.1 使用seata-spring-boot-starter外部化配置

由于seata的默认配置是采用文件的方式,在spring boot的环境下无法支持多个环境的配置。 因此需要引入seata-spring-boot-starter包。

seata-spring-boot-starter是使用springboot自动装配来简化seata-all的复杂配置。1.0.0可用于替换seata-all,

GlobalTransactionScanner自动初始化(依赖SpringUtils)若其他途径实现GlobalTransactionScanner初始化,请保证io.seata.spring.boot.autoconfigure.util.SpringUtils先初始化;
seata-spring-boot-starter默认开启数据源自动代理,用户若再手动配置DataSourceProxy将会导致异常。由于legendDao支持分布式的ID生产策略,因为ID每次生成就不能再回滚,因此不能使用默认的数据源开启代理,需要手动设置数据源代理,要将ID库排除在数据源代理之外。


<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.0.0</version>
</dependency>


spring boot的配置如下


server:
  port:8181
spring:
  application:
    name:@artifactId@
  cloud:
    nacos:
      config:
        server-addr: ${NACOS-HOST:192.168.0.146}:${NACOS-PORT:8848}
        shared-dataids:application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}
        file-extension:yml
  profiles:
    active:@profiles.active@


通过配置文件可见,application.yml的配置将会配置在nacos配置中心之中,那单独看一下seata在nacos中的配置:


seata:
  enabled:true
  application-id:${spring.application.name}
   #事务群组(可以每个应用独立取名,也可以使用相同的名字)
  tx-service-group:my_test_tx_group
  client:
    rm-report-success-enable:true
    rm-table-meta-check-enable:false# 自动刷新缓存中的表结构(默认false)
    rm-report-retry-count:5# 一阶段结果上报TC重试次数(默认5)
    rm-async-commit-buffer-limit:10000# 异步提交缓存队列长度(默认10000)
    rm:
      lock:
        lock-retry-internal:10# 校验或占用全局锁重试间隔(默认10ms)
        lock-retry-times:   30# 校验或占用全局锁重试次数(默认30)
        lock-retry-policy-branch-rollback-on-conflict:true# 分支事务与其它全局回滚事务冲突时锁策略
    tm-commit-retry-count:  3# 一阶段全局提交结果上报TC重试次数(默认1次,建议大于1)
    tm-rollback-retry-count:3# 一阶段全局回滚结果上报TC重试次数(默认1次,建议大于1)
    undo:
      undo-data-validation:true# 二阶段回滚镜像校验(默认true开启)
      undo-log-serialization:jackson# undo序列化方式(默认jackson)
      undo-log-table:undo_log # 自定义undo表名(默认undo_log)
    log:
      exceptionRate:100# 日志异常输出概率(默认100)
    support:
      spring:
        datasource-autoproxy:false #为了照顾idDatasource不用DataSourceProxy,因此需要配置为false
  service:
    vgroup-mapping:default# TC 集群(必须与seata-server保持一致)
    enable-degrade:false# 降级开关
    disable-global-transaction:false #禁用全局事务(默认false)
    grouplist:${SEATA-HOST:172.18.102.181}:${SEATA-PORT:8091}
  transport:
    shutdown:
      wait:3
    thread-factory:
      boss-thread-prefix:NettyBoss
      worker-thread-prefix:NettyServerNIOWorker
      server-executor-thread-prefix:NettyServerBizHandler
      share-boss-worker:false
      client-selector-thread-prefix:NettyClientSelector
      client-selector-thread-size:1
      client-worker-thread-prefix:NettyClientWorkerThread
    type:TCP
    server:NIO
    heartbeat:true
    serialization:seata
    compressor:none
    enable-client-batch-send-request:true# 客户端事务消息请求是否批量合并发送(默认true)
  registry:
    file:
      name:file.conf
    type:nacos
    nacos:
      server-addr:${NACOS-HOST:172.18.102.181}:${NACOS-PORT:8848}
      namespace:369355a6-eb56-4864-bee9-75f4bd4592be
      cluster:default
  config:
    file:
      name:file.conf
    type:file
    nacos:
      namespace:369355a6-eb56-4864-bee9-75f4bd4592be
      server-addr:${NACOS-HOST:172.18.102.181}:${NACOS-PORT:8848}


NACOS-HOST的地址是由启动命令中传入,在nacos中也指明的默认的地址为172.18.102.181

nacos的namespace是为了将配置分离到不同的命令空间,让他更容易找。registry模式是采用nacos方式, config配置方式可以采用本地文件的方式,本地文件file.conf如下,需要放在类的上下文中。


# seata 本地的配置,不可缺少
transport{
  # tcp udt unix-domain-socket
  type ="TCP"
  #NIO NATIVE
  server ="NIO"
  #enable heartbeat
  heartbeat =true
  # the client batch send request enable
  enable-client-batch-send-request =true
  #thread factory for netty
  thread-factory{
    boss-thread-prefix ="NettyBoss"
    worker-thread-prefix ="NettyServerNIOWorker"
    server-executor-thread-prefix ="NettyServerBizHandler"
    share-boss-worker =false
    client-selector-thread-prefix ="NettyClientSelector"
    client-selector-thread-size =1
    client-worker-thread-prefix ="NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    boss-thread-size =1
    #auto default pin or 8
    worker-thread-size =8
  }
  shutdown{
    # when destroy server, wait seconds
    wait =3
  }
  serialization ="seata"
  compressor ="none"
}
service{
  #transaction service group mapping
  vgroup_mapping.my_test_tx_group ="default"
  #only support when registry.type=file, please don't set multiple addresses
  # default.grouplist = "192.168.0.146:8091" 已经在nacos中配置,无需再配置
  #degrade, current not support
  enableDegrade =false
  #disable seata
  disableGlobalTransaction =false
}
 
client{
  rm{
    async.commit.buffer.limit =10000
    lock{
      retry.internal =10
      retry.times =30
      retry.policy.branch-rollback-on-conflict =true
    }
    report.retry.count =5
    table.meta.check.enable =false
    report.success.enable =true
  }
  tm{
    commit.retry.count =5
    rollback.retry.count =5
  }
  undo{
    data.validation =true
    log.serialization ="jackson"
    log.table ="undo_log"
  }
  log{
    exceptionRate =100
  }
  support{
    # auto proxy the DataSource bean
    spring.datasource.autoproxy =false
  }
}

5.2 业务代码

Entity实体类如下:


/**
 * 测试seata的分布式事务.演示不需要加入@Column的方式
 */
@Entity
@Table(name ="t_trans_test")
@Data
publicclassTransTestimplementsGenericEntity<Long> {
 
    @Id
    @GeneratedValue(strategy = GenerationType.TABLE, generator ="generator")
    @TableGenerator(name ="generator", pkColumnValue ="TRANS_TEST_SEQ")
    Long id;
 
    /** 名称 */
    String name;
 
    /** 连接地址 */
    Date recDate;
 
}


Controller如下:


/**
 * seata分布式事务测试Dao,测试分布式事务,同时调用其他5个服务
 */
@RestController
@Slf4j
@RequestMapping(value="/transTest")
publicclassTransTestController {
 
    @ShopReference
    privateShopTransAllTestService shopTransAllTestService;
 
    /**
     * 保存
     */
    @RequestMapping(value="/saveTransTest/{name}")
    publicLong saveTransTest(@PathVariableString name){
        TransTest transTest =newTransTest();
        transTest.setName(name);
        transTest.setRecDate(newDate());
        log.info("saveTransTest {}", transTest);
 
        returnshopTransAllTestService.saveTransTest(transTest);
    }
}


ShopTransAllTestService的实现代码如下:


@GlobalTransactional(timeoutMills =300000)
publicLong saveTransTest(TransTest transTest) {
    System.out.println("Task 开始全局事务,XID = "+ RootContext.getXID());
    log.info("saveTransTest {}", transTest);
 
    Long result = 0l;
    result += shopTransTestService.saveTransTest(transTest);
    result += prodTransTestService.saveTransTest(transTest);
    result += orderTransTestService.saveTransTest(transTest);
    result += logTransTestService.saveTransTest(transTest);
    result += basicTransTestService.saveTransTest(transTest);
    returnresult;
}


GlobalTransactional用于标记启动全局事务。

其中一个服务BasicTransTestService的代码如下:


@Override
publicLong saveTransTest(TransTest transTest) {
    System.out.println("Basic 全局事务id :"+ RootContext.getXID());
    returntransTestDao.saveTransTest(transTest);
}
TransTestDao实现类
?
@Override
publicLong saveTransTest(TransTest transTest) {
    String suffix ="-basic";
    if(transTest.getName() !=null&& !transTest.getName().endsWith(suffix)){
        transTest.setName(transTest.getName() + suffix);
 
    }
    Long result = save(transTest);
 
    if(true){
        thrownewBusinessException("some error happened!");
    }
    returnresult;
}


if(true)语句专门构造一个运行时异常。


最后不要忘了做数据源代理


<!-- 数据源定义 -->
<bean id="shopDataSource"parent="publicDataSource">
    <property name="url"value="${shop.jdbc.url}"/>
    <property name="username"value="${shop.jdbc.username}"/>
    <property name="password"value="${shop.jdbc.password}"/>
</bean>
 
<!-- 注册所有的数据源 -->
<bean id="dataSource"class="com.legendshop.dao.shards.datasource.DataSourceHolder">
    <property name="dataSourceMap">
        <map>
            <!-- 主键数据源 -->
            <entry key="idDataSource"value-ref="idDataSource"></entry>
            <!-- 默认数据源 -->
            <entry key="dataSource"value-ref="shopDataSourceProxy"></entry>
            <entry key="shopDataSource"value-ref="shopDataSourceProxy"></entry>
        </map>
    </property>
</bean>
 
<bean id="shopDataSourceProxy"class="io.seata.rm.datasource.DataSourceProxy">
    <constructor-arg index="0"ref="shopDataSource"/>
</bean>


可见ID库并没有做数据源的代理,是不会参与到事务回滚当中, 其余数据源必须要用io.seata.rm.datasource.DataSourceProxy代理,否则无法回滚。