瀏覽代碼

新增动态数据源,支持分布式事务。

zhouhao 8 年之前
父節點
當前提交
a18a72d110

+ 104 - 0
hsweb-web-datasource/pom.xml

@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright 2015-2016 http://hsweb.me
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hsweb-framework</artifactId>
+        <groupId>org.hsweb</groupId>
+        <version>1.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hsweb-web-datasource</artifactId>
+
+    <dependencies>
+        <!--atomikos-->
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-jdbc</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hsweb</groupId>
+            <artifactId>hsweb-web-service-interface</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hsweb</groupId>
+            <artifactId>hsweb-web-concurrent-lock</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.hsweb</groupId>
+            <artifactId>hsweb-web-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.20</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.atomikos</groupId>
+            <artifactId>transactions-jdbc</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.atomikos</groupId>
+            <artifactId>transactions-jta</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.atomikos</groupId>
+            <artifactId>transactions</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.atomikos</groupId>
+            <artifactId>transactions-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.atomikos</groupId>
+            <artifactId>atomikos-util</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>javax.transaction</groupId>
+            <artifactId>jta</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mybatis.spring.boot</groupId>
+            <artifactId>mybatis-spring-boot-autoconfigure</artifactId>
+            <version>1.0.2</version>
+        </dependency>
+    </dependencies>
+</project>

+ 106 - 0
hsweb-web-datasource/src/main/java/org/hsweb/web/datasource/dynamic/DynamicDataSourceAutoConfiguration.java

@@ -0,0 +1,106 @@
+/*
+ * Copyright 2015-2016 http://hsweb.me
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.hsweb.web.datasource.dynamic;
+
+import com.atomikos.icatch.jta.UserTransactionImp;
+import com.atomikos.icatch.jta.UserTransactionManager;
+import com.atomikos.jdbc.AtomikosDataSourceBean;
+import org.hsweb.commons.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.transaction.jta.JtaTransactionManager;
+
+import javax.sql.DataSource;
+import javax.transaction.SystemException;
+
+/**
+ * Created by zhouhao on 16-4-20.
+ */
+@Configuration
+public class DynamicDataSourceAutoConfiguration {
+
+    @Autowired
+    private DataSourceProperties properties;
+
+    static {
+//        com.atomikos.icatch.config.Configuration.init();
+        //  com.atomikos.icatch.config.Configuration.installCompositeTransactionManager(new CompositeTransactionManagerImp());
+    }
+
+    /**
+     * 默认数据库链接
+     */
+    @Primary
+    @Bean(initMethod = "init", name = "dataSource", destroyMethod = "close")
+    public DataSource dataSource() {
+        AtomikosDataSourceBean dataSourceBean = new AtomikosDataSourceBean();
+        dataSourceBean.getXaProperties().putAll(properties.getXa().getProperties());
+        dataSourceBean.setXaDataSourceClassName(properties.getXa().getDataSourceClassName());
+        dataSourceBean.setUniqueResourceName("core");
+        dataSourceBean.setMaxPoolSize(StringUtils.toInt(properties.getXa().getProperties().get("maxPoolSize"), 200));
+        dataSourceBean.setTestQuery(properties.getXa().getProperties().get("validationQuery"));
+        dataSourceBean.setBorrowConnectionTimeout(60);
+        return dataSourceBean;
+    }
+
+    /**
+     * 动态数据源
+     * @param dataSource 默认数据源
+     * @return 动态数据源
+     */
+    @Bean(initMethod = "init", destroyMethod = "close")
+    public AtomikosDataSourceBean atomikosDataSourceBean(DataSource dataSource) {
+        AtomikosDataSourceBean dataSourceBean = new AtomikosDataSourceBean();
+        dataSourceBean.setXaDataSource(new DynamicXaDataSourceImpl(dataSource));
+        dataSourceBean.setUniqueResourceName("dynamic");
+        dataSourceBean.setMaxPoolSize(StringUtils.toInt(properties.getXa().getProperties().get("maxPoolSize"), 200));
+        dataSourceBean.setBorrowConnectionTimeout(30);
+        return dataSourceBean;
+    }
+
+    @Bean
+    public UserTransactionManager userTransactionManager() {
+        UserTransactionManager transactionManager = new UserTransactionManager();
+        transactionManager.setForceShutdown(true);
+        return transactionManager;
+    }
+
+    @Bean
+    public UserTransactionImp userTransaction() throws SystemException {
+        UserTransactionImp userTransactionImp = new UserTransactionImp();
+        userTransactionImp.setTransactionTimeout(300);
+        return userTransactionImp;
+    }
+
+    @Bean
+    public JtaTransactionManager transactionManager(UserTransactionManager userTransactionManager, UserTransactionImp userTransaction) throws SystemException {
+        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
+        jtaTransactionManager.setTransactionManager(userTransactionManager);
+        jtaTransactionManager.setUserTransaction(userTransaction);
+        jtaTransactionManager.setAllowCustomIsolationLevels(true);
+        return jtaTransactionManager;
+    }
+
+    @Bean(name = "sqlExecutor")
+    public DynamicDataSourceSqlExecutorService sqlExecutor() {
+        return new DynamicDataSourceSqlExecutorService();
+    }
+
+}

