Redis Source Code Analysis
1. Getting Started with Redis
1.1 Introduction
Redis (REmote DIctionary Server) is an open-source (BSD-licensed), in-memory data structure store that can be used as a database, cache, and message broker. Note how it differs from LevelDB.
1.2 Features
- High performance
- Rich set of data types
- Transaction support
- Built-in replication and clustering
- Persistence support
- Single-threaded, atomic operations
1.3 Building and Debugging the Source
1.3.1 Downloading the source
Git repository: <https://github.com/redis/redis/tree/5.0>
Alternatively, browse the releases index, or fetch a tarball with wget:
wget http://download.redis.io/releases/redis-6.0.8.tar.gz
Then unpack the archive:
tar xzf redis-6.0.8.tar.gz
1.3.2 Building
- Set daemonize to yes in the configuration file.
- Disable gcc optimization: change OPTIMIZATION?=-O2 to -O0 in the Makefile; the binaries built in the source tree can then be debugged directly.
- Run gdb redis-server [conf] together with the usual gdb commands.
- Alternatively, start from the main() test entry in server.c.
1.4 Benchmarking
redis-benchmark is the official performance-testing tool shipped with Redis; it provides an effective way to measure the throughput of a Redis server.
redis-benchmark [-h <host>] [-p <port>] [-c <clients>] [-n <requests>] [-k <boolean>]
2. Basic Data Structures
2.1 Simple Dynamic Strings (SDS)
2.1.1 Data structure
To save memory, Redis uses different header layouts for strings of different lengths. There are five types, listed below, but SDS_TYPE_5 is not used for stored data: it cannot track spare space, so every append would require a fresh allocation and release:
#define SDS_TYPE_5 0
#define SDS_TYPE_8 1
#define SDS_TYPE_16 2
#define SDS_TYPE_32 3
#define SDS_TYPE_64 4
Taking type 1 (SDS_TYPE_8) as an example:
typedef char* sds
/*
__attribute__ ((packed)) tells the compiler to disable alignment padding for this struct and lay it out using the exact number of bytes occupied; it is a GCC-specific extension.
*/
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used data length */
uint8_t alloc; /* allocated length, excluding the header and the null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
//variable-length payload
char buf[];
};
2.1.2 Growing the buffer
- If the current free space >= the requested extra length, return immediately.
- Otherwise compute the new length and check whether the header type changes:
- If the type is unchanged, use realloc; otherwise use malloc + copy + free.
- Growth step:
- If the new length is below the preallocation threshold (1024*1024 bytes), double it;
- If the new length is at or above the threshold, add the threshold (SDS_MAX_PREALLOC) each time, to avoid over-allocating.
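The growth rule above can be sketched as a standalone helper (a simplified model of the length computation in sdsMakeRoomFor; SDS_MAX_PREALLOC mirrors the Redis constant, while sds_grow is a name of my own):

```c
#include <stddef.h>

#define SDS_MAX_PREALLOC (1024*1024)

/* Simplified model of how sdsMakeRoomFor picks the new buffer length:
 * below the preallocation threshold it doubles, above it it adds a
 * fixed step to avoid over-allocating. */
static size_t sds_grow(size_t len, size_t addlen) {
    size_t newlen = len + addlen;
    if (newlen < SDS_MAX_PREALLOC)
        newlen *= 2;                /* small string: double it */
    else
        newlen += SDS_MAX_PREALLOC; /* big string: fixed step */
    return newlen;
}
```

For example, sds_grow(10, 5) yields 30, while a string already at the 1 MB threshold grows by exactly 1 MB more.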
sds sdsMakeRoomFor(sds s, size_t addlen) {
void *sh, *newsh;
size_t avail = sdsavail(s);
size_t len, newlen;
char type, oldtype = s[-1] & SDS_TYPE_MASK;
int hdrlen;
/* Return ASAP if there is enough space left. */
//if there is enough free space, return immediately
if (avail >= addlen) return s;
len = sdslen(s);
sh = (char*)s-sdsHdrSize(oldtype);
newlen = (len+addlen);
//if the new length is below SDS_MAX_PREALLOC (1024*1024), double it
if (newlen < SDS_MAX_PREALLOC)
newlen *= 2;
//otherwise grow by SDS_MAX_PREALLOC each time
else
newlen += SDS_MAX_PREALLOC;
type = sdsReqType(newlen);
/* Don't use type 5: the user is appending to the string and type 5 is
* not able to remember empty space, so sdsMakeRoomFor() must be called
* at every appending operation. */
if (type == SDS_TYPE_5) type = SDS_TYPE_8;
// if the header type is unchanged use realloc, otherwise malloc + copy + free
hdrlen = sdsHdrSize(type);
if (oldtype==type) {
newsh = s_realloc(sh, hdrlen+newlen+1);
if (newsh == NULL) return NULL;
s = (char*)newsh+hdrlen;
} else {
/* Since the header size changes, need to move the string forward,
* and can't use realloc */
// the header size changed, so allocate a new block and copy the data
newsh = s_malloc(hdrlen+newlen+1);
if (newsh == NULL) return NULL;
memcpy((char*)newsh+hdrlen, s, len+1);
s_free(sh);
s = (char*)newsh+hdrlen;
s[-1] = type;
sdssetlen(s, len);
}
sdssetalloc(s, newlen);
return s;
}
2.1.3 Shrinking the buffer
The trim operation uses lazy space release: it does not immediately reallocate memory to reclaim the trimmed bytes; it only moves the data and updates the recorded length.
sds sdstrim(sds s, const char *cset) {
char *start, *end, *sp, *ep;
size_t len;
sp = start = s;
ep = end = s+sdslen(s)-1;
while(sp <= end && strchr(cset, *sp)) sp++;
while(ep > sp && strchr(cset, *ep)) ep--;
len = (sp > ep) ? 0 : ((ep-sp)+1);
if (s != sp) memmove(s, sp, len);
s[len] = '\0';
sdssetlen(s,len);
return s;
}
- The actual reclamation happens later; see tryObjectEncoding:
if (o->encoding == OBJ_ENCODING_RAW &&
sdsavail(s) > len/10)
{
o->ptr = sdsRemoveFreeSpace(o->ptr);
}
2.1.4 Advantages
- Constant-time string length lookup (len)
- Prevents buffer overflows
- Reduces the number of reallocations caused by string modification
- Binary safe: can hold text as well as arbitrary binary data (e.g. video stream data)
- Terminated with '\0', so it stays compatible with some C string functions
2.1.5 Miscellaneous
- sds is an alias for char*; the header and data live in one contiguous allocation, which improves access speed thanks to locality of reference.
- SDS_TYPE_5 is not used for stored data, because appending to that type always requires a reallocation.
- The struct ends with a zero-length (flexible) array, which keeps the allocation variable-length yet contiguous. This is why expressions like s[-1] appear throughout the sds code, surprising at first glance: s points at buf, so s[-1] is the flags byte immediately before it.
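A minimal illustration of the trick (a toy sdshdr8-like struct of my own, not the real Redis code): the header and the payload share one allocation, and the byte just before the payload can be read back with s[-1]:

```c
#include <stdlib.h>
#include <string.h>
#include <stdint.h>

/* Toy version of sdshdr8: header followed by a flexible array member,
 * all in one contiguous allocation. */
struct __attribute__((__packed__)) hdr8 {
    uint8_t len;
    uint8_t alloc;
    unsigned char flags;
    char buf[];   /* flexible array: payload follows the header directly */
};

/* Returns a pointer to buf, like sds does. */
static char *toy_sdsnew(const char *init) {
    size_t len = strlen(init);
    struct hdr8 *h = malloc(sizeof(*h) + len + 1);
    h->len = (uint8_t)len;
    h->alloc = (uint8_t)len;
    h->flags = 1; /* SDS_TYPE_8 */
    memcpy(h->buf, init, len + 1);
    return h->buf;
}

/* s points at buf, so s[-1] is the flags byte right before it. */
static unsigned char toy_flags(const char *s) { return (unsigned char)s[-1]; }

/* Step back over the packed header to recover the len field. */
static uint8_t toy_len(const char *s) {
    const struct hdr8 *h = (const struct hdr8 *)(s - sizeof(struct hdr8));
    return h->len;
}
```

The caller sees a plain char* usable with C string functions, yet the metadata is always one fixed offset behind it.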
/* sdsalloc() = sdsavail() + sdslen() */
static inline size_t sdsalloc(const sds s) {
//the key line: s points at the data, so s[-1] reads the flags byte
unsigned char flags = s[-1];
/* ...switch on flags & SDS_TYPE_MASK and return the alloc field... */
}
2.2 Linked List
2.2.1 Data structure
Redis's linked list is nothing special: a plain doubly linked list.
2.2.2 Iterator
Iteration is decoupled from the list itself, following the iterator design pattern. The same idea shows up in many other codebases, such as LevelDB, analyzed later; the iterator pattern is worth looking up.
typedef struct listIter {
listNode *next;
int direction;
} listIter;
listIter *listGetIterator(list *list, int direction)
{
listIter *iter;
if ((iter = zmalloc(sizeof(*iter))) == NULL) return NULL;
if (direction == AL_START_HEAD)
iter->next = list->head;
else
iter->next = list->tail;
iter->direction = direction;
return iter;
}
2.3 Dictionary
2.3.1 Data structure
typedef struct dictEntry {
//key
void *key;
//a union is used here to save memory
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
//next node: collisions are resolved by chaining, with new nodes inserted at the head
struct dictEntry *next;
} dictEntry;
//dict operation callbacks, stored as function pointers
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
//every dictionary has two hash tables so entries can be migrated from the old one to the new one
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table;
unsigned long size; //size of the hash table
unsigned long sizemask;//mask used to compute bucket indexes
unsigned long used;//number of entries in use
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];//ht[0] is the active table; ht[1] is used only as a temporary table while rehashing ht[0]
//rehashidx == -1 means no rehash is in progress
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;
2.3.2 Key Points
2.3.2.1 Data structure
- dictht.size is always allocated as a power of two (size <= 2^s).
- dict.rehashidx marks rehash progress: -1 means no rehash is in progress; otherwise it is the index of the bucket in ht[0] currently being migrated.
- dictht.sizemask determines which bucket a key falls into. It equals size-1, and index = hash & dict->ht[x].sizemask; using & is faster than %.
- On first initialization only ht[0] is used; ht[1] serves purely as temporary storage while rehashing ht[0].
2.3.2.2 Hashing
- Hash algorithm
By default Redis hashes keys with SipHash (a custom hash function can also be supplied). The hash value and bucket index are computed as follows:
1. Compute the key's hash using the dictionary's hash function:
hash = dict->type->hashFunction(key);
2. Combine it with the table's sizemask to get the bucket index:
index = hash & dict->ht[x].sizemask; // equivalent to a modulo operation
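Why & works as a modulo here: since the table size is always a power of two, hash & (size-1) equals hash % size while avoiding the slower division. A quick standalone check (my own sketch, not Redis code):

```c
#include <stdint.h>

/* When size is a power of two, masking with size-1 keeps exactly the
 * low bits of the hash, which is the same as hash % size. */
static uint64_t bucket_index(uint64_t hash, uint64_t size) {
    uint64_t sizemask = size - 1; /* e.g. size=8 -> mask=0b111 */
    return hash & sizemask;
}
```

This is also why dictht.size must stay a power of two: for any other size the mask would skip buckets.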
- Hash collisions
Redis resolves collisions by chaining, inserting new nodes at the head of each bucket's list.
- Growing the table
With chaining, too much data leads to long bucket chains that slow down lookups, so Redis uses the load factor (an estimate of collision probability) to decide when to grow; see _dictExpandIfNeeded. Whether resizing is allowed also depends on whether persistence is running:
void updateDictResizePolicy(void) {
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
dictEnableResize();
else
dictDisableResize();
}
The details are in _dictExpandIfNeeded.
Question to ponder: why minimize resizing while persistence is running? (Hint: the forked child shares memory copy-on-write, so touching every bucket would duplicate pages.)
- Shrinking
Whether Redis shrinks a table is decided in two places: htNeedsResize and dictResize.
- A table needs shrinking when it has grown beyond its initial size and its fill ratio has dropped below 10%:
/* Hash table parameters */
#define HASHTABLE_MIN_FILL 10 /* Minimal hash table fill 10% */
/* This is the initial size of every hash table */
#define DICT_HT_INITIAL_SIZE 4
int htNeedsResize(dict *dict) {
long long size, used;
size = dictSlots(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}
Shrinking is triggered mainly on key removal and from the serverCron timer task.
In the serverCron timer task:
void databasesCron(void) {
//......
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
//......
}
A final diagram sums this up.
2.3.2.3 Rehash optimizations
- During rehashing, ht[0] contains empty buckets. To avoid blocking, each call bails out after visiting 10*N consecutive empty buckets (N is the step count); see dictRehash.
- Incremental rehashing: the rehash is not performed in one centralized pass.
- First mechanism: the normal path through dictRehash above. If a hash table holds many keys, a full rehash could block the server for a long time, so Redis spreads the work across subsequent reads and writes, one bucket at a time.
- Second mechanism: the first one has a flaw: if the server stays idle for a long time, the dictionary can be stuck using both table 0 and table 1. To fix this, the serverCron timer spends 1 ms per tick performing rehash steps of 100 buckets at a time, provided activerehashing is enabled; see databasesCron.
void databasesCron(void) {
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
* as will cause a lot of copy-on-write of memory pages. */
if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
/* We use global counters so if we stop the computation at a given
* DB we'll be able to start from the successive in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash: incremental rehashing */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
Note that the timer task must not spend too long here, to avoid blocking other operations:
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
/* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
* than 0, and is smaller than 1 in most cases. The exact upper bound
* depends on the running time of dictRehash(d,100).*/
int dictRehashMilliseconds(dict *d, int ms) {
if (d->pauserehash > 0) return 0;
long long start = timeInMilliseconds();
int rehashes = 0;
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
Notes on rehashing
2.3.3 Iterators
2.3.3.1 Data structure
2.3.3.2 Iterator types
The dict iterator comes in two flavors: safe (safe=1) and unsafe (safe=0).
In safe mode you may modify the dictionary while iterating (e.g. expiring keys): add, delete, and lookup are all allowed, but rehash steps are suppressed (to avoid visiting elements twice).
In unsafe mode only read-only traversal is supported; adding, deleting, or looking up entries can cause unpredictable behavior such as visiting elements twice or skipping them, but rehashing may proceed.
2.3.3.3 Choosing an iterator
- When duplicate visits must be avoided, use the safe iterator, e.g. for BGREWRITEAOF and BGSAVE.
- When elements must be modified during traversal, the safe iterator is also required, e.g. the KEYS command.
- When occasional duplicates are acceptable, use the unsafe iterator, e.g. SCAN.
2.3.4 Skip list
- Data structure
- Properties
- Entries are ordered by score; entries with equal scores are ordered by ele.
- Average lookup complexity is O(log n).
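The O(log n) search works by starting at the top level and dropping down a level whenever the next node would overshoot. Below is a minimal, self-contained sketch of a score-only skip list (a deliberate simplification of my own; the real zskiplist also compares ele, keeps span counts, and uses p=0.25 for level promotion):

```c
#include <stdlib.h>

#define MAXLEVEL 8

typedef struct node {
    double score;
    struct node *forward[MAXLEVEL]; /* forward[i] = next node on level i */
} node;

typedef struct {
    node head;   /* sentinel; its score is never read */
    int level;   /* highest level currently in use */
} slist;

static void sl_init(slist *sl) {
    sl->level = 1;
    for (int i = 0; i < MAXLEVEL; i++) sl->head.forward[i] = NULL;
}

/* Each extra level with probability 1/2 (Redis uses 1/4). */
static int sl_randlevel(void) {
    int lvl = 1;
    while (lvl < MAXLEVEL && (rand() & 1)) lvl++;
    return lvl;
}

static void sl_insert(slist *sl, double score) {
    node *update[MAXLEVEL];
    node *x = &sl->head;
    /* descend from the top, recording the rightmost node < score per level */
    for (int i = sl->level - 1; i >= 0; i--) {
        while (x->forward[i] && x->forward[i]->score < score)
            x = x->forward[i];
        update[i] = x;
    }
    int lvl = sl_randlevel();
    if (lvl > sl->level) {
        for (int i = sl->level; i < lvl; i++) update[i] = &sl->head;
        sl->level = lvl;
    }
    node *n = malloc(sizeof(*n));
    n->score = score;
    for (int i = 0; i < lvl; i++) {          /* splice into each level */
        n->forward[i] = update[i]->forward[i];
        update[i]->forward[i] = n;
    }
    for (int i = lvl; i < MAXLEVEL; i++) n->forward[i] = NULL;
}

/* Returns 1 if score is present: top-down search, then one step on level 0. */
static int sl_contains(slist *sl, double score) {
    node *x = &sl->head;
    for (int i = sl->level - 1; i >= 0; i--)
        while (x->forward[i] && x->forward[i]->score < score)
            x = x->forward[i];
    x = x->forward[0];
    return x && x->score == score;
}
```

The higher levels act as express lanes: on average half the nodes are skipped at each descent, giving the logarithmic expected cost.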
2.4 Integer Set (intset)
2.4.1 Data structure
An intset is the abstract data structure Redis uses to store a set of integers. It can hold 16-, 32-, or 64-bit values, guarantees that no element appears twice, and keeps the elements sorted in ascending order.
The encoding is chosen as follows:
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))
/* Return the required encoding for the provided value. */
static uint8_t _intsetValueEncoding(int64_t v) {
if (v < INT32_MIN || v > INT32_MAX)
return INTSET_ENC_INT64;
else if (v < INT16_MIN || v > INT16_MAX)
return INTSET_ENC_INT32;
else
return INTSET_ENC_INT16;
}
2.4.2 Operations
2.4.2.1 Lookup
intset lookup (intsetSearch) uses binary search, with O(log N) time complexity.
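A standalone sketch of that binary search (simplified to a plain sorted int64 array; the real intsetSearch also reports the insertion position through an out parameter):

```c
#include <stdint.h>
#include <stddef.h>

/* Binary search over a sorted array, as intsetSearch does over the
 * intset's contents. Returns 1 if value is found. */
static int intset_search(const int64_t *a, size_t n, int64_t value) {
    if (n == 0) return 0;
    /* fast path also used by intset: check the ends first, since
     * out-of-range values need no search at all */
    if (value < a[0] || value > a[n-1]) return 0;
    size_t lo = 0, hi = n - 1;
    while (lo <= hi) {
        size_t mid = lo + (hi - lo) / 2;
        if (a[mid] == value) return 1;
        if (a[mid] < value) lo = mid + 1;
        else {
            if (mid == 0) return 0; /* avoid size_t underflow */
            hi = mid - 1;
        }
    }
    return 0;
}
```

The first/last comparison is the same optimization noted below: a value outside the current range can be accepted or rejected without searching.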
2.4.2.2 Insertion and upgrade
When the new element's required encoding is wider than the current intset encoding, the set is upgraded to prevent overflow; see the "yes" branch in the diagram.
Let's walk through it with an example.
One optimization: the new value is first compared against the first and last elements; if it falls outside that range, no binary search is needed at all.
2.4.2.3 Pros and cons
- Flexibility
The underlying array upgrades automatically to fit new elements, so integers of any supported width can be added without worrying about type errors.
- Memory savings
Each encoding uses just enough space for its values, avoiding waste.
- Downgrading is not supported.
- Insertion and removal
Both require a realloc, so use them with care.
2.5 Ziplist
2.5.1 Data structure
A ziplist is a sequentially encoded data structure laid out in one contiguous block of memory. It can hold any number of entries, each storing either a byte array or an integer. It is well suited to small objects and bounded-length data.
#define ZIP_END 255 /* Special "end of ziplist" entry. */
#define ZIP_BIG_PREVLEN 254 /* Max number of bytes of the previous entry, for
the "prevlen" field prefixing each entry, to be
represented with just a single byte. Otherwise
it is represented as FF AA BB CC DD, where
AA BB CC DD are a 4 bytes unsigned integer
representing the previous entry len. */
/* Different encoding/length possibilities */
#define ZIP_STR_MASK 0xc0 //1100 0000, mask for byte-array encodings
#define ZIP_INT_MASK 0x30 //0011 0000, mask for integer encodings
#define ZIP_STR_06B (0 << 6) //0000 0000
#define ZIP_STR_14B (1 << 6)//0100 0000
#define ZIP_STR_32B (2 << 6) //1000 0000
#define ZIP_INT_16B (0xc0 | 0<<4) //1100 0000 | 0000 0000
#define ZIP_INT_32B (0xc0 | 1<<4)//1100 0000 | 0001 0000 =1101 0000
#define ZIP_INT_64B (0xc0 | 2<<4)//1100 0000 | 0010 0000 =1110 0000
#define ZIP_INT_24B (0xc0 | 3<<4)//1100 0000 | 0011 0000 =1111 0000
#define ZIP_INT_8B 0xfe //=1111 1110
/* 4 bit integer immediate encoding |1111xxxx| with xxxx between
* 0001 and 1101. */
#define ZIP_INT_IMM_MASK 0x0f /* Mask to extract the 4 bits value. To add
one is needed to reconstruct the value. */
#define ZIP_INT_IMM_MIN 0xf1 /* 11110001 */
#define ZIP_INT_IMM_MAX 0xfd /* 11111101 */
#define INT24_MAX 0x7fffff
#define INT24_MIN (-INT24_MAX - 1)
//check whether the entry is a byte array; string encodings never start with bits "11"
/* Macro to determine if the entry is a string. String entries never start
* with "11" as most significant bits of the first byte. */
#define ZIP_IS_STR(enc) (((enc) & ZIP_STR_MASK) < ZIP_STR_MASK)
/* Utility macros.*/
//the first uint32_t of the ziplist records its total size in bytes
/* Return total bytes a ziplist is composed of. */
#define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl)))
//the next uint32_t records the offset of the tail entry
/* Return the offset of the last item inside the ziplist. */
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
//number of entries in the ziplist
/* Return the length of a ziplist, or UINT16_MAX if the length cannot be
* determined without scanning the whole ziplist. */
#define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
/* The size of a ziplist header: two 32 bit integers for the total
* bytes count and last item offset. One 16 bit integer for the number
* of items field. */
#define ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t))
/* Size of the "end of ziplist" entry. Just one byte. */
#define ZIPLIST_END_SIZE (sizeof(uint8_t))
/* Return the pointer to the first entry of a ziplist. */
#define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE)
//reach the tail entry via the stored offset
/* Return the pointer to the last entry of a ziplist, using the
* last entry offset inside the ziplist header. */
#define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
//the final FF byte of the ziplist
/* Return the pointer to the last byte of a ziplist, which is, the
* end of ziplist FF entry. */
#define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)
//if the entry count exceeds UINT16_MAX, zllen no longer stores the real count,
//and the whole list must be traversed to count the entries
#define ZIPLIST_INCR_LENGTH(zl,incr) { \
if (ZIPLIST_LENGTH(zl) < UINT16_MAX) \
ZIPLIST_LENGTH(zl) = intrev16ifbe(intrev16ifbe(ZIPLIST_LENGTH(zl))+incr); \
}
/* We use this function to receive information about a ziplist entry.
* Note that this is not how the data is actually encoded, is just what we
* get filled by a function in order to operate more easily. */
typedef struct zlentry {
// prevrawlen is the previous entry's length; prevrawlensize is the number of bytes used to encode prevrawlen (1 or 5)
unsigned int prevrawlensize; /* Bytes used to encode the previous entry len*/
unsigned int prevrawlen; /* Previous entry len. */
//len is the current entry's data length; lensize is the number of bytes used to encode len
unsigned int lensize; /* Bytes used to encode this entry type/len.
For example strings have a 1, 2 or 5 bytes
header. Integers always use a single byte.*/
unsigned int len; /* Bytes used to represent the actual entry.
For strings this is just the string length
while for integers it is 1, 2, 3, 4, 8 or
0 (for 4 bit immediate) depending on the
number range. */
// header size of this entry: prevrawlensize + lensize
unsigned int headersize; /* prevrawlensize + lensize. */
//encoding type
unsigned char encoding; /* Set to ZIP_STR_* or ZIP_INT_* depending on
the entry encoding. However for 4 bits
immediate integers this can assume a range
of values and must be range-checked. */
//points to the start of the entry
unsigned char *p; /* Pointer to the very start of the entry, that
is, this points to prev-entry-len field. */
} zlentry;
We won't walk through every field individually.
The field correspondence is shown below.
The decoding function:
/* Return a struct with all information about an entry. */
void zipEntry(unsigned char *p, zlentry *e) {
ZIP_DECODE_PREVLEN(p, e->prevrawlensize, e->prevrawlen);
ZIP_DECODE_LENGTH(p + e->prevrawlensize, e->encoding, e->lensize, e->len);
e->headersize = e->prevrawlensize + e->lensize;
e->p = p;
}
When the payload is a string:
When the payload is an integer:
2.5.2 Caveats
- Lookup is O(N).
- Once the entry count exceeds UINT16_MAX, zllen no longer stores the real count.
- Cascade updates
- Each entry's prevlen field stores the previous entry's length:
- if the previous entry is shorter than 254 bytes, prevlen takes 1 byte;
- if the previous entry is 254 bytes or longer, prevlen takes 5 bytes.
An insertion or deletion can therefore invalidate the prevlen of the following entry, forcing it to be re-encoded. Redis only handles expansion (1 byte growing to 5 bytes); it never shrinks a prevlen back down, to avoid flapping between the two sizes.
In the worst case a cascade update triggers N reallocations, each of which is O(N), so the worst-case cost of a cascade update is O(N^2).
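The prevlen rule can be captured in a tiny helper (my own sketch mirroring the ZIP_BIG_PREVLEN threshold of 254):

```c
#include <stdint.h>

#define ZIP_BIG_PREVLEN 254

/* Number of bytes needed to encode the previous entry's length:
 * 1 byte below the threshold, otherwise a marker byte plus a 4-byte
 * length, i.e. 5 bytes in total. */
static unsigned int prevlen_bytes(uint32_t prevlen) {
    return (prevlen < ZIP_BIG_PREVLEN) ? 1 : 5;
}
```

This makes the cascade mechanism clear: an entry growing from 253 to 254 bytes flips the next entry's prevlen field from 1 byte to 5, which may in turn push that entry over 254 bytes, and so on down the list.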
3. The Base Object (redisObject)
3.1 Data structure
#define LRU_BITS 24
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
3.2 Notes
3.2.1 Bit fields
redisObject uses C bit fields to save memory, and they are convenient to work with. The caveat is portability: on some embedded platforms a byte is not 8 bits, and a field that straddles storage units can behave unexpectedly, so explicit padding is sometimes necessary. Still, don't let these issues scare you off: bit fields are much easier to use than manual bit twiddling.
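A quick check of the saving (layout assumptions of mine: GCC/Clang on a 64-bit LP64 platform; the exact sizes are compiler-dependent, which is precisely the portability caveat above):

```c
#include <stdint.h>

/* Same shape as redisObject: three bit fields packed into one 32-bit
 * storage unit, then an int and a pointer. */
struct obj_bitfield {
    unsigned type : 4;
    unsigned encoding : 4;
    unsigned lru : 24;   /* 4+4+24 = 32 bits in one unit */
    int refcount;
    void *ptr;
};

/* The same fields without bit packing. */
struct obj_plain {
    unsigned type;
    unsigned encoding;
    unsigned lru;
    int refcount;
    void *ptr;
};
```

On a typical LP64 build the packed form is 16 bytes versus 24 unpacked; multiplied by hundreds of millions of objects, the 8 bytes per object matter.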
3.2.2 Caching the time
Fetching the current time is a system call and relatively expensive, so Redis caches it and refreshes the cache periodically from the serverCron timer task:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//....
/* Update the time cache. */
updateCachedTime();
//....
}
Note that lruclock is read with atomicGet because other threads (e.g. cluster state handling) may also use this cached time.
4. Strings
4.1 Data structure
Redis strings come in three encodings. First, recall the object encoding names:
char *strEncoding(int encoding) {
switch(encoding) {
case OBJ_ENCODING_RAW: return "raw";
case OBJ_ENCODING_INT: return "int";
case OBJ_ENCODING_HT: return "hashtable";
case OBJ_ENCODING_QUICKLIST: return "quicklist";
case OBJ_ENCODING_ZIPLIST: return "ziplist";
case OBJ_ENCODING_INTSET: return "intset";
case OBJ_ENCODING_SKIPLIST: return "skiplist";
case OBJ_ENCODING_EMBSTR: return "embstr";
default: return "unknown";
}
}
As shown above, strings can be encoded as "raw" or "embstr"; there is actually a third encoding, "int".
4.2 Notes
See tryObjectEncoding.
Integer-encoded strings come in two kinds: shared integer objects created at server startup, and objects created on demand.
- embstr vs raw
embstr: the robj header and the sds live in one contiguous allocation, so creation and destruction each take a single allocation and lookups are cache-friendly. The downside is that any reallocation would involve the whole robj+sds object, so embstr strings are read-only: when modified, an embstr string is converted to raw and stays raw from then on.
raw: the robj and the sds are separate allocations, and the string is modifiable.
You can inspect the encoding with the OBJECT ENCODING command.
Note that APPEND changes the string's underlying encoding.
- Redis caps string length at 512 MB:
static int checkStringLength(client *c, long long size) {
if (size > 512*1024*1024) {
addReplyError(c,"string exceeds maximum allowed size (512MB)");
return C_ERR;
}
return C_OK;
}
4.3 Use cases
- Caching: MySQL as the store, Redis as the cache
- Counters: e.g. like counts, video play counts
- Rate limiting: see the Redis-based distributed rate limiter below
public static final String DISTRIBUTE_LIMIT_SCRIPT = "local key = KEYS[1] \n"
+ "local limit = tonumber(ARGV[1]) \n"
+ "local current = tonumber(redis.call('get',key) or '0') \n"
+ " \n"
+ "if (limit < current + 1) then \n"
+ " return 0 \n"
+ "else \n"
+ " redis.call('incrby',key,'1') \n"
+ " redis.call('expire',key,'2') \n"
+ " return 1 \n"
+ "end \n";
/*
* 1 means the request is allowed, 0 means it is rate-limited
* */
private static boolean tryAcquire(ResourceLimiterCfg apiRateLimiter,Cluster redisClient){
Long distributeLimitResult=1L;
try{
String key= RedisCachePrefix.DISTRIBUTE_LIMIT_PRE+
(StringUtils.isNotEmpty(apiRateLimiter.getPrefix())?apiRateLimiter.getPrefix()+SplitCharacter.SPLIT_COLON.key:"")+apiRateLimiter.getResource()+SplitCharacter.SPLIT_COLON.key+System.currentTimeMillis()/1000;
if(distributeLimit==null){
distributeLimit = redisClient.scriptLoad(RedisLuaScript.DISTRIBUTE_LIMIT_SCRIPT);
}
distributeLimitResult = (Long) redisClient.evalsha(distributeLimit, Collections.singletonList(key),
Collections.singletonList(apiRateLimiter.getPerSecCount().toString()), true, ScriptOutputType.INTEGER);
log.info("tryAcquire key={},distributeLimitResult={} ",key,distributeLimitResult );
}catch (Exception e){
log.error("tryAcquire={}",e);
e.printStackTrace();
}finally {
return distributeLimitResult.intValue()==1;
}
}
4.4 Common commands
5. Lists
5.1 Data structure
In recent Redis versions a list is always a quicklist: a doubly linked list whose nodes hold ziplists. Let's analyze it in detail:
/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
* We use bit fields keep the quicklistNode at 32 bytes.
* count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
* encoding: 2 bits, RAW=1, LZF=2.
* container: 2 bits, NONE=1, ZIPLIST=2.
* recompress: 1 bit, bool, true if node is temporarry decompressed for usage.
* attempted_compress: 1 bit, boolean, used for verifying during testing.
* extra: 10 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
//previous node in the doubly linked list
struct quicklistNode *prev;
//next node in the doubly linked list
struct quicklistNode *next;
//when the node is not compressed (recompress unset), zl points to a ziplist;
//when compressed, zl points to a quicklistLZF structure
unsigned char *zl;
//total size of the ziplist in bytes
unsigned int sz; /* ziplist size in bytes */
//number of entries in this node's ziplist
unsigned int count : 16; /* count of items in ziplist */
//whether this node is LZF-compressed: RAW==1 (not compressed), LZF==2; 2 bits
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
//whether the node stores its data in a ziplist
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
//whether this node was compressed before being temporarily decompressed
unsigned int recompress : 1; /* was this node previous compressed? */
//used only during testing
unsigned int attempted_compress : 1; /* node can't compress; too small */
//spare bits for future use; 10 bits
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
/*
When the ziplist of a node is compressed with the LZF algorithm,
the quicklistNode's zl member points to a quicklistLZF structure.
quicklistLZF is a 4+N byte struct holding 'sz' followed by 'compressed'.
* 'sz' is byte length of 'compressed' field.
* 'compressed' is LZF data with total (compressed) length 'sz'
* NOTE: uncompressed length is stored in quicklistNode->sz.
* When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */
typedef struct quicklistLZF {
//size of the ziplist after LZF compression
unsigned int sz; /* LZF size in bytes*/
//compressed payload, stored in a flexible array member
char compressed[];
} quicklistLZF;
/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: -1 if compression disabled, otherwise it's the number
* of quicklistNodes to leave uncompressed at ends of quicklist.
* 'fill' is the user-requested (or default) fill factor. */
typedef struct quicklist {
//head of the list
quicklistNode *head;
//tail of the list
quicklistNode *tail;
//total number of entries across all quicklist nodes
unsigned long count; /* total count of all entries in all ziplists */
//number of quicklistNodes, i.e. the length of the quicklist
unsigned long len; /* number of quicklistNodes */
//fill factor for individual nodes, i.e. the ziplist size limit
int fill : 16; /* fill factor for individual nodes */
//compression depth, set from the config file; 0 means no compression
unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;
//cursor structure describing one entry inside a quicklist
typedef struct quicklistEntry {
//the quicklist this entry belongs to
const quicklist *quicklist;
//the quicklistNode this entry belongs to
quicklistNode *node;
//pointer to the entry inside the ziplist
unsigned char *zi;
//string value of the entry, if it is a string
unsigned char *value;
//integer value of the entry, if it is an integer
long long longval;
//size of the entry in bytes
unsigned int sz;
//offset of the entry within its ziplist
int offset;
} quicklistEntry;
- Why does this hybrid structure exist?
- A doubly linked list has cheap insertion and deletion, but poor memory locality and two extra pointers per node.
- A contiguous array has expensive insertion and deletion, but is cache-friendly (locality of reference). The quicklist combines the two, much like std::deque in the C++ STL.
Think about how to balance the list length against the ziplist size. Consider the extremes: with a single node, the quicklist degenerates into one giant ziplist, and allocating a huge contiguous block may fail (OOM); with one entry per node, it degenerates into a plain doubly linked list, which wastes memory and fragments the heap. How is this balanced? See the analysis below.
5.2 Key points
- fill (16 bits) controls the ziplist size, storing the value of the
list-max-ziplist-size parameter.
list-max-ziplist-size -2
A positive value caps the number of entries in each node's ziplist; a negative value caps the ziplist size in bytes (-2, the default, means 8 KB per node).
//check whether one more entry can be added to this node
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
//...
//count is the current number of entries in this node
else if ((int)node->count < fill)
return 1;
//....
}
- compress (16 bits) sets the node compression depth, storing the value of the
list-compress-depth parameter.
When a list is large, the elements most likely to be accessed are near the two ends, while the middle is rarely accessed (and slow to access anyway). For workloads matching this pattern, the list can compress its middle nodes to save further memory; see the list-compress-depth config parameter.
list-compress-depth 0
Note that compression works at node level, not entry level: when a node is compressed, its entire ziplist is compressed.
0: a special value meaning no node is compressed; this is the Redis default.
1: one node at each end of the quicklist stays uncompressed; the middle is compressed.
2: two nodes at each end stay uncompressed.
3: three nodes at each end stay uncompressed.
And so on.
- recompress
Commands such as LINDEX temporarily decompress a node; recompress marks it for re-compression at some later point.
Let's look at the insertion path:
#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;
int ziplist_overhead;
/* size of previous offset */
//the entry after the new one must store the new entry's length in its prevlen field
if (sz < 254)
ziplist_overhead = 1;
else
ziplist_overhead = 5;
/* size of forward offset */
//the entry's own length also needs encoding space
if (sz < 64)
ziplist_overhead += 1;
else if (likely(sz < 16384))
ziplist_overhead += 2;
else
ziplist_overhead += 5;
/* new_sz overestimates if 'sz' encodes to an integer type */
//compute the new ziplist size
unsigned int new_sz = node->sz + sz + ziplist_overhead;
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1;
//hard safety cap on a single ziplist's size
else if (!sizeMeetsSafetyLimit(new_sz))
return 0;
// entry count limit
else if ((int)node->count < fill)
return 1;
else
return 0;
}
5.3 List features
- Blocking and non-blocking operations
Some list operations come in blocking variants. When none of the given keys exist (or they hold empty lists), BLPOP or BRPOP blocks the connection until another client pushes a new element onto one of those keys with LPUSH or RPUSH, which unblocks the waiting client.
The implementation is simple: the command replies when data arrives or the timeout expires; until then it simply does not reply.
BLPOP key [key ...] timeout
BRPOP key [key ...] timeout
BRPOPLPUSH source destination timeout
5.4 Use cases
- Common derived structures
- lpush + lpop = stack
- lpush + rpop = queue
- lpush + ltrim = capped collection
- lpush + brpop = message queue
- Article feeds
Store each article in a hash and the article list itself in a list.
5.5 Common commands
6. Hashes
6.1 Data structure
The Redis hash object has two underlying implementations. We discussed the trade-offs of contiguous versus non-contiguous memory earlier; the hash type again compromises between the two.
The two storage layouts:
6.2 Caveats
- The conversion from ziplist to dict is one-way; it is never reversed.
- Prefer the ziplist implementation where possible, but keep the entry count under about 1000: ziplist operations are O(n), so long lists burn CPU. Keep the values small as well.
- The thresholds are configurable via hash-max-ziplist-entries and hash-max-ziplist-value.
- With a ziplist underneath, lookup is O(n).
6.3 Use cases
- Storing objects (with fixed or variable attributes)
- Plain strings:
set user:1:name tom
set user:1:age 23
set user:1:city beijing
Pros: simple and intuitive; each attribute can be updated independently.
Cons: consumes many keys and a lot of memory, and the user's data loses cohesion; rarely used in production.
- Serialized strings (often combined with protobuf; the most widely used approach):
set user:1 serialize(userInfo)
Pros: simpler programming; with a good serializer, memory efficiency improves.
Cons: serialization and deserialization have a cost, and every attribute update requires reading the whole value, deserializing, updating, and re-serializing it.
- Hash type:
hmset user:1 name tom age 23 city beijing
- Pros: simpler programming and good memory efficiency; each attribute is a field-value pair, yet only one key is used.
- Cons: you must manage the ziplist/hashtable encoding conversion, and the hashtable encoding uses more memory.
6.4 Common commands
7. Sets
7.1 Data structure
The Redis set object also has two underlying implementations.
7.2 Caveats
- The conversion from intset to dict is one-way.
- Sets do not allow duplicate members.
- Intersection, union, and difference are supported.
- The set-max-intset-entries threshold is configurable.
- With a dict underneath, the values are NULL.
- With an intset underneath, lookup is O(log N).
7.3 Use cases
- Tags: typically for interests in social applications (try to keep each update inside one transaction)
- Tagging a user:
sadd user:1:tags tag1 tag2 tag5
sadd user:2:tags tag2 tag3 tag5
...
sadd user:k:tags tag1 tag2 tag4
- Recording a tag's users:
sadd tag1:users user:1 user:3
sadd tag2:users user:1 user:2 user:3
...
sadd tagk:users user:1 user:2
...
- Computing the tags two users share:
sinter user:1:tags user:2:tags
- Random picks, e.g. lotteries:
spop/srandmember
7.4 Common commands
8. Sorted Sets (zset)
8.1 Data structure
typedef struct zset {
dict *dict;//maps each member to its score
zskiplist *zsl;//skip list ordered by score
} zset;
Insertion into the ziplist encoding:
/* Insert (element,score) pair in ziplist. This function assumes the element is
* not yet present in the list. */
unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
double s;
// walk the list to find the insertion point that keeps it sorted
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
serverAssert(sptr != NULL);
s = zzlGetScore(sptr);
if (s > score) {
/* First element with score larger than score for element to be
* inserted. This means we should take its spot in the list to
* maintain ordering. */
// insert (element, score) here
zl = zzlInsertAt(zl,eptr,ele,score);
break;
} else if (s == score) {
/* Ensure lexicographical ordering for elements. */
if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
zl = zzlInsertAt(zl,eptr,ele,score);
break;
}
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
/* Push on tail of list when it was not yet inserted. */
if (eptr == NULL)
zl = zzlInsertAt(zl,NULL,ele,score);
return zl;
}
Conversion from ziplist to skiplist:
8.2 Caveats
- The dict exists alongside the skiplist because fetching a member's score via the skiplist costs O(log n), while the dict does it in O(1) (see zsetAdd). Note that with the ziplist encoding the lookup is still O(n):
int zsetScore(robj *zobj, sds member, double *score) {
if (!zobj || !member) return C_ERR;
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR;
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
dictEntry *de = dictFind(zs->dict, member);
if (de == NULL) return C_ERR;
*score = *(double*)dictGetVal(de);
} else {
serverPanic("Unknown sorted set encoding");
}
return C_OK;
}
//the key function: zzlFind
unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
serverAssert(sptr != NULL);
if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) {
/* Matching element, pull out score. */
if (score != NULL) *score = zzlGetScore(sptr);
return eptr;
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
return NULL;
}
- The skiplist and the dict share members and scores (only pointers are copied).
- The conversion from ziplist to skiplist is one-way.
- The zset-max-ziplist-entries and zset-max-ziplist-value thresholds are configurable.
- Members must be unique in a zset as well.
8.3 Use cases
- Priority queues
- Leaderboards: e.g. a video site ranking user uploads along several dimensions: by time, by play count, by likes
- Record a user's likes:
zadd user:ranking:2016_03_15 3 mike
- Someone likes the user:
zincrby user:ranking:2016_03_15 1 mike
- Revoke a like:
zrem user:ranking:2016_03_15 mike
- Show the ten most-liked users:
zrevrange user:ranking:2016_03_15 0 9
- Show a user's info, score, and rank:
hgetall user:info:tom
zscore user:ranking:2016_03_15 mike
zrank user:ranking:2016_03_15 mike
8.4 Common commands
9. The Redis Network Model
Redis uses a single-threaded reactor, and a single instance can reach around 100k QPS in benchmarks. So don't underestimate the single-threaded reactor when choosing an architecture: more complex is not automatically better; what fits is best.
9.1 Flow diagram
9.2 Key points
A network library cares about three kinds of events: file events, timer events, and signals. In Redis, file events and timer events are managed together through the I/O multiplexing loop, while signals are handled asynchronously via signal handlers.
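The core idea can be sketched in a few lines (using select for portability; Redis's ae event loop prefers epoll/kqueue when available; reactor_poll is an illustrative name of mine, not Redis's): file events and a timeout are multiplexed in one blocking call, so timer events get serviced when the wait expires.

```c
#include <sys/select.h>
#include <unistd.h>

/* One iteration of a toy reactor: wait until fd is readable or
 * timeout_ms elapses. Returns 1 if a file event fired, 0 on timer
 * expiry, -1 on error. */
static int reactor_poll(int fd, int timeout_ms) {
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(fd, &rfds);
    struct timeval tv = { timeout_ms / 1000, (timeout_ms % 1000) * 1000 };
    int n = select(fd + 1, &rfds, NULL, NULL, &tv);
    if (n < 0) return -1;
    return (n > 0 && FD_ISSET(fd, &rfds)) ? 1 : 0;
}
```

In Redis the timeout passed to the multiplexing call is derived from the nearest pending timer, so neither event type starves the other.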
9.2.1 Timer events
Redis actually supports periodic events: after a timer fires it is not deleted but re-inserted into the list.
Timers are kept in a linked list, with new timers inserted at the head.
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
Timer events are processed as follows:
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
//If the system clock is moved into the future and then set back, timer events may be randomly delayed.
//The strategy here: firing timers early is safer than delaying them indefinitely.
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
//record the current time
eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
//walk the timer list, removing dead timers and firing due ones
while(te) {
long now_sec, now_ms;
long long id;
//remove events that were flagged for deletion in an earlier iteration
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
// retval, the return value of timeProc, is the interval until the next firing
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
//AE_NOMORE: a one-shot event, so flag it for deletion
te->id = AE_DELETED_EVENT_ID;
}
}
//move to the next timer
te = te->next;
}
return processed;
}
9.2.2 Signals
initServer registers the signal handler sigShutdownHandler, which mainly sets shutdown_asap to 1 (if it is already 1, the process exits immediately). The actual cleanup then happens in prepareForShutdown, invoked from serverCron, giving the server a graceful shutdown.
void setupSignalHandlers(void) {
struct sigaction act;
/* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
* Otherwise, sa_handler is used. */
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_handler = sigShutdownHandler;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGINT, &act, NULL);
return;
}
The sigShutdownHandler function:
static void sigShutdownHandler(int sig) {
char *msg;
switch (sig) {
case SIGINT:
msg = "Received SIGINT scheduling shutdown...";
break;
case SIGTERM:
msg = "Received SIGTERM scheduling shutdown...";
break;
default:
msg = "Received shutdown signal, scheduling shutdown...";
};
/* SIGINT is often delivered via Ctrl+C in an interactive session.
* If we receive the signal the second time, we interpret this as
* the user really wanting to quit ASAP without waiting to persist
* on disk. */
if (server.shutdown_asap && sig == SIGINT) {
serverLogFromHandler(LL_WARNING, "You insist... exiting now.");
rdbRemoveTempFile(getpid());
exit(1); /* Exit with an error since this was not a clean shutdown. */
} else if (server.loading) {
exit(0);
}
serverLogFromHandler(LL_WARNING, msg);
server.shutdown_asap = 1;
}

10. The Redis Communication Protocol
Redis clients and servers communicate over RESP (REdis Serialization Protocol). RESP is essentially a text protocol: simple to implement and easy to parse.
Since my client and server run on the same machine, traffic goes over the local loopback interface, which we can capture with tcpdump:
tcpdump -i lo port 6379 -Ann
A sharp-eyed reader may object: wait, that is not what my client shows!
[0 root@migu ~]# redis-cli
127.0.0.1:6379> set name bigold ex 1
OK
127.0.0.1:6379> mset n1 n1 n2 n2
OK
That is because the client reformats the raw replies before displaying them; see redis-cli.c:
static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
sds out = sdsempty();
switch (r->type) {
case REDIS_REPLY_ERROR:
out = sdscatprintf(out,"(error) %s\n", r->str);
break;
case REDIS_REPLY_STATUS:
out = sdscat(out,r->str);
out = sdscat(out,"\n");
break;
case REDIS_REPLY_INTEGER:
out = sdscatprintf(out,"(integer) %lld\n",r->integer);
break;
case REDIS_REPLY_STRING:
/* If you are producing output for the standard output we want
* a more interesting output with quoted characters and so forth */
out = sdscatrepr(out,r->str,r->len);
out = sdscat(out,"\n");
break;
case REDIS_REPLY_NIL:
out = sdscat(out,"(nil)\n");
break;
case REDIS_REPLY_ARRAY:
if (r->elements == 0) {
out = sdscat(out,"(empty list or set)\n");
} else {
unsigned int i, idxlen = 0;
char _prefixlen[16];
char _prefixfmt[16];
sds _prefix;
sds tmp;
/* Calculate chars needed to represent the largest index */
i = r->elements;
do {
idxlen++;
i /= 10;
} while(i);
/* Prefix for nested multi bulks should grow with idxlen+2 spaces */
memset(_prefixlen,' ',idxlen+2);
_prefixlen[idxlen+2] = '\0';
_prefix = sdscat(sdsnew(prefix),_prefixlen);
/* Setup prefix format for every entry */
snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud) ",idxlen);
for (i = 0; i < r->elements; i++) {
/* Don't use the prefix for the first element, as the parent
* caller already prepended the index number. */
out = sdscatprintf(out,_prefixfmt,i == 0 ? "" : prefix,i+1);
/* Format the multi bulk entry */
tmp = cliFormatReplyTTY(r->element[i],_prefix);
out = sdscatlen(out,tmp,sdslen(tmp));
sdsfree(tmp);
}
sdsfree(_prefix);
}
break;
default:
fprintf(stderr,"Unknown reply type: %d\n", r->type);
exit(1);
    }
return out;
}
Instead of redis-cli we can talk to the server directly with nc:
nc 127.0.0.1 6379
set a a
+OK
get a b
-ERR wrong number of arguments for 'get' command
- Other notes
- Common error codes
#define REDIS_ERR -1
#define REDIS_OK 0
/* When an error occurs, the err flag in a context is set to hold the type of
* error that occurred. REDIS_ERR_IO means there was an I/O error and you
* should use the "errno" variable to find out what is wrong.
* For other values, the "errstr" field will hold a description. */
#define REDIS_ERR_IO 1 /* Error in read or write */
#define REDIS_ERR_EOF 3 /* End of file */
#define REDIS_ERR_PROTOCOL 4 /* Protocol error */
#define REDIS_ERR_OOM 5 /* Out of memory */
#define REDIS_ERR_OTHER 2 /* Everything else... */
#define REDIS_REPLY_STRING 1
#define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3
#define REDIS_REPLY_NIL 4
#define REDIS_REPLY_STATUS 5
#define REDIS_REPLY_ERROR 6
- Error message strings as shared objects
void createSharedObjects(void) {
int j;
shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n"));
shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
shared.cnegone = createObject(OBJ_STRING,sdsnew(":-1\r\n"));
shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
shared.nullmultibulk = createObject(OBJ_STRING,sdsnew("*-1\r\n"));
shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n"));
shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
shared.wrongtypeerr = createObject(OBJ_STRING,sdsnew(
"-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
shared.nokeyerr = createObject(OBJ_STRING,sdsnew(
"-ERR no such key\r\n"));
shared.syntaxerr = createObject(OBJ_STRING,sdsnew(
"-ERR syntax error\r\n"));
shared.sameobjecterr = createObject(OBJ_STRING,sdsnew(
"-ERR source and destination objects are the same\r\n"));
shared.outofrangeerr = createObject(OBJ_STRING,sdsnew(
"-ERR index out of range\r\n"));
shared.noscripterr = createObject(OBJ_STRING,sdsnew(
"-NOSCRIPT No matching script. Please use EVAL.\r\n"));
shared.loadingerr = createObject(OBJ_STRING,sdsnew(
"-LOADING Redis is loading the dataset in memory\r\n"));
shared.slowscripterr = createObject(OBJ_STRING,sdsnew(
"-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
shared.masterdownerr = createObject(OBJ_STRING,sdsnew(
"-MASTERDOWN Link with MASTER is down and replica-serve-stale-data is set to 'no'.\r\n"));
shared.bgsaveerr = createObject(OBJ_STRING,sdsnew(
"-MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk. Commands that may modify the data set are disabled, because this instance is configured to report errors during writes if RDB snapshotting fails (stop-writes-on-bgsave-error option). Please check the Redis logs for details about the RDB error.\r\n"));
shared.roslaveerr = createObject(OBJ_STRING,sdsnew(
"-READONLY You can't write against a read only replica.\r\n"));
shared.noautherr = createObject(OBJ_STRING,sdsnew(
"-NOAUTH Authentication required.\r\n"));
shared.oomerr = createObject(OBJ_STRING,sdsnew(
"-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
shared.execaborterr = createObject(OBJ_STRING,sdsnew(
"-EXECABORT Transaction discarded because of previous errors.\r\n"));
shared.noreplicaserr = createObject(OBJ_STRING,sdsnew(
"-NOREPLICAS Not enough good replicas to write.\r\n"));
shared.busykeyerr = createObject(OBJ_STRING,sdsnew(
"-BUSYKEY Target key name already exists.\r\n"));
shared.space = createObject(OBJ_STRING,sdsnew(" "));
shared.colon = createObject(OBJ_STRING,sdsnew(":"));
shared.plus = createObject(OBJ_STRING,sdsnew("+"));
for (j = 0; j < PROTO_SHARED_SELECT_CMDS; j++) {
char dictid_str[64];
int dictid_len;
dictid_len = ll2string(dictid_str,sizeof(dictid_str),j);
shared.select[j] = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, dictid_str));
}
....
}

11. Redis Command Objects
Redis commands are managed through the redisCommand structure.
11.1 Data Structure
typedef void redisCommandProc(client *c);
typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys);
struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
};
For the sflags field, see the relevant diagram in "Redis设计与实现" (Redis Design and Implementation).
flags stores the result of translating each sflags character into its bit value; see populateCommandTable:
for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
char *f = c->sflags;
int retval1, retval2;
while(*f != '\0') {
switch(*f) {
case 'w': c->flags |= CMD_WRITE; break;
case 'r': c->flags |= CMD_READONLY; break;
case 'm': c->flags |= CMD_DENYOOM; break;
case 'a': c->flags |= CMD_ADMIN; break;
case 'p': c->flags |= CMD_PUBSUB; break;
case 's': c->flags |= CMD_NOSCRIPT; break;
case 'R': c->flags |= CMD_RANDOM; break;
case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
case 'l': c->flags |= CMD_LOADING; break;
case 't': c->flags |= CMD_STALE; break;
case 'M': c->flags |= CMD_SKIP_MONITOR; break;
case 'k': c->flags |= CMD_ASKING; break;
case 'F': c->flags |= CMD_FAST; break;
default: serverPanic("Unsupported command flag"); break;
}
f++;
        }
    }
The command table is long, so only a few entries are shown here:
struct redisCommand redisCommandTable[] = {
{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
{"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
{"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
{"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
{"unlink",unlinkCommand,-2,"wF",0,NULL,1,-1,1,0,0},
{"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
{"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
{"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
}
Take SET as an example, {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}: the fields are the name, the handler, the arity (-3 means at least three arguments, command name included), the flag string "wm" (write, deny when out of memory), and the key positions firstkey=1, lastkey=1, keystep=1.
12. Clients
12.1 Data Structure
/* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
int fd; /* Client socket. */
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
// input buffer: accumulates queries coming from the client
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
replication stream that we are receiving from
the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
int flags; /* Client flags: CLIENT_* macros. */
int authenticated; /* When requirepass is non-NULL. */
int replstate; /* Replication state if this is a slave. */
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
blockingState bpop; /* blocking state */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
listNode *client_list_node; /* list node in client list */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
At first glance the client structure is enormous. How is anyone supposed to remember all of this? You don't have to: keeping a handful of the important fields in mind is enough.
12.2 Configuration
12.3 Client Commands
"id -- Return the ID of the current connection.",
"getname -- Return the name of the current connection.",
"kill <ip:port> -- Kill connection made from <ip:port>.",
"kill <option> <value> [option value ...] -- Kill connections. Options are:",
" addr <ip:port> -- Kill connection made from <ip:port>",
" type (normal|master|replica|pubsub) -- Kill connections by type.",
" skipme (yes|no) -- Skip killing current connection (default: yes).",
"list [options ...] -- Return information about client connections. Options:",
" type (normal|master|replica|pubsub) -- Return clients of specified type.",
"pause <timeout> -- Suspend all Redis clients for <timout> milliseconds.",
"reply (on|off|skip) -- Control the replies sent to the current connection.",
"setname <name> -- Assign the name <name> to the current connection.",
"unblock <clientid> [TIMEOUT|ERROR] -- Unblock the specified blocked client."12.3.1client list->查看客户端的信息
- flags encodes the client type: N for a normal client, M for a master, and so on
int getClientTypeByName(char *name) {
if (!strcasecmp(name,"normal")) return CLIENT_TYPE_NORMAL;
else if (!strcasecmp(name,"slave")) return CLIENT_TYPE_SLAVE;
else if (!strcasecmp(name,"replica")) return CLIENT_TYPE_SLAVE;
else if (!strcasecmp(name,"pubsub")) return CLIENT_TYPE_PUBSUB;
else if (!strcasecmp(name,"master")) return CLIENT_TYPE_MASTER;
else return -1;
}
char *getClientTypeName(int class) {
switch(class) {
case CLIENT_TYPE_NORMAL: return "normal";
case CLIENT_TYPE_SLAVE: return "slave";
case CLIENT_TYPE_PUBSUB: return "pubsub";
case CLIENT_TYPE_MASTER: return "master";
default: return NULL;
}
}
- obl is the length of the fixed output buffer
- oll is the length of the dynamic reply list
- omem is the number of bytes the output buffers consume
- events is the event mask (r/w)
- cmd is the last command the client executed
The exact computation is in catClientInfoString.
12.3.2 info clients -> inspect aggregate client stats
- connected_clients: the number of client connections on this node. Worth monitoring closely: once it exceeds maxclients, new connections are rejected.
- client_longest_output_list: the largest reply-list length among all current clients.
- client_biggest_input_buf: the largest input buffer among all current clients.
- blocked_clients: the number of clients blocked in commands such as BLPOP, BRPOP, and BRPOPLPUSH.
12.4 Closing Clients
A client is closed when:
- the CLIENT KILL command is issued
- a command violates the protocol format
- the client times out
- the input buffer exceeds its 1 GB threshold, controlled by the server's client_max_querybuf_len field
#define PROTO_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
config_get_numerical_field("client-query-buffer-limit",server.client_max_querybuf_len);
The input-buffer check itself lives in readQueryFromClient.
- the output buffer exceeds its thresholds (a soft and a hard limit: past the hard limit the client is closed immediately; past the soft limit it is closed only after staying over it for the configured number of seconds)
The config format is client-output-buffer-limit <class> <hard> <soft> <seconds>:
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit replica 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60
Here class is the client type; exceeding the hard limit closes the client at once, while exceeding the soft limit closes it after soft seconds of sustained excess.
class can take the values:
normal -> normal clients including MONITOR clients
slave -> slave clients
pubsub -> clients subscribed to at least one pubsub channel or pattern
Note: these client checks run inside the serverCron timer, in clientsCron:
#define CLIENTS_CRON_MIN_ITERATIONS 5
void clientsCron(void) {
/* Try to process at least numclients/server.hz of clients
* per call. Since normally (if there are no big latency events) this
* function is called server.hz times per second, in the average case we
* process all the clients in 1 second. */
int numclients = listLength(server.clients);
//if there are many clients this number gets large, hence the cap below (see the analysis after the code)
int iterations = numclients/server.hz;
mstime_t now = mstime();
/* Process at least a few clients while we are at it, even if we need
* to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
* of processing each client once per second. */
if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
numclients : CLIENTS_CRON_MIN_ITERATIONS;
while(listLength(server.clients) && iterations--) {
client *c;
listNode *head;
/* Rotate the list, take the current head, process.
* This way if the client must be removed from the list it's the
* first element and we don't incur into O(N) computation. */
listRotate(server.clients);
head = listFirst(server.clients);
c = listNodeValue(head);
/* The following functions do different service checks on the client.
* The protocol is that they return non-zero if the client was
* terminated. */
if (clientsCronHandleTimeout(c,now)) continue;
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronTrackExpansiveClients(c)) continue;
}
}
Because Redis is single-threaded, it cannot loop over every client on each pass. Client timeout checks run in clientsCron, called from the periodic serverCron function, which executes every 100 ms by default. server.hz is serverCron's call frequency, 10 by default. So that every client is visited roughly once per second, each call processes iterations = numclients/server.hz clients. With a very large number of clients this can still stall the main thread, which is why Redis 5.0 introduced dynamic hz (the dynamic-hz config option, enabled by default).
13. The Server
13.1 Databases
Databases are stored in redisDb structures. The server-side redisServer structure holds the redisDb array and its size; the count can be changed via the config file (see the databases directive).
13.1.1 Data Structure
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
redisServer in turn holds the array of redisDb structures; which one is currently in use is tracked per client.
struct redisServer {
/* General */
pid_t pid; /* Main process pid. */
char *configfile; /* Absolute config file path, or NULL */
char *executable; /* Absolute executable file path. */
char **exec_argv; /* Executable argv vector (copy). */
int dynamic_hz; /* Change hz value depending on # of clients. */
int config_hz; /* Configured HZ value. May be different than
the actual 'hz' field value if dynamic-hz
is enabled. */
int hz; /* serverCron() calls frequency in hertz */
//the database array
redisDb *db;
...
}

13.1.2 Switching Databases
Databases are switched with SELECT. Executing SELECT n simply points the client's db field at the n-th database: c->db = &server.db[id];
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum)
return C_ERR;
c->db = &server.db[id];
return C_OK;
}

13.1.3 The Key Space
As shown above, Redis has multiple databases that do not affect one another. How is that achieved? The key space can be thought of like a C++ namespace: it provides the isolation. Data lives in the dict member of redisDb, so operations on keys basically all go through the key space.
In essence the key space is just each redisDb's dict object, which is easy to see: selecting a different db means a different dict, and therefore a different set of keys underneath.
13.1.4 Expiring Keys
- Expiration times are kept in the expires member of redisDb, a dict pointer.
- Storage layout:
- the key is the database key object
- the value is that key's expiration time, a Unix timestamp stored as a long integer.
de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);
#define dictSetSignedIntegerVal(entry, _val_) \
do { (entry)->v.s64 = _val_; } while(0)
- Setting an expiration
Relative: EXPIRE
Absolute: EXPIREAT
For EXPIRE the call chain is setExpire(c,c->db,key,when) -> de = dictFind(db->dict,key->ptr) -> dictAddOrFind(db->expires,dictGetKey(de))
- Removing an expiration
PERSIST
- Checking the remaining time to live
TTL
lookupKeyReadWithFlags -> getExpire -> ttl = expire-mstime();
- Expiration strategy
Redis combines periodic deletion with lazy deletion.
- Periodic deletion runs in two modes, invoked in different contexts, so the termination conditions differ.
Inside the databasesCron step of the serverCron timer it runs as ACTIVE_EXPIRE_CYCLE_SLOW, which may spend more time. In beforeSleep, just before the event loop blocks, it runs as ACTIVE_EXPIRE_CYCLE_FAST, since it must not delay event handling; the fast path therefore gets an extra optimization:
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit
* for time limit. Also don't repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
if (!timelimit_exit) return;
if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
last_fast_cycle = start;
}
- REDIS_EXPIRELOOKUPS_TIME_PERC is the share of CPU time per unit of time that activeExpireCycle may consume; the default is 25.
- At most 16 databases are scanned per cycle.
- Scanning of a database continues only while at least 5 expired keys are found per batch (note that keys are sampled randomly, so a very small data set samples poorly).
- Within each database, 20 keys are randomly sampled per iteration and checked for expiration, counting each expired key. Every 16 iterations the elapsed time is checked against the limit; if it is exceeded, the loop exits and the scan stops.
How expired keys interact with RDB, AOF, and replication is covered in the corresponding later chapters.
13.2 Database Notifications
Database notifications build on Redis pub/sub: clients subscribe to channels or patterns (tracked in the pubsub_channels and pubsub_patterns fields of both client and server, though with different contents) to learn about changes to keys in the database. There are two flavors, key-space and key-event notifications; the feature must be enabled via the config file or set dynamically (see notify-keyspace-events).
13.2.1 Key-Space Notifications: which commands were executed on a given key
__keyspace@<db>__:<key>
13.2.2 Key-Event Notifications: which keys a given event was triggered on
__keyevent@<db>__:<event>
Parameter notes
There are two ways to enable the feature:
- In the config file, e.g. notify-keyspace-events Elg; at least one of K and E must be enabled for anything to be delivered.
- At runtime:
config set notify-keyspace-events Ex
- Watching the published events:
PSUBSCRIBE __keyevent@*__:expired