Redis原理篇之数据结构

x33g5p2x  于2022-05-28 转载在 Redis  
字(23.5k)|赞(0)|评价(0)|浏览(895)

Redis原理

Redis源码可以去官网下载,也可以从我下面提供的这个链接进行下载:

redis-6.2.4.tar.gz

数据结构

动态字符串SDS

redis中保存的Key是字符串,value大多也是字符串或字符串集合,因此字符串是Redis中最常使用的一种数据结构。

不过Redis没有直接使用C语言中的字符串,因为C语言字符串存在很多问题:

  • 获取字符串长度需要的复杂度为O(N)
  • 非二进制安全,C语言使用空字符’\0’作为字符串结尾的标记,如果保存的字符串本身含义该标记,那么会造成读取被截断,获取的数据不完整
  • 不可修改
  • 容易造成缓冲区溢出,例如字符串拼接时,超过原本的空间大小,可能会覆盖掉相邻变量的内存空间

而SDS就是对c字符串的封装,以此来解决上述的问题。

SDS结构

SDS是C语言实现的一个结构体:

一个简单的例子如下:

动态扩容

在c语言中,如果要对字符串操作:

  • 拼接–>先进行内存重分配来扩展底层数组大小,如果忘记了这一步,会导致缓冲区溢出
  • 缩短–>需要通过内存重分配来释放字符串不再使用的那部分空间,如果忘记了会导致内存泄露

因为内存重分配需要执行系统调用,并且系统实现内存重分配算法也非常复杂,所以这通过是一个比较耗时的操作

  • 因此通过内存预分配可以减少内存重分配的次数,进而提高整体执行效率
  • 并且SDS还提供了惰性空间释放的功能,即对字符串缩短操作而言,不会立刻使用内存重分配算法来回收多出来的字节,而是通过一个free属性进行记录,当后面需要进行字符串增长时,就会用到
小结

SDS优点如下:

  • O(1)复杂度获取字符串长度
  • 杜绝缓冲区溢出
  • 减少修改字符串长度时所需的内存重分配次数
  • 二进制安全
  • 兼容部分C字符串函数(因此SDS遵循了以’\0’结尾的惯例)

整数集合IntSet

IntSet是vlaue集合的底层实现之一,当一个集合只包含整数值元素,并且这个集合元素数量不多的情况下,Redis就会使用IntSet作为该value集合的底层实现。

IntSet是Redis用于保存整数值集合抽象数据结构,它可以保存类型为int16_t,int32_t,int64_t的整数值,并且保证集合中不会出现重复元素。

IntSet结构如下:

  1. typedef struct intset {
  2. //编码方式,支持存放16位,32位,64位整数
  3. uint32_t encoding;
  4. //元素个数
  5. uint32_t length;
  6. //整数数组,保存集合数据
  7. int8_t contents[];
  8. } intset;

contents是整数数组底层实现,用来存储元素,并且各个项在数组中的值按从小到大有序排列,并且数组中不包含重复元素。

其中的encoding包含三种模式,表示存储的整数大小不同:

  1. /* Note that these encodings are ordered, so:
  2. * INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64. */
  3. /* 2字节整数,范围类似java的short */
  4. #define INTSET_ENC_INT16 (sizeof(int16_t))
  5. /* 4字节整数,范围类似java的int */
  6. #define INTSET_ENC_INT32 (sizeof(int32_t))
  7. /* 8字节整数,范围类似java的long */
  8. #define INTSET_ENC_INT64 (sizeof(int64_t))

为了方便查找,Redis会将intset中所有的整数按照升序依次保存在contents数组中,结构如图:

现在,数组中每个数字都在int16_t的范围内,因此采用的编码方式是INSET_ENC_INT16,每部分占用的字节大小为:

  • encoding: 4字节
  • length: 4字节
  • contents: 2字节*3=6字节
    上图中给出的公式是计算每个数组元素起始地址,从这里也能看出为什么很多语言中,数组元素下标都从0开始

因为,如果从1开始,那么公式就变成了: startPtr+(sizeof(int16)*(index-1))

还要额外计算一次减法操作,这会浪费额外的cpu资源

  • startPtr: 数组首元素起始地址
  • sizeof(int16): 数组中每个元素的大小,数组中每个元素大小一致,便于按照下标寻址
  • sizeof(int16)*(index): index下标元素举例起始地址多远,即index元素的起始地址
IntSet升級

  • 升级编码为INTSET_ENC_INT32,每个整数占4字节,并按照新的编码方式及元素个数扩容数组
  • 倒序依次将数组中的元素拷贝到扩容后的正确位置
    正序挨个拷贝,会导致前面的元素扩容后覆盖后面的元素,而倒序可以避免这种情况。

c语言写数组插入元素的算法时,也是将元素挨个后移,然后腾出位置,插入新元素。

  • 将待添加的元素放入数组末尾

  • 最后,将intset的encoding属性改为INTSET_ENC_INT32,将length属性改为4

