架构设计
组件选择/多级
缓存的设计要分多个层次,在不同的层次上选择不同的缓存,包括JVM缓存、文件缓存和Redis缓存。
JVM缓存
JVM缓存就是本地缓存,设计在应用服务器中(tomcat)。
通常可以采用Ehcache和Guava Cache。在互联网应用中,由于要处理高并发,通常选择Guava Cache。
适用本地(JVM)缓存的场景:
- 对性能有非常高的要求
- 不经常变化
- 占用内存不大
- 有访问整个集合的需求
- 数据允许不时时一致
文件缓存
这里的文件缓存是基于http协议的文件缓存,一般放在nginx中。
因为静态文件(比如css,js, 图片)中,很多都是不经常更新的。nginx使用proxy_cache将用户的请求缓存到本地一个目录。下一个相同请求可以直接调取缓存文件,就不用去请求服务器了。
server {
listen 80 default_server;
server_name localhost;
root /mnt/blog/;
location / {
}
#要缓存文件的后缀,可以在以下设置。
location ~ .*\.(gif|jpg|png|css|js)(.*) {
proxy_pass http://ip地址:90;
proxy_redirect off;
proxy_set_header Host $host;
proxy_cache cache_one;
proxy_cache_valid 200 302 24h;
proxy_cache_valid 301 30d;
proxy_cache_valid any 5m;
expires 90d;
add_header wall "hello lagou.";
}
}
Redis缓存
分布式缓存,采用主从+哨兵或RedisCluster的方式缓存数据库的数据。在实际开发中,如果作为数据库使用,数据要保证完整;如果要作为缓存使用,要保证性能(比如作为MyBatis的二级缓存等)。
缓存大小
GuavaCache的缓存设置方式:
// 超过num会按照LRU算法来移除缓存
CacheBuilder.newBuilder().maximumSize(num)
Nginx的缓存设置方式:
http {
...
proxy_cache_path /path/to/cache levels=1:2 keys_zone=my_cache:10m max_size=10g
inactive=60m use_temp_path=off;
server {
proxy_cache my_cache;
location / {
proxy_pass http://localhost:8000;
}
}
}
Redis缓存设置:
# 最大缓存量 一般为内存的3/4
maxmemory=num
# 淘汰算法
maxmemory-policy allkeys lru
缓存淘汰策略的选择:
- allkeys-lru : 在不确定时一般采用策略。
- volatile-lru : 比allkeys-lru性能差
- allkeys-random : 希望请求符合平均分布(每个元素以相同的概率被访问)
- volatile-ttl:自己控制,但有可能发生缓存穿透
- 禁止驱逐(默认): 用作DB,不设置maxmemory
key数量
官方说Redis单例能处理key:2.5亿个,一个key或是value大小最大是512M。
读写峰值
Redis采用的是基于内存的采用的是单进程单线程模型的KV数据库,由C语言编写,官方提供的数据是可以达到110000+的QPS(每秒内查询次数),80000的写。
命中率
- 命中:可以直接通过缓存获取到需要的数据
- 不命中:无法直接通过缓存获取到想要的数据,需要再次查询数据库或者执行其它的操作。原因可能是由于缓存中根本不存在,或者缓存已经过期
通常来讲,缓存的命中率越高则表示使用缓存的收益越高,应用的性能越好(响应时间越短、吞吐量越高),抗并发的能力越强。
由此可见,在高并发的互联网系统中,缓存的命中率是至关重要的指标。
通过info
命令可以监控服务器状态:
127.0.0.1:6379> info
# Server
redis_version:5.0.5
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:e188a39ce7a16352
redis_mode:standalone
os:Linux 3.10.0-229.el7.x86_64 x86_64
arch_bits:64
#缓存命中
keyspace_hits:1000
#缓存未命中
keyspace_misses:20
used_memory:433264648
expired_keys:1333536
evicted_keys:1547380
命中率=1000/(1000+20)=98%。一个缓存失效机制和过期时间设计良好的系统,命中率可以做到95%以上。
影响缓存命中率的因素:
- 缓存的数量越少命中率越高,比如缓存单个对象的命中率要高于缓存集合
- 过期时间越长命中率越高
- 缓存越大缓存的对象越多,则命中的越多
过期策略
Redis的过期策略是定时删除+惰性删除。
性能监控指标
利用info
命令就可以了解Redis的状态了,主要监控指标有:
connected_clients:68 #连接的客户端数量
used_memory_rss_human:847.62M #系统给redis分配的内存
used_memory_peak_human:794.42M #内存使用的峰值大小
total_connections_received:619104 #服务器已接受的连接请求数量
instantaneous_ops_per_sec:1159 #服务器每秒钟执行的命令数量 qps
instantaneous_input_kbps:55.85 #redis网络入口kps
instantaneous_output_kbps:3553.89 #redis网络出口kps
rejected_connections:0 #因为最大客户端数量限制而被拒绝的连接请求数量
expired_keys:0 #因为过期而被自动删除的数据库键数量
evicted_keys:0 #因为最大内存容量限制而被驱逐(evict)的键数量
keyspace_hits:0 #查找数据库键成功的次数
keyspace_misses:0 #查找数据库键失败的次数
Redis监控平台:grafana、prometheus以及redis_exporter。
缓存预热
缓存预热就是系统启动前,提前将相关的缓存数据直接加载到缓存系统。避免在用户请求的时候,先查询数据库,然后再将数据缓存的问题!用户直接查询实现被预热的缓存数据。
加载缓存思路:
- 数据量不大,可以在项目启动的时候自动进行加载
- 利用定时任务刷新缓存,将数据库的数据刷新到缓存中
缓存问题
缓存穿透
一般的缓存系统,都是按照key去缓存查询,如果不存在对应的value,就应该去后端系统查找(比如DB)。
缓存穿透是指在高并发下查询key不存在的数据(不存在的key),会穿过缓存查询数据库。导致数据库压力过大而宕机。
解决方案:
- 对查询结果为空的情况也进行缓存,缓存时间(ttl)设置短一点,或者该key对应的数据insert了之后清理缓存。问题:缓存太多空值占用了更多的空间
- 使用布隆过滤器。在缓存之前在加一层布隆过滤器,在查询的时候先去布隆过滤器查询key是否存在,如果不存在就直接返回,存在再查缓存和DB
布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机hash映射函数。
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法。
布隆过滤器的原理是,当一个元素被加入集合时,通过K个Hash函数将这个元素映射成一个数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
缓存雪崩
当缓存服务器重启或者大量缓存集中在某一个时间段失效,这样在失效的时候,也会给后端系统(比如DB)带来很大压力。
解决方案:
- key的失效期分散开,不同的key设置不同的有效期
- 设置二级缓存(数据不一定一致)
- 高可用(脏读)
缓存击穿
对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题,这个和缓存雪崩的区别在于这里针对某一key缓存,前者则是很多key。
存在某个时间点过期的时候,恰好在这个时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
解决方案:
- 用分布式锁控制访问的线程
- 不设超时时间,淘汰策略设置成volatile-lru。这种方案会造成写一致问题(即DB更新了但是缓存未及时更新)
数据不一致
缓存和DB的数据不一致的根源 : 数据源不一样。
解决方案:
- 先更新数据库同时删除缓存项(key),等读的时候再填充缓存
- 2秒后再删除一次缓存项(key)
- 设置缓存过期时间 Expired Time 比如 10秒 或1小时
- 缓存删除失败记录到日志中,利用脚本提取失败记录再次删除(缓存失效期过长 7*24)
升级方案:通过数据库的binlog来异步淘汰key,利用工具(canal)将binlog日志采集发送到MQ中,然后通过ACK机制确认处理删除缓存。
数据并发竞争
这里的并发指的是多个redis的client同时set 同一个key引起的并发问题。多客户端(Jedis)同时并发写一个key,一个key的值是1,本来按顺序修改为2,3,4,最后是4,但是顺序变成了4,3,2,最后变成了2。
第一种方案:分布式锁+时间戳
1.整体技术方案
这种情况,主要是准备一个分布式锁,大家去抢锁,抢到锁就做set操作。加锁的目的实际上就是把并行读写改成串行读写的方式,从而来避免资源竞争。
2.Redis分布式锁的实现
主要用到的redis函数是setnx()
。
对于上面举的例子,要求key的操作需要顺序执行,所以需要保存一个时间戳判断set顺序。
系统A key 1 { ValueA 7:00 }
系统B key 1 { ValueB 7:05 }
假设系统B先抢到锁,将key1设置为{ ValueB 7:05 }。接下来系统A抢到锁,发现自己的key1的时间戳早于缓存中的时间戳(7:00<7:05),那就不做set操作了。
第二种方案:利用消息队列
在并发量过大的情况下,可以通过消息中间件进行处理,把并行读写进行串行化。把Redis的set操作放在队列中使其串行化来一个一个执行。
Hot Key
当有大量的请求(几十万)访问某个Redis某个key时,由于流量集中达到网络上限,从而导致这个redis的服务器宕机。造成缓存击穿,接下来对这个key的访问将直接访问数据库造成数据库崩溃,或者访问数据库回填Redis再访问Redis,继续崩溃。
如何发现热key:
- 预估热key,比如秒杀的商品、火爆的新闻等
- 在客户端进行统计,实现简单,加一行代码即可
- 如果是Proxy,比如Codis,可以在Proxy端收集
- 利用Redis自带的命令,monitor、hotkeys。但是执行缓慢(不要用)
- 利用基于大数据领域的流式计算技术来进行实时数据访问次数的统计,比如 Storm、Spark、Streaming、Flink,这些技术都是可以的。发现热点数据后可以写到ZooKeeper中
如何处理热Key:
- 变分布式缓存为本地缓存:发现热key后,把缓存数据取出后,直接加载到本地缓存中。可以采用Ehcache、Guava Cache都可以,这样系统在访问热key数据时就可以直接访问自己的缓存了(数据不要求时时一致)
- 备份热key:在每个Redis主节点上备份热key数据,这样在读取时可以采用随机读取的方式,将访问压力负载到每个Redis上
- 利用对热点数据访问的限流熔断保护措施:每个系统实例每秒最多请求缓存集群读操作不超过 400 次,一超过就可以熔断掉,不让请求缓存集群,直接返回一个空白信息,然后用户稍后会自行再次重新刷新页面之类的(首页不行,系统友好性差)
Big Key
大key指的是存储的值(Value)非常大,常见场景:
- 热门话题下的讨论
- 大V的粉丝列表
- 序列化后的图片
- 没有及时处理的垃圾数据
大key的影响:
- 大key会大量占用内存,在集群中无法均衡
- Redis的性能下降,主从复制异常
- 在主动删除或过期删除时会操作时间过长而引起服务阻塞
如何发现大key:
./redis-cli --bigkeys
命令。可以找到某个实例5种数据类型(String、hash、list、set、zset)的最大key。但如果Redis 的key比较多,执行该命令会比较慢(有密码为:./redis-cli -a 密码 --bigkeys
)- 获取生产Redis的rdb文件,通过rdbtools分析rdb生成csv文件,再导入MySQL或其他数据库中进行分析统计,根据
size_in_bytes
统计Big Key
大key的处理:
- string类型的big key,尽量不要存入Redis中,可以使用文档型数据库MongoDB或缓存到CDN上。如果必须用Redis存储,最好单独存储,不要和其他的key一起存储。采用一主一从或多从
- 单个简单的key存储的value很大,可以尝试将对象分拆成几个key-value, 使用mget获取值,这样分拆的意义在于分拆单次操作的压力,将操作压力平摊到多次操作中,降低对redis的IO影响
- hash, set,zset,list 中存储过多的元素,可以将这些元素分拆
以hash类型举例来说,对于field过多的场景,可以根据field进行hash取模,生成一个新的key,例如
原来的
hash_key:{filed1:value, filed2:value, filed3:value ...}
可以hash取模后形成如下
key:value形式
hash_key:1:{filed1:value}
hash_key:2:{filed2:value}
hash_key:3:{filed3:value}
...
取模后,将原先单个key分成多个key,每个key filed个数为原先的1/N
- 删除大key时不要使用del,因为del是阻塞命令,删除时会影响性能
- 使用lazy delete (
unlink
命令)
删除指定的key(s),若key不存在则该key被跳过。但是,相比DEL
会产生阻塞,该命令会在另一个线程中回收内存,因此它是非阻塞的。 这也是该命令名字的由来:仅将keys从key空间中删除,真正的数据删除会在后续异步操作。
redis> SET key1 "Hello"
"OK"
redis> SET key2 "World"
"OK"
redis> UNLINK key1 key2 key3
(integer) 2
缓存与数据库一致性
缓存更新策略
- 利用Redis的缓存淘汰策略被动更新 LRU 、LFU
- 利用TTL被动更新
- 在更新数据库时主动更新 (先更数据库再删缓存----延时双删)
- 异步更新定时任务,数据不保证时时一致 ,保证不穿DB
不同策略之间的优缺点
策略 | 一致性 | 维护成本 |
利用Redis的缓存淘汰策略被动更新 | 最差 | 最低 |
利用TTL被动更新 | 较差 | 较低 |
在更新数据库时主动更新 | 较强 | 最高 |
与MyBatis整合
可以使用Redis做MyBatis的二级缓存,在分布式环境下可以使用。框架采用SpringBoot+MyBatis+Redis。框架的搭建就不赘述了。
在pom.xml中添加Redis依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
在application.yml中添加Redis配置
#开发配置
spring:
#数据源配置
datasource:
url: jdbc:mysql://192.168.127.128:3306/test?
serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
type: com.alibaba.druid.pool.DruidDataSource
redis:
host: 192.168.127.128
port: 6379
jedis:
pool:
min-idle: 0
max-idle: 8
max-active: 8
max-wait: -1ms
# 公共配置与profiles选择无关
mybatis:
typeAliasesPackage: com.rubin.rcache.entity
mapperLocations: classpath:mappers/*.xml
添加Redis操作实例到容器中
@Configuration
public class RedisConfig {
@Autowired
private RedisConnectionFactory factory;
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setConnectionFactory(factory);
return redisTemplate;
}
}
缓存实现:ApplicationContextHolder
用于注入RedisTemplate
。
@Component
public class ApplicationContextHolder implements ApplicationContextAware {
private static ApplicationContext ctx;
@Override
//向工具类注入applicationContext
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
ctx = applicationContext; //ctx就是注入的applicationContext
}
//外部调用ctx
public static ApplicationContext getCtx() {
return ctx;
}
public static <T> T getBean(Class<T> tClass) {
return ctx.getBean(tClass);
}
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) {
return (T) ctx.getBean(name);
}
}
RedisCache使用Redis实现MyBatis二级缓存
/**
* 使用redis实现mybatis二级缓存
*/
public class RedisCache implements Cache {
// 缓存对象唯一标识
// orm的框架都是按对象的方式缓存,而每个对象都需要一个唯一标
识.
private final String id;
// 用于事务性缓存操作的读写锁
private static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 处理事务性缓存中做的
// 操作数据缓存的--跟着线程走的
// Redis的模板负责将缓存对象写到redis服务器
里面去
private RedisTemplate redisTemplate;
// 缓存对象的是失效时间,30分钟
private static final long EXPRIRE_TIME_IN_MINUT = 30;
// 构造方法---把对象唯一标识传进来
public RedisCache(String id) {
if (id == null) {
throw new IllegalArgumentException("缓存对象id是不能为空的");
}
this.id = id;
}
@Override
public String getId() {
return this.id;
}
// 给模板对象RedisTemplate赋值,并传出去
private RedisTemplate getRedisTemplate() {
if (redisTemplate == null) { //每个连接池的连接都要获得RedisTemplate
redisTemplate = ApplicationContextHolder.getBean("redisTemplate");
}
return redisTemplate;
}
/*
* 保存缓存对象的方法
*/
@Override
public void putObject(Object key, Object value) {
try {
RedisTemplate redisTemplate = getRedisTemplate();
// 使用redisTemplate得到值操作对象
ValueOperations operation = redisTemplate.opsForValue();
// 使用值操作对象operation设置缓存对象
operation.set(key, value, EXPRIRE_TIME_IN_MINUT, TimeUnit.MINUTES);
// TimeUnit.MINUTES系统当前时间的分钟数
System.out.println("缓存对象保存成功");
} catch (Throwable t) {
System.out.println("缓存对象保存失败" + t);
}
}
/*
* 获取缓存对象的方法
*/
@Override
public Object getObject(Object key) {
try {
RedisTemplate redisTemplate = getRedisTemplate();
ValueOperations operations = redisTemplate.opsForValue();
Object result = operations.get(key);
System.out.println("获取缓存对象");
return result;
} catch (Throwable t) {
System.out.println("缓存对象获取失败" + t);
return null;
}
}
/*
* 删除缓存对象
*/
@Override
public Object del(Object key) {
try {
RedisTemplate redisTemplate = getRedisTemplate();
redisTemplate.delete(key);
System.out.println("删除缓存对象成功!");
} catch (Throwable t) {
System.out.println("删除缓存对象失败!" + t);
}
return null;
}
/*
* 清空缓存对象
* 当缓存的对象更新了的化,就执行此方法
*/
@Override
public void clear() {
RedisTemplate redisTemplate = getRedisTemplate();
//回调函数
redisTemplate.execute((RedisCallback) collection -> {
collection.flushDb();
return null;
});
System.out.println("清空缓存对象成功!");
}
// 可选实现的方法
@Override
public int getSize() {
return 0;
}
@Override
public ReadWriteLock getReadWriteLock() {
return readWriteLock;
}
}
在mapper中增加二级缓存开启(默认不开启)
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.lagou.rcache.dao.UserDao" >
<cache type="com.rubin.rcache.utils.RedisCache" />
<resultMap id="BaseResultMap" type="com.rubin.rcache.entity.TUser" >
<id column="id" property="id" jdbcType="INTEGER" />
<result column="name" property="name" jdbcType="VARCHAR" />
<result column="address" property="address" jdbcType="VARCHAR" />
</resultMap>
<sql id="Base_Column_List" >
id, name, address
</sql>
<select id="selectUser" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from tuser
</select>
</mapper>
在启动时允许缓存
@SpringBootApplication
@MapperScan("com.rubin.rcache.dao")
@EnableCaching
public class RcacheApplication {
public static void main(String[] args) {
SpringApplication.run(RcacheApplication.class, args);
}
}
分布式锁
watch
利用Watch实现Redis乐观锁
乐观锁基于CAS(Compare And Swap)思想(比较并替换),是不具有互斥性,不会产生锁等待而消耗资源,但是需要反复的重试,但也是因为重试的机制,能比较快的响应。因此我们可以利用redis来实现乐观锁。具体思路如下:
- 利用Redis的watch功能,监控这个redisKey的状态值
- 获取redisKey的值
- 创建redis事务
- 给这个key的值+1
- 然后去执行这个事务,如果key的值被修改过则回滚,key不加1
Redis乐观锁实现秒杀
public class Second {
public static void main(String[] arg) {
String redisKey = "lock";
ExecutorService executorService = Executors.newFixedThreadPool(20);
try {
Jedis jedis = new Jedis("127.0.0.1", 6378);
// 初始值
jedis.set(redisKey, "0");
jedis.close();
} catch (Exception e) {
e.printStackTrace();
}
for (int i = 0; i < 1000; i++) {
executorService.execute(() -> {
Jedis jedis1 = new Jedis("127.0.0.1", 6378);
try {
jedis1.watch(redisKey);
String redisValue = jedis1.get(redisKey);
int valInteger = Integer.valueOf(redisValue);
String userInfo = UUID.randomUUID().toString();
// 没有秒完
if (valInteger < 20) {
Transaction tx = jedis1.multi();
tx.incr(redisKey);
List list = tx.exec();
// 秒成功 失败返回空list而不是空
if (list != null && list.size() > 0) {
System.out.println("用户:" + userInfo + ",秒杀成功!
当前成功人数:" + (valInteger + 1));
}
// 版本变化,被别人抢了。
else {
System.out.println("用户:" + userInfo + ",秒杀失
败");
}
}
// 秒完了
else {
System.out.println("已经有20人秒杀成功,秒杀结束");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
jedis1.close();
}
});
}
executorService.shutdown();
}
}
setnx
实现方式
获取锁
- 方式1(使用
set
命令实现)--推荐
/**
* 使用redis的set命令实现获取分布式锁
* @param lockKey 可以就是锁
* @param requestId 请求ID,保证同一性 uuid+threadID
* @param expireTime 过期时间,避免死锁
* @return
*/
public boolean getLock(String lockKey, String requestId, int expireTime) {
// NX:保证互斥性
// hset 原子性操作 只要lockKey有效 则说明有进程在使用分布式锁
String result = jedis.set(lockKey, requestId, "NX", "EX", expireTime);
if("OK".equals(result)) {
return true;
}
return false;
}
- 方式2(使用
setnx
命令实现) -- 并发会产生问题
public boolean getLock(String lockKey, String requestId, int expireTime) {
Long result = jedis.setnx(lockKey, requestId);
if(result == 1) {
//成功设置 如果进程down->永久有效->别的进程就无法获得锁
jedis.expire(lockKey, expireTime);
return true;
}
return false;
}
释放锁
- 方式1(
del
命令实现) -- 并发
/**
* 释放分布式锁
* @param lockKey
* @param requestId
*/
public static void releaseLock(String lockKey,String requestId) {
if (requestId.equals(jedis.get(lockKey))) {
jedis.del(lockKey);
}
}
以上方式的问题在于如果调用jedis.del()
方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()
之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()
方法,则将客户端B的锁给解除了。
- 方式2(redis+lua脚本实现)--推荐
public static boolean releaseLock(String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return
redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey),
Collections.singletonList(requestId));
if (result.equals(1L)) {
return true;
}
return false;
}
存在问题
- 单机的话无法保证高可用
- 主从的话无法保证数据的强一致性,在主机宕机时会造成锁的重复获得
- 无法续租,超过expireTime后,不能继续使用
本质分析
Redis集群不能保证数据的随时一致性,只能保证数据的最终一致性。为什么还可以用Redis实现分布式锁?这个跟业务有关:
- 当业务不需要数据强一致性时,比如:社交场景,就可以使用Redis实现分布式锁
- 当业务必须要数据的强一致性,即不允许重复获得锁,比如金融场景(重复下单,重复转账)就不要使用。可以使用CP模型实现,比如:ZooKeeper和Etcd
Redisson分布式锁的使用
Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。Redisson在基于NIO的Netty框架上,生产环境使用分布式锁。
加入jar包的依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>2.7.0</version>
</dependency>
配置Redisson
public class RedissonManager {
private static Config config = new Config();
//声明redisso对象
private static Redisson redisson = null;
//实例化redisson
static{
config.useClusterServers()
// 集群状态扫描间隔时间,单位是毫秒
.setScanInterval(2000)
//cluster方式至少6个节点(3主3从,3主做sharding,3从用来保证主宕机后可以高可用)
.addNodeAddress("redis://127.0.0.1:6379" )
.addNodeAddress("redis://127.0.0.1:6380")
.addNodeAddress("redis://127.0.0.1:6381")
.addNodeAddress("redis://127.0.0.1:6382")
.addNodeAddress("redis://127.0.0.1:6383")
.addNodeAddress("redis://127.0.0.1:6384");
//得到redisson对象
redisson = (Redisson) Redisson.create(config);
}
//获取redisson对象的方法
public static Redisson getRedisson(){
return redisson;
}
}
锁的获取和释放
public class DistributedRedisLock {
//从配置类中获取redisson对象
private static Redisson redisson = RedissonManager.getRedisson();
private static final String LOCK_TITLE = "redisLock_";
//加锁
public static boolean acquire(String lockName){
//声明key对象
String key = LOCK_TITLE + lockName;
//获取锁对象
RLock mylock = redisson.getLock(key);
//加锁,并且设置锁过期时间3秒,防止死锁的产生 uuid+threadId
mylock.lock(2, 3, TimeUtil.SECOND);
//加锁成功
return true;
}
//锁的释放
public static void release(String lockName){
//必须是和加锁时的同一个key
String key = LOCK_TITLE + lockName;
//获取所对象
RLock mylock = redisson.getLock(key);
//释放锁(解锁)
mylock.unlock();
}
}
业务逻辑中使用分布式锁
public String discount() throws IOException{
String key = "lock001";
//加锁
DistributedRedisLock.acquire(key);
//执行具体业务逻辑
dosoming
//释放锁
DistributedRedisLock.release(key);
//返回结果
return soming;
}
Redisson分布式锁的实现原理
加锁机制
如果该客户端面对的是一个Redis Cluster集群,他首先会根据hash节点选择一台机器。
发送Lua脚本到Redis服务器上,脚本如下:
"if (redis.call('exists',KEYS[1])==0) then "+ --看有没有锁
"redis.call('hset',KEYS[1],ARGV[2],1) ; "+ --无锁 加锁
"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+
"return nil; end ;" +
"if (redis.call('hexists',KEYS[1],ARGV[2]) ==1 ) then "+ --我加的锁
"redis.call('hincrby',KEYS[1],ARGV[2],1) ; "+ --重入锁
"redis.call('pexpire',KEYS[1],ARGV[1]) ; "+
"return nil; end ;" +
"return redis.call('pttl',KEYS[1]) ;" --不能加锁,返回锁的时间
Lua的作用:保证这段复杂业务逻辑执行的原子性。
Lua脚本的解释:
- KEYS[1] : 加锁的key
- ARGV[1] : key的生存时间,默认为30秒
- ARGV[2] : 加锁的客户端ID (
UUID.randomUUID() + “:” + threadId
)
锁互斥机制
那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段Lua脚本,会咋样呢?
很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间--比如还剩15000毫秒的生存时间。
此时客户端2会进入一个while
循环,不停的尝试加锁。
自动延时机制
只要客户端1一旦加锁成功,就会启动一个Watch Dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。这里是一个争议的点,大家可能会想如果不断续期不就是死锁了吗?
博主查阅资料发现,看门狗机制只有在加锁时未指定超时时间的情况下才会自动开启,只要指定了超时时间便不会开启。如果未指定超时时间,需要自行处理宕机导致死锁问题,且看门狗机制很影响性能。
可重入锁机制
第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID。此时就会执行可重入加锁的逻辑,他会用:incrby myLock 8743c9c0-0795-4907-87fd-6c71a6b4586:1 1
命令。通过这个命令,对客户端1的加锁次数,累加1。
释放锁机制
执行Lua脚本如下:
# 如果key已经不存在,说明已经被解锁,直接发布(publish)redis消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
# key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。 不是我加的锁 不能解锁
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
# 将value减1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3],
-1); " +
# 如果counter>0说明锁在重入,不能删除key
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
# 删除key并且publish 解锁消息
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
- KEYS[1] :需要加锁的key,这里需要是字符串类型
- KEYS[2] :Redis消息的ChannelName,一个分布式锁对应唯一的一个channelName
- ARGV[1] :Reids消息体,这里只需要一个字节的标记就可以,主要标记Redis的key已经解锁,再结合Redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁
- ARGV[2] :锁的超时时间,防止死锁
- ARGV[3] :锁的唯一标识,也就是刚才介绍的 id(
UUID.randomUUID() + “:” + threadId
)
如果执行lock.unlock()
,就可以释放分布式锁,此时的业务逻辑也是非常简单的。其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用“del myLock
”命令,从Redis里删除这个key。然后呢,另外的客户端2就可以尝试完成加锁了。
分布式锁特性
- 互斥性:任意时刻,只能有一个客户端获取锁,不能同时有两个客户端获取到锁
- 同一性:锁只能被持有该锁的客户端删除,不能由其它客户端删除
- 可重入性:持有某个锁的客户端可继续对该锁加锁,实现锁的续租
- 容错性:锁失效后(超过生命周期)自动释放锁(key失效),其他客户端可以继续获得该锁,防止死锁
分布式锁的实际应用
- 防止重复下单
- 防止库存超卖
- 。。。
ZooKeeper分布式锁的对比
Redis | ZooKeeper | Etcd | |
一致性算法 | 无 | paxos(ZAB) | raft |
CAP | AP | CP | CP |
高可用 | 主从集群 | n+1 (n至少为2) | n+1 |
接口类型 | 客户端 | 客户端 | http/grpc |
实现 | setNX | createEphemeral | restful API |
阿里Redis使用手册
本文主要介绍在使用阿里云Redis的开发规范,从下面几个方面进行说明:
- 键值设计
- 命令使用
- 客户端使用
- 相关工具
通过本文的介绍可以减少使用Redis过程带来的问题。
键值设计
- key名设计
可读性和可管理性
以业务名(或数据库名)为前缀(防止key冲突),用冒号分隔,比如业务名:表名:id。
简洁性
保证语义的前提下,控制key的长度,当key较多时,内存占用也不容忽视。
不要包含特殊字符
反例:包含空格、换行、单双引号以及其他转义字符。
- value设计
拒绝bigkey
防止网卡流量、慢查询,string类型控制在10KB以内,hash、list、set、zset元素个数不要超过5000。
非字符串的bigkey,不要使用del删除,使用hscan、sscan、zscan方式渐进式删除,同时要注意防止bigkey过期时间自动删除问题(例如一个200万的zset设置1小时过期,会触发del操作,造成阻塞,而且该操作不会不出现在慢查询中。
选择适合的数据类型
例如:实体类型(要合理控制和使用数据结构内存编码优化配置,例如ziplist,但也要注意节省内存和性能之间的平衡)。
控制key的生命周期
Redis不是垃圾桶,建议使用expire设置过期时间(条件允许可以打散过期时间,防止集中过期),不过期的数据重点关注idletime。
命令使用
- O(N)命令关注N的数量:例如
hgetall
、lrange
、smembers
、zrange
、sinter
等并非不能使用,但是需要明确N的值。有遍历的需求可以使用hscan
、sscan
、zscan
代替 - 禁用命令:禁止线上使用
keys
、flushall
、flushdb
等,通过redis的rename机制禁掉命令,或者使用scan
的方式渐进式处理 - 合理使用select:Redis的多数据库较弱,使用数字进行区分,很多客户端支持较差,同时多业务用多数据库实际还是单线程处理,会有干扰
- 使用批量操作提高效率:mget、mset等,也可以使用pipeline。不建议过多使用Redis事务功能
客户端使用
- 避免多个应用使用一个Redis实例:不相干的业务拆分,公共数据做服务化
- 使用连接池:可以有效控制连接,同时提高效率
- 熔断功能:高并发下建议客户端添加熔断功能(例如netflix hystrix)
- 合理的加密:设置合理的密码,如有必要可以使用SSL加密访问
- 淘汰策略:根据自身业务类型,选好
maxmemory-policy
(最大内存淘汰策略),设置好过期时间。默认策略是volatile-lru
,即超过最大内存后,在过期键中使用lru算法进行key的剔除,保证不过期数据不被删除,但是可能会出现OOM问题
其他策略如下:
- allkeys-lru:根据LRU算法删除键,不管数据有没有设置超时属性,直到腾出足够空间为止
- allkeys-random:随机删除所有键,直到腾出足够空间为止
- volatile-random:随机删除过期键,直到腾出足够空间为止
- volatile-ttl:根据键值对象的ttl属性,删除最近将要过期数据。如果没有,回退到noeviction策略
- noeviction:不会剔除任何数据,拒绝所有写入操作并返回客户端错误信息"(error) OOM command not allowed when used memory",此时Redis只响应读操作
相关工具
- 数据同步:Redis间数据同步可以使用:redis-port
- big key搜索:Redis大key搜索工具
- 热点key寻找:内部实现使用monitor,所以建议短时间使用facebook的redis-faina
删除bigkey
- Hash删除: hscan + hdel
- List删除: ltrim
- Set删除: sscan + srem
- SortedSet删除: zscan + zrem
以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~
文章评论