flowable 多数据源

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

目录


前言


        在springboot中使用flowable此时flowable默认使用spring中的数据源。我这里flowable的表在一个数据库业务表在另个一个数据库。

一、多数据源

        在项目中创建多数据源给 DynamicDataSource 设置目标数据源和默认数据源这个有很多教程这里不详细说了。

public class DynamicDataSource extends AbstractRoutingDataSource {
    /**
     * 取得当前使用哪个数据源
     */
    @Override
    protected Object determineCurrentLookupKey() {
        return DbContextHolder.getDbType();
    }
}

给flowable数据源设置切面

execution(* org.flowable.task.service..*.*(..))

给业务数据源设置切面

execution(* cn.ac.iscas.pdm.biz.rest.dev.service..*.*(..)) 

二、测试

1.测试接口

public interface FolwableDSTestService {

     void test1();

    void test2();

    void test3();

    void test4();
}
@Service
public class FolwableDSTestServiceImpl implements FolwableDSTestService {

    @Autowired
    private TaskService taskService;

    @Autowired
    private PdmFileInfoService fileInfoService;

    @Autowired
    private FolwableDSTestService testService;
	......
}

1.不带事务

 	 @Override
    public void test1() {
        //查询数据源1中的任务
        List<Task> tasks = taskService.createTaskQuery()
                .active()
                .includeProcessVariables()
                .taskCandidateUser("张三")
                .includeIdentityLinks()
                .list();
        System.out.println("任务总数" + tasks.size());

        //查询数据源2的业务数据
        List<PdmFileInfo> fileInfos = fileInfoService.list();
        System.out.println("文件总数" + fileInfos.size());
    }

        通过数据源切面各自访问自己的数据源没有问题。

2.加上事务

 	
    @Transactional
    @Override
    public void test2() {
        //查询数据源1中的任务
        List<Task> tasks = taskService.createTaskQuery()
                .active()
                .includeProcessVariables()
                .taskCandidateUser("张三")
                .includeIdentityLinks()
                .list();
        System.out.println("任务总数" + tasks.size());
        //查询数据源2的业务数据
        List<PdmFileInfo> fileInfos = fileInfoService.list();
        System.out.println("文件总数" + fileInfos.size());
    }

        在有事务存在的情况下会优先从事务中获取数据库连接那这样就可能会存在访问的数据库不是目标库的问题。

看一下获取连接的源码

DataSourceUtils.java

public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
		try {
			return doGetConnection(dataSource);
		}
		catch (SQLException ex) {
			throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
		}
		catch (IllegalStateException ex) {
			throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());
		}
	}

public static Connection doGetConnection(DataSource dataSource) throws SQLException {
		Assert.notNull(dataSource, "No DataSource specified");
		
		//从事务中获取连接
		ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
		if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
			conHolder.requested();
			if (!conHolder.hasConnection()) {
				logger.debug("Fetching resumed JDBC Connection from DataSource");
				conHolder.setConnection(fetchConnection(dataSource));
			}
			return conHolder.getConnection();
		}
		// Else we either got no holder or an empty thread-bound holder here.

		logger.debug("Fetching JDBC Connection from DataSource");
		//获取目标数据源
		Connection con = fetchConnection(dataSource);
		......
		}

private static Connection fetchConnection(DataSource dataSource) throws SQLException {
		//从多数据源中获取目标数据源dataSource为DynamicDataSource 
		Connection con = dataSource.getConnection();
		if (con == null) {
			throw new IllegalStateException("DataSource returned null from getConnection(): " + dataSource);
		}
		return con;
	}

	@Override
	public Connection getConnection() throws SQLException {
		//从 DbContextHolder 中数据源切面切到了哪个数据源
		return determineTargetDataSource().getConnection();
	}

三、解决方法

1.开启新事物

在 test4() 方法中新增方法加上事务设置事务传播为

Propagation.REQUIRES_NEW 或者 Propagation.NOT_SUPPORTED
 	@Transactional
    @Override
    public void test3() {
        //查询数据源1中的任务
        List<Task> tasks = taskService.createTaskQuery()
                .active()
                .includeProcessVariables()
                .taskCandidateUser("张三")
                .includeIdentityLinks()
                .list();
        System.out.println("任务总数" + tasks.size());

        //查询数据源2的业务数据
        testService.test4();

    }

    @Transactional(propagation = Propagation.NOT_SUPPORTED)
    @Override
    public void test4() {
        List<PdmFileInfo> fileInfos = fileInfoService.list();
        System.out.println("文件总数" + fileInfos.size());
    }

