Spring 多数据源分布式事务实现:基于 Atomikos+MyBatis+Druid 完整方案

在分布式系统中,读写分离、分库分表是提升系统性能的常用方案,但随之而来的是跨数据源事务一致性问题 —— 一个业务操作需同时操作多个数据库,必须保证所有操作要么全部成功,要么全部回滚。本文将详细介绍如何基于 Spring+Atomikos(JTA)+Druid+MyBatis 实现多数据源分布式事务管理,包含完整的依赖配置、数据源配置、事务配置及测试验证,适用于 Spring MVC、Spring Boot 等主流 Spring 项目。

一、核心技术选型与原理说明

1. 关键技术组件

  • Spring:核心框架,提供事务管理、依赖注入等基础能力;
  • Atomikos:轻量级开源 JTA 事务管理器,替代 Spring 3.0 后移除的 JOTM,支持 XA / 非 XA 事务,适配多数据源场景;
  • Druid:阿里开源数据库连接池,支持 XA 数据源实现,提供连接池监控、性能优化等功能;
  • MyBatis:持久层框架,简化数据库操作,支持多数据源绑定;
  • JTA:Java 事务 API,定义分布式事务规范,确保跨数据源事务的 ACID 特性。

2. 核心原理

分布式事务的核心是两阶段提交(2PC)
  1. 准备阶段:事务管理器向所有参与的数据源发送准备请求,各数据源执行本地事务但不提交;
  2. 提交阶段:若所有数据源准备成功,事务管理器发送提交请求;若任一数据源失败,发送回滚请求,确保所有数据源状态一致。
Atomikos 作为 JTA 实现,负责协调多个 XA 数据源的事务提交 / 回滚,Spring 通过JtaTransactionManager整合 Atomikos,实现声明式事务管理。

3. Atomikos 数据源类型说明

Atomikos 提供三种数据源配置,适配不同场景:
  • AtomikosDataSourceBean:支持 XA 事务(全局事务),需配置 XA 驱动,推荐用于多数据源分布式事务,可配置连接池参数;
  • AtomikosNonXADataSourceBean:非 XA 事务,适用于单个数据源场景,效率高于 XA,但不支持分布式事务;
  • SimpleDataSourceBean:简化版 XA 数据源,需手动配置 XA 驱动,功能较少,不推荐生产使用。

二、环境准备与依赖配置

1. 版本兼容说明

  • Spring:4.2.5.RELEASE(需与 Atomikos 版本适配,本文选用稳定兼容组合);
  • Atomikos:4.0.2(轻量稳定,支持 Spring 整合);
  • Druid:1.1.10(支持 XA 数据源实现);
  • MyBatis:3.4.6;
  • JDK:1.8+;
  • 数据库:MySQL 5.7+(需开启 XA 事务支持,默认开启)。

2. Maven 依赖配置(pom.xml)

核心依赖包括 Spring 事务、Atomikos、Druid、MyBatis 等,需排除冲突依赖:
<!-- Spring核心依赖 -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-tx</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jdbc</artifactId>
    <version>4.2.5.RELEASE</version>
</dependency>

<!-- JTA规范依赖 -->
<dependency>
    <groupId>javax.transaction</groupId>
    <artifactId>jta</artifactId>
    <version>1.1</version>
</dependency>

<!-- Atomikos事务管理器 -->
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>atomikos-util</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jta</artifactId>
    <version>4.0.2</version>
</dependency>
<dependency>
    <groupId>com.atomikos</groupId>
    <artifactId>transactions-jdbc</artifactId>
    <version>4.0.2</version>
</dependency>

<!-- Druid连接池(支持XA) -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>

<!-- MyBatis及Spring整合 -->
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis-spring</artifactId>
    <version>1.3.2</version>
</dependency>

<!-- MySQL驱动 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

<!-- 其他依赖(AOP、日志等) -->
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.8.13</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
  

三、核心配置文件编写

1. 数据库配置文件(database.properties)

配置主从数据源连接信息、XA 驱动、连接池参数,放在classpath:properties/目录下:
# 数据库连接验证SQL(MySQL)
validationQuery=SELECT 1

# 连接池基础参数
jdbc.initialSize=5
jdbc.maxActive=20
jdbc.maxWait=60000
jdbc.username=root
jdbc.password=123456
jdbc.driverClassName=com.mysql.jdbc.Driver

# XA数据源类名(DruidXADataSource)
jdbc.xaDataSourceClassName=com.alibaba.druid.pool.xa.DruidXADataSource

# 主数据源(master库)
master.jdbc.url=jdbc:mysql://127.0.0.1:3306/master_db?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull

