Seata在Spring boot中的读写分离实现

读写分离的实现方式

一般的读写分离的3三种方式:

  1. 作用范围是针对整个应用,所有的读都是默认走读库,一旦遇到写操作则接下来的读都走写库。
  2. 作用范围为按照一定的命名规则写的方法或者类,比如由readqueryfind开头的方法名都走读库,其他的方法走写库
  3. 使用具体地注解来标注相应的方法,显式的告知该方法走的是读库还是读库。一般来说,会将默认的库设置为写库,使用注解的方法则走读库。

这3种方法都是有优缺点的,具体使用哪种策略,需要看实际情况来确定。

  • 如果你对主从延迟导致的数据不一致非常敏感,那么不适用第一种方法。但是好处就是无入侵代码,整合起来非常快。
  • 如果你现在改造的是一个老项目,之前的方法命名并没有统一的规范,那么则不适用于第二种方法。对新项目来说,这个方法可以较大地节省人力。
  • 第三种方法对代码有较大的入侵,但是总体来说对读写分离的情况可控程度较高。排查问题的时候也比较简单。

本文使用的策略是第三种实现方式,能够较好的兼容绝大部分情况。

如何集成读写分离

特点:同时支持seata框架和不带seata框架两种情况。
使用方式:

  1. 将下面的源码复制到项目中可以被springboot扫码的地方。
  2. 在想要只读的方法上加入@SlaveDb注解。
  3. 更改数据库的配置,如下所示。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    spring:
    datasource:
    master:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://192.168.10.203:3306/jte253?useSSL=false&serverTimezone=UTC
    username: root
    password: xyz11111111
    max-active: 80
    min-idle: 3
    max-wait: 5000
    slave:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://192.168.10.203:3306/jte253?useSSL=false&serverTimezone=UTC
    username: jte_readonly
    password: 123456
    max-active: 80
    min-idle: 3
    max-wait: 5000

注意事项:

  • 1、@SlaveDb方法镶嵌在增删改的事务中被调用时,失效。原因是事务开始之前会获取一个connection(默认的那个),之后在整个事务中,将会复用这个connection,所以在切面中再怎么改ThreadLocal中的标识,也没有用。会一直使用写库。
  • 2、在同一个service中,如果使用this调用同一个类的方法,则根本不会触发aop。详细原因可以看这篇文章

源码解析

代码结构

