14、基于Redis实现缓存与消息队列

来源:博客园 2023-04-03 10:22:28

Redis是基于内存运行并支持持久化、高性能的NoSQL(非关系型)数据库,适用于存储频繁访问,数据量较小的数据,应用在配合关系型数据库做高速缓存与多样的数据结构存储持久化数据;

一、搭建基于spring boot的Redis工程:

1、POM:

    org.springframework.boot    spring-boot-starter-data-redis

2、YML:

spring:  datasource:    type: com.alibaba.druid.pool.DruidDataSource    #当前数据源操作类型    driver-class-name: org.gjt.mm.mysql.Driver    #mysql驱动包    url: jdbc:mysql://localhost:3306/demo?useUnicode=true&characterEncoding=utf-8&useSSL=false    username: root    password: 123456    druid:      test-while-idle: false  #关闭空闲检测  redis:    host: 127.0.0.1   # Redis服务器地址    port: 6379    # Redis服务器连接端口    password: 123456   # Redis服务器连接密码    jedis:      pool:        max-active: 8   # 连接池最大连接数(使用负值表示没有限制)        max-wait: -1ms    # 连接池最大阻塞等待时间(使用负值表示没有限制)        max-idle: 500   # 连接池中的最大空闲连接        min-idle: 0   # 连接池中的最小空闲连接    lettuce:      shutdown-timeout: 0msmybatis:  mapperLocations: classpath:mapper/*.xml    #resources下建mapper包

3、配置类:

import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.ObjectMapper;import org.springframework.cache.CacheManager;import org.springframework.cache.annotation.CachingConfigurerSupport;import org.springframework.cache.annotation.EnableCaching;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.cache.RedisCacheConfiguration;import org.springframework.data.redis.cache.RedisCacheManager;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.RedisSerializationContext;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer;import java.time.Duration;@EnableCaching@Configurationpublic class RedisConfig extends CachingConfigurerSupport {    /**     * RedisTemplate内部的序列化配置器默认采用JDK序列化器     * 使用默认序列化配置器查看数据时会导致数据乱码,需重新定义RedisTemplate序列化方案     * 修改存储对象的序列化问题     */    @Bean    @SuppressWarnings("all")    public RedisTemplateredisTemplate(RedisConnectionFactory factory) {        RedisTemplate template = new RedisTemplate();        template.setConnectionFactory(factory);        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);        ObjectMapper om = new ObjectMapper();        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);        jackson2JsonRedisSerializer.setObjectMapper(om);        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();        // key采用String的序列化方式        template.setKeySerializer(stringRedisSerializer);        // hash的key也采用String的序列化方式        template.setHashKeySerializer(stringRedisSerializer);        // value序列化方式采用jackson        template.setValueSerializer(jackson2JsonRedisSerializer);        // hash的value序列化方式采用jackson        template.setHashValueSerializer(jackson2JsonRedisSerializer);        template.afterPropertiesSet();        return template;    }    /**     * CacheManager:     * 管理多种缓存,包含内存, appfabric, redis, couchbase, windows azure cache, memorycache等     * 提供了额外的功能,如缓存同步、并发更新、事件、性能计数器等     */    @Bean    public CacheManager cacheManager(RedisConnectionFactory factory) {        RedisSerializer redisSerializer = new StringRedisSerializer();        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);        //解决查询缓存转换异常的问题        ObjectMapper om = new ObjectMapper();        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);        jackson2JsonRedisSerializer.setObjectMapper(om);        // 配置序列化(解决乱码的问题),过期时间600秒        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()                .entryTtl(Duration.ofSeconds(600))                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))                .disableCachingNullValues();        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)                .cacheDefaults(config)                .build();        return cacheManager;    }}