# 从数据源(slave库)
slave.jdbc.url=jdbc:mysql://127.0.0.1:3306/slave_db?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
 

2. 多数据源配置(datasource-context.xml)

配置 Atomikos XA 数据源,通过抽象父类统一配置连接池参数,主从数据源继承复用:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"
       default-lazy-init="true">

    <!-- 抽象XA数据源配置(主从数据源继承) -->
    <bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
          init-method="init" destroy-method="close" abstract="true">
        <!-- XA数据源类名(DruidXADataSource) -->
        <property name="xaDataSourceClassName" value="${jdbc.xaDataSourceClassName}"/>
        <!-- 连接池参数 -->
        <property name="poolSize" value="10"/>
        <property name="minPoolSize" value="5"/>
        <property name="maxPoolSize" value="30"/>
        <property name="borrowConnectionTimeout" value="60"/>
        <property name="reapTimeout" value="20"/>
        <property name="maxIdleTime" value="60"/>
        <property name="testQuery" value="${validationQuery}"/>
    </bean>

    <!-- 主数据源(master_db) -->
    <bean id="masterDataSource" parent="abstractXADataSource">
        <!-- 全局唯一资源名(必须唯一) -->
        <property name="uniqueResourceName" value="masterDB"/>
        <!-- XA数据源属性配置 -->
        <property name="xaProperties">
            <props>
                <prop key="driverClassName">${jdbc.driverClassName}</prop>
                <prop key="url">${master.jdbc.url}</prop>
                <prop key="username">${jdbc.username}</prop>
                <prop key="password">${jdbc.password}</prop>
                <!-- Druid连接池补充参数 -->
                <prop key="initialSize">${jdbc.initialSize}</prop>
                <prop key="maxActive">${jdbc.maxActive}</prop>
                <prop key="maxWait">${jdbc.maxWait}</prop>
                <prop key="testOnBorrow">false</prop>
                <prop key="testWhileIdle">true</prop>
                <prop key="removeAbandoned">true</prop>
                <prop key="removeAbandonedTimeout">1800</prop>
            </props>
        </property>
    </bean>

    <!-- 从数据源(slave_db) -->
    <bean id="slaveDataSource" parent="abstractXADataSource">
        <property name="uniqueResourceName" value="slaveDB"/>
        <property name="xaProperties">
            <props>
                <prop key="driverClassName">${jdbc.driverClassName}</prop>
                <prop key="url">${slave.jdbc.url}</prop>
                <prop key="username">${jdbc.username}</prop>
                <prop key="password">${jdbc.password}</prop>
                <prop key="initialSize">${jdbc.initialSize}</prop>
                <prop key="maxActive">${jdbc.maxActive}</prop>
                <prop key="maxWait">${jdbc.maxWait}</prop>
                <prop key="testOnBorrow">false</prop>
                <prop key="testWhileIdle">true</prop>
                <prop key="removeAbandoned">true</prop>
                <prop key="removeAbandonedTimeout">1800</prop>
            </props>
        </property>
    </bean>
</beans>
  

3. MyBatis 配置(mybatis-context.xml)

为每个数据源绑定独立的 SqlSessionFactory,分别加载对应的 MyBatis 配置和 Mapper:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"
       default-lazy-init="true">

    <!-- 主数据源SqlSessionFactory -->
    <bean id="masterSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="masterDataSource"/>
        <property name="configLocation" value="classpath:mybatis/mybatis-config-master.xml"/>
        <!-- 扫描Mapper XML文件 -->
        <property name="mapperLocations" value="classpath:mybatis/mapper/master/*.xml"/>
    </bean>

    <!-- 从数据源SqlSessionFactory -->
    <bean id="slaveSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="slaveDataSource"/>
        <property name="configLocation" value="classpath:mybatis/mybatis-config-slave.xml"/>
        <property name="mapperLocations" value="classpath:mybatis/mapper/slave/*.xml"/>
    </bean>

    <!-- Mapper扫描配置(批量注入) -->
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="com.tx.dao.master"/>
        <property name="sqlSessionFactoryBeanName" value="masterSqlSessionFactory"/>
    </bean>
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="com.tx.dao.slave"/>
        <property name="sqlSessionFactoryBeanName" value="slaveSqlSessionFactory"/>
    </bean>
</beans>
  

补充:MyBatis 子配置文件

  • mybatis-config-master.xml(主库配置):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <!-- 类型别名 -->
    <typeAliases>
        <package name="com.tx.entity.master"/>
    </typeAliases>
    <!-- 全局配置(如驼峰命名转换) -->
    <settings>
        <setting name="mapUnderscoreToCamelCase" value="true"/>
    </settings>
