14、基于Redis实现缓存与消息队列
Redis是基于内存运行并支持持久化、高性能的NoSQL(非关系型)数据库,适用于存储频繁访问,数据量较小的数据,应用在配合关系型数据库做高速缓存与多样的数据结构存储持久化数据;
一、搭建基于spring boot的Redis工程:
1、POM:
org.springframework.boot spring-boot-starter-data-redis
2、YML:
spring: datasource: type: com.alibaba.druid.pool.DruidDataSource #当前数据源操作类型 driver-class-name: org.gjt.mm.mysql.Driver #mysql驱动包 url: jdbc:mysql://localhost:3306/demo?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: 123456 druid: test-while-idle: false #关闭空闲检测 redis: host: 127.0.0.1 # Redis服务器地址 port: 6379 # Redis服务器连接端口 password: 123456 # Redis服务器连接密码 jedis: pool: max-active: 8 # 连接池最大连接数(使用负值表示没有限制) max-wait: -1ms # 连接池最大阻塞等待时间(使用负值表示没有限制) max-idle: 500 # 连接池中的最大空闲连接 min-idle: 0 # 连接池中的最小空闲连接 lettuce: shutdown-timeout: 0msmybatis: mapperLocations: classpath:mapper/*.xml #resources下建mapper包
3、配置类:
import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.ObjectMapper;import org.springframework.cache.CacheManager;import org.springframework.cache.annotation.CachingConfigurerSupport;import org.springframework.cache.annotation.EnableCaching;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.cache.RedisCacheConfiguration;import org.springframework.data.redis.cache.RedisCacheManager;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.RedisSerializationContext;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer;import java.time.Duration;@EnableCaching@Configurationpublic class RedisConfig extends CachingConfigurerSupport { /** * RedisTemplate内部的序列化配置器默认采用JDK序列化器 * 使用默认序列化配置器查看数据时会导致数据乱码,需重新定义RedisTemplate序列化方案 * 修改存储对象的序列化问题 */ @Bean @SuppressWarnings("all") public RedisTemplateredisTemplate(RedisConnectionFactory factory) { RedisTemplate template = new RedisTemplate (); template.setConnectionFactory(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key采用String的序列化方式 template.setKeySerializer(stringRedisSerializer); // hash的key也采用String的序列化方式 template.setHashKeySerializer(stringRedisSerializer); // value序列化方式采用jackson template.setValueSerializer(jackson2JsonRedisSerializer); // hash的value序列化方式采用jackson template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } /** * CacheManager: * 管理多种缓存,包含内存, appfabric, redis, couchbase, windows azure cache, memorycache等 * 提供了额外的功能,如缓存同步、并发更新、事件、性能计数器等 */ @Bean public CacheManager cacheManager(RedisConnectionFactory factory) { RedisSerializer redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); //解决查询缓存转换异常的问题 ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置序列化(解决乱码的问题),过期时间600秒 RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofSeconds(600)) .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer)) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer)) .disableCachingNullValues(); RedisCacheManager cacheManager = RedisCacheManager.builder(factory) .cacheDefaults(config) .build(); return cacheManager; }}
4、工具类:
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import java.util.Collection;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.TimeUnit;@Componentpublic final class RedisUtil { @Autowired private RedisTemplateView CoderedisTemplate; // =============================common============================ /** * 指定缓存失效时间 * @param key 键 * @param time 时间(秒) */ public boolean expire(String key, long time) { try { if (time > 0) { redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 根据key 获取过期时间 * @param key 键 不能为null * @return 时间(秒) 返回0代表为永久有效 */ public long getExpire(String key) { return redisTemplate.getExpire(key, TimeUnit.SECONDS); } /** * 判断key是否存在 * @param key 键 * @return true 存在 false不存在 */ public boolean hasKey(String key) { try { return redisTemplate.hasKey(key); } catch (Exception e) { e.printStackTrace(); return false; } } /** * 删除缓存 * @param key 传入一个值 */ @SuppressWarnings("unchecked") public void del(String key) { redisTemplate.delete(key); } /** * 删除缓存 * @param keys 传入多个值 */ @SuppressWarnings("unchecked") public void del(Collection keys) { redisTemplate.delete(keys); } // ============================String============================= /** * 普通缓存获取 * @param key 键 * @return 值 */ public Object get(String key) { return key == null ? null : redisTemplate.opsForValue().get(key); } /** * 普通缓存放入 * @param key 键 * @param value 值 * @return true成功 false失败 */ public boolean set(String key, Object value) { try { redisTemplate.opsForValue().set(key, value); return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 普通缓存放入并设置时间 * @param key 键 * @param value 值 * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期 * @return true成功 false 失败 */ public boolean set(String key, Object value, long time) { try { if (time > 0) { redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS); } else { set(key, value); } return true; } catch (Exception e) { e.printStackTrace(); return false; } } /** * 递增 * @param key 键 * @param delta 要增加几(大于0) */ public long incr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递增因子必须大于0"); } return redisTemplate.opsForValue().increment(key, delta); } /** * 递减 * @param key 键 * @param delta 要减少几(小于0) */ public long decr(String key, long delta) { if (delta < 0) { throw new RuntimeException("递减因子必须大于0"); } return redisTemplate.opsForValue().increment(key, -delta); } // ================================Map================================= /** * HashGet * @param key 键 不能为null * @param item 项 不能为null */ public Object hget(String key, String item) { return redisTemplate.opsForHash().get(key, item); } /** * 获取hashKey对应的所有键值 * @param key 键 * @return 对应的多个键值 */ public Map
二、基于Redis-String类型做简单缓存:
@Autowired private RedisTemplateredisTemplate; /** * redis简单缓存 * 基于String数据类型,做简单的key-value缓存 * */ @Override public Object redisComDemo() { if(redisTemplate.hasKey("key1")){ log.info("查询redis"); return redisTemplate.opsForValue().get("key1"); } log.info("查询本地"); redisTemplate.opsForValue().set("key1", "redis基于String数据类型,做简单的key-value缓存", 10, TimeUnit.SECONDS); return "redis基于String数据类型,做简单的key-value缓存"; }
三、基于Redis-Mybatis做二级缓存(Mapper级别缓存):
1、YML配置:
手动开启Mybatis二级缓存声明:
(资料图片)
Mybatis一级缓存是SqlSession级别的缓存,Mybatis默认开启一级缓存。
Mybatis二级缓存是mapper级别的缓存,Mybatis默认是关闭二级缓存的。
mybatis: #开启mybatis的二级缓存 configuration: cache-enabled: true
2、配置类:
(1)、手动获取Spring容器的bean工具类:
import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.context.annotation.Configuration;/** * 手动地去调用Spring容器的getBean方法来获取bean * */@Configurationpublic class ApplicationContextUtil implements ApplicationContextAware { //获取IoC容器 private static ApplicationContext applicationContext; //实现了ApplicationContextAware接口后,重写setApplicationContext方法进行设置 @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } //通过在工厂中获取对象的方法 public staticT getBean(String beanName,Class requiredType){ return applicationContext.getBean(beanName,requiredType); }}
(2)、声明二级缓存配置类:
import lombok.extern.slf4j.Slf4j;import org.apache.ibatis.cache.Cache;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.dao.DataAccessException;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.core.RedisCallback;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.util.DigestUtils;import java.nio.charset.StandardCharsets;import java.util.Set;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;@Slf4jpublic class MybatisRedisCache implements Cache { private String id; @Autowired private RedisTemplateredisTemplate; private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private static final long TIME_IN_DAY = 1; public MybatisRedisCache(String id) { if (id == null) { throw new IllegalArgumentException("Cache instances require an ID"); } log.debug(">>>>>> cache Id = {}",id); this.id = id; } @Override public String getId() { return this.id; } @Override public void putObject(Object key, Object value) { if (null == redisTemplate) { //获取Redis实例 redisTemplate = (RedisTemplate ) ApplicationContextUtil .getBean("redisTemplate",RedisTemplate.class); } String newKey = getId() + ":" + DigestUtils.md5DigestAsHex(key.toString().getBytes(StandardCharsets.UTF_8)); redisTemplate.opsForValue().set(newKey, value, TIME_IN_DAY, TimeUnit.DAYS); log.debug("添加Key={}的缓存.",newKey); } @Override public Object getObject(Object key) { if (null == redisTemplate) { //获取Redis实例 redisTemplate = (RedisTemplate ) ApplicationContextUtil .getBean("redisTemplate",RedisTemplate.class); } String newKey = getId() + ":" + DigestUtils.md5DigestAsHex(key.toString().getBytes(StandardCharsets.UTF_8)); log.debug("获取Key={}的缓存",newKey); return redisTemplate.opsForValue().get(newKey); } @Override public Object removeObject(Object key) { if (null == redisTemplate) { //获取Redis实例 redisTemplate = (RedisTemplate ) ApplicationContextUtil .getBean("redisTemplate",RedisTemplate.class); } String newKey = getId() + ":" + DigestUtils.md5DigestAsHex(key.toString().getBytes(StandardCharsets.UTF_8)); redisTemplate.delete(newKey); log.debug("删除key={}的缓存",newKey); return null; } @Override public void clear() { if (null == redisTemplate) { //获取Redis实例 redisTemplate = (RedisTemplate ) ApplicationContextUtil .getBean("redisTemplate",RedisTemplate.class); } log.info("清除缓存"); Set keys = redisTemplate.keys("*" + this.id + "*"); redisTemplate.delete(keys); } @Override public int getSize() { if (null == redisTemplate) { //获取Redis实例 redisTemplate = (RedisTemplate ) ApplicationContextUtil .getBean("redisTemplate",RedisTemplate.class); } log.debug("获取缓存中的所有key的size"); Integer execute = redisTemplate.execute(new RedisCallback () { @Override public Integer doInRedis(RedisConnection connection) throws DataAccessException { return connection.dbSize().intValue(); } }); return execute; } @Override public ReadWriteLock getReadWriteLock() { return readWriteLock; }}
(3)、清理二级缓存配置工具类:
import lombok.extern.slf4j.Slf4j;import org.apache.ibatis.cache.Cache;import org.apache.ibatis.session.SqlSessionFactory;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.util.ObjectUtils;import java.util.Iterator;import java.util.Objects;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;/** * @Description: 清空Mybatis二级缓存的工具类 * */@Slf4jpublic class MybatisCacheCleanUtil { //读写锁保证线程安全 private static final ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock(); /** * 清理redis全量缓存 * */ public static Boolean cleanAllCache(RedisTemplateredisTemplate) { RedisConnection connection = null; try{ RedisConnectionFactory redisConnectionFactory = Objects.requireNonNull(redisTemplate.getConnectionFactory()); connection = redisConnectionFactory.getConnection(); connection.flushDb(); }catch (Exception e){ log.error("清空缓存异常:{}",e.getMessage()); return false; }finally { connection.close(); } log.info("清理缓存成功."); return true; } /** * 清理MyBatis二级缓存 * * @param sqlSessionFactory SqlSession对象 * @param mapperPackage 二级缓存的mapper路径(例:“com.example.dao.DataDemoMapper”或者“DataDemoMapper”) * * @Description:当mapperPackage指定的mapper路径为空时对全量MyBatis二级缓存进行清除 */ public static Boolean clearCache(SqlSessionFactory sqlSessionFactory, String mapperPackage) { Iterator cacheNames = sqlSessionFactory.getConfiguration().getCacheNames().iterator(); log.info("清理二级缓存量:{}", sqlSessionFactory.getConfiguration().getCacheNames().size()); while (cacheNames.hasNext()) { String newMapperName = !ObjectUtils.isEmpty(mapperPackage) ? mapperPackage : cacheNames.next(); Cache cache = sqlSessionFactory.getConfiguration().getCache(newMapperName); if (cache != null) { READ_WRITE_LOCK.writeLock().lock(); try { cache.clear(); } finally { READ_WRITE_LOCK.writeLock().unlock(); } log.info("清理{}缓存成功.", newMapperName); if(!ObjectUtils.isEmpty(mapperPackage)) { return true; } } } return true; }}
3、Mapper的XML文件声明:
4、使用:
(1)、开启二级缓存:
查询时开启
@Autowired private DataDemoMapper dataDemoMapper; /** * redis简单缓存 * 基于String数据类型,做mybatis二级缓存 * */ @Override public Object redisMybatisDemo() { return dataDemoMapper.selectAllDemo(); }
(2)、清理二级缓存:
@Autowired private RedisTemplateredisTemplate; @Autowired private SqlSessionFactory sqlSessionFactory; /** * 清理缓存 * */ @Override public Object redisMybatisCleanDemo() { //方式一、清理指定mapper二级缓存 MybatisCacheCleanUtil.clearCache(sqlSessionFactory, "com.xxx.xxx.dao.DataDemoMapper"); //方式二、清理所有mapper二级缓存 MybatisCacheCleanUtil.clearCache(sqlSessionFactory, null); //方式三、清理所有redis缓存 MybatisCacheCleanUtil.cleanAllCache(redisTemplate); return null; }
四、基于Redis-List类型实现实时监听生产者功能:
Redis基于List数据类型,按照先进先出的顺序对数据进行存取
1、声明Redis-List操作工具类:
import com.alibaba.druid.support.json.JSONUtils;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.util.ObjectUtils;/** * redis基于List数据类型,按照先进先出的顺序对数据进行存取 * */public class ChokeQueueUtil { //队列名称 public static String queueName = "REAL_TIME_QUEUE"; //将消息推送到消息队列 public static void pushMessage(RedisTemplateredisTemplate, String message){ redisTemplate.opsForList().leftPush(queueName, message); } public static String popMessage(RedisTemplate redisTemplate, String queueName) { Object msg = redisTemplate.opsForList().rightPop(queueName); if(ObjectUtils.isEmpty(msg)){ return null; }else { return JSONUtils.toJSONString(msg); } }}
2、声明生产者:
@Autowired private RedisTemplateredisTemplate; /** * redis单业务消息队列 * 基于List数据类型,实时监听生产者功能 * */ @Override public Object redisQueueProduceDemo(String msg) { try { ChokeQueueUtil.pushMessage(redisTemplate, msg); return "生产者发布消息成功"; }catch (Exception e){ log.error("send message error:{}", e.getMessage()); return "生产者发布消息失败"; } }
3、声明消费者:
(1)、消费者接口:
/** * 消息消费接口 * */public interface RedisQueueConsume { /** * 消费者 */ void consume (String msgBody);}
(2)、实现类:
import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;@Slf4j@Componentpublic class RedisQueueConsumeImpl implements RedisQueueConsume { @Override public void consume(String msgBody) { log.info("消费者消费成功,消费消息为:{}",msgBody); }}
4、声明实时消费监听器:
import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import java.util.concurrent.*;/** * 实现消息队列,可用于完成单业务的实时监听功能 */@Slf4j@Componentpublic class RedisMessageQueueListener implements InitializingBean { private ExecutorService product = null; private ExecutorService consumer = null; @Autowired private RedisQueueConsume redisQueueConsume; @Autowired private RedisTemplateredisTemplate; /** * 初始化配置 */ @Override public void afterPropertiesSet() { product = new ThreadPoolExecutor(10, 15, 60 * 3, TimeUnit.SECONDS, new SynchronousQueue<>()); consumer = new ThreadPoolExecutor(10, 15, 60 * 3, TimeUnit.SECONDS, new SynchronousQueue<>()); product.execute(() -> { startListener(redisTemplate, redisQueueConsume); }); } //启动消息监听器,采用分布式锁机制确保仅有一个监听器在运行 public void startListener(RedisTemplate redisTemplate, RedisQueueConsume handler) { String lockKey = ChokeQueueUtil.queueName + ":lock"; while (true) { //如果锁为空就set值,并返回1;如果锁存在不进行操作,并返回0,实现分布式锁操作 //采用了分布式锁方式来保证多个线程同时弹出同一个队列元素的情况不会发生 boolean locked = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, "")); if (locked) { try { //如果锁的持有者在1秒内没有释放锁,则锁将自动过期。避免在某些情况下锁被长时间占用而导致的问题 redisTemplate.expire(lockKey, 1, TimeUnit.SECONDS); //获取消息 String message =ChokeQueueUtil.popMessage(redisTemplate, ChokeQueueUtil.queueName); if (message != null) { log.info(">>>>>>生产者存在消息,启动消费者监听>>>>>>"); //消费来自生产者的消息 consumer.execute(() -> { handler.consume(message); }); log.info(">>>>>>消费者消费当前消息完毕>>>>>>"); } } finally { redisTemplate.delete(lockKey); } }else { try { // 每100毫秒尝试一次获取锁 Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("消费消息error:{}", e.getMessage()); break; } } } }}
五、问题存在:
1、Redis做缓存存在的问题:
(1)、缓存与数据库双写数据不一致问题
(2)、缓存穿透、击穿与雪崩问题
2、Redis做消息队列存在的问题:
(1)、实时消费存在CPU性能损失问题
(2)、存在消息丢失问题
(3)、避免生产者过快,消费者过慢导致的消息堆积占用 Redis 的内存问题
六、参考:
Redis基础详解
redisTemplate.opsForList()使用
Redis报错Redis is configured to save RDB snapshots解决方案1
Redis报错Redis is configured to save RDB snapshots解决方案2
分布式锁与synchronize的区别
关键词: