redis-ziplist源码阅读

ziplist结构

  1. zlbytes:记录整个压缩列表占用的内存字节数
  2. zltail:记录压缩列表表尾节点距离压缩列表的起始地址有多少字节
  3. zllen:记录了压缩列表包含的节点数量
  4. entryX:压缩列表包含的各个节点,节点的长度由节点保存的内容决定
  5. zlend:特殊值 0xFF (十进制 255 ),用于标记压缩列表的末端

节点结构

/*
 * 保存 ziplist 节点信息的结构
 */
typedef struct zlentry {

    // prevrawlen :前置节点的长度
    // prevrawlensize :编码 prevrawlen 所需的字节大小;只能是1字节或者5字节
    unsigned int prevrawlensize, prevrawlen;

    // len :当前节点值的长度,指出实体字段的具体长度即真正存储数据的字段,单位是字节
    // lensize :编码 len 所需的字节大小, 1字节, 2字节 5字节
    unsigned int lensize, len;

    // 当前节点 header 的大小
    // 等于 prevrawlensize + lensize
    unsigned int headersize;

    // 当前节点值所使用的编码类型
    unsigned char encoding;

    // 指向当前节点的指针
    unsigned char *p;

} zlentry;

Alt text

  1. prevrawlensize:编码prevrawlen所需的字节大小
  2. prevrawlen:前置节点的长度;当长度小于255字节时,用一个字节存储;当长度大于等于255时,用五个字节进行存储,其中第一个字节会被设置为255表示前一个entry的长度由后面四个字节表示
  3. len:当前节点值的长度
  4. lensize:编码 len 所需的字节大小, 1字节, 2字节 5字节
  5. headersize:等于 prevrawlensize + lensize
  6. encoding:当前节点值所使用的编码类;它会根据当前元素内容的不同会采用不同的编码方式
    如果元素内容为字符串,encoding的值分别为:
    00xx xxxx :00开头表示该字符串的长度用6个bit表示。
    01xx xxxx | xxxx xxxx :01开头表示字符串的长度由14bit表示,这14个bit采用大端存储。
    1000 0000 | xxxx xxxx | xxxx xxxx | xxxx xxxx | xxxx xxxx :10开头表示后续的四个字节为字符串长度,这32个bit采用大端存储

    如果元素内容为数字,encoding的值分别为:
    1100 0000:表示数字占用后面2个字节。
    1101 0000:表示数字占用后面4个字节。
    1110 0000:表示数字占用后面8个字节。
    1111 0000 :表示数字占用后面3个字节。
    1111 1110 :表示数字占用后面1个字节。
    1111 1111 :表示压缩链表中最后一个元素(特殊编码)。
    1111 xxxx :表示只用后4位表示0~12的整数,由于0000,1110跟1111三种已经被占用,也就是说这里的xxxx四位只能表示0001~1101,转换成十进制就是数字1~13,但是redis规定它用来表示0~12,因此当遇到这个编码时,我们需要取出后四位然后减1来得到正确的值。

  7. *p:指向当前节点的指针

使用场景

  1. 一个列表只含有少量列表项
  2. 每个列表项是小整数值或者长度比较短的字符串

编码,解码

编码前置节点长度信息

/* Encode the length of the previous entry and write it to "p". Return the
 * number of bytes needed to encode this length if "p" is NULL. 
 *
 * 对前置节点的长度 len 进行编码,并将它写入到 p 中,
 * 然后返回编码 len 所需的字节数量。
 *
 * 如果 p 为 NULL ,那么不进行写入,仅返回编码 len 所需的字节数量。
 *
 * T = O(1)
 * 前置节点的长度编码字节数只有1字节和5字节
 * 如果前一节点的长度小于 254 字节, 那么 previous_entry_length 属性的长度为 1 字节: 前一节点的长度就保存在这一个字节里面。
 * 如果前一节点的长度大于等于 254 字节, 那么 previous_entry_length 属性的长度为 5 字节: 其中属性的第一字节会被设置为 0xFE (十进制值 254), 而之后的四个字节则用于保存前一节点的长度。
 */