升级源码分析
  • insetAdd–插入元素
  1. /* Insert an integer in the intset */
  2. intset *intsetAdd(
  3. //需要插入的intset
  4. intset *is,
  5. //需要插入的新元素
  6. int64_t value,
  7. //是否插入成功
  8. uint8_t *success) {
  9. //获取当前值编码
  10. uint8_t valenc = _intsetValueEncoding(value);
  11. //要插入的位置
  12. uint32_t pos;
  13. if (success) *success = 1;
  14. /* Upgrade encoding if necessary. If we need to upgrade, we know that
  15. * this value should be either appended (if > 0) or prepended (if < 0),
  16. * because it lies outside the range of existing values. */
  17. //判断编码是不是超过了当前intset的编码
  18. if (valenc > intrev32ifbe(is->encoding)) {
  19. /* This always succeeds, so we don't need to curry *success. */
  20. //超出编码,需要升级
  21. return intsetUpgradeAndAdd(is,value);
  22. } else {
  23. //不需要进行数组编码升级,只需要将元素插入到指定位置即可
  24. /* Abort if the value is already present in the set.
  25. * This call will populate "pos" with the right position to insert
  26. * the value when it cannot be found. */
  27. //在当前intset中查找值与value一样的元素的角标--使用二分查找法
  28. //如果找到了,说明元素已经存在,无需再次插入,那么pos就是该元素的位置
  29. //否则pos指向比value大的前一个元素
  30. if (intsetSearch(is,value,&pos)) {
  31. //如果找到了,则无需插入,直接结束并返回
  32. if (success) *success = 0;
  33. return is;
  34. }
  35. //数组扩容
  36. is = intsetResize(is,intrev32ifbe(is->length)+1);
  37. //移动数组中pos之后的元素到pos+1,给新元素腾出空间
  38. if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
  39. }
  40. //插入新元素
  41. _intsetSet(is,pos,value);
  42. //重置元素长度
  43. is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
  44. return is;
  45. }
  • intsetUpgradeAndAdd–升级数组编码
  1. /* Upgrades the intset to a larger encoding and inserts the given integer. */
  2. /* 插入的元素比当前数组编码要大,因此数组需要进行扩容,但是这个新元素具体是插入头部还是尾部不确定
  3. * 因为该元素可能是一个负数!!!
  4. * */
  5. static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
  6. //获取当intset编码
  7. uint8_t curenc = intrev32ifbe(is->encoding);
  8. //获取新编码
  9. uint8_t newenc = _intsetValueEncoding(value);
  10. //获取元素个数
  11. int length = intrev32ifbe(is->length);
  12. //判断新元素是大于0还是小于0,小于0插入队列头部,大于0插入队尾
  13. int prepend = value < 0 ? 1 : 0;
  14. /* First set new encoding and resize */
  15. //重置编码为新编码
  16. is->encoding = intrev32ifbe(newenc);
  17. //重置数组大小--扩容
  18. is = intsetResize(is,intrev32ifbe(is->length)+1);
  19. /* Upgrade back-to-front so we don't overwrite values.
  20. * Note that the "prepend" variable is used to make sure we have an empty
  21. * space at either the beginning or the end of the intset. */
  22. //倒序遍历,逐个搬运元素到新的位置,_intsetGetEncoded按照旧编码方式查找旧元素
  23. while(length--)
  24. //_intsetSet按照新编码方式将取出的旧元素插入到数组中
  25. //length+prepend: 如果新元素为负数,那么prepend为1,即旧元素后移的过程中,还会在数组头部腾出一个新位置
  26. _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
  27. /* Set the value at the beginning or the end. */
  28. //插入新元素,prepend决定是数组头部还是尾部
  29. if (prepend)
  30. _intsetSet(is,0,value);
  31. else
  32. _intsetSet(is,intrev32ifbe(is->length),value);
  33. //修改数组长度
  34. is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
  35. return is;
  36. }
  • intsetSearch–二分查找元素
  1. /* Search for the position of "value". Return 1 when the value was found and
  2. * sets "pos" to the position of the value within the intset. Return 0 when
  3. * the value is not present in the intset and sets "pos" to the position
  4. * where "value" can be inserted. */
  5. //返回1表示元素存在,我们不需要进行任何操作
  6. //如果返回0,表示元素还不存在
  7. static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
  8. //初始化二分查找需要的min,max,mid
  9. int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
  10. //mid对应的值
  11. int64_t cur = -1;
  12. /* The value can never be found when the set is empty */
  13. //如果数组为空则不用找了
  14. if (intrev32ifbe(is->length) == 0) {
  15. if (pos) *pos = 0;
  16. return 0;
  17. } else {
  18. /* Check for the case where we know we cannot find the value,
  19. * but do know the insert position. */
  20. //数组不为空,判断value是否大于最大值,小于最小值
  21. if (value > _intsetGet(is,max)) {
  22. //大于最大值,插入队尾
  23. if (pos) *pos = intrev32ifbe(is->length);
  24. return 0;
  25. } else if (value < _intsetGet(is,0)) {
  26. //小于最小值,插入队尾
  27. if (pos) *pos = 0;
  28. return 0;
  29. }
  30. }
  31. //二分查找
  32. while(max >= min) {
  33. mid = ((unsigned int)min + (unsigned int)max) >> 1;
  34. cur = _intsetGet(is,mid);
  35. if (value > cur) {
  36. min = mid+1;
  37. } else if (value < cur) {
  38. max = mid-1;
  39. } else {
  40. break;
  41. }
  42. }
  43. if (value == cur) {
  44. if (pos) *pos = mid;
  45. return 1;
  46. } else {
  47. if (pos) *pos = min;
  48. return 0;
  49. }
  50. }

整数集合升级策略有两个好处:

  • 提升整数集合的灵活性
  • 尽可能节约内存
降级

整数集合不支持降级操作,一旦对数组进行了升级,编码就会一直保持升级后的状态。
内存都是连续存放的,就算进行了降级,也会产生很多内存碎片,如果还要花时间去整理这些碎片更浪费时间。

