Redis持久化
为什么要持久化
- Redis是内存数据库,宕机后数据会消失
- Redis重启后快速恢复数据,要提供持久化机制
- Redis持久化是为了快速的恢复数据而不是为了存储数据
- Redis有两种持久化方式:RDB和AOF
注意:Redis持久化不保证数据的完整性。所以当Redis用作DB时,DB数据要完整,所以一定要有一个完整的数据源(文件、MySQL),在系统启动时,从这个完整的数据源中将数据load到Redis中。
通过info
命令可以查看关于持久化的信息
# Persistence
loading:0
rdb_changes_since_last_save:1
rdb_bgsave_in_progress:0
rdb_last_save_time:1589363051
rdb_last_bgsave_status:ok
rdb_last_bgsave_time_sec:-1
rdb_current_bgsave_time_sec:-1
rdb_last_cow_size:0
aof_enabled:1
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok
aof_last_cow_size:0
aof_current_size:58
aof_base_size:0
aof_pending_rewrite:0
aof_buffer_length:0
aof_rewrite_buffer_length:0
aof_pending_bio_fsync:0
aof_delayed_fsync:0
RDB
RDB(Redis DataBase),是Redis默认的存储方式,RDB方式是通过快照(snapshotting
)完成的。他只关注这一刻的数据,而不关注过程。
触发快照的方式
- 符合自定义配置的快照规则
- 执行
save
或者bgsave
命令 - 执行
flushall
命令 - 执行主从复制操作 (第一次)
配置参数定期执行
在redis.conf中配置:save 多少秒内 数据变了多少
save "" # 不使用RDB存储 不能主从
save 900 1 # 表示15分钟(900秒钟)内至少1个键被更改则进行快照。
save 300 10 # 表示5分钟(300秒)内至少10个键被更改则进行快照。
save 60 10000 # 表示1分钟内至少10000个键被更改则进行快照。
这种设置方式我们叫做漏斗设计的设置,目的是为了提供更好的性能。
命令显式触发
在客户端输入bgsave
命令。
127.0.0.1:6379> bgsave
Background saving started
RDB执行流程
- Redis父进程首先判断:当前是否在执行
save
,或bgsave
/bgrewriteaof
(aof文件重写命令)的子进程,如果在执行则bgsave
命令直接返回 - 父进程执行fork(调用OS函数复制主进程)操作创建子进程,这个复制过程中父进程是阻塞的,Redis不能执行来自客户端的任何命令
- 父进程fork后,
bgsave
命令返回”Background saving started”信息并不再阻塞父进程,并可以响应其他命令 - 子进程创建RDB文件,根据父进程内存快照生成临时快照文件,完成后对原有文件进行原子替换
- 子进程发送信号给父进程表示完成,父进程更新统计信息
- 父进程fork子进程后,继续工作
RDB文件结构
- 头部5字节固定为“REDIS”字符串
- 4字节“RDB”版本号(不是Redis版本号),当前为9,填充后为0009
- 辅助字段,以key-value的形式
- 存储数据库号码
- 字典大小
- 过期key
- 主要数据,以key-value的形式存储
- 结束标志
- 校验和,就是看文件是否损坏,或者是否被修改
其中,辅助字段详情如下:
字段名 | 字段值 |
redis-ver | 5.0.5 |
redis-bits | 64/32 |
ctime | 当前时间戳 |
used-mem | 使用内存 |
aof-preamble | 是否开启aof |
repl-stream-db | 主从复制 |
repl-id | 主从复制 |
repl-offset | 主从复制 |
RDB的优缺点
优点
- RDB是二进制压缩文件,占用空间小,便于传输(传给slaver)
- 主进程fork子进程,可以最大化Redis性能,主进程不能太大,Redis的数据量不能太大,复制过程中主进程阻塞
缺点
- 不保证数据完整性,会丢失最后一次快照以后更改的所有数据
AOF
AOF(append only file)是Redis的另一种持久化方式。Redis默认情况下是不开启的。开启AOF持久化后,Redis 将所有对数据库进行过写入的命令(及其参数)(RESP)记录到 AOF 文件, 以此达到记录数据库状态的目的,这样当Redis重启后只要按顺序回放这些命令就会恢复到原始状态了。
AOF会记录过程,而RDB只管结果。
AOF持久化实现
配置 redis.conf
# 可以通过修改redis.conf配置文件中的appendonly参数开启
appendonly yes
# AOF文件的保存位置和RDB文件的位置相同,都是通过dir参数设置的。
dir ./
# 默认的文件名是appendonly.aof,可以通过appendfilename参数修改
appendfilename appendonly.aof
AOF原理
AOF文件中存储的是Redis的命令,同步命令到 AOF 文件的整个过程可以分为三个阶段:
- 命令传播:Redis 将执行完的命令、命令的参数、命令的参数个数等信息发送到 AOF 程序中
- 缓存追加:AOF 程序根据接收到的命令数据,将命令转换为网络通讯协议的格式,然后将协议内容追加到服务器的 AOF 缓存中
- 文件写入和保存:AOF 缓存中的内容被写入到 AOF 文件末尾,如果设定的 AOF 保存条件被满足的话,fsync 函数或者 fdatasync 函数会被调用,将写入的内容真正地保存到磁盘中
命令传播
当一个 Redis 客户端需要执行命令时, 它通过网络连接, 将协议文本发送给 Redis 服务器。服务器在接到客户端的请求之后, 它会根据协议文本的内容, 选择适当的命令函数, 并将各个参数从字符串文本转换为 Redis 字符串对象( StringObject )。每当命令函数成功执行之后, 命令参数都会被传播到AOF 程序。
缓存追加
当命令被传播到 AOF 程序之后, 程序会根据命令以及命令的参数, 将命令从字符串对象转换回原来的协议文本。协议文本生成之后, 它会被追加到 redis.h/redisServer
结构的 aof_buf
末尾。
redisServer
结构维持着 Redis 服务器的状态, aof_buf
域则保存着所有等待写入到 AOF 文件的协议文本(RESP)。
文件写入和保存
每当服务器常规任务函数被执行、 或者事件处理器被执行时, aof.c/flushAppendOnlyFile
函数都会被调用, 这个函数执行以下两个工作:
- WRITE:根据条件,将
aof_buf
中的缓存写入到 AOF 文件 - SAVE:根据条件,调用
fsync
或fdatasync
函数,将 AOF 文件保存到磁盘中
AOF 保存模式
Redis 目前支持三种 AOF 保存模式,它们分别是:
- AOF_FSYNC_NO :不保存
- AOF_FSYNC_EVERYSEC :每一秒钟保存一次。(默认)
- AOF_FSYNC_ALWAYS :每执行一个命令保存一次。(不推荐)
不保存
在这种模式下,每次调用flushAppendOnlyFile
函数, WRITE
都会被执行, 但SAVE
会被略过。在这种模式下, SAVE
只会在以下任意一种情况中被执行:
- Redis 被关闭
- AOF 功能被关闭
- 系统的写缓存被刷新(可能是缓存已经被写满,或者定期保存操作被执行)
这三种情况下的SAVE
操作都会引起Redis主进程阻塞。
每一秒钟保存一次(推荐)
在这种模式中, SAVE
原则上每隔一秒钟就会执行一次,因为SAVE
操作是由后台子线程(fork)调用的, 所以它不会引起服务器主进程阻塞。
每执行一个命令保存一次
在这种模式下,每次执行完一个命令之后,WRITE
和SAVE
都会被执行。另外,因为SAVE
是由Redis主进程执行的,所以在SAVE
执行期间,主进程会被阻塞,不能接受命令请求。
对于三种 AOF 保存模式, 它们对服务器主进程的阻塞情况如下:
AOF重写、触发方式、混合持久化
AOF记录数据的变化过程,越来越大,需要重写“瘦身”。
Redis可以在 AOF体积变得过大时,自动地在后台(Fork子进程)对 AOF进行重写。重写后的新 AOF文件包含了恢复当前数据集所需的最小命令集合。 所谓的“重写”其实是一个有歧义的词语, 实际上,AOF 重写并不需要对原有的 AOF 文件进行任何写入和读取, 它针对的是数据库中键的当前值。
举例如下:
et s1 11
set s1 22 ------- > set s1 33
set s1 33
没有优化的:
set s1 11
set s1 22
set s1 33
优化后:
set s1 33
lpush list1 1 2 3
lpush list1 4 5 6 -------- > list1 1 2 3 4 5 6
优化后
lpush list1 1 2 3 4 5 6
Redis 不希望 AOF 重写造成服务器无法处理请求, 所以 Redis 决定将 AOF 重写程序放到(后台)子进程里执行, 这样处理的最大好处是:
- 子进程进行AOF重写期间,主进程可以继续处理命令请求
- 子进程带有主进程的数据副本,使用子进程而不是线程,可以在避免锁的情况下,保证数据的安全性
不过, 使用子进程也有一个问题需要解决: 因为子进程在进行AOF重写期间, 主进程还需要继续处理命令, 而新的命令可能对现有的数据进行修改, 这会让当前数据库的数据和重写后的AOF文件中的数据不一致。
为了解决这个问题, Redis增加了一个AOF重写缓存, 这个缓存在fork出子进程之后开始启用,Redis 主进程在接到新的写命令之后, 除了会将这个写命令的协议内容追加到现有的AOF文件之外,还会追加到这个缓存中。
重写过程分析(整个重写操作是绝对安全的):
Redis在创建新AOF文件的过程中,会继续将命令追加到现有的AOF文件里面,即使重写过程中发生停机,现有的 AOF文件也不会丢失。 而一旦新AOF文件创建完毕,Redis就会从旧AOF文件切换到新AOF文件,并开始对新AOF文件进行追加操作。
当子进程在执行AOF重写时, 主进程需要执行以下三个工作:
- 处理命令请求
- 将写命令追加到现有的 AOF 文件中
- 将写命令追加到 AOF 重写缓存中
这样一来可以保证:
- 现有的AOF功能会继续执行,即使在AOF重写期间发生停机,也不会有任何数据丢失
- 所有对数据库进行修改的命令都会被记录到 AOF 重写缓存中
当子进程完成 AOF 重写之后, 它会向父进程发送一个完成信号, 父进程在接到完成信号之后, 会调用一个信号处理函数, 并完成以下工作:
- 将AOF 重写缓存中的内容全部写入到新AOF文件中
- 对新的AOF文件进行改名,覆盖原有的AOF文件
这个信号处理函数执行完毕之后, 主进程就可以继续像往常一样接受命令请求了。 在整个AOF后台重写过程中, 只有最后的写入缓存和改名操作会造成主进程阻塞, 在其他时候,AOF后台重写都不会对主进程造成阻塞, 这将AOF 重写对性能造成的影响降到了最低。
以上就是AOF后台重写, 也即是BGREWRITEAOF
命令(AOF重写)的工作原理。
触发方式
- 配置触发
在redis.conf中配置
# 表示当前aof文件大小超过上一次aof文件大小的百分之多少的时候会进行重写。如果之前没有重写过,
以启动时aof文件大小为准
auto-aof-rewrite-percentage 100
# 限制允许重写最小aof文件大小,也就是文件大小小于64mb的时候,不需要进行优化
auto-aof-rewrite-min-size 64mb
- 执行
bgrewriteaof
命令
127.0.0.1:6379> bgrewriteaof
Background append only file rewriting started
混合持久化
RDB和AOF各有优缺点,Redis 4.0 开始支持RDB和AOF的混合持久化。如果把混合持久化打开,AOF REWRITE的时候就直接把RDB的内容写到AOF文件开头。
开启混合持久化:
aof-use-rdb-preamble yes
AOF文件是RDB文件的头和AOF格式的内容,在加载时,首先会识别AOF文件是否以REDIS字符串开头,如果是就按RDB格式加载,加载完RDB后继续按AOF格式加载剩余部分。
AOF文件的载入与数据还原
因为AOF文件里面包含了重建数据库状态所需的所有写命令,所以服务器只要读入并重新执行一遍AOF文件里面保存的写命令,就可以还原服务器关闭之前的数据库状态。
Redis读取AOF文件并还原数据库状态的详细步骤如下:
- 创建一个不带网络连接的伪客户端(fake client):因为Redis的命令只能在客户端上下文中执行,而载入AOF文件时所使用的命令直接来源于AOF文件而不是网络连接,所以服务器使用了一个没有网络连接的伪客户端来执行AOF文件保存的写命令,伪客户端执行命令的效果和带网络连接的客户端执行命令的效果完全一样
- 从AOF文件中分析并读取出一条写命令
- 使用伪客户端执行被读出的写命令
- 一直执行步骤2和步骤3,直到AOF文件中的所有写命令都被处理完毕为止
当完成以上步骤之后,AOF文件所保存的数据库状态就会被完整地还原出来,整个过程如下图所示:
RDB与AOF对比
- RDB存某个时刻的数据快照,采用二进制压缩存储,AOF存操作命令,采用文本存储(混合)
- RDB性能高、AOF性能较低
- RDB在配置触发状态会丢失最后一次快照以后更改的所有数据,AOF设置为每秒保存一次,则最多丢2秒的数据
- Redis以主服务器模式运行,RDB不会保存过期键值对数据,Redis以从服务器模式运行,RDB会保存过期键值对,当主服务器向从服务器同步时,再清空过期键值对
AOF写入文件时,对过期的key会追加一条DEL
命令,当执行AOF重写时,会忽略过期key和DEL
命令。
底层数据结构
Redis作为Key-Value存储系统,数据结构如下:
Redis没有表的概念,Redis实例所对应的db以编号区分,db本身就是key的命名空间。比如:user:1000作为key值,表示在user这个命名空间下id为1000的元素,类似于user表的id=1000的行。
RedisDB结构
Redis中存在“数据库”的概念,该结构由redis.h中的redisDb定义。当Redis服务器初始化时,会预先分配16个数据库。所有数据库保存到结构redisServer
的一个成员redisServer.db数组中。redisClient中存在一个名叫db的指针指向当前使用的数据库。
RedisDB结构体源码:
typedef struct redisDb {
int id; //id是数据库序号,为0-15(默认Redis有16个数据库)
long avg_ttl; //存储的数据库对象的平均ttl(time to live),用于统计
dict *dict; //存储数据库所有的key-value
dict *expires; //存储key的过期时间
dict *blocking_keys;//blpop 存储阻塞key和客户端对象
dict *ready_keys;//阻塞后push 响应阻塞客户端 存储阻塞后push的key和客户端对象
dict *watched_keys;//存储watch监控的的key和客户端对象
} redisDb;
- id:id是数据库序号,为0-15(默认Redis有16个数据库)
- dict:存储数据库所有的key-value
- expires:存储key的过期时间
RedisObject结构
Value是一个对象,包含字符串对象,列表对象,哈希对象,集合对象和有序集合对象。
结构信息概览
RedisObject结构体源码:
typedef struct redisObject {
unsigned type:4;//类型 对象类型
unsigned encoding:4;//编码
void *ptr;//指向底层实现数据结构的指针
//...
int refcount;//引用计数
//...
unsigned lru:LRU_BITS; //LRU_BITS为24bit 记录最后一次被命令程序访问的时间
//...
}robj;
- 4位type
type字段表示对象的类型,占4位:REDIS_STRING(字符串)、REDIS_LIST (列表)、REDIS_HASH(哈希)、REDIS_SET(集合)、REDIS_ZSET(有序集合)。
当我们执行type
命令时,便是通过读取RedisObject
的type
字段获得对象的类型:
127.0.0.1:6379> type a1
string
- 4位encoding
encoding表示对象的内部编码,占4位。每个对象有不同的实现编码,Redis可以根据不同的使用场景来为对象设置不同的编码,这就大大提高了Redis的灵活性和效率。
通过object encoding
命令,可以查看对象采用的编码方式:
127.0.0.1:6379> object encoding a1
"int"
- 24位LRU
lru记录的是对象最后一次被命令程序访问的时间,( 4.0 版本占 24 位,2.6 版本占 22 位)。高16位存储一个分钟数级别的时间戳,低8位存储访问计数(lfu : 最近访问次数)。
- refcount
refcount记录的是该对象被引用的次数,类型为整型。refcount的作用,主要在于对象的引用计数和内存回收。当对象的refcount>1时,称为共享对象。Redis 为了节省内存,当有一些对象重复出现时,新的程序不会创建新的对象,而是仍然使用原来的对象。
- ptr
ptr指针指向具体的数据,比如:set hello world
,ptr指向包含字符串world的SDS。
7种type
字符串对象
Redis 使用了 SDS(Simple Dynamic String)来存储字符串和整型数据。如图:
struct sdshdr{
//记录buf数组中已使用字节的数量
int len;
//记录 buf 数组中未使用字节的数量
int free;
//字符数组,用于保存字符串
char buf[];
}
其中:buf[] 的长度=len+free+1。
SDS的优势:
- SDS 在C字符串的基础上加入了free和len字段,获取字符串长度:SDS 是 O(1),C字符串是O(n)
- SDS 由于记录了长度,在可能造成缓冲区溢出时会自动重新分配内存,杜绝了缓冲区溢出
- 可以存取二进制数据,以字符串长度len来作为结束标识
- \0 空字符串 二进制数据不包括空字符串,所以没有办法存取二进制数据
SDS的主要应用在:存储字符串和整型数据、存储key、AOF缓冲区和用户输入缓冲。
跳跃表(重点)
跳跃表是有序集合(sorted-set)的底层实现,效率高,实现简单。
跳跃表的基本思想:将有序链表中的部分节点分层,每一层都是一个有序链表。
在查找时优先从最高层开始向后查找,当到达某个节点时,如果next节点值大于要查找的值或next指针指向null,则从当前节点下降一层继续向后查找。
举例:
查找元素9,按道理我们需要从头结点开始遍历,一共遍历8个结点才能找到元素9。
第一次分层:
遍历5次找到元素9(红色的线为查找路径)
依此类推到第三次分层:
遍历4次找到元素9
这种数据结构,就是跳跃表,它具有二分查找的功能。
上面例子中,9个结点,一共4层,是理想的跳跃表。
插入新数据的话我们可以通过抛硬币(概率1/2)的方式来决定新插入结点跨越的层数:如果是正面则插入上层,反面则不插入。
删除数据我们在各层找到指定元素并删除即可。
跳跃表特点:
- 每层都是一个有序链表
- 查找次数近似于层数(1/2)
- 底层包含所有元素
- 空间复杂度 O(n) 扩充了一倍
Redis跳跃表的实现源码如下:
//跳跃表节点
typedef struct zskiplistNode {
sds ele; /* 存储字符串类型数据 redis3.0版本中使用robj类型表示,
但是在redis4.0.1中直接使用sds类型表示 */
double score;//存储排序的分值
struct zskiplistNode *backward;//后退指针,指向当前节点最底层的前一个节点
/*
* 层,柔性数组,随机生成1-64的值
*/
struct zskiplistLevel {
struct zskiplistNode *forward; //指向本层下一个节点
unsigned int span;//本层下个节点到本节点的元素个数
} level[];
} zskiplistNode;
//链表
typedef struct zskiplist{
//表头节点和表尾节点
structz skiplistNode *header, *tail;
//表中节点的数量
unsigned long length;
//表中层数最大的节点的层数
int level;
}zskiplist;
完整的跳跃表结构体:
跳跃表的优势:
- 可以快速查找到需要的节点 O(logn)
- 可以在O(1)的时间复杂度下,快速获得跳跃表的头节点、尾结点、长度和高度
应用场景:有序集合的实现
字典(重点)
字典dict又称散列表(hash),是用来存储键值对的一种数据结构。Redis整个数据库是用字典来存储的(K-V结构)。对Redis进行CURD操作其实就是对字典中的数据进行CURD操作。
Redis中的字典使用的数据结构和我们Java中HashMap使用的数据结构类似,都是采用数组+链表的方式来存储数据。
Redis字典实现包括:字典(dict)、Hash表(dictht)、Hash表节点(dictEntry)。如图:
Hash表源码:
typedef struct dictht {
dictEntry **table; // 哈希表数组
unsigned long size; // 哈希表数组的大小
unsigned long sizemask; // 用于映射位置的掩码,值永远等于(size-1)
unsigned long used; // 哈希表已有节点的数量,包含next单链表数据
} dictht;
- hash表的数组初始容量为4,随着k-v存储量的增加需要对hash表数组进行扩容,新扩容量为当前量的一倍,即4,8,16,32
- 索引值=Hash值&掩码值(Hash值与Hash表容量取余)
Hash表节点源码如下:
typedef struct dictEntry {
void *key; // 键
union { // 值v的类型可以是以下4种类型
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; // 指向下一个哈希表节点,形成单向链表 解决hash冲突
} dictEntry;
key字段存储的是键值对中的键。v字段是个联合体,存储的是键值对中的值。next指向下一个哈希表节点,用于解决hash冲突。整个字典结构如下:
dict字典源码如下:
typedef struct dict {
dictType *type; // 该字典对应的特定操作函数
void *privdata; // 上述类型函数对应的可选参数
dictht ht[2]; /* 两张哈希表,存储键值对数据,ht[0]为原生
哈希表,
ht[1]为 rehash 哈希表 */
long rehashidx; /*rehash标识 当等于-1时表示没有在
rehash,
否则表示正在进行rehash操作,存储的值表示
hash表 ht[0]的rehash进行到哪个索引值
(数组下标)*/
int iterators; // 当前运行的迭代器数量
} dict;
type字段,指向dictType结构体,里边包括了对该字典操作的函数指针
typedef struct dictType {
// 计算哈希值的函数
unsigned int (*hashFunction)(const void *key);
// 复制键的函数
void *(*keyDup)(void *privdata, const void *key);
// 复制值的函数
void *(*valDup)(void *privdata, const void *obj);
// 比较键的函数
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
// 销毁键的函数
void (*keyDestructor)(void *privdata, void *key);
// 销毁值的函数
void (*valDestructor)(void *privdata, void *obj);
} dictType;
Redis字典除了主数据库的K-V数据存储以外,还可以用于:散列表对象、哨兵模式中的主从节点管理等在不同的应用中,字典的形态都可能不同,dictType是为了实现各种形态的字典而抽象出来的操作函数(多态)。
完整的Redis字典数据结构:
字典达到存储上限(阈值 0.75),需要rehash(扩容)。扩容流程如下图:
说明:
- 初次申请默认容量为4个dictEntry,非初次申请为当前hash表容量的一倍
- rehashidx=0表示要进行rehash操作
- 新增加的数据在新的hash表h[1]
- 修改、删除、查询在老hash表h[0]、新hash表h[1]中(rehash中)
- 将老的hash表h[0]的数据重新计算索引值后全部迁移到新的hash表h[1]中,这个过程称为rehash
当数据量巨大时rehash的过程是非常缓慢的,所以需要进行优化:
- 服务器忙,则只对一个节点进行rehash
- 服务器闲,可批量rehash(100节点)
应用场景:
- 主数据库的K-V数据存储
- 散列表对象(hash)
- 哨兵模式中的主从节点管理
压缩列表
压缩列表(ziplist)是由一系列特殊编码的连续内存块组成的顺序型数据结构。其特点就是节省内存空间。压缩列表的数据结构如下:
- zlbytes:压缩列表的字节长度
- zltail:压缩列表尾元素相对于压缩列表起始地址的偏移量
- zllen:压缩列表的元素个数
- entry1..entryX : 压缩列表的各个节点
- zlend:压缩列表的结尾,占一个字节,恒为0xFF(255)
ziplist结构体如下:
struct ziplist<T>{
unsigned int zlbytes; // ziplist的长度字节数,包含头部、所有entry和zipend。
unsigned int zloffset; // 从ziplist的头指针到指向最后一个entry的偏移量,用于快速反
向查询
unsigned short int zllength; // entry元素个数
T[] entry; // 元素值
unsigned char zlend; // ziplist结束符,值固定为0xFF
}
typedef struct zlentry {
unsigned int prevrawlensize; //previous_entry_length字段的长度
unsigned int prevrawlen; //previous_entry_length字段存储的内容
unsigned int lensize; //encoding字段的长度
unsigned int len; //数据内容长度
unsigned int headersize; //当前元素的首部长度,即previous_entry_length字段
长度与 encoding字段长度之和。
unsigned char encoding; //数据类型
unsigned char *p; //当前元素首地址
} zlentry;
应用场景:
- sorted-set和hash元素个数少且是小整数或短字符串(直接使用)
- list用快速链表(quicklist)数据结构存储,而快速链表是双向列表与压缩列表的组合(间接使用)
整数集合
整数集合(intset)是一个有序的(整数升序)、存储整数的连续存储结构。当Redis集合类型的元素都是整数并且都处在64位有符号整数范围内(2^64),使用该结构体存储。
127.0.0.1:6379> sadd set:001 1 3 5 6 2
(integer) 5
127.0.0.1:6379> object encoding set:001
"intset"
127.0.0.1:6379> sadd set:004 1 100000000000000000000000000 9999999999
(integer) 3
127.0.0.1:6379> object encoding set:004
"hashtable"
intset的结构如下:
typedef struct intset{
//编码方式
uint32_t encoding;
//集合包含的元素数量
uint32_t length;
//保存元素的数组
int8_t contents[];
}intset;
应用场景:
- 可以保存类型为int16_t、int32_t 或者int64_t 的整数值,并且保证集合中不会出现重复元素
快速列表(重要)
快速列表(quicklist)是Redis底层重要的数据结构,是列表的底层实现。(在Redis3.2之前,Redis采用双向链表(adlist)和压缩列表(ziplist)实现。)在Redis3.2以后结合adlist和ziplist的优势Redis设计出了quicklist。
127.0.0.1:6379> lpush list:001 1 2 5 4 3
(integer) 5
127.0.0.1:6379> object encoding list:001
"quicklist"
quicklist是一个双向链表,链表中的每个节点时一个ziplist结构。quicklist中的每个节点ziplist都能够存储多个数据元素。如图所示:
quicklist的结构定义如下:
typedef struct quicklist {
quicklistNode *head; // 指向quicklist的头部
quicklistNode *tail; // 指向quicklist的尾部
unsigned long count; // 列表中所有数据项的个数总和
unsigned int len; // quicklist节点的个数,即ziplist的个数
int fill : 16; // ziplist大小限定,由list-max-ziplist-size给定
(Redis设定)
unsigned int compress : 16; // 节点压缩深度设置,由list-compress-depth给定
(Redis设定)
} quicklist;
quicklistNode的结构定义如下:
typedef struct quicklistNode {
struct quicklistNode *prev; // 指向上一个ziplist节点
struct quicklistNode *next; // 指向下一个ziplist节点
unsigned char *zl; // 数据指针,如果没有被压缩,就指向ziplist结构,反之
指向quicklistLZF结构
unsigned int sz; // 表示指向ziplist结构的总长度(内存占用长度)
unsigned int count : 16; // 表示ziplist中的数据项个数
unsigned int encoding : 2; // 编码方式,1--ziplist,2--quicklistLZF
unsigned int container : 2; // 预留字段,存放数据的方式,1--NONE,2--ziplist
unsigned int recompress : 1; // 解压标记,当查看一个被压缩的数据时,需要暂时解压,标
记此参数为1,之后再重新进行压缩
unsigned int attempted_compress : 1; // 测试相关
unsigned int extra : 10; // 扩展字段,暂时没用
} quicklistNode;
quicklist每个节点的实际数据存储结构为ziplist,这种结构的优势在于节省存储空间。为了进一步降低ziplist的存储空间,还可以对ziplist进行压缩。Redis采用的压缩算法是LZF。其基本思想是:数据与前面重复的记录重复位置及长度,不重复的记录原始数据。
压缩过后的数据可以分成多个片段,每个片段有两个部分:解释字段和数据字段。quicklistLZF的结构体如下:
typedef struct quicklistLZF {
unsigned int sz; // LZF压缩后占用的字节数
char compressed[]; // 柔性数组,指向数据部分
} quicklistLZF;
应用场景:
- 列表(List)的底层实现、发布与订阅、慢查询、监视器等功能
流对象
stream主要由:消息、生产者、消费者和消费组构成。
Redis Stream的底层主要使用了listpack(紧凑列表)和Rax树(基数树)。
listpack表示一个字符串列表的序列化,listpack可用于存储字符串或整数。用于存储stream的消息内容。
结构如下图:
Rax 是一个有序字典树 (基数树 Radix Tree),按照 key 的字典序排列,支持快速地定位、插入和删除操作。
Rax被用在Redis Stream结构里面用于存储消息队列,在Stream里面消息ID的前缀是时间戳 + 序号,这样的消息可以理解为时间序列消息。使用Rax结构进行存储就可以快速地根据消息ID定位到具体的消息,然后继续遍历指定消息之后的所有消息。
10种encoding
encoding表示对象的内部编码,占4位。Redis通过encoding属性为对象设置不同的编码。对于少的和小的数据,Redis采用小的和压缩的存储方式,体现Redis的灵活性也大大提高了 Redis 的存储量和执行效率。
比如Set对象:
- intset : 元素是64位以内的整数
- hashtable:元素是64位以外的整数
如下所示:
127.0.0.1:6379> sadd set:001 1 3 5 6 2
(integer) 5
127.0.0.1:6379> object encoding set:001
"intset"
127.0.0.1:6379> sadd set:004 1 100000000000000000000000000 9999999999
(integer) 3
127.0.0.1:6379> object encoding set:004
"hashtable"
String
int、raw、embstr
int
REDIS_ENCODING_INT(int类型的整数)
127.0.0.1:6379> set n1 123
OK
127.0.0.1:6379> object encoding n1
"int"
embstr
REDIS_ENCODING_EMBSTR(编码的简单动态字符串)
小字符串 长度小于44个字节
127.0.0.1:6379> set name:001 zhangfei
OK
127.0.0.1:6379> object encoding name:001
"embstr"
raw
REDIS_ENCODING_RAW (简单动态字符串)
大字符串 长度大于44个字节
27.0.0.1:6379> set address:001
asdasdasdasdasdasdsadasdasdasdasdasdasdasdasdasdasdasdasdasdasdasdasdasdasdasdas
dasdasdas
OK
127.0.0.1:6379> object encoding address:001
"raw"
list
列表的编码是quicklist。
REDIS_ENCODING_QUICKLIST(快速列表)
127.0.0.1:6379> lpush list:001 1 2 5 4 3
(integer) 5
127.0.0.1:6379> object encoding list:001
"quicklist"
hash
散列的编码是字典和压缩列表
dict
REDIS_ENCODING_HT(字典)
当散列表元素的个数比较多或元素不是小整数或短字符串时。
127.0.0.1:6379> hmset user:003
username 111111111111111111111111111111111111111111111111111111111111111111111111
11111111111111111111111111111111 zhangfei password 111 num
2300000000000000000000000000000000000000000000000000
OK
127.0.0.1:6379> object encoding user:003
"hashtable"
ziplist
REDIS_ENCODING_ZIPLIST(压缩列表)
当散列表元素的个数比较少,且元素都是小整数或短字符串时。
127.0.0.1:6379> hmset user:001 username zhangfei password 111 age 23 sex M
OK
127.0.0.1:6379> object encoding user:001
"ziplist"
set
集合的编码是整形集合和字典
intset
REDIS_ENCODING_INTSET(整数集合)
当Redis集合类型的元素都是整数并且都处在64位有符号整数范围内(<18446744073709551616)
127.0.0.1:6379> sadd set:001 1 3 5 6 2
(integer) 5
127.0.0.1:6379> object encoding set:001
"intset"
dict
REDIS_ENCODING_HT(字典)
当Redis集合类型的元素是非整数或都处在64位有符号整数范围外(>18446744073709551616)
127.0.0.1:6379> sadd set:004 1 100000000000000000000000000 9999999999
(integer) 3
127.0.0.1:6379> object encoding set:004
"hashtable"
zset
有序集合的编码是压缩列表和跳跃表+字典
ziplist
REDIS_ENCODING_ZIPLIST(压缩列表)
当元素的个数比较少,且元素都是小整数或短字符串时。
127.0.0.1:6379> zadd hit:1 100 item1 20 item2 45 item3
(integer) 3
127.0.0.1:6379> object encoding hit:1
"ziplist"
skiplist + dict
REDIS_ENCODING_SKIPLIST(跳跃表+字典)
当元素的个数比较多或元素不是小整数或短字符串时。
127.0.0.1:6379> zadd hit:2 100
item1111111111111111111111111111111111111111111111111111111111111111111111111111
1111111111111111111111111111111111 20 item2 45 item3
(integer) 3
127.0.0.1:6379> object encoding hit:2
"skiplist"
缓存过期和淘汰策略
如果我们的Redis长期使用的话,软件中的key数量会不断膨胀。一旦物理内存被占满,Redis的性能会急剧下降甚至不可用。所以,为了避免这种情况,我们就需要根据场景设置我们的Redis对于内存的最大值,并了解Redis内部对于key的淘汰策略。
maxmemory
该参数是设置最大内存使用。默认是0(不限制),如果可以的话,单实例Redis建议设置为物理内存的3/4,主从实例的话,从实例建议留出一些内存备用。
在redis.conf中
maxmemory 1024mb
命令: 获得maxmemory数
CONFIG GET maxmemory
设置maxmemory后,当趋近maxmemory时,通过缓存淘汰策略,从内存中删除对象来保证内存的正常使用。
注意:设置maxmemory之后,我们的maxmemory-policy最好同步配置一下。关于 maxmemory-policy 下文中有详细介绍。
expire数据结构
在Redis中可以使用expire命令设置一个键的存活时间(ttl: time to live),过了这段时间,该键就会自动被删除。
expire的使用
expire命令的使用方法如下:
expire key ttl(单位秒)
127.0.0.1:6379> expire name 2 #2秒失效
(integer) 1
127.0.0.1:6379> get name
(nil)
127.0.0.1:6379> set name zhangfei
OK
127.0.0.1:6379> ttl name #永久有效
(integer) -1
127.0.0.1:6379> expire name 30 #30秒失效
(integer) 1
127.0.0.1:6379> ttl name #还有24秒失效
(integer) 24
127.0.0.1:6379> ttl name #失效
(integer) -2
expire原理
typedef struct redisDb {
dict *dict; -- key Value
dict *expires; -- key ttl
dict *blocking_keys;
dict *ready_keys;
dict *watched_keys;
int id;
} redisDb;
上面的代码是Redis中关于数据库的结构体定义,这个结构体定义中除了id以外都是指向字典的指针,其中我们只看dict和expires。
dict用来维护一个Redis数据库中包含的所有Key-Value键值对,expires则用于维护一个Redis数据库中设置了失效时间的键(即key与失效时间的映射)。
当我们使用 expire
命令设置一个key的失效时间时,Redis 首先到dict这个字典表中查找要设置的key是否存在,如果存在就将这个key和失效时间添加到expires这个字典表。
当我们使用setex
命令向系统插入数据时,Redis首先将Key和Value添加到dict这个字典表中,然后将Key和失效时间添加到expires这个字典表中。
简单地总结来说就是,设置了失效时间的key和具体的失效时间全部都维护在expires这个字典表中。
删除策略
Redis的数据删除有定时删除、惰性删除和主动删除三种方式。Redis目前采用惰性删除+主动删除的方式。
定时删除
在设置键的过期时间的同时,创建一个定时器,让定时器在键的过期时间来临时,立即执行对键的删除操作。
需要创建定时器,而且消耗CPU,一般不推荐使用。
惰性删除
在key被访问时如果发现它已经失效,那么就删除它。
调用expireIfNeeded函数,该函数的意义是:读取数据之前先检查一下它有没有失效,如果失效了就删除它。
int expireIfNeeded(redisDb *db, robj *key) {
//获取主键的失效时间 get当前时间-创建时间>ttl
long long when = getExpire(db,key);
//假如失效时间为负数,说明该主键未设置失效时间(失效时间默认为-1),直接返回0
if (when < 0) return 0;
//假如Redis服务器正在从RDB文件中加载数据,暂时不进行失效主键的删除,直接返回0
if (server.loading) return 0;
...
//如果以上条件都不满足,就将主键的失效时间与当前时间进行对比,如果发现指定的主键
//还未失效就直接返回0
if (mstime() <= when) return 0;
//如果发现主键确实已经失效了,那么首先更新关于失效主键的统计个数,然后将该主键失
//效的信息进行广播,最后将该主键从数据库中删除
server.stat_expiredkeys++;
propagateExpire(db,key);
return dbDelete(db,key);
}
主动删除
在redis.conf文件中可以配置主动删除策略,默认是no-enviction(不删除)
maxmemory-policy allkeys-lru
LRU
LRU (Least recently used) 最近最少使用,算法根据数据的历史访问记录来进行淘汰数据,其核心思想是“如果数据最近被访问过,那么将来被访问的几率也更高”。
最常见的实现是使用一个链表保存缓存数据,详细算法实现如下:
- 新数据插入到链表头部
- 每当缓存命中(即缓存数据被访问),则将数据移到链表头部
- 当链表满的时候,将链表尾部的数据丢弃
- 在Java中可以使用
LinkHashMap
(哈希链表)去实现LRU
Redis的LRU 数据淘汰机制
在服务器配置中保存了lru计数器server.lrulock,会定时(redis 定时程序 serverCorn())更新。server.lrulock的值是根据server.unixtime计算出来的。
另外,从struct redisObject中可以发现,每一个redis对象都会设置相应的lru。可以想象的是,每一次访问数据的时候,会更新redisObject.lru。
LRU 数据淘汰机制是这样的:在数据集中随机挑选几个键值对,取出其中lru最大的键值对淘汰。
淘汰策略有两种,详情如下:
- volatile-lru:从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰
- allkeys-lru:从数据集(server.db[i].dict)中挑选最近最少使用的数据淘汰
LFU
LFU (Least frequently used) 最不经常使用,如果一个数据在最近一段时间内使用次数很少,那么在将来一段时间内被使用的可能性也很小。
淘汰策略有两种,详情如下:
- volatile-lfu:从已设置过期时间的数据集(server.db[i].expires)中挑选最不经常使用的数据淘汰
- allkeys-lfu:从数据集(server.db[i].dict)中挑选最不经常使用的数据淘汰
random
淘汰策略有两种,详情如下:
- volatile-random:从已设置过期时间的数据集(server.db[i].expires)中任意选择数据淘汰
- allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰
ttl
- volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰
noenviction
禁止驱逐数据,不删除。
缓存淘汰策略的选择
- allkeys-lru : 在不确定时一般采用策略。 冷热数据交换
- volatile-lru : 比allkeys-lru性能差。存过期时间
- allkeys-random : 希望请求符合平均分布(每个元素以相同的概率被访问)
通讯协议及事件处理机制
通信协议
Redis是单进程单线程的。应用系统和Redis通过Redis协议(RESP)进行交互。
请求响应模式
Redis协议位于TCP层之上,即客户端和Redis实例保持双工的连接。
串行的请求响应模式(ping-pong)
串行化是最简单模式,客户端与服务器端建立长连接。
连接通过心跳机制检测(ping-pong) ack应答。
客户端发送请求,服务端响应,客户端收到响应后,再发起第二个请求,服务器端再响应。
telnet和redis-cli 发出的命令 都属于该种模式。该模式的特点为:有问有答、耗时在网络传输命令、性能较低。
双工的请求响应模式(pipeline)
批量请求,批量响应。
请求响应交叉进行,不会混淆(TCP双工)。
- pipeline的作用是将一批命令进行打包,然后发送给服务器,服务器执行完按顺序打包返回
- 通过pipeline,一次pipeline(n条命令)=一次网络时间 + n次命令时间
通过Jedis可以很方便的使用pipeline
Jedis redis = new Jedis("192.168.1.111", 6379);
redis.auth("12345678");//授权密码 对应redis.conf的requirepass密码
Pipeline pipe = jedis.pipelined();
for (int i = 0; i <50000; i++) {
pipe.set("key_"+String.valueOf(i),String.valueOf(i));
}
//将封装后的PIPE一次性发给redis
pipe.sync();
原子化的批量请求响应模式(事务)
Redis可以利用事务机制批量执行命令。
发布订阅模式(pub/sub)
发布订阅模式是:一个客户端触发,多个客户端被动接收,通过服务器中转。
脚本化的批量执行(lua)
客户端向服务器端提交一个lua脚本,服务器端执行该脚本。
请求数据格式
Redis客户端与服务器交互采用序列化协议(RESP)。请求以字符串数组的形式来表示要执行命令的参数,Redis使用命令特有(command-specific)数据类型作为回复。
Redis通信协议的主要特点有:
- 客户端和服务器通过TCP连接来进行数据交互, 服务器默认的端口号为 6379
- 客户端和服务器发送的命令或数据一律以 \r\n (CRLF)结尾
在这个协议中, 所有发送至 Redis 服务器的参数都是二进制安全(binary safe)的,并保证了数据的简单、高效和易读。
内联格式
可以使用telnet给Redis发送命令,首字符为Redis命令名的字符,格式为 str1 str2 str3…
[root@localhost bin]# telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
ping
+PONG
exists name
:1
规范格式(redis-cli) RESP
- 间隔符号,在Linux下是\r\n,在Windows下是\n
- 简单字符串 Simple Strings, 以 "+"加号 开头
- 错误 Errors, 以"-"减号 开头
- 整数型 Integer, 以 ":" 冒号开头
- 大字符串类型 Bulk Strings, 以 "$"美元符号开头,长度限制512M
- 数组类型 Arrays,以 "*"星号开头
用SET
命令来举例说明RESP协议的格式。
redis> SET mykey Hello
"OK"
实际发送的请求数据:
*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$5\r\nHello\r\n
*3
$3
SET
$5
mykey
$5
Hello
实际收到的响应数据:
+OK\r\n
命令处理流程
整个流程包括:服务器启动监听、接收命令请求并解析、执行命令请求、返回命令回复等。
Server启动时监听socket
启动调用initServer
方法:
- 创建eventLoop(事件机制)
- 注册时间事件处理器
- 注册文件事件(socket)处理器
- 监听 socket 建立连接
建立Client
- redis-cli建立socket
- redis-server为每个连接(socket)创建一个Client对象
- 创建文件事件监听socket
- 指定事件处理函数
读取socket数据到输入缓冲区
从client中读取客户端的查询缓冲区内容。
解析获取命令
- 将输入缓冲区中的数据解析成对应的命令
- 判断是单条命令还是多条命令并调用相应的解析器解析
执行命令
解析成功后调用processCommand
方法执行命令,该方法大致分为一下三个部分:
- 调用
lookupCommand
方法获得对应的redisCommand
- 检测当前Redis是否可以执行该命令
- 调用
call
方法真正执行命令
协议响应格式
状态回复
对于状态,回复的第一个字节是“+”
"+OK"
错误回复
对于错误,回复的第一个字节是“ - ”
1. -ERR unknown command 'foobar'
2. -WRONGTYPE Operation against a key holding the wrong kind of value
整数回复
对于整数,回复的第一个字节是“:”
":6"
批量回复
对于批量字符串,回复的第一个字节是“$”
"$6 foobar"
多条批量回复
对于多条批量回复(数组),回复的第一个字节是“*”
"*3"
协议解析及处理
包括协议解析、调用命令、返回结果。
协议解析
用户在Redis客户端键入命令后,Redis-cli会把命令转化为RESP协议格式,然后发送给服务器。服务器再对协议进行解析,分为三个步骤:
- 解析命令请求参数数量
命令请求参数数量的协议格式为"*N\r\n" ,其中N就是数量,比如
127.0.0.1:6379> set name:1 zhaoyun
我们打开aof文件可以看到协议内容
*3(/r/n)
$3(/r/n)
set(/r/n)
$7(/r/n)
name:10(/r/n)
$7(/r/n)
zhaoyun(/r/n)
首字符必须是“*”,使用"\r"定位到行尾,之间的数就是参数数量了。
- 循环解析请求参数
首字符必须是"$",使用"/r"定位到行尾,之间的数是参数的长度,从/n后到下一个"$"之间就是参数的值了。循环解析直到没有"$"。
协议执行
协议的执行包括命令的调用和返回结果。执行流程如下:
- 判断参数个数和取出的参数是否一致。RedisServer解析完命令后,会调用函数
processCommand
处理该命令请求 - quit校验,如果是“quit”命令,直接返回并关闭客户端
- 命令语法校验,执行
lookupCommand
,查找命令(set),如果不存在则返回:“unknown command”错误 - 参数数目校验,参数数目和解析出来的参数个数要匹配,如果不匹配则返回:“wrong number of arguments”错误
- 此外还有权限校验,最大内存校验,集群校验,持久化校验等等
校验成功后,会调用call函数执行命令,并记录命令执行时间和调用次数,如果执行命令时间过长还要记录慢查询日志。
执行命令后返回结果的类型不同则协议格式也不同,分为5类:状态回复、错误回复、整数回复、批量回复、多条批量回复。
事件处理机制
Redis服务器是典型的事件驱动系统。Redis将事件分为两大类:文件事件和时间事件。
文件事件
文件事件即Socket的读写事件,也就是IO事件。 该事件包括客户端的连接、命令请求、数据回复、连接断开等等。
Redis事件处理机制采用单线程的Reactor模式,属于I/O多路复用的一种常见模式。IO多路复用( I/O multiplexing )指的通过单个线程管理多个Socket。
Reactor pattern(反应器设计模式)是一种为处理并发服务请求,并将请求提交到 一个或者多个服务处理程序的事件设计模式。
Reactor模式是事件驱动的,有一个Service Handler,有多个Request Handlers,这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler。
- Handle:I/O操作的基本文件句柄,在linux下就是fd(文件描述符)
- Synchronous Event Demultiplexer :同步事件分离器,阻塞等待Handles中的事件发生
- Reactor: 事件分派器,负责事件的注册,删除以及对所有注册到事件分派器的事件进行监控, 当事件发生时会调用Event Handler接口来处理事件
- Event Handler: 事件处理器接口,这里需要Concrete Event Handler来实现该接口
- Concrete Event Handler:真实的事件处理器,通常都是绑定了一个handle,实现对可读事件进行读取或对可写事件进行写入的操作
主程序向事件分派器(Reactor)注册要监听的事件。Reactor调用OS提供的事件处理分离器,监听事件(wait)。当有事件产生时,Reactor将事件派给相应的处理器来处理handle_event()
。
4种IO多路复用模型与选择
select,poll,epoll、kqueue都是IO多路复用的机制。
I/O多路复用就是通过一种机制,一个进程可以监视多个描述符(socket),一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。
select
int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct
timeval *timeout);
select 函数监视的文件描述符分3类,分别是:
- writefds
- readfds
- exceptfds
调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fd列表,来找到就绪的描述符。
优点:
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。
缺点:
单个进程打开的文件描述是有一定限制的,它由FD_SETSIZE设置,默认值是1024,采用数组存储。另外在检查数组中是否有文件描述需要读写时,采用的是线性扫描的方法,即不管这些socket是不是活跃的,都轮询一遍,所以效率比较低。
poll
int poll (struct pollfd *fds, unsigned int nfds, int timeout);
struct pollfd {
int fd; //文件描述符
short events; //要监视的事件
short revents; //实际发生的事件
};
poll使用一个 pollfd的指针实现,pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。
优点:
采样链表的形式存储,它监听的描述符数量没有限制,可以超过select默认限制的1024大小。
缺点:
在检查链表中是否有文件描述需要读写时,采用的是线性扫描的方法,即不管这些socket是不是活跃的,都轮询一遍,所以效率比较低。
epoll
epoll是在linux2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。
// 创建一个epoll的句柄。自从linux2.6.8之后,size参数是被忽略的。需要注意的是,当创建好epoll
// 句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所
// 以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
int epoll_create(int size)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
第一个参数是epoll_create()的返回值。第二个参数表示动作,用三个宏来表示:
- EPOLL_CTL_ADD:注册新的fd到epfd中
- EPOLL_CTL_MOD:修改已经注册的fd的监听事件
- EPOLL_CTL_DEL:从epfd中删除一个fd
第三个参数是需要监听的fd。
第四个参数是告诉内核需要监听什么事。
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int
timeout);
等待内核返回的可读写事件,最多返回maxevents个事件。
优点:
epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目。举个例子:在1GB内存的机器上大约是10万左 右。
效率提升, epoll 最大的优点就在于它只管你“活跃”的连接 ,而跟连接总数无关,因此在实际的网络环境中, epoll 的效率就会远远高于 select 和 poll 。
epoll使用了共享内存,不用做内存拷贝。
kqueue
kqueue 是 unix 下的一个IO多路复用库。最初是2000年Jonathan Lemon在FreeBSD系统上开发的一个高性能的事件通知接口。注册一批socket描述符到kqueue以后,当其中的描述符状态发生变化时,kqueue将一次性通知应用程序哪些描述符可读、可写或出错了。
struct kevent {
uintptr_t ident; //是事件唯一的 key,在socket()使用中,它是socket的
fd句柄
int16_t filter; //是事件的类型(EVFILT_READ socket可读事件
EVFILT_WRITE socket可写事件)
uint16_t flags; //操作方式
uint32_t fflags;
intptr_t data; //数据长度
void *udata; //数据
};
优点:
能处理大量数据,性能较高。
文件事件分派器
在Redis中,对于文件事件的处理采用了Reactor模型。采用的是epoll的实现方式。Redis在主循环中统一处理文件事件和时间事件,信号事件则由专门的handler来处理。
主循环:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) { //循环监听事件
// 阻塞之前的处理
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 事件处理,第二个参数决定处理哪类事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
事件处理器
连接处理函数 acceptTCPHandler
当客户端向Redis建立socket时,aeEventLoop
会调用acceptTcpHandler
处理函数,服务器会为每个连接创建一个Client 对象,并创建相应文件事件来监听socket的可读事件,并指定事件处理函数。
// 当客户端建立连接时进行的eventloop处理函数 networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
....
// 层层调用,最后在anet.c 中 anetGenericAccept 方法中调用 socket 的 accept 方法
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
/**
* 进行socket 建立连接后的处理
*/
acceptCommonHandler(cfd,0,cip);
}
请求处理函数 readQueryFromClient
当客户端通过socket发送来数据后,Redis会调用readQueryFromClient
方法。readQueryFromClient
方法会调用read
方法从socket中读取数据到输入缓冲区中,然后判断其大小是否大于系统设置的client_max_querybuf_len
,如果大于,则向 Redis返回错误信息,并关闭client。
// 处理从client中读取客户端的输入缓冲区内容。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
....
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// 从 fd 对应的socket中读取到 client 中的 querybuf 输入缓冲区
nread = read(fd, c->querybuf+qblen, readlen);
....
// 如果大于系统配置的最大客户端缓存区大小,也就是配置文件中的client-query-buffer-
limit
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
// 返回错误信息,并且关闭client
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer
length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
if (!(c->flags & CLIENT_MASTER)) {
// processInputBuffer 处理输入缓冲区
processInputBuffer(c);
} else {
// 如果client是master的连接
size_t prev_offset = c->reploff;
processInputBuffer(c);
// 判断是否同步偏移量发生变化,则通知到后续的slave
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
命令回复处理器 sendReplyToClient
sendReplyToClient
函数是Redis的命令回复处理器,这个处理器负责将服务器执行命令后得到的命令回复通过套接字返回给客户端。
- 将outbuf内容写入到套接字描述符并传输到客户端
aeDeleteFileEvent
用于删除文件写事件
时间事件
时间事件分为定时事件与周期事件。一个时间事件主要由以下三个属性组成:
- id(全局唯一id)
- when (毫秒时间戳,记录了时间事件的到达时间)
- timeProc(时间事件处理器,当时间到达时,Redis就会调用相应的处理器来处理事件)
/* Time event structure
*
* 时间事件结构
*/
typedef struct aeTimeEvent {
// 时间事件的唯一标识符
long id; /* time event identifier. */
// 事件的到达时间,存贮的是UNIX的时间戳
long when_sec; /* seconds */
long when_ms; /* milliseconds */
// 事件处理函数,当到达指定时间后调用该函数处理对应的问题
aeTimeProc *timeProc;
// 事件释放函数
aeEventFinalizerProc *finalizerProc;
// 多路复用库的私有数据
void *clientData;
// 指向下个时间事件结构,形成链表
struct aeTimeEvent *next;
} aeTimeEvent;
serverCron
时间事件的最主要的应用是在redis服务器需要对自身的资源与配置进行定期的调整,从而确保服务器的长久运行,这些操作由redis.c中的serverCron函数实现。该时间事件主要进行以下操作:
- 更新redis服务器各类统计信息,包括时间、内存占用、数据库占用等情况
- 清理数据库中的过期键值对
- 关闭和清理连接失败的客户端
- 尝试进行aof和rdb持久化操作
- 如果服务器是主服务器,会定期将数据向从服务器做同步操作
- 如果处于集群模式,对集群定期进行同步与连接测试操作
Redis服务器开启后,就会周期性执行此函数,直到Redis服务器关闭为止。默认每秒执行10次,平均100毫秒执行一次,可以在Redis配置文件的hz选项,调整该函数每秒执行的次数。
serverCron在一秒内执行的次数 , 在redis/conf中可以配置:
hz 10 #100毫秒一次
定时事件
定时事件:让一段程序在指定的时间之后执行一次。aeTimeProc(时间处理器)的返回值是AE_NOMORE,该事件在达到后删除,之后不会再重复。
周期性事件
周期性事件:让一段程序每隔指定时间就执行一次。aeTimeProc(时间处理器)的返回值不是AE_NOMORE。当一个时间事件到达后,服务器会根据时间处理器的返回值,对时间事件的when属性进行更新,让这个事件在一段时间后再次达到。
serverCron就是一个典型的周期性事件。
aeEventLoop
aeEventLoop
是整个事件驱动的核心,Redis自己的事件处理机制。它管理着文件事件表和时间事件列表,不断地循环处理着就绪的文件事件和到期的时间事件。
typedef struct aeEventLoop {
//最大文件描述符的值
int maxfd; /* highest file descriptor currently registered */
//文件描述符的最大监听数
int setsize; /* max number of file descriptors tracked */
//用于生成时间事件的唯一标识id
long long timeEventNextId;
//用于检测系统时间是否变更(判断标准 now<lastTime)
time_t lastTime; /* Used to detect system clock skew */
//注册的文件事件
aeFileEvent *events; /* Registered events */
//已就绪的事件
aeFiredEvent *fired; /* Fired events */
//注册要使用的时间事件
aeTimeEvent *timeEventHead;
//停止标志,1表示停止
int stop;
//这个是处理底层特定API的数据,对于epoll来说,该结构体包含了epoll fd和epoll_event
void *apidata; /* This is used for polling API specific data */
//在调用processEvent前(即如果没有事件则睡眠),调用该处理函数
aeBeforeSleepProc *beforesleep;
//在调用aeApiPoll后,调用该函数
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
初始化
Redis服务端在其初始化函数initServer
中,会创建事件管理器aeEventLoop
对象。
函数aeCreateEventLoop
将创建一个事件管理器,主要是初始化aeEventLoop
的各个属性值,比如events 、 fired 、 timeEventHead 和 apidata :
- 首先创建
aeEventLoop
对象 - 初始化注册的文件事件表、就绪文件事件表。 events指针指向注册的文件事件表, fired 指针指向就绪文件事件表。表的内容在后面添加具体事件时进行初变更
- 初始化时间事件列表,设置
timeEventHead
和timeEventNextId
属性 - 调用
aeApiCreate
函数创建epoll
实例,并初始化apidata
aeFileEvent
结构体为已经注册并需要监听的事件的结构体。
typedef struct aeFileEvent {
// 监听事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE ,
// 或者 AE_READABLE | AE_WRITABLE
int mask; /* one of AE_(READABLE|WRITABLE) */
// 读事件处理器
aeFileProc *rfileProc;
// 写事件处理器
aeFileProc *wfileProc;
// 多路复用库的私有数据
void *clientData;
} aeFileEvent;
aeFiredEvent
:已就绪的文件事件
typedef struct aeFiredEvent {
// 已就绪文件描述符
int fd;
// 事件类型掩码,
// 值可以是 AE_READABLE 或 AE_WRITABLE
// 或者是两者的或
int mask;
} aeFiredEvent;
在ae创建的时候,会被赋值为aeApiState
结构体,结构体的定义如下:
typedef struct aeApiState {
// epoll_event 实例描述符
int epfd;
// 事件槽
struct epoll_event *events;
} aeApiState;
这个结构体是为了epoll所准备的数据结构。Redis可以选择不同的io多路复用方法。因此apidata是个void类型,根据不同的io多路复用库来选择不同的实现。
ae.c里面使用如下的方式来决定系统使用的机制:
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
时间事件: timeEventHead, beforesleep, aftersleep
aeTimeEvent
结构体为时间事件,Redis将所有时间事件都放在一个无序链表中,每次Redis会遍历整个链表,查找所有已经到达的时间事件,并且调用相应的事件处理器。
typedef struct aeTimeEvent {
/* 全局唯一ID */
long long id; /* time event identifier. */
/* 秒精确的UNIX时间戳,记录时间事件到达的时间*/
long when_sec; /* seconds */
/* 毫秒精确的UNIX时间戳,记录时间事件到达的时间*/
long when_ms; /* milliseconds */
/* 时间处理器 */
aeTimeProc *timeProc;
/* 事件结束回调函数,析构一些资源*/
aeEventFinalizerProc *finalizerProc;
/* 私有数据 */
void *clientData;
/* 前驱节点 */
struct aeTimeEvent *prev;
/* 后继节点 */
struct aeTimeEvent *next;
} aeTimeEvent;
beforesleep
对象是一个回调函数,在redis-server初始化时已经设置好了。
功能:
- 检测集群状态
- 随机释放已过期的键
- 在数据同步复制阶段取消客户端的阻塞
- 处理输入数据,并且同步副本信息
- 处理非阻塞的客户端请求
- AOF持久化存储策略,类似于MySQL的bin log
- 使用挂起的输出缓冲区处理写入
aftersleep
对象是一个回调函数,在IO多路复用与IO事件处理之间被调用。
aeMain
aeMain
函数其实就是一个封装的while
循环,循环中的代码会一直运行直到eventLoop
的stop
被设置为1(true)。它会不停尝试调用aeProcessEvents
对可能存在的多种事件进行处理,而aeProcessEvents
就是实际用于处理事件的函数。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
aemain
函数中,首先调用beforesleep
。这个方法在Redis每次进入sleep/wait
去等待监听的端口发生I/O事件之前被调用。当有事件发生时,调用aeProcessEvent
进行处理。
aeProcessEvent
首先计算距离当前时间最近的时间事件,以此计算一个超时时间。然后调用aeApiPoll
函数去等待底层的I/O多路复用事件就绪。aeApiPoll
函数返回之后,会处理所有已经产生文件事件和已经达到的时间事件。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{//processed记录这次调度执行了多少事件
int processed = 0, numevents;
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
//获取最近将要发生的时间事件
shortest = aeSearchNearestTimer(eventLoop);
//计算aeApiPoll的超时时间
if (shortest) {
long now_sec, now_ms;
//获取当前时间
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
//计算距离下一次发生时间时间的时间间隔
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {//没有时间事件
if (flags & AE_DONT_WAIT) {//马上返回,不阻塞
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
tvp = NULL; //阻塞到文件事件发生
}
}//等待文件事件发生,tvp为超时时间,超时马上返回(tvp为0表示马上,为null表示阻塞到
事件发生)
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {//处理触发的文件事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
if (fe->mask & mask & AE_READABLE) {
rfired = 1;//处理读事件
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
//处理写事件
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
if (flags & AE_TIME_EVENTS)//时间事件调度和执行
processed += processTimeEvents(eventLoop);
return processed;
}
aeProcessEvents
都会先计算最近的时间事件发生所需要等待的时间 ,然后调用aeApiPoll
方法在这段时间中等待事件的发生,在这段时间中如果发生了文件事件,就会优先处理文件事件,否则就会一直等待,直到最近的时间事件需要触发。
aeApiPoll
用到了epoll,select,kqueue和evport四种实现方式。
rfileProc
(处理读事件)和wfileProc
(处理写事件)就是在文件事件被创建时传入的函数指针。
processTimeEvents
(处理时间事件):取得当前时间,循环时间事件链表,如果当前时间>=预订执行时间,则执行时间处理函数。
以上就是本文的全部内容。欢迎小伙伴们积极留言交流~~~
文章评论