环境:Spring5.3.23
首先,我们先来大概的了解下Spring事务的工作原理,核心技术是通过AOP实现,将获取的Connection对象绑定到当前线程上下文中(ThreadLocal)。
事务核心拦截器TransactionInterceptor对象,如下(以下只会列出核心代码):
public class TransactionInterceptor { public Object invoke(MethodInvocation invocation) { // 该方法调用为核心方法,该方法在父类中 return invokeWithinTransaction(...) ; }}
public abstract class TransactionAspectSupport { protected Object invokeWithinTransaction(...) { // 1.1.创建事务对象 TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); try { // 调用下一个拦截器或者是目标方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // 1.2.回滚事务 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 重置ThreadLocal中的TransactionInfo对象 cleanupTransactionInfo(txInfo); } // 1.3.提交或者回滚事务 commitTransactionAfterReturning(txInfo); return retVal; } }
上面代码列出了主要的事务执行流程及动作,我们主要是关心数据库连接对象Connection在当前线程中是如何使用的。
protected TransactionInfo createTransactionIfNecessary( @Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { TransactionStatus status = null; if (txAttr != null) { if (tm != null) { // 创建事务状态对象 status = tm.getTransaction(txAttr); } } // 将事务状态对象包装到TransactionInfo中,然后将这个对象绑定到当前线程中 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}
public abstract class AbstractPlatformTransactionManager { public final TransactionStatus getTransaction(...) { if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(def, transaction, debugEnabled); } // 如果超时时间 < -1则抛出异常 if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // 当前不存在事务,则抛出异常 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 其它的传播特性,开启事务功能 else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { try { // 开始事务 return startTransaction(def, transaction, debugEnabled, suspendedResources); } } }}
private TransactionStatus startTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // doBegin(transaction, definition); prepareSynchronization(status, definition); return status;}
创建Connection对象,并绑定到当前线程
public class DataSourceTransactionManager { protected void doBegin( Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 获取数据库连接对象 Connection newCon = obtainDataSource().getConnection(); txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } // 将连接对象绑定到当前的线程 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } }}
到此,已经清楚了当开始一个新的事务时,Spring会将获取的Connection绑定到当前的Thread中。
当我们使用通过JdbcTemplate操作数据库时,如下:
public class JdbcTemplate { // 核心执行方法 private <T> T execute(...) { // 获取数据库连接对象 Connection con = DataSourceUtils.getConnection(obtainDataSource()); }}
DataSourceUtils
public abstract class DataSourceUtils { public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException { try { return doGetConnection(dataSource) ; } } public static Connection doGetConnection(DataSource dataSource) throws SQLException { // 通过TransactionSynchronizationManager从当前线程上下文中获取连接对象 // 在上面我们也是通过这个对象将连接对象绑定到当前的Thread中 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { conHolder.requested() ; if (!conHolder.hasConnection()) { conHolder.setConnection(fetchConnection(dataSource)) ; } return conHolder.getConnection() ; } }}
原理相信你应该非常清楚了,每个线程都会绑定自己的Connection。那在多线程下每个线程都使用的是自己的Connection对象,所以要想保证事务的一致性,单靠传统的方式一个@Transaction是肯定无法解决的,接下来我们就来实现一个多线程下的事务一致性的处理。
多线程下要实现事务的一致性,我们需要借助JUC下的相关类来实现。
这里直接给出代码示例:
static class PersonService { @Resource private JdbcTemplate jdbcTemplate; @Resource private DataSource dataSource ; @Transactional public void save() throws Exception { CountDownLatch cdl = new CountDownLatch(2) ; AtomicBoolean txRollback = new AtomicBoolean(false) ; CompletableFuture.runAsync(() -> { Person person = new Person(); person.setAge(1); person.setName("张三"); transactionTemplate.execute(status -> { int result = 0 ; try { result = jdbcTemplate.update("insert into t_person (age, name) values (?, ?)", person.getAge(), person.getName()) ; // TODO // System.out.println(1 / 0) ; } catch (Exception e) { // 当发生异常后将状态该为true txRollback.set(true) ; } try { // 计数减一 cdl.countDown() ; // 继续等待其它线程结束 cdl.await() ; } catch (InterruptedException e) { e.printStackTrace(); } // 如果回滚状态为true说明有线程发生了异常,需要事务回滚 if (txRollback.get()) { // 标记当前事务回滚 status.setRollbackOnly() ; } System.out.printf("%s Insert Operator Result: %d 次%n", Thread.currentThread().getName(), result); return result ; }) ; }) ; transactionTemplate.execute(status -> { Person person = new Person(); person.setAge(2); person.setName("李四"); int result = 0 ; try { result = jdbcTemplate.update("insert into t_person (age, name) values (?, ?)", person.getAge(), person.getName()) ; // TODO TimeUnit.SECONDS.sleep(3) ; } catch (Exception e) { txRollback.set(true) ; } try { cdl.countDown() ; cdl.await() ; } catch (InterruptedException e) { e.printStackTrace(); } if (txRollback.get()) { // 回滚 status.setRollbackOnly() ; } System.out.printf("%s Insert Operator Result: %d 次%n", Thread.currentThread().getName(), result); return result ; }) ; cdl.await() ; System.err.println("Operator Complete...") ; }}
以上就是借助JUC来实现多线程下的事务一致性问题。
其实如果你真的理解了事务的原理,其实这里还有更加简单的实现方式,大家可以先思考,咱们下期再说这种简单的实现方法。
完毕!!!
本文链接:http://www.28at.com/showinfo-26-12416-0.html详解Spring多线程下如何保证事务的一致性
声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com
上一篇: 你真的理解Python Qt6基础知识中的信号和槽机制吗?
下一篇: Spring事务管理—快速入门