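Component [shardingsphere]
Hello everyone, and welcome to the Laokou Cloud Platform. The [shardingsphere] component provides database/table sharding and read-write splitting, and it can pull its configuration from Nacos.
# Usage
For a complete nacos-shardingsphere example, see the sample project.
Note: create the database and tables yourself beforehand.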

1. Add the dependency
<dependencies>
  <dependency>
    <groupId>org.laokou</groupId>
    <artifactId>laokou-common-shardingsphere</artifactId>
  </dependency>
</dependencies>
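2. Configure sharding and read-write splitting
Note: the database username and password used by ShardingSphere are stored encrypted and are decrypted when the configuration is loaded.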
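import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

// CryptoUtils is the project's own utility class; import it from its package in the project.
class ShardingSphereTest {

    @Test
    void testCrypto() throws Exception {
        // Encrypt with the private key, decrypt with the public key.
        String str = "123456";
        String publicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJ4o6sn4WoPmbs7DR9mGQzuuUQM9erQTVPpwxIzB0ETYkyKffO097qXVRLA6KPmaV+/siWewR7vpfYYjWajw5KkCAwEAAQ==";
        String encryptStr = CryptoUtils.encrypt(str);
        String decryptStr = CryptoUtils.decrypt(publicKey, encryptStr);
        Assertions.assertEquals(str, decryptStr);
    }

}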
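To make the "encrypt with the private key, decrypt with the public key" idea concrete, here is a minimal sketch built only on the JDK. It is not the project's `CryptoUtils`: the class name `RsaCredentialSketch`, the `RSA/ECB/PKCS1Padding` transformation, and the freshly generated 2048-bit key pair are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;
import javax.crypto.Cipher;

/** Hypothetical stand-in for the project's CryptoUtils; not the real implementation. */
final class RsaCredentialSketch {

    // Assumed transformation; the project may use a different one.
    private static final String TRANSFORMATION = "RSA/ECB/PKCS1Padding";

    /** Generates a throwaway RSA key pair for the demo. */
    static KeyPair newKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        }
        catch (GeneralSecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /** Returns the public key as Base64 text (X.509 encoding). */
    static String publicKeyOf(KeyPair pair) {
        return Base64.getEncoder().encodeToString(pair.getPublic().getEncoded());
    }

    /** Encrypts with the private key and returns Base64 ciphertext. */
    static String encrypt(PrivateKey privateKey, String plainText) {
        try {
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.ENCRYPT_MODE, privateKey);
            byte[] encrypted = cipher.doFinal(plainText.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(encrypted);
        }
        catch (GeneralSecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /** Decrypts Base64 ciphertext with a Base64-encoded public key. */
    static String decrypt(String base64PublicKey, String cipherText) {
        try {
            byte[] keyBytes = Base64.getDecoder().decode(base64PublicKey);
            PublicKey publicKey = KeyFactory.getInstance("RSA").generatePublic(new X509EncodedKeySpec(keyBytes));
            Cipher cipher = Cipher.getInstance(TRANSFORMATION);
            cipher.init(Cipher.DECRYPT_MODE, publicKey);
            byte[] decrypted = cipher.doFinal(Base64.getDecoder().decode(cipherText));
            return new String(decrypted, StandardCharsets.UTF_8);
        }
        catch (GeneralSecurityException ex) {
            throw new IllegalStateException(ex);
        }
    }

}
```

With a helper like this, the Base64 ciphertext is the part that would go inside `ENC(...)`, and only the public key needs to be published next to it.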
application-shardingsphere.yaml [Nacos configuration]
# Public key used to decrypt the ENC(...) values
public-key: MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJ4o6sn4WoPmbs7DR9mGQzuuUQM9erQTVPpwxIzB0ETYkyKffO097qXVRLA6KPmaV+/siWewR7vpfYYjWajw5KkCAwEAAQ==
dataSources:
  # Primary (master) database. Follow the author's settings here; do not change them arbitrarily.
  master:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: org.postgresql.Driver
    jdbcUrl: jdbc:postgresql://postgresql:5432/kcloud_platform_test?tcpKeepAlive=true&reWriteBatchedInserts=true&stringtype=unspecified&ApplicationName=laokou-sample-shardingsphere&useSSL=false
    username: ENC(VZamSTMi224AH6RUtJGXNldiDp/XEL2ozRhBUu/o9ChodT4JEb9kE/j0EFhXKbjsfvLVacUW0AUzetA6OrNJug==)
    password: ENC(laIHkPM/z03tYjA95hES4+BhyjyhvrPjJynrC65oDyXnUTP0Tge1UxwERWFBbHoOOQZ2GzzUrRhEYJ3jFb89eQ==)
  # Replica (slave) database. Follow the author's settings here; do not change them arbitrarily.
  slave:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: org.postgresql.Driver
    jdbcUrl: jdbc:postgresql://postgresql:5432/kcloud_platform_test?tcpKeepAlive=true&reWriteBatchedInserts=true&stringtype=unspecified&ApplicationName=laokou-sample-shardingsphere&useSSL=false
    username: ENC(VZamSTMi224AH6RUtJGXNldiDp/XEL2ozRhBUu/o9ChodT4JEb9kE/j0EFhXKbjsfvLVacUW0AUzetA6OrNJug==)
    password: ENC(laIHkPM/z03tYjA95hES4+BhyjyhvrPjJynrC65oDyXnUTP0Tge1UxwERWFBbHoOOQZ2GzzUrRhEYJ3jFb89eQ==)
rules:
  - !SHARDING
    tables:
      # Logical name of the sharded table
      boot_sys_user:
        # Actual data nodes: one table per month
        actualDataNodes: master.boot_sys_user_20240$->{1..9},master.boot_sys_user_2024$->{10..12}
        # Table sharding strategy
        tableStrategy:
          # Standard sharding strategy
          standard:
            # Shard by the creation-time column
            shardingColumn: create_date
            # Name of the custom sharding algorithm
            shardingAlgorithmName: laokou_table_inline
        # Primary key generation strategy
        keyGenerateStrategy:
          # Primary key column
          column: id
          # Name of the key generator to use
          keyGeneratorName: snowflake
    # Custom sharding algorithms
    shardingAlgorithms:
      # Name of the custom sharding algorithm
      laokou_table_inline:
        # Time-interval sharding algorithm
        type: INTERVAL
        props:
          # Timestamp format of the sharding key
          datetime-pattern: "yyyy-MM-dd HH:mm:ss"
          # Suffix format of the actual tables
          sharding-suffix-pattern: "yyyyMM"
          # Lower bound of the sharding time range
          datetime-lower: "2024-01-01 00:00:00"
          # Upper bound of the sharding time range
          datetime-upper: "2024-12-31 23:59:59"
          # Sharding interval
          datetime-interval-amount: 1
          # Interval unit: one table per month
          datetime-interval-unit: "months"
    # Key generators
    keyGenerators:
      # Snowflake configuration
      snowflake:
        # Snowflake algorithm
        type: SNOWFLAKE
        # Properties
        props:
          # Worker ID of this machine
          work-id: 123
  - !READWRITE_SPLITTING
    # Data source groups
    dataSourceGroups:
      # Read-write splitting configuration
      laokou_readwrite_data_sources:
        # Writes go to the primary
        writeDataSourceName: master
        # Reads go to the replica
        readDataSourceNames:
          - slave
        # Name of the custom load-balancing algorithm
        loadBalancerName: laokou_load_balance_algorithm
    # Custom load-balancing algorithms
    loadBalancers:
      # Name of the custom load-balancing algorithm
      laokou_load_balance_algorithm:
        # Round-robin scheduling
        type: ROUND_ROBIN
# Global properties
props:
  # Print the SQL statements
  sql-show: true
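To see what the monthly sharding rule means for a single row, the following sketch maps a `create_date` value to the table it would land in, using the same date pattern, suffix pattern and bounds as the configuration above. It is a simplified illustration, not ShardingSphere's own INTERVAL algorithm: the class name `MonthlyTableRoutingSketch` is hypothetical, and rejecting out-of-range values with an exception is an assumption of this sketch.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical illustration of month-based table routing; not ShardingSphere code. */
final class MonthlyTableRoutingSketch {

    // Same patterns as datetime-pattern and sharding-suffix-pattern in the YAML.
    private static final DateTimeFormatter DATETIME_PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private static final DateTimeFormatter SUFFIX_PATTERN = DateTimeFormatter.ofPattern("yyyyMM");

    // Same bounds as datetime-lower and datetime-upper in the YAML.
    private static final LocalDateTime LOWER = LocalDateTime.parse("2024-01-01 00:00:00", DATETIME_PATTERN);

    private static final LocalDateTime UPPER = LocalDateTime.parse("2024-12-31 23:59:59", DATETIME_PATTERN);

    /** Returns the actual table name for the given logical table and create_date value. */
    static String targetTable(String logicTable, String createDate) {
        LocalDateTime value = LocalDateTime.parse(createDate, DATETIME_PATTERN);
        if (value.isBefore(LOWER) || value.isAfter(UPPER)) {
            // Assumption of this sketch: values outside the configured range are rejected.
            throw new IllegalArgumentException("create_date outside the configured range: " + createDate);
        }
        return logicTable + "_" + value.format(SUFFIX_PATTERN);
    }

    public static void main(String[] args) {
        System.out.println(targetTable("boot_sys_user", "2024-03-15 10:30:00"));
    }

}
```

This is also why all twelve tables (`boot_sys_user_202401` through `boot_sys_user_202412`) have to exist before the application starts.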
3. Usage
spring:
  datasource:
    dynamic:
      datasource:
        master:
          # Pull the configuration from Nacos
          url: jdbc:shardingsphere:nacos:application-shardingsphere.yaml
          # Keep this value exactly as written; the driver is discovered through SPI
          driver-class-name: org.laokou.common.shardingsphere.config.nacos.NacosShardingSphereDriver
  cloud:
    # For Nacos usage, see the component [nacos] section of this tutorial
    nacos:
      discovery:
        server-addr: nacos:8848
        username: nacos
        password: nacos
        namespace: public
        cluster-name: laokou-cluster
        group: DEFAULT_GROUP
      config:
        server-addr: nacos:8848
        username: nacos
        password: nacos
        namespace: public
        cluster-name: laokou-cluster
        # https://github.com/alibaba/spring-cloud-alibaba/blob/2021.x/spring-cloud-alibaba-docs/src/main/asciidoc-zh/nacos-config.adoc
        # File format to read
        file-extension: yaml
        group: DEFAULT_GROUP
        refresh-enabled: true
4. Start Nacos and laokou-sample-shardingsphere (details omitted)
5. Once the application is running, visit http://localhost:9033
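# Extension
Configuration centers come in many flavors, so how do we adapt to each of them? Follow along and let's extend the component.
Before we start, take a look at ShardingSphereDriver to understand the whole loading flow, which is based on SPI discovery.

The flow is simple: the data in the YAML file is converted into a ShardingSphere data source. We can therefore abstract the common parts and only supply the code that pulls the configuration from each configuration center.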
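The idea of "abstract the common parts, plug in the pulling code" is the template method pattern. Here is a minimal sketch of that shape, with an in-memory map standing in for a configuration center. The names `ConfigLoadEngineSketch` and `InMemoryConfigLoadEngineSketch` are hypothetical and exist only for this illustration; the real classes appear later in this section.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical base class: the shared flow lives here. */
abstract class ConfigLoadEngineSketch {

    /** Common part: fetch the text, validate it, and hand it over as bytes. */
    final byte[] loadContent() {
        String content = fetchContent();
        if (content == null || content.isBlank()) {
            throw new IllegalStateException("Unable to pull the configuration");
        }
        return content.getBytes(StandardCharsets.UTF_8);
    }

    /** Configuration-center-specific part: each subclass pulls the text its own way. */
    protected abstract String fetchContent();

}

/** Hypothetical subclass: a map plays the role of Nacos, Consul or Apollo. */
final class InMemoryConfigLoadEngineSketch extends ConfigLoadEngineSketch {

    private final Map<String, String> store;

    private final String dataId;

    InMemoryConfigLoadEngineSketch(Map<String, String> store, String dataId) {
        this.store = store;
        this.dataId = dataId;
    }

    @Override
    protected String fetchContent() {
        return store.get(dataId);
    }

}
```

Supporting another configuration center then comes down to writing one more subclass that overrides the fetching method.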

Take Nacos as an example (Consul and Apollo work much the same way, so I won't show them one by one).
AbstractShardingSphereDriver
/**
 * Abstract ShardingSphere driver.
 *
 * @author laokou
 */
@SuppressWarnings("UseAbstractOfJDBCDriverClass")
public abstract class AbstractShardingSphereDriver implements Driver {

    private static final int MAJOR_DRIVER_VERSION = 5;

    private static final int MINOR_DRIVER_VERSION = 5;

    @HighFrequencyInvocation(canBeCached = true)
    @Override
    public Connection connect(final String url, final Properties info) throws SQLException {
        return acceptsURL(url) ? getDriverDataSourceCache().get(url, getDriverUrlPrefix()).getConnection() : null;
    }

    @Override
    public boolean acceptsURL(final String url) {
        return null != url && url.startsWith(getDriverUrlPrefix());
    }

    @Override
    public DriverPropertyInfo[] getPropertyInfo(final String url, final Properties info) {
        return new DriverPropertyInfo[0];
    }

    @Override
    public int getMajorVersion() {
        return MAJOR_DRIVER_VERSION;
    }

    @Override
    public int getMinorVersion() {
        return MINOR_DRIVER_VERSION;
    }

    @Override
    public boolean jdbcCompliant() {
        return false;
    }

    @Override
    public Logger getParentLogger() {
        return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
    }

    protected abstract AbstractDriverDataSourceCache getDriverDataSourceCache();

    protected abstract String getDriverUrlPrefix();

}
AbstractDriverDataSourceCache
/**
 * Abstract Driver data source cache.
 *
 * @author laokou
 */
public abstract class AbstractDriverDataSourceCache {

    private final Map<String, DataSource> DATASOURCE_MAP = new ConcurrentHashMap<>(MapUtil.initialCapacity(16));

    /**
     * Get data source.
     * @param url URL
     * @param urlPrefix URL prefix
     * @return got data source
     */
    public DataSource get(final String url, final String urlPrefix) {
        // Fast path: the data source for this URL has already been created.
        if (DATASOURCE_MAP.containsKey(url)) {
            return DATASOURCE_MAP.get(url);
        }
        // Otherwise create it once per URL, stripping the driver prefix first.
        return DATASOURCE_MAP.computeIfAbsent(url,
                driverUrl -> createDataSource(ShardingSphereURL.parse(driverUrl.substring(urlPrefix.length()))));
    }

    protected abstract AbstractShardingSphereURLLoadEngine getShardingSphereURLLoadEngine(ShardingSphereURL url);

    // The generic throws clause lets the checked SQLException escape the lambda above.
    @SuppressWarnings("unchecked")
    private <T extends Throwable> DataSource createDataSource(final ShardingSphereURL url) throws T {
        try {
            AbstractShardingSphereURLLoadEngine urlLoadEngine = getShardingSphereURLLoadEngine(url);
            return YamlShardingSphereDataSourceFactory.createDataSource(urlLoadEngine.loadContent());
        }
        catch (final IOException ex) {
            throw (T) new SQLException(ex);
        }
        catch (final SQLException ex) {
            throw (T) ex;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}
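The caching behaviour above can be tried out in isolation. The sketch below keeps one value per JDBC URL and counts how often the factory runs; `UrlKeyedCacheSketch` and its `creationCount()` helper are hypothetical names for this illustration, and a plain `Function` stands in for the expensive data source creation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical illustration of a per-URL cache built on computeIfAbsent. */
final class UrlKeyedCacheSketch<V> {

    private final Map<String, V> cache = new ConcurrentHashMap<>();

    private final Function<String, V> factory;

    private final AtomicInteger creations = new AtomicInteger();

    UrlKeyedCacheSketch(Function<String, V> factory) {
        this.factory = factory;
    }

    /** Returns the cached value for the URL, creating it from the part after the prefix if needed. */
    V get(String url, String urlPrefix) {
        V cached = cache.get(url);
        if (cached != null) {
            return cached;
        }
        return cache.computeIfAbsent(url, key -> {
            creations.incrementAndGet();
            return factory.apply(key.substring(urlPrefix.length()));
        });
    }

    /** Number of times the factory has been invoked. */
    int creationCount() {
        return creations.get();
    }

}
```

Asking for the same URL twice returns the same instance and runs the factory only once, which is what you want for a pooled data source.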
AbstractShardingSphereURLLoadEngine
/**
 * Abstract ShardingSphere URL load engine.
 *
 * @author laokou
 */
@Slf4j
public abstract class AbstractShardingSphereURLLoadEngine {

    private static final Pattern ENC_PATTERN = Pattern.compile("^ENC\\((.*)\\)$");

    private static final String PUBLIC_KEY = "public-key";

    private static final String CRYPTO_PREFIX = "ENC(";

    private static final String CRYPTO_SUFFIX = ")";

    protected final ShardingSphereURL url;

    protected AbstractShardingSphereURLLoadEngine(final ShardingSphereURL url) {
        this.url = url;
    }

    /**
     * Load configuration content.
     * @return loaded content
     */
    public byte[] loadContent() throws Exception {
        return resolvePropertyValue(getContent()).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Pull the raw YAML text from the configuration center.
     * @return raw configuration text
     */
    protected abstract String getContent() throws IOException, NacosException;

    private String resolvePropertyValue(String value) {
        List<String> list = getList(value);
        if (list.isEmpty()) {
            throw new SystemException("S_ShardingSphere_PullConfigError", "Unable to pull the ShardingSphere configuration");
        }
        List<String> strList = list.stream().filter(i -> i.startsWith(PUBLIC_KEY)).toList();
        String publicKey = StringConstant.EMPTY;
        if (CollectionUtil.isNotEmpty(strList)) {
            // Skip the "public-key:" prefix (11 characters) to get the key itself.
            publicKey = strList.getFirst().substring(11).trim();
        }
        StringBuilder stringBuilder = new StringBuilder();
        String finalPublicKey = publicKey;
        list.forEach(item -> {
            // The public-key line is not part of the ShardingSphere configuration, so drop it.
            if (!item.startsWith(PUBLIC_KEY)) {
                if (item.contains(CRYPTO_PREFIX) && item.contains(CRYPTO_SUFFIX)) {
                    // Keep the key part up to the separator and decrypt the value after it.
                    int index = item.indexOf(StringConstant.RISK);
                    String key = item.substring(0, index + 2);
                    String val = item.substring(index + 2).trim();
                    stringBuilder.append(key).append(decrypt(finalPublicKey, val)).append("\n");
                }
                else {
                    stringBuilder.append(item).append("\n");
                }
            }
        });
        return stringBuilder.toString();
    }

    private List<String> getList(String value) {
        List<String> list = new ArrayList<>(256);
        try (LineNumberReader reader = new LineNumberReader(new InputStreamReader(
                new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8)), StandardCharsets.UTF_8))) {
            String str;
            while (ObjectUtil.isNotNull((str = reader.readLine()))) {
                // Discard lines that start with "#" (comments).
                if (!str.trim().startsWith(WELL_NO)) {
                    list.add(str);
                }
            }
        }
        catch (IOException e) {
            log.error("Error message: {}", e.getMessage());
        }
        return list;
    }

    /**
     * Decrypt a string.
     * @param publicKey public key
     * @param cipherText encrypted string, expected in the form ENC(...)
     * @return the decrypted value, or the input unchanged if it cannot be decrypted
     */
    private String decrypt(String publicKey, String cipherText) {
        if (StringUtils.hasText(cipherText)) {
            Matcher matcher = ENC_PATTERN.matcher(cipherText);
            if (matcher.find()) {
                try {
                    return CryptoUtils.decrypt(publicKey, matcher.group(1));
                }
                catch (Exception e) {
                    log.error("ShardingSphere decrypt error", e);
                }
            }
        }
        return cipherText;
    }

}
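The line-by-line handling in the load engine (drop comment lines, drop the `public-key` line, replace `ENC(...)` values with their plaintext) can be sketched without any project classes. In the version below, `EncryptedYamlResolverSketch` is a hypothetical name, the regular expression is this sketch's own way of splitting key and value, and the `decryptor` function is a stand-in for the real RSA decryption.

```java
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration of resolving ENC(...) values in YAML text. */
final class EncryptedYamlResolverSketch {

    // Group 1: indentation, key and separator. Group 2: the text inside ENC(...).
    private static final Pattern ENC_VALUE = Pattern.compile("^(\\s*[^:#]+:\\s*)ENC\\((.*)\\)\\s*$");

    private static final String PUBLIC_KEY = "public-key";

    /** Returns the YAML with comments and the public-key line removed and ENC(...) values decrypted. */
    static String resolve(String yaml, UnaryOperator<String> decryptor) {
        StringBuilder resolved = new StringBuilder();
        for (String line : yaml.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.startsWith("#") || trimmed.startsWith(PUBLIC_KEY)) {
                continue;
            }
            Matcher matcher = ENC_VALUE.matcher(line);
            if (matcher.matches()) {
                resolved.append(matcher.group(1)).append(decryptor.apply(matcher.group(2)));
            }
            else {
                resolved.append(line);
            }
            resolved.append('\n');
        }
        return resolved.toString();
    }

}
```

Note that lines such as `jdbcUrl`, which contain colons but no `ENC(...)` wrapper, pass through untouched, so the result is still valid YAML for ShardingSphere to parse.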
NacosShardingSphereDriver
/**
 * Nacos ShardingSphere driver.
 *
 * @author laokou
 */
@SuppressWarnings("UseNacosOfJDBCDriverClass")
public final class NacosShardingSphereDriver extends AbstractShardingSphereDriver {

    private static final String DRIVER_URL_PREFIX = "jdbc:shardingsphere:nacos:";

    static {
        try {
            DriverManager.registerDriver(new NacosShardingSphereDriver());
        }
        catch (final SQLException ex) {
            throw new DriverRegisterException(ex);
        }
    }

    private final NacosDriverDataSourceCache dataSourceCache = new NacosDriverDataSourceCache();

    @Override
    protected AbstractDriverDataSourceCache getDriverDataSourceCache() {
        return dataSourceCache;
    }

    @Override
    protected String getDriverUrlPrefix() {
        return DRIVER_URL_PREFIX;
    }

}
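If you are wondering how a JDBC URL ends up at the right driver, the sketch below registers a tiny driver in a static block and lets `DriverManager` pick it by URL prefix, which mirrors the registration pattern above. Everything specific here is an assumption for illustration: the class `PrefixDriverSketch`, the made-up prefix `jdbc:sketch:demo:`, and the fact that the driver refuses to open connections.

```java
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.util.Properties;
import java.util.logging.Logger;

/** Hypothetical driver that only demonstrates registration and URL matching. */
final class PrefixDriverSketch implements Driver {

    // Made-up prefix for this sketch; the real driver uses "jdbc:shardingsphere:nacos:".
    static final String URL_PREFIX = "jdbc:sketch:demo:";

    static {
        try {
            DriverManager.registerDriver(new PrefixDriverSketch());
        }
        catch (SQLException ex) {
            throw new ExceptionInInitializerError(ex);
        }
    }

    /** Asks DriverManager which registered driver accepts the URL. */
    static Driver lookup(String url) {
        try {
            return DriverManager.getDriver(url);
        }
        catch (SQLException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /** Returns the part of the URL after the prefix, e.g. the configuration file name. */
    static String configurationSubject(String url) {
        return url.substring(URL_PREFIX.length());
    }

    @Override
    public Connection connect(String url, Properties info) throws SQLException {
        if (!acceptsURL(url)) {
            return null;
        }
        throw new SQLException("The sketch driver does not open real connections");
    }

    @Override
    public boolean acceptsURL(String url) {
        return url != null && url.startsWith(URL_PREFIX);
    }

    @Override
    public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
        return new DriverPropertyInfo[0];
    }

    @Override
    public int getMajorVersion() {
        return 1;
    }

    @Override
    public int getMinorVersion() {
        return 0;
    }

    @Override
    public boolean jdbcCompliant() {
        return false;
    }

    @Override
    public Logger getParentLogger() {
        return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
    }

}
```

Because each driver answers only for its own prefix, drivers for several configuration centers can be registered side by side without stepping on each other.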
NacosDriverDataSourceCache
/**
 * Nacos Driver data source cache.
 *
 * @author laokou
 */
@Getter
public final class NacosDriverDataSourceCache extends AbstractDriverDataSourceCache {

    @Override
    protected AbstractShardingSphereURLLoadEngine getShardingSphereURLLoadEngine(ShardingSphereURL url) {
        return new NacosShardingSphereURLLoadEngine(url);
    }

}
NacosShardingSphereURLLoadEngine
/**
 * Nacos ShardingSphere URL load engine.
 *
 * @author laokou
 */
@Slf4j
public final class NacosShardingSphereURLLoadEngine extends AbstractShardingSphereURLLoadEngine {

    private static final String BIND_YAML_NAME = "bootstrap.yml";

    private static final String YAML_FORMAT = "yaml";

    private static final String NACOS_CONFIG_PREFIX = "spring.cloud.nacos.config";

    public NacosShardingSphereURLLoadEngine(ShardingSphereURL url) {
        super(url);
    }

    @Override
    protected String getContent() throws IOException, NacosException {
        // Pull the configuration from Nacos, using the settings bound from bootstrap.yml.
        NacosConfigProperties properties = PropertyUtil.bindOrCreate(NACOS_CONFIG_PREFIX, NacosConfigProperties.class,
                BIND_YAML_NAME, YAML_FORMAT);
        String group = properties.getGroup();
        NacosConfigManager nacosConfigManager = new NacosConfigManager(properties);
        ConfigService configService = nacosConfigManager.getConfigService();
        // The data ID is the part of the JDBC URL after the driver prefix.
        String dataId = url.getConfigurationSubject();
        Preconditions.checkArgument(StringUtil.isNotEmpty(dataId),
                "Nacos dataId is required in ShardingSphere driver URL.");
        // Wait at most 5000 ms for the configuration.
        return configService.getConfig(dataId, group, 5000);
    }

}
Configure SPI discovery
Note: under the resources directory, create a META-INF/services folder and, inside it, a file named java.sql.Driver with the following content:
org.laokou.common.shardingsphere.config.nacos.NacosShardingSphereDriver
org.laokou.common.shardingsphere.config.apollo.ApolloShardingSphereDriver
org.laokou.common.shardingsphere.config.consul.ConsulShardingSphereDriver

Summary: to integrate another configuration center, you only need to extend three abstract classes.
- AbstractShardingSphereDriver
- AbstractDriverDataSourceCache
- AbstractShardingSphereURLLoadEngine
Note: the Consul and Apollo implementations are left for you to complete.
ApolloShardingSphereURLLoadEngine
/**
 * Apollo ShardingSphere URL load engine.
 *
 * @author laokou
 */
@Slf4j
public final class ApolloShardingSphereURLLoadEngine extends AbstractShardingSphereURLLoadEngine {

    public ApolloShardingSphereURLLoadEngine(ShardingSphereURL url) {
        super(url);
    }

    @Override
    protected String getContent() {
        // TODO: add the code that pulls the configuration from Apollo
        return null;
    }

}
ConsulShardingSphereURLLoadEngine
/**
 * Consul ShardingSphere URL load engine.
 *
 * @author laokou
 */
@Slf4j
public final class ConsulShardingSphereURLLoadEngine extends AbstractShardingSphereURLLoadEngine {

    public ConsulShardingSphereURLLoadEngine(ShardingSphereURL url) {
        super(url);
    }

    @Override
    protected String getContent() {
        // TODO: add the code that pulls the configuration from Consul
        return null;
    }

}
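As a starting point for filling in the Consul stub, here is one possible way to pull a raw value from Consul's KV store over HTTP with the JDK's `HttpClient`. Treat it as a sketch under assumptions rather than the project's implementation: the class `ConsulKvFetchSketch`, the agent address, the key layout, and the 5-second timeouts are all hypothetical, and it assumes the YAML is stored as a single KV entry that Consul returns as-is when `raw=true` is passed.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

/** Hypothetical helper for reading one configuration entry from Consul's KV store. */
final class ConsulKvFetchSketch {

    /** Builds the KV read URL; raw=true asks Consul for the plain value instead of JSON metadata. */
    static URI kvUri(String baseUrl, String key) {
        return URI.create(baseUrl + "/v1/kv/" + key + "?raw=true");
    }

    /** Fetches the value stored under the key, failing on any non-200 response. */
    static String fetch(String baseUrl, String key) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
        HttpRequest request = HttpRequest.newBuilder(kvUri(baseUrl, key))
            .timeout(Duration.ofSeconds(5))
            .GET()
            .build();
        HttpResponse<String> response = client.send(request,
                HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
        if (response.statusCode() != 200) {
            throw new IOException("Consul returned HTTP " + response.statusCode() + " for key " + key);
        }
        return response.body();
    }

}
```

Inside `getContent()`, the key would typically come from `url.getConfigurationSubject()`, and the agent address from your own application settings.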
I'm Laokou. See you next time!