当然,有小伙伴会说,可以参考SDS的做法,使用free属性来标记空闲空间大小—>当然应该存在更好的做法,大家可以尝试去思考更好的解法

小结

intset具备以下特点:

  • Redis会确保intset中的元素唯一,有序
  • 具备类型升级机制,可以节约内存空间
  • 底层采用二分查找方式来查询

字典(DICT)

Redis是一个键值型(Key-Value Pair)的数据库,我们可以根据键实现快速的增删改查,而键与值的映射关系正是通过Dict实现的。

Dict由三部分组成,分别是: 哈希表(DictHashTable),哈希节点(DictEntry).字典(Dict)

  1. //哈希节点
  2. typedef struct dictEntry {
  3. //键
  4. void *key;
  5. //值
  6. union {
  7. void *val;
  8. uint64_t u64;
  9. int64_t s64;
  10. double d;
  11. } v;
  12. //下一个entry的指针
  13. struct dictEntry *next;
  14. } dictEntry;
  1. /* This is our hash table structure. Every dictionary has two of this as we
  2. * implement incremental rehashing, for the old to the new table. */
  3. //哈希表
  4. typedef struct dictht {
  5. //entry数组,数组中保存的是指向entry的指针
  6. dictEntry **table;
  7. //哈希表的大小
  8. unsigned long size;
  9. //哈希表大小的掩码,总是等于size-1
  10. unsigned long sizemask;
  11. //entry的个数
  12. unsigned long used;
  13. } dictht;

当出现hash碰撞的时候,会采用链表形式将碰撞的元素连接起来,然后链表的新元素采用头插法

  1. //字典
  2. typedef struct dict {
  3. //dict类型,内置不同的hash函数
  4. dictType *type;
  5. //私有数据,在做特殊运算时使用
  6. void *privdata;
  7. //一个Dict包含两个哈希表,其中一个是当前数据,另一个一般为空,rehash时使用
  8. dictht ht[2];
  9. //rehash的进度,-1表示未开始
  10. long rehashidx; /* rehashing not in progress if rehashidx == -1 */
  11. //rehash是否暂停,1则暂停,0则继续
  12. int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
  13. } dict;

