分布式事务理论
CAP(强一致性)
CAP定理,又被叫作布鲁尔定理。对于共享数据系统,最多只能同时拥有CAP其中的两个,任意两个都有其适应的场景。
BASE(最终一致性)
BASE 是指基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。它的核心思想是即使无法做到强一致性(CAP 就是强一致性),但应用可以采用是即使无法做到强一致性(CAP 就是强一致性),但应用可以采用
- BA指的是基本业务可用性,支持分区失败
- S表示柔性状态,也就是允许短时间内不同步
- E表示最终一致性,数据最终是一致的,但是实时是不一致的
原子性和持久性必须从根本上保障,为了可用性、性能和服务降级的需要,只有降低一致性和隔离性的要求。BASE 解决了 CAP 理论中没有考虑到的网络延迟问题,在BASE中用软状态和最终一致,保证了延迟后的一致性。
分布式事务模式
了解了分布式事务中的强一致性和最终一致性理论,下面介绍几种常见的分布式事务的解决方案。
2PC模式(强一致性)
2PC是Two-Phase Commit缩写,即两阶段提交,就是将事务的提交过程分为两个阶段来进行处理。事务的发起者称协调者,事务的执行者称参与者。协调者统一协调参与者执行。
- 阶段 1:准备阶段。协调者向所有参与者发送事务内容,询问是否可以提交事务,并等待所有参与者答复。各参与者执行事务操作,但不提交事务,将 undo 和 redo 信息记入事务日志中。如参与者执行成功,给协调者反馈 yes;如执行失败,给协调者反馈 no
- 阶段 2:提交阶段。如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(rollback)消息。否则,发送提交(commit)消息
2PC 方案实现起来简单,实际项目中使用比较少,主要因为以下问题:
- 性能问题:所有参与者在事务提交阶段处于同步阻塞状态,占用系统资源,容易导致性能瓶颈
- 可靠性问题:如果协调者存在单点故障问题,如果协调者出现故障,参与者将一直处于锁定状态
- 数据一致性问题:在阶段 2 中,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就导致了节点之间数据的不一致
3PC模式(强一致性)
3PC 三阶段提交,是两阶段提交的改进版本,与两阶段提交不同的是,引入超时机制。同时在协调者和参与者中都引入超时机制。三阶段提交将两阶段的准备阶段拆分为 2 个阶段,插入了一个preCommit 阶段,解决了原先在两阶段提交中,参与者在准备之后,由于协调者或参与者发生崩溃或错误,而导致参与者无法知晓处于长时间等待的问题。如果在指定的时间内协调者没有收到参与者的消息则默认失败。
- 阶段1:canCommit。协调者向参与者发送 commit 请求,参与者如果可以提交就返回 yes 响应,否则返回 no 响应
- 阶段2:preCommit。协调者根据阶段 1 canCommit 参与者的反应情况执行预提交事务或中断事务操作。参与者均反馈 yes:协调者向所有参与者发出 preCommit 请求,参与者收到preCommit 请求后,执行事务操作,但不提交;将 undo 和 redo 信息记入事务日志中;各参与者向协调者反馈 ack 响应或 no 响应,并等待最终指令。任何一个参与者反馈 no或等待超时:协调者向所有参与者发出 abort 请求,无论收到调者发出的 abort 请求,或者在等待协调者请求过程中出现超时,参与者均会中断事务
- 阶段3:do Commit。该阶段进行真正的事务提交,根据阶段 2 preCommit反馈的结果完成事务提交或中断操作
相比2PC模式,3PC模式降低了阻塞范围,在等待超时后协调者或参与者会中断事务。避免了协调者单点问题,阶段 3 中协调者出现问题时(比如网络中断等),参与者会继续提交事务。
XA(强一致性)
XA是由X/Open组织提出的分布式事务的规范,是基于两阶段提交协议。 XA规范主要定义了全局事务管理器(TM)和局部资源管理器(RM)之间的接口。目前主流的关系型数据库产品都是实现了XA接口。
XA之所以需要引入事务管理器,是因为在分布式系统中,从理论上讲两台机器理论上无法达到一致的状态,需要引入一个单点进行协调。由全局事务管理器管理和协调的事务,可以跨越多个资源(数据库)和进程。
事务管理器用来保证所有的事务参与者都完成了准备工作(第一阶段)。如果事务管理器收到所有参与者都准备好的消息,就会通知所有的事务都可以提交了(第二阶段)。MySQL 在这个XA事务中扮演的是参与者的角色,而不是事务管理器。
TCC模式(最终一致性)
TCC(Try-Confirm-Cancel)的概念,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。TCC 是服务化的两阶段编程模型,其 Try、Confirm、Cancel 3 个方法均由业务编码实现:
- Try 操作作为一阶段,负责资源的检查和预留
- Confirm 操作作为二阶段提交操作,执行真正的业务
- Cancel 是预留资源的取消
TCC事务模式相对于 XA 等传统模型如下图所示:
TCC 模式相比于 XA,解决了如下几个缺点:
- 解决了协调者单点:由主业务方发起并完成这个业务活动。业务活动管理器可以变成多点,引入集群
- 同步阻塞:引入超时机制,超时后进行补偿,并且不会锁定整个资源,将资源转换为业务逻辑形式,粒度变小
- 数据一致性:有了补偿机制之后,由业务活动管理器控制一致性
消息队列模式(最终一致性)
消息队列的方案最初是由 eBay 提出,基于TCC模式,消息中间件可以基于 Kafka、RocketMQ 等消息队列。此方案的核心是将分布式事务拆分成本地事务进行处理,将需要分布式处理的任务通过消息日志的方式来异步执行。消息日志可以存储到本地文本、数据库或MQ中间件,再通过业务规则人工发起重试。
下面描述下事务的处理流程:
- 步骤1:事务主动方处理本地事务。事务主动方在本地事务中处理业务更新操作和MQ写消息操作
- 步骤 2:事务主动方通过消息中间件,通知事务被动方处理事务通知事务待消息。事务主动方主动写消息到MQ,事务消费方接收并处理MQ中的消息
- 步骤 3:事务被动方通过MQ中间件,通知事务主动方事务已处理的消息,事务主动方根据反馈结果提交或回滚事务
为了数据的一致性,当流程中遇到错误需要重试,容错处理规则如下:
- 当步骤 1 处理出错,事务回滚,相当于什么都没发生
- 当步骤 2 处理出错,由于未处理的事务消息还是保存在事务发送方,可以重试或撤销本地业务操作
- 如果事务被动方消费消息异常,需要不断重试,业务处理逻辑需要保证幂等
- 如果是事务被动方业务上的处理失败,可以通过MQ通知事务主动方进行补偿或者事务回滚
- 如果多个事务被动方已经消费消息,事务主动方需要回滚事务时需要通知事务被动方回滚
Saga模式(最终一致性)
Saga这个概念源于 1987 年普林斯顿大学的 Hecto 和 Kenneth 发表的一篇数据库论文Sagas ,一个Saga事务是一个有多个短时事务组成的长时的事务。 在分布式事务场景下,我们把一个Saga分布式事务看做是一个由多个本地事务组成的事务,每个本地事务都有一个与之对应的补偿事务。在Saga事务的执行过程中,如果某一步执行出现异常,Saga事务会被终止,同时会调用对应的补偿事务完成相关的恢复操作,这样保证Saga相关的本地事务要么都是执行成功,要么通过补偿恢复成为事务执行之前的状态。(自动反向补偿机制)。
Saga 事务基本协议如下:
- 每个 Saga 事务由一系列幂等的有序子事务(sub-transaction) Ti 组成
- 每个 Ti 都有对应的幂等补偿动作 Ci,补偿动作用于撤销 Ti 造成的结果。Saga是一种补偿模式,它定义了两种补偿策略:向前恢复(forward recovery):对应于上面第一种执行顺序,发生失败进行重试,适用于必须要成功的场景;向后恢复(backward recovery):对应于上面提到的第二种执行顺序,发生错误后撤销掉之前所有成功的子事务,使得整个 Saga 的执行结果撤销
Saga 的执行顺序有两种,如上图:
- 事务正常执行完成:T1, T2, T3, …, Tn,例如:减库存(T1),创建订单(T2),支付(T3),依次有序完成整个事务
- 事务回滚:T1, T2, …, Tj, Cj,…, C2, C1,其中 0 < j < n,例如:减库存(T1),创建订单(T2),支付(T3),支付失败,支付回滚(C3),订单回滚(C2),恢复库存(C1)
Seata框架
Fescar开源项目是Seata的前身项目,最初愿景是能像本地事务一样控制分布式事务,解决分布式环境下的难题。Seata(Simple Extensible Autonomous Transaction Architecture)是一套一站式分布式事务解决方案,是阿里集团和蚂蚁金服联合打造的分布式事务框架。Seata目前的事务模式有AT、TCC、Saga和XA,默认是AT模式,AT本质上是2PC协议的一种实现。
Seata AT事务模型包含TM(事务管理器),RM(资源管理器),TC(事务协调器)。其中TC是一个独立的服务需要单独部署,TM和RM以jar包的方式同业务应用部署在一起,它们同TC建立长连接,在整个事务生命周期内,保持RPC通信。
- 全局事务的发起方作为TM,全局事务的参与者作为RM
- TM负责全局事务的begin和commit/rollback
- RM负责分支事务的执行结果上报,并且通过TC的协调进行commit/rollback
在 Seata 中,AT时分为两个阶段的,第一阶段,就是各个阶段本地提交操作;第二阶段会根据第一阶段的情况决定是进行全局提交还是全局回滚操作。具体的执行流程如下:
- TM 开启分布式事务,负责全局事务的begin和commit/rollback(TM 向 TC 注册全局事务记录)
- RM 作为参与者,负责分支事务的执行结果上报,并且通过TC的协调进行commit/rollback(RM 向 TC 汇报资源准备状态 )
- RM分支事务结束,事务一阶段结束
- 根据TC 汇总事务信息,由TM发起事务提交或回滚操作
- TC 通知所有 RM 提交/回滚资源,事务二阶段结束
Sharding-JDBC整合XA原理
Java通过定义JTA接口实现了XA的模型,JTA接口里的ResourceManager
需要数据库厂商提供XA的驱动实现,而TransactionManager
则需要事务管理器的厂商实现,传统的事务管理器需要同应用服务器绑定,因此使用的成本很高。 而嵌入式的事务管器可以以jar包的形式提供服务,同ShardingSphere集成后,可保证分片后跨库事务强一致性。
ShardingSphere支持以下功能:
- 支持数据分片后的跨库XA事务
- 两阶段提交保证操作的原子性和数据的强一致性
- 服务宕机重启后,提交/回滚中的事务可自动恢复
SPI
机制整合主流的XA事务管理器,默认Atomikos
- 同时支持XA和非XA的连接池
- 提供
spring-boot
和namespace
的接入端
ShardingSphere整合XA事务时,分离了XA事务管理和连接池管理,这样接入XA时,可以做到对业务的零侵入。
- Begin(开启XA全局事务)。
XAShardingTransactionManager
会调用具体的XA事务管理器开启XA的全局事务 - 执行物理SQL。ShardingSphere进行解析/优化/路由后会生成SQL操作,执行引擎为每个物理SQL创建连接的同时,物理连接所对应的
XAResource
也会被注册到当前XA事务中。事务管理器会在此阶段发送XAResource.start
命令给数据库,数据库在收到XAResource.end命令之前的所有SQL操作,会被标记为XA事务 - Commit/rollback(提交XA事务)。
XAShardingTransactionManager
收到接入端的提交命令后,会委托实际的XA事务管理进行提交动作,这时事务管理器会收集当前线程里所有注册的XAResource
,首先发送XAResource.end
指令,用以标记此XA事务的边界。 接着会依次发送prepare
指令,收集所有参与XAResource
投票,如果所有XAResource
的反馈结果都是OK,则会再次调用commit
指令进行最终提交,如果有一个XAResource
的反馈结果为No,则会调用rollback
指令进行回滚。 在事务管理器发出提交指令后,任何XAResource
产生的异常都会通过recovery
日志进行重试,来保证提交阶段的操作原子性,和数据强一致性。
Sharding-JDBC整合Saga原理
ShardingSphere的柔性事务已通过第三方servicecomb-saga
组件实现的,通过SPI
机制注入使用。ShardingSphere是基于反向SQL技术实现的反向补偿操作,它将对数据库进行更新操作的SQL自动生成反向SQL,并交由Saga-actuator
引擎执行。使用方则无需再关注如何实现补偿方法,将柔性事务管理器的应用范畴成功的定位回了事务的本源——数据库层面。ShardingSphere支持以下功能:
- 完全支持跨库事务
- 支持失败SQL重试及最大努力送达
- 支持反向SQL、自动生成更新快照以及自动补偿
- 默认使用关系型数据库进行快照及事务日志的持久化,支持使用SPI的方式加载其他类型的持久化
Saga柔性事务的实现类为SagaShardingTransactionMananger
,。ShardingSphere通过Hook的方式拦截逻辑SQL的解析和路由结果。这样,在分片物理SQL执行前,可以生成逆向SQL,在事务提交阶段再把SQL调用链交给Saga引擎处理。
- Init(Saga引擎初始化)。包含Saga柔性事务的应用启动时,
saga-actuator
引擎会根据saga.properties
的配置进行初始化的流程 - Begin(开启Saga全局事务)。每次开启Saga全局事务时,将会生成本次全局事务的上下文(
SagaTransactionContext
),事务上下文记录了所有子事务的正向SQL和逆向SQL,作为生成事务调用链的元数据使用 - 执行物理SQL。在物理SQL执行前,ShardingSphere根据SQL的类型生成逆向SQL,这里是通过Hook的方式拦截Parser的解析结果进行实现
- Commit/rollback(提交Saga事务)。提交阶段会生成Saga执行引擎所需的调用链路图,
commit
操作产生ForwardRecovery
(正向SQL补偿)任务,rollback
操作产生BackwardRecovery
任务(逆向SQL补偿)。
Sharding-JDBC整合Seata原理
分布式事务的实现目前主要分为两阶段的XA强事务和BASE柔性事务。
Seata AT事务作为BASE柔性事务的一种实现,可以无缝接入到ShardingSphere生态中。在整合Seata AT事务时,需要把TM,RM,TC的模型融入到ShardingSphere 分布式事务的SPI的生态中。在数据库资源上,Seata通过对接DataSource
接口,让JDBC操作可以同TC进行RPC通信。同样,ShardingSphere也是面向DataSource
接口对用户配置的物理DataSource
进行了聚合,因此把物理DataSource
二次包装为Seata 的DataSource
后,就可以把Seata AT事务融入到ShardingSphere的分片中。
- Init(Seata引擎初始化)。包含Seata柔性事务的应用启动时,用户配置的数据源会按seata.conf的配置,适配成Seata事务所需的
DataSourceProxy
,并且注册到RM中 - Begin(开启Seata全局事务)。TM控制全局事务的边界,TM通过向TC发送Begin指令,获取全局事务ID,所有分支事务通过此全局事务ID,参与到全局事务中;全局事务ID的上下文存放在当前线程变量中
- 执行分片物理SQL。处于Seata全局事务中的分片SQL通过RM生成undo快照,并且发送participate指令到TC,加入到全局事务中。ShardingSphere的分片物理SQL是按多线程方式执行,因此整合Seata AT事务时,需要在主线程和子线程间进行全局事务ID的上下文传递,这同服务间的上下文传递思路完全相同
- Commit/rollback(提交Seata事务)。提交Seata事务时,TM会向TC发送全局事务的
commit
和rollback
指令,TC根据全局事务ID协调所有分支事务进行commit
和rollback
Sharding-JDBC分布式事务实战
我们的实战基于博文MySQL之Sharding-JDBC数据分片中的实战案例。我们首先修改项目的profile
为sharding-database
。
XA模式
引入XA的依赖项
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-transaction-xa-core</artifactId>
<version>4.1.0</version>
</dependency>
添加SpringBoot对于Web支持的starter:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
修改我们的启动类如下:
package com.rubin.shardingjdbcdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.sql.DataSource;
@SpringBootApplication
@EntityScan(basePackages = "com.rubin.shardingjdbcdemo.entity")
@EnableTransactionManagement
public class ShardingJdbcDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ShardingJdbcDemoApplication.class, args);
}
@Bean
public PlatformTransactionManager transactionManager(final DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
- 开启我们框架对于事务的支持
- 注入我们Sharding-JDBC封装的分布式事务管理器
接下来开发我们的接口和业务实现:
package com.rubin.shardingjdbcdemo.controller;
import com.rubin.shardingjdbcdemo.service.TransactionService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TransactionController {
@Autowired
private TransactionService transactionService;
@GetMapping("commit")
public String commit() {
transactionService.commit();
return "ok";
}
@GetMapping("rollback")
public String rollback() {
try {
transactionService.rollback();
} catch (Exception e) {
e.printStackTrace();
}
return "ok";
}
}
package com.rubin.shardingjdbcdemo.service;
import com.rubin.shardingjdbcdemo.acpect.ShardingTransaction;
import com.rubin.shardingjdbcdemo.entity.Position;
import com.rubin.shardingjdbcdemo.entity.PositionDetail;
import com.rubin.shardingjdbcdemo.repository.PositionDetailRepository;
import com.rubin.shardingjdbcdemo.repository.PositionRepository;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class TransactionService {
@Autowired
private PositionRepository positionRepository;
@Autowired
private PositionDetailRepository positionDetailRepository;
@Transactional(rollbackFor = Exception.class)
@ShardingTransaction(TransactionType.XA)
public void commit() {
for (int i = 1; i <= 5; i++) {
Position position = new Position();
position.setName("root" + i);
position.setSalary("1000000");
position.setCity("beijing");
positionRepository.save(position);
PositionDetail positionDetail = new PositionDetail();
positionDetail.setPid(position.getId());
positionDetail.setDescription("this is a root " + i);
positionDetailRepository.save(positionDetail);
}
}
@Transactional(rollbackFor = Exception.class)
@ShardingTransaction(TransactionType.XA)
public void rollback() {
for (int i = 1; i <= 5; i++) {
Position position = new Position();
position.setName("root" + i);
position.setSalary("1000000");
position.setCity("beijing");
positionRepository.save(position);
if (i == 3) {
throw new RuntimeException("人为制造异常");
}
PositionDetail positionDetail = new PositionDetail();
positionDetail.setPid(position.getId());
positionDetail.setDescription("this is a root " + i);
positionDetailRepository.save(positionDetail);
}
}
}
这里我们需要注意一下:根据官方文档的介绍,我们开启分布式事务之后有两种方式设置分布式事务的模式:
- 手动设置
// 默认LOCAL,还可以设置XA和BASE
TransactionTypeHolder.set(TransactionType.LOCAL);
- 使用注解
// 开启注解扫之后,使用下面的注解组合(该注解组合少一个将不生效)
@Transactional
// 默认LOCAL,还可以设置XA和BASE
@ShardingTransactionType(TransactionType.XA)
但是根据我的实验,4.X的版本ShardingTransationType是会抛出异常日志的,这在使用上暂时没有发现异常,但是本着排除隐患的想法(毕竟是开源的软件),我们使用手动设置的方式来设置事务的类型。
手动设置事务类型需要在每个需要事务的方法中均加入上述模板代码,这样肯定是不友好的,我们使用AOP技术来达到一劳永逸的效果。
首先定义我们的注解:
package com.rubin.shardingjdbcdemo.acpect;
import org.apache.shardingsphere.transaction.core.TransactionType;
import java.lang.annotation.*;
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface ShardingTransaction {
TransactionType value() default TransactionType.LOCAL;
}
基于注解定义我们的切面逻辑:
package com.rubin.shardingjdbcdemo.acpect;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.transaction.core.TransactionTypeHolder;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
@Slf4j
public class ShardingTransactionAspect {
private final static String POINT_CUT = "@annotation(com.rubin.shardingjdbcdemo.acpect.ShardingTransaction)";
/**
* 切点
*/
@Pointcut(value = POINT_CUT)
public void setShardingTransactionType() {
}
@Before("setShardingTransactionType()")
public void beforeTransaction(JoinPoint joinPoint) throws NoSuchMethodException {
ShardingTransaction shardingTransaction = joinPoint.getTarget().getClass().getAnnotation(ShardingTransaction.class);
if (shardingTransaction == null) {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method targetMethod = signature.getMethod();
shardingTransaction = targetMethod.getAnnotation(ShardingTransaction.class);
}
log.info("进入切面,设置事务类型为:{}", shardingTransaction.value());
TransactionTypeHolder.set(shardingTransaction.value());
}
@After("setShardingTransactionType()")
public void afterTransation() {
TransactionTypeHolder.clear();
}
}
定义好之后,我们可以启动项目。通过以下两个链接来测试我们的分布式事务是否生效:
ShardingSphere默认的XA事务管理器为Atomikos,通过在项目的classpath中添加jta.properties来定制化Atomikos配置项。具体的配置规则如下:
# 指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证
com.atomikos.icatch.enable_logging=true
# JTA/XA资源是否应该自动注册
com.atomikos.icatch.automatic_resource_registration=true
# JTA事务的默认超时时间,默认为10000ms
com.atomikos.icatch.default_jta_timeout=10000
# 事务的最大超时时间,默认为300000ms。这表示事务超时时间由UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示不设置超时时间
com.atomikos.icatch.max_timeout=300000
# 指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。
com.atomikos.icatch.threaded_2pc=false
# 指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactionsreached”异常信息,表示超出最大事务数限制
com.atomikos.icatch.max_actives=50
# 是否支持subtransaction,默认为true
com.atomikos.icatch.allow_subtransactions=true
# 指定在可能的情况下,否应该join 子事务(subtransactions),默认值为true。如果设置为false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN)
com.atomikos.icatch.serial_jta_transactions=true
# 指定JVM关闭时是否强制(force)关闭事务管理器,默认为false
com.atomikos.icatch.force_shutdown_on_vm_exit=false
# 在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE
com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807
# ========= 日志记录配置=======
# 事务日志目录,默认为./。
com.atomikos.icatch.log_base_dir=./
# 事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。
com.atomikos.icatch.log_base_name=tmlog
# 指定两次checkpoint的时间间隔,默认为500
com.atomikos.icatch.checkpoint_interval=500
# =========日志恢复配置=============
# 指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms
com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000
# 指定两次恢复扫描之间的延迟时间。默认值为与com.atomikos.icatch.default_jta_timeout相同
com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout}
# 提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5
com.atomikos.icatch.oltp_max_retries=5
# 提交失败时,每次重试的时间间隔,默认10000ms
com.atomikos.icatch.oltp_retry_interval=10000
BASE模式
因为Seata搭建需要Server和注册中心,我们这里只是单体项目演示,所以BASE模式我们使用saga来演示。首先添加如下依赖项:
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>sharding-jdbc-spring-boot-starter</artifactId>
<version>4.0.0-RC2</version>
</dependency>
<dependency>
<groupId>io.shardingsphere</groupId>
<artifactId>sharding-transaction-base-saga</artifactId>
<version>4.0.0-RC2</version>
</dependency>
将我们的业务处理类的事务类型改成BASE,如下所示:
package com.rubin.shardingjdbcdemo.service;
import com.rubin.shardingjdbcdemo.acpect.ShardingTransaction;
import com.rubin.shardingjdbcdemo.entity.Position;
import com.rubin.shardingjdbcdemo.entity.PositionDetail;
import com.rubin.shardingjdbcdemo.repository.PositionDetailRepository;
import com.rubin.shardingjdbcdemo.repository.PositionRepository;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class TransactionService {
@Autowired
private PositionRepository positionRepository;
@Autowired
private PositionDetailRepository positionDetailRepository;
@Transactional(rollbackFor = Exception.class)
@ShardingTransaction(TransactionType.BASE)
public void commit() {
for (int i = 1; i <= 5; i++) {
Position position = new Position();
position.setName("root" + i);
position.setSalary("1000000");
position.setCity("beijing");
positionRepository.save(position);
PositionDetail positionDetail = new PositionDetail();
positionDetail.setPid(position.getId());
positionDetail.setDescription("this is a root " + i);
positionDetailRepository.save(positionDetail);
}
}
@Transactional(rollbackFor = Exception.class)
@ShardingTransaction(TransactionType.BASE)
public void rollback() {
for (int i = 1; i <= 5; i++) {
Position position = new Position();
position.setName("root" + i);
position.setSalary("1000000");
position.setCity("beijing");
positionRepository.save(position);
if (i == 3) {
throw new RuntimeException("人为制造异常");
}
PositionDetail positionDetail = new PositionDetail();
positionDetail.setPid(position.getId());
positionDetail.setDescription("this is a root " + i);
positionDetailRepository.save(positionDetail);
}
}
}
定义好之后,我们可以启动项目。通过以下两个链接来测试我们的分布式事务是否生效:
Saga可以通过在项目的classpath中添加 saga.properties 来定制化Saga事务的配置项。配置项的属性及说明如下:
属性名称 | 默认值 | 说明 |
saga.actuator.executor.size | 5 | 使用的线程池大小 |
saga.actuator.transaction.max.retries | 5 | 失败SQL的最大重试次数 |
saga.actuator.compensation.max.retries | 5 | 失败SQL的最大尝试补偿次数 |
saga.actuator.transaction.retry.delay.milliseconds | 5000 | 失败SQL的重试间隔,单位毫秒 |
saga.actuator.compensation.retry.delay.milliseconds | 3000 | 失败SQL的补偿间隔,单位毫秒 |
saga.persistence.enabled | false | 是否对日志进行持久化 |
saga.persistence.ds.url | 无 | 事务日志数据库JDBC连接 |
saga.persistence.ds.username | 无 | 事务日志数据库用户名 |
saga.persistence.ds.password | 无 | 事务日志数据库密码 |
saga.persistence.ds.max.pool.size | 50 | 事务日志连接池最大连接数 |
saga.persistence.ds.min.pool.size | 1 | 事务日志连接池最小连接数 |
saga.persistence.ds.max.life.time.milliseconds | 0(无限制) | 事务日志连接池最大存活时间,单位毫秒 |
saga.persistence.ds.idle.timeout.milliseconds | 60 * 1000 | 事务日志连接池空闲回收时间,单位毫秒 |
saga.persistence.ds.connection.timeout.milliseconds | 30 * 1000 | 事务日志连接池超时时间,单位毫秒 |
示例文件如下:
saga.actuator.executor.size=16
saga.actuator.transaction.max.retries=8
saga.actuator.compensation.max.retries=4
saga.actuator.transaction.retry.delay.milliseconds=1000
saga.actuator.compensation.retry.delay.milliseconds=2000
#saga.actuator.recovery.policy=BackwardRecovery
saga.persistence.enabled=false
# saga.persistence.ds.url=jdbc:mysql://localhost:3306/saga
# saga.persistence.ds.username=root
# saga.persistence.ds.password=
# saga.persistence.ds.connection.timeout.milliseconds=30000
# saga.persistence.ds.idle.timeout.milliseconds=60000
# saga.persistence.ds.max.life.time.milliseconds=1800000
# saga.persistence.ds.max.pool.size=32
# saga.persistence.ds.min.pool.size=4
# saga.persistence.ds.maintenance.interval.milliseconds=29999
以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~
文章评论