分布式事务组件 Seata (二)

作者: adm 分类: java 发布时间: 2021-07-21 20:44

分布式事务组件Seata入门
现在大多数是微服务、分布式系统,那如何保证分布式系统下的数据一致性呢?接下来我们从阿里的分布式组件Seata入手来了解下什么是分布式事务,如果实现分布式事务

什么是分布式事务
在了解什么是分布式事务之前首先要了解以下两个概念:

事务
事务是由一组操作构成的可靠的独立的工作单元,组成事务的所有操作只有在所有操作均能正常执行的情况下方能提交,只要其中任一操作执行失败,都将导致整个事务的回滚。事务具备ACID的特性,即原子性、一致性、隔离性和持久性。
本地事务
本地事务就是用关系数据库来控制事务,关系数据库通常都具有ACID特性,传统的单体应用通常会将数据全部存储在一个数据库中,会借助关系数据库来完成事务控制

分布式事务
随着业务的发展业务需求和架构发生了巨大的变化,整体架构由原来的单体应用逐渐拆分成为了微服务,原来的3个服务被从一个单体架构上拆开了,成为了3个独立的服务,分别使用独立的数据源,也不在之前共享同一个数据源了,具体的业务将由三个服务的调用来完成,如图:

每一个服务的内部数据一致性仍然有本地事务来保证。但是面对整个业务流程上的事务应该如何保证呢?这就是在微服务架构下面临的挑战,如何保证在微服务中的数据一致性
常见分布式解决方案
两阶段提交方案/XA方案
两阶段提交(Two-phase Commit,2PC),通过引入协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。
– 准备阶段
协调者询问参与者事务是否执行成功,参与者发回事务执行结果。

– 提交阶段
如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。需要注意的是,在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚

补偿事务/TCC
TCC 其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。它分为三个阶段:
– Try 阶段主要是对业务系统做检测及资源预留
– Confirm 阶段主要是对业务系统做确认提交,Try阶段执行成功并开始执行 Confirm阶段时,默认
Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
– Cancel 阶段主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

优点: 跟2PC比起来,实现以及流程相对简单了一些,但数据的一致性比2PC也要差一些

缺点: 缺点还是比较明显的,在2,3步中都有可能失败。TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

Spring Cloud Alibaba Seata
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

原理

Seata AT 模式

-前提

基于支持本地 ACID 事务的关系型数据库。
Java 应用,通过 JDBC 访问数据库
-整体机制

两阶段提交协议的演变:

一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源
二阶段:
提交异步化,非常快速地完成
回滚通过一阶段的回滚日志进行反向补偿
-写隔离

一阶段本地事务提交前,需要确保先拿到 全局锁

拿不到 全局锁 ,不能提交本地事务。

拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁

以一个示例来说明:

两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 – 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 – 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁

tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

-读隔离

在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。

如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理

SELECT FOR UPDATE 语句的执行会申请 全局锁 ,如果 全局锁 被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是 已提交 的,才返回。

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句

-工作机制
以一个示例来说明整个 AT 分支的工作过程

业务表:product

| Field| Type|Key|
|–|–|–|
| id | bigint(20)| PRI|
|name|varchar(100)|
| since | varchar(100)|
AT 分支事务的业务逻辑:

update product set name = ‘GTS’ where name = ‘TXC’;

1. 一阶段
过程:

解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = ‘TXC’)等相关的信息

查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。

select id, name, since from product where name = ‘TXC’;

得到前镜像:

id name since
1 TXC 2014
执行业务 SQL:更新这条记录的 name 为 ‘GTS’。

查询后镜像:根据前镜像的结果,通过 主键 定位数据。