扩容

  1. /* Expand the hash table if needed */
  2. //如果需要的话就进行扩容
  3. static int _dictExpandIfNeeded(dict *d)
  4. {
  5. /* Incremental rehashing already in progress. Return. */
  6. //如果正在rehash,则返回ok
  7. if (dictIsRehashing(d)) return DICT_OK;
  8. /* If the hash table is empty expand it to the initial size. */
  9. //如果哈希表为空,则初始哈希表为默认大小4
  10. if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
  11. /* If we reached the 1:1 ratio, and we are allowed to resize the hash
  12. * table (global setting) or we should avoid it but the ratio between
  13. * elements/buckets is over the "safe" threshold, we resize doubling
  14. * the number of buckets. */
  15. //d->ht[0].used >= d->ht[0].size: 说明哈希节点数量已经大于数组长度了,这个条件要满足
  16. //下面两个条件满足其中一个:
  17. //1.dict_can_resize: 当服务器执行BGSAVE或者BGREWRITERAO时,该值为假
  18. //2.d->ht[0].used/d->ht[0].size计算出来的就是负载因子
  19. //当负载因子大于5时,不管是否正在执行BGSAVE或者BGREWRITERAO,都会进行扩容
  20. //如果dict type 有expandAllowed函数,则会调用判断是否能够进行扩容
  21. if (d->ht[0].used >= d->ht[0].size &&
  22. (dict_can_resize ||
  23. d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&
  24. dictTypeExpandAllowed(d))
  25. {
  26. //扩容带下为used+1,底层会对扩容大小进行判断,实际上找的是第一个大于等于used+1的2^n
  27. return dictExpand(d, d->ht[0].used + 1);
  28. }
  29. return DICT_OK;
  30. }
收缩

Dict除了扩容以外,每次删除元素时,也会对负载因子做检查,当LoadFactory<0.1时,会做哈希表收缩:

  • 删除元素源码
  1. /* Delete an element from a hash.
  2. * Return 1 on deleted and 0 on not found. */
  3. //从hash中删除一个元素,删除成功返回1,没找到返回0
  4. int hashTypeDelete(robj *o, sds field) {
  5. int deleted = 0;
  6. //底层采用压缩链表实现,这个暂时不管
  7. if (o->encoding == OBJ_ENCODING_ZIPLIST) {
  8. unsigned char *zl, *fptr;
  9. zl = o->ptr;
  10. fptr = ziplistIndex(zl, ZIPLIST_HEAD);
  11. if (fptr != NULL) {
  12. fptr = ziplistFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
  13. if (fptr != NULL) {
  14. zl = ziplistDelete(zl,&fptr); /* Delete the key. */
  15. zl = ziplistDelete(zl,&fptr); /* Delete the value. */
  16. o->ptr = zl;
  17. deleted = 1;
  18. }
  19. }
  20. }
  21. //底层采用hash实现
  22. else if (o->encoding == OBJ_ENCODING_HT) {
  23. //删除成功返回C_OK
  24. if (dictDelete((dict*)o->ptr, field) == C_OK) {
  25. deleted = 1;
  26. /* Always check if the dictionary needs a resize after a delete. */
  27. //删除成功后,检查是否需要重置DICT大小,如果需要则调用dictResize重置
  28. if (htNeedsResize(o->ptr)) dictResize(o->ptr);
  29. }
  30. } else {
  31. serverPanic("Unknown hash encoding");
  32. }
  33. return deleted;
  34. }
  • htNeedsResize–判断是否需要重置Dict大小
  1. htNeedsResize(dict *dict) {
  2. long long size, used;
  3. //哈希表大小--槽的数量就是数组长度
  4. size = dictSlots(dict);
  5. //entry数量
  6. used = dictSize(dict);
  7. //当哈希表大小大于4并且负载因子低于0.1,表示需要进行收缩
  8. return (size > DICT_HT_INITIAL_SIZE &&
  9. (used*100/size < HASHTABLE_MIN_FILL));
  10. }
  • dictSize–真正进行收缩的源码
  1. /* Resize the table to the minimal size that contains all the elements,
  2. * but with the invariant of a USED/BUCKETS ratio near to <= 1 */
  3. int dictResize(dict *d)
  4. {
  5. unsigned long minimal;
  6. //如果正在做bgsave或bgrewriteof或rehash,则返回错误
  7. if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
  8. //获取entry个数
  9. minimal = d->ht[0].used;
  10. //如果entry小于4,则重置为4
  11. if (minimal < DICT_HT_INITIAL_SIZE)
  12. minimal = DICT_HT_INITIAL_SIZE;
  13. //重置大小为minimal,其实是第一个大于等于minimal的2^n
  14. return dictExpand(d, minimal);
  15. }
rehash源码分析
  • _dictExpand函数是真正完成扩容的方法,下面来看看这个方法干了啥
  1. /* Expand or create the hash table,
  2. * when malloc_failed is non-NULL, it'll avoid panic if malloc fails (in which case it'll be set to 1).
  3. * Returns DICT_OK if expand was performed, and DICT_ERR if skipped. */
  4. int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
  5. {
  6. if (malloc_failed) *malloc_failed = 0;
  7. /* the size is invalid if it is smaller than the number of
  8. * elements already inside the hash table */
  9. //如果当前entry数量超过了要申请的size大小,或者正在rehash,直接报错
  10. if (dictIsRehashing(d) || d->ht[0].used > size)
  11. return DICT_ERR;
  12. //声明新的hash table
  13. dictht n; /* the new hash table */
  14. //扩容后的数组实际大小,第一个大于等于size的2^n次方
  15. unsigned long realsize = _dictNextPower(size);
  16. /* Rehashing to the same table size is not useful. */
  17. //计算得到的新数组大小与旧数组大小一致,返回错误信息
  18. if (realsize == d->ht[0].size) return DICT_ERR;
  19. /* Allocate the new hash table and initialize all pointers to NULL */
  20. //设置新的hash table的大小和掩码
  21. n.size = realsize;
  22. n.sizemask = realsize-1;
  23. if (malloc_failed) {
  24. n.table = ztrycalloc(realsize*sizeof(dictEntry*));
  25. *malloc_failed = n.table == NULL;
  26. if (*malloc_failed)
  27. return DICT_ERR;
  28. } else//为新的hash table分配内存: size*entrySize
  29. n.table = zcalloc(realsize*sizeof(dictEntry*));
  30. //新的hash table的used为0
  31. n.used = 0;
  32. /* Is this the first initialization? If so it's not really a rehashing
  33. * we just set the first hash table so that it can accept keys. */
  34. //如果是第一次来,即进行哈希表的初始化,那么直接将
  35. //上面新创建的n赋值给ht[0]即可
  36. if (d->ht[0].table == NULL) {
  37. d->ht[0] = n;
  38. return DICT_OK;
  39. }
  40. /* Prepare a second hash table for incremental rehashing */
  41. //否则,需要rehash,此处需要把rehashidx设置为0
  42. //表示当前rehash的进度
  43. //在每次增删改查时都会触发rehash(渐进式hash下面会讲)
  44. d->ht[1] = n;
  45. d->rehashidx = 0;
  46. return DICT_OK;
  47. }

rehash流程分析
  • 插入新元素,导致rehash产生

  • ht[1]扩容到合适的大小,设置rehash进度

  • ht[0]数组中元素转移到ht[1]

  • 交换ht[0]和ht[1]指针指向,然后rehash标记设置为-1表示rehash结束

渐进式rehash

上面列出的rehash看上去很好,但是redis没有这样做,因为如果需要迁移元素很多,由于redis单线程的特性,会导致主线程被阻塞,因此redis采用的是渐进式hash,即慢慢的,慢慢的迁移元素。

小结

ZipList(压缩列表)

ZipList是一种特殊的"双端链表",由一系列特殊编码的连续内存块组成。可以在任意一端进行压入/弹出操作,并且该操作的时间复杂度为0(1)。

压缩列表可以包含任意多个节点,每个节点可以保存一个字节数组或者一个整数值。

压缩列表是列表键和哈希键的底层实现之一。当一个列表键只包含少量列表项,并且每个列表项要么就是小整数值,要么就是长度比较短的字符串,那么Redis底层就会使用ziplist存储存储结构。

当一个哈希键只包含少量列表项,并且每个列表项要么就是小整数值,要么就是长度比较短的字符串,那么Redis底层也会使用ziplist存储存储结构。

zipList构成

zipListEntry构成

ZipList中所有存储长度的数值采用小端字节序,即低位字节在前,高位字节在后。

例如: 数值0x1234,采用小端字节序后实际存储值为: 0x3412

encoding编码

例如: 我们要保存字符串"ab"和"bc"

存储长度的数值采用小端字节序表示

最后一种特殊情况: 1111xxxx,可以在后四位xxxx,表示0001-1101范围的大小,即1-13,但是需要减去一,实际可保存0-12的范围.

如果数值大小在1-12区间内,那么采用最后一种特殊编码方式,不需要content属性

例如: 一个ZipList中包含两个整数值: “2"和"5”

连锁更新问题

  • 此时,如果我们将一个长度大于254字节的新节点设置插入进来,称为压缩列表头节点,那么旧头节点的pre_entry_len需要扩展到5字节表示新节点的大小.
  • 旧节点加上4字节后变成了254,那么后面的节点需要再次扩展…直到某个节点pre_entry_len扩展到5字节后长度并没有超过254为止

ZipList这种特殊情况下产生的多次空间扩展操作称之为连续更新。

新增,删除都可能导致连锁更新的发生。

连锁更新虽然复杂度高,会大大降低性能,但是由于产生概率较低,并且及时出现了,只要被更新节点数量不多,性能上不会有太大影响。

小结
  • 压缩列表可以看做一种连续内存空间的双向链表
  • 列表的节点之间不是通过指针连接,而是记录上一节点和本节点的长度来寻址,内存占用低
  • 如果列表数据过多,导致链表过长,可能影响查询性能
  • 增或删较大数据时有可能发生连续更新问题

QuickList(快速链表)

why need ?

限制zipList大小

压缩节点

结构
  1. typedef struct quicklist {
  2. //头节点指针
  3. quicklistNode *head;
  4. //尾结点指针
  5. quicklistNode *tail;
  6. //所有zipList的entry的数量
  7. unsigned long count; /* total count of all entries in all ziplists */
  8. //zipLists总数量
  9. unsigned long len; /* number of quicklistNodes */
  10. //zipList的entry上限,默认值 -2 --8kb
  11. int fill : QL_FILL_BITS; /* fill factor for individual nodes */
  12. //首尾不压缩的节点数量
  13. unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */
  14. //内存重分配时的书签数量及数组,一般用不到
  15. unsigned int bookmark_count: QL_BM_BITS;
  16. quicklistBookmark bookmarks[];
  17. } quicklist;
  1. typedef struct quicklistNode {
  2. //前一个节点指针
  3. struct quicklistNode *prev;
  4. //下一个节点指针
  5. struct quicklistNode *next;
  6. //当前节点的ZipLisr指针
  7. unsigned char *zl;
  8. //当前节点的ZipList的字节大小
  9. unsigned int sz; /* ziplist size in bytes */
  10. //当前节点的ZipList的entry个数
  11. unsigned int count : 16; /* count of items in ziplist */
  12. //编码方式: 1.ziplist 2.lzf压缩模式
  13. unsigned int encoding : 2; /* RAW==1 or LZF==2 */
  14. //数据容器类型(预留): 1. 其他 2. zipList
  15. unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
  16. //是否被解压缩. 1.则说明被解压了,将来要重新压缩
  17. unsigned int recompress : 1; /* was this node previous compressed? */
  18. //测试用
  19. unsigned int attempted_compress : 1; /* node can't compress; too small */
  20. //预留字段
  21. unsigned int extra : 10; /* more bits to steal for future usage */
  22. } quicklistNode;

  • fill为-2表示每个每个ziplist最大内存不超过8kb
  • compress为1表示首尾不压缩,中间节点压缩
特点
  • 是一个节点为ZipList的双端链表
  • 节点采用ZipList,解决了传统链表的内存占用问题
  • 控制了ZipList大小,解决了连续内存空间申请效率问题
  • 中间节点可以压缩,进一步节省内存

SkipList(跳跃表)

SkipList首先是链表,但与传统链表相比有几点差异:

  • 元素按照升序排列存储
  • 节点可能包含多个指针,指针跨度不同

Redis使用跳跃表作为有序集合键,如果一个有序集合包含的元素数量很多,或者有序集合中元素成员是比较长的字符串,Redis就会使用跳跃表作为有序集合键的底层实现。
例如: sortedSet

Redis目前只在两处地方使用到了SkipList,分别是 :

  • 实现有序集合键
  • 在集群节点中用作内部数据结构
结构
  1. //t_zset.c
  2. typedef struct zskiplist {
  3. //头尾节点指针
  4. struct zskiplistNode *header, *tail;
  5. //节点数量
  6. unsigned long length;
  7. //最大的索引层级,默认为1
  8. int level;
  9. } zskiplist;
  1. //t_zset.c
  2. typedef struct zskiplistNode {
  3. //节点存储的值--是sds类型
  4. sds ele;
  5. //节点分数--排序,查找用
  6. double score;
  7. //前一个节点指针--回退指针
  8. struct zskiplistNode *backward;
  9. struct zskiplistLevel {
  10. //下一个节点指针
  11. struct zskiplistNode *forward;
  12. //索引跨度
  13. unsigned long span;
  14. //多级索引数组
  15. } level[];
  16. } zskiplistNode;

特点
  • 跳跃表是一个双向链表,每个节点都包含score和ele值
  • 节点按照score排序,score值一样则按照ele字典排序
  • 每个节点都可以包含多层指针,层数是1到32之间的随机数
  • 不同层指针到下一个节点的跨度不同,层级越高,跨度越大
  • 增删改成效率与红黑树基本一致,实现却更为简单

RedisObject

Redis中的任意数据类型的键和值都会被封装为一个RedisObject,也叫做Redis对象,源码如下:

  • Redis通过引用计数实现了相关内存回收机制,并且还利用该引用计数实现了对象共享机制。
  • 通过记录对象最后一次访问时间,可以在服务器启用了maxmemory功能的情况下,将那么较长时间无人访问的键优先淘汰

对象类型与编码

Redis使用对象来表示数据库中的键和值,每次当我们在Redis的数据库中新创建一个键值对时,我们至少会创建两个对象,一个用于做键值对的键,另一个对象做键值对的值。

Reids中会根据存储的数据类型不同,选择不同的编码方式,功包含11种不同的类型:

每种数据类型使用的编码方式如下:

我们可以使用TYPE命令来查看redis中某个键对应的值对象的类型,而不是键对象的类型。

String对象

String是Redis中最常见的数据存储类型:

  • 其基本编码方式是RAW,基于简单动态字符串SDS实现,存储上限为512mb.

  • 如果存储的SDS长度小于44字节,则会采用EMBSTR编码,此时Object head与SDS是一段连续空间。申请内存时只需要调用一次内存分配函数,效率更高。
    内存释放也只需要一次调用

  • 如果存储的字符串是整数值,并且大小在LONG—MAX范围内,则会采用INT编码:直接将数据保存在RedisObject的ptr指针位置(刚好8字节),不再需要SDS了

编码的转换
  • 如果对保存整数值的字符串对象追加了一个字符串值,那么该字符串对象底层会从int编码转换为raw编码
  • 如果对embstr编码的字符串进行修改,那么底层编码也会从embstr转换为raw

List对象

列表对象的编码可以是以下三种:

插入源码分析
  1. /* Implements LPUSH/RPUSH/LPUSHX/RPUSHX.
  2. * 'xx': push if key exists. */
  3. //通用的列表插入命令处理
  4. void pushGenericCommand(
  5. //封装客户端发送来的命令
  6. client *c,
  7. //插入列表头部还是列表尾部
  8. int where,
  9. //是否在Key存在的时候才进行插入操作,默认为false
  10. //即redis会帮我们自动创建不存在的key
  11. int xx) {
  12. int j;
  13. //redis命令为 lpush key dhy 123 456
  14. //这里argv[1]拿到的是key
  15. //redis默认有1-15个db数据库,c->db是去指定的数据库寻找这个key
  16. //拿到这个key对应的redisObject对象
  17. robj *lobj = lookupKeyWrite(c->db, c->argv[1]);
  18. //该redisObject对象类型必须是OBJ_LIST才可以,如果不是直接返回
  19. if (checkType(c,lobj,OBJ_LIST)) return;
  20. //如果拿到的key为null
  21. if (!lobj) {
  22. //如果xx为true,说明当可以不存在的时候就不进行处理
  23. if (xx) {
  24. addReply(c, shared.czero);
  25. return;
  26. }
  27. //xx默认为false--redis会帮助我们创建一个quicklist对象
  28. lobj = createQuicklistObject();
  29. //设置quicklist对象中ziplist的属性
  30. //限制quicklist中每一个ziplist最大的大小,默认为-2,即8kb
  31. //是否压缩ziplist,默认为0,不开启压缩
  32. quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
  33. server.list_compress_depth);
  34. //执行向数据库db插入key的过程
  35. dbAdd(c->db,c->argv[1],lobj);
  36. }
  37. //lpush key dhy 123 456
  38. //从2开始是value集合
  39. //把value集合中的元素插入搭配list中
  40. for (j = 2; j < c->argc; j++) {
  41. listTypePush(lobj,c->argv[j],where);
  42. server.dirty++;
  43. }
  44. addReplyLongLong(c, listTypeLength(lobj));
  45. //发布事件
  46. char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
  47. signalModifiedKey(c,c->db,c->argv[1]);
  48. notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
  49. }
  • 创建quicklist
  1. robj *createQuicklistObject(void) {
  2. //创建一个quickList
  3. quicklist *l = quicklistCreate();
  4. //根据上面的quicklist创建一个redisObject
  5. robj *o = createObject(OBJ_LIST,l);
  6. //设置redisObj对象编码为quickList
  7. o->encoding = OBJ_ENCODING_QUICKLIST;
  8. return o;
  9. }