static unsigned int zipPrevEncodeLength(unsigned char *p, unsigned int len) {

    // 仅返回编码 len 所需的字节数量
    if (p == NULL) {
        return (len < ZIP_BIGLEN) ? 1 : sizeof(len) + 1;
    } else {
        // 写入并返回编码 len 所需的字节数量
        // 1 字节
        if (len < ZIP_BIGLEN) {
            p[0] = len;
            return 1;

        // 5 字节
        } else {
            // 添加 5 字节长度标识
            p[0] = ZIP_BIGLEN;
            // 写入编码
            memcpy(p + 1, &len, sizeof(len));
            // 如果有必要的话,进行大小端转换
            memrev32ifbe(p + 1);
            // 返回编码长度
            return 1 + sizeof(len);
        }
    }
}

解码前置节点长度信息

/* Decode the number of bytes required to store the length of the previous
 * element, from the perspective of the entry pointed to by 'ptr'. 
 *
 * 解码 ptr 指针,
 * 取出编码前置节点长度所需的字节数,并将它保存到 prevlensize 变量中。
 *
 * T = O(1)
 */
#define ZIP_DECODE_PREVLENSIZE(ptr, prevlensize) do {                          \
    if ((ptr)[0] < ZIP_BIGLEN) {                                               \
        (prevlensize) = 1;                                                     \
    } else {                                                                   \
        (prevlensize) = 5;                                                     \
    }                                                                          \
} while(0);

/* Decode the length of the previous element, from the perspective of the entry
 * pointed to by 'ptr'. 
 *
 * 解码 ptr 指针,
 * 取出编码前置节点长度所需的字节数,
 * 并将这个字节数保存到 prevlensize 中。
 *
 * 然后根据 prevlensize ,从 ptr 中取出前置节点的长度值,
 * 并将这个长度值保存到 prevlen 变量中。
 *
 * T = O(1)
 */
#define ZIP_DECODE_PREVLEN(ptr, prevlensize, prevlen) do {                     \
                                                                               \
    /* 先计算被编码长度值的字节数 */                                           \
    ZIP_DECODE_PREVLENSIZE(ptr, prevlensize);                                  \
                                                                               \
    /* 再根据编码字节数来取出长度值 */                                         \
    if ((prevlensize) == 1) {                                                  \
        (prevlen) = (ptr)[0];                                                  \
    } else if ((prevlensize) == 5) {                                           \
        assert(sizeof((prevlensize)) == 4);                                    \
        memcpy(&(prevlen), ((char*)(ptr)) + 1, 4);                             \
        memrev32ifbe(&prevlen);                                                \
    }                                                                          \
} while(0);

插入

