封面《セレクトオブリージュ》

完整 demo 地址在 github

前言

因为在后续的工作中使用分布式事务的情况会多很多,所以来回顾一下分布式事务方博后续查询

事务

事务 (Transaction): 事务是一个数据库操作序列,由事务开始与事务结束之间执行的全部数据库操作组成,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。

常见例子就是银行的转账,扣款和增加存款必须一起完成,不能只完成一项

事务的 ACID

  • 原子性 (Atomicity): 事务被视为一个不可分割的最小单位,它要么完全执行,要么完全不执行。
  • 一致性 (Consistency): 一致性保证了事务的执行将数据库从一个一致的状态转变到另一个一致的状态。
  • 隔离性 (Isolation): 隔离性是指当多个事务同时对数据库进行操作时,每个事务都是独立的,一个事务的操作不会影响到其他事务。
  • 持久性 (Durability): 持久性意味着一旦事务被提交,它对数据库的修改就是永久性的,即使系统发生故障也不会丢失。

分布式事务理论

随着互联网的发展,后端系统也从单体应用演变成了多体分布式微服务应用。事务也从本地事务转为了需要分布式事务

CAP 理论

CAP 定理,指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。

  • 一致性 ©:指数据在多个副本之间能够保持一致的状态。当在一个节点上更新了数据,其他节点上的数据也应该同步更新,以保证数据的一致性。‌
  • 可用性 (A):指系统提供的服务必须一直处于可用状态,对于用户的每一个请求,系统总是能够在有限的时间内返回结果,无论成功还是失败,都不会出现网络超时等情况。
  • 分区容错性 §:指分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务。也就是说,当网络出现问题,消息可能会丢失或延迟,但系统应该继续运行。

由于网络分区是分布式系统中一种必然的现象,因此分区容错性是无法避免的,必须在一致性和可用性之间做出选择。这就导致了在分布式系统中,无法同时满足一致性、可用性和分区容错性这三个目标

CAP 理论是分布式系统设计时的重要参考,它提醒设计者在设计系统时要根据实际需求进行权衡和取舍

CAP

BASE 理论

CAP 完全实现强一致性较难,因此有了 BASE (Basically Available Soft State Eventual Consistency) 理论演化,其本质是对 CAP 的衍生,是 AP 方案的补充

  • BA: Basically Available 基本可用,分布式系统在出现故障的时候,允许损失部分可用性,即保证核心可用。
  • S: Soft State 软状态,允许系统存在中间状态,而该中间状态不会影响系统整体可用性。
  • E: Eventual Consistency 最终一致性,系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。

BASE

一致性

分布式中一致性可以分为强一致性、弱一致性和最终一致性

强一致性

任何一次读都能读到某个数据的最近一次写的数据。系统中的所有进程,看到的操作顺序,都和全局时钟下的顺序一致。简言之,在任意时刻,所有节点中的数据是一样的。

弱一致性

数据更新后,如果能容忍后续的访问只能访问到部分或者全部访问不到,则是弱一致性。

最终一致性

不保证在任意时刻任意节点上的同一份数据都是相同的,但是随着时间的迁移,不同节点上的同一份数据总是在向趋同的方向变化。简单说,就是在一段时间后,节点间的数据会最终达到一致状态。

分布式事务类型

二阶段提交 (Two-Phase Commit,2PC)

一阶段

事务协调者向事务参与者发送 prepare 请求,事务参与者收到请求。

二阶段

如果所有事务参与者都返回 prepare_ok,则事务协调者向所有事务参与者发送 commit 请求,事务参与者收到请求后,执行事务提交操作。如果有任何一个事务参与者返回 prepare_fail,则事务协调者向所有事务参与者发送 rollback 请求,事务参与者收到请求后,执行事务回滚操作。

缺点

2PC 存在着以下缺点

  • 同步阻塞问题:在事务过程中,当事务参与者占用资源,其他事务就只能阻塞等待资源释放
  • 单点问题:事务协调者是整个事务的控制中心,一旦事务协调者发生故障,整个事务将无法进行
  • 数据不一致问题:在二阶段提交中,如果事务管理器只发送了部分 commit 消息,此时网络发生异常,那么只有部分参与者接收到 commit 消息,此时系统数据不一致

Seata 的 XA 与 AT