Set对象

Set是Redis中的单列集合:

  • 不保住有序性
  • 包装元素唯一(可以判断元素是否存在)
  • 求交集,并集,差集
    那什么样的数据类型适合实现set数据结构呢?

HashTable,也就是Redis中的DICT,不过DICT

  • 当set第一次被创建时
  1. robj *setTypeCreate(sds value) {
  2. //判断value是否是数值类型 long long
  3. if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
  4. //如果是整数类型,则采用Intset编码
  5. return createIntsetObject();
  6. //否在采用默认编码,也就是HT
  7. return createSetObject();
  8. }
  • 采用IntSet类型编码
  1. robj *createIntsetObject(void) {
  2. //初始化INTSET并申请内存空间
  3. intset *is = intsetNew();
  4. //创建RedisObject
  5. robj *o = createObject(OBJ_SET,is);
  6. //指定编码为INTSET
  7. o->encoding = OBJ_ENCODING_INTSET;
  8. return o;
  9. }
  • 采用HT编码
  1. robj *createSetObject(void) {
  2. //初始化DICT类型
  3. dict *d = dictCreate(&setDictType,NULL);
  4. //创建redisObj
  5. robj *o = createObject(OBJ_SET,d);
  6. //设置编码为HT
  7. o->encoding = OBJ_ENCODING_HT;
  8. return o;
  9. }
  • 当往set里面添加元素时
  1. /* Add the specified value into a set.
  2. *
  3. * If the value was already member of the set, nothing is done and 0 is
  4. * returned, otherwise the new element is added and 1 is returned. */
  5. int setTypeAdd(robj *subject, sds value) {
  6. long long llval;
  7. //如果已经是HT编码,则直接插入元素
  8. if (subject->encoding == OBJ_ENCODING_HT) {
  9. dict *ht = subject->ptr;
  10. dictEntry *de = dictAddRaw(ht,value,NULL);
  11. if (de) {
  12. dictSetKey(ht,de,sdsdup(value));
  13. dictSetVal(ht,de,NULL);
  14. return 1;
  15. }
  16. }
  17. //编码为INTSET
  18. else if (subject->encoding == OBJ_ENCODING_INTSET) {
  19. //判断编码是否为整数
  20. if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
  21. //是整数直接添加到intset里面
  22. uint8_t success = 0;
  23. subject->ptr = intsetAdd(subject->ptr,llval,&success);
  24. if (success) {
  25. //当intset元素数量超过set_max_intset_entries,则转为HT
  26. /* Convert to regular set when the intset contains
  27. * too many entries. */
  28. if (intsetLen(subject->ptr) > server.set_max_intset_entries)
  29. setTypeConvert(subject,OBJ_ENCODING_HT);
  30. return 1;
  31. }
  32. } else {
  33. //不是整数,转换为HT
  34. /* Failed to get integer from object, convert to regular set. */
  35. setTypeConvert(subject,OBJ_ENCODING_HT);
  36. /* The set *was* an intset and this value is not integer
  37. * encodable, so dictAdd should always work. */
  38. serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
  39. return 1;
  40. }
  41. } else {
  42. serverPanic("Unknown set encoding");
  43. }
  44. return 0;
  45. }

