Spring Boot(六)之基于Redis实现MyBatis查询缓存解决方案

1. 前言

MyBatis是Java中常用的数据层ORM框架,笔者目前在实际的开发中,也在使用MyBatis。本文主要介绍了MyBatis的缓存策略、以及基于SpringBoot和Redis实现MyBatis的二级缓存的过程。实现本文的demo,主要依赖以下软件版本信息,但是由于数据层面的实现,并不依赖具体的版本,你可以以自己主机当前的环境创建。

软件环境 版本
SpringBoot 1.5.18
Redis 通用
MyBatis 3.4.+

2. MyBatis缓存策略

2.1 一级缓存

MyBatis默认实现了一级缓存,实现过程可参考下图:
image

默认基础接口有两个:

  • org.apache.ibatis.session.SqlSession: 提供了用户和数据库交互需要的所有方法,默认实现类是DefaultSqlSession。
  • org.apache.ibatis.executor.Executor: 和数据库的实际操作接口,基础抽象类BaseExecutor。

我们从底层往上查看源代码,首先打开BaseExecutor的源代码,可以看到Executor实现一级缓存的成员变量是PerpetualCache对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author Clinton Begin
*/
public abstract class BaseExecutor implements Executor {
private static final Log log = LogFactory.getLog(BaseExecutor.class);
protected Transaction transaction;
protected Executor wrapper;
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
// 实现一级缓存的成员变量
protected PerpetualCache localCache;
protected PerpetualCache localOutputParameterCache;
protected Configuration configuration;
...
}

我们再打开PerpetualCache类的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @author Clinton Begin
*/
public class PerpetualCache implements Cache {
private final String id;
private Map<Object, Object> cache = new HashMap<Object, Object>();
public PerpetualCache(String id) {
this.id = id;
}
...
}

可以看到PerpetualCache是对Cache的基本实现,而且通过内部持有一个简单的HashMap实现缓存。

了解了一级缓存的实现后,我们再回到入口处,为了你的sql语句和数据库交互,MyBatis首先需要实现SqlSession,通过DefaultSqlSessionFactory实现SqlSession的初始化的过程可查看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
// Executor初始化
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}

从代码中可以看到,通过configuration创建一个Executor,实际创建Executor的过程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
executorType = executorType == null ? defaultExecutorType : executorType;
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
// 是否开启二级缓存
// 如果开启,使用CahingExecutor装饰BaseExecutor的子类
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}

注意,cacheEnabled字段是二级缓存是否开启的标志位,如果开启,会使用使用CahingExecutor装饰BaseExecutor的子类。

创建完SqlSession,根据Statment的不同,会使用不同的SqlSession查询方法:

1
2
3
4
5
6
7
8
9
10
11
@Override
public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
try {
MappedStatement ms = configuration.getMappedStatement(statement);
return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error querying database. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}

SqlSession把具体的查询职责委托给了Executor,如果只开启了一级缓存的话,首先会进入BaseExecutor的query方法。代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
// 使用缓存
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
// 清空缓存
clearLocalCache();
}
}
return list;
}

query方法实现了缓存的查询过程,在query方法执行的最后,会判断一级缓存级别是否是STATEMENT级别,如果是的话,就清空缓存,这也就是STATEMENT级别的一级缓存无法共享localCache的原因。

SqlSession的insert方法和delete方法,都会统一走update的流程,在BaseExecutor实现的update方法中:

1
2
3
4
5
6
7
8
9
10
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
// 清空缓存
clearLocalCache();
return doUpdate(ms, parameter);
}

可以看到,每次执行update方法都会执行clearLocalCache清空缓存。至此,我们分析完了MyBatis的一级缓存从入口到实现的过程。

关于MyBatis一级缓存的总结:

  • 一级缓存的生命周期和SqlSession保持一致;
  • 一级缓存的缓存通过HashMap实现;
  • 一级缓存的作用域是对应的SqlSession,假如存在多个SqlSession,写操作可能会引起脏数据。

2.2 二级缓存

在上一小节中,我们知道一级缓存的的作用域就是对应的SqlSession。若开启了二级缓存,会使用CachingExecutor装饰Executor,进入一级缓存的查询流程前,先在CachingExecutor进行二级缓存的查询,二级缓存的查询流程如图所示:
image
二级缓存开启后,同一个namespace下的所有数据库操作语句,都使用同一个Cache,即二级缓存结果会被被多个SqlSession共享,是一个全局的变量。当开启二级缓存后,数据查询的执行流程就是二级缓存 -> 一级缓存 -> 数据库。