select id, name, since from product where id = 1`;

得到后镜像:

id name since
1 GTS 2014
插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。

{
“branchId”: 641789253,
“undoItems”: [{
“afterImage”: {
“rows”: [{
“fields”: [{
“name”: “id”,
“type”: 4,
“value”: 1
}, {
“name”: “name”,
“type”: 12,
“value”: “GTS”
}, {
“name”: “since”,
“type”: 12,
“value”: “2014”
}]
}],
“tableName”: “product”
},
“beforeImage”: {
“rows”: [{
“fields”: [{
“name”: “id”,
“type”: 4,
“value”: 1
}, {
“name”: “name”,
“type”: 12,
“value”: “TXC”
}, {
“name”: “since”,
“type”: 12,
“value”: “2014”
}]
}],
“tableName”: “product”
},
“sqlType”: “UPDATE”
}],
“xid”: “xid:xxx”
}

2. 二阶段-回滚

收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作

通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。

数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍

根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:

update product set name = ‘TXC’ where id = 1;
1
提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。

3. 二阶段-提交

收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
Seata TCC 模式

Seata Saga 模式

实战演戏

环境:

数据库:MySql 5.6
服务:账户服务、订单服务、库存服务
数据库表: 订单库、账户库、库存库(sql)
服务注册中心和配置中心:Nacos(在此不介绍Nacos服务的部署)
部署seata服务:

下载服务端和源码
地址:https://github.com/seata/seata/releases

配置seata的服务中心和配置中心
修改服务端seata-server-1.1.0\seata\conf 目录下的registry.conf。选择Nacos作为服务中心和配置中心

registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = “nacos”

nacos {
serverAddr = “localhost”
namespace = “”
cluster = “default”
}

config {
# file、nacos 、apollo、zk、consul、etcd3
type = “nacos”

nacos {
serverAddr = “localhost”
namespace = “”
group = “SEATA_GROUP”
}

初始化seata服务数据库配置
修改服务端seata-server-1.1.0\seata\conf 目录下的file.conf。选择db作为全局事务会话信息存储

初始化seata服务端数据库表结构
创建seata库
执行源码目录SQL文件在seata库下。地址:\seata-1.1.0\script\server\db\mysql.sql

初始化seata的Nacos相关配置(依赖Nacos服务已启动)
执行源码目录sh文件或py文件。地址:\seata-1.1.0\script\config-center\nacos
执行完毕之后,打开Nacos的控制台,在配置列表中,可以看到初始化了很多 Group 为 SEATA_GROUP 的配置

启动seata的server端
执行bat文件地址:\seata-server-1.1.0\seata\bin\seata-server.bat
支持的启动参数

代码:

spring-seata项目pom依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<fastjson.version>1.2.41</fastjson.version>
<spring-cloud-alibaba.version>2.2.0.RELEASE</spring-cloud-alibaba.version>
<mybatis-plus.version>3.3.1</mybatis-plus.version>
<druid.version>1.1.21</druid.version>
<lombok.version>1.18.12</lombok.version>
<seata.version>1.1.0</seata.version>
</properties>
<dependencyManagement>
<dependencies>
<!–feign–>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<version>${spring-cloud-alibaba.version}</version>
</dependency>
<!– nacos 作为服务注册中心 –>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>${spring-cloud-alibaba.version}</version>
</dependency>
<!– nacos 作为配置中心 –>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>${spring-cloud-alibaba.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
<version>${spring-cloud-alibaba.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>${seata.version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<!– 子工程自动继承 –>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>

开启全局事务
在需要开启全局事务的方法处,添加注解@GlobalTransactional
@GetMapping(value = “testCommit”)
@GlobalTransactional
public Object testCommit() throws TransactionException {
lock.lock();
try {
Product product = productService.getById(1);
if (product.getStock() > 0) {
LocalDateTime now = LocalDateTime.now();
log.info(“seata分布式事务Id:{}”, RootContext.getXID());
Account account = accountService.getById(1);
Orders orders = new Orders();
orders.setCreateTime(now);
orders.setProductId(product.getId());
orders.setReplaceTime(now);
orders.setSum(1);
orders.setAmount(product.getPrice());
orders.setAccountId(account.getId());
product.setStock(product.getStock() – 1);
account.setSum(account.getSum() != null ? account.getSum() + 1 : 1);
account.setLastUpdateTime(now);
productService.updateById(product);
accountService.updateById(account);
orderService.save(orders);
log.info(“======事务成功结束======”);
return true;
} else {
log.info(“======事务失败结束======”);
return false;
}
} catch (Exception e) {
log.info(“载入事务{}进行回滚” + e.getMessage(), RootContext.getXID());
GlobalTransactionContext.reload(RootContext.getXID()).rollback();
log.info(“======事务回滚======”);
return false;
} finally {
lock.unlock();
log.info(“======释放锁======”);
}
}

测试
直接调用testCommit,根据数据情况观察数据是否回滚

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!