set_max_intset_entries默认为512,通过下面的命令可以进行查询

  1. config get set_max_intset_entries

编码转换后:

ZSet对象

因此zset底层将这两个数据结构结合在了一起,具体结构如下:

  1. //zset结构
  2. typedef struct zset {
  3. //Dict指针
  4. dict *dict;
  5. //SkipList指针
  6. zskiplist *zsl;
  7. } zset;

创建ZSet对象的方法源码如下:

  1. robj *createZsetObject(void) {
  2. zset *zs = zmalloc(sizeof(*zs));
  3. robj *o;
  4. //创建Dict
  5. zs->dict = dictCreate(&zsetDictType,NULL);
  6. //创建SkipList
  7. zs->zsl = zslCreate();
  8. //创建zset
  9. o = createObject(OBJ_ZSET,zs);
  10. //更改编码
  11. o->encoding = OBJ_ENCODING_SKIPLIST;
  12. return o;
  13. }

具体结构如图:

可以当前ZSet最大的问题在于内存的占用过大,因此为了解决这个问题,ZSet提供了两种编码方式,上面给出的是其中一种,适合在是数据量大的情况下使用,发挥出其快速查找的优势

当数据量比较小的时候,ZSet采用ziplist作为底层结构

add源码
  1. void zaddGenericCommand(client *c, int flags) {
  2. ....
  3. /* Lookup the key and create the sorted set if does not exist. */
  4. //zadd添加元素时,先根据key找到zset,不存在则创建新的zset
  5. zobj = lookupKeyWrite(c->db,key);
  6. if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
  7. //判断键是否存在
  8. if (zobj == NULL) {//不存在
  9. if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
  10. if (server.zset_max_ziplist_entries == 0 ||
  11. server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
  12. {
  13. //如果zset_max_ziplist_entries设置为了0就是禁用了ziplist编码
  14. //或者value大小超过了zset_max_ziplist_value ,采用HT+Skiplist
  15. zobj = createZsetObject();
  16. } else {
  17. //否则采用ziplist
  18. zobj = createZsetZiplistObject();
  19. }
  20. dbAdd(c->db,key,zobj);
  21. }
  22. ...
  23. int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
  24. ...
  25. }
  • createZsetObject
  1. robj *createZsetObject(void) {
  2. //申请内存
  3. zset *zs = zmalloc(sizeof(*zs));
  4. robj *o;
  5. //创建Dict
  6. zs->dict = dictCreate(&zsetDictType,NULL);
  7. //创建SkipList
  8. zs->zsl = zslCreate();
  9. o = createObject(OBJ_ZSET,zs);
  10. o->encoding = OBJ_ENCODING_SKIPLIST;
  11. return o;
  12. }
  • createZsetZiplistObject
  1. robj *createZsetZiplistObject(void) {
  2. //创建ziplist
  3. unsigned char *zl = ziplistNew();
  4. robj *o = createObject(OBJ_ZSET,zl);
  5. o->encoding = OBJ_ENCODING_ZIPLIST;
  6. return o;
  7. }
  • zsetAdd
  1. int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
  2. ...
  3. /* Update the sorted set according to its encoding. */
  4. //判断编码方式
  5. if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {//是ziplist编码
  6. unsigned char *eptr;
  7. //判断当前元素是否已经存在,已经存在了则更新score即可
  8. if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
  9. ....
  10. return 1;
  11. } else if (!xx) {
  12. /* Optimize: check if the element is too large or the list
  13. * becomes too long *before* executing zzlInsert. */
  14. zobj->ptr = zzlInsert(zobj->ptr,ele,score);
  15. //元素不存在,需要新增,则判断ziplist长度有没有超过限制大小
  16. //并且元素的大小有无超过限制
  17. if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
  18. sdslen(ele) > server.zset_max_ziplist_value)
  19. //如果超过转换为HT+skipList编码
  20. zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
  21. if (newscore) *newscore = score;
  22. *out_flags |= ZADD_OUT_ADDED;
  23. return 1;
  24. } else {
  25. *out_flags |= ZADD_OUT_NOP;
  26. return 1;
  27. }
  28. //本身就是SkipList+HT编码,无需转换
  29. } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
  30. ....
  31. } else {
  32. serverPanic("Unknown sorted set encoding");
  33. }
  34. return 0; /* Never reached. */
  35. }