4、工具类:

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import java.util.Collection;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.TimeUnit;@Componentpublic final class RedisUtil {    @Autowired    private RedisTemplate redisTemplate;    // =============================common============================    /**     * 指定缓存失效时间     * @param key  键     * @param time 时间(秒)     */    public boolean expire(String key, long time) {        try {            if (time > 0) {                redisTemplate.expire(key, time, TimeUnit.SECONDS);            }            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 根据key 获取过期时间     * @param key 键 不能为null     * @return 时间(秒) 返回0代表为永久有效     */    public long getExpire(String key) {        return redisTemplate.getExpire(key, TimeUnit.SECONDS);    }    /**     * 判断key是否存在     * @param key 键     * @return true 存在 false不存在     */    public boolean hasKey(String key) {        try {            return redisTemplate.hasKey(key);        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 删除缓存     * @param key 传入一个值     */    @SuppressWarnings("unchecked")    public void del(String key) {        redisTemplate.delete(key);    }    /**     * 删除缓存     * @param keys 传入多个值     */    @SuppressWarnings("unchecked")    public void del(Collection keys) {        redisTemplate.delete(keys);    }    // ============================String=============================    /**     * 普通缓存获取     * @param key 键     * @return 值     */    public Object get(String key) {        return key == null ? null : redisTemplate.opsForValue().get(key);    }    /**     * 普通缓存放入     * @param key   键     * @param value 值     * @return true成功 false失败     */    public boolean set(String key, Object value) {        try {            redisTemplate.opsForValue().set(key, value);            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 普通缓存放入并设置时间     * @param key   键     * @param value 值     * @param time  时间(秒) time要大于0 如果time小于等于0 将设置无限期     * @return true成功 false 失败     */    public boolean set(String key, Object value, long time) {        try {            if (time > 0) {                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);            } else {                set(key, value);            }            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 递增     * @param key   键     * @param delta 要增加几(大于0)     */    public long incr(String key, long delta) {        if (delta < 0) {            throw new RuntimeException("递增因子必须大于0");        }        return redisTemplate.opsForValue().increment(key, delta);    }    /**     * 递减     * @param key   键     * @param delta 要减少几(小于0)     */    public long decr(String key, long delta) {        if (delta < 0) {            throw new RuntimeException("递减因子必须大于0");        }        return redisTemplate.opsForValue().increment(key, -delta);    }    // ================================Map=================================    /**     * HashGet     * @param key  键 不能为null     * @param item 项 不能为null     */    public Object hget(String key, String item) {        return redisTemplate.opsForHash().get(key, item);    }    /**     * 获取hashKey对应的所有键值     * @param key 键     * @return 对应的多个键值     */    public Map hmget(String key) {        return redisTemplate.opsForHash().entries(key);    }    /**     * HashSet     * @param key 键     * @param map 对应多个键值     */    public boolean hmset(String key, Map map) {        try {            redisTemplate.opsForHash().putAll(key, map);            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * HashSet 并设置时间     * @param key  键     * @param map  对应多个键值     * @param time 时间(秒)     * @return true成功 false失败     */    public boolean hmset(String key, Map map, long time) {        try {            redisTemplate.opsForHash().putAll(key, map);            if (time > 0) {                expire(key, time);            }            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 向一张hash表中放入数据,如果不存在将创建     *     * @param key   键     * @param item  项     * @param value 值     * @return true 成功 false失败     */    public boolean hset(String key, String item, Object value) {        try {            redisTemplate.opsForHash().put(key, item, value);            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 向一张hash表中放入数据,如果不存在将创建     *     * @param key   键     * @param item  项     * @param value 值     * @param time  时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间     * @return true 成功 false失败     */    public boolean hset(String key, String item, Object value, long time) {        try {            redisTemplate.opsForHash().put(key, item, value);            if (time > 0) {                expire(key, time);            }            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 删除hash表中的值     *     * @param key  键 不能为null     * @param item 项 可以使多个 不能为null     */    public void hdel(String key, Object... item) {        redisTemplate.opsForHash().delete(key, item);    }    /**     * 判断hash表中是否有该项的值     *     * @param key  键 不能为null     * @param item 项 不能为null     * @return true 存在 false不存在     */    public boolean hHasKey(String key, String item) {        return redisTemplate.opsForHash().hasKey(key, item);    }    /**     * hash递增 如果不存在,就会创建一个 并把新增后的值返回     *     * @param key  键     * @param item 项     * @param by   要增加几(大于0)     */    public double hincr(String key, String item, double by) {        return redisTemplate.opsForHash().increment(key, item, by);    }    /**     * hash递减     *     * @param key  键     * @param item 项     * @param by   要减少记(小于0)     */    public double hdecr(String key, String item, double by) {        return redisTemplate.opsForHash().increment(key, item, -by);    }    // ============================set=============================    /**     * 根据key获取Set中的所有值     * @param key 键     */    public Set sGet(String key) {        try {            return redisTemplate.opsForSet().members(key);        } catch (Exception e) {            e.printStackTrace();            return null;        }    }    /**     * 根据value从一个set中查询,是否存在     *     * @param key   键     * @param value 值     * @return true 存在 false不存在     */    public boolean sHasKey(String key, Object value) {        try {            return redisTemplate.opsForSet().isMember(key, value);        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 将数据放入set缓存     *     * @param key    键     * @param values 值 可以是多个     * @return 成功个数     */    public long sSet(String key, Object... values) {        try {            return redisTemplate.opsForSet().add(key, values);        } catch (Exception e) {            e.printStackTrace();            return 0;        }    }    /**     * 将set数据放入缓存     *     * @param key    键     * @param time   时间(秒)     * @param values 值 可以是多个     * @return 成功个数     */    public long sSetAndTime(String key, long time, Object... values) {        try {            Long count = redisTemplate.opsForSet().add(key, values);            if (time > 0)                expire(key, time);            return count;        } catch (Exception e) {            e.printStackTrace();            return 0;        }    }    /**     * 获取set缓存的长度     *     * @param key 键     */    public long sGetSetSize(String key) {        try {            return redisTemplate.opsForSet().size(key);        } catch (Exception e) {            e.printStackTrace();            return 0;        }    }    /**     * 移除值为value的     *     * @param key    键     * @param values 值 可以是多个     * @return 移除的个数     */    public long setRemove(String key, Object... values) {        try {            Long count = redisTemplate.opsForSet().remove(key, values);            return count;        } catch (Exception e) {            e.printStackTrace();            return 0;        }    }    // ===============================list=================================    /**     * 获取list缓存的内容     *     * @param key   键     * @param start 开始     * @param end   结束 0 到 -1代表所有值     */    public List lGet(String key, long start, long end) {        try {            return redisTemplate.opsForList().range(key, start, end);        } catch (Exception e) {            e.printStackTrace();            return null;        }    }    /**     * 获取list缓存的长度     *     * @param key 键     */    public long lGetListSize(String key) {        try {            return redisTemplate.opsForList().size(key);        } catch (Exception e) {            e.printStackTrace();            return 0;        }    }    /**     * 通过索引 获取list中的值     *     * @param key   键     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推     */    public Object lGetIndex(String key, long index) {        try {            return redisTemplate.opsForList().index(key, index);        } catch (Exception e) {            e.printStackTrace();            return null;        }    }    /**     * 将list放入缓存     *     * @param key   键     * @param value 值     */    public boolean lSet(String key, Object value) {        try {            redisTemplate.opsForList().rightPush(key, value);            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 将list放入缓存     * @param key   键     * @param value 值     * @param time  时间(秒)     */    public boolean lSet(String key, Object value, long time) {        try {            redisTemplate.opsForList().rightPush(key, value);            if (time > 0)                expire(key, time);            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 将list放入缓存     *     * @param key   键     * @param value 值     * @return     */    public boolean lSet(String key, List value) {        try {            redisTemplate.opsForList().rightPushAll(key, value);            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 将list放入缓存     *     * @param key   键     * @param value 值     * @param time  时间(秒)     * @return     */    public boolean lSet(String key, List value, long time) {        try {            redisTemplate.opsForList().rightPushAll(key, value);            if (time > 0)                expire(key, time);            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 根据索引修改list中的某条数据     *     * @param key   键     * @param index 索引     * @param value 值     * @return     */    public boolean lUpdateIndex(String key, long index, Object value) {        try {            redisTemplate.opsForList().set(key, index, value);            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        }    }    /**     * 移除N个值为value     *     * @param key   键     * @param count 移除多少个     * @param value 值     * @return 移除的个数     */    public long lRemove(String key, long count, Object value) {        try {            Long remove = redisTemplate.opsForList().remove(key, count, value);            return remove;        } catch (Exception e) {            e.printStackTrace();            return 0;        }    }}View Code

二、基于Redis-String类型做简单缓存:

@Autowired    private RedisTemplateredisTemplate;    /**     * redis简单缓存     * 基于String数据类型,做简单的key-value缓存     * */    @Override    public Object redisComDemo() {        if(redisTemplate.hasKey("key1")){            log.info("查询redis");            return redisTemplate.opsForValue().get("key1");        }        log.info("查询本地");        redisTemplate.opsForValue().set("key1", "redis基于String数据类型,做简单的key-value缓存",                10, TimeUnit.SECONDS);        return "redis基于String数据类型,做简单的key-value缓存";    }

三、基于Redis-Mybatis做二级缓存(Mapper级别缓存):

1、YML配置:

手动开启Mybatis二级缓存声明:


(资料图片)

Mybatis一级缓存是SqlSession级别的缓存,Mybatis默认开启一级缓存。

Mybatis二级缓存是mapper级别的缓存,Mybatis默认是关闭二级缓存的。

mybatis:  #开启mybatis的二级缓存  configuration:    cache-enabled: true

2、配置类:

(1)、手动获取Spring容器的bean工具类:

import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.context.annotation.Configuration;/** * 手动地去调用Spring容器的getBean方法来获取bean * */@Configurationpublic class ApplicationContextUtil implements ApplicationContextAware {    //获取IoC容器    private static ApplicationContext applicationContext;     //实现了ApplicationContextAware接口后,重写setApplicationContext方法进行设置     @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        this.applicationContext = applicationContext;    }     //通过在工厂中获取对象的方法    public static  T getBean(String beanName,Class requiredType){        return applicationContext.getBean(beanName,requiredType);    }}

(2)、声明二级缓存配置类:

import lombok.extern.slf4j.Slf4j;import org.apache.ibatis.cache.Cache;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.dao.DataAccessException;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.core.RedisCallback;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.util.DigestUtils;import java.nio.charset.StandardCharsets;import java.util.Set;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;@Slf4jpublic class MybatisRedisCache implements Cache {    private String id;    @Autowired    private RedisTemplate redisTemplate;    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();    private static final long TIME_IN_DAY = 1;    public MybatisRedisCache(String id) {        if (id == null) {            throw new IllegalArgumentException("Cache instances require an ID");        }        log.debug(">>>>>> cache Id = {}",id);        this.id = id;    }    @Override    public String getId() {        return this.id;    }    @Override    public void putObject(Object key, Object value) {        if (null == redisTemplate) {            //获取Redis实例            redisTemplate = (RedisTemplate) ApplicationContextUtil                    .getBean("redisTemplate",RedisTemplate.class);        }        String newKey = getId() +  ":" +  DigestUtils.md5DigestAsHex(key.toString().getBytes(StandardCharsets.UTF_8));        redisTemplate.opsForValue().set(newKey, value, TIME_IN_DAY, TimeUnit.DAYS);        log.debug("添加Key={}的缓存.",newKey);    }    @Override    public Object getObject(Object key) {        if (null == redisTemplate) {            //获取Redis实例            redisTemplate = (RedisTemplate) ApplicationContextUtil                    .getBean("redisTemplate",RedisTemplate.class);        }        String newKey = getId() +  ":" +  DigestUtils.md5DigestAsHex(key.toString().getBytes(StandardCharsets.UTF_8));        log.debug("获取Key={}的缓存",newKey);        return redisTemplate.opsForValue().get(newKey);    }    @Override    public Object removeObject(Object key) {        if (null == redisTemplate) {            //获取Redis实例            redisTemplate = (RedisTemplate) ApplicationContextUtil                    .getBean("redisTemplate",RedisTemplate.class);        }        String newKey = getId() +  ":" +  DigestUtils.md5DigestAsHex(key.toString().getBytes(StandardCharsets.UTF_8));        redisTemplate.delete(newKey);        log.debug("删除key={}的缓存",newKey);        return null;    }    @Override    public void clear() {        if (null == redisTemplate) {            //获取Redis实例            redisTemplate = (RedisTemplate) ApplicationContextUtil                    .getBean("redisTemplate",RedisTemplate.class);        }        log.info("清除缓存");        Set keys = redisTemplate.keys("*" + this.id + "*");        redisTemplate.delete(keys);    }    @Override    public int getSize() {        if (null == redisTemplate) {            //获取Redis实例            redisTemplate = (RedisTemplate) ApplicationContextUtil                    .getBean("redisTemplate",RedisTemplate.class);        }        log.debug("获取缓存中的所有key的size");        Integer execute = redisTemplate.execute(new RedisCallback() {            @Override            public Integer doInRedis(RedisConnection connection) throws DataAccessException {                return connection.dbSize().intValue();            }        });        return execute;    }    @Override    public ReadWriteLock getReadWriteLock() {        return readWriteLock;    }}

(3)、清理二级缓存配置工具类:

import lombok.extern.slf4j.Slf4j;import org.apache.ibatis.cache.Cache;import org.apache.ibatis.session.SqlSessionFactory;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.util.ObjectUtils;import java.util.Iterator;import java.util.Objects;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;/** * @Description: 清空Mybatis二级缓存的工具类 * */@Slf4jpublic class MybatisCacheCleanUtil {    //读写锁保证线程安全    private static final ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock();    /**     * 清理redis全量缓存     *     */    public static Boolean cleanAllCache(RedisTemplate redisTemplate) {        RedisConnection connection = null;        try{            RedisConnectionFactory redisConnectionFactory = Objects.requireNonNull(redisTemplate.getConnectionFactory());            connection = redisConnectionFactory.getConnection();            connection.flushDb();        }catch (Exception e){            log.error("清空缓存异常:{}",e.getMessage());            return false;        }finally {            connection.close();        }        log.info("清理缓存成功.");        return true;    }    /**     * 清理MyBatis二级缓存     *     * @param sqlSessionFactory SqlSession对象     * @param mapperPackage 二级缓存的mapper路径(例:“com.example.dao.DataDemoMapper”或者“DataDemoMapper”)     *     * @Description:当mapperPackage指定的mapper路径为空时对全量MyBatis二级缓存进行清除     */    public static Boolean clearCache(SqlSessionFactory sqlSessionFactory, String mapperPackage) {        Iterator cacheNames = sqlSessionFactory.getConfiguration().getCacheNames().iterator();        log.info("清理二级缓存量:{}", sqlSessionFactory.getConfiguration().getCacheNames().size());        while (cacheNames.hasNext()) {            String newMapperName = !ObjectUtils.isEmpty(mapperPackage) ? mapperPackage : cacheNames.next();            Cache cache = sqlSessionFactory.getConfiguration().getCache(newMapperName);            if (cache != null) {                READ_WRITE_LOCK.writeLock().lock();                try {                    cache.clear();                } finally {                    READ_WRITE_LOCK.writeLock().unlock();                }                log.info("清理{}缓存成功.", newMapperName);                if(!ObjectUtils.isEmpty(mapperPackage)) {                    return true;                }            }        }        return true;    }}

3、Mapper的XML文件声明:

4、使用:

(1)、开启二级缓存:

查询时开启

@Autowired    private DataDemoMapper dataDemoMapper;        /**     * redis简单缓存     * 基于String数据类型,做mybatis二级缓存     * */    @Override    public Object redisMybatisDemo() {        return dataDemoMapper.selectAllDemo();    }

(2)、清理二级缓存:

@Autowired    private RedisTemplate redisTemplate;    @Autowired    private SqlSessionFactory sqlSessionFactory;    /**     * 清理缓存     * */    @Override    public Object redisMybatisCleanDemo() {        //方式一、清理指定mapper二级缓存        MybatisCacheCleanUtil.clearCache(sqlSessionFactory, "com.xxx.xxx.dao.DataDemoMapper");        //方式二、清理所有mapper二级缓存        MybatisCacheCleanUtil.clearCache(sqlSessionFactory, null);        //方式三、清理所有redis缓存        MybatisCacheCleanUtil.cleanAllCache(redisTemplate);        return null;    }

四、基于Redis-List类型实现实时监听生产者功能:

Redis基于List数据类型,按照先进先出的顺序对数据进行存取

1、声明Redis-List操作工具类:

import com.alibaba.druid.support.json.JSONUtils;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.util.ObjectUtils;/** * redis基于List数据类型,按照先进先出的顺序对数据进行存取 * */public class ChokeQueueUtil {    //队列名称    public static String queueName = "REAL_TIME_QUEUE";    //将消息推送到消息队列    public static void pushMessage(RedisTemplate redisTemplate, String message){        redisTemplate.opsForList().leftPush(queueName, message);    }    public static String popMessage(RedisTemplate redisTemplate, String queueName) {        Object msg = redisTemplate.opsForList().rightPop(queueName);        if(ObjectUtils.isEmpty(msg)){            return null;        }else {            return JSONUtils.toJSONString(msg);        }    }}

2、声明生产者:

@Autowired    private RedisTemplate redisTemplate;    /**     * redis单业务消息队列     * 基于List数据类型,实时监听生产者功能     * */    @Override    public Object redisQueueProduceDemo(String msg) {        try {            ChokeQueueUtil.pushMessage(redisTemplate, msg);            return "生产者发布消息成功";        }catch (Exception e){            log.error("send message error:{}", e.getMessage());            return "生产者发布消息失败";        }    }

3、声明消费者:

(1)、消费者接口:

/** * 消息消费接口 * */public interface RedisQueueConsume {    /**     * 消费者     */    void consume (String msgBody);}

(2)、实现类:

import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Component;@Slf4j@Componentpublic class RedisQueueConsumeImpl implements RedisQueueConsume {    @Override    public void consume(String msgBody) {        log.info("消费者消费成功,消费消息为:{}",msgBody);    }}

4、声明实时消费监听器:

import lombok.extern.slf4j.Slf4j;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Component;import java.util.concurrent.*;/** * 实现消息队列,可用于完成单业务的实时监听功能 */@Slf4j@Componentpublic class RedisMessageQueueListener implements InitializingBean {    private ExecutorService product = null;    private ExecutorService consumer = null;    @Autowired    private RedisQueueConsume redisQueueConsume;    @Autowired    private RedisTemplate redisTemplate;    /**     * 初始化配置     */    @Override    public void afterPropertiesSet() {        product = new ThreadPoolExecutor(10, 15, 60 * 3,                TimeUnit.SECONDS, new SynchronousQueue<>());        consumer = new ThreadPoolExecutor(10, 15, 60 * 3,                TimeUnit.SECONDS, new SynchronousQueue<>());        product.execute(() -> {            startListener(redisTemplate, redisQueueConsume);        });    }    //启动消息监听器,采用分布式锁机制确保仅有一个监听器在运行    public void startListener(RedisTemplate redisTemplate, RedisQueueConsume handler) {        String lockKey = ChokeQueueUtil.queueName + ":lock";        while (true) {            //如果锁为空就set值,并返回1;如果锁存在不进行操作,并返回0,实现分布式锁操作            //采用了分布式锁方式来保证多个线程同时弹出同一个队列元素的情况不会发生            boolean locked = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, ""));            if (locked) {                try {                    //如果锁的持有者在1秒内没有释放锁,则锁将自动过期。避免在某些情况下锁被长时间占用而导致的问题                    redisTemplate.expire(lockKey, 1, TimeUnit.SECONDS);                    //获取消息                    String message =ChokeQueueUtil.popMessage(redisTemplate, ChokeQueueUtil.queueName);                    if (message != null) {                        log.info(">>>>>>生产者存在消息,启动消费者监听>>>>>>");                        //消费来自生产者的消息                        consumer.execute(() -> {                            handler.consume(message);                        });                        log.info(">>>>>>消费者消费当前消息完毕>>>>>>");                    }                } finally {                    redisTemplate.delete(lockKey);                }            }else {                try {                    // 每100毫秒尝试一次获取锁                    Thread.sleep(100);                } catch (InterruptedException e) {                    Thread.currentThread().interrupt();                    log.error("消费消息error:{}", e.getMessage());                    break;                }            }        }    }}

五、问题存在:

1、Redis做缓存存在的问题:

(1)、缓存与数据库双写数据不一致问题

(2)、缓存穿透、击穿与雪崩问题

2、Redis做消息队列存在的问题:

(1)、实时消费存在CPU性能损失问题

(2)、存在消息丢失问题

(3)、避免生产者过快,消费者过慢导致的消息堆积占用 Redis 的内存问题

六、参考:

Redis基础详解

redisTemplate.opsForList()使用

Redis报错Redis is configured to save RDB snapshots解决方案1

Redis报错Redis is configured to save RDB snapshots解决方案2

分布式锁与synchronize的区别

关键词:

返回顶部