实现读写分离一共需要5个类,他们分别的源码和作用是:

  • SlaveDb: 注解类,用于在方法上标识走读库。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * @author https://www.jdkdownload.com
    * @since 2020-11-17 15:36
    */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface SlaveDb {
    String value() default "";
    }
  • DataSourceAopInService: Aop类,作用是帮助扫描到SlaveDb注解,并且在方法真正执行前做一些切面操作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56

    /**
    * 在service层决定数据源
    * <p>
    * 必须在事务AOP之前执行,所以实现Ordered,order的值越小,越先执行
    * 如果一旦开始切换到写库,则之后的读都会走写库
    * @author https://www.jdkdownload.com
    */
    @Aspect
    @Component
    @Slf4j
    public class DataSourceAopInService implements PriorityOrdered {

    @Pointcut("@annotation(com.jte.cloud.platform.readwrite.SlaveDb)")
    public void pointcut() { }

    /**
    * 之前的实现方式在Service调用其它service的方法时会再次触发切面,
    * 注意可能存在写方法里面调用读库的问题
    * @param joinPoint
    * @return
    */
    @Around("pointcut()")
    public Object setDataSourceType(ProceedingJoinPoint joinPoint) throws Throwable {

    boolean isRoute = false;

    if (DynamicDataSourceHolder.get() == null) {
    isRoute = true;
    log.debug("---动态切换数据库===切换到读数据库===");
    DynamicDataSourceHolder.routeSlave();
    }
    Object proceed = null;
    try {
    Object[] args = joinPoint.getArgs();
    proceed = joinPoint.proceed(args);
    } finally {
    if (isRoute) {
    DynamicDataSourceHolder.clear();
    log.debug("清除已经设置的数据库类型,方便下次使用");
    }
    }
    return proceed;
    }

    @Override
    public int getOrder() {
    /**
    * 值越小,越优先执行
    * 要优于事务的执行
    * 在启动类中加上了@EnableTransactionManagement(order = 10)
    */
    return 1;
    }

    }
  • DataSourceConfiguration: 用于初始化数据源,兼容seata的代码就在这个类里面。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74

    /**
    * 多数据库源配置
    */
    @Configuration
    @AutoConfigureBefore(value = {DruidDataSourceAutoConfigure.class, DataSourceAutoConfiguration.class})
    @EnableTransactionManagement(order = 10)
    @EnableConfigurationProperties
    @ComponentScan("com.jte.cloud.platform.readwrite")
    @Order(1)
    @Slf4j
    public class DataSourceConfiguration {
    private static final String MASTER_DATA_SOURCE_PREFIX = "spring.datasource.master";
    private static final String SLAVE_DATA_SOURCE_PREFIX = "spring.datasource.slave";
    @Value("${spring.datasource.master.username:}")
    private String masterDbUsername;

    public DataSourceConfiguration() {
    System.out.println("------ inti DataSourceConfiguration ------");
    }

    @Bean("masterDataSource")
    @ConfigurationProperties(MASTER_DATA_SOURCE_PREFIX)
    public DataSource masterDataSource() {
    if(Objects.isNull(masterDbUsername)||masterDbUsername.equals("")){
    throw new IllegalArgumentException("请正确配置数据主从信息!");
    }
    log.info("------ 初始化 Druid 主数据源 ------");
    DruidDataSource ds= DruidDataSourceBuilder.create().build();
    return ds;
    }

    @Bean("seataDataSourceProxy")
    @ConditionalOnClass(name = "io.seata.rm.datasource.DataSourceProxy")
    public DataSource seataDataSourceProxy(@Qualifier(value = "masterDataSource")DataSource masterDataSource) {
    log.info("------ 初始化 seata proxy数据源 ------");
    return new DataSourceProxy(masterDataSource);
    }

    @Bean("slaveDataSource")
    @ConfigurationProperties(SLAVE_DATA_SOURCE_PREFIX)
    public DataSource slaveDataSource() {
    log.info("------ 初始化 Druid 从数据源 ------");
    DruidDataSource ds= DruidDataSourceBuilder.create().build();
    return ds;
    }

    @Bean
    @Primary
    @ConditionalOnClass(name = "io.seata.rm.datasource.DataSourceProxy")
    public DynamicDataSource dataSourceWithSeata(DataSource seataDataSourceProxy
    , DataSource slaveDataSource
    ) {
    log.info("------ 初始化 Dynamic(Seata) 数据源 ------");
    val targetDataSources = new HashMap<String, DataSource>();
    targetDataSources.put(DynamicDataSourceHolder.MASTER_DATA_SOURCE, seataDataSourceProxy);
    targetDataSources.put(DynamicDataSourceHolder.SLAVE_DATA_SOURCE, slaveDataSource);
    return new DynamicDataSource(seataDataSourceProxy, targetDataSources);
    }

    @Bean
    @Primary
    @ConditionalOnMissingClass({"io.seata.rm.datasource.DataSourceProxy"})
    public DynamicDataSource dataSourceWithoutSeata(DataSource masterDataSource
    , DataSource slaveDataSource
    ) {
    log.info("------ 初始化 Dynamic 数据源 ------");
    val targetDataSources = new HashMap<String, DataSource>();
    targetDataSources.put(DynamicDataSourceHolder.MASTER_DATA_SOURCE, masterDataSource);
    targetDataSources.put(DynamicDataSourceHolder.SLAVE_DATA_SOURCE, slaveDataSource);
    return new DynamicDataSource(masterDataSource, targetDataSources);
    }

    }
  • DynamicDataSource: 该类用于确定到底使用哪个数据源。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37

    /**
    * 动态数据源
    */
    @Slf4j
    public class DynamicDataSource extends AbstractRoutingDataSource {

    private DataSource defaultTargetDataSource;
    private Map<String, DataSource> targetDataSources;

    public DynamicDataSource(DataSource defaultTargetDataSource, Map<String, DataSource> targetDataSources) {
    this.defaultTargetDataSource = defaultTargetDataSource;
    this.targetDataSources = targetDataSources;
    }


    /**
    * 超过两个读库要实现一个负载均衡算法
    * @return
    */

    @Override
    protected Object determineCurrentLookupKey() {
    // 使用 DynamicDataSourceHolder 保证线程安全
    String dataSource = DynamicDataSourceHolder.get();
    log.debug("当前数据库: {}", dataSource);
    return dataSource;
    }

    @Override
    public void afterPropertiesSet() {
    super.setDefaultTargetDataSource(defaultTargetDataSource);
    super.setTargetDataSources(new HashMap<>(targetDataSources));
    super.afterPropertiesSet();
    }

    }
  • DynamicDataSourceHolder: 用于保存当前线程到底是走写库还是读库这个状态。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    /**
    * 动态数据源容器
    */
    public final class DynamicDataSourceHolder {
    public static final String MASTER_DATA_SOURCE = "Master";
    public static final String SLAVE_DATA_SOURCE = "Slave";

    private static final ThreadLocal<String> CONTAINER = new ThreadLocal<>();

    private static void set(String dataSource) {
    CONTAINER.set(dataSource);
    }

    public static void routeMaster() {
    set(MASTER_DATA_SOURCE);
    }

    public static void routeSlave() {
    set(SLAVE_DATA_SOURCE);
    }

    public static String get() {
    return CONTAINER.get();
    }

    public static void clear() {
    CONTAINER.remove();
    }
    }

代码实现原理

  1. 项目启动之后,会自动通过DataSourceConfiguration类初始化数据源。该类通过@ConditionalOnClass注解和@ConditionalOnMissingClass注解来判断是否当前存在seata环境。
  2. 如果有seata环境,则会包装一层seataDataSourceProxy,没有的话直接返回普通的Datasource即可。最终注入给spring boot使用的是DynamicDataSource。因为在生成bean的时候加了@Primary注解,这个类的优先级最高!
  3. 扫描所有加了@SlaveDb注解的方法,如果调用了该方法,将会进入DataSourceAopInService的切面。
  4. 进入切面后,会通过DynamicDataSourceHolder.routeSlaveDynamicDataSourceHolder中的标识位改成读库。(到目前为止还没有真正执行service的方法)。
  5. 调用service方法,准备执行select语句前,spring框架会使用DynamicDataSourcedetermineCurrentLookupKey方法来确定获取写库的connection还是读库的。
  6. service方法执行完毕之后,清除DynamicDataSourceHolder中的标识位。

原文地址:https://www.jdkdownload.com/seata_read_write.html