
Author Avatar
Magicmanoooo 3月 09, 2019
  • 在其它设备中阅读本文章



先上源码,首先是构造函数,这四个构造函数分别使用空字符串C-style字符串(以NULL结尾)以及C++ string来进行构造:

class LEVELDB_EXPORT Slice {
  // Create an empty slice.
  // 创建空的字符串,使用方法为:Slice str;
  Slice() : data_(""), size_(0) { }

  // Create a slice that refers to d[0,n-1].
  // 传入一个字符串指针以及该字符串的长度
  Slice(const char* d, size_t n) : data_(d), size_(n) { }

  // Create a slice that refers to the contents of "s"
  // 传入一个C++ string进行初始化
  Slice(const std::string& s) : data_(s.data()), size_(s.size()) { }

  // Create a slice that refers to s[0,strlen(s)-1]
  // 传入一个字符串指针(长度通过strlen获取)进行初始化
  Slice(const char* s) : data_(s), size_(strlen(s)) { }


// Return a pointer to the beginning of the referenced data
// 获取字符串的值,返回指向该字符串起始位置的指针
  const char* data() const { return data_; }

  // Return the length (in bytes) of the referenced data
  // 获取字符串的长度
  size_t size() const { return size_; }

  // Return true iff the length of the referenced data is zero
  bool empty() const { return size_ == 0; }

