Spring 多数据源分布式事务实现:基于 Atomikos+MyBatis+Druid 完整方案
- JAVA
- 2025-03-13
- 45热度
- 0评论
在分布式系统中,读写分离、分库分表是提升系统性能的常用方案,但随之而来的是跨数据源事务一致性问题 —— 一个业务操作需同时操作多个数据库,必须保证所有操作要么全部成功,要么全部回滚。本文将详细介绍如何基于 Spring+Atomikos(JTA)+Druid+MyBatis 实现多数据源分布式事务管理,包含完整的依赖配置、数据源配置、事务配置及测试验证,适用于 Spring MVC、Spring Boot 等主流 Spring 项目。
一、核心技术选型与原理说明
1. 关键技术组件
- Spring:核心框架,提供事务管理、依赖注入等基础能力;
- Atomikos:轻量级开源 JTA 事务管理器,替代 Spring 3.0 后移除的 JOTM,支持 XA / 非 XA 事务,适配多数据源场景;
- Druid:阿里开源数据库连接池,支持 XA 数据源实现,提供连接池监控、性能优化等功能;
- MyBatis:持久层框架,简化数据库操作,支持多数据源绑定;
- JTA:Java 事务 API,定义分布式事务规范,确保跨数据源事务的 ACID 特性。
2. 核心原理
分布式事务的核心是两阶段提交(2PC):
- 准备阶段:事务管理器向所有参与的数据源发送准备请求,各数据源执行本地事务但不提交;
- 提交阶段:若所有数据源准备成功,事务管理器发送提交请求;若任一数据源失败,发送回滚请求,确保所有数据源状态一致。
Atomikos 作为 JTA 实现,负责协调多个 XA 数据源的事务提交 / 回滚,Spring 通过
JtaTransactionManager整合 Atomikos,实现声明式事务管理。3. Atomikos 数据源类型说明
Atomikos 提供三种数据源配置,适配不同场景:
- AtomikosDataSourceBean:支持 XA 事务(全局事务),需配置 XA 驱动,推荐用于多数据源分布式事务,可配置连接池参数;
- AtomikosNonXADataSourceBean:非 XA 事务,适用于单个数据源场景,效率高于 XA,但不支持分布式事务;
- SimpleDataSourceBean:简化版 XA 数据源,需手动配置 XA 驱动,功能较少,不推荐生产使用。
二、环境准备与依赖配置
1. 版本兼容说明
- Spring:4.2.5.RELEASE(需与 Atomikos 版本适配,本文选用稳定兼容组合);
- Atomikos:4.0.2(轻量稳定,支持 Spring 整合);
- Druid:1.1.10(支持 XA 数据源实现);
- MyBatis:3.4.6;
- JDK:1.8+;
- 数据库:MySQL 5.7+(需开启 XA 事务支持,默认开启)。
2. Maven 依赖配置(pom.xml)
核心依赖包括 Spring 事务、Atomikos、Druid、MyBatis 等,需排除冲突依赖:
<!-- Spring核心依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.2.5.RELEASE</version>
</dependency>
<!-- JTA规范依赖 -->
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<!-- Atomikos事务管理器 -->
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>atomikos-util</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jta</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>com.atomikos</groupId>
<artifactId>transactions-jdbc</artifactId>
<version>4.0.2</version>
</dependency>
<!-- Druid连接池(支持XA) -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<!-- MyBatis及Spring整合 -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>1.3.2</version>
</dependency>
<!-- MySQL驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<!-- 其他依赖(AOP、日志等) -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.8.13</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
三、核心配置文件编写
1. 数据库配置文件(database.properties)
配置主从数据源连接信息、XA 驱动、连接池参数,放在
classpath:properties/目录下:
# 数据库连接验证SQL(MySQL)
validationQuery=SELECT 1
# 连接池基础参数
jdbc.initialSize=5
jdbc.maxActive=20
jdbc.maxWait=60000
jdbc.username=root
jdbc.password=123456
jdbc.driverClassName=com.mysql.jdbc.Driver
# XA数据源类名(DruidXADataSource)
jdbc.xaDataSourceClassName=com.alibaba.druid.pool.xa.DruidXADataSource
# 主数据源(master库)
master.jdbc.url=jdbc:mysql://127.0.0.1:3306/master_db?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
# 从数据源(slave库)
slave.jdbc.url=jdbc:mysql://127.0.0.1:3306/slave_db?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
2. 多数据源配置(datasource-context.xml)
配置 Atomikos XA 数据源,通过抽象父类统一配置连接池参数,主从数据源继承复用:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"
default-lazy-init="true">
<!-- 抽象XA数据源配置(主从数据源继承) -->
<bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
init-method="init" destroy-method="close" abstract="true">
<!-- XA数据源类名(DruidXADataSource) -->
<property name="xaDataSourceClassName" value="${jdbc.xaDataSourceClassName}"/>
<!-- 连接池参数 -->
<property name="poolSize" value="10"/>
<property name="minPoolSize" value="5"/>
<property name="maxPoolSize" value="30"/>
<property name="borrowConnectionTimeout" value="60"/>
<property name="reapTimeout" value="20"/>
<property name="maxIdleTime" value="60"/>
<property name="testQuery" value="${validationQuery}"/>
</bean>
<!-- 主数据源(master_db) -->
<bean id="masterDataSource" parent="abstractXADataSource">
<!-- 全局唯一资源名(必须唯一) -->
<property name="uniqueResourceName" value="masterDB"/>
<!-- XA数据源属性配置 -->
<property name="xaProperties">
<props>
<prop key="driverClassName">${jdbc.driverClassName}</prop>
<prop key="url">${master.jdbc.url}</prop>
<prop key="username">${jdbc.username}</prop>
<prop key="password">${jdbc.password}</prop>
<!-- Druid连接池补充参数 -->
<prop key="initialSize">${jdbc.initialSize}</prop>
<prop key="maxActive">${jdbc.maxActive}</prop>
<prop key="maxWait">${jdbc.maxWait}</prop>
<prop key="testOnBorrow">false</prop>
<prop key="testWhileIdle">true</prop>
<prop key="removeAbandoned">true</prop>
<prop key="removeAbandonedTimeout">1800</prop>
</props>
</property>
</bean>
<!-- 从数据源(slave_db) -->
<bean id="slaveDataSource" parent="abstractXADataSource">
<property name="uniqueResourceName" value="slaveDB"/>
<property name="xaProperties">
<props>
<prop key="driverClassName">${jdbc.driverClassName}</prop>
<prop key="url">${slave.jdbc.url}</prop>
<prop key="username">${jdbc.username}</prop>
<prop key="password">${jdbc.password}</prop>
<prop key="initialSize">${jdbc.initialSize}</prop>
<prop key="maxActive">${jdbc.maxActive}</prop>
<prop key="maxWait">${jdbc.maxWait}</prop>
<prop key="testOnBorrow">false</prop>
<prop key="testWhileIdle">true</prop>
<prop key="removeAbandoned">true</prop>
<prop key="removeAbandonedTimeout">1800</prop>
</props>
</property>
</bean>
</beans>
3. MyBatis 配置(mybatis-context.xml)
为每个数据源绑定独立的 SqlSessionFactory,分别加载对应的 MyBatis 配置和 Mapper:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"
default-lazy-init="true">
<!-- 主数据源SqlSessionFactory -->
<bean id="masterSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="masterDataSource"/>
<property name="configLocation" value="classpath:mybatis/mybatis-config-master.xml"/>
<!-- 扫描Mapper XML文件 -->
<property name="mapperLocations" value="classpath:mybatis/mapper/master/*.xml"/>
</bean>
<!-- 从数据源SqlSessionFactory -->
<bean id="slaveSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="slaveDataSource"/>
<property name="configLocation" value="classpath:mybatis/mybatis-config-slave.xml"/>
<property name="mapperLocations" value="classpath:mybatis/mapper/slave/*.xml"/>
</bean>
<!-- Mapper扫描配置(批量注入) -->
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.tx.dao.master"/>
<property name="sqlSessionFactoryBeanName" value="masterSqlSessionFactory"/>
</bean>
<bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="basePackage" value="com.tx.dao.slave"/>
<property name="sqlSessionFactoryBeanName" value="slaveSqlSessionFactory"/>
</bean>
</beans>
补充:MyBatis 子配置文件
mybatis-config-master.xml(主库配置):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<!-- 类型别名 -->
<typeAliases>
<package name="com.tx.entity.master"/>
</typeAliases>
<!-- 全局配置(如驼峰命名转换) -->
<settings>
<setting name="mapUnderscoreToCamelCase" value="true"/>
</settings>
</configuration>
mybatis-config-slave.xml(从库配置)与主库类似,仅扫描从库实体类包。
4. 分布式事务配置(transaction-context.xml)
整合 Atomikos 事务管理器与 Spring 声明式事务,通过 AOP 切面拦截业务方法:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-4.2.xsd"
default-lazy-init="true">
<!-- 1. Atomikos事务管理器 -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
<!-- 强制关闭事务管理器(避免残留连接) -->
<property name="forceShutdown" value="true"/>
</bean>
<!-- 2. UserTransaction实例 -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
<!-- 事务超时时间(300秒) -->
<property name="transactionTimeout" value="300"/>
</bean>
<!-- 3. Spring JTA事务管理器(整合Atomikos) -->
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager"/>
<property name="userTransaction" ref="atomikosUserTransaction"/>
<!-- 允许自定义隔离级别 -->
<property name="allowCustomIsolationLevels" value="true"/>
</bean>
<!-- 4. 事务通知(声明式事务规则) -->
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<!-- 读操作:只读事务 -->
<tx:method name="get*" propagation="REQUIRED" read-only="true"/>
<tx:method name="find*" propagation="REQUIRED" read-only="true"/>
<!-- 写操作:需要事务,异常回滚 -->
<tx:method name="add*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
<tx:method name="insert*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
<tx:method name="update*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
<tx:method name="delete*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
<tx:method name="register*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
</tx:attributes>
</tx:advice>
<!-- 5. AOP切面:拦截业务层方法 -->
<aop:config proxy-target-class="true">
<aop:pointcut id="txPointcut" expression="execution(* com.tx.service.*.*(..))"/>
<aop:advisor pointcut-ref="txPointcut" advice-ref="txAdvice"/>
</aop:config>
</beans>
5. Atomikos 启动参数配置(jta.properties)
在
classpath下创建jta.properties,配置 Atomikos 事务日志、日志级别等参数:
# 事务服务工厂类(独立部署模式)
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
# 事务日志输出目录(需提前创建)
com.atomikos.icatch.log_base_dir=/home/logs/tx/
# 日志文件前缀
com.atomikos.icatch.log_base_name=txlog
# 事务管理器唯一标识(全局唯一)
com.atomikos.icatch.tm_unique_name=com.atomikos.spring.tm
# 控制台日志级别(DEBUG/INFO/WARN)
com.atomikos.icatch.console_log_level=DEBUG
# 事务超时时间(默认30秒,与UserTransaction保持一致)
com.atomikos.icatch.default_jta_timeout=300000
6. Spring 主配置文件(spring-context.xml)
整合所有子配置文件,开启组件扫描和属性文件解析:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.2.xsd">
<!-- 组件扫描(业务层、DAO层等) -->
<context:component-scan base-package="com.tx"/>
<!-- 加载属性文件 -->
<context:property-placeholder location="classpath:properties/database.properties"/>
<!-- 引入子配置文件 -->
<import resource="classpath:datasource-context.xml"/>
<import resource="classpath:mybatis-context.xml"/>
<import resource="classpath:transaction-context.xml"/>
</beans>
四、业务代码实现
1. 实体类定义
- 主库实体(
com.tx.entity.master.Member):
package com.tx.entity.master;
public class Member {
private Integer id;
private String username;
private String password;
private Byte status;
// getter/setter省略
}
- 从库实体(
com.tx.entity.slave.MemberInfo):
package com.tx.entity.slave;
public class MemberInfo {
private Integer id;
private String nickname;
private String realname;
private Byte age;
// getter/setter省略
}
2. DAO 层 Mapper 接口
- 主库 Mapper(
com.tx.dao.master.MemberMapper):
package com.tx.dao.master;
import com.tx.entity.master.Member;
import org.springframework.stereotype.Repository;
@Repository
public interface MemberMapper {
int insert(Member member);
}
- 从库 Mapper(
com.tx.dao.slave.MemberInfoMapper):
package com.tx.dao.slave;
import com.tx.entity.slave.MemberInfo;
import org.springframework.stereotype.Repository;
@Repository
public interface MemberInfoMapper {
int insert(MemberInfo memberInfo);
}
3. Mapper XML 文件
- 主库 Mapper XML(
classpath:mybatis/mapper/master/MemberMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.tx.dao.master.MemberMapper">
<insert id="insert" parameterType="com.tx.entity.master.Member">
INSERT INTO member (id, username, password, status)
VALUES (#{id}, #{username}, #{password}, #{status})
</insert>
</mapper>
- 从库 Mapper XML(
classpath:mybatis/mapper/slave/MemberInfoMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.tx.dao.slave.MemberInfoMapper">
<insert id="insert" parameterType="com.tx.entity.slave.MemberInfo">
INSERT INTO member_info (id, nickname, realname, age)
VALUES (#{id}, #{nickname}, #{realname}, #{age})
</insert>
</mapper>
4. 服务层实现(事务入口)
业务层方法
registerMember同时操作主库和从库,通过 Spring 声明式事务保证一致性:
package com.tx.service;
import com.tx.dao.master.MemberMapper;
import com.tx.dao.slave.MemberInfoMapper;
import com.tx.entity.master.Member;
import com.tx.entity.slave.MemberInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MemberServiceImpl implements MemberService {
private static final Logger LOG = LoggerFactory.getLogger(MemberServiceImpl.class);
@Autowired
private MemberMapper memberMapper;
@Autowired
private MemberInfoMapper memberInfoMapper;
@Override
public boolean registerMember(Member member, MemberInfo memberInfo) {
try {
// 操作主库:插入会员基础信息
int masterResult = memberMapper.insert(member);
if (masterResult != 1) {
throw new RuntimeException("主库会员插入失败");
}
// 操作从库:插入会员详情信息
int slaveResult = memberInfoMapper.insert(memberInfo);
if (slaveResult != 1) {
throw new RuntimeException("从库会员详情插入失败");
}
return true;
} catch (Exception e) {
LOG.error("会员注册失败:{}", e.getMessage(), e);
// 抛出异常触发事务回滚
throw new RuntimeException("会员注册异常", e);
}
}
}
五、测试验证
1. 测试用例编写(JUnit 4)
通过 SpringJUnit4ClassRunner 加载 Spring 上下文,测试事务提交与回滚场景:
package com.tx.test;
import com.tx.entity.master.Member;
import com.tx.entity.slave.MemberInfo;
import com.tx.service.MemberService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-context.xml"})
public class DistributedTransactionTest {
private static final Logger LOG = LoggerFactory.getLogger(DistributedTransactionTest.class);
@Autowired
private MemberService memberService;
// 正常场景:事务提交
@Test
public void testRegisterSuccess() {
Member member = new Member();
member.setId(1001);
member.setUsername("zhangsan");
member.setPassword("123456");
member.setStatus((byte) 0);
MemberInfo memberInfo = new MemberInfo();
memberInfo.setId(1001);
memberInfo.setNickname("张三");
memberInfo.setRealname("张三");
memberInfo.setAge((byte) 25);
boolean result = memberService.registerMember(member, memberInfo);
LOG.info("会员注册结果:{}", result ? "成功" : "失败");
}
// 异常场景:事务回滚(从库插入失败)
@Test
public void testRegisterRollback() {
Member member = new Member();
member.setId(1002);
member.setUsername("lisi");
member.setPassword("654321");
member.setStatus((byte) 0);
MemberInfo memberInfo = new MemberInfo();
memberInfo.setId(null); // 故意设置主键为空,触发插入失败
memberInfo.setNickname("李四");
memberInfo.setRealname("李四");
memberInfo.setAge((byte) 30);
try {
memberService.registerMember(member, memberInfo);
} catch (Exception e) {
LOG.info("会员注册触发回滚:{}", e.getMessage());
}
}
}
2. 测试结果验证
(1)正常场景(事务提交)
- 主库
master_db.member表新增记录:id=1001, username=zhangsan; - 从库
slave_db.member_info表新增记录:id=1001, nickname=张三; - 日志输出:
会员注册结果:成功。
(2)异常场景(事务回滚)
- 从库插入因主键为空失败,触发事务回滚;
- 主库
member表无新增记录,从库member_info表无新增记录; - 日志输出:
会员注册触发回滚:会员注册异常。
