分布式事务
一、分布式事务基础
什么是事务?
事务指的就是一个操作单元,在这个操作单元中的所有操作终要保持一致的行为,要么所有操作都成功,要么所有的操作都被撤销。简单地说,事务提供一种“要么什么都不做,要么做全套”机制
本地事物
本地事物其实可以认为是数据库提供的事务机制。说到数据库事务就不得不说,数据库事务中的四
大特性:
A:原子性(Atomicity),一个事务中的所有操作,要么全部完成,要么全-部不完成
C:一致性(Consistency),在一个事务执行之前和执行之后数据库都必须处于一致性状态
I:隔离性(Isolation),在并发环境中,当不同的事务同时操作相同的数据时,事务之间互不影响
D:持久性(Durability),指的是只要事务成功结束,它对数据库所做的更新就必须的保存下来
数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单元中的所有操作要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚
分布式事务
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。
简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。
本质上来说,分布式事务就是为了保证不同数据库的数据一致性。
分布式事务的场景
- 单体系统访问多个数据库
一个服务需要调用多个数据库实例完成数据的增删改操作
- 多个微服务访问同一个数据库
多个服务需要调用一个数据库实例完成数据的增删改操作
- 多个微服务访问多个数据库
多个服务需要调用一个数据库实例完成数据的增删改操作
二、分布式事务解决方案
全局事务
全局事务基于DTP模型实现。DTP是由X/Open组织提出的一种分布式事务模型——X/Open Distributed Transaction Processing Reference Model。它规定了要实现分布式事务,需要三种⻆色:
AP: Application 应用系统 (微服务)
TM: Transaction Manager 事务管理器 (全局事务管理)
RM: Resource Manager 资源管理器 (数据库)
整个事务分成两个阶段:
阶段一: 表决阶段,所有参与者都将本事务执行预提交,并将能否成功的信息反馈发给协调者。
阶段二: 执行阶段,协调者根据所有参与者的反馈,通知所有参与者,步调一致地执行提交或者回滚。
优点
- 提高了数据一致性的概率,实现成本较低
缺点
单点问题: 事务协调者宕机
同步阻塞: 延迟了提交时间,加⻓了资源阻塞时间
数据不一致: 提交第二阶段,依然存在commit结果未知的情况,有可能导致数据不一致
可靠消息服务
基于可靠消息服务的方案是通过消息中间件保证上、下游应用数据操作的一致性。假设有A和B两个系统,分别可以处理任务A和任务B。此时存在一个业务流程,需要将任务A和任务B在同一个事务中处理。就可以使用消息中间件来实现这种分布式事务。
RocketMQ事务消息流程图
1)事务消息发送及提交
(1) 发送消息(half消息)
(2) 服务端响应消息写入结果
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可⻅,本地逻辑不执行)
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生产消息索引,消息对消费者可⻅)
2) 事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对于的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用户解决消息Commit或者Rollback发生超时或者失效的情况
3) 事务消息状态
事务消息共有三种状态,提交状态,回查状态,中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息
- TransactionStatus.RollbackTransaction: 回滚事务,它代表消息将被删除,不允许被消费
- TransactionStatus.Unknown: 中间状态,它代表需要消息队列来确认状态
消息生产者实现
发送代码如下:
Message<OperateIntergralVo> message =
MessageBuilder.withPayload(vo).setHeader("orderNo",orderNo).build();
TransactionSendResult sendResult =
rocketMQTemplate.sendMessageInTransaction("tx_group", "tx_topic", message,
orderNo);
String sendStatus = sendResult.getSendStatus().name();
String localTXState = sendResult.getLocalTransactionState().name();
og.info(">>>> send status={},localTransactionState={}
<<<<",sendStatus,localTXState);
if(sendResult.getLocalTransactionState().equals(LocalTransactionState.COMMIT_ME
SSAGE)){
return "退款成功";
}else{
return "退款失败";
}
创建事务消息生产者端的消息监听器,注意是生产者,不是消费者,我们需要实现的是RocketMQLocalTransactionListener接口,代码如下:
@RocketMQTransactionListener(txProducerGroup = "tx_group")
@Slf4j
public class OrderTXMsgListener implements RocketMQLocalTransactionListener {
@Autowired
private IOrderInfoService orderInfoService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
log.info("执行本地事务");
RocketMQLocalTransactionState result =
RocketMQLocalTransactionState.COMMIT;
try {
String orderNo = (String) arg;
orderInfoService.changeOrderStatusToRefund(orderNo);
} catch (Exception e) {
result = RocketMQLocalTransactionState.ROLLBACK;
}
return result;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderNo = (String) msg.getHeaders().get("orderNo");
if(!StringUtils.isEmpty(orderNo)){
OrderInfo orderInfo = orderInfoService.getOrderStatus(orderNo);
if(OrderInfo.STATUS_REFUND.equals(orderInfo.getStatus())){
return RocketMQLocalTransactionState.COMMIT;
}
}
TCC事务
TCC即为Try Confifirm Cancel,它属于补偿型分布式事务。TCC实现分布式事务一共有三个步骤:
- Try:尝试待执行的业务
这个过程并未执行业务,只是完成所有业务的一致性检查,并预留好执行所需的全部资源
- Confifirm:确认执行业务
确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。通常情况下,采用TCC则认为 Confifirm阶段是不会出错的。即:只要Try成功,Confifirm一定成功。若Confifirm阶段真的出错了,需引入重试机制或人工处理。
- Cancel:取消待执行的业务
取消Try阶段预留的业务资源。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。
return RocketMQLocalTransactionState.ROLLBACK;
}
}
TCC两阶段提交与XA两阶段提交的区别是:
XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁。
TCC是业务层面的分布式事务,终一致性,不会一直持有资源的锁。
TCC事务的优缺点:
优点 :把数据库层的二阶段提交上提到了应用层来实现,规避了数据库层的2PC性能低下问题。
缺点 :TCC的Try、Confifirm和Cancel操作功能需业务提供,开发成本高。
三、Seata分布式事务解决方案
2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar (Fast & EaSy Commit And Rollback),其愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题。后来更名为 Seata ,意为:Simple Extensible Autonomous Transaction Architecture,是一套分布式事务解决方案。
Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。
它把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分
支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据
库的本地事务。
3.1 Seata-At模式
Seata主要由三个重要组件组成:
TC:Transaction Coordinator 事务协调器,管理全局的分支事务的状态,用于全局性事务的提交和回滚。
TM:Transaction Manager 事务管理器,用于开启、提交或者回滚全局事务。
RM:Resource Manager 资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分支事务的状态,接受TC的命令来提交或者回滚分支事务。
Seata-AT模式的执行流程如下:
A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个的XID
A服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖
A服务执行分支事务,向数据库做操作4. A服务开始远程调用B服务,此时XID会在微服务的调用链上传播
B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖
B服务执行分支事务,向数据库做操作
全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚
TC协调其管辖之下的所有分支事务, 决定是否回滚
Seata-AT模式实现2PC与传统2PC的差别 :
架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM本质上就是数据库自身,通过XA协议实现,而 Seata的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。
两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase持锁的时间,整体提高效率。
3.2 秒杀项目集成Seata
启动Seata-server
1.上传,将seata-server-1.3.0.zip
上传到/usr/local/software
目录下
2.解压文件到指定目录
unzip /usr/local/software/seata-server-1.3.0.zip -d /usr/local
3.修改日志配置文件,否则启动控制台乱码(如果是window的情况需要修改如下配置)
vi /usr/local/seata/conf/logback.xml
原配置如下:
<property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx"/>
修改成如下格式:
<property name="CONSOLE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %5p --- %[%15.15t] %-40.40logger{39} : %m%n%wEx"/>
此问题是因为开发者为seata1.3.0添加字体颜色,而在window中的shell脚本内不显示发生的乱码错误
4.修改registry.config文件
vi /usr/local/seata/conf/registry.conf
修改内容如下:[注意需要把下面nacos的IP地址
修改成实际地址]
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "nacos的IP地址:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "nacos的IP地址:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
}
5.启动seata-server
nohup /usr/local/seata/bin/seata-server.sh -h 目前所在服务器ip地址 -p 7000 >log.out 2>1 &
项目集成seata配置
1.启动seata-server,详情请看部署文档
2.在项目中添加依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<version>2.2.2.RELEASE</version>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.3.0</version>
</dependency>
在配置文件中添加如下配置
seata:
tx-service-group: seckill-service-group
registry:
type: nacos
nacos:
server-addr: ${spring.cloud.nacos.config.server-addr}
group: SEATA_GROUP
application: seata-server
service:
vgroup-mapping:
seckill-service-group: default
AT模式代码实现
分布式事务发起方只需要贴@GlobalTransactional注解即可
分支分布式事务贴上@Transactional即可
3.3 Seata-TCC深度解析
TCC模型图
模型设计
业务场景
1.账户支付,用户账户金额减少
2.账户退款,用户账户金额增加
表设计
CREATE TABLE `user_account` (
`user_id` varchar( 100 ) NOT NULL COMMENT '用户UID',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`amount` bigint( 20 ) NOT NULL COMMENT '用户余额',
PRIMARY KEY (`user_id`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
扣钱业务逻辑
场景: 账户A上有 100 元,要扣除其中的 30 元
Try: 检查余额,扣除其中 30 元;
Confirm: 空提交
Cancel: 返还扣除的 30 元
加钱业务逻辑
Try: 空操作;
Confirm: 增加可用金额 30 元
Cancel: 空操作
业务模型总结
并发控制
账户A上有 100 元,事务T1要扣除其中 30 元,事务T2也要扣除 30 元,出现并发
Try: 检查余额,扣除其中 30 元
T2 Confirm: 空提交
T1 Cancel: 释放T1预留的 30 元
业务模型优化
表增加冻结金额字段
CREATE TABLE `user_account` (
`user_id` varchar( 100 ) NOT NULL COMMENT '用户UID',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`amount` bigint( 20 ) NOT NULL COMMENT '用户余额',
`freezed_amount` bigint( 20 ) unsigned DEFAULT '0',
PRIMARY KEY (`user_id`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
扣钱为例: 账户上有 100 元,要扣除其中 30 元(此时里面的可用金额=amount-freezed_amount)
Try: 检查余额,扣除其中 30 元(freezed_amount=freezed_amount+30)
Confirm: 扣除 30 元( amount=amount-30 freezed_amount=freezed_amount-30)
Cancel: 释放预留的 30 元(freezed_amount=freezed_amount-30)
加钱为例: 账户上有 100 元,要加 30 元(此时里面的可用金额=amount-freezed_amount)
Try: 空操作
Confirm: 增加 30 元( amount=amount+30)
Cancel: 空操作
异常处理
空回滚
Try方法为执行,Cancel执行了
出现原因:
- Try超时
- 分布式事务回滚,触发Cancel
- 未收到Try,收到Cancel
解决方案: Cancel方法需要识别出是否执行Try方法,如果执行了就正常执行Cancel,如果没有就直接结束增加事务日志表来实现这个功能.
CREATE TABLE `account_transaction` (
`tx_id` varchar( 100 ) NOT NULL COMMENT '事务Txid',
`action_id` varchar( 100 ) NOT NULL COMMENT '分支事务id',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`user_id` varchar( 100 ) NOT NULL COMMENT '账户Uid',
`amount` varchar( 100 ) NOT NULL COMMENT '变动金额',
`type` varchar( 100 ) NOT NULL DEFAULT '' COMMENT '变动类型',
PRIMARY KEY (`tx_id`,`action_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
幂等
多次调用二阶段方法
出现原因:
网络异常
分支事务所在服务器宕机
解决方案: 做幂等性处理
CREATE TABLE `account_transaction` (
`tx_id` varchar( 100 ) NOT NULL COMMENT '事务Txid',
`action_id` varchar( 100 ) NOT NULL COMMENT '分支事务id',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`user_id` varchar( 100 ) NOT NULL COMMENT '账户Uid',
`amount` varchar( 100 ) NOT NULL COMMENT '变动金额',
`type` varchar( 100 ) NOT NULL DEFAULT '' COMMENT '变动类型',
`state` smallint( 4 ) NOT NULL COMMENT '状态: 1.初始化 2.已提交 3.已回滚',
PRIMARY KEY (`tx_id`,`action_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
防悬挂
Cancel比Try先执行
出现原因:
- Try超时(拥堵)
- 分布式事务回滚触发Cancel
- 拥堵的Try到达
要允许空回滚,但是要拒绝空回滚之后的Try方法.
解决方案: 在Try方法中, 如果根据全局事务ID能查询出数据出来,说明在try方法之前执行了空回滚,此时
就不能进行try方法。否则就正常执行try方法.
异常处理流程图
Try方法
Comfirm方法
Cancel方法
TCC模式代码实现
分布式事务发起方只需要贴@GlobalTransactional注解即可
分支事务需要完成下面步骤:
1.在接口上贴上@LocalTCC和@TwoPhaseBusinessAction注解,具体配置如下:
@LocalTCC
public interface IUsableIntegralService {
/**
* 增加积分
*/
@TwoPhaseBusinessAction(name = "incrIntergralTry", commitMethod =
"incrIntergralCommit", rollbackMethod = "incrIntergralRollback")
void incrIntergralTry(@BusinessActionContextParameter(paramName =
"operateIntergralVo") OperateIntergralVo operateIntergralVo,
BusinessActionContext context);
void incrIntergralCommit(BusinessActionContext context);
void incrIntergralRollback(BusinessActionContext context);
}
2.添加实现类,实现try,confirm,cancel方法逻辑即可