XA 是一种分布式事务协议,由两阶段提交协议(2PC)和 XA 事务管理器组成,XA 协议是由 X/Open 组织提出的,用于支持分布式事务的一种规范。AT 是 Seata 自己实现的一种分布式事务协议,AT 是通过对业务代码进行增强,实现分布式事务的一种方式。
AT 与 XA 的区别在于,XA 是基于数据库二阶段协议实现的,而 AT 是通过对业务代码进行增强来实现的。

TCC (Try-Confirm-Cancel) 模式

关于 TCC(Try-Confirm-Cancel)的概念,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。 TCC 事务机制相比于上面介绍的 XA,解决了其几个缺点:

  • 解决了协调者单点,由主业务方发起并完成这个业务活动。业务活动管理器也变成多点,引入集群。
  • 同步阻塞:引入超时,超时后进行补偿,并且不会锁定整个资源,将资源转换为业务逻辑形式,粒度变小。
  • 数据一致性,有了补偿机制之后,由业务活动管理器控制一致性

TCC 的操作如下图所示

TCC

TCC 分为了 try、confirm、cancel 三个阶段,分别对应业务的尝试、确认和取消操作。

  • try 阶段:所有业务参与者尝试执行业务,并预留资源
  • confirm 阶段:真正的执行业务阶段,使用 try 阶段预留的资源进行业务处理
  • cancel 阶段:业务发生异常,回滚业务,释放资源

优点

  • 不依赖底层数据库
  • 能够实现跨库事务、跨应用资源

缺点

  • 相较于 AT/XA TCC 是一种代码侵入式方案,需要业务系统自行实现 Try、Confirm、Cancel 三个操作

幂等、空回滚和事务悬挂

对于 TCC 参与者的实现,需要注意各个接口的幂等性,同时允许空回滚,避免事务悬挂

幂等

由于 TCC 业务的特性,可能会导致重试时,业务重复执行,所以需要保证业务 confirm 和 cancel 的幂等性

空回滚

可能由于网络特殊情况,导致 TCC 的 cancel 请求优先于 try 请求到达,这时候不能够进行代码回滚,需要进行空回滚

事务悬挂

同样是在空回滚场景中,try 请求落后于 cancel 请求到达,如果此时执行 try 请求那么预留的资源将会无法释放,此时就发生了事务悬挂。因此对于执行了空回滚的事务操作就不能再执行 try 操作,避免事务悬挂

SAGA 模式

Saga 模式是分布式事务的解决方案之一,理念起源于 1987 年 Hector & Kenneth 发表的 Sagas 论文。它将整个分布式事务流程拆分成多个阶段,每个阶段对应我们的子事务,子事务是本地事务执行的,执行完成就会真实提交。

SAGA 的操作如下,SAGA 事务由多个子事务组成,每个子事务都是一个本地事务,同时每个事务都有一个补偿操作,当事务发生异常时,执行补偿操作。适合用于业务流程长的长事务,或是参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口的事务。

SAGA

优点

一阶段提交本地事务,无锁,高性能

  • 事件驱动架构,参与者可异步执行,高吞吐
  • 补偿服务易于实现

缺点

  • SAGA 事务不保证隔离性,可能会导致脏读、不可重复读、幻读

Seata 代码实现

事前准备

seata 服务端

这里采用 docker 直接部署

1
docker run --name seata-server -p 8091:8091 -p 7091:7091 seataio/seata-server:2.0.0

数据库准备