  // Return the ith byte in the referenced data.
  // REQUIRES: n < size()
  // 重载operator [],通过使用str[i]来获取第i个字符
  char operator[](size_t n) const {
    assert(n < size());
    return data_[n];

  // Change this slice to refer to an empty array
  // 清除整个Slice字符串
  void clear() { data_ = ""; size_ = 0; }

  // Drop the first "n" bytes from this slice.
  // 删除Slice字符串的前n个字符(需要将data_指针前移n位,字符串长度减少n位)
  void remove_prefix(size_t n) {
    assert(n <= size());
    data_ += n;
    size_ -= n;

  // Return a string that contains the copy of the referenced data.
  // 返回C++ string形式的Slice(利用std::string来进行构造)
  std::string ToString() const { return std::string(data_, size_); }

  // Three-way comparison.  Returns value:
  //   <  0 iff "*this" <  "b",
  //   == 0 iff "*this" == "b",
  //   >  0 iff "*this" >  "b"
  int compare(const Slice& b) const;

  // Return true iff "x" is a prefix of "*this"
  // 判断Slice字符串是否是以字符串x起始
  bool starts_with(const Slice& x) const {
    return ((size_ >= x.size_) &&
            (memcmp(data_, x.data_, x.size_) == 0));


  const char* data_;
  size_t size_;


// 重载operator ==,用于判断两个Slice字符串是否相等
inline bool operator==(const Slice& x, const Slice& y) {
  return ((x.size() == y.size()) &&
          (memcmp(x.data(), y.data(), x.size()) == 0));

inline bool operator!=(const Slice& x, const Slice& y) {
  return !(x == y);

// 比较两个Slice字符串
inline int Slice::compare(const Slice& b) const {
  const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
  int r = memcmp(data_, b.data_, min_len);
  if (r == 0) {
    if (size_ < b.size_) r = -1;
    else if (size_ > b.size_) r = +1;
  return r;




enum Code {
    kOk = 0,
    kNotFound = 1,
    kCorruption = 2,
    kNotSupported = 3,
    kInvalidArgument = 4,
    kIOError = 5


const char* state_;

如果为Ok状态,则其state_成员变量的值就是null。对于其他情况而言,state_就是一个new[]的数组,其形式如下 :

  • state_[0...3]:表示消息的长度(不包含Status状态头部信息)
  • state_[4]:表示消息的类型
  • state_[5...]:具体的消息内容


class LEVELDB_EXPORT Status {
  // Create a success status.
  // default ctor,默认创建一个success status
  Status() noexcept : state_(nullptr) { }
  ~Status() { delete[] state_; }

  Status(const Status& rhs);
  Status& operator=(const Status& rhs);

  Status(Status&& rhs) noexcept : state_(rhs.state_) { rhs.state_ = nullptr; }
  Status& operator=(Status&& rhs) noexcept;


  // Return a success status.
  static Status OK() { return Status(); }

  // Return error status of an appropriate type.
  // 返回错误类型
  static Status NotFound(const Slice& msg, const Slice& msg2 = Slice()) {
    return Status(kNotFound, msg, msg2);
  static Status Corruption(const Slice& msg, const Slice& msg2 = Slice()) {
    return Status(kCorruption, msg, msg2);
  static Status NotSupported(const Slice& msg, const Slice& msg2 = Slice()) {
    return Status(kNotSupported, msg, msg2);
  static Status InvalidArgument(const Slice& msg, const Slice& msg2 = Slice()) {
    return Status(kInvalidArgument, msg, msg2);
  static Status IOError(const Slice& msg, const Slice&  = Slice()) {
    return Status(kIOError, msg, msg2);


Status(Code code, const Slice& msg, const Slice& msg2);


Status::Status(Code code, const Slice& msg, const Slice& msg2) {
  assert(code != kOk);
  const uint32_t len1 = msg.size();
  const uint32_t len2 = msg2.size();

  //判断第二个字符串长度是否为0,如果不为0,则信息总长度为len1+2+len2,这里的2是用于存储 ':' 和 ' '。
  const uint32_t size = len1 + (len2 ? (2 + len2) : 0);            
  char* result = new char[size + 5];        //计算state_总长度时,需要将status头部信息那5个字节包含进来
  memcpy(result, &size, sizeof(size));      //将信息长度存入result前四个字节(sizeof(uint32_t) ===>4)
  result[4] = static_cast<char>(code);       //第5个字节存状态
  memcpy(result + 5, msg.data(), len1);  //从第6个字节开始,存储第一个消息的具体内容
  if (len2) {                                                //如果msg2不为空,则需要在最终的信息内容中加上':'+' '+msg2
    result[5 + len1] = ':';                                //第6个字节用于存储':'
    result[6 + len1] = ' ';                                //第7个字节用于存储' '
    memcpy(result + 7 + len1, msg2.data(), len2);    //从第(8+len1)个字节开始,用于存储msg2
  state_ = result;

接下来是copy ctor和assignment operator:

inline Status::Status(const Status& s) {
  state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);

inline void Status::operator=(const Status& s) {
  if (state_ != s.state_) {
    delete[] state_;
    state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);



const char* Status::CopyState(const char* state) {
  uint32_t size;
  memcpy(&size, state, sizeof(size));    //获取message的长度
  char* result = new char[size + 5];      //重新分配空间
  memcpy(result, state, size + 5);          //将message拷贝到新的空间
  return result;                                      //返回新空间的地址




  // Allocation state
  char* alloc_ptr_;                                  //表示内存的offset指针,即指向未使用内存的首地址
  size_t alloc_bytes_remaining_;          //剩余的内存大小(即还能分配的内存)

  // Array of new[] allocated memory blocks
  std::vector<char*> blocks_;              //用于存放每一次所分配的内存指针

  // Total memory usage of the arena.
  port::AtomicPointer memory_usage_; //已经分配的总内存的大小


//default ctor将进行初始化:已分配内存大小为0,offset指针为NULL,剩余内存大小为0
Arena::Arena() : memory_usage_(0) {
  alloc_ptr_ = nullptr;                              // First allocation will allocate a block
  alloc_bytes_remaining_ = 0;

Arena::~Arena() {
  for (size_t i = 0; i < blocks_.size(); i++) {
    delete[] blocks_[i];


  // Return a pointer to a newly allocated memory block of "bytes" bytes.
  char* Allocate(size_t bytes);

  // Allocate memory with the normal alignment guarantees provided by malloc
  char* AllocateAligned(size_t bytes);

  // Returns an estimate of the total memory usage of data allocated
  // by the arena.
  size_t MemoryUsage() const {
    return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());

  char* AllocateFallback(size_t bytes);
  char* AllocateNewBlock(size_t block_bytes);


  • 如果所申请的内存小于剩余的内存容量(alloc_bytes_remaining_),则直接在剩余内存中划出一块即可
  • 如果所申请的内存大于剩余的内存容量,并且其大于4096/4=1024kb时,则需要单独给它分配一块区域,大小为bytes,从而避免过多的内存浪费(例如,现在剩余的内存容量为1200kb,如果有个内存需求为300kb,第二个内存需求为1200kb。则第一个需求可以使用4次才进行一次重新的内存分配,而第二个需求只能使用一次就需要重新进行一个内存分配)
  • 如果所申请的内存大于剩余的内存容量,但其小于4096/4=1024kb时,则需重新分配一个内存块,默认大小为4096


inline char* Arena::Allocate(size_t bytes) {
  // The semantics of what to return are a bit messy if we allow
  // 0-byte allocations, so we disallow them here (we don't need
  // them for our internal use).
  assert(bytes > 0);

  if (bytes <= alloc_bytes_remaining_) {
    char* result = alloc_ptr_;                    //保存offset指针,用于返回
    alloc_ptr_ += bytes;                         //将offset指针前移bytes个字节
    alloc_bytes_remaining_ -= bytes;    //将剩余的内存容量减少bytes个字节
    return result;
  return AllocateFallback(bytes);

char* Arena::AllocateFallback(size_t bytes) {
  if (bytes > kBlokbSize / 4) {
    // Object is more than a quarter of our block size.  Allocate it separately
    // to avoid wasting too much space in leftover bytes.
    char* result = AllocateNewBlock(bytes);
    return result;

  // We waste the remaining space in the current block.
  alloc_ptr_ = AllocateNewBlock(kBlockSize);        //重新开辟一个大小为4096kb的内存
  alloc_bytes_remaining_ = kBlockSize;                   //修改最新剩余内存容量的大小

  char* result = alloc_ptr_;
  alloc_ptr_ += bytes;
  alloc_bytes_remaining_ -= bytes;
  return result;

char* Arena::AllocateNewBlock(size_t block_bytes) {
  char* result = new char[block_bytes];        //重新开辟一个内存空间
  blocks_.push_back(result);                     //将新分配的内存空间的首地址指针加入已分配内存的指针数组指针
  memory_usage_.NoBarrier_Store(         //总的内村加上刚分配的内存
      reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
  return result;


char* Arena::AllocateAligned(size_t bytes) {
  const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
  assert((align & (align-1)) == 0);   // Pointer size should be a power of 2
  size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
  size_t slop = (current_mod == 0 ? 0 : align - current_mod);
// 最终分配的字节数=申请内存大小+为了对齐添加的补足字节数
  size_t needed = bytes + slop;
  char* result;
  if (needed <= alloc_bytes_remaining_) {
    result = alloc_ptr_ + slop;
    alloc_ptr_ += needed;
    alloc_bytes_remaining_ -= needed;
  } else {
    // AllocateFallback always returned aligned memory
    result = AllocateFallback(bytes);
  assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
  return result;


  // Returns an estimate of the total memory usage of data allocated
  // by the arena.
  size_t MemoryUsage() const {
    return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());


data block



  1. 首先需要将数据写入log文件(log文件主要用处:防止在断电时,内存中的数据会丢失,数据可以从log文件中恢复),接着指定一块内存用于写数据(这块内存称之为MemTable),当占用的内存到达阈值之后(options属性中的write_buffer_size的大小,默认为4<<20),就将这块内存转换为只读的(read-only,这块只读内存称为Immutable MemTable),并且log文件也会生成一个新的log文件
  2. 与此同时,开辟一块新的内存(MemTable)来进行继续写数据
  3. 然后异步地将Immutable MemTable的数据添加到磁盘之中(即持久化到磁盘中)






void EncodeFixed32(char* buf, uint32_t value) {
  if (port::kLittleEndian) {
    memcpy(buf, &value, sizeof(value));
  } else {
    buf[0] = value & 0xff;             //得到第一个字节(低8位)
    buf[1] = (value >> 8) & 0xff;
    buf[2] = (value >> 16) & 0xff;
    buf[3] = (value >> 24) & 0xff;

inline uint32_t DecodeFixed32(const char* ptr) {
  if (port::kLittleEndian) {
    // Load the raw bytes
    uint32_t result;
    memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain load
    return result;
  } else {
    return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))
        | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8)
        | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16)
        | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));

inline uint64_t DecodeFixed64(const char* ptr) {
  if (port::kLittleEndian) {
    // Load the raw bytes
    uint64_t result;
    memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain load
    return result;
  } else {
    uint64_t lo = DecodeFixed32(ptr);
    uint64_t hi = DecodeFixed32(ptr + 4);
    return (hi << 32) | lo;



char* EncodeVarint32(char* dst, uint32_t v) {
  // Operate on characters as unsigneds
  unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
//128 ===> 1000 0000  
  static const int B = 128;
if (v < (1<<7)) {
    *(ptr++) = v;
  } else if (v < (1<<14)) {
    *(ptr++) = v | B;
    *(ptr++) = v>>7;
  } else if (v < (1<<21)) {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = v>>14;
  } else if (v < (1<<28)) {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = v>>21;
  } else {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = (v>>21) | B;
    *(ptr++) = v>>28;
  return reinterpret_cast<char*>(ptr);

const char* GetVarint32PtrFallback(const char* p,
                                   const char* limit,
                                   uint32_t* value) {
  uint32_t result = 0;
  for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
    uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
    if (byte & 128) {
      // More bytes are present
      result |= ((byte & 127) << shift);
    } else {
      result |= (byte << shift);
      *value = result;
      return reinterpret_cast<const char*>(p);
  return nullptr;




1. InternalKey,其格式为:

| user key | sequence number | type |
InternalKey_size = key_size + 8



其实,InternalKey就是对plain strings的一个简单封装,即在使用时应该将key保存在InternalKey之中,而不是普通的的string中。这样做的目的是为了防止用户错误地使用字符串比较函数,而是应该使用提供的InternalKeyComparator


class InternalKey {
  std::string rep_;
  InternalKey() { }   // Leave rep_ as empty to indicate it is invalid
  InternalKey(const Slice& user_key, SequenceNumber s, ValueType t) {
    AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t));

  void DecodeFrom(const Slice& s) { rep_.assign(s.data(), s.size()); }
  Slice Encode() const {
    return rep_;

  Slice user_key() const { return ExtractUserKey(rep_); }

  void SetFrom(const ParsedInternalKey& p) {
    AppendInternalKey(&rep_, p);

  void Clear() { rep_.clear(); }


inline bool ParseInternalKey(const Slice& internal_key,
                             ParsedInternalKey* result) {
  const size_t n = internal_key.size();
  if (n < 8) return false;
  uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
  unsigned char c = num & 0xff;
  result->sequence = num >> 8;
  result->type = static_cast<ValueType>(c);
  result->user_key = Slice(internal_key.data(), n - 8);
  return (c <= static_cast<unsigned char>(kTypeValue));

2. ParsedInternalKey的格式为:

| user_key | sequence number | type |


struct ParsedInternalKey {
  Slice user_key;
  SequenceNumber sequence;
  ValueType type;

  ParsedInternalKey() { }  // Intentionally left uninitialized (for speed)
  ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
      : user_key(u), sequence(seq), type(t) { }

3. skiplist内部存储的key的格式为:

VatInt(InternalKey_size)len | InternalKey | VarInt(value) len | value |


int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
  // Order by:
  //    increasing user key (according to user-supplied comparator)
  //    decreasing sequence number
  //    decreasing type (though sequence# should be enough to disambiguate)
  int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
  if (r == 0) {
    const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
    const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
    if (anum > bnum) {
      r = -1;
    } else if (anum < bnum) {
      r = +1;
  return r;

4. 传入memtable的是LookupKey,其格式为:

| InternalKey_size | InternalKey |


// A helper class useful for DBImpl::Get()
class LookupKey {
  // Initialize *this for looking up user_key at a snapshot with
  // the specified sequence number.
  LookupKey(const Slice& user_key, SequenceNumber sequence);


  // Return a key suitable for lookup in a MemTable.
  Slice memtable_key() const { return Slice(start_, end_ - start_); }

  // Return an internal key (suitable for passing to an internal iterator)
  Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }

  // Return the user key
  Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }

  // We construct a char array of the form:
  //    klength   varint32                   <-- start_
  //    userkey  char[klength]           <-- kstart_
  //    tag          uint64
  //                                                   <-- end_
  // The array is a suitable MemTable key.
  // The suffix starting with "userkey" can be used as an InternalKey.
  const char* start_;
  const char* kstart_;
  const char* end_;
  char space_[200];      // Avoid allocation for short keys

  // No copying allowed
  LookupKey(const LookupKey&);
  void operator=(const LookupKey&);

inline LookupKey::~LookupKey() {
  if (start_ != space_) delete[] start_;


  1. 最短的是InternalKey,由uer_key+sequence+type组成
  2. LookupKey,由InternalKey的长度+InternalKey组成
  3. skiplist中存储的键为:LookupKey+value的长度+value




  1. Get:用于获取某个键值(key)
  2. Add:用于添加某个键值(key)
  3. NewIterator:用于获取memtable的迭代器


  typedef SkipList<const char*, KeyComparator> Table; //实例化跳跃表
  KeyComparator comparator_;    //键值(字符)比较器
  int refs_;                                      //memtable的引用计数
  Arena arena_;                                //内存池
  Table table_;                                //底层跳跃表


MemTable::MemTable(const InternalKeyComparator& cmp)
    : comparator_(cmp),
      table_(comparator_, &arena_) {


  1. 首先需要将keySequenceNumberValueType打包成InternalKey。(【注】:每次进行更新(Put/Delete)时,都会产生不同的序列号SequenceNumberValueType用于区分entry是真实的key-value数据还是更新操作。如果是delete操作,leveldb需要先进行记录,然后在后台的compact县城会完成真正的删除工作;进行存储时,SequenceNumber7个字节,ValueType1个字节,两者用户共占用8个字节)。
  2. 打包除了有keyInternalKey)和value,还加入了keyvalue各自的长度信息,而两者均是自定义的Variant(变长整型)数据,需要将它们进行int——>Variant的类型转换。
void MemTable::Add(SequenceNumber s, ValueType type,
                   const Slice& key,
                   const Slice& value) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
  size_t key_size = key.size();                         //key(键值)长度
  size_t val_size = value.size();                       //值的长度
  size_t internal_key_size = key_size + 8;       //InternalKey的长度
  const size_t encoded_len =                           //skiplist中存储结点的总长度
      VarintLength(internal_key_size) + internal_key_size +
      VarintLength(val_size) + val_size;
  char* buf = arena_.Allocate(encoded_len);      //为key分配内存
  char* p = EncodeVarint32(buf, internal_key_size);  //将key的长度存入buf中
  memcpy(p, key.data(), key_size);                      //将key的具体内容存入buf之中
  p += key_size;                         //将指针后移key_size个字节,使得指针指向sequence+type
  EncodeFixed64(p, (s << 8) | type);     //获取sequence+type
  p += 8;                                                 
  p = EncodeVarint32(p, val_size);       //将value的长度进行编码
  memcpy(p, value.data(), val_size);    //将编码后的value长度压入内存
  assert(p + val_size == buf + encoded_len);


int VarintLength(uint64_t v) {
  int len = 1;
  while (v >= 128) {
    v >>= 7;
  return len;


bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
  Slice memkey = key.memtable_key();
  Table::Iterator iter(&table_);
  if (iter.Valid()) {
    // entry format is:
    //    klength    varint32
    //    userkey   char[klength]
    //    tag          uint64
    //    vlength    varint32
    //    value       char[vlength]
    // Check that it belongs to same user key.  We do not check the
    // sequence number since the Seek() call above should have skipped
    // all entries with overly large sequence numbers.
    const char* entry = iter.key();
    uint32_t key_length;
    const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(key_ptr, key_length - 8),
            key.user_key()) == 0) {
      // Correct user key
//如果ValueType==kTypeValue ====> 是真实的key-value数据
//如果ValueType==kTypeDeletion ====> 是要删除的类型,并不代表数据真实存在
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          value->assign(v.data(), v.size());
          return true;
        case kTypeDeletion:
          *s = Status::NotFound(Slice());
          return true;
  return false;


class MemTableIterator : public Iterator {
  explicit MemTableIterator(MemTable::Table* table) : iter_(table) { }

  virtual bool Valid() const { return iter_.Valid(); }
  virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); }
  virtual void SeekToFirst() { iter_.SeekToFirst(); }
  virtual void SeekToLast() { iter_.SeekToLast(); }
  virtual void Next() { iter_.Next(); }
  virtual void Prev() { iter_.Prev(); }
  virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); }
  virtual Slice value() const {
    Slice key_slice = GetLengthPrefixedSlice(iter_.key());
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());

  virtual Status status() const { return Status::OK(); }

  MemTable::Table::Iterator iter_;
  std::string tmp_;       // For passing to EncodeKey

  // No copying allowed
  MemTableIterator(const MemTableIterator&);
  void operator=(const MemTableIterator&);


Iterator* MemTable::NewIterator() {
  return new MemTableIterator(&table_);






  1. CheckSum,即CRC验证码,占4个字节
  2. 记录长度,即数据部分的长度,2个字节
  3. 类型,这条记录的类型,占1个字节
  4. 数据,即这条记录的数据


  1. FULL,表示这是一条完整的记录
  2. FIRST,表示这是一条记录的第一部分
  3. MIDDLE,表示这是一条记录的中间部分
  4. LAST,表示这是一条记录的最后一部分



  WritableFile* dest_;    //log日志文件的封装类
  int block_offset_;       //块内偏移量,用于指定写地址(Current offset in block)


  static void InitTypeCrc(uint32_t* type_crc) {
  for (int i = 0; i <= kMaxRecordType; i++) {
    char t = static_cast<char>(i);
    type_crc[i] = crc32c::Value(&t, 1);

// 在Writer的使用过程中,*dest需要一直remain live
Writer::Writer(WritableFile* dest)
    : dest_(dest),
      block_offset_(0) {

Writer::Writer(WritableFile* dest, uint64_t dest_length)
    : dest_(dest), block_offset_(dest_length % kBlockSize) {


enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,

  kFullType = 1,

  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4

Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();      //需要添加的记录数据
  size_t left = slice.size();              //记录数据的长度

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
//kBlockSize ===> 32768
    const int leftover = kBlockSize - block_offset_;     
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {             //如果剩余容量小于记录头长度(kHeaderSize=7kb)
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      block_offset_ = 0;

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length); 
    if (begin && end) {      //如果拥有开头和结尾标志,说明是一个完整块
      type = kFullType;           
    } else if (begin) {           //如果只有开头标志,说明它是一个起始块
      type = kFirstType;
    } else if (end) {           //如果只有结尾标志,说明它是记录的最后一块
      type = kLastType;
    } else {
      type = kMiddleType;

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;  //指针向后移动fragment_length个字节
    left -= fragment_length;  //减少记录的剩余长度
    begin = false;
  } while (s.ok() && left > 0);
  return s;

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  // Format the header
//封装记录头Header:checksum + length + flag
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(n & 0xff);
  buf[5] = static_cast<char>(n >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);  //用crc填充buf前四个字节
  crc = crc32c::Mask(crc);                 // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, n));    //将记录内容写入缓存
    if (s.ok()) {
      s = dest_->Flush();                        //将缓存的数据刷进内核
  block_offset_ += kHeaderSize + n;  //更新块内偏移量
  return s;



  SequentialFile* const file_;      //读取文件的封装类
  Reporter* const reporter_;       //报告错误类
  bool const checksum_;            //是否进行CRC验证
  char* const backing_store_;    
  Slice buffer_;                                                     
  bool eof_;   // Last Read() indicated EOF by returning < kBlockSize

  // Offset of the last record returned by ReadRecord.
  uint64_t last_record_offset_;
  // Offset of the first location past the end of buffer_.
  uint64_t end_of_buffer_offset_;

  // Offset at which to start looking for the first record to return
  uint64_t const initial_offset_;

  // True if we are resynchronizing after a seek (initial_offset_ > 0). In
  // particular, a run of kMiddleType and kLastType records can be silently
  // skipped in this mode
  bool resyncing_;


bool Reader::SkipToInitialBlock() {
  const size_t offset_in_block = initial_offset_ % kBlockSize;
  uint64_t block_start_location = initial_offset_ - offset_in_block;

  // Don't search a block if we'd be in the trailer
  if (offset_in_block > kBlockSize - 6) {
    block_start_location += kBlockSize;

  end_of_buffer_offset_ = block_start_location;

  // Skip to start of first block that can contain the initial record
  if (block_start_location > 0) {
    Status skip_status = file_->Skip(block_start_location);
    if (!skip_status.ok()) {
      ReportDrop(block_start_location, skip_status);
      return false;

  return true;


bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {
      return false;

  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
      } else if (record_type == kLastType) {
        resyncing_ = false;
      } else {
        resyncing_ = false;

    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (!scratch->empty()) {
            ReportCorruption(scratch->size(), "partial record without end(1)");
        prospective_record_offset = physical_record_offset;
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (!scratch->empty()) {
            ReportCorruption(scratch->size(), "partial record without end(2)");
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;

      case kMiddleType:
        if (!in_fragmented_record) {
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());

      case kLastType:
        if (!in_fragmented_record) {
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;

      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
        return false;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
        in_fragmented_record = false;
  return false;





  • 由多层结构组成
  • 每一层都是一个有序链表,排列顺序为由高层到低层,都至少包含两个链表结点,分别是最前面的head结点和后面的nil结点
  • 最底层的链表包含了所有的元素
  • 如果一个元素出现在某一层链表之中,那么在该层之下的链表也全都会出现(即上一层元素是当前层元素的子集)
  • 链表中的每个结点都包含两个指针,一个指向同一层的下一个链表结点,另一个指向下一层的同一个链表结点

由上图可以看出,该skiplist一共有4层,最上面的一层就是最高层(level 3),最下面的一层就是最底层(level 0),然后每一列中的链表结点中的值都是相同的,它们之间用指针进行连接。此外,跳跃表的层数与结构中最高结点的高度是相同的。在理想情况下,跳跃表结构中的第一层存在所有结点,第二层只有一半的结点,而且是均匀间隔,第三层存在1/4的结点且均匀分布…由此类推,可得知理想的层数就是logN




Search(x) {
  while(true) {
    while(x > p->next->key)
      p = p->next;
    if(p->down == nil)
      return p->next;
    p = p->down;



  • 抛硬币。只要得到的是正面就累加,直到遇到反面才停止,最后记录正面的次数,并将其作为要添加的新元素的层。
  • 进行概率统计。先给定一个概率p,产生一个01之间的随机数,如果这个随机数小于p,则将高度加1,直到产生的随机数大于概率p才停止。根据结论,可知当概率为1/2或者1/4的时候,整体的性能会比较好(当p=1/2时,也就是抛硬币的方法)。




内存屏障(memory barrier)


class AtomicPointer {
  void* rep_;            //原子指针
  AtomicPointer() { }
  explicit AtomicPointer(void* p) : rep_(p) {}
  inline void* NoBarrier_Load() const { return rep_; }
  inline void NoBarrier_Store(void* v) { rep_ = v; }
  inline void* Acquire_Load() const {
    void* result = rep_;
    return result;
  inline void Release_Store(void* v) {
    rep_ = v;



template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
  explicit Node(const Key& k) : key(k) { }

  Key const key;

  // Accessors/mutators for links.  Wrapped in methods so we can
  // add the appropriate barriers as necessary.
  Node* Next(int n) {
    assert(n >= 0);
    // Use an 'acquire load' so that we observe a fully initialized
    // version of the returned Node.
    return reinterpret_cast<Node*>(next_[n].Acquire_Load());
  void SetNext(int n, Node* x) {
    assert(n >= 0);
    // Use a 'release store' so that anybody who reads through this
    // pointer observes a fully initialized version of the inserted node.

  // No-barrier variants that can be safely used in a few locations.
  Node* NoBarrier_Next(int n) {
    assert(n >= 0);
    return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
  void NoBarrier_SetNext(int n, Node* x) {
    assert(n >= 0);

  // Array of length equal to the node height.  next_[0] is lowest level link.
  port::AtomicPointer next_[1];



  enum { kMaxHeight = 12 };

  // Immutable after construction
  Comparator const compare_;
  Arena* const arena_;    // Arena used for allocations of nodes

  Node* const head_;

  // Modified only by Insert().  Read racily by readers, but stale
  // values are ok.
  port::AtomicPointer max_height_;   // Height of the entire list

  // Read/written only by Insert().
  Random rnd_;


template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
    : compare_(cmp),
      head_(NewNode(0 /* any key will do */, kMaxHeight)),
      rnd_(0xdeadbeef) {
  for (int i = 0; i < kMaxHeight; i++) {
    head_->SetNext(i, nullptr);


template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
//sizeof(port::AtomicPointer) * (height - 1)是为了兼容性变长数组声明为 port::AtomicPointer next_[1](也可以用next[0],但是有些编译器不支持)  
  char* mem = arena_->AllocateAligned(
      sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
  return new (mem) Node(key);

在进行插入结点时,需要先随机化处一个插入高度height,接着 再找到此结点的前height个结点,然后进行插入:

template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
  // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
  // here since Insert() is externally synchronized
  Node* prev[kMaxHeight];
  Node* x = FindGreaterOrEqual(key, prev);

  assert(x == nullptr || !Equal(key, x->key));

  int height = RandomHeight();
  if (height > GetMaxHeight()) {
    for (int i = GetMaxHeight(); i < height; i++) {
      prev[i] = head_;
    //fprintf(stderr, "Change height from %d to %d\n", max_height_, height);

    // It is ok to mutate max_height_ without any synchronization
    // with concurrent readers.  A concurrent reader that observes
    // the new value of max_height_ will see either the old value of
    // new level pointers from head_ (nullptr), or a new value set in
    // the loop below.  In the former case the reader will
    // immediately drop to the next level since nullptr sorts after all
    // keys.  In the latter case the reader will use the new node.

  x = NewNode(key, height);
  for (int i = 0; i < height; i++) {
    // NoBarrier_SetNext() suffices since we will add a barrier when
    // we publish a pointer to "x" in prev[i].
    x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
    prev[i]->SetNext(i, x);