/*
 * 根据指针 p 所指定的位置,将长度为 slen 的字符串 s 插入到 zl 中。
 *
 * 函数的返回值为完成插入操作之后的 ziplist
 *
 * T = O(N^2)
 */
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    // 记录当前 ziplist 的长度
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry entry, tail;

    /* Find out prevlen for the entry that is inserted. */
    if (p[0] != ZIP_END) {
        // 如果 p[0] 不指向列表末端,说明列表非空,并且 p 正指向列表的其中一个节点
        // 那么取出 p 所指向节点的信息,并将它保存到 entry 结构中
        // 然后用 prevlen 变量记录前置节点的长度
        // (当插入新节点之后 p 所指向的节点就成了新节点的前置节点)
        // T = O(1)
        entry = zipEntry(p);
        prevlen = entry.prevrawlen;
    } else {
        // 如果 p 指向表尾末端,那么程序需要检查列表是否为:
        // 1)如果 ptail 也指向 ZIP_END ,那么列表为空;
        // 2)如果列表不为空,那么 ptail 将指向列表的最后一个节点。
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            // 表尾节点为新节点的前置节点

            // 取出表尾节点的长度
            // T = O(1)
            prevlen = zipRawEntryLength(ptail);
        }
    }

    /* See if the entry can be encoded */
    // 尝试看能否将输入字符串转换为整数,如果成功的话:
    // 1)value 将保存转换后的整数值
    // 2)encoding 则保存适用于 value 的编码方式
    // 无论使用什么编码, reqlen 都保存节点值的长度
    // T = O(N)
    if (zipTryEncoding(s, slen, &value, &encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
    // 计算编码前置节点的长度所需的大小
    // T = O(1)
    reqlen += zipPrevEncodeLength(NULL, prevlen);
    // 计算编码当前节点值所需的大小
    // T = O(1)
    reqlen += zipEncodeLength(NULL, encoding, slen);

    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry's length in
     * its prevlen field. */
    // 只要新节点不是被添加到列表末端,
    // 那么程序就需要检查看 p 所指向的节点(的 header)能否编码新节点的长度。
    // nextdiff 保存了新旧编码之间的字节大小差,如果这个值大于 0
    // 那么说明需要对 p 所指向的节点(的 header )进行扩展
    // T = O(1)
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p, reqlen) : 0;

    /* Store offset because a realloc may change the address of zl. */
    // 因为重分配空间可能会改变 zl 的地址
    // 所以在分配之前,需要记录 zl 到 p 的偏移量,然后在分配之后依靠偏移量还原 p
    offset = p-zl;
    // curlen 是 ziplist 原来的长度
    // reqlen 是整个新节点的长度
    // nextdiff 是新节点的后继节点扩展 header 的长度(要么 0 字节,要么 4 个字节)
    // T = O(N)
    zl = ziplistResize(zl, curlen + reqlen + nextdiff);
    p = zl + offset;

    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        // 新元素之后还有节点,因为新元素的加入,需要对这些原有节点进行调整

        /* Subtract one because of the ZIP_END bytes */
        // 移动现有元素,为新元素的插入空间腾出位置
        // T = O(N)
        memmove(p + reqlen, p - nextdiff, curlen - offset - 1 + nextdiff);

        /* Encode this entry's raw length in the next entry. */
        // 将新节点的长度编码至后置节点
        // p+reqlen 定位到后置节点
        // reqlen 是新节点的长度
        // T = O(1)
        zipPrevEncodeLength(p + reqlen, reqlen);

        /* Update offset for tail */
        // 更新到达表尾的偏移量,将新节点的长度也算上
        ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)) + reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        // 如果新节点的后面有多于一个节点
        // 那么程序需要将 nextdiff 记录的字节数也计算到表尾偏移量中
        // 这样才能让表尾偏移量正确对齐表尾节点
        // T = O(1)
        tail = zipEntry(p + reqlen);
        if (p[reqlen + tail.headersize + tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)) + nextdiff);
        }
    } else {
        /* This element will be the new tail. */
        // 新元素是新的表尾节点
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p - zl);
    }

    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    // 当 nextdiff != 0 时,新节点的后继节点的(header 部分)长度已经被改变,
    // 所以需要级联地更新后续的节点
    if (nextdiff != 0) {
        offset = p - zl;
        // T  = O(N^2)
        zl = __ziplistCascadeUpdate(zl, p + reqlen);
        p = zl + offset;
    }

    /* Write the entry */
    // 一切搞定,将前置节点的长度写入新节点的 header
    p += zipPrevEncodeLength(p, prevlen);
    // 将节点值的长度写入新节点的 header
    p += zipEncodeLength(p, encoding, slen);
    // 写入节点值
    if (ZIP_IS_STR(encoding)) {
        // T = O(N)
        memcpy(p, s, slen);
    } else {
        // T = O(1)
        zipSaveInteger(p, value, encoding);
    }

    // 更新列表的节点数量计数器
    // T = O(1)
    ZIPLIST_INCR_LENGTH(zl, 1);

    return zl;
}

删除

/* Delete "num" entries, starting at "p". Returns pointer to the ziplist.
 *
 * 从位置 p 开始,连续删除 num 个节点。
 *
 * 函数的返回值为处理删除操作之后的 ziplist 。
 *
 * T = O(N^2)
 */