二级缓的实现源码,可以查看CachingExecutor类的query方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
// 从MappedStatement中获得在配置初始化时赋予的Cache
Cache cache = ms.getCache();
if (cache != null) {
// 判断是否需要刷新缓存
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
// 主要是用来处理存储过程的
ensureNoOutParams(ms, boundSql);
@SuppressWarnings("unchecked")
// 尝试从tcm中获取缓存的列表,会把获取值的职责一路传递
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

在二级缓存查询结束后,就会进入一级缓存的执行流程,可参考上一小节内容。

关于二级缓存的总结:

  • 二级缓存是SqlSession之间共享,能够做到mapper级别,并通过Cache实现缓存。
  • 由于MyBatis的缓存都是内存级的,在分布式环境下,有可能会产生脏数据,因此可以考虑使用第三方存储组件,如Redis实现二级缓存的存储,这样的安全性和性能也会更高。

3. SpringBoot和Redis实现MyBatis二级缓存

MyBatis的默认实现一级缓存的,二级缓存也是默认保存在内存中,因此当分布式部署你的应用时,有可能会产生脏数据。通用的解决方案是找第三方存储缓存结果,比如Ehcache、Redis、Memcached等。接下来,我们介绍下,使用Redis作为缓存组件,实现MyBatis二级缓存。

在实现二级缓存之前,我们假设你已经实现了SpringBoot+MyBatis的构建过程,如果还没有,建议你先创建一个demo实现简单的CRUD过程,然后再查看本文解决二级缓存的问题。

3.1 增加Redis配置

首先在你的工程加入Redis依赖:

1
compile('org.springframework.boot:spring-boot-starter-data-redis')

我使用的gradle,使用maven的同学可对应查询即可!

其次在配置文件中加入Redis的链接配置:

1
spring.redis.cluster.nodes=XXX:port,YYY:port

这里我们使用的是Redis集群配置。

打开mybatis.xml配置文件,开启二级缓存:

1
<setting name="cacheEnabled" value="true"/>

增加Redis的配置类,开启json的序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* Created by zhaoyh on 2019-01-23
*
* @author zhaoyh
*/
@Configuration
public class RedisConfig {
/**
* 重写Redis序列化方式,使用Json方式:
* 当我们的数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的。RedisTemplate默认使用的是JdkSerializationRedisSerializer,StringRedisTemplate默认使用的是StringRedisSerializer。
* Spring Data JPA为我们提供了下面的Serializer:
* GenericToStringSerializer、Jackson2JsonRedisSerializer、JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。
* 在此我们将自己配置RedisTemplate并定义Serializer。
* @param redisConnectionFactory
* @return
*/
@Bean(name = "redisTemplate")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 设置值(value)的序列化采用Jackson2JsonRedisSerializer。
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
// 设置键(key)的序列化采用StringRedisSerializer。
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}

3.2 实现MyBatis的Cache接口

org.apache.ibatis.cache.Cache接口是MyBatis通用的缓存实现接口,包括一级缓存和二级缓存都是基于Cache接口实现缓存机制。

创建MybatisRedisCache类,实现Cache接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import org.apache.ibatis.cache.Cache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.CollectionUtils;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Created by zhaoyh on 2019-01-22
* MyBatis二级缓存配置
* @author zhaoyh
*/
public class MybatisRedisCache implements Cache {
private static final Logger LOG = LoggerFactory.getLogger(MybatisRedisCache.class);
/**
* 默认redis有效期
* 单位分钟
*/
private static final int DEFAULT_REDIS_EXPIRE = 10;
/**
* 注入redis
*/
private static RedisTemplate<String, Object> redisTemplate = null;
/**
* 读写锁
*/
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
/**
* cache id
*/
private String id = null;
/**
* 构造函数
* @param id
*/
public MybatisRedisCache(final String id) {
if (null == id) {
throw new IllegalArgumentException("MybatisRedisCache Instance Require An Id...");
}
LOG.info("MybatisRedisCache: " + id);
this.id = id;
}
/**
* @return The identifier of this cache
*/
@Override
public String getId() {
return this.id;
}
/**
* @param key Can be any object but usually it is a {@link}
* @param value The result of a select.
*/
@Override
public void putObject(Object key, Object value) {
if (null != value) {
LOG.info("putObject key: " + key.toString());
// 向Redis中添加数据,默认有效时间是2小时
redisTemplate.opsForValue().set(key.toString(), value, DEFAULT_REDIS_EXPIRE, TimeUnit.MINUTES);
}
}
/**
* @param key The key
* @return The object stored in the cache.
*/
@Override
public Object getObject(Object key) {
try {
if (null != key) {
LOG.info("getObject key: " + key.toString());
return redisTemplate.opsForValue().get(key.toString());
}
} catch (Exception e) {
LOG.error("getFromRedis: " + key.toString() + " failed!");
}
LOG.info("getObject null...");
return null;
}
/**
* As of 3.3.0 this method is only called during a rollback
* for any previous value that was missing in the cache.
* This lets any blocking cache to release the lock that
* may have previously put on the key.
* A blocking cache puts a lock when a value is null
* and releases it when the value is back again.
* This way other threads will wait for the value to be
* available instead of hitting the database.
*
* 删除缓存中的对象
*
* @param keyObject The key
* @return Not used
*/
@Override
public Object removeObject(Object keyObject) {
if (null != keyObject) {
redisTemplate.delete(keyObject.toString());
}
return null;
}
/**
* Clears this cache instance
* 有delete、update、insert操作时执行此函数
*/
@Override
public void clear() {
LOG.info("clear...");
try {
Set<String> keys = redisTemplate.keys("*:" + this.id + "*");
LOG.info("keys size: " + keys.size());
for (String key : keys) {
LOG.info("key : " + key);
}
if (!CollectionUtils.isEmpty(keys)) {
redisTemplate.delete(keys);
}
} catch (Exception e) {
LOG.error("clear failed!", e);
}
}
/**
* Optional. This method is not called by the core.
*
* @return The number of elements stored in the cache (not its capacity).
*/
@Override
public int getSize() {
Long size = (Long) redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection connection) throws DataAccessException {
return connection.dbSize();
}
});
LOG.info("getSize: " + size.intValue());
return size.intValue();
}
/**
* Optional. As of 3.2.6 this method is no longer called by the core.
* <p>
* Any locking needed by the cache must be provided internally by the cache provider.
*
* @return A ReadWriteLock
*/
@Override
public ReadWriteLock getReadWriteLock() {
return this.readWriteLock;
}
public static void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
MybatisRedisCache.redisTemplate = redisTemplate;
}

