Solving Multi-Data-Source Transactions with Spring in a Single-Node Environment

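  1. Transactions with dynamic multiple data sources in a single-node Spring Boot application
    1. I. Transaction approach and environment setup
    2. II. Custom annotation for enabling the transaction
    3. III. Configuring multiple data sources in YAML
    4. IV. Multi-data-source configuration class
    5. V. Custom data source
    6. VI. Wrapping the Connection
    7. VII. Data source context
    8. VIII. Transaction context
    9. IX. The aspect
    10. X. Test class
  2. Transactions with static multiple data sources in a single-node Spring Boot application
    1. I. Transaction approach
    2. II. Custom annotation for enabling the transaction
    3. III. Configuring multiple data sources in YAML
    4. IV. Multi-data-source configuration class
    5. V. Custom data source
    6. VI. Wrapping the Connection
    7. VII. Data source context
    8. VIII. Transaction context
    9. IX. The aspect
    10. X. Test class
  3. Distributed transaction management for multiple data sources with Spring Boot, JTA, and Atomikos
    1. Introduction to distributed transactions
    2. Atomikos reference documentation
    3. Atomikos components
    4. JTA + AbstractRoutingDataSource: data source switching does not take effect
      1. Binding of the transaction context
      2. Characteristics of JTA global transactions
      3. Conflict between declarative transactions and the routing strategy
      4. Solution
      5. Summary
  4. Distributed transactions in Spring Boot with JdbcTemplate + Atomikos
    1. Adding dependencies
    2. Configuring multiple data sources in application.yml
    3. Configuring the first data source
    4. Configuring the second data source
    5. Atomikos configuration: creating the JTA transaction manager
    6. Writing a Controller covering the success and failure cases
    7. Testing the success case
    8. Testing the failure case
  5. Distributed transactions in Spring Boot with MyBatis + Atomikos
    1. Adding dependencies
    2. Configuring multiple data sources in application.yml
    3. Configuring the first data source
    4. Configuring the second data source
    5. Atomikos configuration: creating the JTA transaction manager
    6. Writing the database entity class and Mapper
    7. Writing a Controller covering the success and failure cases
    8. Testing the success case
    9. Testing the failure case
  6. Distributed transactions in Spring Boot with JPA + Atomikos
    1. Adding dependencies
    2. Configuring multiple data sources in application.yml
    3. Configuring the first data source
    4. Configuring the second data source
    5. Atomikos configuration: creating the JTA transaction manager
    6. Writing the Entity and Repository
    7. Writing a Controller covering the success and failure cases
    8. Testing the success case
    9. Testing the failure case
  7. References & acknowledgements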

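Transactions with dynamic multiple data sources in a single-node Spring Boot application

Solving multi-data-source transactions in a single-node Java environment: https://www.cnblogs.com/fantongxue/p/16867228.html

In a single-node Spring Boot application, @Transactional guarantees transactional behavior for one data source, but it cannot span several data sources. This section walks through a simple way to keep a transaction consistent across multiple data sources.

I. Transaction approach and environment setup

1. Use a ThreadLocal to cache every connection used inside the transactional method; once the business logic finishes, commit or roll back all of them together.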
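
The idea can be sketched with the JDK alone. This is a minimal illustration, not the article's implementation: `FakeConnection` and `TxDemo` are hypothetical stand-ins for real JDBC connections and for the interceptor built in the later sections, and they only record what happened to each connection.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JDBC connection: it only records its final state.
class FakeConnection {
    final String name;
    String state = "open";

    FakeConnection(String name) {
        this.name = name;
    }

    void commit() {
        state = "committed";
    }

    void rollback() {
        state = "rolled back";
    }
}

class TxDemo {
    // Connections used by the current thread during one transactional method.
    static final ThreadLocal<List<FakeConnection>> USED = ThreadLocal.withInitial(ArrayList::new);

    // Called wherever business code needs a connection: create it and remember it.
    static FakeConnection connect(String name) {
        FakeConnection c = new FakeConnection(name);
        USED.get().add(c);
        return c;
    }

    // Runs the business logic, then commits or rolls back every cached connection together.
    static List<FakeConnection> runInTransaction(Runnable business) {
        List<FakeConnection> used = USED.get();
        try {
            business.run();
            used.forEach(FakeConnection::commit);
        } catch (RuntimeException e) {
            // The demo swallows the exception; a real interceptor would rethrow it.
            used.forEach(FakeConnection::rollback);
        } finally {
            USED.remove(); // do not leak state into the next task on this thread
        }
        return used;
    }

    public static void main(String[] args) {
        List<FakeConnection> ok = runInTransaction(() -> {
            connect("db1");
            connect("db2");
        });
        ok.forEach(c -> System.out.println(c.name + ": " + c.state));

        List<FakeConnection> failed = runInTransaction(() -> {
            connect("db1");
            throw new IllegalStateException("simulated failure");
        });
        failed.forEach(c -> System.out.println(c.name + ": " + c.state));
    }
}
```

In the first run both connections end up committed; in the second, the only connection opened before the failure is rolled back.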


2. The pom.xml file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>multi-database-transaction-static</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>MultiDatabaseTransaction</name>
    <description>MultiDatabaseTransaction</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.h2database/h2 -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>nexus-aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>

II. Custom annotation for enabling the transaction

package com.example.annotation;

import java.lang.annotation.*;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface MultiDSTransaction {
}
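
The annotation is retained at runtime so that it can be discovered on a method by reflection, which is what an AOP pointcut ultimately relies on. A small sketch of that lookup follows; the annotation is redeclared locally so the example is self-contained, and `OrderService` and `isTransactional` are hypothetical names used only for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationDemo {
    // Same shape as the article's annotation, redeclared here to keep the sketch standalone.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface MultiDSTransaction {
    }

    // Hypothetical service used only for this demonstration.
    static class OrderService {
        @MultiDSTransaction
        public void transfer() {
        }

        public void query() {
        }
    }

    // True when the named public no-arg method carries the marker annotation.
    static boolean isTransactional(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return method.isAnnotationPresent(MultiDSTransaction.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isTransactional(OrderService.class, "transfer")); // true
        System.out.println(isTransactional(OrderService.class, "query"));    // false
    }
}
```

With RetentionPolicy.SOURCE or CLASS the annotation would not be visible to this kind of lookup, which is why RUNTIME is used.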

III. Configuring multiple data sources in YAML

spring:
  datasource:
    datasource1:
      jdbc-url: jdbc:h2:mem:datasource1
      username: sa
      password: sa
      driverClassName: org.h2.Driver
    datasource2:
      jdbc-url: jdbc:h2:mem:datasource2
      username: sa
      password: sa
      driverClassName: org.h2.Driver
  h2:
    console:
      enabled: true
      path: /h2
      settings:
        web-allow-others: true

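IV. Multi-data-source configuration class

Marking the bean with @Primary makes it the preferred candidate for injection: whenever a bean of type DataSource is injected, Spring injects this one rather than the other two DataSource beans declared in DataSourceConfig. As a result, when MyBatis obtains its data source it gets this custom data source, and it is this data source's getConnection method that runs. The same holds for JdbcTemplate and JPA.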

package com.example.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * Registers the two data sources
 */
@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.datasource1")
    public DataSource dataSource1(){
        return DataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.datasource2")
    public DataSource dataSource2(){
        return DataSourceBuilder.create().build();
    }

    @Primary
    @Bean
    public DynamicDataSource dynamicDataSource() {
        DynamicDataSource dynamicDataSource = new DynamicDataSource();
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("one", dataSource1());
        targetDataSources.put("two", dataSource2());
        // Set the default data source
        dynamicDataSource.setDefaultTargetDataSource(dataSource1());
        // Register the map of target data sources
        dynamicDataSource.setTargetDataSources(targetDataSources);
        return dynamicDataSource;
    }
}

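V. Custom data source

The custom data source holds the two data sources defined above and maintains the list of connections used while a multi-data-source transaction is running. The transaction logic lives here: wherever a connection is obtained, the Connection is cached in a ThreadLocal.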

package com.example.config;

import com.example.util.DataSourceContext;
import com.example.util.TransactionContext;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DynamicDataSource extends AbstractRoutingDataSource {
    /**
     * Local copy of the target data sources, kept so that callers can read the map back
     */
    private Map<Object, Object> targetDataSources = new HashMap<>();
    public Map<Object, Object> getTargetDataSources() {
        return targetDataSources;
    }

    /**
     * Overrides setTargetDataSources so that this class also keeps its own copy of the map
     * @param targetDataSources data sources keyed by lookup key
     */
    @Override
    public void setTargetDataSources(Map<Object, Object> targetDataSources) {
        this.targetDataSources.putAll(targetDataSources);
        super.setTargetDataSources(this.targetDataSources);
        super.afterPropertiesSet(); // required; otherwise newly added data sources are not recognized
    }

    /**
     * Required override: returns the lookup key of the current data source
     */
    @Override
    protected Object determineCurrentLookupKey() {
        // Returning null also works; see AbstractRoutingDataSource.determineTargetDataSource for details
        return DataSourceContext.getDataSource();
    }

    @Override
    public Connection getConnection() throws SQLException {
        // Get a connection from the current data source. determineTargetDataSource() resolves the
        // lookup key and falls back to the default data source for an unknown key, which avoids the
        // NullPointerException a direct map lookup would cause.
        Connection connection = determineTargetDataSource().getConnection();

        // Wrap it in the custom connection
        CustomConnection customConnection = new CustomConnection(connection);
        if (TransactionContext.isOpenTran()) {
            customConnection.setAutoCommit(false);
            List<CustomConnection> customConnections = TransactionContext.MULTI_TRAN_CONNECTION.get();
            if (customConnections == null) {
                customConnections = new ArrayList<>();
            }
            customConnections.add(customConnection);
            TransactionContext.MULTI_TRAN_CONNECTION.set(customConnections);
        }
        return customConnection;
    }
}
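
The routing step itself is simple: look up a target by the current thread's key and fall back to a default when the key is missing or unknown. The sketch below shows that logic in isolation. `Router` is a hypothetical, simplified class and not Spring's AbstractRoutingDataSource, although the fallback mirrors Spring's behavior when lenient fallback is enabled, which is its default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified router: picks a target by the current thread's key.
class Router<T> {
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    private final Map<String, T> targets = new HashMap<>();
    private final T defaultTarget;

    Router(T defaultTarget) {
        this.defaultTarget = defaultTarget;
    }

    void register(String key, T target) {
        targets.put(key, target);
    }

    // Selects the key for the current thread only.
    static void use(String key) {
        KEY.set(key);
    }

    static void reset() {
        KEY.remove();
    }

    // Unknown or missing keys fall back to the default target.
    T current() {
        String key = KEY.get();
        T target = (key == null) ? null : targets.get(key);
        return (target != null) ? target : defaultTarget;
    }

    public static void main(String[] args) {
        Router<String> router = new Router<>("datasource1");
        router.register("one", "datasource1");
        router.register("two", "datasource2");

        Router.use("two");
        System.out.println(router.current()); // datasource2

        Router.use("missing");
        System.out.println(router.current()); // datasource1 (fallback)

        Router.reset();
        System.out.println(router.current()); // datasource1 (no key set)
    }
}
```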

VI. Wrapping the Connection

package com.example.config;

import com.example.util.TransactionContext;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;

public class CustomConnection implements Connection {
    // The real connection
    private Connection connection;

    public CustomConnection(Connection connection) {
        this.connection = connection;
    }

    @Override
    public void commit() throws SQLException {
        // Commit directly only when no multi-data-source transaction is open
        if (!TransactionContext.isOpenTran()) {
            connection.commit();
        }
    }

    @Override
    public void rollback() throws SQLException {
        connection.rollback();
    }

    public void commitMultiDbTran() throws SQLException {
        // Commit path used when a multi-data-source transaction is open
        connection.commit();
    }

    @Override
    public void close() throws SQLException {
        // MyBatis calls close() after each operation; closing the connection early would break the rest of the transaction
        if (!TransactionContext.isOpenTran()) {
            connection.close();
        }
    }

    public void closeMultiDbTran() throws SQLException {
        // Close path used when a multi-data-source transaction is open
        connection.close();
    }

    @Override
    public Statement createStatement() throws SQLException {
        return connection.createStatement();
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return connection.prepareStatement(sql);
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return connection.prepareCall(sql);
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
        return connection.nativeSQL(sql);
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        connection.setAutoCommit(autoCommit);
    }

    @Override
    public boolean getAutoCommit() throws SQLException {
        return connection.getAutoCommit();
    }

    @Override
    public boolean isClosed() throws SQLException {
        return connection.isClosed();
    }

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return connection.getMetaData();
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
        connection.setReadOnly(readOnly);
    }

    @Override
    public boolean isReadOnly() throws SQLException {
        return connection.isReadOnly();
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {
        connection.setCatalog(catalog);
    }

    @Override
    public String getCatalog() throws SQLException {
        return connection.getCatalog();
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {
        connection.setTransactionIsolation(level);
    }

    @Override
    public int getTransactionIsolation() throws SQLException {
        return connection.getTransactionIsolation();
    }

    @Override
    public SQLWarning getWarnings() throws SQLException {
        return connection.getWarnings();
    }

    @Override
    public void clearWarnings() throws SQLException {
        connection.clearWarnings();
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return connection.createStatement(resultSetType,resultSetConcurrency);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return connection.prepareStatement(sql,resultSetType,resultSetConcurrency);
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return connection.prepareCall(sql,resultSetType,resultSetConcurrency);
    }

    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return connection.getTypeMap();
    }

    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
        connection.setTypeMap(map);
    }

    @Override
    public void setHoldability(int holdability) throws SQLException {
        connection.setHoldability(holdability);
    }

    @Override
    public int getHoldability() throws SQLException {
        return connection.getHoldability();
    }

    @Override
    public Savepoint setSavepoint() throws SQLException {
        return connection.setSavepoint();
    }

    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return connection.setSavepoint(name);
    }

    @Override
    public void rollback(Savepoint savepoint) throws SQLException {
        connection.rollback(savepoint);
    }

    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
        connection.releaseSavepoint(savepoint);
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return connection.createStatement(resultSetType,resultSetConcurrency,resultSetHoldability);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return connection.prepareStatement(sql,resultSetType,resultSetConcurrency,resultSetHoldability);
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return connection.prepareCall(sql,resultSetType,resultSetConcurrency,resultSetHoldability);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return connection.prepareStatement(sql,autoGeneratedKeys);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return connection.prepareStatement(sql,columnIndexes);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return connection.prepareStatement(sql,columnNames);
    }

    @Override
    public Clob createClob() throws SQLException {
        return connection.createClob();
    }

    @Override
    public Blob createBlob() throws SQLException {
        return connection.createBlob();
    }

    @Override
    public NClob createNClob() throws SQLException {
        return connection.createNClob();
    }

    @Override
    public SQLXML createSQLXML() throws SQLException {
        return connection.createSQLXML();
    }

    @Override
    public boolean isValid(int timeout) throws SQLException {
        return connection.isValid(timeout);
    }

    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {
        connection.setClientInfo(name,value);
    }

    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {
        connection.setClientInfo(properties);
    }

    @Override
    public String getClientInfo(String name) throws SQLException {
        return connection.getClientInfo(name);
    }

    @Override
    public Properties getClientInfo() throws SQLException {
        return connection.getClientInfo();
    }

    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return connection.createArrayOf(typeName,elements);
    }

    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return connection.createStruct(typeName,attributes);
    }

    @Override
    public void setSchema(String schema) throws SQLException {
        connection.setSchema(schema);
    }

    @Override
    public String getSchema() throws SQLException {
        return connection.getSchema();
    }

    @Override
    public void abort(Executor executor) throws SQLException {
        connection.abort(executor);
    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
        connection.setNetworkTimeout(executor,milliseconds);
    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        return connection.getNetworkTimeout();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return connection.unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return connection.isWrapperFor(iface);
    }
}
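
Most of CustomConnection is pure delegation; only commit() and close() behave differently. As an alternative illustration of the same idea (not the article's approach), a JDK dynamic proxy can intercept just those two methods and pass everything else through. The sketch below assumes a hypothetical `recording` fake in place of a real JDBC connection; the fake returns null from every method, so only void methods are called on it.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class DeferredConnectionDemo {
    // Wraps a connection so that commit() and close() are ignored while deferred is true;
    // every other call is passed through to the real connection.
    static Connection deferCommitAndClose(Connection target, boolean deferred) {
        InvocationHandler handler = (proxy, method, args) -> {
            String name = method.getName();
            if (deferred && (name.equals("commit") || name.equals("close"))) {
                return null; // swallowed; a coordinator commits and closes the target later
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the original exception, e.g. SQLException
            }
        };
        return newConnectionProxy(handler);
    }

    // Fake connection for the demo: records the names of the methods called on it.
    static Connection recording(List<String> calls) {
        return newConnectionProxy((proxy, method, args) -> {
            calls.add(method.getName());
            return null;
        });
    }

    private static Connection newConnectionProxy(InvocationHandler handler) {
        return (Connection) Proxy.newProxyInstance(
                DeferredConnectionDemo.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                handler);
    }

    public static void main(String[] args) throws SQLException {
        List<String> calls = new ArrayList<>();
        Connection real = recording(calls);
        Connection wrapped = deferCommitAndClose(real, true);

        wrapped.commit();   // swallowed
        wrapped.close();    // swallowed
        wrapped.rollback(); // passed through
        System.out.println(calls); // [rollback]

        real.commit();      // the coordinator finishes the transaction on the real connection
        real.close();
        System.out.println(calls); // [rollback, commit, close]
    }
}
```

The explicit wrapper class in the article is more verbose but easier to step through in a debugger; the proxy trades that for less boilerplate.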

VII. Data source context

package com.example.util;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DataSourceContext {
    /**
     * Holds the name of the current dynamic data source for each thread
     */
    private static final ThreadLocal<String> contextLocal = ThreadLocal.withInitial(() -> "one");

    /**
     * Sets the data source
     */
    public static void setDataSource(String key) {
        contextLocal.set(key);
    }

    /**
     * Gets the data source
     *
     * @return the key of the current data source
     */
    public static String getDataSource() {
        return contextLocal.get();
    }

    /**
     * Resets/releases the data source
     */
    public static void clearDataSource() {
        contextLocal.remove();
    }
}
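
Two properties of this context are easy to overlook: the key is visible only to the thread that set it, and remove() restores the initial value instead of leaving the variable empty. A short JDK-only sketch, with the hypothetical class name `ThreadLocalKeyDemo`, demonstrates both.

```java
class ThreadLocalKeyDemo {
    // Same pattern as the context above: every thread starts with the key "one".
    private static final ThreadLocal<String> KEY = ThreadLocal.withInitial(() -> "one");

    static void set(String key) {
        KEY.set(key);
    }

    static String get() {
        return KEY.get();
    }

    static void clear() {
        KEY.remove();
    }

    // Reads the key from a brand-new thread, which has never called set().
    static String readInNewThread() {
        String[] seen = new String[1];
        Thread t = new Thread(() -> seen[0] = KEY.get());
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        set("two");
        System.out.println(get());             // two
        System.out.println(readInNewThread()); // one: other threads are unaffected
        clear();
        System.out.println(get());             // one: remove() restores the initial value
    }
}
```

Because servlet containers reuse pooled threads, a key that is set during one request and never cleared is still there when the same thread serves the next request, so clearing it in a finally block is the safer habit.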

VIII. Transaction context

package com.example.util;

import com.example.config.CustomConnection;
import java.util.List;

public class TransactionContext {
    // The transaction is off by default
    private static final ThreadLocal<Boolean> TRAN_SWITCH_CONTEXT = ThreadLocal.withInitial(() -> false);
    /**
     * Connections used while a multi-data-source transaction is running
     */
    public static final ThreadLocal<List<CustomConnection>> MULTI_TRAN_CONNECTION = new ThreadLocal<>();

    // Open the transaction
    public static void openTran() {
        TRAN_SWITCH_CONTEXT.set(true);
    }
    // Close the transaction
    public static void closeTran() {
        TRAN_SWITCH_CONTEXT.set(false);
    }
    // Check whether a transaction is open
    public static Boolean isOpenTran() {
        return TRAN_SWITCH_CONTEXT.get();
    }
}

IX. The aspect

AOP intercepts every method marked with the multi-data-source transaction annotation and runs the transaction logic around it.

package com.example.aspect;

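import com.example.config.CustomConnection;
import com.example.util.TransactionContext;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.context.annotation.Configuration;

import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

@Aspect
@Configuration
public class MultiDSTransactionConfig {
    @Pointcut("@annotation(com.example.annotation.MultiDSTransaction)")
    public void transactPoint() {
    }

    @Around("transactPoint()")
    public Object multiTranAop(ProceedingJoinPoint joinPoint) throws Throwable {
        // Open the transaction
        TransactionContext.openTran();
        try {
            // Run the business logic
            Object proceed = joinPoint.proceed();
            // Commit every connection the business logic used
            for (CustomConnection connection : connections()) {
                connection.commitMultiDbTran();
            }
            return proceed;
        } catch (Throwable t) {
            // Roll back every connection
            for (CustomConnection connection : connections()) {
                connection.rollback();
            }
            throw t;
        } finally {
            // Close the connections only after commit/rollback, so a rollback never hits a closed connection
            for (CustomConnection connection : connections()) {
                try {
                    connection.closeMultiDbTran();
                } catch (SQLException ignored) {
                    // keep closing the remaining connections
                }
            }
            // Drop the cached connections and close the current transaction
            TransactionContext.MULTI_TRAN_CONNECTION.remove();
            TransactionContext.closeTran();
        }
    }

    // The cached list is null when the method never obtained a connection
    private static List<CustomConnection> connections() {
        List<CustomConnection> connections = TransactionContext.MULTI_TRAN_CONNECTION.get();
        return connections == null ? Collections.emptyList() : connections;
    }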
}
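
One limitation of this approach is worth spelling out: committing the connections one after another is not atomic. If the first commit succeeds and the second one fails, the first can no longer be rolled back. The sketch below illustrates that window with a hypothetical `CommitResource` class instead of real JDBC connections.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical resource: once committed, a later rollback has no effect.
class CommitResource {
    final String name;
    final boolean failOnCommit;
    String state = "pending";

    CommitResource(String name, boolean failOnCommit) {
        this.name = name;
        this.failOnCommit = failOnCommit;
    }

    void commit() {
        if (failOnCommit) {
            throw new IllegalStateException("commit failed on " + name);
        }
        state = "committed";
    }

    void rollback() {
        if (state.equals("pending")) {
            state = "rolled back"; // only uncommitted work can be undone
        }
    }
}

class SequentialCommitDemo {
    // Commits in order; on failure, tries to roll back everything.
    static void commitAll(List<CommitResource> resources) {
        try {
            for (CommitResource r : resources) {
                r.commit();
            }
        } catch (RuntimeException e) {
            for (CommitResource r : resources) {
                r.rollback();
            }
        }
    }

    public static void main(String[] args) {
        CommitResource db1 = new CommitResource("db1", false);
        CommitResource db2 = new CommitResource("db2", true);
        commitAll(Arrays.asList(db1, db2));
        System.out.println(db1.name + ": " + db1.state); // db1: committed
        System.out.println(db2.name + ": " + db2.state); // db2: rolled back
    }
}
```

The two resources end up in different states, which is exactly the inconsistency a two-phase commit protocol such as JTA/XA is designed to prevent.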

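X. Test class

The test code is below. When an error occurs, both transactions are rolled back and no data is inserted; when no error occurs, both rows are inserted.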

package com.example.controller;

import com.example.annotation.MultiDSTransaction;
import com.example.util.DataSourceContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.stream.Stream;

@RestController
public class TestController {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @RequestMapping("test")
    @MultiDSTransaction
    public void test() {
        // Select the datasource1 data source
        DataSourceContext.setDataSource("one");
        int i = jdbcTemplate.update("INSERT INTO user (id, name, type) VALUES (1, 'meir', 'db_test_one')");

        // Simulate an error
        int k = 1 / 0;

        // Select the datasource2 data source
        DataSourceContext.setDataSource("two");
        int j = jdbcTemplate.update("INSERT INTO user (id, name, type) VALUES (1, 'sam', 'db_test_two')");

        System.out.println("over");
    }

    @PostConstruct
    public void init1() {
        Stream.of("one", "two").forEach(i -> initData(i));
    }

    private void initData(String str) {
        // Switch the data source
        DataSourceContext.setDataSource(str);
        // Create the table
        jdbcTemplate.execute("DROP TABLE IF EXISTS user");
        jdbcTemplate.execute("CREATE TABLE user ("
                + "id BIGINT NOT NULL,"
                + "name VARCHAR(50) DEFAULT NULL,"
                + "type VARCHAR(50) DEFAULT NULL,"
                + "PRIMARY KEY (id))");
        // Restore the default data source
        DataSourceContext.clearDataSource();
    }
}

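Transactions with static multiple data sources in a single-node Spring Boot application

I. Transaction approach

Solving multi-data-source transactions in a single-node Java environment: https://www.cnblogs.com/fantongxue/p/16867228.html

1. Use a ThreadLocal to cache every connection used inside the transactional method; once the business logic finishes, commit or roll back all of them together.

2. The pom.xml file is as follows: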

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>multi-database-transaction-static</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>MultiDatabaseTransaction</name>
    <description>MultiDatabaseTransaction</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.h2database/h2 -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>nexus-aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>

II. Custom annotation for enabling the transaction

package com.example.annotation;

import java.lang.annotation.*;

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface MultiDSTransaction {
}

III. Configuring multiple data sources in YAML

spring:
  datasource:
    datasource1:
      jdbc-url: jdbc:h2:mem:datasource1
      username: sa
      password: sa
      driverClassName: org.h2.Driver
    datasource2:
      jdbc-url: jdbc:h2:mem:datasource2
      username: sa
      password: sa
      driverClassName: org.h2.Driver
  h2:
    console:
      enabled: true
      path: /h2
      settings:
        web-allow-others: true

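IV. Multi-data-source configuration class

Marking the bean with @Primary makes it the preferred candidate for injection: whenever a bean of type DataSource is injected, Spring injects this one rather than the other two DataSource beans declared in DataSourceConfig. As a result, when MyBatis obtains its data source it gets this custom data source, and it is this data source's getConnection method that runs. The same holds for JdbcTemplate and JPA. In this static variant the custom data source is StaticDataSource, listed in the next section.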
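
The configuration class for the static variant might look like the following sketch. It is an assumption modeled on the DataSourceConfig of the dynamic variant and on the StaticDataSource constructor, which takes a Map<String, DataSource>. The bean method name staticDataSource and the keys "one" and "two" are illustrative; the keys are chosen to match the default key used by DataSourceContext. It is a configuration fragment that needs Spring Boot on the classpath and is not runnable on its own.

```java
package com.example.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch only (assumed, not taken from the original listing):
 * registers the two data sources and exposes StaticDataSource as the primary one.
 */
@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.datasource1")
    public DataSource dataSource1() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.datasource2")
    public DataSource dataSource2() {
        return DataSourceBuilder.create().build();
    }

    @Primary
    @Bean
    public StaticDataSource staticDataSource() {
        // Keys are assumptions; they must match the values passed to DataSourceContext.setDataSource
        Map<String, DataSource> dataSourceMap = new HashMap<>();
        dataSourceMap.put("one", dataSource1());
        dataSourceMap.put("two", dataSource2());
        return new StaticDataSource(dataSourceMap);
    }
}
```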


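V. Custom data source

The custom data source holds the two data sources defined above and maintains the list of connections used while a multi-data-source transaction is running. The transaction logic lives here: wherever a connection is obtained, the Connection is cached in a ThreadLocal.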

package com.example.config;

import com.example.util.DataSourceContext;
import com.example.util.TransactionContext;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Manages multiple data sources
 */
public class StaticDataSource implements DataSource {

    private Map<String, DataSource> dataSourceMap;

    public StaticDataSource(Map<String, DataSource> dataSourceMap) {
        this.dataSourceMap = dataSourceMap;
    }

    @Override
    public Connection getConnection() throws SQLException {
        Connection connection = dataSourceMap.get(DataSourceContext.getDataSource()).getConnection();
        CustomConnection customConnection = new CustomConnection(connection);
        if (TransactionContext.isOpenTran()) {
            customConnection.setAutoCommit(false);
            List<CustomConnection> customConnections = TransactionContext.MULTI_TRAN_CONNECTION.get();
            if (customConnections == null) {
                customConnections = new ArrayList<>();
            }
            customConnections.add(customConnection);
            TransactionContext.MULTI_TRAN_CONNECTION.set(customConnections);
        }
        return customConnection;
    }

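    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        // Per-call credentials are not supported; fail loudly instead of returning null
        throw new SQLFeatureNotSupportedException("getConnection(username, password) is not supported");
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface.isInstance(this)) {
            return iface.cast(this);
        }
        throw new SQLException("Not a wrapper for " + iface.getName());
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return iface.isInstance(this);
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        // null means logging is disabled
        return null;
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        // no-op: logging is not used
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        // no-op: the underlying data sources manage their own timeouts
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        // 0 means the system default timeout
        return 0;
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
    }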
}

6. Wrapping the Connection

package com.example.config;

import com.example.util.TransactionContext;
import java.sql.*;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;

public class CustomConnection implements Connection {
    // The real underlying connection
    private Connection connection;

    public CustomConnection(Connection connection) {
        this.connection = connection;
    }

    @Override
    public void commit() throws SQLException {
        // Commit right away only when no multi-data-source transaction is open;
        // otherwise the aspect commits later through commitMultiDbTran()
        if (!TransactionContext.isOpenTran()) {
            connection.commit();
        }
    }

    @Override
    public void rollback() throws SQLException {
        connection.rollback();
    }

    public void commitMultiDbTran() throws SQLException {
        // Commit path used while a multi-data-source transaction is open
        connection.commit();
    }

    @Override
    public void close() throws SQLException {
        // MyBatis calls close() after each piece of work; if the connection were closed
        // this early, the pending transaction would break, so close() is deferred
        if (!TransactionContext.isOpenTran()) {
            connection.close();
        }
    }

    public void closeMultiDbTran() throws SQLException {
        // Close path used while a multi-data-source transaction is open
        connection.close();
    }

    @Override
    public Statement createStatement() throws SQLException {
        return connection.createStatement();
    }

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        return connection.prepareStatement(sql);
    }

    @Override
    public CallableStatement prepareCall(String sql) throws SQLException {
        return connection.prepareCall(sql);
    }

    @Override
    public String nativeSQL(String sql) throws SQLException {
        return connection.nativeSQL(sql);
    }

    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        connection.setAutoCommit(autoCommit);
    }

    @Override
    public boolean getAutoCommit() throws SQLException {
        return connection.getAutoCommit();
    }

    @Override
    public boolean isClosed() throws SQLException {
        return connection.isClosed();
    }

    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return connection.getMetaData();
    }

    @Override
    public void setReadOnly(boolean readOnly) throws SQLException {
        connection.setReadOnly(readOnly);
    }

    @Override
    public boolean isReadOnly() throws SQLException {
        return connection.isReadOnly();
    }

    @Override
    public void setCatalog(String catalog) throws SQLException {
        connection.setCatalog(catalog);
    }

    @Override
    public String getCatalog() throws SQLException {
        return connection.getCatalog();
    }

    @Override
    public void setTransactionIsolation(int level) throws SQLException {
        connection.setTransactionIsolation(level);
    }

    @Override
    public int getTransactionIsolation() throws SQLException {
        return connection.getTransactionIsolation();
    }

    @Override
    public SQLWarning getWarnings() throws SQLException {
        return connection.getWarnings();
    }

    @Override
    public void clearWarnings() throws SQLException {
        connection.clearWarnings();
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
        return connection.createStatement(resultSetType,resultSetConcurrency);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return connection.prepareStatement(sql,resultSetType,resultSetConcurrency);
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
        return connection.prepareCall(sql,resultSetType,resultSetConcurrency);
    }

    @Override
    public Map<String, Class<?>> getTypeMap() throws SQLException {
        return connection.getTypeMap();
    }

    @Override
    public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
        connection.setTypeMap(map);
    }

    @Override
    public void setHoldability(int holdability) throws SQLException {
        connection.setHoldability(holdability);
    }

    @Override
    public int getHoldability() throws SQLException {
        return connection.getHoldability();
    }

    @Override
    public Savepoint setSavepoint() throws SQLException {
        return connection.setSavepoint();
    }

    @Override
    public Savepoint setSavepoint(String name) throws SQLException {
        return connection.setSavepoint(name);
    }

    @Override
    public void rollback(Savepoint savepoint) throws SQLException {
        connection.rollback(savepoint);
    }

    @Override
    public void releaseSavepoint(Savepoint savepoint) throws SQLException {
        connection.releaseSavepoint(savepoint);
    }

    @Override
    public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return connection.createStatement(resultSetType,resultSetConcurrency,resultSetHoldability);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return connection.prepareStatement(sql,resultSetType,resultSetConcurrency,resultSetHoldability);
    }

    @Override
    public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
        return connection.prepareCall(sql,resultSetType,resultSetConcurrency,resultSetHoldability);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
        return connection.prepareStatement(sql,autoGeneratedKeys);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
        return connection.prepareStatement(sql,columnIndexes);
    }

    @Override
    public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
        return connection.prepareStatement(sql,columnNames);
    }

    @Override
    public Clob createClob() throws SQLException {
        return connection.createClob();
    }

    @Override
    public Blob createBlob() throws SQLException {
        return connection.createBlob();
    }

    @Override
    public NClob createNClob() throws SQLException {
        return connection.createNClob();
    }

    @Override
    public SQLXML createSQLXML() throws SQLException {
        return connection.createSQLXML();
    }

    @Override
    public boolean isValid(int timeout) throws SQLException {
        return connection.isValid(timeout);
    }

    @Override
    public void setClientInfo(String name, String value) throws SQLClientInfoException {
        connection.setClientInfo(name,value);
    }

    @Override
    public void setClientInfo(Properties properties) throws SQLClientInfoException {
        connection.setClientInfo(properties);
    }

    @Override
    public String getClientInfo(String name) throws SQLException {
        return connection.getClientInfo(name);
    }

    @Override
    public Properties getClientInfo() throws SQLException {
        return connection.getClientInfo();
    }

    @Override
    public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
        return connection.createArrayOf(typeName,elements);
    }

    @Override
    public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
        return connection.createStruct(typeName,attributes);
    }

    @Override
    public void setSchema(String schema) throws SQLException {
        connection.setSchema(schema);
    }

    @Override
    public String getSchema() throws SQLException {
        return connection.getSchema();
    }

    @Override
    public void abort(Executor executor) throws SQLException {
        connection.abort(executor);
    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
        connection.setNetworkTimeout(executor,milliseconds);
    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        return connection.getNetworkTimeout();
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        return connection.unwrap(iface);
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return connection.isWrapperFor(iface);
    }
}
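
CustomConnection has to delegate every method of java.sql.Connection by hand just to intercept commit() and close(). As a more compact alternative, the same interception can be sketched with a JDK dynamic proxy. This is only an illustration and not part of the original project: the class name DeferredConnectionProxy and the deferred flag (which plays the role of TransactionContext.isOpenTran()) are assumptions, and the demo uses a recording fake instead of a real database.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the original project
final class DeferredConnectionProxy {

    /** Wraps target so that commit() and close() are swallowed while deferred is true. */
    static Connection wrap(Connection target, BooleanSupplier deferred) {
        InvocationHandler handler = (proxy, method, args) -> {
            String name = method.getName();
            boolean finishing = name.equals("commit") || name.equals("close");
            if (finishing && deferred.getAsBoolean()) {
                return null; // both methods return void; the real connection is finished later
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the original exception thrown by the target
            }
        };
        return (Connection) Proxy.newProxyInstance(
                DeferredConnectionProxy.class.getClassLoader(),
                new Class<?>[] {Connection.class}, handler);
    }

    /** Records which calls reach the underlying connection. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        // Recording fake: only void methods are called on it, so returning null is safe
        Connection fake = (Connection) Proxy.newProxyInstance(
                DeferredConnectionProxy.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    calls.add(method.getName());
                    return null;
                });
        boolean[] transactionOpen = {true};
        Connection wrapped = wrap(fake, () -> transactionOpen[0]);
        try {
            wrapped.commit();   // swallowed while the transaction is open
            wrapped.close();    // swallowed while the transaction is open
            wrapped.rollback(); // always forwarded
            transactionOpen[0] = false;
            wrapped.commit();   // forwarded
            wrapped.close();    // forwarded
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return calls; // rollback, commit, close
    }
}
```

With this approach, the code that finishes the transaction must keep a reference to the real connection, because commit() and close() on the proxy are swallowed while the flag is set.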

7. Data Source Context

package com.example.util;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DataSourceContext {
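    /**
     * Thread-local holder for the name of the current data source; defaults to "one"
     */
    private static final ThreadLocal<String> contextLocal = ThreadLocal.withInitial(() -> "one");

    /**
     * Select the data source for the current thread
     */
    public static void setDataSource(String key) {
        contextLocal.set(key);
    }

    /**
     * Get the data source selected for the current thread
     *
     * @return the key of the selected data source
     */
    public static String getDataSource() {
        return contextLocal.get();
    }

    /**
     * Reset the selection so the thread falls back to the default data source
     */
    public static void clearDataSource() {
        contextLocal.remove();
    }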
}
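
DataSourceContext keeps the selected key in a ThreadLocal, and web containers reuse their worker threads, so a key that is set but never cleared can leak into the next request handled by the same thread. One way to guard against that is to restore the previous key in a try-with-resources block. The sketch below is self-contained and uses a hypothetical helper class DataSourceKeyScope with its own thread-local rather than the DataSourceContext class above.

```java
// Hypothetical helper, not part of the original project
final class DataSourceKeyScope implements AutoCloseable {

    // Stand-in for the thread-local inside DataSourceContext; "one" is the default key
    private static final ThreadLocal<String> KEY = ThreadLocal.withInitial(() -> "one");

    private final String previous;

    private DataSourceKeyScope(String previous) {
        this.previous = previous;
    }

    /** Switches the current thread to the given key and remembers the old one. */
    static DataSourceKeyScope use(String key) {
        DataSourceKeyScope scope = new DataSourceKeyScope(KEY.get());
        KEY.set(key);
        return scope;
    }

    static String current() {
        return KEY.get();
    }

    /** Restores the key that was active before use() was called. */
    @Override
    public void close() {
        KEY.set(previous);
    }

    /** Returns the keys seen before, inside, and after the scope. */
    static String demo() {
        StringBuilder seen = new StringBuilder(current());
        try (DataSourceKeyScope scope = use("two")) {
            seen.append(',').append(current());
        }
        seen.append(',').append(current());
        return seen.toString(); // one,two,one
    }
}
```

Because close() runs even when the body throws, the key is restored on the error path as well.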

8. Transaction Context

package com.example.util;

import com.example.config.CustomConnection;
import java.util.List;

public class TransactionContext {
    // The transaction switch is off by default
    private static final ThreadLocal<Boolean> TRAN_SWITCH_CONTEXT = ThreadLocal.withInitial(() -> false);
    /**
     * Connections used on this thread while a multi-data-source transaction is open
     */
    public static final ThreadLocal<List<CustomConnection>> MULTI_TRAN_CONNECTION = new ThreadLocal<>();

    // Open the transaction
    public static void openTran() {
        TRAN_SWITCH_CONTEXT.set(true);
    }
    // Close the transaction
    public static void closeTran() {
        TRAN_SWITCH_CONTEXT.set(false);
    }
    // Whether a transaction is currently open on this thread
    public static Boolean isOpenTran() {
        return TRAN_SWITCH_CONTEXT.get();
    }
}
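
Both fields in TransactionContext are thread-local, so they only apply to the thread that entered the annotated method. Work handed to another thread (for example through an executor) would not see the open transaction and would receive an ordinary connection instead. The minimal sketch below shows the effect with a hypothetical class ThreadLocalVisibilityDemo that mimics TRAN_SWITCH_CONTEXT; it does not use the real TransactionContext.

```java
// Hypothetical demo class, not part of the original project
final class ThreadLocalVisibilityDemo {

    // Same shape as TRAN_SWITCH_CONTEXT: false unless the current thread opened a transaction
    private static final ThreadLocal<Boolean> OPEN = ThreadLocal.withInitial(() -> false);

    /** Returns what the calling thread and a freshly started worker thread each observe. */
    static boolean[] demo() {
        boolean[] seen = new boolean[2];
        OPEN.set(true);
        try {
            seen[0] = OPEN.get();                                  // calling thread
            Thread worker = new Thread(() -> seen[1] = OPEN.get()); // worker thread
            worker.start();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            OPEN.remove();
        }
        return seen;
    }
}
```

The calling thread observes the open flag while the worker thread observes the initial value, which is why all database work of one transaction should stay on the thread that entered the annotated method.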

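9. Writing the Aspect

AOP intercepts every method annotated with the multi-data-source transaction annotation and runs the transaction logic around it.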

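package com.example.aspect;

import com.example.config.CustomConnection;
import com.example.util.TransactionContext;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.context.annotation.Configuration;

import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

@Aspect
@Configuration
public class MultiDSTransactionConfig {
    @Pointcut("@annotation(com.example.annotation.MultiDSTransaction)")
    public void transactPoint() {
    }

    @Around("transactPoint()")
    public Object multiTranAop(ProceedingJoinPoint joinPoint) throws Throwable {
        // Open the multi-data-source transaction
        TransactionContext.openTran();
        try {
            // Run the business method
            Object proceed = joinPoint.proceed();
            // Commit every connection used during the transaction
            for (CustomConnection connection : usedConnections()) {
                connection.commitMultiDbTran();
            }
            return proceed;
        } catch (Throwable t) {
            // Roll back every connection, even if one rollback fails
            for (CustomConnection connection : usedConnections()) {
                try {
                    connection.rollback();
                } catch (SQLException e) {
                    t.addSuppressed(e);
                }
            }
            throw t;
        } finally {
            // Close the connections, then clear the thread-local state
            for (CustomConnection connection : usedConnections()) {
                try {
                    connection.closeMultiDbTran();
                } catch (SQLException ignored) {
                    // Best effort: keep closing the remaining connections
                }
            }
            TransactionContext.MULTI_TRAN_CONNECTION.remove();
            TransactionContext.closeTran();
        }
    }

    // The list is null when the business method never requested a connection
    private List<CustomConnection> usedConnections() {
        List<CustomConnection> connections = TransactionContext.MULTI_TRAN_CONNECTION.get();
        return connections == null ? Collections.<CustomConnection>emptyList() : connections;
    }
}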
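
Note that the aspect commits the connections one after another. If an earlier commit succeeds and a later one fails, the earlier one can no longer be rolled back, so this scheme is a best-effort commit rather than an atomic one. The following sketch simulates that case without a database; Participant is a hypothetical stand-in for CustomConnection, and the failure of the second commit is an assumption made for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation, not part of the original project
final class BestEffortCommitDemo {

    /** Minimal stand-in for a connection that takes part in the transaction. */
    static final class Participant {
        final String name;
        final boolean failOnCommit;
        String state = "ACTIVE";

        Participant(String name, boolean failOnCommit) {
            this.name = name;
            this.failOnCommit = failOnCommit;
        }

        void commit() {
            if (failOnCommit) {
                throw new IllegalStateException(name + ": commit failed");
            }
            state = "COMMITTED";
        }

        void rollback() {
            // As with a real database, work that is already committed cannot be undone
            if (!state.equals("COMMITTED")) {
                state = "ROLLED_BACK";
            }
        }
    }

    /** Mirrors the aspect: commit one by one, roll back everything on the first failure. */
    static List<String> commitAll(List<Participant> participants) {
        try {
            for (Participant p : participants) {
                p.commit();
            }
        } catch (RuntimeException e) {
            for (Participant p : participants) {
                p.rollback();
            }
        }
        List<String> states = new ArrayList<>();
        for (Participant p : participants) {
            states.add(p.name + "=" + p.state);
        }
        return states;
    }

    static List<String> demo() {
        return commitAll(Arrays.asList(
                new Participant("one", false),
                new Participant("two", true))); // the second commit fails
    }
}
```

In the simulated run, participant "one" stays committed while "two" is rolled back. Closing this gap is what the XA-based approach in the later sections is for.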

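10. Test Class

The test code is shown below. When an error is thrown, the transactions on both data sources roll back and no row is inserted; when no error occurs, both rows are inserted.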

package com.example.controller;

import com.example.annotation.MultiDSTransaction;
import com.example.util.DataSourceContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import java.util.stream.Stream;

@RestController
public class TestController {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    @RequestMapping("test")
    @MultiDSTransaction
    public void test() {
        // Select data source "one"
        DataSourceContext.setDataSource("one");
        int i = jdbcTemplate.update("INSERT INTO user (id, name, type) VALUES (1, 'meir', 'db_test_one')");

        // Simulate a failure
        int k = 1 / 0;

        // Select data source "two"
        DataSourceContext.setDataSource("two");
        int j = jdbcTemplate.update("INSERT INTO user (id, name, type) VALUES (1, 'sam', 'db_test_two')");

        System.out.println("over");
    }

    @PostConstruct
    public void init1() {
        Stream.of("one", "two").forEach(i -> initData(i));
    }

    private void initData(String str) {
        // Switch to the given data source
        DataSourceContext.setDataSource(str);
        // Create the table
        jdbcTemplate.execute("DROP TABLE IF EXISTS user");
        jdbcTemplate.execute("CREATE TABLE user ("
                + "id BIGINT NOT NULL,"
                + "name VARCHAR(50) DEFAULT NULL,"
                + "type VARCHAR(50) DEFAULT NULL,"
                + "PRIMARY KEY (id))");
        // Restore the default data source
        DataSourceContext.clearDataSource();
    }
}

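SpringBoot JTA Atomikos: Distributed Transaction Management for Multiple Data Sources

1. Introduction to Distributed Transactions

Atomikos is an easy-to-use, reliable, open-source transaction manager for distributed transactions, useful wherever one application must update several resources atomically. It supports the JTA (Java Transaction API) specification and works with JTA-compatible resource managers such as databases and message queues.

Distributed transaction: when one business operation modifies several resources (for example, several databases or message queues), the modifications must be atomic: either all of them succeed or all of them fail. Such a transaction is called a distributed transaction.

JTA: the Java Transaction API (JTA) is the transaction specification of the Java platform. It defines the interfaces between the application and the transaction manager, and between the transaction manager and the resource managers. Atomikos is a transaction manager that follows the JTA specification.

XA protocol: XA is the key protocol behind distributed transactions. It defines concepts such as the global transaction ID and the branch transaction ID, as well as the interface the transaction manager uses to coordinate the branch transactions through two-phase commit. Atomikos supports the XA protocol.

Atomikos also provides advanced features such as automatic recovery and failover, which further improve the reliability of distributed transactions. It is mainly used for transactions that span databases, and Spring Boot 2.x documents and supports it through a dedicated starter.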
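
To make the coordination idea concrete, here is a minimal two-phase commit sketch in plain Java. It is a teaching model under simplifying assumptions, not how Atomikos is implemented: the Resource interface and InMemoryResource class are hypothetical, their method names only loosely mirror the XA verbs, and there is no transaction log or crash recovery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical teaching model, not Atomikos code
final class TwoPhaseCommitSketch {

    /** Simplified participant of a global transaction. */
    interface Resource {
        boolean prepare();  // phase 1: vote yes (true) or no (false)
        void commit();      // phase 2 when every vote was yes
        void rollback();    // phase 2 when any vote was no
    }

    static final class InMemoryResource implements Resource {
        private final boolean voteYes;
        String state = "ACTIVE";

        InMemoryResource(boolean voteYes) {
            this.voteYes = voteYes;
        }

        @Override
        public boolean prepare() {
            if (voteYes) {
                state = "PREPARED";
            }
            return voteYes;
        }

        @Override
        public void commit() {
            state = "COMMITTED";
        }

        @Override
        public void rollback() {
            state = "ROLLED_BACK";
        }
    }

    /** Returns true if the global transaction committed, false if it rolled back. */
    static boolean complete(List<? extends Resource> resources) {
        // Phase 1: ask every resource to prepare; stop at the first "no" vote
        boolean allPrepared = true;
        for (Resource r : resources) {
            if (!r.prepare()) {
                allPrepared = false;
                break;
            }
        }
        // Phase 2: commit everywhere only if every resource voted yes
        for (Resource r : resources) {
            if (allPrepared) {
                r.commit();
            } else {
                r.rollback();
            }
        }
        return allPrepared;
    }

    static String demo(boolean secondVotesYes) {
        List<InMemoryResource> resources = new ArrayList<>();
        resources.add(new InMemoryResource(true));
        resources.add(new InMemoryResource(secondVotesYes));
        boolean committed = complete(resources);
        return committed + ":" + resources.get(0).state + "," + resources.get(1).state;
    }
}
```

The point of the prepare phase is that no resource commits until every resource has promised that it can, which is the guarantee a plain sequence of commits lacks.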

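2. Atomikos Reference Documentation

Spring Boot provides a starter for integrating Atomikos, named spring-boot-starter-jta-atomikos. It is one of the "starter" dependencies that ship with Spring Boot.

Spring Boot 2.x integrates these two JTA implementations:

  • Atomikos: available through the spring-boot-starter-jta-atomikos dependency
  • Bitronix: available through the spring-boot-starter-jta-bitronix dependency

Because Bitronix support has been deprecated since Spring Boot 2.3.0, the hands-on sections below use Atomikos to demonstrate JTA.

The Spring Boot 2.x reference documentation has a section titled "Distributed Transactions with JTA" that describes how to use spring-boot-starter-jta-atomikos.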

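3. Atomikos Components

Atomikos is an open-source transaction manager that provides distributed transaction management. Combined with Spring Boot, it lets a Spring Boot application manage distributed transactions.

Using Atomikos in Spring Boot usually involves the following steps:

  1. Add the Atomikos dependency: add it to the Maven or Gradle build file. With Spring Boot 2.x this is spring-boot-starter-jta-atomikos, the entry point for integrating Atomikos with Spring Boot.
  2. Configure the data sources: the application needs several data sources. Spring Boot can bind their settings from the configuration file or properties. You can use any data source that works with Atomikos, such as the AtomikosDataSourceBean provided by Atomikos or a third-party data source.
  3. Configure the Atomikos transaction manager: create a JtaTransactionManager in a configuration class and wire it to Atomikos's UserTransactionImp (the UserTransaction) and UserTransactionManager (the TransactionManager). Spring then uses Atomikos to manage distributed transactions.
  4. Configure the JTA transaction manager: the application needs a JTA transaction manager before it can use Atomikos. If you do not define one yourself as in step 3, Spring Boot's auto-configuration can create it from the configuration file or properties.
  5. Add @Transactional to methods: annotate each method that needs transaction management with Spring's @Transactional. Spring then starts a transaction when the method runs, commits it when the method completes, and rolls it back if an exception is thrown.

SpringBoot + Atomikos works as follows:

  • Spring Boot's auto-configuration can set up the data sources and the transaction manager from the configuration file or properties.

  • Atomikos is a standalone transaction manager that implements JTA (Java Transaction API) and can handle distributed transactions.

  • In Spring Boot you configure several data sources and use the Atomikos data source implementation (such as AtomikosDataSourceBean) so that they can take part in distributed transactions.

  • When a method is annotated with @Transactional, Spring Boot uses the Atomikos transaction manager to manage the transaction.

  • While the method runs, the transaction manager coordinates the transactions on the individual data sources; when the method finishes, it commits or rolls back according to the transaction status.

In short, SpringBoot + Atomikos relies on Spring Boot's auto-configuration to set up several data sources and a transaction manager, and on the Atomikos transaction manager to coordinate the distributed transaction. This lets you manage distributed transactions in a Spring Boot application with the @Transactional annotation.

Finally, make sure your database supports XA transactions; otherwise Atomikos cannot coordinate it.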

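4. JTA + AbstractRoutingDataSource: Data Source Switching Stops Working

When JTA, AbstractRoutingDataSource, and declarative transactions are used together, data source switching typically stops working for the following reasons:

1. Transaction context binding

  • When a declarative transaction (@Transactional) is active, the first connection obtained from the data source is bound to the transaction context. With DataSourceTransactionManager this happens when the transaction begins; with JtaTransactionManager it typically happens the first time a connection is requested inside the transaction. Either way, the routing strategy is evaluated only at that moment. Once the connection is bound, it is reused for the rest of the transaction, so later changes to the AbstractRoutingDataSource lookup key have no effect.
  • The binding is keyed by the DataSource bean. An AbstractRoutingDataSource is a single bean in front of several targets, so the transaction sees one data source and keeps the connection of whichever target was resolved first. This is why switching appears to fail.

2. Characteristics of JTA global transactions

  • JTA (Java Transaction API) manages the global transaction. A resource is enlisted in the global transaction when its connection is first obtained, and it stays enlisted until the transaction completes. A connection that is already enlisted and bound cannot be swapped for another one in the middle of the transaction, so in practice the lookup key has to be set before the transaction starts.
  • If the AbstractRoutingDataSource lookup key changes while the transaction is in progress, the newly selected target is usually never asked for a connection, so it is never enlisted with the JTA transaction manager and the switch has no effect.

3. Conflict between declarative transactions and the routing strategy

  • Declarative transactions rely on Spring's PlatformTransactionManager, for example DataSourceTransactionManager or JtaTransactionManager. Once a connection has been resolved through the routing strategy, it is used for the rest of the transaction.
  • The routing strategy of AbstractRoutingDataSource usually selects a target from context held on the current thread (for example, a request parameter). If that context changes after the transaction has bound a connection, the change is ignored.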
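
The binding effect can be modelled in a few lines of plain Java. The sketch below is a simplified simulation under stated assumptions, not Spring code: RoutingDataSource stands in for an AbstractRoutingDataSource subclass, LOOKUP_KEY for its thread-local lookup key, and the bound map for the per-transaction resource binding, which is assumed to cache one connection per data source object.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation, not Spring code
final class RoutingInsideTransactionDemo {

    /** Lookup key kept per thread, as routing data sources commonly do. */
    static final ThreadLocal<String> LOOKUP_KEY = ThreadLocal.withInitial(() -> "one");

    /** Resolves the target only at the moment a connection is requested. */
    static final class RoutingDataSource {
        String getConnection() {
            return "connection-to-" + LOOKUP_KEY.get();
        }
    }

    /** Inside a transaction, the first connection per data source object is cached and reused. */
    static String getTransactionalConnection(RoutingDataSource ds, Map<Object, String> bound) {
        return bound.computeIfAbsent(ds, key -> ds.getConnection());
    }

    static List<String> demo() {
        RoutingDataSource routing = new RoutingDataSource();
        List<String> used = new ArrayList<>();
        try {
            Map<Object, String> bound = new HashMap<>(); // lives as long as one transaction
            LOOKUP_KEY.set("one");
            used.add(getTransactionalConnection(routing, bound)); // resolves target "one"
            LOOKUP_KEY.set("two");                                // switch inside the transaction
            used.add(getTransactionalConnection(routing, bound)); // cached connection is reused
            // Outside any transaction nothing is cached, so the switch takes effect
            used.add(routing.getConnection());
        } finally {
            LOOKUP_KEY.remove();
        }
        return used;
    }
}
```

In this model the second request still returns the connection to "one" even though the key was switched to "two", while the request made outside the transaction reaches "two".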

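4. Solutions

  1. Switch the data source outside the transaction

    • Where possible, make sure the data source is selected before the transaction begins. Set the routing key early in the business logic so that AbstractRoutingDataSource resolves and binds the right data source when the transaction starts.
    • Put the switching logic in code that is not under transaction management, which avoids the switching problems caused by transaction binding.
  2. Manage transactions manually

    • For complex scenarios, consider managing transactions programmatically instead of relying on declarative transactions. This gives you precise control over when a transaction starts and ends, so the data source can be switched before the transaction begins.
    • You can use TransactionTemplate, or use JtaTransactionManager directly.
  3. Use separate transaction managers

    • Consider configuring a separate transaction manager for each data source instead of one global JTA transaction manager. Each data source then uses its own transaction manager and is not subject to the constraints of a JTA global transaction.
    • This suits scenarios that do not strictly require distributed transactions.
  4. Use a distributed transaction framework such as Seata

    • If you really need both data source switching and transaction management in a distributed system, consider a distributed transaction framework such as Seata, which supports distributed transactions in a microservice environment and can handle transactions across data sources.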

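5. Summary

When JTA, AbstractRoutingDataSource, and declarative transactions are used together, data source switching usually fails because the connection is bound to the transaction context and stays bound for the whole global transaction. Switching the data source outside the transaction, managing transactions manually, or using separate transaction managers avoids the problem. The key is to keep transaction management and the data source selection strategy consistent with each other.

For this reason, all the examples below integrate static multiple data sources (one bean per data source) with jta-atomikos to implement distributed transactions across data sources.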

SpringBoot: Distributed Transactions with JdbcTemplate + Atomikos

1. Add Dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>h2-jdbctemplate-multi-transaction</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>h2-jdbctemplate-multi-transaction</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.h2database/h2 -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2. Configure Multiple Data Sources in application.yml

server:
  port: 8090

#################### H2 database configuration ####################
# File-based database: jdbc:h2:file:./dbh2/dbc2m;AUTO_SERVER=TRUE
# In-memory database: jdbc:h2:mem:master
spring:
  datasource1:
    unique-resource-name: datasource1
    xa-data-source-class-name: org.h2.jdbcx.JdbcDataSource
    xa-properties:
      url: jdbc:h2:mem:master
      user: sa
      password: sa
    exclusive-connection-mode: true
    min-pool-size: 3
    max-pool-size: 10
    max-lifetime: 20000
    borrow-connection-timeout: 10000
    test-query: SELECT 1 from dual # Validation query the Atomikos connection pool uses to check that a connection is still alive.
  datasource2:
    unique-resource-name: datasource2
    xa-data-source-class-name: org.h2.jdbcx.JdbcDataSource
    xa-properties:
      url: jdbc:h2:mem:slave
      user: sa
      password: sa
    exclusive-connection-mode: true
    min-pool-size: 3
    max-pool-size: 10
    max-lifetime: 20000
    borrow-connection-timeout: 10000
    test-query: SELECT 1 from dual # Validation query the Atomikos connection pool uses to check that a connection is still alive.

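  #################### H2 Web Console settings ####################
  ### enabled: start the H2 Web Console together with the application (Spring Boot's default is false unless spring-boot-devtools is present)
  ### path: serve the H2 Web Console at /h2 (default: /h2-console)
  ### settings.web-allow-others: allow remote access to the H2 Web Console (default: false, local access only)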
  h2:
    console:
      enabled: true
      path: /h2
      settings:
        web-allow-others: true

## Log SQL statements and transaction activity
logging:
  level:
    org.springframework.transaction: DEBUG
    org.springframework.jdbc.datasource.DataSourceTransactionManager: DEBUG
    org.springframework.jdbc.core.JdbcTemplate: DEBUG
    org.springframework.jdbc.datasource.DataSourceUtils: DEBUG
    org.springframework.jdbc.core.StatementCreatorUtils: TRACE

3. Configuration for the First Data Source

package com.example.config;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

/**
 * '@EnableAutoConfiguration(exclude = ...)' switches off Spring Boot's default DataSource auto-configuration
 */
@DependsOn("transactionManager")
@Configuration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class})
public class JdbcTemplateDataSource1Config {

    @Primary
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource1")
    public DataSource datasource1() {
        // This is the key point: return an AtomikosDataSourceBean; all configured properties are bound to it
        return new AtomikosDataSourceBean();
    }

    /**
     * Create the JdbcTemplate; it must be configured explicitly here.
     */
    @Bean
    public JdbcTemplate jdbcTemplate1(@Qualifier("datasource1") DataSource datasource1) {
        return new JdbcTemplate(datasource1);
    }

    /**
     * Local transaction manager for this data source
     */
    @Bean(name = "transactionManager1")
    public PlatformTransactionManager transactionManager1(@Qualifier("datasource1") DataSource datasource1) {
        return new DataSourceTransactionManager(datasource1);
    }
}
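  • Note: the data source must be created after the JTA transactionManager, which is why the class is annotated with @DependsOn("transactionManager").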

4. Configuration for the Second Data Source

package com.example.config;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;

@DependsOn("transactionManager")
@Configuration
public class JdbcTemplateDataSource2Config {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource2")
    public DataSource datasource2() {
        return new AtomikosDataSourceBean();
    }

    /**
     * Create the JdbcTemplate; it must be configured explicitly here.
     */
    @Bean
    public JdbcTemplate jdbcTemplate2(@Qualifier("datasource2") DataSource datasource2) {
        return new JdbcTemplate(datasource2);
    }

    /**
     * Local transaction manager for this data source
     */
    @Bean(name = "transactionManager2")
    public PlatformTransactionManager transactionManager2(@Qualifier("datasource2") DataSource datasource2) {
        return new DataSourceTransactionManager(datasource2);
    }

}
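  • Note: the data source must be created after the JTA transactionManager, which is why the class is annotated with @DependsOn("transactionManager").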

5. Atomikos Configuration: Create the JTA Transaction Manager

package com.example.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

@Configuration
public class JtaTransactionManagerConfig {

    @Bean
    public UserTransaction userTransaction() throws SystemException {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // JTA transaction timeouts are specified in seconds
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }

    @Bean(name = "transactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public PlatformTransactionManager transactionManager() throws SystemException {
        return new JtaTransactionManager(userTransaction(), atomikosTransactionManager());
    }
}

6. Write a Controller Covering the Normal and Error Cases

package com.example.controller;

import com.atomikos.jdbc.AtomikosDataSourceBean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

@Slf4j
@RestController
public class TestController {

    @Resource(name = "jdbcTemplate1")
    JdbcTemplate jdbcTemplate1;
    @Resource(name = "jdbcTemplate2")
    JdbcTemplate jdbcTemplate2;

    /**
     * Create the table and seed the initial rows in each data source
     */
    @PostConstruct
    public void init() {
        Stream.of(jdbcTemplate1, jdbcTemplate2).forEach(jdbcTemplate -> {
            jdbcTemplate.execute("DROP TABLE IF EXISTS USER_INFO");
            jdbcTemplate.execute("CREATE TABLE USER_INFO ("
                    + "id BIGINT NOT NULL,"
                    + "name VARCHAR(50) DEFAULT NULL,"
                    + "type VARCHAR(50) DEFAULT NULL,"
                    + "PRIMARY KEY (id))");
            String sql = "INSERT INTO USER_INFO (id, name, type) VALUES (?, ?, ?)";
            String resourceName = ((AtomikosDataSourceBean) jdbcTemplate.getDataSource()).getUniqueResourceName();
            jdbcTemplate.update(sql, 1, "Jone", resourceName);
            jdbcTemplate.update(sql, 2, "Tom", resourceName);
            jdbcTemplate.update(sql, 3, "Sandy", resourceName);
        });
    }

    @Transactional(transactionManager = "transactionManager")
    @RequestMapping("/test")
    public String testOk1() {
        jdbcTemplate1.execute("DELETE FROM USER_INFO WHERE id = 1");
        jdbcTemplate2.execute("DELETE FROM USER_INFO WHERE id = 1");
        return "delete success";
    }

    @Transactional(transactionManager = "transactionManager")
    @RequestMapping("/testError")
    public String testError() {
        jdbcTemplate1.execute("DELETE FROM USER_INFO WHERE id = 2");
        jdbcTemplate2.execute("DELETE FROM USER_INFO WHERE id = 2");
        int i = 1 / 0;
        return "delete success";
    }

    @RequestMapping("/db1")
    public List<Map<String, Object>> db1() {
        return jdbcTemplate1.queryForList("SELECT * FROM USER_INFO");
    }

    @RequestMapping("/db2")
    public List<Map<String, Object>> db2() {
        return jdbcTemplate2.queryForList("SELECT * FROM USER_INFO");
    }
}

7. Test the Normal Case

### Query the data once before deleting
curl -X GET "localhost:8090/db1"
[{"ID":1,"NAME":"Jone","TYPE":"datasource1"},{"ID":2,"NAME":"Tom","TYPE":"datasource1"},{"ID":3,"NAME":"Sandy","TYPE":"datasource1"}]
curl -X GET "localhost:8090/db2"
[{"ID":1,"NAME":"Jone","TYPE":"datasource2"},{"ID":2,"NAME":"Tom","TYPE":"datasource2"},{"ID":3,"NAME":"Sandy","TYPE":"datasource2"}]

### Run the delete
curl -X GET "localhost:8090/test"
delete success
### Console output
2024-09-10 11:09:14.906 DEBUG 98251 --- [nio-8090-exec-4] o.s.t.jta.JtaTransactionManager          : Creating new transaction with name [com.example.controller.TestController.testOk1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'transactionManager'
2024-09-10 11:09:14.935 DEBUG 98251 --- [nio-8090-exec-4] o.s.jdbc.core.JdbcTemplate               : Executing SQL statement [DELETE FROM USER_INFO WHERE id = 1]
2024-09-10 11:09:14.936 DEBUG 98251 --- [nio-8090-exec-4] o.s.jdbc.datasource.DataSourceUtils      : Fetching JDBC Connection from DataSource
2024-09-10 11:09:14.951 DEBUG 98251 --- [nio-8090-exec-4] o.s.jdbc.core.JdbcTemplate               : Executing SQL statement [DELETE FROM USER_INFO WHERE id = 1]
2024-09-10 11:09:14.951 DEBUG 98251 --- [nio-8090-exec-4] o.s.jdbc.datasource.DataSourceUtils      : Fetching JDBC Connection from DataSource
2024-09-10 11:09:14.953 DEBUG 98251 --- [nio-8090-exec-4] o.s.t.jta.JtaTransactionManager          : Initiating transaction commit

### Query the data again after deleting
curl -X GET "localhost:8090/db1"
[{"ID":2,"NAME":"Tom","TYPE":"datasource1"},{"ID":3,"NAME":"Sandy","TYPE":"datasource1"}]
curl -X GET "localhost:8090/db2"
[{"ID":2,"NAME":"Tom","TYPE":"datasource2"},{"ID":3,"NAME":"Sandy","TYPE":"datasource2"}]

8. Test the Error Case

### Run the delete
curl -X GET "localhost:8090/testError"
{"timestamp":"2024-09-10T03:12:24.359+00:00","status":500,"error":"Internal Server Error","path":"/testError"}
### Console output
2024-09-10 11:12:24.294 DEBUG 98251 --- [nio-8090-exec-7] o.s.t.jta.JtaTransactionManager          : Creating new transaction with name [com.example.controller.TestController.testError]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'transactionManager'
2024-09-10 11:12:24.303 DEBUG 98251 --- [nio-8090-exec-7] o.s.jdbc.core.JdbcTemplate               : Executing SQL statement [DELETE FROM USER_INFO WHERE id = 2]
2024-09-10 11:12:24.303 DEBUG 98251 --- [nio-8090-exec-7] o.s.jdbc.datasource.DataSourceUtils      : Fetching JDBC Connection from DataSource
2024-09-10 11:12:24.309 DEBUG 98251 --- [nio-8090-exec-7] o.s.jdbc.core.JdbcTemplate               : Executing SQL statement [DELETE FROM USER_INFO WHERE id = 2]
2024-09-10 11:12:24.310 DEBUG 98251 --- [nio-8090-exec-7] o.s.jdbc.datasource.DataSourceUtils      : Fetching JDBC Connection from DataSource
2024-09-10 11:12:24.317 DEBUG 98251 --- [nio-8090-exec-7] o.s.t.jta.JtaTransactionManager          : Initiating transaction rollback
2024-09-10 11:12:24.349 ERROR 98251 --- [nio-8090-exec-7] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause

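### Query the data again after the failed delete: the rows with id 2 are still present because the transaction rolled back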
curl -X GET "localhost:8090/db1"
[{"ID":2,"NAME":"Tom","TYPE":"datasource1"},{"ID":3,"NAME":"Sandy","TYPE":"datasource1"}]
curl -X GET "localhost:8090/db2"
[{"ID":2,"NAME":"Tom","TYPE":"datasource2"},{"ID":3,"NAME":"Sandy","TYPE":"datasource2"}]
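
As a rough mental model of what the rollback logged above achieves, here is a toy two-phase commit in plain Java. Everything in it (ToyResource, ToyCoordinator, the staged-delete scheme) is a made-up illustration, not how Atomikos or Spring actually implement JTA: each resource stages its deletes, and the coordinator applies them only if the business code finishes without an exception and every resource votes yes in the prepare phase.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Demo entry point: replays the testError scenario against two toy resources. */
final class TwoPhaseCommitSketch {
    public static void main(String[] args) {
        ToyResource db1 = new ToyResource(Map.of(2, "Tom", 3, "Sandy"));
        ToyResource db2 = new ToyResource(Map.of(2, "Tom", 3, "Sandy"));
        boolean committed = ToyCoordinator.runInTransaction(List.of(db1, db2), () -> {
            db1.delete(2);
            db2.delete(2);
            throw new ArithmeticException("/ by zero"); // same failure as testError
        });
        System.out.println("committed=" + committed
                + ", db1=" + db1.snapshot() + ", db2=" + db2.snapshot());
    }
}

/** Hypothetical stand-in for one XA resource: deletes are staged until commit. */
final class ToyResource {
    private final Map<Integer, String> rows = new HashMap<>();
    private final List<Integer> pendingDeletes = new ArrayList<>();

    ToyResource(Map<Integer, String> initialRows) {
        rows.putAll(initialRows);
    }

    /** Stages a delete and returns the number of affected rows (0 or 1). */
    int delete(int id) {
        if (rows.containsKey(id) && !pendingDeletes.contains(id)) {
            pendingDeletes.add(id);
            return 1;
        }
        return 0;
    }

    /** Phase 1: vote on whether the staged work can be committed. */
    boolean prepare() {
        return true; // the toy resource always votes yes
    }

    /** Phase 2: make the staged deletes permanent. */
    void commit() {
        for (Integer id : pendingDeletes) {
            rows.remove(id);
        }
        pendingDeletes.clear();
    }

    /** Discards the staged deletes. */
    void rollback() {
        pendingDeletes.clear();
    }

    /** Returns a copy of the committed rows. */
    Map<Integer, String> snapshot() {
        return new HashMap<>(rows);
    }
}

/** Hypothetical coordinator: all resources commit together or none does. */
final class ToyCoordinator {
    static boolean runInTransaction(List<ToyResource> resources, Runnable work) {
        try {
            work.run();
        } catch (RuntimeException e) {
            resources.forEach(ToyResource::rollback); // business code failed
            return false;
        }
        for (ToyResource resource : resources) {      // phase 1: collect votes
            if (!resource.prepare()) {
                resources.forEach(ToyResource::rollback);
                return false;
            }
        }
        resources.forEach(ToyResource::commit);        // phase 2: commit everywhere
        return true;
    }
}
```

Running main leaves id 2 in both maps because the exception prevents either commit, which mirrors the unchanged /db1 and /db2 results above. A real XA transaction manager also keeps a transaction log so it can recover after a crash; the toy skips that entirely.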

Spring Boot with MyBatis + Atomikos for Distributed Transactions

1. Add dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>h2-mybaits-multi-transaction</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>h2-mybaits-multi-transaction</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2. Configure multiple data sources in application.yml

server:
  port: 8090

#################### H2 database configuration ####################
# File-based database: jdbc:h2:file:./dbh2/dbc2m;AUTO_SERVER=TRUE
# In-memory database: jdbc:h2:mem:master
spring:
  datasource1:
    unique-resource-name: datasource1
    xa-data-source-class-name: org.h2.jdbcx.JdbcDataSource
    xa-properties:
      url: jdbc:h2:mem:master
      user: sa
      password: sa
    exclusive-connection-mode: true
    min-pool-size: 3
    max-pool-size: 10
    max-lifetime: 20000
    borrow-connection-timeout: 10000
    test-query: SELECT 1 from dual # Validation query the Atomikos connection pool runs to check that a connection is still alive
  datasource2:
    unique-resource-name: datasource2
    xa-data-source-class-name: org.h2.jdbcx.JdbcDataSource
    xa-properties:
      url: jdbc:h2:mem:slave
      user: sa
      password: sa
    exclusive-connection-mode: true
    min-pool-size: 3
    max-pool-size: 10
    max-lifetime: 20000
    borrow-connection-timeout: 10000
    test-query: SELECT 1 from dual # Validation query the Atomikos connection pool runs to check that a connection is still alive

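  #################### H2 Web Console settings ####################
  ### enabled: start the H2 Web Console together with the application (Spring Boot's own default is false)
  ### path: serve the H2 Web Console at /h2 (default: /h2-console)
  ### settings.web-allow-others: allow remote access to the H2 Web Console (default: false, local access only)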
  h2:
    console:
      enabled: true
      path: /h2
      settings:
        web-allow-others: true

### Print SQL to the console
mybatis:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

3. First data source configuration

package com.example.config;

import org.apache.ibatis.logging.stdout.StdOutImpl;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;

/**
 * Excluding DataSourceAutoConfiguration through @EnableAutoConfiguration turns off
 * Spring Boot's default data source auto-configuration.
 */
@DependsOn("transactionManager")
@Configuration
@MapperScan(
        basePackages = "com.example.mapper.db1",
        sqlSessionFactoryRef = "sqlSessionFactory1",
        sqlSessionTemplateRef = "sqlSessionTemplate1")
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class})
public class MybatisDataSource1Config {

    @Bean(name = "datasource1")
    @ConfigurationProperties(prefix = "spring.datasource1")
    public DataSource datasource1() {
        // The key point: return an AtomikosDataSourceBean; every property under the prefix is bound onto it
        return new AtomikosDataSourceBean();
    }

    /**
     * Spring-Mybatis setup, step 1: configure the SqlSessionFactory
     */
    @Bean(name = "sqlSessionFactory1")
    public SqlSessionFactory sqlSessionFactory1(@Qualifier("datasource1") DataSource datasource1) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(datasource1); // pass in the data source
        // Set the mapper XML locations here; configuring them in the config file instead leads to "Invalid bound statement"
        // bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setMapUnderscoreToCamelCase(true); // map underscore column names to camelCase properties
        configuration.setLogImpl(StdOutImpl.class); // log SQL to stdout
        bean.setConfiguration(configuration);
        return bean.getObject();
    }

    /**
     * Spring-Mybatis setup, step 2: configure the SqlSessionTemplate
     */
    @Bean(name = "sqlSessionTemplate1")
    public SqlSessionTemplate sqlSessionTemplate1(@Qualifier("sqlSessionFactory1") SqlSessionFactory sqlSessionFactory1) {
        return new SqlSessionTemplate(sqlSessionFactory1);
    }

    /**
     * Transaction management
     * Spring-Mybatis setup, step 3: configure the DataSourceTransactionManager
     */
    @Bean(name = "transactionManager1")
    public PlatformTransactionManager transactionManager1(@Qualifier("datasource1") DataSource datasource1) {
        return new DataSourceTransactionManager(datasource1);
    }
}
  • Note: the data source and sqlSessionFactory must be created after the JTA transactionManager, which is why the class carries @DependsOn("transactionManager").

4. Second data source configuration

package com.example.config;

import org.apache.ibatis.logging.stdout.StdOutImpl;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@DependsOn("transactionManager")
@Configuration
@MapperScan(
        basePackages = "com.example.mapper.db2",
        sqlSessionFactoryRef = "sqlSessionFactory2",
        sqlSessionTemplateRef = "sqlSessionTemplate2")
public class MybatisDataSource2Config {

    @Bean(name = "datasource2")
    @ConfigurationProperties(prefix = "spring.datasource2")
    public DataSource datasource2() {
        // The key point: return an AtomikosDataSourceBean; every property under the prefix is bound onto it
        return new AtomikosDataSourceBean();
    }

    /**
     * Spring-Mybatis setup, step 1: configure the SqlSessionFactory
     */
    @Bean(name = "sqlSessionFactory2")
    public SqlSessionFactory sqlSessionFactory2(@Qualifier("datasource2") DataSource datasource2) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(datasource2); // pass in the data source
        // Set the mapper XML locations here; configuring them in the config file instead leads to "Invalid bound statement"
        // bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
        org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
        configuration.setMapUnderscoreToCamelCase(true); // map underscore column names to camelCase properties
        configuration.setLogImpl(StdOutImpl.class); // log SQL to stdout
        bean.setConfiguration(configuration);
        return bean.getObject();
    }

    /**
     * Spring-Mybatis setup, step 2: configure the SqlSessionTemplate
     */
    @Bean(name = "sqlSessionTemplate2")
    public SqlSessionTemplate sqlSessionTemplate2(@Qualifier("sqlSessionFactory2") SqlSessionFactory sqlSessionFactory2) {
        return new SqlSessionTemplate(sqlSessionFactory2);
    }

    /**
     * Transaction management
     * Spring-Mybatis setup, step 3: configure the DataSourceTransactionManager
     */
    @Bean(name = "transactionManager2")
    public PlatformTransactionManager transactionManager2(@Qualifier("datasource2") DataSource datasource2) {
        return new DataSourceTransactionManager(datasource2);
    }
}
  • Note: the data source and sqlSessionFactory must be created after the JTA transactionManager, which is why the class carries @DependsOn("transactionManager").

5. Atomikos configuration: create the JTA transaction manager

package com.example.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

@Configuration
public class JtaTransactionManagerConfig {

    @Bean
    public UserTransaction userTransaction() throws SystemException {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // JTA defines this timeout in seconds, not milliseconds
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean(name = "atomikosTransactionManager")
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }

    @Bean(name = "transactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public PlatformTransactionManager transactionManager() throws SystemException {
        return new JtaTransactionManager(userTransaction(), atomikosTransactionManager());
    }
}

6. Write the entity class and the Mappers

1. Entity class for the table: UserInfo

package com.example.entity;
import lombok.Data;
import java.io.Serializable;

@Data
public class UserInfo implements Serializable {
    private Integer id;
    private String name;
    private String type;
}

2. Mapper interface for data source 1: UserMapper1

package com.example.mapper.db1;

import com.example.entity.UserInfo;
import org.apache.ibatis.annotations.*;
import java.util.List;

public interface UserMapper1 {
    @Select("select * from USER_INFO")
    List<UserInfo> findAll();

    @Update("drop table if exists USER_INFO;")
    void dropUserExistTable();

    @Update("create TABLE USER_INFO (id BIGINT NOT NULL AUTO_INCREMENT,name VARCHAR(50) DEFAULT NULL,type VARCHAR(50) DEFAULT NULL,PRIMARY KEY (id))")
    void createUserTable();

    @Insert("insert into USER_INFO (id, name, type) values (1, 'Jone', #{type}),(2, 'Tom', #{type}),(3, 'Sandy', #{type});")
    int initTableData(@Param("type") String type);

    @Delete("delete from USER_INFO where id = #{id}")
    int deleteById(@Param("id") Integer id);
}

3. Mapper interface for data source 2: UserMapper2

package com.example.mapper.db2;

import com.example.entity.UserInfo;
import org.apache.ibatis.annotations.*;
import java.util.List;

public interface UserMapper2 {
    @Select("select * from USER_INFO")
    List<UserInfo> findAll();

    @Update("drop table if exists USER_INFO;")
    void dropUserExistTable();

    @Update("create TABLE USER_INFO (id BIGINT NOT NULL AUTO_INCREMENT,name VARCHAR(50) DEFAULT NULL,type VARCHAR(50) DEFAULT NULL,PRIMARY KEY (id))")
    void createUserTable();

    @Insert("insert into USER_INFO (id, name, type) values (1, 'Jone', #{type}),(2, 'Tom', #{type}),(3, 'Sandy', #{type});")
    int initTableData(@Param("type") String type);

    @Delete("delete from USER_INFO where id = #{id}")
    int deleteById(@Param("id") Integer id);
}

7. Write a Controller covering the normal and failure cases

package com.example.controller;

import com.example.entity.UserInfo;
import com.example.mapper.db1.UserMapper1;
import com.example.mapper.db2.UserMapper2;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;

@Slf4j
@RestController
public class TestController {

    @Resource
    UserMapper1 userMapper1;
    @Resource
    UserMapper2 userMapper2;

    @PostConstruct
    public String initData() {
        userMapper1.dropUserExistTable();
        userMapper1.createUserTable();
        userMapper1.initTableData("datasource1");
        userMapper2.dropUserExistTable();
        userMapper2.createUserTable();
        userMapper2.initTableData("datasource2");
        return "init data success";
    }

    @Transactional(transactionManager = "transactionManager")
    @RequestMapping("/test")
    public String test() {
        userMapper1.deleteById(1);
        userMapper2.deleteById(1);
        return "delete success";
    }

    @Transactional(transactionManager = "transactionManager")
    @RequestMapping("/testError")
    public String testError() {
        userMapper1.deleteById(1);
        userMapper2.deleteById(1);
        int i = 1 / 0;
        return "delete success";
    }


    @RequestMapping("/db1")
    public List<UserInfo> db1() {
        return userMapper1.findAll();
    }

    @RequestMapping("/db2")
    public List<UserInfo> db2() {
        return userMapper2.findAll();
    }

}

8. Test the multi-data-source normal case

### Query the data once before the delete
curl -X GET "localhost:8090/db1"
[{"id":1,"name":"Jone","type":"datasource1"},{"id":2,"name":"Tom","type":"datasource1"},{"id":3,"name":"Sandy","type":"datasource1"}]
curl -X GET "localhost:8090/db2"
[{"id":1,"name":"Jone","type":"datasource2"},{"id":2,"name":"Tom","type":"datasource2"},{"id":3,"name":"Sandy","type":"datasource2"}]

### Run the delete
curl -X GET "localhost:8090/test"
delete success
### Console output
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6833f290]
JDBC Connection [conn2: url=jdbc:h2:mem:master user=SA] will be managed by Spring
==>  Preparing: delete from USER_INFO where id = ?
==> Parameters: 1(Integer)
<==    Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6833f290]
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@64914ab4]
JDBC Connection [conn9: url=jdbc:h2:mem:slave user=SA] will be managed by Spring
==>  Preparing: delete from USER_INFO where id = ?
==> Parameters: 1(Integer)
<==    Updates: 1
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@64914ab4]
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6833f290]
Transaction synchronization committing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@64914ab4]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6833f290]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6833f290]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@64914ab4]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@64914ab4]

### Query the data again after the delete
curl -X GET "localhost:8090/db1"
[{"id":2,"name":"Tom","type":"datasource1"},{"id":3,"name":"Sandy","type":"datasource1"}]
curl -X GET "localhost:8090/db2"
[{"id":2,"name":"Tom","type":"datasource2"},{"id":3,"name":"Sandy","type":"datasource2"}]

9. Test the multi-data-source failure case

### Run the delete
curl -X GET "localhost:8090/testError"
{"timestamp":"2024-09-10T03:29:24.635+00:00","status":500,"error":"Internal Server Error","path":"/testError"}
### Console output
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@45135b31]
JDBC Connection [conn2: url=jdbc:h2:mem:master user=SA] will be managed by Spring
==>  Preparing: delete from USER_INFO where id = ?
==> Parameters: 1(Integer)
<==    Updates: 0
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@45135b31]
Creating a new SqlSession
Registering transaction synchronization for SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@253ca47e]
JDBC Connection [conn9: url=jdbc:h2:mem:slave user=SA] will be managed by Spring
==>  Preparing: delete from USER_INFO where id = ?
==> Parameters: 1(Integer)
<==    Updates: 0
Releasing transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@253ca47e]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@45135b31]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@45135b31]
Transaction synchronization deregistering SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@253ca47e]
Transaction synchronization closing SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@253ca47e]
2024-09-10 11:29:24.622 ERROR 99240 --- [nio-8090-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause

### Query the data again after the failed delete
curl -X GET "localhost:8090/db1"
[{"id":2,"name":"Tom","type":"datasource1"},{"id":3,"name":"Sandy","type":"datasource1"}]
curl -X GET "localhost:8090/db2"
[{"id":2,"name":"Tom","type":"datasource2"},{"id":3,"name":"Sandy","type":"datasource2"}]

Note: /testError deletes id 1, which /test had already removed, so both statements report Updates: 0 and the tables would look the same with or without a rollback. To actually see the rollback, point testError at a row that still exists (for example id 2) and confirm that /db1 and /db2 still return that row afterwards.

Spring Boot with JPA + Atomikos for Distributed Transactions

1. Add dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>h2-jpa-multi-transaction</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>h2-jpa-multi-transaction</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2. Configure multiple data sources in application.yml

server:
  port: 8090

#################### H2 database configuration ####################
# File-based database: jdbc:h2:file:./dbh2/dbc2m;AUTO_SERVER=TRUE
# In-memory database: jdbc:h2:mem:master
spring:
  jta:
    enabled: true
    transaction-manager-id: atomikosTransactionManager
  datasource1:
    unique-resource-name: primaryDataSource
    xa-data-source-class-name: org.h2.jdbcx.JdbcDataSource
    xa-properties:
      url: jdbc:h2:mem:master
      user: sa
      password:
    exclusive-connection-mode: true
    min-pool-size: 3
    max-pool-size: 10
    max-lifetime: 20000
    borrow-connection-timeout: 10000
    test-query: SELECT 1 from dual # Validation query the Atomikos connection pool runs to check that a connection is still alive
  datasource2:
    unique-resource-name: secondaryDataSource
    xa-data-source-class-name: org.h2.jdbcx.JdbcDataSource
    xa-properties:
      url: jdbc:h2:mem:slave
      user: sa
      password:
    exclusive-connection-mode: true
    min-pool-size: 3
    max-pool-size: 10
    max-lifetime: 20000
    borrow-connection-timeout: 10000
    test-query: SELECT 1 from dual # Validation query the Atomikos connection pool runs to check that a connection is still alive

  #################### JPA configuration ####################
  # database-platform: the database dialect
  # hibernate.ddl-auto = none: do not let JPA create tables from the entity classes
  # show-sql = true: print SQL statements
  # properties.hibernate.format_sql = true: pretty-print the SQL
  jpa:
    database-platform: org.hibernate.dialect.H2Dialect
    hibernate:
      ddl-auto: create
    properties:
      # spring.jpa.properties.javax.persistence.transactionType
      # spring.jpa.properties.hibernate.transaction.jta.platform
      # Both settings are required when integrating JPA with JTA-Atomikos
      javax:
        persistence:
          transactionType: JTA
      hibernate:
        transaction:
          jta:
            platform: org.hibernate.engine.transaction.jta.platform.internal.AtomikosJtaPlatform # JTA platform class that handles the transactions
        format_sql: false
    show-sql: true
    open-in-view: false

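  #################### H2 Web Console settings ####################
  ### enabled: start the H2 Web Console together with the application (Spring Boot's own default is false)
  ### path: serve the H2 Web Console at /h2 (default: /h2-console)
  ### settings.web-allow-others: allow remote access to the H2 Web Console (default: false, local access only)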
  h2:
    console:
      enabled: true
      path: /h2
      settings:
        web-allow-others: true

## For debugging: each incoming HTTP request logs the Hibernate session being opened and the EntityManager being fetched from the current thread ##
#################### Log levels ####################
logging:
  level:
    org.hibernate.engine.transaction.internal: trace
    org.springframework.orm.jpa.JpaTransactionManager: trace
    org.hibernate.internal: trace

Note: when integrating JPA with JTA-Atomikos, the following two settings are required:

  • spring.jpa.properties.javax.persistence.transactionType
  • spring.jpa.properties.hibernate.transaction.jta.platform
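
To make the two required keys concrete: Spring Boot binds everything under spring.jpa.properties into a flat map whose keys are the dotted paths, and that map is what gets handed to the JPA provider. The plain-Java sketch below imitates that flattening for the YAML subtree above; flatten and yamlSubtree are hypothetical helpers written for this illustration, not Spring Boot APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JpaPropertyKeysSketch {

    /** Flattens nested maps into dotted keys, the way nested YAML collapses into property names. */
    static Map<String, String> flatten(String prefix, Map<String, ?> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : nested.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flat.putAll(flatten(key, child)); // descend into the nested block
            } else {
                flat.put(key, String.valueOf(value));
            }
        }
        return flat;
    }

    /** Hand-built copy of the subtree under spring.jpa.properties from the YAML above. */
    static Map<String, Object> yamlSubtree() {
        Map<String, Object> persistence = new LinkedHashMap<>();
        persistence.put("transactionType", "JTA");
        Map<String, Object> javaxNode = new LinkedHashMap<>();
        javaxNode.put("persistence", persistence);

        Map<String, Object> jta = new LinkedHashMap<>();
        jta.put("platform",
                "org.hibernate.engine.transaction.jta.platform.internal.AtomikosJtaPlatform");
        Map<String, Object> transaction = new LinkedHashMap<>();
        transaction.put("jta", jta);
        Map<String, Object> hibernate = new LinkedHashMap<>();
        hibernate.put("transaction", transaction);
        hibernate.put("format_sql", false);

        Map<String, Object> root = new LinkedHashMap<>();
        root.put("javax", javaxNode);
        root.put("hibernate", hibernate);
        return root;
    }

    public static void main(String[] args) {
        // Print each flattened key with its value, one per line
        flatten("", yamlSubtree()).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The flattened keys javax.persistence.transactionType and hibernate.transaction.jta.platform are the same strings you would use if you put the settings into a properties map by hand in a configuration class.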

3. First data source configuration

package com.example.config;

import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.Database;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;

@DependsOn("transactionManager")
@Configuration
@EnableJpaRepositories(
        basePackages = "com.example.repository.db1", // scan the repositories and register them in the Spring container
        entityManagerFactoryRef = "entityManagerFactoryBean1",
        transactionManagerRef = "transactionManager")
public class JpaAndDatasource1Config {

    @Resource
    HibernateProperties hibernateProperties;
    @Resource
    JpaProperties jpaProperties;

    @Primary
    @Bean(name = "datasource1", initMethod = "init", destroyMethod = "close")
    @ConfigurationProperties(prefix = "spring.datasource1")
    public DataSource datasource1() {
        return new AtomikosDataSourceBean();
    }

    @Primary
    @Bean(name = "entityManagerFactoryBean1")
    public LocalContainerEntityManagerFactoryBean entityManagerFactoryBean1() {
        // JPA vendor settings
        HibernateJpaVendorAdapter jpaVendorAdapter = new HibernateJpaVendorAdapter();
        jpaVendorAdapter.setShowSql(true); // print SQL
        jpaVendorAdapter.setGenerateDdl(true); // create/update tables automatically
        jpaVendorAdapter.setDatabase(Database.H2); // database type

        // Merge the JpaProperties and HibernateProperties settings
        Map<String, Object> properties = hibernateProperties
                .determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
        LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
        entityManager.setJtaDataSource(datasource1());
        entityManager.setJpaVendorAdapter(jpaVendorAdapter);
        // Change this to the entity package scanned for the primary data source
        entityManager.setPackagesToScan("com.example.entity.db1");
        entityManager.setPersistenceUnitName("persistenceUnit1");
        entityManager.setJpaPropertyMap(properties);
        return entityManager;
    }
}
  • Note: the data source and entityManagerFactoryBean must be created after the JTA transactionManager, which is why the class carries @DependsOn("transactionManager").

4. Second data source configuration

package com.example.config;

import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.Database;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.util.Map;

@DependsOn("transactionManager")
@Configuration
@EnableJpaRepositories(
        basePackages = "com.example.repository.db2", // scan the repositories and register them in the Spring container
        entityManagerFactoryRef = "entityManagerFactoryBean2",
        transactionManagerRef = "transactionManager")
public class JpaAndDatasource2Config {

    @Resource
    HibernateProperties hibernateProperties;
    @Resource
    JpaProperties jpaProperties;

    @Bean(name = "datasource2", initMethod = "init", destroyMethod = "close")
    @ConfigurationProperties(prefix = "spring.datasource2")
    public DataSource datasource2() {
        return new AtomikosDataSourceBean();
    }

    @Bean(name = "entityManagerFactoryBean2")
    public LocalContainerEntityManagerFactoryBean entityManagerFactoryBean2(EntityManagerFactoryBuilder builder) {
        // JPA vendor settings
        HibernateJpaVendorAdapter jpaVendorAdapter = new HibernateJpaVendorAdapter();
        jpaVendorAdapter.setShowSql(true); // print SQL
        jpaVendorAdapter.setGenerateDdl(true); // create/update tables automatically
        jpaVendorAdapter.setDatabase(Database.H2); // database type

        // Merge the JpaProperties and HibernateProperties settings
        Map<String, Object> properties = hibernateProperties
                .determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
        // The two entries below can also be set in properties/yml instead
        properties.put("hibernate.transaction.jta.platform",
                "org.hibernate.engine.transaction.jta.platform.internal.AtomikosJtaPlatform");
        properties.put("javax.persistence.transactionType", "JTA");

        LocalContainerEntityManagerFactoryBean entityManagerFactoryBean = builder
                .dataSource(datasource2())
                .jta(true)
                .packages("com.example.entity.db2") // package scanned for entities
                .persistenceUnit("persistenceUnit2")
                .properties(properties)
                .build();
        entityManagerFactoryBean.setJpaVendorAdapter(jpaVendorAdapter);
        return entityManagerFactoryBean;
    }
}
  • Note: the data source and entityManagerFactoryBean must be created after the JTA transactionManager, which is why the class carries @DependsOn("transactionManager").

5. Atomikos configuration: create the JTA transaction manager

package com.example.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;

/**
 * JTA-Atomikos configuration
 */
@Configuration
public class JtaAtomikosTransactionConfig {

    @Bean(name = "userTransaction")
    public UserTransaction userTransaction() throws SystemException {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        // JTA defines this timeout in seconds, not milliseconds
        userTransactionImp.setTransactionTimeout(10000);
        return userTransactionImp;
    }

    @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }

    @Bean(name = "transactionManager")
    @DependsOn({"userTransaction", "atomikosTransactionManager"})
    public PlatformTransactionManager transactionManager() throws SystemException {
        UserTransaction userTransaction = userTransaction();
        TransactionManager atomikosTransactionManager = atomikosTransactionManager();
        return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
    }
}

6. Write the Entities and Repositories

1. Entity for data source 1

package com.example.entity.db1;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "USER_INFO")
public class UserInfo1 {
    @Id
    private Integer id;
    private String name;
    private String type;
}

2. Entity for data source two

package com.example.entity.db2;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "USER_INFO")
public class UserInfo2 {
    @Id
    private Integer id;
    private String name;
    private String type;
}

3. Repository for data source one

package com.example.repository.db1;

import com.example.entity.db1.UserInfo1;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;

public interface UserInfoRepository1 extends JpaRepository<UserInfo1, Integer> {
    @Modifying
    @Transactional
    @Query(value = "drop table if exists USER_INFO;", nativeQuery = true)
    void dropUserExistTable();

    @Modifying
    @Transactional
    @Query(value = "create TABLE USER_INFO (id BIGINT NOT NULL AUTO_INCREMENT,name VARCHAR(50) DEFAULT NULL,type VARCHAR(50) DEFAULT NULL,PRIMARY KEY (id))", nativeQuery = true)
    void createUserTable();

    @Modifying
    @Transactional
    @Query(value = "insert into USER_INFO (id, name, type) values (1, 'Jone', :type),(2, 'Tom', :type),(3, 'Sandy', :type);", nativeQuery = true)
    int initTableData(@Param("type") String type);
}

4. Repository for data source two

package com.example.repository.db2;

import com.example.entity.db2.UserInfo2;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;

public interface UserInfoRepository2 extends JpaRepository<UserInfo2, Integer> {
    @Modifying
    @Transactional
    @Query(value = "drop table if exists USER_INFO;", nativeQuery = true)
    void dropUserExistTable();

    @Modifying
    @Transactional
    @Query(value = "create TABLE USER_INFO (id BIGINT NOT NULL AUTO_INCREMENT,name VARCHAR(50) DEFAULT NULL,type VARCHAR(50) DEFAULT NULL,PRIMARY KEY (id))", nativeQuery = true)
    void createUserTable();

    @Modifying
    @Transactional
    @Query(value = "insert into USER_INFO (id, name, type) values (1, 'Jone', :type),(2, 'Tom', :type),(3, 'Sandy', :type);", nativeQuery = true)
    int initTableData(@Param("type") String type);
}

7. Writing a Controller for the normal and failure cases across both data sources

package com.example.controller;

import com.example.entity.db1.UserInfo1;
import com.example.entity.db2.UserInfo2;
import com.example.repository.db1.UserInfoRepository1;
import com.example.repository.db2.UserInfoRepository2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;

@RestController
public class TestController {

    @Autowired
    private UserInfoRepository1 userInfoRepository1;
    @Autowired
    private UserInfoRepository2 userInfoRepository2;

    @Transactional
    @RequestMapping("/test")
    public String test() {
        userInfoRepository1.save(new UserInfo1(1, "Sam", "dataSource1"));
        userInfoRepository2.save(new UserInfo2(1, "Meir", "dataSource2"));
        return "ok";
    }

    @Transactional
    @RequestMapping("/testError")
    public String testError() {
        userInfoRepository1.save(new UserInfo1(2, "Sam", "dataSource1"));
        userInfoRepository2.save(new UserInfo2(2, "Meir", "dataSource2"));
        // Deliberately throw ArithmeticException so that the JTA transaction rolls back both inserts
        int i = 1/0;
        return "ok";
    }

    @RequestMapping("/db1")
    public List<UserInfo1> master() {
        return userInfoRepository1.findAll();
    }

    @RequestMapping("/db2")
    public List<UserInfo2> slave() {
        return userInfoRepository2.findAll();
    }
}

8. Testing the normal case across both data sources

### Insert the data
curl -X GET "localhost:8090/test"
ok
### Console output
2024-09-10 11:50:59.899  INFO 906 --- [nio-8090-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-09-10 11:50:59.901  INFO 906 --- [nio-8090-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2024-09-10 11:50:59.905  INFO 906 --- [nio-8090-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 4 ms
2024-09-10 11:50:59.996 TRACE 906 --- [nio-8090-exec-1] .i.SessionFactoryImpl$SessionBuilderImpl : Opening Hibernate Session.  tenant=null
2024-09-10 11:50:59.999 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Opened Session [1d911323-5d78-4bd0-9935-ee3a72ad8ecb] at timestamp: 1725940259999
Hibernate: select userinfo1x0_.id as id1_0_0_, userinfo1x0_.name as name2_0_0_, userinfo1x0_.type as type3_0_0_ from user_info userinfo1x0_ where userinfo1x0_.id=?
2024-09-10 11:51:00.035 TRACE 906 --- [nio-8090-exec-1] .i.SessionFactoryImpl$SessionBuilderImpl : Opening Hibernate Session.  tenant=null
2024-09-10 11:51:00.036 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Opened Session [5d4df513-55db-4b2a-8a44-f707240e5ccd] at timestamp: 1725940260035
Hibernate: select userinfo2x0_.id as id1_0_0_, userinfo2x0_.name as name2_0_0_, userinfo2x0_.type as type3_0_0_ from user_info userinfo2x0_ where userinfo2x0_.id=?
2024-09-10 11:51:00.038 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Closing session [1d911323-5d78-4bd0-9935-ee3a72ad8ecb]
2024-09-10 11:51:00.039 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Closing session [5d4df513-55db-4b2a-8a44-f707240e5ccd]
2024-09-10 11:51:00.039 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : SessionImpl#beforeTransactionCompletion()
2024-09-10 11:51:00.039 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Automatically flushing session
Hibernate: insert into user_info (name, type, id) values (?, ?, ?)
2024-09-10 11:51:00.047 DEBUG 906 --- [nio-8090-exec-1] o.h.e.t.internal.TransactionImpl         : TransactionImpl created on closed Session/EntityManager
2024-09-10 11:51:00.047 DEBUG 906 --- [nio-8090-exec-1] o.h.e.t.internal.TransactionImpl         : On TransactionImpl creation, JpaCompliance#isJpaTransactionComplianceEnabled == false
2024-09-10 11:51:00.047 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : SessionImpl#beforeTransactionCompletion()
2024-09-10 11:51:00.047 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Automatically flushing session
Hibernate: insert into user_info (name, type, id) values (?, ?, ?)
2024-09-10 11:51:00.048 DEBUG 906 --- [nio-8090-exec-1] o.h.e.t.internal.TransactionImpl         : TransactionImpl created on closed Session/EntityManager
2024-09-10 11:51:00.048 DEBUG 906 --- [nio-8090-exec-1] o.h.e.t.internal.TransactionImpl         : On TransactionImpl creation, JpaCompliance#isJpaTransactionComplianceEnabled == false
2024-09-10 11:51:00.059 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : SessionImpl#afterTransactionCompletion(successful=true, delayed=false)
2024-09-10 11:51:00.059 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Automatically closing session
2024-09-10 11:51:00.059 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Closing session [1d911323-5d78-4bd0-9935-ee3a72ad8ecb]
2024-09-10 11:51:00.060 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : SessionImpl#afterTransactionCompletion(successful=true, delayed=false)
2024-09-10 11:51:00.060 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Automatically closing session
2024-09-10 11:51:00.060 TRACE 906 --- [nio-8090-exec-1] org.hibernate.internal.SessionImpl       : Closing session [5d4df513-55db-4b2a-8a44-f707240e5ccd]

### Query the newly inserted data
curl -X GET "localhost:8090/db1"
[{"id":1,"name":"Sam","type":"dataSource1"}]
curl -X GET "localhost:8090/db2"
[{"id":1,"name":"Meir","type":"dataSource2"}]

9. Testing the failure case across both data sources

### Insert data with a deliberate failure
curl -X GET "localhost:8090/testError"
{"timestamp":"2024-09-10T03:54:56.103+00:00","status":500,"error":"Internal Server Error","path":"/testError"}
### Console output
2024-09-10 11:54:56.042 TRACE 906 --- [nio-8090-exec-8] .i.SessionFactoryImpl$SessionBuilderImpl : Opening Hibernate Session.  tenant=null
2024-09-10 11:54:56.043 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : Opened Session [b708f76a-dcd9-4913-9707-8100e4da037c] at timestamp: 1725940496043
Hibernate: select userinfo1x0_.id as id1_0_0_, userinfo1x0_.name as name2_0_0_, userinfo1x0_.type as type3_0_0_ from user_info userinfo1x0_ where userinfo1x0_.id=?
2024-09-10 11:54:56.048 TRACE 906 --- [nio-8090-exec-8] .i.SessionFactoryImpl$SessionBuilderImpl : Opening Hibernate Session.  tenant=null
2024-09-10 11:54:56.048 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : Opened Session [d6322714-5766-4b15-9e67-023e39baf276] at timestamp: 1725940496048
Hibernate: select userinfo2x0_.id as id1_0_0_, userinfo2x0_.name as name2_0_0_, userinfo2x0_.type as type3_0_0_ from user_info userinfo2x0_ where userinfo2x0_.id=?
2024-09-10 11:54:56.054 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : Closing session [b708f76a-dcd9-4913-9707-8100e4da037c]
2024-09-10 11:54:56.055 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : Closing session [d6322714-5766-4b15-9e67-023e39baf276]
2024-09-10 11:54:56.058 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : SessionImpl#afterTransactionCompletion(successful=false, delayed=false)
2024-09-10 11:54:56.074 DEBUG 906 --- [nio-8090-exec-8] o.h.e.t.internal.TransactionImpl         : TransactionImpl created on closed Session/EntityManager
2024-09-10 11:54:56.074 DEBUG 906 --- [nio-8090-exec-8] o.h.e.t.internal.TransactionImpl         : On TransactionImpl creation, JpaCompliance#isJpaTransactionComplianceEnabled == false
2024-09-10 11:54:56.075 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : Automatically closing session
2024-09-10 11:54:56.075 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : Closing session [b708f76a-dcd9-4913-9707-8100e4da037c]
2024-09-10 11:54:56.075 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : SessionImpl#afterTransactionCompletion(successful=false, delayed=false)
2024-09-10 11:54:56.075 DEBUG 906 --- [nio-8090-exec-8] o.h.e.t.internal.TransactionImpl         : TransactionImpl created on closed Session/EntityManager
2024-09-10 11:54:56.075 DEBUG 906 --- [nio-8090-exec-8] o.h.e.t.internal.TransactionImpl         : On TransactionImpl creation, JpaCompliance#isJpaTransactionComplianceEnabled == false
2024-09-10 11:54:56.075 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : Automatically closing session
2024-09-10 11:54:56.075 TRACE 906 --- [nio-8090-exec-8] org.hibernate.internal.SessionImpl       : Closing session [d6322714-5766-4b15-9e67-023e39baf276]
2024-09-10 11:54:56.095 ERROR 906 --- [nio-8090-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause

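### Query the data to confirm the rollback: the id 2 rows written by /testError must be absent from both data sources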
curl -X GET "localhost:8090/db1"
[{"id":1,"name":"Sam","type":"dataSource1"}]
curl -X GET "localhost:8090/db2"
[{"id":1,"name":"Meir","type":"dataSource2"}]
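The outcome of the two requests can be reproduced in a small plain-Java simulation. This is only a sketch of the all-or-nothing behaviour, assuming two in-memory maps in place of the real databases; `Store`, `inTransaction`, and the other names are hypothetical, and nothing here uses Spring, JPA, or Atomikos.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RollbackDemo {

    /** Hypothetical in-memory "database": committed rows plus uncommitted writes. */
    static final class Store {
        final Map<Integer, String> committed = new LinkedHashMap<>();
        private final Map<Integer, String> pending = new LinkedHashMap<>();

        void save(int id, String name) { pending.put(id, name); }
        void commit() { committed.putAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
    }

    static final Store db1 = new Store();
    static final Store db2 = new Store();

    /** Runs the work as one unit over both stores: commit both or roll back both. */
    static void inTransaction(Runnable work) {
        try {
            work.run();
            db1.commit();
            db2.commit();
        } catch (RuntimeException e) {
            db1.rollback();
            db2.rollback();
            throw e;
        }
    }

    /** Mirrors the /test endpoint: both writes succeed. */
    static void test() {
        inTransaction(() -> {
            db1.save(1, "Sam");
            db2.save(1, "Meir");
        });
    }

    /** Mirrors the /testError endpoint: both writes happen, then the method fails. */
    static void testError() {
        inTransaction(() -> {
            db1.save(2, "Sam");
            db2.save(2, "Meir");
            int zero = 0;
            int i = 1 / zero; // throws ArithmeticException
        });
    }

    public static void main(String[] args) {
        test();
        try {
            testError();
        } catch (ArithmeticException expected) {
            System.out.println("rolled back: " + expected.getMessage());
        }
        System.out.println("db1 = " + db1.committed);
        System.out.println("db2 = " + db2.committed);
    }
}
```

After both calls, each store holds only the row with id 1, matching the query results above: the failed unit of work left no trace in either store.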

References & Acknowledgements