2.重写事务

1多数据源事务

public class MultiDataSourceTransaction implements Transaction {

    private final DataSource dataSource;

    private Connection mainConnection;

    private String mainDatabaseIdentification;

    private ConcurrentMap<String, Connection> otherConnectionMap;


    private boolean isConnectionTransactional;

    private boolean autoCommit;

    private String defaultDbName;

    public MultiDataSourceTransaction(DataSource dataSource, String defaultDbName) {
        if (dataSource == null) {
            throw new RuntimeException("No DataSource specified");
        }
        this.dataSource = dataSource;
        this.defaultDbName = defaultDbName;
        otherConnectionMap = new ConcurrentHashMap<>();
        mainDatabaseIdentification = getDbType();
    }

    private String getDbType() {
        String dbType = DbContextHolder.getDbType();
        if (StringUtils.isBlank(dbType)) {
            return defaultDbName;
        }
        return dbType;
    }


    @Override
    public Connection getConnection() throws SQLException {
        String databaseIdentification = getDbType();

        //现在获取到的mainConnection是从事务管理器获取或从DynamicDataSource获取目标数据源
        openMainConnection();
        mainDatabaseIdentification = ((ConnectionProxyImpl) ((DruidPooledConnection) mainConnection).getConnection()).getDirectDataSource().getName();
        //校验是否是mainDatabaseIdentification
        if (mainDatabaseIdentification.equalsIgnoreCase(databaseIdentification)) {
            return mainConnection;
        } else {
            if (!otherConnectionMap.containsKey(databaseIdentification)) {
                try {
                    //获取其他数据源
                    Connection conn = dataSource.getConnection();
                    otherConnectionMap.put(databaseIdentification, conn);
                } catch (SQLException ex) {
                    throw new RuntimeException("Could not get JDBC Connection", ex);
                }
            }
            return otherConnectionMap.get(databaseIdentification);
        }
    }


    private void openMainConnection() throws SQLException {
        if (mainConnection == null) {
            this.mainConnection = DataSourceUtils.getConnection(this.dataSource);
            this.autoCommit = this.mainConnection.getAutoCommit();
            this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.mainConnection, this.dataSource);

            if (log.isDebugEnabled()) {
                log.debug(
                        "JDBC Connection ["
                                + this.mainConnection
                                + "] will"
                                + (this.isConnectionTransactional ? " " : " not ")
                                + "be managed by Spring");
            }
        }
    }

    @Override
    public void commit() throws SQLException {
        if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
            if (log.isDebugEnabled()) {
                log.debug("Committing JDBC Connection [" + this.mainConnection + "]");
            }
            this.mainConnection.commit();
            for (Connection connection : otherConnectionMap.values()) {
                connection.commit();
            }
        }
    }

    @Override
    public void rollback() throws SQLException {
        if (this.mainConnection != null && !this.isConnectionTransactional && !this.autoCommit) {
            if (log.isDebugEnabled()) {
                log.debug("Rolling back JDBC Connection [" + this.mainConnection + "]");
            }
            this.mainConnection.rollback();
            for (Connection connection : otherConnectionMap.values()) {
                connection.rollback();
            }
        }
    }

    @Override
    public void close() throws SQLException {
        DataSourceUtils.releaseConnection(this.mainConnection, this.dataSource);
        for (Connection connection : otherConnectionMap.values()) {
            DataSourceUtils.releaseConnection(connection, this.dataSource);
        }
    }

    @Override
    public Integer getTimeout() {
        return null;
    }

}

2多数据源事务工厂

public class MultiDataSourceTransactionFactory extends SpringManagedTransactionFactory {

    private String defaultDbName;

    public MultiDataSourceTransactionFactory(String defaultDbName) {
        this.defaultDbName = defaultDbName;
    }

    @Override
    public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {
        return new MultiDataSourceTransaction(dataSource, defaultDbName);
    }

}

3给数据源设置事务

		MybatisSqlSessionFactoryBean factory = new MybatisSqlSessionFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionFactory(new MultiDataSourceTransactionFactory(defaultDbName));

总结

        这样只是暂时解决了多数据源的切换能保证正常访问到目标数据源但是对于事务回滚时仍存在着一些问题要想把问题从解决需要在项目中引入分布式事务。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6