由于redisTemplate是类变量,需要手动注入,再创建一个配置类注入redisTemplate即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Created by zhaoyh on 2019-01-22
* @author zhaoyh
*/
@Component
public class MyBatisHelper {
/**
* 注入redis
* @param redisTemplate
*/
@Autowired
@Qualifier("redisTemplate")
public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
MybatisRedisCache.setRedisTemplate(redisTemplate);
}
}

3.3 mapper文件中加入二级缓存的声明

在任意需要开启二级缓存的mapper配置文件中,加入:

1
2
3
4
5
6
7
8
9
10
11
<!-- mapper开启二级缓存 -->
<cache type="XX.XX.MybatisRedisCache">
<!-- 定义回收的策略 -->
<property name="eviction" value="LRU"/>
<!-- 配置一定时间自动刷新缓存,单位是毫秒 -->
<property name="flushInterval" value="600000"/>
<!-- 最多缓存对象的个数 -->
<property name="size" value="1024"/>
<!-- 是否只读,若配置可读写,则需要对应的实体类能够序列化 -->
<property name="readOnly" value="false"/>
</cache>

至此,就完成了基于Redis的MyBatis二级缓存的配置。

4. FAQ

  • 二级缓存相比较于一级缓存来说,粒度更细,但是也会更不可控,安全使用二级缓存的条件很难。
  • 二级缓存非常适合查询热度高且更新频率低的数据,请谨慎使用。
  • 建议在生产环境下关闭二级缓存,使得MyBatis单纯作为ORM框架即可,缓存使用其他更安全的策略。

以上内容就是关于SpringBoot基于Redis实现MyBatis查询缓存解决方案的全部内容了,谢谢你阅读到了这里!

Author:zhaoyh