
要使用Java SPI,需要遵循如下约定:
1、当服务提供者提供了接口的一种具体实现后,在jar包的META-INF/services目录下创建一个以“接口全路径名”为命名的文件,内容为实现类的全限定名;
2、接口实现类所在的jar包放在主程序的classpath中;
3、主程序通过java.util.ServiceLoder动态装载实现模块,它通过扫描META-INF/services目录下的配置文件找到实现类的全限定名,把类加载到JVM;
4、SPI的实现类必须携带一个无参构造方法
在项目中使用 SPI 机制来维护抽取元数据的功能是一个非常合适的方案。由于不同数据库的具体实现可能不同,但抽取元数据的主流程基本相同,SPI 机制能够提供灵活性和扩展性,使得在不修改主应用程序代码的情况下添加或更换具体数据库的实现变得简单且高效。
具体的解决方案和优点
- 统一接口:定义一个统一的接口,所有数据库的元数据抽取实现都需要实现这个接口。这样可以确保主流程的一致性。
- 独立实现:为每种数据库(如 MySQL、PostgreSQL、Oracle 等)提供具体的实现类。这些实现类独立于主应用程序,可以根据需要进行单独修改和优化。
- 动态加载:使用 SPI 机制,通过
ServiceLoader
动态加载不同的实现类。这使得在运行时可以灵活地选择和切换不同的数据库实现。 - 易于扩展:当需要支持新的数据库类型时,只需添加新的实现类并在配置文件中注册,不需要修改主应用程序代码。这提高了系统的可维护性和扩展性。
操作流程-多数据库元数据动态抽取
//获取当前的数据源连接参数
Datasource dataSource = baseMapper.selectById(datasourceId);
HashSet<String> tableSet = null;
//构建基础的连接参数,将连接信息,通过特定的JSON转换获取对应的数据信息
BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
DbType.of(dataSource.getType()),
dataSource.getConnectionParams());
if (null == connectionParam) {
logger.error("data source connection failed");
}
构建参数具体流程
DataSourceUtils
1.getConnection方法,getDatasourceProcessor(dbType).getConnection(connectionParam),主要是为了获取对应数据库类型的处理实现类的构建参数方法。
public static ConnectionParam buildConnectionParams(DbType dbType, String connectionJson) {
return getDatasourceProcessor(dbType).createConnectionParams(connectionJson);
}
2.在getDatasourceProcessor(DbType dbType)方法中,首先要通过DataSourceProcessorProvider类,来将所有的处理实现类添加到dataSourceProcessorMap 中。
public static DataSourceProcessor getDatasourceProcessor(DbType dbType) {
Map<String, DataSourceProcessor> dataSourceProcessorMap = DataSourceProcessorProvider.getInstance().getDataSourceProcessorMap();
从第7步回来,可以知道我们的dataSourceProcessorMap是个不可被修改的一个map,里面存的是数据库类型和对应的实现处理类
if (!dataSourceProcessorMap.containsKey(dbType.name())) {
throw new IllegalArgumentException("illegal datasource type");
}
return dataSourceProcessorMap.get(dbType.name());
}
public static DataSourceProcessorProvider getInstance() {
return DataSourceClientProviderHolder.INSTANCE;
}
DataSourceProcessorProvider
DataSourceProcessorProvider类的构造方法中存在初始化方法
3.通过getInstance方法,进行了DataSourceClientProviderHolder.INSTANCE;
public static DataSourceProcessorProvider getInstance() {
return DataSourceClientProviderHolder.INSTANCE;
}
4.通过DataSourceClientProviderHolder ,生成了DataSourceProcessorProvider类
private static class DataSourceClientProviderHolder {
private static final DataSourceProcessorProvider INSTANCE = new DataSourceProcessorProvider();
}
5.DataSourceProcessorProvider的构造方法内有初始化方法
private DataSourceProcessorProvider() {
initDataSourceProcessorPlugin();
}
6.初始化方法中新建了一个处理器的管理类,并且执行了该类中的installProcessor方法
private void initDataSourceProcessorPlugin() {
dataSourcePluginManager = new DataSourceProcessorManager();
dataSourcePluginManager.installProcessor();
}
DataSourceProcessorManager
在DataSourceProcessorManager中定义了一个静态的map数组,是存放数据源类型和数据源类型对应的处理是实现类的
private static final Map<String, DataSourceProcessor> dataSourceProcessorMap = new ConcurrentHashMap<>();
//该方法,返回的数组是不可修改的
public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
return Collections.unmodifiableMap(dataSourceProcessorMap);
}
7.使用SPI机制加载DataSourceProcessor的所有实现类
public void installProcessor() {
//遍历所有的实现类
ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> {
//获取对应实现类方法里的数据库类型名称
final String name = factory.getDbType().name();
logger.info("start register processor: {}", name);
//如果当前map已经存在该数据库类型,则抛出异常
if (dataSourceProcessorMap.containsKey(name)) {
throw new IllegalStateException(format("Duplicate datasource plugins named '%s'", name));
}
将当前循环中的实现类和当前类的数据库类型存入map中
loadDatasourceClient(factory);
logger.info("done register processor: {}", name);
});
}
private void loadDatasourceClient(DataSourceProcessor processor) {
DataSourceProcessor instance = processor.create();
dataSourceProcessorMap.put(processor.getDbType().name(), instance);
}
DMDataSourceProcessor
实现类中的构建连接参数方法
@Override
public ConnectionParam createConnectionParams(String connectionJson) {
return JSONUtils.parseObject(connectionJson, DMConnectionParam.class);
}
当前流程结束
继续流程
Connection connection =
DataSourceUtils.getConnection(DbType.of(dataSource.getType()), connectionParam);
构建连接具体流程
DataSourceUtils
getConnection方法,getDatasourceProcessor(dbType).getConnection(connectionParam),主要是为了获取对应数据库类型的处理实现类的连接方法。具体获取实现类的方法如上面构建参数一样
public static Connection getConnection(DbType dbType, ConnectionParam connectionParam) {
try {
return getDatasourceProcessor(dbType).getConnection(connectionParam);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
DMDataSourceProcessor
实现类中的创建连接方法
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
DMConnectionParam dmConnectionParam = (DMConnectionParam) connectionParam;
String jdbcUrl = dmConnectionParam.getJdbcUrl();
if (!StringUtils.isEmpty(dmConnectionParam.getOther())) {
return String.format("%s?%s", jdbcUrl, dmConnectionParam.getOther());
}
return String.format("%s", jdbcUrl);
}
@Override
public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException {
//转换参数类型,将通用的类,转为子类达梦的连接参数
DMConnectionParam dmConnectionParam = (DMConnectionParam) connectionParam;
//利用反射,加载驱动
Class.forName(getDatasourceDriver());
//获取用户名
String user = dmConnectionParam.getUser();
//获取密码
String password = PasswordUtils.decodePassword(dmConnectionParam.getPassword());
//返回驱动连接
return DriverManager.getConnection(getJdbcUrl(connectionParam), user, password);
}
连接当前流程结束
继续流程
ResultSet tables = null;
try {
if (null == connection) {
throw new Exception("data source connection failed");
}
//获取连接的元数据
DatabaseMetaData metaData = connection.getMetaData();
String schema = null;
try {
//获取其中的模式名
schema = metaData.getConnection().getSchema();
} catch (SQLException e) {
logger.error("cant not get the schema : {}", e.getMessage(), e);
}
//获取数据库的所有表
tables = metaData.getTables(
connectionParam.getDatabase(),
getDbSchemaPattern(DbType.of(dataSource.getType()), schema, connectionParam),
"%", TABLE_TYPES);
if (null == tables) {
throw new Exception("get datasource tables error");
}
tableSet = new HashSet<>();
while (tables.next()) {
String name = tables.getString(TABLE_NAME);
tableSet.add(name);
}
} catch (Exception e) {
logger.error(e.toString(), e);
} finally {
closeResult(tables);
releaseConnection(connection);
}
List<String> tableList = new ArrayList<>(tableSet); // Convert Set to List
return getParamsOptions(tableList);
具体代码
具体操作流程代码
public List<ParamsOptions> getTables(Integer datasourceId) {
Datasource dataSource = baseMapper.selectById(datasourceId);
HashSet<String> tableSet = null;
BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
DbType.of(dataSource.getType()),
dataSource.getConnectionParams());
if (null == connectionParam) {
logger.error("data source connection failed");
}
Connection connection =
DataSourceUtils.getConnection(DbType.of(dataSource.getType()), connectionParam);
ResultSet tables = null;
try {
if (null == connection) {
throw new Exception("data source connection failed");
}
DatabaseMetaData metaData = connection.getMetaData();
String schema = null;
try {
schema = metaData.getConnection().getSchema();
} catch (SQLException e) {
logger.error("cant not get the schema : {}", e.getMessage(), e);
}
tables = metaData.getTables(
connectionParam.getDatabase(),
getDbSchemaPattern(DbType.of(dataSource.getType()), schema, connectionParam),
"%", TABLE_TYPES);
if (null == tables) {
throw new Exception("get datasource tables error");
}
tableSet = new HashSet<>();
while (tables.next()) {
String name = tables.getString(TABLE_NAME);
tableSet.add(name);
}
} catch (Exception e) {
logger.error(e.toString(), e);
} finally {
closeResult(tables);
releaseConnection(connection);
}
List<String> tableList = new ArrayList<>(tableSet); // Convert Set to List
return getParamsOptions(tableList);
}
DataSourceUtils
新建一个工具类,来操作这些处理器
import com.fasterxml.jackson.databind.JsonNode;
import com.zlt.meta.common.DbType;
import com.zlt.meta.dto.BaseDataSourceParamDTO;
import com.zlt.meta.dto.ConnectionParam;
import com.zlt.meta.plugin.database.DataSourceProcessor;
import com.zlt.meta.plugin.database.DataSourceProcessorProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.Map;
public class DataSourceUtils {
public DataSourceUtils() {
}
private static final Logger logger = LoggerFactory.getLogger(DataSourceUtils.class);
//检查数据源参数
public static void checkDatasourceParam(BaseDataSourceParamDTO baseDataSourceParamDTO) {
getDatasourceProcessor(baseDataSourceParamDTO.getType()).checkDatasourceParam(baseDataSourceParamDTO);
}
//根据数据源参数 DTO 构建连接参数,并记录日志。
public static ConnectionParam buildConnectionParams(BaseDataSourceParamDTO baseDataSourceParamDTO) {
ConnectionParam connectionParams = getDatasourceProcessor(baseDataSourceParamDTO.getType())
.createConnectionParams(baseDataSourceParamDTO);
logger.info("parameters map:{}", connectionParams);
return connectionParams;
}
//根据数据库类型和连接 JSON 构建连接参数。
public static ConnectionParam buildConnectionParams(DbType dbType, String connectionJson) {
return getDatasourceProcessor(dbType).createConnectionParams(connectionJson);
}
//根据数据库类型和连接参数获取 JDBC URL。
public static String getJdbcUrl(DbType dbType, ConnectionParam baseConnectionParam) {
return getDatasourceProcessor(dbType).getJdbcUrl(baseConnectionParam);
}
//根据数据库类型和连接参数获取数据库连接,捕获并处理异常。
public static Connection getConnection(DbType dbType, ConnectionParam connectionParam) {
try {
return getDatasourceProcessor(dbType).getConnection(connectionParam);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
//根据数据库类型获取相应的数据源驱动程序。
public static String getDatasourceDriver(DbType dbType) {
return getDatasourceProcessor(dbType).getDatasourceDriver();
}
//根据数据库类型和连接参数字符串构建数据源参数 DTO。
public static BaseDataSourceParamDTO buildDatasourceParamDTO(DbType dbType, String connectionParams) {
return getDatasourceProcessor(dbType).createDatasourceParamDTO(connectionParams);
}
//根据数据库类型获取对应的数据源处理器,如果类型非法则抛出异常。
public static DataSourceProcessor getDatasourceProcessor(DbType dbType) {
Map<String, DataSourceProcessor> dataSourceProcessorMap = DataSourceProcessorProvider.getInstance().getDataSourceProcessorMap();
if (!dataSourceProcessorMap.containsKey(dbType.name())) {
throw new IllegalArgumentException("illegal datasource type");
}
return dataSourceProcessorMap.get(dbType.name());
}
//根据连接参数和数据库类型获取数据源的唯一标识符。
public static String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) {
return getDatasourceProcessor(dbType).getDatasourceUniqueId(connectionParam, dbType);
}
//根据传入的参数字符串,解析为 JSON 节点,并通过相应的数据源处理器构建数据源参数 DTO。
public static BaseDataSourceParamDTO buildDatasourceParam(String param) {
JsonNode jsonNodes = JSONUtils.parseObject(param);
return getDatasourceProcessor(DbType.ofName(jsonNodes.get("type").asText().toUpperCase()))
.castDatasourceParamDTO(param);
}
}
DataSourceProcessorProvider
定义数据源处理器提供者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class DataSourceProcessorProvider {
private static final Logger logger = LoggerFactory.getLogger(DataSourceProcessorProvider.class);
private DataSourceProcessorManager dataSourcePluginManager;
private DataSourceProcessorProvider() {
initDataSourceProcessorPlugin();
}
//构造方法
private static class DataSourceClientProviderHolder {
private static final DataSourceProcessorProvider INSTANCE = new DataSourceProcessorProvider();
}
public static DataSourceProcessorProvider getInstance() {
return DataSourceClientProviderHolder.INSTANCE;
}
//获取管理类所存储的不同数据源的处理器实现类的map
public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
return dataSourcePluginManager.getDataSourceProcessorMap();
}
//new 管理类,去通过SPI机制初始化处理器的map
private void initDataSourceProcessorPlugin() {
dataSourcePluginManager = new DataSourceProcessorManager();
dataSourcePluginManager.installProcessor();
}
}
DataSourceProcessorManager
定义一个执行器管理类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import static java.lang.String.format;
public class DataSourceProcessorManager {
private static final Logger logger = LoggerFactory.getLogger(DataSourceProcessorManager.class);
//在管理类里创建一个静态map数组,用于存放不同数据库的执行类
private static final Map<String, DataSourceProcessor> dataSourceProcessorMap = new ConcurrentHashMap<>();
public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
return Collections.unmodifiableMap(dataSourceProcessorMap);
}
//加载执行器
public void installProcessor() {
//使用SPI机制的加载方法,将DataSourceProcessor所有实现类进行遍历
ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> {
//通过当前实现类的具体方法,获取该实现类所对应的数据库类型
final String name = factory.getDbType().name();
logger.info("start register processor: {}", name);
//如果当前map中已经存在该类型,则说明该类型的执行器已经被加载了,抛出异常
if (dataSourceProcessorMap.containsKey(name)) {
throw new IllegalStateException(format("Duplicate datasource plugins named '%s'", name));
}
//加载实现类
loadDatasourceClient(factory);
logger.info("done register processor: {}", name);
});
}
private void loadDatasourceClient(DataSourceProcessor processor) {
//返回对应的实现类
DataSourceProcessor instance = processor.create();
//放入数据源map,key为类型,value为实现类
dataSourceProcessorMap.put(processor.getDbType().name(), instance);
}
}
DataSourceProcessor
首先定义一个接口,示例数据源执行器接口
import com.zlt.meta.bean.BaseConnectionParam;
import com.zlt.meta.common.DbType;
import com.zlt.meta.dto.BaseDataSourceParamDTO;
import com.zlt.meta.dto.ConnectionParam;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
public interface DataSourceProcessor {
//将 JSON 转换为对应的数据源参数 DTO 对象。
BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson);
//检查数据源参数是否有效。
void checkDatasourceParam(BaseDataSourceParamDTO datasourceParam);
//获取数据源客户端的唯一标识符。
String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType);
//根据连接 JSON 创建数据源参数 DTO 对象。
BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson);
//创建数据源连接参数,这些参数将存储在数据源中。
ConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam);
//将 JSON 反序列化为数据源连接参数对象。
ConnectionParam createConnectionParams(String connectionJson);
//获取数据源的驱动程序。
String getDatasourceDriver();
//获取验证查询语句。
String getValidationQuery();
//根据连接参数获取 JDBC URL,该方法可以注入其他信息到 JDBC URL 中。
String getJdbcUrl(ConnectionParam connectionParam);
//根据连接参数获取数据库连接对象。
Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException, IOException;
//获取数据源的数据库类型。
DbType getDbType();
//创建数据源处理器对象
DataSourceProcessor create();
}
DMDataSourceProcessor
达梦数据库的实现类
import com.zlt.meta.common.Constants;
import com.google.auto.service.AutoService;
import com.zlt.meta.bean.BaseConnectionParam;
import com.zlt.meta.common.DataSourceConstants;
import com.zlt.meta.common.DbType;
import com.zlt.meta.dto.BaseDataSourceParamDTO;
import com.zlt.meta.dto.ConnectionParam;
import com.zlt.meta.plugin.database.AbstractDataSourceProcessor;
import com.zlt.meta.plugin.database.DataSourceProcessor;
import com.zlt.meta.utils.JSONUtils;
import com.zlt.meta.utils.PasswordUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class DMDataSourceProcessor extends AbstractDataSourceProcessor {
private final Logger logger = LoggerFactory.getLogger(DMDataSourceProcessor.class);
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson,DMDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
DMConnectionParam connectionParams = (DMConnectionParam) createConnectionParams(connectionJson);
DMDataSourceParamDTO mysqlDatasourceParamDTO = new DMDataSourceParamDTO();
mysqlDatasourceParamDTO.setUserName(connectionParams.getUser());
mysqlDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
mysqlDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
String address = connectionParams.getAddress();
String[] hostSeperator = address.split(Constants.DOUBLE_SLASH);
String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA);
mysqlDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1]));
mysqlDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
return mysqlDatasourceParamDTO;
}
@Override
public BaseConnectionParam createConnectionParams(BaseDataSourceParamDTO dataSourceParam) {
DMDataSourceParamDTO dmDatasourceParam = (DMDataSourceParamDTO) dataSourceParam;
String address = String.format("%s%s:%s", DataSourceConstants.JDBC_DM, dmDatasourceParam.getHost(),
dmDatasourceParam.getPort());
String jdbcUrl = String.format("%s/%s", address, dmDatasourceParam.getDatabase());
DMConnectionParam dmConnectionParam = new DMConnectionParam();
dmConnectionParam.setJdbcUrl(jdbcUrl);
dmConnectionParam.setDatabase(dmDatasourceParam.getDatabase());
dmConnectionParam.setAddress(address);
dmConnectionParam.setUser(dmDatasourceParam.getUserName());
dmConnectionParam.setPassword(PasswordUtils.encodePassword(dmDatasourceParam.getPassword()));
dmConnectionParam.setDriverClassName(getDatasourceDriver());
dmConnectionParam.setValidationQuery(getValidationQuery());
dmConnectionParam.setOther(transformOther(dmDatasourceParam.getOther()));
dmConnectionParam.setProps(dmDatasourceParam.getOther());
return dmConnectionParam;
}
@Override
public ConnectionParam createConnectionParams(String connectionJson) {
return JSONUtils.parseObject(connectionJson, DMConnectionParam.class);
}
@Override
public String getDatasourceDriver() {
return DataSourceConstants.COM_DM_JDBC_DRIVER;
}
@Override
public String getValidationQuery() {
return DataSourceConstants.MYSQL_VALIDATION_QUERY;
}
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
DMConnectionParam dmConnectionParam = (DMConnectionParam) connectionParam;
String jdbcUrl = dmConnectionParam.getJdbcUrl();
if (!StringUtils.isEmpty(dmConnectionParam.getOther())) {
return String.format("%s?%s", jdbcUrl, dmConnectionParam.getOther());
}
return String.format("%s", jdbcUrl);
}
@Override
public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException {
DMConnectionParam dmConnectionParam = (DMConnectionParam) connectionParam;
Class.forName(getDatasourceDriver());
String user = dmConnectionParam.getUser();
String password = PasswordUtils.decodePassword(dmConnectionParam.getPassword());
return DriverManager.getConnection(getJdbcUrl(connectionParam), user, password);
}
@Override
public DbType getDbType() {
return DbType.DM;
}
@Override
public DataSourceProcessor create() {
return new DMDataSourceProcessor();
}
private String transformOther(Map<String, String> paramMap) {
if (MapUtils.isEmpty(paramMap)) {
return null;
}
Map<String, String> otherMap = new HashMap<>();
paramMap.forEach((k, v) -> {
if (!checkKeyIsLegitimate(k)) {
return;
}
otherMap.put(k, v);
});
if (MapUtils.isEmpty(otherMap)) {
return null;
}
StringBuilder stringBuilder = new StringBuilder();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s&", key, value)));
return stringBuilder.toString();
}
private static boolean checkKeyIsLegitimate(String key) {
// return !key.contains(ALLOW_LOAD_LOCAL_IN_FILE_NAME)
// && !key.contains(AUTO_DESERIALIZE)
// && !key.contains(ALLOW_LOCAL_IN_FILE_NAME)
// && !key.contains(ALLOW_URL_IN_LOCAL_IN_FILE_NAME);
return true;
}
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split("&")) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}
MySQLDataSourceProcessor
mysq数据库的实现类
import com.google.auto.service.AutoService;
import com.zlt.meta.bean.BaseConnectionParam;
import com.zlt.meta.common.Constants;
import com.zlt.meta.common.DataSourceConstants;
import com.zlt.meta.dto.BaseDataSourceParamDTO;
import com.zlt.meta.dto.ConnectionParam;
import com.zlt.meta.plugin.database.AbstractDataSourceProcessor;
import com.zlt.meta.plugin.database.DataSourceProcessor;
import com.zlt.meta.utils.JSONUtils;
import com.zlt.meta.utils.PasswordUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import com.zlt.meta.common.DbType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
private final Logger logger = LoggerFactory.getLogger(MySQLDataSourceProcessor.class);
private static final String ALLOW_LOAD_LOCAL_IN_FILE_NAME = "allowLoadLocalInfile";
private static final String AUTO_DESERIALIZE = "autoDeserialize";
private static final String ALLOW_LOCAL_IN_FILE_NAME = "allowLocalInfile";
private static final String ALLOW_URL_IN_LOCAL_IN_FILE_NAME = "allowUrlInLocalInfile";
private static final String APPEND_PARAMS = "allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false";
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, MySQLDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
MySQLConnectionParam
connectionParams = (MySQLConnectionParam) createConnectionParams(connectionJson);
MySQLDataSourceParamDTO
mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO();
mysqlDatasourceParamDTO.setUserName(connectionParams.getUser());
mysqlDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
mysqlDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
String address = connectionParams.getAddress();
String[] hostSeperator = address.split(Constants.DOUBLE_SLASH);
String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA);
mysqlDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1]));
mysqlDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
return mysqlDatasourceParamDTO;
}
@Override
public BaseConnectionParam createConnectionParams(BaseDataSourceParamDTO dataSourceParam) {
MySQLDataSourceParamDTO mysqlDatasourceParam = (MySQLDataSourceParamDTO) dataSourceParam;
String address = String.format("%s%s:%s", DataSourceConstants.JDBC_MYSQL, mysqlDatasourceParam.getHost(),
mysqlDatasourceParam.getPort());
String jdbcUrl = String.format("%s/%s", address, mysqlDatasourceParam.getDatabase());
MySQLConnectionParam
mysqlConnectionParam = new MySQLConnectionParam();
mysqlConnectionParam.setJdbcUrl(jdbcUrl);
mysqlConnectionParam.setDatabase(mysqlDatasourceParam.getDatabase());
mysqlConnectionParam.setAddress(address);
mysqlConnectionParam.setUser(mysqlDatasourceParam.getUserName());
mysqlConnectionParam.setPassword(PasswordUtils.encodePassword(mysqlDatasourceParam.getPassword()));
mysqlConnectionParam.setDriverClassName(getDatasourceDriver());
mysqlConnectionParam.setValidationQuery(getValidationQuery());
mysqlConnectionParam.setOther(transformOther(mysqlDatasourceParam.getOther()));
mysqlConnectionParam.setProps(mysqlDatasourceParam.getOther());
return mysqlConnectionParam;
}
@Override
public ConnectionParam createConnectionParams(String connectionJson) {
return JSONUtils.parseObject(connectionJson, MySQLConnectionParam.class);
}
@Override
public String getDatasourceDriver() {
return DataSourceConstants.COM_MYSQL_CJ_JDBC_DRIVER;
}
@Override
public String getValidationQuery() {
return DataSourceConstants.MYSQL_VALIDATION_QUERY;
}
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
MySQLConnectionParam
mysqlConnectionParam = (MySQLConnectionParam) connectionParam;
String jdbcUrl = mysqlConnectionParam.getJdbcUrl();
if (!StringUtils.isEmpty(mysqlConnectionParam.getOther())) {
return String.format("%s?%s&%s", jdbcUrl, mysqlConnectionParam.getOther(), APPEND_PARAMS);
}
return String.format("%s?%s", jdbcUrl, APPEND_PARAMS);
}
@Override
public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException {
MySQLConnectionParam mysqlConnectionParam = (MySQLConnectionParam) connectionParam;
Class.forName(getDatasourceDriver());
String user = mysqlConnectionParam.getUser();
if (user.contains(AUTO_DESERIALIZE)) {
logger.warn("sensitive param : {} in username field is filtered", AUTO_DESERIALIZE);
user = user.replace(AUTO_DESERIALIZE, "");
}
String password = PasswordUtils.decodePassword(mysqlConnectionParam.getPassword());
if (password.contains(AUTO_DESERIALIZE)) {
logger.warn("sensitive param : {} in password field is filtered", AUTO_DESERIALIZE);
password = password.replace(AUTO_DESERIALIZE, "");
}
return DriverManager.getConnection(getJdbcUrl(connectionParam), user, password);
}
@Override
public DbType getDbType() {
return DbType.MYSQL;
}
@Override
public DataSourceProcessor create() {
return new MySQLDataSourceProcessor();
}
private String transformOther(Map<String, String> paramMap) {
if (MapUtils.isEmpty(paramMap)) {
return null;
}
Map<String, String> otherMap = new HashMap<>();
paramMap.forEach((k, v) -> {
if (!checkKeyIsLegitimate(k)) {
return;
}
otherMap.put(k, v);
});
if (MapUtils.isEmpty(otherMap)) {
return null;
}
StringBuilder stringBuilder = new StringBuilder();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s&", key, value)));
return stringBuilder.toString();
}
private static boolean checkKeyIsLegitimate(String key) {
return !key.contains(ALLOW_LOAD_LOCAL_IN_FILE_NAME)
&& !key.contains(AUTO_DESERIALIZE)
&& !key.contains(ALLOW_LOCAL_IN_FILE_NAME)
&& !key.contains(ALLOW_URL_IN_LOCAL_IN_FILE_NAME);
}
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split("&")) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}
Comments | NOTHING