MyBatis缓存机制

MyBatis一级二级缓存学习

Posted by yyconstantine on October 31, 2019

MyBatis缓存机制


源码的简单解读

  • SqlSession:对外提供了用户和数据库之间交互需要的所有方法,隐藏了底层的细节。默认实现类是DefaultSqlSession
  • Executor:SqlSession向用户提供操作数据库的方法,但和数据库操作有关的职责都会委托给Executor
    • BaseExecutor:BaseExecutor是一个实现了Executor接口的抽象类,定义若干抽象方法,在执行的时候,把具体的操作委托给子类执行。
      • LocalCache:BaseExecutor的一个成员变量

一级缓存

  • 简介
    • 在应用运行过程中,我们有可能在一次数据库会话中,执行多次查询条件完全相同的SQL,MyBatis提供了一级缓存的方案优化这部分场景,如果是相同的SQL语句,会优先命中一级缓存,避免直接对数据库进行查询,提高性能。
    • 每个SqlSession中持有了Executor,每个Executor中有一个LocalCache。当用户发起查询时,MyBatis根据当前执行的语句生成MappedStatement,在Local Cache进行查询,如果缓存命中的话,直接返回结果给用户,如果缓存没有命中的话,查询数据库,结果写入Local Cache,最后返回结果给用户。
  • 配置

    • 一级缓存有两个选项,SESSIONSTATEMENT,默认是SESSION级别,即在一个MyBatis会话中执行的所有语句,都会共享这一个缓存;一种是STATEMENT级别,可以理解为缓存只对当前执行的这一个Statement有效。

    • <setting name="localCacheScope" value="SESSION">
      
  • 实验

    • 在同一个SqlSession内只进行查询,发现只有第一次真正查询了数据库,后续的查询都命中了缓存。
    • 在同一个SqlSession内的查询中增加数据库修改操作,发现一次缓存失效,插入前后都执行了查询操作。
    • 在SqlSession1中查询数据,在SqlSession2中更新数据,但SqlSession1中的查询结果仍为之前的结果,产生了脏数据。
  • 实现

    • 为执行和数据库的交互,首先需要初始化SqlSession,通过DefaultSqlSessionFactory开启SqlSession

    • private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
          Transaction tx = null;
          try {
              final Environment environment = configuration.getEnvironment();
              final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
              tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
              // 重点在这里
              final Executor executor = configuration.newExecutor(tx, execType);
              return new DefaultSqlSession(configuration, executor, autoCommit);
          } catch (Exception e) {
              closeTransaction(tx); // may have fetched a connection so lets call close()
              throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
          } finally {
              ErrorContext.instance().reset();
          }
      }
      
    • 初始化SqlSession时,会用Configuration类创建 一个全新的Executor,作为DefaultSqlSession构造函数的参数,创建Executor的代码如下:

    • public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
          executorType = executorType == null ? defaultExecutorType : executorType;
          executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
          Executor executor;
          if (ExecutorType.BATCH == executorType) {
              executor = new BatchExecutor(this, transaction);
          } else if (ExecutorType.REUSE == executorType) {
              executor = new ReuseExecutor(this, transaction);
          } else {
              executor = new SimpleExecutor(this, transaction);
          }
          if (cacheEnabled) {
              executor = new CachingExecutor(executor);
          }
          executor = (Executor) interceptorChain.pluginAll(executor);
          return executor;
      }
      
    • SqlSession创建完毕后,根据Statement的不同类型,会进入SqlSession的不同方法中,如果是Select语句,最后会执行到SqlSessionselectList,代码如下:

    • @Override
      public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
          try {
              MappedStatement ms = configuration.getMappedStatement(statement);
              return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
          } catch (Exception e) {
              throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
          } finally {
              ErrorContext.instance().reset();
          }
      }
      
    • SqlSession把具体的查询职责委托给了Executor,如果只开启了一级缓存,首先会进入BaseExecutorquery方法,代码如下:

    • @Override
      public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
          BoundSql boundSql = ms.getBoundSql(parameter);
          CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
          return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
      
    • createCacheKey中,根据传入的参数生成CacheKey并返回:

    • CacheKey cacheKey = new CacheKey();
      cacheKey.update(ms.getId());
      cacheKey.update(rowBounds.getOffset());
      cacheKey.update(rowBounds.getLimit());
      cacheKey.update(boundSql.getSql());
      //后面是update了sql中带的参数
      cacheKey.update(value);
      
    • 在上述代码中,将MappedStatement的id、SQL的limit、SQL本身以及SQL中的参数传入了CacheKey这个类,最终构成CacheKeyCacheKey这个类的构成为:

    • private static final int DEFAULT_MULTIPLYER = 37;
      private static final int DEFAULT_HASHCODE = 17;
          
      private int multiplier;
      private int hashcode;
      private long checksum;
      private int count;
      private List<Object> updateList;
          
      public CacheKey() {
          this.hashcode = DEFAULT_HASHCODE;
          this.multiplier = DEFAULT_MULTIPLYER;
          this.count = 0;
          this.updateList = new ArrayList<Object>();
      }
      
    • 首先是成员变量和构造函数,有一个初始的hashcode和乘数,同时维护了一个内部的updateList。在CacheKeyupdate方法中,会进行一个hashcodechecksum的计算,同时把传入的参数添加进updateList中:

    • public void update(Object object) {
          int baseHashCode = object == null ? 1 : ArrayUtil.hashCode(object);
          
          count++;
          checksum += baseHashCode;
          baseHashCode *= count;
          
          hashcode = multiplier * hashcode + baseHashCode;
          
          updateList.add(object);
      }
      
    • 同时CacheKey重写了equals方法,只比较updateList中的元素:

    • @Override
        public boolean equals(Object object) {
          if (this == object) {
              return true;
          }
          if (!(object instanceof CacheKey)) {
              return false;
          }
          
          final CacheKey cacheKey = (CacheKey) object;
          
          if (hashcode != cacheKey.hashcode) {
              return false;
          }
          if (checksum != cacheKey.checksum) {
              return false;
          }
          if (count != cacheKey.count) {
              return false;
          }
          
          for (int i = 0; i < updateList.size(); i++) {
              Object thisObject = updateList.get(i);
              Object thatObject = cacheKey.updateList.get(i);
              if (!ArrayUtil.equals(thisObject, thatObject)) {
                return false;
              }
          }
          return true;
      }
      
    • 即只要两条SQL中的Statement Id/Offset/Limit/Sql/Params相同,我们即认为它们是相同的SQL,即命中一级缓存

    • 然后BaseExecutorquery方法继续往下走:

    • list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
      if (list != null) {
          // 这里主要用于处理存储过程,不做细究
          handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      } else {
          list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
      
    • 我们可以看出,如果resultHandler为空,则从缓存中获取数据,获取为空则从数据库查询,并将数据写入localCache。在query方法执行的最后,会判断一级缓存级别是否是STATEMENT,如果是则清空缓存,所以STATEMENT级别的一级缓存无法共享localCache

    • if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
          clearLocalCache();
      }
      
    • 如果是insert/delete/update方法:

      • 首先,insertdelete都会统一走update的流程,update方法则把真正的执行操作委托给了BaseExecutor

      • @Override
        public int update(String statement, Object parameter) {
            try {
                dirty = true;
                MappedStatement ms = configuration.getMappedStatement(statement);
                return executor.update(ms, wrapCollection(parameter));
            } catch (Exception e) {
                throw ExceptionFactory.wrapException("Error updating database.  Cause: " + e, e);
            } finally {
                ErrorContext.instance().reset();
            }
        }
        
      • 可以看到,在BaseExecutor中真正执行update操作前,会先清空localCache,所以在执行insert/update/delete操作后,一级缓存失效

      • @Override
        public int update(MappedStatement ms, Object parameter) throws SQLException {
            ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
            if (closed) {
                throw new ExecutorException("Executor was closed.");
            }
            clearLocalCache();
            return doUpdate(ms, parameter);
        }
        
  • 总结:

    • MyBatis一级缓存的生命周期和SqlSession一致。
    • MyBatis一级缓存内部设计简单,只是一个没有容量限定的HashMap,在缓存的功能性上有所欠缺。
    • MyBatis的一级缓存最大范围是SqlSession内部,有多个SqlSession或者分布式的环境下,数据库写操作会引起脏数据,建议设定缓存级别为Statement。