</configuration>
  
  • mybatis-config-slave.xml(从库配置)与主库类似,仅扫描从库实体类包。

4. 分布式事务配置(transaction-context.xml)

整合 Atomikos 事务管理器与 Spring 声明式事务,通过 AOP 切面拦截业务方法:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
       http://www.springframework.org/schema/aop
       http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
       http://www.springframework.org/schema/tx
       http://www.springframework.org/schema/tx/spring-tx-4.2.xsd"
       default-lazy-init="true">

    <!-- 1. Atomikos事务管理器 -->
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
          init-method="init" destroy-method="close">
        <!-- 强制关闭事务管理器(避免残留连接) -->
        <property name="forceShutdown" value="true"/>
    </bean>

    <!-- 2. UserTransaction实例 -->
    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <!-- 事务超时时间(300秒) -->
        <property name="transactionTimeout" value="300"/>
    </bean>

    <!-- 3. Spring JTA事务管理器(整合Atomikos) -->
    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="atomikosTransactionManager"/>
        <property name="userTransaction" ref="atomikosUserTransaction"/>
        <!-- 允许自定义隔离级别 -->
        <property name="allowCustomIsolationLevels" value="true"/>
    </bean>

    <!-- 4. 事务通知(声明式事务规则) -->
    <tx:advice id="txAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <!-- 读操作:只读事务 -->
            <tx:method name="get*" propagation="REQUIRED" read-only="true"/>
            <tx:method name="find*" propagation="REQUIRED" read-only="true"/>
            <!-- 写操作:需要事务,异常回滚 -->
            <tx:method name="add*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
            <tx:method name="insert*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
            <tx:method name="update*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
            <tx:method name="delete*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
            <tx:method name="register*" propagation="REQUIRED" rollback-for="java.lang.Exception"/>
        </tx:attributes>
    </tx:advice>

    <!-- 5. AOP切面:拦截业务层方法 -->
    <aop:config proxy-target-class="true">
        <aop:pointcut id="txPointcut" expression="execution(* com.tx.service.*.*(..))"/>
        <aop:advisor pointcut-ref="txPointcut" advice-ref="txAdvice"/>
    </aop:config>
</beans>
  

5. Atomikos 启动参数配置(jta.properties)

classpath下创建jta.properties,配置 Atomikos 事务日志、日志级别等参数:
# 事务服务工厂类(独立部署模式)
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
# 事务日志输出目录(需提前创建)
com.atomikos.icatch.log_base_dir=/home/logs/tx/
# 日志文件前缀
com.atomikos.icatch.log_base_name=txlog
# 事务管理器唯一标识(全局唯一)
com.atomikos.icatch.tm_unique_name=com.atomikos.spring.tm
# 控制台日志级别(DEBUG/INFO/WARN)
com.atomikos.icatch.console_log_level=DEBUG
# 事务超时时间(默认30秒,与UserTransaction保持一致)
com.atomikos.icatch.default_jta_timeout=300000
  

6. Spring 主配置文件(spring-context.xml)

整合所有子配置文件,开启组件扫描和属性文件解析:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-4.2.xsd">

    <!-- 组件扫描(业务层、DAO层等) -->
    <context:component-scan base-package="com.tx"/>

    <!-- 加载属性文件 -->
    <context:property-placeholder location="classpath:properties/database.properties"/>

    <!-- 引入子配置文件 -->
    <import resource="classpath:datasource-context.xml"/>
    <import resource="classpath:mybatis-context.xml"/>
    <import resource="classpath:transaction-context.xml"/>
</beans>

四、业务代码实现

1. 实体类定义

  • 主库实体(com.tx.entity.master.Member):
package com.tx.entity.master;

public class Member {
    private Integer id;
    private String username;
    private String password;
    private Byte status;

    // getter/setter省略
}
  
  • 从库实体(com.tx.entity.slave.MemberInfo):
package com.tx.entity.slave;

public class MemberInfo {
    private Integer id;
    private String nickname;
    private String realname;
    private Byte age;

    // getter/setter省略
}
  

2. DAO 层 Mapper 接口

  • 主库 Mapper(com.tx.dao.master.MemberMapper):
package com.tx.dao.master;

import com.tx.entity.master.Member;
import org.springframework.stereotype.Repository;

@Repository
public interface MemberMapper {
    int insert(Member member);
}
  
  • 从库 Mapper(com.tx.dao.slave.MemberInfoMapper):