+ 77 - 29
hsweb-web-service-impl-common/src/main/java/org/hsweb/web/service/impl/datasource/DynamicDataSourceServiceImpl.java

@@ -14,23 +14,29 @@
  * limitations under the License.
  */
 
-package org.hsweb.web.service.impl.datasource;
+package org.hsweb.web.datasource.dynamic;
 
+import com.atomikos.jdbc.AtomikosDataSourceBean;
+import com.atomikos.jdbc.AtomikosSQLException;
 import org.hsweb.concurrent.lock.LockFactory;
-import org.hsweb.ezorm.executor.SqlExecutor;
 import org.hsweb.web.bean.po.datasource.DataSource;
 import org.hsweb.web.core.datasource.DynamicDataSource;
 import org.hsweb.web.core.exception.NotFoundException;
 import org.hsweb.web.service.datasource.DataSourceService;
 import org.hsweb.web.service.datasource.DynamicDataSourceService;
-import org.hsweb.web.service.impl.basic.SqlExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.PlatformTransactionManager;
 
 import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -38,14 +44,21 @@ import java.util.concurrent.locks.ReadWriteLock;
 @Service("dynamicDataSourceService")
 public class DynamicDataSourceServiceImpl implements DynamicDataSourceService {
 
+    private Logger logger = LoggerFactory.getLogger(this.getClass());
     @Resource
     private DataSourceService dataSourceService;
 
     @Autowired(required = false)
-    private DynamicDataSource dynamicDataSource;
+    @Qualifier("atomikosDataSourceBean")
+    private AtomikosDataSourceBean atomikosDataSourceBean;
 
     @Autowired
     private LockFactory lockFactory;
+//    @Resource
+//    private DynamicDeployBeans dynamicDeployBeans;
+
+//    @Autowired
+//    ApplicationContext applicationContext;
 
     private ConcurrentMap<String, CacheInfo> cache = new ConcurrentHashMap<>();
 
@@ -54,13 +67,23 @@ public class DynamicDataSourceServiceImpl implements DynamicDataSourceService {
         return getCache(id).getDataSource();
     }
 
-    public PlatformTransactionManager getTransactionManager(String id) {
-        return getCache(id).getTransactionManager();
+    @Override
+    @PreDestroy
+    public void destroyAll() throws Exception {
+        cache.values().stream().map(CacheInfo::getDataSource).forEach(this::closeDataSource);
     }
 
-    @Override
-    public SqlExecutor getSqlExecutor(String id) {
-        return getCache(id).getSqlExecutor();
+    protected void closeDataSource(javax.sql.DataSource ds) {
+        if (ds instanceof AtomikosDataSourceBean) {
+            ((AtomikosDataSourceBean) ds).close();
+        }
+        if (ds instanceof Closeable) {
+            try {
+                ((Closeable) ds).close();
+            } catch (IOException e) {
+                logger.error("close datasource error", e);
+            }
+        }
     }
 
     protected CacheInfo getCache(String id) {
@@ -82,9 +105,13 @@ public class DynamicDataSourceServiceImpl implements DynamicDataSourceService {
             //加载datasource到缓存
             readWriteLock.writeLock().tryLock();
             try {
-                javax.sql.DataSource dataSource = dataSourceService.createDataSource(id);
+                javax.sql.DataSource dataSource = createDataSource(old);
+
                 CacheInfo cacheInfo = new CacheInfo(old.getHash(), dataSource);
-                cache.put(id, cacheInfo);
+                CacheInfo oldCache = cache.put(id, cacheInfo);
+                if (oldCache != null) {
+                    closeDataSource(oldCache.getDataSource());
+                }
                 return cacheInfo;
             } finally {
                 readWriteLock.writeLock().unlock();
@@ -94,10 +121,45 @@ public class DynamicDataSourceServiceImpl implements DynamicDataSourceService {
         }
     }
 
+    @Autowired
+    private DataSourceProperties properties;
+
+    protected javax.sql.DataSource createDataSource(DataSource dataSource) {
+        AtomikosDataSourceBean dataSourceBean = new AtomikosDataSourceBean();
+        Properties xaProperties = new Properties();
+        if (dataSource.getProperties() != null)
+            xaProperties.putAll(dataSource.getProperties());
+        if (dataSource.getDriver().contains("mysql")) {
+            dataSourceBean.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
+            xaProperties.put("pinGlobalTxToPhysicalConnection", true);
+            xaProperties.put("user", dataSource.getUsername());
+            xaProperties.put("password", dataSource.getPassword());
+            xaProperties.put("url", dataSource.getUrl());
+        } else {
+            dataSourceBean.setXaDataSourceClassName(properties.getXa().getDataSourceClassName());
+            xaProperties.put("username", dataSource.getUsername());
+            xaProperties.put("password", dataSource.getPassword());
+            xaProperties.put("url", dataSource.getUrl());
+            xaProperties.put("driverClassName", dataSource.getDriver());
+        }
+        dataSourceBean.setXaProperties(xaProperties);
+        dataSourceBean.setUniqueResourceName("ds_" + dataSource.getId());
+        dataSourceBean.setMaxPoolSize(200);
+        dataSourceBean.setMinPoolSize(5);
+        dataSourceBean.setTestQuery(dataSource.getTestSql());
+        dataSourceBean.setBorrowConnectionTimeout(60);
+        try {
+            dataSourceBean.init();
+        } catch (AtomikosSQLException e) {
+            throw new RuntimeException(e);
+        }
+        return dataSourceBean;
+    }
+
     @PostConstruct
     public void init() {
-        if (null != dynamicDataSource)
-            ((DynamicDataSourceImpl) dynamicDataSource).setDynamicDataSourceService(this);
+        if (null != atomikosDataSourceBean)
+            ((DynamicXaDataSourceImpl) atomikosDataSourceBean.getXaDataSource()).setDynamicDataSourceService(this);
     }
 
     class CacheInfo {
@@ -105,15 +167,9 @@ public class DynamicDataSourceServiceImpl implements DynamicDataSourceService {
 
         javax.sql.DataSource dataSource;
 
-        PlatformTransactionManager transactionManager;
-
-        SqlExecutor sqlExecutor;
-
         public CacheInfo(int hash, javax.sql.DataSource dataSource) {
             this.hash = hash;
             this.dataSource = dataSource;
-            sqlExecutor = new SqlExecutorService().setDataSource(dataSource);
-            transactionManager = new DataSourceTransactionManager(dataSource);
         }
 
         public int getHash() {
@@ -123,14 +179,6 @@ public class DynamicDataSourceServiceImpl implements DynamicDataSourceService {
         public javax.sql.DataSource getDataSource() {
             return dataSource;
         }
-
-        public SqlExecutor getSqlExecutor() {
-            return sqlExecutor;
-        }
-
-        public PlatformTransactionManager getTransactionManager() {
-            return transactionManager;
-        }
     }
 
 }

+ 110 - 0
hsweb-web-datasource/src/main/java/org/hsweb/web/datasource/dynamic/DynamicDataSourceSqlExecutorService.java

@@ -0,0 +1,110 @@
+/*
+ * Copyright 2015-2016 http://hsweb.me
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.hsweb.web.datasource.dynamic;
+
+import com.atomikos.jdbc.AtomikosDataSourceBean;
+import org.hsweb.ezorm.executor.AbstractJdbcSqlExecutor;
+import org.hsweb.ezorm.executor.SQL;
+import org.hsweb.ezorm.meta.expand.ObjectWrapper;
+import org.hsweb.ezorm.meta.expand.SimpleMapWrapper;
+import org.hsweb.ezorm.render.support.simple.SimpleSQL;
+import org.hsweb.web.core.datasource.DynamicDataSource;
+import org.springframework.jdbc.datasource.DataSourceUtils;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.annotation.Resource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 动态数据源sql执行器
+ */
+public class DynamicDataSourceSqlExecutorService extends AbstractJdbcSqlExecutor {
+
+    @Resource
+    protected AtomikosDataSourceBean atomikosDataSourceBean;
+
+    @Override
+    public Connection getConnection() {
+        return DataSourceUtils.getConnection(((DynamicDataSource) atomikosDataSourceBean.getXaDataSource()).getActiveDataSource());
+    }
+
+    @Override
+    public void releaseConnection(Connection connection) throws SQLException {
+        DataSourceUtils.releaseConnection(connection, ((DynamicDataSource) atomikosDataSourceBean.getXaDataSource()).getActiveDataSource());
+    }
+
+    @Override
+    @Transactional(readOnly = true)
+    public <T> List<T> list(SQL sql, ObjectWrapper<T> wrapper) throws SQLException {
+        return super.list(sql, wrapper);
+    }
+
+    @Override
+    @Transactional(readOnly = true)
+    public <T> T single(SQL sql, ObjectWrapper<T> wrapper) throws SQLException {
+        return super.single(sql, wrapper);
+    }
+
+    @Transactional(readOnly = true)
+    public List<Map<String, Object>> list(SQL sql) throws SQLException {
+        List<Map<String, Object>> data = list(sql, new SimpleMapWrapper());
+        return data;
+    }
+
+    @Transactional(readOnly = true)
+    public Map<String, Object> single(SQL sql) throws Exception {
+        Map<String, Object> data = single(sql, new SimpleMapWrapper());
+        return data;
+    }
+
+    @Transactional(readOnly = true)
+    public List<Map<String, Object>> list(String sql) throws Exception {
+        List<Map<String, Object>> data = list(create(sql), new SimpleMapWrapper());
+        return data;
+    }
+
+    @Transactional(readOnly = true)
+    public List<Map<String, Object>> list(String sql, Map<String, Object> param) throws Exception {
+        List<Map<String, Object>> data = list(create(sql, param), new SimpleMapWrapper());
+        return data;
+    }
+
+    @Transactional(readOnly = true)
+    public Map<String, Object> single(String sql) throws Exception {
+        Map<String, Object> data = single(create(sql));
+        return data;
+    }
+
+    @Transactional(readOnly = true)
+    public Map<String, Object> single(String sql, Map<String, Object> param) throws Exception {
+        Map<String, Object> data = single(create(sql, param));
+        return data;
+    }
+
+    public SQL create(String sql) {
+        return new SimpleSQL(sql);
+    }
+
+    public SQL create(String sql, Map<String, Object> param) {
+        SimpleSQL sql1 = new SimpleSQL(sql, param);
+        return sql1;
+    }
+
+}

+ 41 - 3
hsweb-web-service-impl-common/src/main/java/org/hsweb/web/service/impl/datasource/DynamicDataSourceImpl.java

@@ -14,22 +14,29 @@
  * limitations under the License.
  */
 
-package org.hsweb.web.service.impl.datasource;
+package org.hsweb.web.datasource.dynamic;
 
+import com.atomikos.jdbc.AtomikosDataSourceBean;
 import org.hsweb.web.core.datasource.DynamicDataSource;
 import org.hsweb.web.service.datasource.DynamicDataSourceService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.datasource.AbstractDataSource;
 
 import javax.sql.DataSource;
+import javax.sql.XAConnection;
+import javax.sql.XADataSource;
+import java.io.Closeable;
 import java.sql.Connection;
 import java.sql.SQLException;
 
-public class DynamicDataSourceImpl extends AbstractDataSource implements DynamicDataSource {
+public class DynamicXaDataSourceImpl extends AbstractDataSource implements DynamicDataSource, XADataSource, Closeable {
+    private Logger logger = LoggerFactory.getLogger(DynamicDataSource.class);
     private javax.sql.DataSource defaultDataSource;
 
     protected DynamicDataSourceService dynamicDataSourceService;
 
-    public DynamicDataSourceImpl(javax.sql.DataSource defaultDataSource) {
+    public DynamicXaDataSourceImpl(javax.sql.DataSource defaultDataSource) {
         this.defaultDataSource = defaultDataSource;
     }
 
@@ -45,12 +52,26 @@ public class DynamicDataSourceImpl extends AbstractDataSource implements Dynamic
 
     public DataSource getActiveDataSource() {
         String sourceId = DynamicDataSource.getActiveDataSourceId();
+        logger.info("use datasource:{}", sourceId == null ? "default" : sourceId);
         if (sourceId == null || dynamicDataSourceService == null) return defaultDataSource;
         DataSource dataSource = dynamicDataSourceService.getDataSource(sourceId);
         if (dataSource == null) return defaultDataSource;
         return dataSource;
     }
 
+    public XADataSource getActiveXADataSource() {
+        DataSource activeDs = getActiveDataSource();
+        XADataSource xaDataSource;
+        if (activeDs instanceof XADataSource)
+            xaDataSource = ((XADataSource) activeDs);
+        else if (activeDs instanceof AtomikosDataSourceBean) {
+            xaDataSource = ((AtomikosDataSourceBean) activeDs).getXaDataSource();
+        } else {
+            throw new UnsupportedOperationException(activeDs.getClass() + " is not XADataSource");
+        }
+        return xaDataSource;
+    }
+
     public void setDefaultDataSource(DataSource defaultDataSource) {
         this.defaultDataSource = defaultDataSource;
     }
@@ -59,4 +80,21 @@ public class DynamicDataSourceImpl extends AbstractDataSource implements Dynamic
         this.dynamicDataSourceService = dynamicDataSourceService;
     }
 
+    @Override
+    public XAConnection getXAConnection() throws SQLException {
+        return getActiveXADataSource().getXAConnection();
+    }
+
+    @Override
+    public XAConnection getXAConnection(String user, String password) throws SQLException {
+        return getActiveXADataSource().getXAConnection(user, password);
+    }
+
+    public void close() {
+        try {
+            dynamicDataSourceService.destroyAll();
+        } catch (Exception e) {
+            logger.error("close datasource error", e);
+        }
+    }
 }

+ 3 - 0
hsweb-web-datasource/src/main/resources/META-INF/spring.factories

@@ -0,0 +1,3 @@
+# Auto Configure
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.hsweb.web.datasource.dynamic.DynamicDataSourceAutoConfiguration