ziplist如何做到有序存储

小结

ZSet为了兼顾内存占用和性能,使用了两种编码方式,当数据量小的时候,采用ziplist实现,此时内存占用很小,但是由于数据量也很小,因此性能影响不大。

当数据量增大时,为了性能考虑,采用了HT+SkipList编码实现,此时内存占用很大,但是性能很高。

Hash对象

当超过限制后,底层编码会变成HT

源码
  1. void hsetCommand(client *c) { //客户端相关命令和信息都被封装在了client中
  2. int i, created = 0;
  3. robj *o;
  4. //假设客户端命令为 hset user1 name Jack age 21
  5. if ((c->argc % 2) == 1) {
  6. addReplyErrorFormat(c,"wrong number of arguments for '%s' command",c->cmd->name);
  7. return;
  8. }
  9. //判断hash的key是否存在,不存在就创建一个新的,默认采用ziplist编码
  10. if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
  11. //判断是否需要把ziplist转换Dict
  12. hashTypeTryConversion(o,c->argv,2,c->argc-1);
  13. for (i = 2; i < c->argc; i += 2)
  14. //执行插入操作
  15. created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);
  16. ...
  17. }
  • hashTypeLookupWriteOrCreate—判断hash的key是否存在,不存在就创建一个新的,默认采用ziplist编码
  1. robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
  2. //查看key
  3. robj *o = lookupKeyWrite(c->db,key);
  4. if (checkType(c,o,OBJ_HASH)) return NULL;
  5. //不存在,则创建新的
  6. if (o == NULL) {
  7. o = createHashObject();
  8. dbAdd(c->db,key,o);
  9. }
  10. return o;
  11. }
  • createHashObject—创建一个默认hash对象
  1. robj *createHashObject(void) {
  2. //默认采用ziplist编码,申请ziplist内存空间
  3. unsigned char *zl = ziplistNew();
  4. robj *o = createObject(OBJ_HASH, zl);
  5. o->encoding = OBJ_ENCODING_ZIPLIST;
  6. return o;
  7. }
  • hashTypeTryConversion—处理hash的编码转换
  1. void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
  2. int i;
  3. //如果编码已经是HT了,那么直接返回,不需要进行编码转换
  4. if (o->encoding != OBJ_ENCODING_ZIPLIST) return;
  5. //依次遍历命令中的field,value参数
  6. for (i = start; i <= end; i++) {
  7. //如果filed或者value超过hash_max_ziplist_value,则转换为HT
  8. if (sdsEncodedObject(argv[i]) &&
  9. sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
  10. {
  11. hashTypeConvert(o, OBJ_ENCODING_HT);
  12. break;
  13. }
  14. }
  15. }
  • hashTypeSet–添加元素到hash
  1. int hashTypeSet(robj *o, sds field, sds value, int flags) {
  2. int update = 0;
  3. //判断是否为ziplist编码
  4. if (o->encoding == OBJ_ENCODING_ZIPLIST) {
  5. unsigned char *zl, *fptr, *vptr;
  6. zl = o->ptr;
  7. //查询head指针
  8. fptr = ziplistIndex(zl, ZIPLIST_HEAD);
  9. //head不为空,说明ziplist不为空,开始查找key
  10. if (fptr != NULL) {
  11. fptr = ziplistFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
  12. //判断是否存在,如果已经存在则更新
  13. if (fptr != NULL) {
  14. /* Grab pointer to the value (fptr points to the field) */
  15. vptr = ziplistNext(zl, fptr);
  16. serverAssert(vptr != NULL);
  17. update = 1;
  18. /* Replace value */
  19. zl = ziplistReplace(zl, vptr, (unsigned char*)value,
  20. sdslen(value));
  21. }
  22. }
  23. //不存在,则直接push
  24. if (!update) {
  25. //依次Push新的field和value到ziplist尾部
  26. /* Push new field/value pair onto the tail of the ziplist */
  27. zl = ziplistPush(zl, (unsigned char*)field, sdslen(field),
  28. ZIPLIST_TAIL);
  29. zl = ziplistPush(zl, (unsigned char*)value, sdslen(value),
  30. ZIPLIST_TAIL);
  31. }
  32. o->ptr = zl;
  33. /* Check if the ziplist needs to be converted to a hash table */
  34. //插入了新元素,检查list长度是否超出,超出转化为HT
  35. if (hashTypeLength(o) > server.hash_max_ziplist_entries)
  36. hashTypeConvert(o, OBJ_ENCODING_HT);
  37. } else if (o->encoding == OBJ_ENCODING_HT) {
  38. //HT编码,直接插入或覆盖
  39. ...
  40. } else {
  41. serverPanic("Unknown hash encoding");
  42. }
  43. /* Free SDS strings we did not referenced elsewhere if the flags
  44. * want this function to be responsible. */
  45. if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
  46. if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
  47. return update;
  48. }

相关文章