基础模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@MappedSuperclass
@Data
public class BaseModel {

/**
* 主键
*/
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
protected Long id;

/**
* 创建时间
*/
@Column(insertable = false, updatable = false, columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
protected Date gmtCreate;

/**
* 更新时间
*/
@Column(insertable = false, updatable = false, columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")
protected Date gmtUpdate;
}

账户模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@EqualsAndHashCode(callSuper = true)
@Data
@Entity
public class Account extends BaseModel {

private Long userId;

private Long money = 0L;

/**
* 用于事务预留
*/
private Long transactionalMoney = 0L;
}

物品模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@EqualsAndHashCode(callSuper = true)
@Data
@Entity
public class Item extends BaseModel{

private String name;

private Integer count = 0;

/**
* 用于事务预留
*/
private Integer transactionalCount = 0;
}

订单模型

1
2
3
4
5
6
7
8
9
10
11
12
13
@EqualsAndHashCode(callSuper = true)
@Data
@Entity(name = "`order`")
public class Order extends BaseModel{

private Long userId;

private Long itemId;

private Integer count;

private Long amount;
}

通用配置

依赖导入,在 pom.xml 里面添加

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/io.seata/seata-spring-boot-starter -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>

在 resource 下面添加 seata 配置文件 file.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

transport {
# tcp, unix-domain-socket
type = "TCP"
#NIO, NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the tm client batch send request enable
enableTmClientBatchSendRequest = false
# the rm client batch send request enable
enableRmClientBatchSendRequest = true
# the rm client rpc request timeout
rpcRmRequestTimeout = 2000
# the tm client rpc request timeout
rpcTmRequestTimeout = 30000
# the rm client rpc request timeout
rpcRmRequestTimeout = 15000
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
vgroupMapping.my_test_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}

client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
tableMetaCheckerInterval = 60000
reportSuccessEnable = false
sagaBranchRegisterEnable = false
sagaJsonParser = "jackson"
sagaRetryPersistModeUpdate = false
sagaCompensatePersistModeUpdate = false
tccActionInterceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
sqlParserType = "druid"
branchExecutionTimeoutXA = 60000
connectionTwoPhaseHoldTimeoutXA = 10000
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
defaultGlobalTransactionTimeout = 60000
degradeCheck = false
degradeCheckPeriod = 2000
degradeCheckAllowTimes = 10
interceptorOrder = -2147482648 #Ordered.HIGHEST_PRECEDENCE + 1000
}
undo {
dataValidation = true
onlyCareUpdateColumns = true
logSerialization = "jackson"
logTable = "undo_log"
compress {
enable = true
# allow zip, gzip, deflater, lz4, bzip2, zstd default is zip
type = zip
# if rollback info size > threshold, then will be compress
# allow k m g t
threshold = 64k
}
}
loadBalance {
type = "XID"
virtualNodes = 10
}
}
log {
exceptionRate = 100
}
tcc {
fence {
# tcc fence log table name
logTableName = tcc_fence_log
# tcc fence log clean period
cleanPeriod = 1h
}
}

在 application 下添加 application.yml 配置文件

1
2
seata:
tx-service-group: my_test_tx_group

AT 模式

数据库准备

创建 AT 模式的 undoLog 表

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);

账户业务

