Spring(一) IOC核心类
Spring(二) Resource定位与载入
Spring(三) BeanDefinition解析与注册
Spring(四) 自定义标签解析
Spring(五) 其他初始化步骤
Spring(六) bean的加载01
Spring(七) bean的加载02
Spring(八) SpringBean的生命周期
Spring(九) IOC时序图
Spring(十) AOP 01
Spring(十一) AOP 02
Spring(十二) spring事务
事务
事务是恢复和并发控制的基本单位。具有四个属性:原子性、一致性、阻离性、持久性,合起来通常被称为ACID特性。
1、原子性(atomicity): 一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做。
2、一致性(consistency): 事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的。
3、隔离性(isolation): 一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰。
4、持久性(durability): 持久性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响。
对于同时运行的多个事务,当这些事务访问数据库中相同的数据时,如果没有采取必要的隔离机制,就会导致各种并发问题:
1、脏读(dirty read): 事务A读取了事务B未提交的数据,如果B回滚了操作,那么A读取到的数据是脏数据
2、不可重复读(no-repeatable read): 事务A读取了事务B已提交的更改数据。那么会在多次读取同一数据时,前后不一致
3、幻读(phantom read): 事务A读取了事务B已提交的新增/删除数据
MySQL的四种事务隔离级别(默认为REPEATABLE-READ):
事务隔离级别 | 脏读 | 不可重复读 | 幻读 |
---|---|---|---|
READ-UNCOMMITTED | √ | √ | √ |
READ-COMMITTED | X | √ | √ |
REPEATABLE-READ(条件行锁+[MVCC机制]) | X | X | X/(√) |
SERIALIZABLE(锁表) | X | X | X |
补充:
1 . 事务隔离级别为READ-COMMITTED时,写数据只会锁住相应的行
2 . 事务隔离级别为REPEATABLE-READ时,如果检索条件有索引(包括主键索引)的时候,默认加锁方式是next-key 锁;如果检索条件没有索引,更新数据时会锁住整张表。一个间隙被事务加了锁,其他事务是不能在这个间隙插入记录的,这样可以防止幻读。InnoDB和XtraDB存储引擎通过多版本并发控制MVCC解决了幻读的问题
3 . 事务隔离级别为SERIALIZABLE时,读写数据都会锁住整张表
4 . 隔离级别越高,越能保证数据的完整性和一致性,但是对并发性能的影响也越大
注:MVCC机制:select操作不会更新版本号,是快照读(历史版本);insert、update和delete会更新版本号,是当前读(当前版本)
扩展:
轻松理解MYSQL MVCC 实现机制
mysql锁 innodb下的记录锁,间隙锁,next-key锁
MySQL学习之——锁(行锁、表锁、页锁、乐观锁、悲观锁等)
MySQL 加锁处理分析
java事务
首先要求数据库支持事务,然后可以通过关闭自动提交,完成后手动提交,异常时回滚来实现:
Connection con = db.getConnection();
try {
con.setAutoCommit(false);
...
con.commit();
con.setAutoCommit(true);
...
} catch (Exception ex) {
con.rollBack();
...
}
Connection.setTransactionIsolation方法可以设置当前连接的隔离级别
int TRANSACTION_NONE = 0;
int TRANSACTION_READ_UNCOMMITTED = 1;
int TRANSACTION_READ_COMMITTED = 2;
int TRANSACTION_REPEATABLE_READ = 4;
int TRANSACTION_SERIALIZABLE = 8;
spring事务
概念
spring所有的事务管理策略类都继承自org.springframework.transaction.PlatformTransactionManager接口
其中TransactionDefinition接口定义以下特性:事务隔离级别、事务传播行为、事务超时、事务只读属性、事务回滚规则等
spring传播行为:
传播行为 | 含义 |
---|---|
PROPAGATION_REQUIRED(默认) | 支持当前事务,表示当前方法必须运行在事务中。如果当前事务存在,方法将会在该事务中运行。否则,会启动一个新的事务 |
PROPAGATION_SUPPORTS | 支持当前事务,如果存在当前事务,那么该方法会在这个事务中运行;否则以非事务方式运行 |
PROPAGATION_MANDATORY | 支持当前事务,表示该方法必须在事务中运行,如果当前事务不存在,则会抛出一个异常 |
PROPAGATION_REQUIRED_NEW | 表示当前方法必须运行在它自己的事务中。一个新的不依赖于环境的事务将被启动。如果存在当前事务,在该方法执行期间,当前事务会被挂起。此事务结束,之前事务继续执行 |
PROPAGATION_NOT_SUPPORTED | 表示该方法不应该运行在事务中。如果存在当前事务,在该方法运行期间,当前事务将被挂起,以非事务方式执行操作 |
PROPAGATION_NEVER | 表示当前方法不应该运行在事务上下文中。如果当前正有一个事务在运行,则会抛出异常 |
PROPAGATION_NESTED | 表示如果当前已经存在一个事务,那么该方法将会在嵌套事务中运行。它将取到一个savepoint,如果嵌套事务失败,将回滚至此savepoint。嵌套事务是外部事务的一部分,只有外部事务结束后它才会被提交。如果当前事务不存在,那么其行为与PROPAGATION_REQUIRED一样。注意各厂商对这种传播行为的支持是有所差异的。可以参考资源管理器的文档来确认它们是否支持嵌套事务 |
事务超时:
设置timeout(s)后,spring通过deadLine(根据timeout+当前时间点)和jdbc的stament#setQueryTime两种策略来判断超时,
Spring事务超时 = 事务开始到最后一个Statement创建时间 + 最后一个Statement的执行超时时间(即其queryTimeout)。所以在执行Statement之后的其他逻辑超时不会进行事务回滚
只读属性:
设置readOnly = true后,就只能查询,增删改都会报异常。但这个属性并不是所有数据库都支持,Mysql是支持的。
回滚策略:
事务默认在RuntimeException/Error下才会回滚。rollbackFor属性可以设置会被回滚的异常类型,类似还有rollbackForClassName/noRollbackFor/noRollbackForClassName属性可以设置
推荐扩展:脱离 Spring 实现复杂嵌套事务
spring声明式事务配置
- 定义dataSource: org.apache.commons.dbcp2.BasicDataSource
- 定义transactionManager: org.springframework.jdbc.datasource.DataSourceTransactionManager
- 配置事务
- @Transactional注解:
- 设置<tx:annotation-driven transaction-manager=”transactionManager”>
- 在类/接口或public方法上@Transactional注解,定义事务传播策略,(接口上使用注解仅当设置了基于接口的代理时才生效,因为接口不继承注解)
- tx + aop配置:
- 设置tx:advice通知,里面配置tx:attributes设置方法匹配策略和传播行为
- 设置aop:config切面,配置advice-ref引用,定义pointcut切点
- @Transactional注解:
源码解析
配置了tx标签,那么肯定是走自定义标签处理流程,通过命名空间获取处理类TxNamespaceHandler
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
annotation-driven标签
AnnotationDrivenBeanDefinitionParser解析
public BeanDefinition parse(Element element, ParserContext parserContext) {
// 注册internalTransactionalEventListenerFactory
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode");
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerTransactionAspect(element, parserContext);
}
else {
// mode="proxy"
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null;
}
mode=”proxy”
// 内部类
public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
// 1. 注册InfrastructureAdvisorAutoProxyCreator
AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
// org.springframework.transaction.config.internalTransactionAdvisor
String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
Object eleSource = parserContext.extractSource(element);
// 创建TransactionAttributeSource Create the TransactionAttributeSource definition.
RootBeanDefinition sourceDef = new RootBeanDefinition(
"org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
sourceDef.setSource(eleSource);
sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
// 注册bean,并使用spring中的定义规则生成beanName
String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);
// 创建TransactionInterceptor Create the TransactionInterceptor definition.
RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
interceptorDef.setSource(eleSource);
interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
registerTransactionManager(element, interceptorDef);
interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
// 创建BeanFactoryTransactionAttributeSourceAdvisor Create the TransactionAttributeSourceAdvisor definition.
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
advisorDef.setSource(eleSource);
advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
// 上面的2个bean被注册到了BeanFactoryTransactionAttributeSourceAdvisor内
advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
if (element.hasAttribute("order")) {
advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
}
parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
// 创建CompositeComponentDefinition
CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
parserContext.registerComponent(compositeDef);
}
}
1 . 注册InfrastructureAdvisorAutoProxyCreator
public static void registerAutoProxyCreatorIfNecessary(
ParserContext parserContext, Element sourceElement) {
// 创建InfrastructureAdvisorAutoProxyCreator BeanDefinition
BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
parserContext.getRegistry(), parserContext.extractSource(sourceElement));
// 分别判断设置proxy-target-class属性与expose-proxy属性
useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);
// 注册
registerComponentIfNecessary(beanDefinition, parserContext);
}
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry,
@Nullable Object source) {
// InfrastructureAdvisorAutoProxyCreator
return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}
因此会在postProcessAfterInitialization方法中会对其进行调用
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) throws BeansException {
if (bean != null) {
// 构建key
Object cacheKey = getCacheKey(bean.getClass(), beanName);
// 是否是由于避免循环依赖而创建的bean代理
if (!this.earlyProxyReferences.contains(cacheKey)) {
// 2. 创建aop代理
return wrapIfNecessary(bean, beanName, cacheKey);
}
}
return bean;
}
2 . 是不是非常熟悉,就是和Spring(十) AOP 01中的一模一样,创建aop代理
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
return bean;
}
if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
return bean;
}
if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
// Create proxy if we have advice.
// 找出指定bean对应的增强器
Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
if (specificInterceptors != DO_NOT_PROXY) {
this.advisedBeans.put(cacheKey, Boolean.TRUE);
// 创建代理
Object proxy = createProxy(
bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
this.proxyTypes.put(cacheKey, proxy.getClass());
return proxy;
}
this.advisedBeans.put(cacheKey, Boolean.FALSE);
return bean;
}
中间步骤省略
protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {
// 1. 获取增强器
List<Advisor> candidateAdvisors = findCandidateAdvisors();
// 2. 获取其中能应用(合适)的增强器
List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
extendAdvisors(eligibleAdvisors);
if (!eligibleAdvisors.isEmpty()) {
eligibleAdvisors = sortAdvisors(eligibleAdvisors);
}
return eligibleAdvisors;
}
protected List<Advisor> findCandidateAdvisors() {
Assert.state(this.advisorRetrievalHelper != null, "No BeanFactoryAdvisorRetrievalHelper available");
return this.advisorRetrievalHelper.findAdvisorBeans();
}
BeanFactoryAdvisorRetrievalHelper中实现,查找增强器
public List<Advisor> findAdvisorBeans() {
// Determine list of advisor bean names, if not cached already.
String[] advisorNames = this.cachedAdvisorBeanNames;
if (advisorNames == null) {
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let the auto-proxy creator apply to them!
// 会获取到org.springframework.transaction.config.internalTransactionAdvisor
// 就是txAdvisorBeanName,之前注册的BeanFactoryTransactionAttributeSourceAdvisor,同样实现了Advisor接口
advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
this.beanFactory, Advisor.class, true, false);
this.cachedAdvisorBeanNames = advisorNames;
}
if (advisorNames.length == 0) {
return new ArrayList<>();
}
List<Advisor> advisors = new ArrayList<>();
for (String name : advisorNames) {
if (isEligibleBean(name)) {
if (this.beanFactory.isCurrentlyInCreation(name)) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping currently created advisor '" + name + "'");
}
}
else {
try {
// 实例化并放入advisors
advisors.add(this.beanFactory.getBean(name, Advisor.class));
}
catch (BeanCreationException ex) {
Throwable rootCause = ex.getMostSpecificCause();
if (rootCause instanceof BeanCurrentlyInCreationException) {
BeanCreationException bce = (BeanCreationException) rootCause;
String bceBeanName = bce.getBeanName();
if (bceBeanName != null && this.beanFactory.isCurrentlyInCreation(bceBeanName)) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping advisor '" + name +
"' with dependency on currently created bean: " + ex.getMessage());
}
// Ignore: indicates a reference back to the bean we're trying to advise.
// We want to find advisors other than the currently created bean itself.
continue;
}
}
throw ex;
}
}
}
}
return advisors;
}
在实例化BeanFactoryTransactionAttributeSourceAdvisor时,会创建pointcut
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
@Override
@Nullable
protected TransactionAttributeSource getTransactionAttributeSource() {
// 这里的transactionAttributeSource就是自定义标签解析时放入的TransactionAttributeSource
return transactionAttributeSource;
}
};
然后查看返回的增强器是否符合
public static List<Advisor> findAdvisorsThatCanApply(List<Advisor> candidateAdvisors, Class<?> clazz) {
if (candidateAdvisors.isEmpty()) {
return candidateAdvisors;
}
List<Advisor> eligibleAdvisors = new ArrayList<>();
// 先处理引介增强
for (Advisor candidate : candidateAdvisors) {
if (candidate instanceof IntroductionAdvisor && canApply(candidate, clazz)) {
eligibleAdvisors.add(candidate);
}
}
boolean hasIntroductions = !eligibleAdvisors.isEmpty();
for (Advisor candidate : candidateAdvisors) {
if (candidate instanceof IntroductionAdvisor) {
// already processed
continue;
}
// 普通bean
if (canApply(candidate, clazz, hasIntroductions)) {
eligibleAdvisors.add(candidate);
}
}
return eligibleAdvisors;
}
public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
if (advisor instanceof IntroductionAdvisor) {
return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
}
// BeanFactoryTransactionAttributeSourceAdvisor实现了PointcutAdvisor
else if (advisor instanceof PointcutAdvisor) {
PointcutAdvisor pca = (PointcutAdvisor) advisor;
return canApply(pca.getPointcut(), targetClass, hasIntroductions);
}
else {
// It doesn't have a pointcut so we assume it applies.
return true;
}
}
判断给定的pointcut是否能应用给定的Class
public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
Assert.notNull(pc, "Pointcut must not be null");
if (!pc.getClassFilter().matches(targetClass)) {
return false;
}
// BeanFactoryTransactionAttributeSourceAdvisor
MethodMatcher methodMatcher = pc.getMethodMatcher();
if (methodMatcher == MethodMatcher.TRUE) {
// No need to iterate the methods if we're matching any method anyway...
return true;
}
IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
}
// 获取对应类的所有接口以及类本身
Set<Class<?>> classes = new LinkedHashSet<>();
if (!Proxy.isProxyClass(targetClass)) {
classes.add(ClassUtils.getUserClass(targetClass));
}
classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
for (Class<?> clazz : classes) {
// 对所有方法进行匹配判断
Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
for (Method method : methods) {
if (introductionAwareMethodMatcher != null ?
introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
// 3. TransactionAttributeSourcePointcut去匹配,里面会查看是否存在事务声明
// 如果方法上有事务属性,则使用方法上的,没有则去类上查找,若还是没有则取接口中的方法中查找,最后尝试接口上的声明
methodMatcher.matches(method, targetClass)) {
return true;
}
}
}
return false;
}
3 . 匹配
// TransactionAttributeSourcePointcut
public boolean matches(Method method, @Nullable Class<?> targetClass) {
if (targetClass != null && TransactionalProxy.class.isAssignableFrom(targetClass)) {
return false;
}
// org.springframework.transaction.annotation.AnnotationTransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
// AbstractFallbackTransactionAttributeSource
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
if (method.getDeclaringClass() == Object.class) {
return null;
}
// First, see if we have a cached value.
Object cacheKey = getCacheKey(method, targetClass);
Object cached = this.attributeCache.get(cacheKey);
if (cached != null) {
// Value will either be canonical value indicating there is no transaction attribute,
// or an actual transaction attribute.
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
return null;
}
else {
return (TransactionAttribute) cached;
}
}
else {
// We need to work it out.
// 提取事务标签
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
// Put it in the cache.
if (txAttr == null) {
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
}
else {
String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
if (txAttr instanceof DefaultTransactionAttribute) {
((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
}
if (logger.isDebugEnabled()) {
logger.debug("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
}
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}
提取事务标签
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow no-public methods as required.
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// The method may be on an interface, but we need attributes from the target class.
// If the target class is null, the method will be unchanged.
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// First try is the method in the target class.
// 查看方法中是否存在事务标签
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// Second try is the transaction attribute on the target class.
// 查看所属类上是否存在事务标签
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
if (specificMethod != method) {
// Fallback is to look at the original method.
// 接口中方法
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
// 接口类中查找
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
protected TransactionAttribute findTransactionAttribute(Method method) {
return determineTransactionAttribute(method);
}
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement ae) {
for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
// ---->
TransactionAttribute attr = annotationParser.parseTransactionAnnotation(ae);
if (attr != null) {
return attr;
}
}
return null;
}
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement ae) {
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
ae, Transactional.class, false, false);
if (attributes != null) {
// ---->
return parseTransactionAnnotation(attributes);
}
else {
return null;
}
}
解析事务属性
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
ArrayList<RollbackRuleAttribute> rollBackRules = new ArrayList<>();
Class<?>[] rbf = attributes.getClassArray("rollbackFor");
for (Class<?> rbRule : rbf) {
RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
String[] rbfc = attributes.getStringArray("rollbackForClassName");
for (String rbRule : rbfc) {
RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
Class<?>[] nrbf = attributes.getClassArray("noRollbackFor");
for (Class<?> rbRule : nrbf) {
NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
String[] nrbfc = attributes.getStringArray("noRollbackForClassName");
for (String rbRule : nrbfc) {
NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
rollBackRules.add(rule);
}
rbta.getRollbackRules().addAll(rollBackRules);
return rbta;
}
返回合适的增强器后(BeanFactoryTransactionAttributeSourceAdvisor),就是创建代理
4 . 调用时的拦截器链处理
// DynamicAdvisedInterceptor.intercept -> 获取chain -> ReflectiveMethodInvocation.proceed -->
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
// ... 通过BeanFactoryTransactionAttributeSourceAdvisor取到MethodInterceptors:TransactionInterceptor
this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice(
this, method, targetClass);
// org.springframework.transaction.interceptor.TransactionInterceptor
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
// TransactionAspectSupport
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取对应事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 加载配置中的transactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 构造方法唯一标识(类全路径.方法名)
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 声明式事务处理
// CallbackPreferringPlatformTransactionManager实现PlatformTransactionManager接口,暴露出一个方法用于执行事务处理的回调
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 5. 创建TransactionInfo
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 执行被增强的方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 异常回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清除信息
cleanupTransactionInfo(txInfo);
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 编程式事务
else {
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
// 执行被增强的方法
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
5 . 创建TransactionInfo
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
// 如果没有名称指定则使用方法唯一标识,并使用DelegatingTransactionAttribute封装TransactionAttribute
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 6. 获取TransactionStatus
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 12. 根据指定的属性与status,准备一个TransactionInfo
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
6 . 获取TransactionStatus
// AbstractPlatformTransactionManager
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 7. 获取事务
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
// 判断当前线程是否存在事务,判断依据为当前线程记录的连接不为空且连接中(connectionHolder)中的transactionActive属性不为空
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// ----> 11. 当前线程已经存在事务
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// 当前线程不存在事务
// Check definition settings for new transaction.
// 事务超时设置验证
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
// propagationBehavior被声明为PROPAGATION_MANDATORY则抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// propagationBehavior声明为PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED则需要新建事务
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 空挂起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 构建DefaultTransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 8. 构造transaction,包括设置ConnectionHolder、隔离级别、timeout
// 如果是新连接,绑定到当前线程
doBegin(transaction, definition);
// 10. 同步事务的设置,针对于当前线程的设置
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
7 . 获取事务
// DataSourceTransactionManager
protected Object doGetTransaction() {
DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject();
txObject.setSavepointAllowed(this.isNestedTransactionAllowed());
// 如果当前线程已经记录数据库连接,则使用原有连接
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.obtainDataSource());
// 设置ConnectionHolder,false表示非新创建连接
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
8 . 构造transaction
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
Connection con = null;
try {
if(!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 尝试获取连接
Connection newCon = this.obtainDataSource().getConnection();
if(this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 设置ConnectionHolder
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 事务同步
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 9 . 设置隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
if(con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if(this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 更改自动提交,由spring控制提交
con.setAutoCommit(false);
}
this.prepareTransactionalConnection(con, definition);
// 判断当前线程是否存在事务的标志
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if(timeout != -1) {
// 设置超时时间
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
if(txObject.isNewConnectionHolder()) {
// 新连接,则ConnectionHolder绑定到当前线程
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
if(txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.obtainDataSource());
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
}
}
9 . 获取隔离级别
public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition) throws SQLException {
Assert.notNull(con, "No Connection specified");
// 设置数据连接的只读属性
if(definition != null && definition.isReadOnly()) {
try {
if(logger.isDebugEnabled()) {
logger.debug("Setting JDBC Connection [" + con + "] read-only");
}
con.setReadOnly(true);
} catch (RuntimeException | SQLException var4) {
for(Object exToCheck = var4; exToCheck != null; exToCheck = ((Throwable)exToCheck).getCause()) {
if(exToCheck.getClass().getSimpleName().contains("Timeout")) {
throw var4;
}
}
logger.debug("Could not set JDBC Connection read-only", var4);
}
}
// 设置数据库连接的隔离级别
Integer previousIsolationLevel = null;
if(definition != null && definition.getIsolationLevel() != -1) {
if(logger.isDebugEnabled()) {
logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " + definition.getIsolationLevel());
}
int currentIsolation = con.getTransactionIsolation();
if(currentIsolation != definition.getIsolationLevel()) {
previousIsolationLevel = Integer.valueOf(currentIsolation);
con.setTransactionIsolation(definition.getIsolationLevel());
}
}
return previousIsolationLevel;
}
10 . 同步事务的设置,将事务信息记录到当前线程中
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
11 . 返回6.当前线程已经存在事务的处理
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 挂起,主要目的是记录原有事务状态,以便于后续操作对该事务恢复
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 挂起,主要目的是记录原有事务状态,以便于后续操作对该事务恢复
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 与8.同样,新事务
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED嵌入式事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 建立一个保存点savepoint
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
// 有些情况是不能使用保存点操作的,比如JTA,所以直接创建新事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// 验证
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
12 . 返回5.根据指定的属性与status,准备一个TransactionInfo,将所有事务信息统一都记录在TransactionInfo中
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
// 记录事务状态
txInfo.newTransactionStatus(status);
}
else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled())
logger.trace("Don't need to create transaction for [" + joinpointIdentification +
"]: This method isn't transactional.");
}
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
// 绑定到当前线程
txInfo.bindToThread();
return txInfo;
}
13 . 当抛出异常时,进行回滚操作
// TransactionAspectSupport
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
// 当抛出异常时,首先判断当前是否存在事务
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
"] after exception: " + ex);
}
// 默认判断是否回滚的依据:ex instanceof RuntimeException || ex instanceof Error
// DelegatingTransactionAttribute.rollbackOn,可以通过rollbackFor等属性判断,不存在再调用父类方法判断异常(默认)
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 14. 根据transactionStatus信息进行回滚处理
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
// We don't roll back on this exception.
// Will still roll back if TransactionStatus.isRollbackOnly() is true.
// 如果不满足回滚条件,那么就算抛出异常,也不会去回滚
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
14 . 根据transactionStatus信息进行回滚处理
// AbstractPlatformTransactionManager
public final void rollback(TransactionStatus status) throws TransactionException {
// 如果事务已经完成,那么再次回滚会抛出异常
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// ---->
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// 激活所有TransactionSynchronization中对应的方法
triggerBeforeCompletion(status);
// 如果有保存点,也就是说当前事务为单独的线程,则会退到保存点
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
// 如果当前事务为独立的新事务,那么直接回退
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
doRollback(status);
}
else {
// Participating in larger transaction
// 如果当前事务不是独立事务,那只能标记状态,等事务链执行完毕后统一回滚
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 激活所有TransactionSynchronization中对应的方法
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 清空记录的资源,并将挂起的资源恢复:
// 标记事务状态为完成,避免重复调用
// 如果当前事务是新的同步状态,需要将绑定到当前线程的事务信息清除
// 如果是新事务,则需要将数据库从当前线程解除绑定,释放connection连接,恢复自动提交属性,重置数据库连接
// 如果事务执行前有其他事务被挂起,那需要在当前事务结束后将挂起的事务恢复
cleanupAfterCompletion(status);
}
}
15 . 正常的事务提交
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// ---->
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
// AbstractPlatformTransactionManager
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 就是上面回滚操作时else里做的只标记回滚,因为有可能一个事务既没有保存点也不是新事务,而是另一个事务的嵌套事务,
// 而这个事务又不在spring管理范围内,或者无法设置保存点,那么spring通过设置回滚标志来禁止提交,当外部事务提交时,
// 一旦判断当前事务流被设置了回滚标志,则由外部事务统一进行整体的回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
// 如果发现事务被标记全局回滚并且在全局回滚标记情况下不应该提交事务的话,则进行回滚
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
processRollback(defStatus, true);
return;
}
// 事务提交
processCommit(defStatus);
}
事务提交
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 空方法
prepareForCommit(status);
// 添加的TransactionSynchronization中对应方法的调用
triggerBeforeCommit(status);
// 添加的TransactionSynchronization中对应方法的调用
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 如果存在保存点,清除保存点的信息(内嵌依赖外部提交)
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
// 如果是独立事务,则直接提交
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// ----> 调用了Connection.commit()提交
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
// 提交过程中出现异常,则回滚
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
advice标签
使用advice配置的时候,就会调用TxAdviceBeanDefinitionParser的parse方法
public final BeanDefinition parse(Element element, ParserContext parserContext) {
AbstractBeanDefinition definition = parseInternal(element, parserContext);
if (definition != null && !parserContext.isNested()) {
try {
// methodAdvice
String id = resolveId(element, definition, parserContext);
if (!StringUtils.hasText(id)) {
parserContext.getReaderContext().error(
"Id is required for element '" + parserContext.getDelegate().getLocalName(element)
+ "' when used as a top-level tag", element);
}
String[] aliases = null;
if (shouldParseNameAsAliases()) {
String name = element.getAttribute(NAME_ATTRIBUTE);
if (StringUtils.hasLength(name)) {
aliases = StringUtils.trimArrayElements(StringUtils.commaDelimitedListToStringArray(name));
}
}
BeanDefinitionHolder holder = new BeanDefinitionHolder(definition, id, aliases);
// 注册
registerBeanDefinition(holder, parserContext.getRegistry());
if (shouldFireEvents()) {
BeanComponentDefinition componentDefinition = new BeanComponentDefinition(holder);
postProcessComponentDefinition(componentDefinition);
parserContext.registerComponent(componentDefinition);
}
}
catch (BeanDefinitionStoreException ex) {
String msg = ex.getMessage();
parserContext.getReaderContext().error((msg != null ? msg : ex.toString()), element);
return null;
}
}
return definition;
}
protected final AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition();
String parentName = getParentName(element);
if (parentName != null) {
builder.getRawBeanDefinition().setParentName(parentName);
}
// 直接返回org.springframework.transaction.interceptor.TransactionInterceptor
Class<?> beanClass = getBeanClass(element);
if (beanClass != null) {
builder.getRawBeanDefinition().setBeanClass(beanClass);
}
else {
String beanClassName = getBeanClassName(element);
if (beanClassName != null) {
builder.getRawBeanDefinition().setBeanClassName(beanClassName);
}
}
builder.getRawBeanDefinition().setSource(parserContext.extractSource(element));
BeanDefinition containingBd = parserContext.getContainingBeanDefinition();
if (containingBd != null) {
// Inner bean definition must receive same scope as containing bean.
builder.setScope(containingBd.getScope());
}
if (parserContext.isDefaultLazyInit()) {
// Default-lazy-init applies to custom bean definitions as well.
builder.setLazyInit(true);
}
doParse(element, parserContext, builder);
return builder.getBeanDefinition();
}
解析标签
protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
builder.addPropertyReference("transactionManager", TxNamespaceHandler.getTransactionManagerName(element));
// 解析其tx:attributes子节点
List<Element> txAttributes = DomUtils.getChildElementsByTagName(element, ATTRIBUTES_ELEMENT);
if (txAttributes.size() > 1) {
parserContext.getReaderContext().error(
"Element <attributes> is allowed at most once inside element <advice>", element);
}
else if (txAttributes.size() == 1) {
// Using attributes source.
Element attributeSourceElement = txAttributes.get(0);
RootBeanDefinition attributeSourceDefinition = parseAttributeSource(attributeSourceElement, parserContext);
builder.addPropertyValue("transactionAttributeSource", attributeSourceDefinition);
}
else {
// Assume annotations source.
builder.addPropertyValue("transactionAttributeSource",
new RootBeanDefinition("org.springframework.transaction.annotation.AnnotationTransactionAttributeSource"));
}
}
private RootBeanDefinition parseAttributeSource(Element attrEle, ParserContext parserContext) {
List<Element> methods = DomUtils.getChildElementsByTagName(attrEle, METHOD_ELEMENT);
ManagedMap<TypedStringValue, RuleBasedTransactionAttribute> transactionAttributeMap =
new ManagedMap<>(methods.size());
transactionAttributeMap.setSource(parserContext.extractSource(attrEle));
// 解析每一个tx:method节点
for (Element methodEle : methods) {
String name = methodEle.getAttribute(METHOD_NAME_ATTRIBUTE);
TypedStringValue nameHolder = new TypedStringValue(name);
nameHolder.setSource(parserContext.extractSource(methodEle));
RuleBasedTransactionAttribute attribute = new RuleBasedTransactionAttribute();
// 设置属性
String propagation = methodEle.getAttribute(PROPAGATION_ATTRIBUTE);
String isolation = methodEle.getAttribute(ISOLATION_ATTRIBUTE);
String timeout = methodEle.getAttribute(TIMEOUT_ATTRIBUTE);
String readOnly = methodEle.getAttribute(READ_ONLY_ATTRIBUTE);
if (StringUtils.hasText(propagation)) {
attribute.setPropagationBehaviorName(RuleBasedTransactionAttribute.PREFIX_PROPAGATION + propagation);
}
if (StringUtils.hasText(isolation)) {
attribute.setIsolationLevelName(RuleBasedTransactionAttribute.PREFIX_ISOLATION + isolation);
}
if (StringUtils.hasText(timeout)) {
try {
attribute.setTimeout(Integer.parseInt(timeout));
}
catch (NumberFormatException ex) {
parserContext.getReaderContext().error("Timeout must be an integer value: [" + timeout + "]", methodEle);
}
}
if (StringUtils.hasText(readOnly)) {
attribute.setReadOnly(Boolean.valueOf(methodEle.getAttribute(READ_ONLY_ATTRIBUTE)));
}
List<RollbackRuleAttribute> rollbackRules = new LinkedList<>();
if (methodEle.hasAttribute(ROLLBACK_FOR_ATTRIBUTE)) {
String rollbackForValue = methodEle.getAttribute(ROLLBACK_FOR_ATTRIBUTE);
addRollbackRuleAttributesTo(rollbackRules,rollbackForValue);
}
if (methodEle.hasAttribute(NO_ROLLBACK_FOR_ATTRIBUTE)) {
String noRollbackForValue = methodEle.getAttribute(NO_ROLLBACK_FOR_ATTRIBUTE);
addNoRollbackRuleAttributesTo(rollbackRules,noRollbackForValue);
}
attribute.setRollbackRules(rollbackRules);
transactionAttributeMap.put(nameHolder, attribute);
}
RootBeanDefinition attributeSourceDefinition = new RootBeanDefinition(NameMatchTransactionAttributeSource.class);
attributeSourceDefinition.setSource(parserContext.extractSource(attrEle));
attributeSourceDefinition.getPropertyValues().add("nameMap", transactionAttributeMap);
return attributeSourceDefinition;
}
然后通过aop:config标签,通过aop:advisor子标签与aop:pointcut子标签去创建代理,在拦截器链中进行事务管理
平时遇到的
自我调用处理
从上面的源码分析可以了解到,aop代理是在方法入口层面的。所以内部调用自己的方法,如果入口没有@Transactional注解,那么这个调用就不会使用事务;如果在内部调用中还配有其他@Transactional的不同配置,那么也将无法奏效。
Spring通过AopProxy接口,抽象了这两种实现,实现了一致的AOP方式。这种抽象同样带了一个缺陷,那就是抹杀了Cglib能够直接创建普通类的增强子类的能力,Spring相当于把Cglib动态生成的子类,当普通的代理类了,这也是为什么会创建两个对象的原因
那么解决方案是什么呢?那就是通过AOP代理调用内部方法,就可走事务切面进行增强
<aop:aspectj-autoproxy expose-proxy="true" />
public void step1() {
jdbcTemplate.execute(SQL);
((TestDao)(AopContext.currentProxy())).step2();
((TestDao)(AopContext.currentProxy())).step3();
}
@Transactional
public void step2() {
jdbcTemplate.execute(SQL);
}
@Transactional
public void step3() {
jdbcTemplate.execute(SQL);
throw new RuntimeException();
}
另一种方法就是将代理对象注入,然后通过代理对象调用内部方法
@Autowired
private ApplicationContext context;
private ITestDao proxySelf;
@PostConstruct
private void setSelf() {
// 从上下文获取代理对象(如果通过proxtSelf=this是不对的,this是目标对象)
// 此种方法不适合于prototype Bean,因为每次getBean返回一个新的Bean
proxySelf = context.getBean(ITestDao.class);
}
判断在事务中
if (TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionStatus status = TransactionAspectSupport.currentTransactionStatus();
...
}
额外
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
System.out.println("send email after transaction commit...");
}
});
public class LogTransactionSynchronization extends TransactionSynchronizationAdapter {
@Override
public afterCompletion(int status) {
// log the status or store it for later usage
}
}
事务后操作
public class MyTransactionEvent extends ApplicationEvent {
public MyTransactionEvent(String msg, Object source) {
super(source);
this.msg = msg;
}
}
@Component
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
public class MyTransactionListener {
@TransactionalEventListener(fallbackExecution = true)
public void onApplicationEvent(MyTransactionEvent event) {
// do something...
}
}
参考:
Spring事务处理时自我调用的解决方案及一些实现方式的风险