static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
    unsigned int i, totlen, deleted = 0;
    size_t offset;
    int nextdiff = 0;
    zlentry first, tail;

    // 计算被删除节点总共占用的内存字节数
    // 以及被删除节点的总个数
    // T = O(N)
    first = zipEntry(p);
    for (i = 0; p[0] != ZIP_END && i < num; i++) {
        p += zipRawEntryLength(p);
        deleted++;
    }

    // totlen 是所有被删除节点总共占用的内存字节数
    totlen = p-first.p;
    if (totlen > 0) {
        if (p[0] != ZIP_END) {

            // 执行这里,表示被删除节点之后仍然有节点存在

            /* Storing `prevrawlen` in this entry may increase or decrease the
             * number of bytes required compare to the current `prevrawlen`.
             * There always is room to store this, because it was previously
             * stored by an entry that is now being deleted. */
            // 因为位于被删除范围之后的第一个节点的 header 部分的大小
            // 可能容纳不了新的前置节点,所以需要计算新旧前置节点之间的字节数差
            // T = O(1)
            nextdiff = zipPrevLenByteDiff(p, first.prevrawlen);
            // 如果有需要的话,将指针 p 后退 nextdiff 字节,为新 header 空出空间
            p -= nextdiff;
            // 将 first 的前置节点的长度编码至 p 中
            // T = O(1)
            zipPrevEncodeLength(p, first.prevrawlen);

            /* Update offset for tail */
            // 更新到达表尾的偏移量
            // T = O(1)
            ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)) - totlen);

            /* When the tail contains more than one entry, we need to take
             * "nextdiff" in account as well. Otherwise, a change in the
             * size of prevlen doesn't have an effect on the *tail* offset. */
            // 如果被删除节点之后,有多于一个节点
            // 那么程序需要将 nextdiff 记录的字节数也计算到表尾偏移量中
            // 这样才能让表尾偏移量正确对齐表尾节点
            // T = O(1)
            tail = zipEntry(p);
            if (p[tail.headersize+tail.len] != ZIP_END) {
                ZIPLIST_TAIL_OFFSET(zl) =
                        intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)) + nextdiff);
            }

            /* Move tail to the front of the ziplist */
            // 从表尾向表头移动数据,覆盖被删除节点的数据
            // T = O(N)
            memmove(first.p, p,
                    intrev32ifbe(ZIPLIST_BYTES(zl)) - (p - zl) - 1);
        } else {

            // 执行这里,表示被删除节点之后已经没有其他节点了

            /* The entire tail was deleted. No need to move memory. */
            // T = O(1)
            ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe((first.p-zl) - first.prevrawlen);
        }

        /* Resize and update length */
        // 缩小并更新 ziplist 的长度
        offset = first.p - zl;
        zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl)) - totlen+nextdiff);
        ZIPLIST_INCR_LENGTH(zl, -deleted);
        p = zl + offset;

        /* When nextdiff != 0, the raw length of the next entry has changed, so
         * we need to cascade the update throughout the ziplist */
        // 如果 p 所指向的节点的大小已经变更,那么进行级联更新
        // 检查 p 之后的所有节点是否符合 ziplist 的编码要求
        // T = O(N^2)
        if (nextdiff != 0) {
            zl = __ziplistCascadeUpdate(zl, p);
        }
    }

    return zl;
}

连锁更新

/* When an entry is inserted, we need to set the prevlen field of the next
 * entry to equal the length of the inserted entry. It can occur that this
 * length cannot be encoded in 1 byte and the next entry needs to be grow
 * a bit larger to hold the 5-byte encoded prevlen. This can be done for free,
 * because this only happens when an entry is already being inserted (which
 * causes a realloc and memmove). However, encoding the prevlen may require
 * that this entry is grown as well. This effect may cascade throughout
 * the ziplist when there are consecutive entries with a size close to
 * ZIP_BIGLEN, so we need to check that the prevlen can be encoded in every
 * consecutive entry.
 *
 * 当将一个新节点添加到某个节点之前的时候,
 * 如果原节点的 header 空间不足以保存新节点的长度,
 * 那么就需要对原节点的 header 空间进行扩展(从 1 字节扩展到 5 字节)。
 *
 * 但是,当对原节点进行扩展之后,原节点的下一个节点的 prevlen 可能出现空间不足,
 * 这种情况在多个连续节点的长度都接近 ZIP_BIGLEN 时可能发生。
 *
 * 这个函数就用于检查并修复后续节点的空间问题。
 *
 * Note that this effect can also happen in reverse, where the bytes required
 * to encode the prevlen field can shrink. This effect is deliberately ignored,
 * because it can cause a "flapping" effect where a chain prevlen fields is
 * first grown and then shrunk again after consecutive inserts. Rather, the
 * field is allowed to stay larger than necessary, because a large prevlen
 * field implies the ziplist is holding large entries anyway.
 *
 * 反过来说,
 * 因为节点的长度变小而引起的连续缩小也是可能出现的,
 * 不过,为了避免扩展-缩小-扩展-缩小这样的情况反复出现(flapping,抖动),
 * 我们不处理这种情况,而是任由 prevlen 比所需的长度更长。

 * The pointer "p" points to the first entry that does NOT need to be
 * updated, i.e. consecutive fields MAY need an update.
 *
 * 注意,程序的检查是针对 p 的后续节点,而不是 p 所指向的节点。
 * 因为节点 p 在传入之前已经完成了所需的空间扩展工作。
 *
 * T = O(N^2)
 */