1
2
3
4
public interface AccountService {

void debit(Long userId,Long money);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Service
public class AccountServiceImpl implements AccountService {

@Autowired
private AccountRepository accountRepository;

@Override
public void debit(Long userId, Long money) {
log.info("start account service xid:{}", RootContext.getXID());
Account account = accountRepository.findAccountByUserId(userId);
if (account.getMoney() < money) {
throw new RuntimeException("余额不足");
}
account.setMoney(account.getMoney() - money);
accountRepository.save(account);
}
}

物品库存业务

1
2
3
4
public interface ItemService {

void purchaseItem(Long itemId, Integer count);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
@Slf4j
public class ItemServiceImpl implements ItemService {

@Autowired
private ItemRepository itemRepository;


@Override
public void purchaseItem(Long itemId, Integer count) {
log.info("start item service xid:{}", RootContext.getXID());
Item item = itemRepository.findItemById(itemId);
if (item.getCount() < count){
throw new RuntimeException("库存不足");
}
item.setCount(item.getCount() - count);
itemRepository.save(item);
}
}

订单业务

1
2
3
4
public interface OrderService {

void createOrder(Long userId, Long itemId, Integer count);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {

@Autowired
private OrderRepository orderRepository;

@Autowired
private AccountService accountService;

@Override
public void createOrder(Long userId, Long itemId, Integer count) {
log.info("start order service xid:{}", RootContext.getXID());
Long amount = calculateAmount(itemId, count);

accountService.debit(userId, amount);

Order order = new Order();
order.setUserId(userId);
order.setItemId(itemId);
order.setCount(count);
order.setAmount(amount);
// save order
orderRepository.save(order);
}

private Long calculateAmount(Long itemId, Integer count) {
// calculate amount
return count * 100L;
}
}

业务实现

1
2
3
4
public interface BusinessService {

void purchaseItem(Long userId, Long itemId, Integer count);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@Service
public class BusinessServiceImpl implements BusinessService {

@Autowired
private ItemService itemService;

@Autowired
private OrderService orderService;

@Override
@GlobalTransactional
public void purchaseItem(Long userId, Long itemId, Integer count) {
log.info("start business service xid:{}", RootContext.getXID());
itemService.purchaseItem(itemId, count);
orderService.createOrder(userId, itemId, count);
}
}

XA 模式

XA 模式与 AT 模式相似,但是需要将配置里面的数据源代理设置为 XA 数据源

1
2
3
4
@Configuration
@EnableAutoDataSourceProxy(dataSourceProxyMode = "XA")
public class XAConfig {
}

TCC 模式

需要注意的是这里的代码没有考虑悬挂幂等

库存业务

1
2
3
4
5
6
7
8
9
10
public interface ItemAction {


boolean decreaseStock(BusinessActionContext actionContext, Long itemId, Integer count);

boolean commit(BusinessActionContext actionContext);

boolean cancel(BusinessActionContext actionContext);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@LocalTCC
@Slf4j
@Service
public class ItemActionImpl implements ItemAction {

@Autowired
private ItemRepository itemRepository;

@TwoPhaseBusinessAction(name = "decreaseStock", commitMethod = "commit", rollbackMethod = "cancel")
@Override
public boolean decreaseStock(BusinessActionContext actionContext, @BusinessActionContextParameter("itemId") Long itemId,
@BusinessActionContextParameter("count") Integer count) {
log.info("开始尝试减少库存 事务xid={} 分支事务bid={}",actionContext.getXid(),actionContext.getBranchId());
Item item = itemRepository.findItemById(itemId);
if (item.getCount() < count) {
throw new RuntimeException("库存不足");
}
item.setCount(item.getCount() - count);
item.setTransactionalCount(item.getTransactionalCount() + count);
itemRepository.save(item);
// 恢复事务金额
BusinessActionContextUtil.addContext("recoverCount",count);
return true;
}

@Override
public boolean commit(BusinessActionContext actionContext) {
log.info("开始提交减少库存 事务xid={} 分支事务bid={}",actionContext.getXid(),actionContext.getBranchId());
Long itemId = ((Number) actionContext.getActionContext("itemId")).longValue();
Integer count = (Integer) actionContext.getActionContext("count");
count = count==null?0:count;//临时解决方案
Item item = itemRepository.findItemById(itemId);
item.setTransactionalCount(item.getTransactionalCount()-count);
itemRepository.save(item);
return true;
}

@Override
public boolean cancel(BusinessActionContext actionContext) {
log.info("开始回滚减少库存 事务xid={} 分支事务bid={}",actionContext.getXid(),actionContext.getBranchId());
Long itemId = ((Number) actionContext.getActionContext("itemId")).longValue();
Integer count = (Integer) actionContext.getActionContext("recoverCount");
count = count==null?0:count;//临时解决方案
Item item = itemRepository.findItemById(itemId);
// 回滚 恢复事务金额
item.setTransactionalCount(item.getTransactionalCount()-count);
item.setCount(item.getCount() + count);
itemRepository.save(item);
return true;
}
}

订单业务

1
2
3
4
5
6
7
8
9
public interface OrderAction {


boolean purchaseItem(BusinessActionContext actionContext,Long userId, Long itemId, Integer count);

boolean commit(BusinessActionContext actionContext);

boolean cancel(BusinessActionContext actionContext);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@LocalTCC
@Service
@Slf4j
public class OrderActionImpl implements OrderAction {

@Autowired
private OrderRepository orderRepository;

@Autowired
private AccountRepository accountRepository;

@TwoPhaseBusinessAction(name = "purchaseItem", commitMethod = "commit", rollbackMethod = "cancel")
@Override
public boolean purchaseItem(BusinessActionContext actionContext, @BusinessActionContextParameter("userId") Long userId,
@BusinessActionContextParameter("itemId") Long itemId,@BusinessActionContextParameter("count") Integer count) {
log.info("开始尝试购买商品 事务xid={} 分支事务bid={}",actionContext.getXid(),actionContext.getBranchId());
Account account = accountRepository.findAccountByUserId(userId);
Long amount = calculateAmount(itemId, count);
if (account.getMoney() < amount) {
throw new RuntimeException("余额不足");
}

account.setTransactionalMoney(account.getTransactionalMoney()+amount);
account.setMoney(account.getMoney()-amount);
accountRepository.save(account);
BusinessActionContextUtil.addContext("amount",amount);
return true;
}

private Long calculateAmount(Long itemId, Integer count) {
return 100L*count;
}

@Override
public boolean commit(BusinessActionContext actionContext) {
log.info("开始提交购买商品 事务xid={} 分支事务bid={}",actionContext.getXid(),actionContext.getBranchId());
Long userId =((Number) actionContext.getActionContext("userId")).longValue();
Integer count = (Integer) actionContext.getActionContext("count");
Long itemId = ((Number) actionContext.getActionContext("itemId")).longValue();
Long amount = ((Number) actionContext.getActionContext("amount")).longValue();
Account account = accountRepository.findAccountByUserId(userId);
account.setTransactionalMoney(account.getTransactionalMoney()-amount);
accountRepository.save(account);

Order order = new Order();
order.setUserId(userId);
order.setItemId(itemId);
order.setCount(count);
order.setAmount(amount);
orderRepository.save(order);

return true;
}

@Override
public boolean cancel(BusinessActionContext actionContext) {
log.info("开始回滚购买商品 事务xid={} 分支事务bid={}",actionContext.getXid(),actionContext.getBranchId());
Long userId = ((Number) actionContext.getActionContext("userId")).longValue();
Number amountNumber = ((Number) actionContext.getActionContext("amount"));
if (Objects.isNull(amountNumber)){
return true;
}
Long amount = amountNumber.longValue();
Account account = accountRepository.findAccountByUserId(userId);
account.setTransactionalMoney(account.getTransactionalMoney()-amount);
account.setMoney(account.getMoney()+amount);
accountRepository.save(account);
return true;
}
}

业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
public class BusinessService {

@Autowired
private ItemAction itemAction;

@Autowired
private OrderAction orderAction;

@GlobalTransactional
public void purchase(Long userId, Long itemId, Integer count) {
itemAction.decreaseStock(null, itemId, count);
orderAction.purchaseItem(null, userId, itemId, count);
}
}

SAGA 模式

数据库准备

这里放的是官方的 h2 数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
CREATE CACHED TABLE "PUBLIC"."SEATA_STATE_INST"(
"ID" VARCHAR NOT NULL COMMENT 'id',
"MACHINE_INST_ID" VARCHAR NOT NULL COMMENT 'state machine instance id',
"NAME" VARCHAR NOT NULL COMMENT 'state name',
"TYPE" VARCHAR COMMENT 'state type',
"SERVICE_NAME" VARCHAR COMMENT 'service name',
"SERVICE_METHOD" VARCHAR COMMENT 'method name',
"SERVICE_TYPE" VARCHAR COMMENT 'service type',
"BUSINESS_KEY" VARCHAR COMMENT 'business key',
"STATE_ID_COMPENSATED_FOR" VARCHAR COMMENT 'state compensated for',
"STATE_ID_RETRIED_FOR" VARCHAR COMMENT 'state retried for',
"GMT_STARTED" TIMESTAMP NOT NULL COMMENT 'start time',
"IS_FOR_UPDATE" TINYINT COMMENT 'is service for update',
"INPUT_PARAMS" CLOB COMMENT 'input parameters',
"OUTPUT_PARAMS" CLOB COMMENT 'output parameters',
"STATUS" VARCHAR NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
"EXCEP" BLOB COMMENT 'exception',
"GMT_UPDATED" TIMESTAMP COMMENT 'update time',
"GMT_END" TIMESTAMP COMMENT 'end time'
);


-- PUBLIC.SEATA_STATE_MACHINE_DEF definition

CREATE CACHED TABLE "PUBLIC"."SEATA_STATE_MACHINE_DEF"(
"ID" VARCHAR NOT NULL COMMENT 'id',
"NAME" VARCHAR NOT NULL COMMENT 'name',
"TENANT_ID" VARCHAR NOT NULL COMMENT 'tenant id',
"APP_NAME" VARCHAR NOT NULL COMMENT 'application name',
"TYPE" VARCHAR COMMENT 'state language type',
"COMMENT_" VARCHAR COMMENT 'comment',
"VER" VARCHAR NOT NULL COMMENT 'version',
"GMT_CREATE" TIMESTAMP NOT NULL COMMENT 'create time',
"STATUS" VARCHAR NOT NULL COMMENT 'status(AC:active|IN:inactive)',
"CONTENT" CLOB COMMENT 'content',
"RECOVER_STRATEGY" VARCHAR COMMENT 'transaction recover strategy(compensate|retry)'
);


-- PUBLIC.SEATA_STATE_MACHINE_INST definition

CREATE CACHED TABLE "PUBLIC"."SEATA_STATE_MACHINE_INST"(
"ID" VARCHAR NOT NULL COMMENT 'id',
"MACHINE_ID" VARCHAR NOT NULL COMMENT 'state machine definition id',
"TENANT_ID" VARCHAR NOT NULL COMMENT 'tenant id',
"PARENT_ID" VARCHAR COMMENT 'parent id',
"GMT_STARTED" TIMESTAMP NOT NULL COMMENT 'start time',
"BUSINESS_KEY" VARCHAR COMMENT 'business key',
"START_PARAMS" CLOB COMMENT 'start parameters',
"GMT_END" TIMESTAMP COMMENT 'end time',
"EXCEP" BLOB COMMENT 'exception',
"END_PARAMS" CLOB COMMENT 'end parameters',
"STATUS" VARCHAR COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
"COMPENSATION_STATUS" VARCHAR COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
"IS_RUNNING" TINYINT COMMENT 'is running(0 no|1 yes)',
"GMT_UPDATED" TIMESTAMP NOT NULL
);

库存业务

1
2
3
4
5
6
7
8
public interface ItemAction {


boolean decreaseStock(String businessKey, Long itemId, Integer count, Map<String,Object> params);


boolean compensateDecreaseStock(String businessKey, Long itemId, Integer count, Map<String,Object> params);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
@Service("itemAction")
public class ItemActionImpl implements ItemAction {

@Autowired
private ItemRepository itemRepository;


@Override
public boolean decreaseStock(String businessKey, Long itemId, Integer count, Map<String, Object> params) {
log.info("开始尝试减少库存 businessKey={}",businessKey);
Item item = itemRepository.findItemById(itemId);
if (item.getCount() < count) {
throw new RuntimeException("库存不足");
}
item.setCount(item.getCount() - count);
itemRepository.save(item);
return true;
}

@Override
public boolean compensateDecreaseStock(String businessKey, Long itemId, Integer count, Map<String, Object> params) {
log.info("开始尝试回滚减少库存 businessKey={}",businessKey);
Item item = itemRepository.findItemById(itemId);
item.setCount(item.getCount() + count);
itemRepository.save(item);
return true;
}


}

订单业务

1
2
3
4
5
6
7
public interface OrderAction {


boolean purchaseItem(String businessKey, Long userId, Long itemId, Integer count, Map<String,Object> params);

boolean compensatePurchaseItem(String businessKey, Long userId, Long itemId, Integer count, Map<String,Object> params);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Slf4j
@Service("orderAction")
public class OrderActionImpl implements OrderAction {

@Autowired
private OrderRepository orderRepository;

@Autowired
private AccountRepository accountRepository;


@Override
public boolean purchaseItem(String businessKey, Long userId, Long itemId, Integer count, Map<String, Object> params) {
log.info("开始尝试购买商品 businessKey={}",businessKey);
Account account = accountRepository.findAccountByUserId(userId);
Long amount = calculateAmount(itemId, count);
if (account.getMoney() < amount) {
throw new RuntimeException("余额不足");
}

account.setMoney(account.getMoney()-amount);
accountRepository.save(account);
Order order = new Order();
order.setItemId(itemId);
order.setUserId(userId);
order.setCount(count);
order.setAmount(amount);
orderRepository.save(order);
params.put("orderId",order.getId());
return true;
}

private Long calculateAmount(Long itemId, Integer count) {
return 100L*count;
}

@Override
public boolean compensatePurchaseItem(String businessKey, Long userId, Long itemId, Integer count, Map<String, Object> params) {
log.info("开始尝试回滚购买商品 businessKey={}",businessKey);
Long orderId = (Long) params.getOrDefault("orderId",null);
if (Objects.isNull(orderId)) {
return true;
}
Account account = accountRepository.findAccountByUserId(userId);
Long amount = calculateAmount(itemId, count);

account.setMoney(account.getMoney()+amount);
accountRepository.save(account);


orderRepository.deleteById(orderId);

return true;
}

}

状态机设计

seata 提供了一个状态机设计网址 https://seata.apache.org/saga-designer/

saga状态机

状态机 json 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
{
"Name": "StateMachine-purchase",
"Comment": "This state machine is modeled by designer tools.",
"Version": "0.0.1",
"style": {
"bounds": {
"x": 362,
"y": 42,
"width": 36,
"height": 36
}
},
"States": {
"ServiceTask-ReduceItem": {
"style": {
"bounds": {
"x": 330,
"y": 110,
"width": 100,
"height": 80
}
},
"Name": "ServiceTask-ReduceItem",
"IsForCompensation": false,
"Input": [
"$.[businessKey]",
"$.[itemId]",
"$.[count]"
],
"Output": {},
"Status": {},
"Retry": [],
"ServiceName": "itemAction",
"ServiceMethod": "decreaseStock",
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger-jykd7ys"
}
],
"CompensateState": "ServiceTask-compensationReduceItem",
"Type": "ServiceTask",
"catch": {
"style": {
"bounds": {
"x": 412,
"y": 162,
"width": 36,
"height": 36
}
},
"edge": {
"CompensationTrigger-jykd7ys": {
"style": {
"waypoints": [
{
"original": {
"x": 448,
"y": 180
},
"x": 448,
"y": 180
},
{
"x": 502,
"y": 180
},
{
"original": {
"x": 522,
"y": 180
},
"x": 522,
"y": 180
}
],
"source": "ServiceTask-ReduceItem",
"target": "CompensationTrigger-jykd7ys"
},
"Type": "ExceptionMatch"
}
}
},
"edge": {
"ServiceTask-compensationReduceItem": {
"style": {
"waypoints": [
{
"original": {
"x": 330,
"y": 150
},
"x": 330,
"y": 150
},
{
"x": 260,
"y": 150
},
{
"original": {
"x": 240,
"y": 150
},
"x": 240,
"y": 150
}
],
"source": "ServiceTask-ReduceItem",
"target": "ServiceTask-compensationReduceItem"
},
"Type": "Compensation"
},
"ServiceTask-CreateOrder": {
"style": {
"waypoints": [
{
"original": {
"x": 380,
"y": 190
},
"x": 380,
"y": 190
},
{
"x": 380,
"y": 300
},
{
"original": {
"x": 380,
"y": 320
},
"x": 380,
"y": 320
}
],
"source": "ServiceTask-ReduceItem",
"target": "ServiceTask-CreateOrder"
},
"Type": "Transition"
}
},
"Next": "ServiceTask-CreateOrder"
},
"Fail-1r9z2ce": {
"style": {
"bounds": {
"x": 522,
"y": 262,
"width": 36,
"height": 36
}
},
"Name": "Fail-1r9z2ce",
"ErrorCode": "1",
"Message": "PURCHASE_FAIL",
"Type": "Fail"
},
"ServiceTask-CreateOrder": {
"style": {
"bounds": {
"x": 330,
"y": 320,
"width": 100,
"height": 80
}
},
"Name": "ServiceTask-CreateOrder",
"IsForCompensation": false,
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[itemId]",
"$.[count]",
"$.#root"
],
"Output": {},
"Status": {},
"Retry": [],
"ServiceName": "orderAction",
"ServiceMethod": "purchaseItem",
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger-gd1lu4g"
}
],
"CompensateState": "ServiceTask-compensationCreateOrder",
"Type": "ServiceTask",
"catch": {
"style": {
"bounds": {
"x": 412,
"y": 382,
"width": 36,
"height": 36
}
},
"edge": {
"CompensationTrigger-gd1lu4g": {
"style": {
"waypoints": [
{
"original": {
"x": 448,
"y": 400
},
"x": 448,
"y": 400
},
{
"x": 502,
"y": 400
},
{
"original": {
"x": 522,
"y": 400
},
"x": 522,
"y": 400
},
{
"original": {
"x": 540,
"y": 378
},
"x": 540,
"y": 378
}
],
"source": "ServiceTask-CreateOrder",
"target": "CompensationTrigger-gd1lu4g"
},
"Type": "ExceptionMatch"
}
}
},
"Next": "Succeed-fq6cmgu",
"edge": {
"Succeed-fq6cmgu": {
"style": {
"waypoints": [
{
"original": {
"x": 380,
"y": 400
},
"x": 380,
"y": 400
},
{
"x": 380,
"y": 422
},
{
"original": {
"x": 380,
"y": 442
},
"x": 380,
"y": 442
}
],
"source": "ServiceTask-CreateOrder",
"target": "Succeed-fq6cmgu"
},
"Type": "Transition"
},
"ServiceTask-compensationCreateOrder": {
"style": {
"waypoints": [
{
"original": {
"x": 330,
"y": 360
},
"x": 330,
"y": 360
},
{
"x": 260,
"y": 360
},
{
"original": {
"x": 240,
"y": 360
},
"x": 240,
"y": 360
}
],
"source": "ServiceTask-CreateOrder",
"target": "ServiceTask-compensationCreateOrder"
},
"Type": "Compensation"
}
}
},
"ServiceTask-compensationReduceItem": {
"style": {
"bounds": {
"x": 140,
"y": 110,
"width": 100,
"height": 80
}
},
"Name": "ServiceTask-compensationReduceItem",
"IsForCompensation": true,
"Input": [
"$.[businessKey]",
"$.[itemId]",
"$.[count]"
],
"Output": {},
"Status": {},
"Retry": [],
"ServiceName": "itemAction",
"ServiceMethod": "compensateDecreaseStock",
"Type": "ServiceTask"
},
"ServiceTask-compensationCreateOrder": {
"style": {
"bounds": {
"x": 140,
"y": 320,
"width": 100,
"height": 80
}
},
"Name": "ServiceTask-compensationCreateOrder",
"IsForCompensation": true,
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[itemId]",
"$.[count]",
"$.#root"
],
"Output": {},
"Status": {},
"Retry": [],
"ServiceName": "orderAction",
"ServiceMethod": "compensatePurchaseItem",
"Type": "ServiceTask"
},
"CompensationTrigger-gd1lu4g": {
"style": {
"bounds": {
"x": 522,
"y": 382,
"width": 36,
"height": 36
}
},
"Name": "CompensationTrigger-gd1lu4g",
"Type": "CompensationTrigger",
"Next": "Fail-1r9z2ce",
"edge": {
"Fail-1r9z2ce": {
"style": {
"waypoints": [
{
"original": {
"x": 540,
"y": 382
},
"x": 540,
"y": 382
},
{
"x": 540,
"y": 318
},
{
"original": {
"x": 540,
"y": 298
},
"x": 540,
"y": 298
}
],
"source": "CompensationTrigger-gd1lu4g",
"target": "Fail-1r9z2ce"
},
"Type": "Transition"
}
}
},
"Succeed-fq6cmgu": {
"style": {
"bounds": {
"x": 362,
"y": 442,
"width": 36,
"height": 36
}
},
"Name": "Succeed-fq6cmgu",
"Type": "Succeed"
},
"CompensationTrigger-jykd7ys": {
"style": {
"bounds": {
"x": 522,
"y": 162,
"width": 36,
"height": 36
}
},
"Name": "CompensationTrigger-jykd7ys",
"Type": "CompensationTrigger",
"Next": "Fail-1r9z2ce",
"edge": {
"Fail-1r9z2ce": {
"style": {
"waypoints": [
{
"original": {
"x": 540,
"y": 198
},
"x": 540,
"y": 198
},
{
"x": 540,
"y": 242
},
{
"original": {
"x": 540,
"y": 262
},
"x": 540,
"y": 262
}
],
"source": "CompensationTrigger-jykd7ys",
"target": "Fail-1r9z2ce"
},
"Type": "Transition"
}
}
}
},
"StartState": "ServiceTask-ReduceItem",
"edge": {
"style": {
"waypoints": [
{
"original": {
"x": 380,
"y": 78
},
"x": 380,
"y": 78
},
{
"x": 380,
"y": 90
},
{
"original": {
"x": 380,
"y": 110
},
"x": 380,
"y": 110
}
],
"target": "ServiceTask-ReduceItem"
},
"Type": "Transition"
}
}

SAGA 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Configuration
public class SAGAConfig {

@Autowired
private DataSource dataSource;

@Bean
public DbStateMachineConfig dbStateMachineConfig(){
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 20, 60, java.util.concurrent.TimeUnit.SECONDS, new java.util.concurrent.LinkedBlockingQueue<>(1000));
DbStateMachineConfig config = new DbStateMachineConfig();
config.setDataSource(dataSource);
config.setResources(new String[]{"classpath*:saga/*.json"});
config.setEnableAsync(true);
config.setThreadPoolExecutor(executor);
return config;
}

@Bean
public StateMachineEngine stateMachineEngine() {
ProcessCtrlStateMachineEngine engine = new ProcessCtrlStateMachineEngine();
engine.setStateMachineConfig(dbStateMachineConfig());
return engine;
}

@Bean
public StateMachineEngineHolder stateMachineEngineHolder(){
StateMachineEngineHolder engineHolder = new StateMachineEngineHolder();
engineHolder.setStateMachineEngine(stateMachineEngine());
return engineHolder;
}
}

完整 demo

完整 demo 地址在 github

参考文献