二级缓存

  • 简介

    • 在上文中提到的一级缓存中,其最大的共享范围就是一个SqlSession内部,如果多个SqlSession之间需要共享缓存,则需要使用到二级缓存。开启二级缓存后,会使用CachingExecutor装饰Executor,进入一级缓存的查询流程前,先在CachingExecutor进行二级缓存的查询 。
    • 二级缓存开启后,同一个namespace下的所有操作语句,都影响着同一个Cache,即二级缓存被多个SqlSession共享,是一个全局的变量。
    • 当开启缓存后,数据的查询执行的流程就是 二级缓存 -> 一级缓存 -> 数据库。
  • 配置

    • 在MyBatis的配置文件中开启二级缓存:

    • <setting name="cacheEnabled" value="true"/>
      
    • 在MyBatis的映射XML中配置cache或者cache-ref

      • <cache/>标签用于声明这个namespace使用二级缓存,并且可以自定义配置

      • <cache/>
        
        • type:cache使用的类型,默认是PerpetualCache,这在一级缓存中提到过
        • eviction:定义回收的策略,常见的有FIFO,LRU
        • flushInterval:配置一定时间自动刷新缓存,单位是毫秒
        • size:最多缓存对象的个数
        • readOnly:是否只读,若配置可读写,则需要对应的实体类能够序列化
        • blocking:若缓存中找不到对应的key,是否会一直blocking,直到有对应的数据进入缓存
      • <cache-ref/>代表引用别的命名空间的Cache配置,两个命名空间的操作使用的是同一个Cache

      • <cache-ref namespace="mapper.StudentMapper"/>
        
  • 实验

    • SqlSession没有提交事务(即调用commit()方法,下同),二级缓存并没有起到作用
    • SqlSession提交了事务,二级缓存被使用
    • SqlSession更新数据库并提交事务后,同namespace下的另一个SqlSession的查询走了数据库,没有走Cache
    • SqlSession1查询数据后,二级缓存生效,保存在对应namespace下。当SqlSession2修改了SqlSession1关联查询的一张表并提交事务后,SqlSession3(与SqlSession1同namespace)再次发起查询,从缓存中读到了修改前的数据,即产生了脏数据
    • 使用<cache ref/>标签,让关联查询的命名空间发生引用关系,这样两个映射文件对应的sql操作都使用的是同一块缓存,就不会产生脏数据了;但这样会将缓存的粒度变粗,多个namespace的操作都会对这块缓存产生影响
  • 实现

    • MyBatis二级缓存的工作流程和一级缓存类似,只是在一级缓存处理前,用CachingExecutor装饰了BaseExecutor的子类,在委托具体职责给delegate之前,实现了二级缓存的查询和写入功能

    • CachingExecutorquery方法,首先会从MappedStatement中获得在配置初始化时赋予的Cache

    • Cache cache = ms.getCache();
      
    • 本质上是装饰器模式的使用,具体的装饰链是:SynchronizedCache -> LoggingCache -> SerializedCache -> LruCache -> PerpetualCache

      • SynchronizedCache:同步Cache,实现比较简单,直接使用synchronized修饰方法
      • LoggingCache:日志功能,装饰类,用于记录缓存的命中率,如果开启了DEBUG模式,则会输出命中率日志
      • SerializedCache:序列化功能,将值序列化后存到缓存中。该功能用于缓存返回一份实例的Copy,用于保存线程安全
      • LruCache:采用了Lru算法的Cache实现,移除最近最少使用的Key/Value
      • PerpetualCache: 作为为最基础的缓存类,底层实现比较简单,直接使用了HashMap
    • 然后判断是否需要刷新缓存

    • flushCacheIfRequired(ms);
      
    • 在默认的设置中SELECT不会刷新缓存,INSERT/UPDATE/DELETE会刷新缓存:

    • private void flushCacheIfRequired(MappedStatement ms) {
          Cache cache = ms.getCache();
          if (cache != null && ms.isFlushCacheRequired()) {
              tcm.clear(cache);
          }
      }
      
    • 其中,tcm.clear(cache)真正执行了清空缓存的操作,tcm是CachingExecutor中的TransactionalCacheManager,其持有了一个Map集合,保存了CacheTransactionalCache的映射关系

    • private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();
      
    • TransactionalCache实现了Cache接口,CachingExecutor会默认使用他包装初始生成的Cache,作用是如果事务提交,对缓存的操作才会生效,如果事务回滚或不提交事务,则不对缓存产生影响

    • TransactionalCacheclear(),有两步操作。清空了需要在提交时加入缓存的列表,同时设定提交时清空缓存:

    • public void clear() {
          clearOnCommit = true;
          entriesToAddOnCommit.clear();
      }
      
    • CachingExecutor继续向下,ensureNoOutParams主要用于处理存储过程

    • if (ms.isUseCache() && resultHandler == null)
          ensureNoOutParams(ms, boundSql);
      
    • 之后会尝试从TransactionalCacheManager中获取缓存的列表:

    • List<E> list = (List<E>) tcm.getObject(cache, key);
      
    • getObject方法中,会把获取值的职责一路传递,最终到PerpetualCache。如果没有查到,会将key加入MISS集合,用于统计缓存命中率

    • Object object = delegate.getObject(key);
      if (object == null) {
          entriesMissedInCache.add(key);
      }
      
    • CachingExecutor继续向下,将查询到的结果put进缓存中:

    • list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
      tcm.putObject(cache, key, list);
      
    • 查看putObject方法,我们可知,这次put只是将查询得到的结果集放在提交时加入缓存的列表,没有进行真正的提交:

    • public void putObject(Object key, Object object) {
          entriesToAddOnCommit.put(key, object);
      }
      
    • 我们返回看commit方法的提交:

    • public void commit(boolean force) {
          try {
              executor.commit(isCommitOrRollbackRequired(force));
              dirty = false;
          } catch (Exception e) {
              throw ExceptionFactory.wrapException("Error committing transaction.  Cause: " + e, e);
          } finally {
              ErrorContext.instance().reset();
          }
      }
      
    • 因为二级缓存走的是CachingExecutor,代码如下:

    • public void commit(boolean required) throws SQLException {
          delegate.commit(required);
          tcm.commit();
      }
      
    • 可以看到,最终还是通过TransactionalCacheManager找到了TransactionalCachecommit方法:

    • public void commit() {
          if (clearOnCommit) {
              delegate.clear();
          }
          flushPendingEntries();
          reset();
      }
      
    • 这也跟之前提到的clear()方法类似,如果提交时清空缓存标志位clearOnCommit为true,则清空要提交的缓存,然后在flushPendingEntries()中进行结果集的包装:

    • private void flushPendingEntries() {
          for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
              delegate.putObject(entry.getKey(), entry.getValue());
          }
          for (Object entry : entriesMissedInCache) {
              if (!entriesToAddOnCommit.containsKey(entry)) {
                  delegate.putObject(entry, null);
              }
          }
      }
      
    • 如果是INSERT/UPDATE/DELETE操作,会统一进入CachingExecutorupdate方法,会调用如下方法清空缓存,随后执行一级缓存的流程:

    • =private void flushCacheIfRequired(MappedStatement ms) {
          Cache cache = ms.getCache();
          if (cache != null &amp;&amp; ms.isFlushCacheRequired()) {
              tcm.clear(cache);
          }
      }
      
  • 总结

    • MyBatis的二级缓存相对于一级缓存来说,实现了SqlSession之间缓存数据的共享,同时粒度更加的细,能够到namespace级别,通过Cache接口实现类不同的组合,对Cache的可控性也更强
    • MyBatis在多表查询时,极大可能会出现脏数据,有设计上的缺陷,安全使用二级缓存的条件比较苛刻
    • 在分布式环境下,由于默认的MyBatis Cache实现都是基于本地的,分布式环境下必然会出现读取到脏数据,需要使用集中式缓存将MyBatis的Cache接口实现,有一定的开发成本,直接使用Redis、Memcached等分布式缓存可能成本更低,安全性也更高

文章参考 https://tech.meituan.com/2018/01/19/mybatis-cache.html