package com.tx.dao.slave;

import com.tx.entity.slave.MemberInfo;
import org.springframework.stereotype.Repository;

@Repository
public interface MemberInfoMapper {
    int insert(MemberInfo memberInfo);
}
  

3. Mapper XML 文件

  • 主库 Mapper XML(classpath:mybatis/mapper/master/MemberMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.tx.dao.master.MemberMapper">
    <insert id="insert" parameterType="com.tx.entity.master.Member">
        INSERT INTO member (id, username, password, status)
        VALUES (#{id}, #{username}, #{password}, #{status})
    </insert>
</mapper>
  
  • 从库 Mapper XML(classpath:mybatis/mapper/slave/MemberInfoMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.tx.dao.slave.MemberInfoMapper">
    <insert id="insert" parameterType="com.tx.entity.slave.MemberInfo">
        INSERT INTO member_info (id, nickname, realname, age)
        VALUES (#{id}, #{nickname}, #{realname}, #{age})
    </insert>
</mapper>
  

4. 服务层实现(事务入口)

业务层方法registerMember同时操作主库和从库,通过 Spring 声明式事务保证一致性:
package com.tx.service;

import com.tx.dao.master.MemberMapper;
import com.tx.dao.slave.MemberInfoMapper;
import com.tx.entity.master.Member;
import com.tx.entity.slave.MemberInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MemberServiceImpl implements MemberService {

    private static final Logger LOG = LoggerFactory.getLogger(MemberServiceImpl.class);

    @Autowired
    private MemberMapper memberMapper;

    @Autowired
    private MemberInfoMapper memberInfoMapper;

    @Override
    public boolean registerMember(Member member, MemberInfo memberInfo) {
        try {
            // 操作主库:插入会员基础信息
            int masterResult = memberMapper.insert(member);
            if (masterResult != 1) {
                throw new RuntimeException("主库会员插入失败");
            }

            // 操作从库:插入会员详情信息
            int slaveResult = memberInfoMapper.insert(memberInfo);
            if (slaveResult != 1) {
                throw new RuntimeException("从库会员详情插入失败");
            }

            return true;
        } catch (Exception e) {
            LOG.error("会员注册失败:{}", e.getMessage(), e);
            // 抛出异常触发事务回滚
            throw new RuntimeException("会员注册异常", e);
        }
    }
}
  

五、测试验证

1. 测试用例编写(JUnit 4)

通过 SpringJUnit4ClassRunner 加载 Spring 上下文,测试事务提交与回滚场景:
package com.tx.test;

import com.tx.entity.master.Member;
import com.tx.entity.slave.MemberInfo;
import com.tx.service.MemberService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:spring-context.xml"})
public class DistributedTransactionTest {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedTransactionTest.class);

    @Autowired
    private MemberService memberService;

    // 正常场景:事务提交
    @Test
    public void testRegisterSuccess() {
        Member member = new Member();
        member.setId(1001);
        member.setUsername("zhangsan");
        member.setPassword("123456");
        member.setStatus((byte) 0);

        MemberInfo memberInfo = new MemberInfo();
        memberInfo.setId(1001);
        memberInfo.setNickname("张三");
        memberInfo.setRealname("张三");
        memberInfo.setAge((byte) 25);

        boolean result = memberService.registerMember(member, memberInfo);
        LOG.info("会员注册结果:{}", result ? "成功" : "失败");
    }

    // 异常场景:事务回滚(从库插入失败)
    @Test
    public void testRegisterRollback() {
        Member member = new Member();
        member.setId(1002);
        member.setUsername("lisi");
        member.setPassword("654321");
        member.setStatus((byte) 0);

        MemberInfo memberInfo = new MemberInfo();
        memberInfo.setId(null); // 故意设置主键为空,触发插入失败
        memberInfo.setNickname("李四");
        memberInfo.setRealname("李四");
        memberInfo.setAge((byte) 30);

        try {
            memberService.registerMember(member, memberInfo);
        } catch (Exception e) {
            LOG.info("会员注册触发回滚:{}", e.getMessage());
        }
    }
}
  

2. 测试结果验证

(1)正常场景(事务提交)

  • 主库master_db.member表新增记录:id=1001, username=zhangsan
  • 从库slave_db.member_info表新增记录:id=1001, nickname=张三
  • 日志输出:会员注册结果:成功

(2)异常场景(事务回滚)

  • 从库插入因主键为空失败,触发事务回滚;
  • 主库member表无新增记录,从库member_info表无新增记录;
  • 日志输出:会员注册触发回滚:会员注册异常