static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
    size_t offset, noffset, extra;
    unsigned char *np;
    zlentry cur, next;

    // T = O(N^2)
    while (p[0] != ZIP_END) {

        // 将 p 所指向的节点的信息保存到 cur 结构中
        cur = zipEntry(p);
        // 当前节点的长度
        rawlen = cur.headersize + cur.len;
        // 计算编码当前节点的长度所需的字节数
        // T = O(1)
        rawlensize = zipPrevEncodeLength(NULL, rawlen);

        /* Abort if there is no next entry. */
        // 如果已经没有后续空间需要更新了,跳出
        if (p[rawlen] == ZIP_END) {
            break;
        }

        // 取出后续节点的信息,保存到 next 结构中
        // T = O(1)
        next = zipEntry(p + rawlen);

        /* Abort when "prevlen" has not changed. */
        // 后续节点编码当前节点的空间已经足够,无须再进行任何处理,跳出
        // 可以证明,只要遇到一个空间足够的节点,
        // 那么这个节点之后的所有节点的空间都是足够的
        if (next.prevrawlen == rawlen) {
            break;
        }

        if (next.prevrawlensize < rawlensize) {

            /* The "prevlen" field of "next" needs more bytes to hold
             * the raw length of "cur". */
            // 执行到这里,表示 next 空间的大小不足以编码 cur 的长度
            // 所以程序需要对 next 节点的(header 部分)空间进行扩展

            // 记录 p 的偏移量
            offset = p - zl;
            // 计算需要增加的节点数量
            extra = rawlensize - next.prevrawlensize;
            // 扩展 zl 的大小
            // T = O(N)
            zl = ziplistResize(zl, curlen + extra);
            // 还原指针 p
            p = zl + offset;

            /* Current pointer and offset for next element. */
            // 记录下一节点的偏移量
            np = p + rawlen;
            noffset = np - zl;

            /* Update tail offset when next element is not the tail element. */
            // 当 next 节点不是表尾节点时,更新列表到表尾节点的偏移量
            //
            // 不用更新的情况(next 为表尾节点):
            //
            // |     | next |      ==>    |     | new next          |
            //       ^                          ^
            //       |                          |
            //     tail                        tail
            //
            // 需要更新的情况(next 不是表尾节点):
            //
            // | next |     |   ==>     | new next          |     |
            //        ^                        ^
            //        |                        |
            //    old tail                 old tail
            //
            // 更新之后:
            //
            // | new next          |     |
            //                     ^
            //                     |
            //                  new tail
            // T = O(1)
            if ((zl + intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
                ZIPLIST_TAIL_OFFSET(zl) =
                        intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)) + extra);
            }

            /* Move the tail to the back. */
            // 向后移动 cur 节点之后的数据,为 cur 的新 header 腾出空间
            //
            // 示例:
            //
            // | header | value |  ==>  | header |    | value |  ==>  | header      | value |
            //                                   |<-->|
            //                            为新 header 腾出的空间
            // T = O(N)
            memmove(np+rawlensize,
                    np+next.prevrawlensize,
                    curlen - noffset - next.prevrawlensize - 1);
            // 将新的前一节点长度值编码进新的 next 节点的 header
            // T = O(1)
            zipPrevEncodeLength(np, rawlen);

            /* Advance the cursor */
            // 移动指针,继续处理下个节点
            p += rawlen;
            curlen += extra;
        } else {
            if (next.prevrawlensize > rawlensize) {
                /* This would result in shrinking, which we want to avoid.
                 * So, set "rawlen" in the available bytes. */
                // 执行到这里,说明 next 节点编码前置节点的 header 空间有 5 字节
                // 而编码 rawlen 只需要 1 字节
                // 但是程序不会对 next 进行缩小,
                // 所以这里只将 rawlen 写入 5 字节的 header 中就算了。
                // T = O(1)
                zipPrevEncodeLengthForceLarge(p + rawlen, rawlen);
            } else {
                // 运行到这里,
                // 说明 cur 节点的长度正好可以编码到 next 节点的 header 中
                // T = O(1)
                zipPrevEncodeLength(p + rawlen, rawlen);
            }

            /* Stop here, as the raw length of "next" has not changed. */
            break;
        }
    }

    return zl;
}
坚持原创技术分享,您的支持将鼓励我继续创作!