LevelDB源码解析
Slice
LevelDB并没有使用STL中自带的std::string
,而是自己写了个Slice
,它的原理就是:拥有一个指向字符串的指针和表示该字符串大小的size
成员。
先上源码,首先是构造函数,这四个构造函数分别使用空字符串、C-style字符串(以NULL结尾)以及C++ string来进行构造:
class LEVELDB_EXPORT Slice {
public:
// Create an empty slice.
// 创建空的字符串,使用方法为:Slice str;
Slice() : data_(""), size_(0) { }
// Create a slice that refers to d[0,n-1].
// 传入一个字符串指针以及该字符串的长度
Slice(const char* d, size_t n) : data_(d), size_(n) { }
// Create a slice that refers to the contents of "s"
// 传入一个C++ string进行初始化
Slice(const std::string& s) : data_(s.data()), size_(s.size()) { }
// Create a slice that refers to s[0,strlen(s)-1]
// 传入一个字符串指针(长度通过strlen获取)进行初始化
Slice(const char* s) : data_(s), size_(strlen(s)) { }
};
接下来是成员函数:
// Return a pointer to the beginning of the referenced data
// 获取字符串的值,返回指向该字符串起始位置的指针
const char* data() const { return data_; }
// Return the length (in bytes) of the referenced data
// 获取字符串的长度
size_t size() const { return size_; }
// Return true iff the length of the referenced data is zero
bool empty() const { return size_ == 0; }
// Return the ith byte in the referenced data.
// REQUIRES: n < size()
// 重载operator [],通过使用str[i]来获取第i个字符
char operator[](size_t n) const {
assert(n < size());
return data_[n];
}
// Change this slice to refer to an empty array
// 清除整个Slice字符串
void clear() { data_ = ""; size_ = 0; }
// Drop the first "n" bytes from this slice.
// 删除Slice字符串的前n个字符(需要将data_指针前移n位,字符串长度减少n位)
void remove_prefix(size_t n) {
assert(n <= size());
data_ += n;
size_ -= n;
}
// Return a string that contains the copy of the referenced data.
// 返回C++ string形式的Slice(利用std::string来进行构造)
std::string ToString() const { return std::string(data_, size_); }
// Three-way comparison. Returns value:
// < 0 iff "*this" < "b",
// == 0 iff "*this" == "b",
// > 0 iff "*this" > "b"
int compare(const Slice& b) const;
// Return true iff "x" is a prefix of "*this"
// 判断Slice字符串是否是以字符串x起始
bool starts_with(const Slice& x) const {
return ((size_ >= x.size_) &&
(memcmp(data_, x.data_, x.size_) == 0));
}
接着,是两个成员变量:
private:
const char* data_;
size_t size_;
};
其他一些函数:
// 重载operator ==,用于判断两个Slice字符串是否相等
inline bool operator==(const Slice& x, const Slice& y) {
return ((x.size() == y.size()) &&
(memcmp(x.data(), y.data(), x.size()) == 0));
}
inline bool operator!=(const Slice& x, const Slice& y) {
return !(x == y);
}
// 比较两个Slice字符串
inline int Slice::compare(const Slice& b) const {
const size_t min_len = (size_ < b.size_) ? size_ : b.size_;
int r = memcmp(data_, b.data_, min_len);
if (r == 0) {
if (size_ < b.size_) r = -1;
else if (size_ > b.size_) r = +1;
}
return r;
}
Status
Status
封装了执行某个操作之后的结果(即返回的状态),它可能表示成功或者表示失败并返回一些错误信息。
Status
一共包含了6种状态:
enum Code {
kOk = 0,
kNotFound = 1,
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5
};
Status
拥有一个成员变量state_
:
const char* state_;
如果为Ok
状态,则其state_
成员变量的值就是null
。对于其他情况而言,state_
就是一个new[]
的数组,其形式如下 :
state_[0...3]
:表示消息的长度(不包含Status状态头部信息)state_[4]
:表示消息的类型state_[5...]
:具体的消息内容
先是一些构造函数、析构函数以及一些与copy赋值相关的操作符:
class LEVELDB_EXPORT Status {
public:
// Create a success status.
// default ctor,默认创建一个success status
Status() noexcept : state_(nullptr) { }
~Status() { delete[] state_; }
Status(const Status& rhs);
Status& operator=(const Status& rhs);
Status(Status&& rhs) noexcept : state_(rhs.state_) { rhs.state_ = nullptr; }
Status& operator=(Status&& rhs) noexcept;
主要的成员函数:
// Return a success status.
static Status OK() { return Status(); }
// Return error status of an appropriate type.
// 返回错误类型
static Status NotFound(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kNotFound, msg, msg2);
}
static Status Corruption(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kCorruption, msg, msg2);
}
static Status NotSupported(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kNotSupported, msg, msg2);
}
static Status InvalidArgument(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kInvalidArgument, msg, msg2);
}
static Status IOError(const Slice& msg, const Slice& = Slice()) {
return Status(kIOError, msg, msg2);
}
上述函数都直接调用了构造函数:
Status(Code code, const Slice& msg, const Slice& msg2);
其实现为:
Status::Status(Code code, const Slice& msg, const Slice& msg2) {
assert(code != kOk);
const uint32_t len1 = msg.size();
const uint32_t len2 = msg2.size();
//判断第二个字符串长度是否为0,如果不为0,则信息总长度为len1+2+len2,这里的2是用于存储 ':' 和 ' '。
const uint32_t size = len1 + (len2 ? (2 + len2) : 0);
char* result = new char[size + 5]; //计算state_总长度时,需要将status头部信息那5个字节包含进来
memcpy(result, &size, sizeof(size)); //将信息长度存入result前四个字节(sizeof(uint32_t) ===>4)
result[4] = static_cast<char>(code); //第5个字节存状态
memcpy(result + 5, msg.data(), len1); //从第6个字节开始,存储第一个消息的具体内容
if (len2) { //如果msg2不为空,则需要在最终的信息内容中加上':'+' '+msg2
result[5 + len1] = ':'; //第6个字节用于存储':'
result[6 + len1] = ' '; //第7个字节用于存储' '
memcpy(result + 7 + len1, msg2.data(), len2); //从第(8+len1)个字节开始,用于存储msg2
}
state_ = result;
}
接下来是copy ctor和assignment operator:
inline Status::Status(const Status& s) {
state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
}
inline void Status::operator=(const Status& s) {
if (state_ != s.state_) {
delete[] state_;
state_ = (s.state_ == NULL) ? NULL : CopyState(s.state_);
}
}
两者都调用了CopyState
方法来拷贝s.state_
中的内容。
CopyState
的实现为:
const char* Status::CopyState(const char* state) {
uint32_t size;
memcpy(&size, state, sizeof(size)); //获取message的长度
char* result = new char[size + 5]; //重新分配空间
memcpy(result, state, size + 5); //将message拷贝到新的空间
return result; //返回新空间的地址
}
内存池Arena
内存池的主要作用是:减少malloc
或者new
调用的次数,从而减少时常需要分配内存所带来的系统开销。
该类一共拥有4个成员变量:
// Allocation state
char* alloc_ptr_; //表示内存的offset指针,即指向未使用内存的首地址
size_t alloc_bytes_remaining_; //剩余的内存大小(即还能分配的内存)
// Array of new[] allocated memory blocks
std::vector<char*> blocks_; //用于存放每一次所分配的内存指针
// Total memory usage of the arena.
port::AtomicPointer memory_usage_; //已经分配的总内存的大小
接着是ctor和dtor:
//default ctor将进行初始化:已分配内存大小为0,offset指针为NULL,剩余内存大小为0
Arena::Arena() : memory_usage_(0) {
alloc_ptr_ = nullptr; // First allocation will allocate a block
alloc_bytes_remaining_ = 0;
}
//进行析构时,只需将所有的指向已分配内存的指针全部删除即可
Arena::~Arena() {
for (size_t i = 0; i < blocks_.size(); i++) {
delete[] blocks_[i];
}
}
Arena
主要有3个关于内存分配的函数:
// Return a pointer to a newly allocated memory block of "bytes" bytes.
char* Allocate(size_t bytes);
// Allocate memory with the normal alignment guarantees provided by malloc
char* AllocateAligned(size_t bytes);
// Returns an estimate of the total memory usage of data allocated
// by the arena.
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}
private:
char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);
其总体的内存分配策略为:
- 如果所申请的内存小于剩余的内存容量(
alloc_bytes_remaining_
),则直接在剩余内存中划出一块即可 - 如果所申请的内存大于剩余的内存容量,并且其大于
4096/4=1024kb
时,则需要单独给它分配一块区域,大小为bytes
,从而避免过多的内存浪费(例如,现在剩余的内存容量为1200kb,如果有个内存需求为300kb,第二个内存需求为1200kb。则第一个需求可以使用4次才进行一次重新的内存分配,而第二个需求只能使用一次就需要重新进行一个内存分配) - 如果所申请的内存大于剩余的内存容量,但其小于
4096/4=1024kb
时,则需重新分配一个内存块,默认大小为4096
它们的源码为:
inline char* Arena::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
//如果申请的内存大熊啊小于剩余的内存容量,则直接划出一块内存就好。
//三部曲:移动offset指针,减少剩余内存容量,返回刚分配内存的其实地址
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_; //保存offset指针,用于返回
alloc_ptr_ += bytes; //将offset指针前移bytes个字节
alloc_bytes_remaining_ -= bytes; //将剩余的内存容量减少bytes个字节
return result;
}
return AllocateFallback(bytes);
}
char* Arena::AllocateFallback(size_t bytes) {
//当申请内存大于1024kb时,需要进行重新开辟内存
if (bytes > kBlokbSize / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char* result = AllocateNewBlock(bytes);
return result;
}
// We waste the remaining space in the current block.
//当申请内存大于剩余内存容量,但小于1024kb时
alloc_ptr_ = AllocateNewBlock(kBlockSize); //重新开辟一个大小为4096kb的内存
alloc_bytes_remaining_ = kBlockSize; //修改最新剩余内存容量的大小
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
char* Arena::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes]; //重新开辟一个内存空间
blocks_.push_back(result); //将新分配的内存空间的首地址指针加入已分配内存的指针数组指针
memory_usage_.NoBarrier_Store( //总的内村加上刚分配的内存
reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
return result;
}
Arena
还提供了字节对齐的内存分配。通常是8
字节对齐分配:
char* Arena::AllocateAligned(size_t bytes) {
//用于判断对齐的大小(64为计算机sizeof(void*)===>8),所以对齐大小为8
const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8;
//用于判断对齐大小是否为2的幂,此外也可以使用v&&!(v&(v-1))达到相同的目的
assert((align & (align-1)) == 0); // Pointer size should be a power of 2
//align-1==>此时,后三位为1,其余各位均为0,再与alloc_ptr作与运算,就相当于
//让指针与align进行求余运算,即alloc_ptr%align
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
//根据当前的模式,计算出需要添加的字节数(如果不够8,就补足8,使其成为8的倍数)
size_t slop = (current_mod == 0 ? 0 : align - current_mod);
// 最终分配的字节数=申请内存大小+为了对齐添加的补足字节数
size_t needed = bytes + slop;
char* result;
if (needed <= alloc_bytes_remaining_) {
result = alloc_ptr_ + slop;
alloc_ptr_ += needed;
alloc_bytes_remaining_ -= needed;
} else {
// AllocateFallback always returned aligned memory
result = AllocateFallback(bytes);
}
//再次验证分配的内存是否为8的倍数
assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
return result;
}
Arena
还有一个MemoryUsage
的接口,用于返回内存池已分配的总的内存大小:
// Returns an estimate of the total memory usage of data allocated
// by the arena.
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
}
最终,可以得到Arena
的模型:
data block
leveldb作为一个key-value数据库,它并没有将数据存放在内存之中,而是把数据存放在磁盘之中。
leveldb存放数据的流程
- 首先需要将数据写入
log
文件(log
文件主要用处:防止在断电时,内存中的数据会丢失,数据可以从log
文件中恢复),接着指定一块内存用于写数据(这块内存称之为MemTable
),当占用的内存到达阈值之后(options
属性中的write_buffer_size
的大小,默认为4<<20
),就将这块内存转换为只读的(read-only,这块只读内存称为Immutable MemTable
),并且log
文件也会生成一个新的log
文件 - 与此同时,开辟一块新的内存(
MemTable
)来进行继续写数据 - 然后异步地将
Immutable MemTable
的数据添加到磁盘之中(即持久化到磁盘中)
整形数据存储
leveldb多有数据都是字符从事,即使是整形也会被转换为字符型进行存储。这样做的目的是为了减少内存空间的使用。例如,有一个int
型数据,小于128
,存储为整型时,将占用4
个字节;而存储为字符型时,只需要1
个字节即可。
leveldb共有两种整型和字符型数据转换,一种是fixed
,一种是variant
。
fixed
转换
直接将int
的每一个字节直接存入字符数组中。
//编码(整型——>字符串)
void EncodeFixed32(char* buf, uint32_t value) {
//如果是小端,直接进行copy
if (port::kLittleEndian) {
memcpy(buf, &value, sizeof(value));
} else {
//如果是大端,则需要一个字节一个字节的进行copy
buf[0] = value & 0xff; //得到第一个字节(低8位)
buf[1] = (value >> 8) & 0xff;
buf[2] = (value >> 16) & 0xff;
buf[3] = (value >> 24) & 0xff;
}
}
//解码(字符串——>整型)
inline uint32_t DecodeFixed32(const char* ptr) {
if (port::kLittleEndian) {
// Load the raw bytes
uint32_t result;
memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain load
return result;
} else {
return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))
| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8)
| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16)
| (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));
}
}
inline uint64_t DecodeFixed64(const char* ptr) {
if (port::kLittleEndian) {
// Load the raw bytes
uint64_t result;
memcpy(&result, ptr, sizeof(result)); // gcc optimizes this to a plain load
return result;
} else {
uint64_t lo = DecodeFixed32(ptr);
uint64_t hi = DecodeFixed32(ptr + 4);
return (hi << 32) | lo;
}
}
varient
这种转换是将一个字节分为两部分,前7
个字节用于存储数据,第8
个字节用于表示高位是否还有数据:
char* EncodeVarint32(char* dst, uint32_t v) {
// Operate on characters as unsigneds
unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
//128 ===> 1000 0000
static const int B = 128;
//如果v<128,则将v的低7位copy到ptr,ptr第8位为0,表示高位没有数据
if (v < (1<<7)) {
*(ptr++) = v;
//v的低7位copy到ptr,ptr的第8位为1,表示高位还有数据
//在将v的高7位copy到(ptr+1)的低7位,(ptr+1)的第8位为0,表示高位没有数据了
} else if (v < (1<<14)) {
*(ptr++) = v | B;
*(ptr++) = v>>7;
} else if (v < (1<<21)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = v>>14;
} else if (v < (1<<28)) {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = v>>21;
} else {
*(ptr++) = v | B;
*(ptr++) = (v>>7) | B;
*(ptr++) = (v>>14) | B;
*(ptr++) = (v>>21) | B;
*(ptr++) = v>>28;
}
return reinterpret_cast<char*>(ptr);
}
const char* GetVarint32PtrFallback(const char* p,
const char* limit,
uint32_t* value) {
uint32_t result = 0;
for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
//取出p字符串当前的第一个字节
uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
p++;
//判断第8位是否为1,如果为1,说明高位还有数据,则继续循环
if (byte & 128) {
// More bytes are present
//每7位移动一次,分别向result的7位进行赋值
result |= ((byte & 127) << shift);
} else {
result |= (byte << shift);
*value = result;
return reinterpret_cast<const char*>(p);
}
}
return nullptr;
}
在解析variant
时,需要首先取出字符数组的每一个字节,先判断该字节是否大于128
。如果其小于128
,则直接取出该字节的低7
位,赋值给result
的低7
位。而如果该字节大于128
,则说明还有第二个字节,则取出第二个字节,得到其低7
位,然后左移7
位,然后赋值给result
的8~14
位,后续亦是如此。
leveldb键的形式
都定义在db/dbformat.h
头文件之中。
1. InternalKey
,其格式为:
| user key | sequence number | type |
InternalKey_size = key_size + 8
user_key
就是用户输入的key,而InternalKey
在user_key
的基础上,封装了sequence
+type
。sequence
是一个全局递增的序列号,每一次Put
操作都会递增。这样,不同时间的写入操作会得到不一样的sequence
。
sstable
单条record
中的key,其内部其实就是一个InternalKey
。sequence
主要跟snapshot
机制与version
机制相关,对压缩会产生一定影响。根据type
字段,可以获知本次写入操作是写还是删除(也就是说删除是一种特殊的写)。而LookupKey
/memtable_key
用于在memtable
中,多了一个长度字段。
其实,InternalKey
就是对plain strings的一个简单封装,即在使用时应该将key保存在InternalKey
之中,而不是普通的的string中。这样做的目的是为了防止用户错误地使用字符串比较函数,而是应该使用提供的InternalKeyComparator
。
其源码为:
class InternalKey {
private:
std::string rep_;
public:
InternalKey() { } // Leave rep_ as empty to indicate it is invalid
InternalKey(const Slice& user_key, SequenceNumber s, ValueType t) {
AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t));
}
void DecodeFrom(const Slice& s) { rep_.assign(s.data(), s.size()); }
Slice Encode() const {
assert(!rep_.empty());
return rep_;
}
Slice user_key() const { return ExtractUserKey(rep_); }
void SetFrom(const ParsedInternalKey& p) {
rep_.clear();
AppendInternalKey(&rep_, p);
}
void Clear() { rep_.clear(); }
};
解析InternalKey
的源码:
inline bool ParseInternalKey(const Slice& internal_key,
ParsedInternalKey* result) {
//获取key的长度
const size_t n = internal_key.size();
//key的长度必须大于8
if (n < 8) return false;
//将key的具体内容进行变长解码,其中sequence+type一共占8个字节,+n的目的是将字符指针移动到key的末尾,-8是为了使得字符指针移动到sequence+type处。
uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
//获得key的低8位,即type
unsigned char c = num & 0xff;
result->sequence = num >> 8;
result->type = static_cast<ValueType>(c);
result->user_key = Slice(internal_key.data(), n - 8);
return (c <= static_cast<unsigned char>(kTypeValue));
}
2. ParsedInternalKey
的格式为:
| user_key | sequence number | type |
其源码为:
struct ParsedInternalKey {
Slice user_key;
SequenceNumber sequence;
ValueType type;
ParsedInternalKey() { } // Intentionally left uninitialized (for speed)
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
: user_key(u), sequence(seq), type(t) { }
};
3. skiplist内部存储的key的格式为:
VatInt(InternalKey_size)len | InternalKey | VarInt(value) len | value |
skiplist中的单个节点不仅存储了key,也存储了value。尽管单个节点的开头部分是一个LookupKey
,但其内部比较时,还是使用的InternalKey
。也就是说,比较时,先使用InternalKey
内部的user_key
进行比较,再比较sequence
。这样,不管是memtable
还是sstable
文件,其内部都是按InternalKey
有序的。
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}
4. 传入memtable
的是LookupKey
,其格式为:
| InternalKey_size | InternalKey |
LookupKey
是leveldb为了在memtable
/sstable
中查找方便,为key包装的类型,其源码为:
// A helper class useful for DBImpl::Get()
class LookupKey {
public:
// Initialize *this for looking up user_key at a snapshot with
// the specified sequence number.
//初始化LoopupKey,主要用于在snapshot中查找user_key
LookupKey(const Slice& user_key, SequenceNumber sequence);
~LookupKey();
//其实,在本质上,memtable_key和LookupKey是一样的
// Return a key suitable for lookup in a MemTable.
Slice memtable_key() const { return Slice(start_, end_ - start_); }
// Return an internal key (suitable for passing to an internal iterator)
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
// Return the user key
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
private:
// We construct a char array of the form:
// klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// tag uint64
// <-- end_
// The array is a suitable MemTable key.
// The suffix starting with "userkey" can be used as an InternalKey.
const char* start_;
const char* kstart_;
const char* end_;
char space_[200]; // Avoid allocation for short keys
// No copying allowed
LookupKey(const LookupKey&);
void operator=(const LookupKey&);
};
inline LookupKey::~LookupKey() {
if (start_ != space_) delete[] start_;
}
总结:
- 最短的是
InternalKey
,由uer_key
+sequence
+type
组成 LookupKey
,由InternalKey
的长度+InternalKey
组成- skiplist中存储的键为:
LookupKey
+value
的长度+value
memtable
memtable
是leveldb数据在内存中的存储形式,写操作的数据都会先写到memtable
中,memtable
的大小有上限(write_buffer_size
)。memtable
的底层DS是skiplist。memtable
的主要作用是:完成key-value的打包,调用底层skiplist,为上层调用插入数据提供接口。
memtable
主要有三个公共interface:
Get
:用于获取某个键值(key)Add
:用于添加某个键值(key)NewIterator
:用于获取memtable
的迭代器
memtable
一共有4个成员变量:
typedef SkipList<const char*, KeyComparator> Table; //实例化跳跃表
KeyComparator comparator_; //键值(字符)比较器
int refs_; //memtable的引用计数
Arena arena_; //内存池
Table table_; //底层跳跃表
构造函数:
MemTable::MemTable(const InternalKeyComparator& cmp)
: comparator_(cmp),
refs_(0),
table_(comparator_, &arena_) {
}
往memtable
中插入记录,其实就是讲key-value
数据进行打包。memtable
的插入操作其实很简单,只需要按照协议(数据存储格式)封装好键值即可,最后再将其插入到skiplist之中即可。
- 首先需要将
key
、SequenceNumber
和ValueType
打包成InternalKey
。(【注】:每次进行更新(Put
/Delete
)时,都会产生不同的序列号SequenceNumber
;ValueType
用于区分entry
是真实的key-value
数据还是更新操作。如果是delete
操作,leveldb需要先进行记录,然后在后台的compact
县城会完成真正的删除工作;进行存储时,SequenceNumber
占7
个字节,ValueType
占1
个字节,两者用户共占用8
个字节)。 - 打包除了有
key
(InternalKey
)和value
,还加入了key
、value
各自的长度信息,而两者均是自定义的Variant
(变长整型)数据,需要将它们进行int——>Variant
的类型转换。
void MemTable::Add(SequenceNumber s, ValueType type,
const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size(); //key(键值)长度
size_t val_size = value.size(); //值的长度
size_t internal_key_size = key_size + 8; //InternalKey的长度
const size_t encoded_len = //skiplist中存储结点的总长度
VarintLength(internal_key_size) + internal_key_size +
VarintLength(val_size) + val_size;
char* buf = arena_.Allocate(encoded_len); //为key分配内存
char* p = EncodeVarint32(buf, internal_key_size); //将key的长度存入buf中
memcpy(p, key.data(), key_size); //将key的具体内容存入buf之中
p += key_size; //将指针后移key_size个字节,使得指针指向sequence+type
EncodeFixed64(p, (s << 8) | type); //获取sequence+type
p += 8;
p = EncodeVarint32(p, val_size); //将value的长度进行编码
memcpy(p, value.data(), val_size); //将编码后的value长度压入内存
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}
其中,关于VariantLenth()
的实现为:
int VarintLength(uint64_t v) {
int len = 1;
while (v >= 128) {
v >>= 7;
len++;
}
return len;
接下来,是读取操作:
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
//LookupKey是leveldb为了在memtable/sstable中查找方便,为key包装的类型
//调用memtable_key可以返回在memtable中的key格式
Slice memkey = key.memtable_key();
//利用skiplist的专属迭代器查找key
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8),
key.user_key()) == 0) {
// Correct user key
//从memtable中的entry'中提取出ValueType
//如果ValueType==kTypeValue ====> 是真实的key-value数据
//如果ValueType==kTypeDeletion ====> 是要删除的类型,并不代表数据真实存在
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}
memtable
迭代器的定义为:
class MemTableIterator : public Iterator {
public:
explicit MemTableIterator(MemTable::Table* table) : iter_(table) { }
virtual bool Valid() const { return iter_.Valid(); }
virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); }
virtual void SeekToFirst() { iter_.SeekToFirst(); }
virtual void SeekToLast() { iter_.SeekToLast(); }
virtual void Next() { iter_.Next(); }
virtual void Prev() { iter_.Prev(); }
virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); }
virtual Slice value() const {
Slice key_slice = GetLengthPrefixedSlice(iter_.key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
virtual Status status() const { return Status::OK(); }
private:
MemTable::Table::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
// No copying allowed
MemTableIterator(const MemTableIterator&);
void operator=(const MemTableIterator&);
};
返回memtable
迭代器:
Iterator* MemTable::NewIterator() {
return new MemTableIterator(&table_);
}
读写log
文件
leveldb的第二大组件log
文件的读写,log
文件也可以成为恢复日志。当leveldb插入数据时,先将数据插入log
日志文件中,接着再插入到内存中的memtable
中。这样,即使在使用过程,突然断电,memtable
还不来及把数据持久化到磁盘时,内存数据就不会丢失,这是就可从log
文件中恢复。
log
文件按块划分,默认每块为32768kb=32M
。这么大的好处是:可以减少从磁盘读取数据的次数,减少磁盘IO。可以看如下图:
上图中有3
个block
。然后每一条数据的格式为:
log
文件每一条记录由四个部分组成:
CheckSum
,即CRC验证码,占4个字节- 记录长度,即数据部分的长度,2个字节
- 类型,这条记录的类型,占1个字节
- 数据,即这条记录的数据
关于记录的类型,通常使用的有4
种:
FULL
,表示这是一条完整的记录FIRST
,表示这是一条记录的第一部分MIDDLE
,表示这是一条记录的中间部分LAST
,表示这是一条记录的最后一部分
Writer
类
log
日志写相对简单,把数据按记录的格式封装好,再写入文件即可。成员变量有:
WritableFile* dest_; //log日志文件的封装类
int block_offset_; //块内偏移量,用于指定写地址(Current offset in block)
构造函数为:
static void InitTypeCrc(uint32_t* type_crc) {
for (int i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i);
type_crc[i] = crc32c::Value(&t, 1);
}
}
//当创建一个writer时,数据将被append到*dest中。
//*dest初始必须为empty
// 在Writer的使用过程中,*dest需要一直remain live
Writer::Writer(WritableFile* dest)
: dest_(dest),
block_offset_(0) {
InitTypeCrc(type_crc_);
}
Writer::Writer(WritableFile* dest, uint64_t dest_length)
: dest_(dest), block_offset_(dest_length % kBlockSize) {
InitTypeCrc(type_crc_);
}
添加记录函数:
enum RecordType {
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
// For fragments
kFirstType = 2,
kMiddleType = 3,
kLastType = 4
};
Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data(); //需要添加的记录数据
size_t left = slice.size(); //记录数据的长度
// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
//kBlockSize ===> 32768
//由于block_offset_表示块内偏移量,而kBlockSize表示一个分配的最大内存块
//所以,leftover表示当前块的剩余容量
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) { //如果剩余容量小于记录头长度(kHeaderSize=7kb)
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
assert(kHeaderSize == 7);
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}
// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
//除去记录头以后,剩余的块还有剩余空间avail
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
//判断当前块能否容纳下当前记录数据。如果记录数据小于剩余空间,这将fragment的长度设置为记录数据的长度。反之,就将其设置为剩余的块内长度
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool end = (left == fragment_length);
if (begin && end) { //如果拥有开头和结尾标志,说明是一个完整块
type = kFullType;
} else if (begin) { //如果只有开头标志,说明它是一个起始块
type = kFirstType;
} else if (end) { //如果只有结尾标志,说明它是记录的最后一块
type = kLastType;
} else {
type = kMiddleType;
}
//将fragment_length长度记录写入文件
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length; //指针向后移动fragment_length个字节
left -= fragment_length; //减少记录的剩余长度
begin = false;
} while (s.ok() && left > 0);
return s;
}
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
assert(n <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
// Format the header
//封装记录头Header:checksum + length + flag
char buf[kHeaderSize];
buf[4] = static_cast<char>(n & 0xff);
buf[5] = static_cast<char>(n >> 8);
buf[6] = static_cast<char>(t);
// Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n); //用crc填充buf前四个字节
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
// Write the header and the payload
//将记录头写入缓存
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
s = dest_->Append(Slice(ptr, n)); //将记录内容写入缓存
if (s.ok()) {
s = dest_->Flush(); //将缓存的数据刷进内核
}
}
block_offset_ += kHeaderSize + n; //更新块内偏移量
return s;
}
log
文件的Reader
类
Reader
类的成员变量:
private:
SequentialFile* const file_; //读取文件的封装类
Reporter* const reporter_; //报告错误类
bool const checksum_; //是否进行CRC验证
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
// Offset of the last record returned by ReadRecord.
//上条记录的偏移量
uint64_t last_record_offset_;
// Offset of the first location past the end of buffer_.
//当前块结尾在log文件的偏移量
uint64_t end_of_buffer_offset_;
// Offset at which to start looking for the first record to return
//开始查找的起始地址
uint64_t const initial_offset_;
// True if we are resynchronizing after a seek (initial_offset_ > 0). In
// particular, a run of kMiddleType and kLastType records can be silently
// skipped in this mode
bool resyncing_;
SkipToInitialBlock
函数:
bool Reader::SkipToInitialBlock() {
//计算当前块在block中的偏移量,并圆整到开始读取block的起始位置
const size_t offset_in_block = initial_offset_ % kBlockSize;
uint64_t block_start_location = initial_offset_ - offset_in_block;
// Don't search a block if we'd be in the trailer
if (offset_in_block > kBlockSize - 6) {
block_start_location += kBlockSize;
}
end_of_buffer_offset_ = block_start_location;
// Skip to start of first block that can contain the initial record
if (block_start_location > 0) {
Status skip_status = file_->Skip(block_start_location);
if (!skip_status.ok()) {
ReportDrop(block_start_location, skip_status);
return false;
}
}
return true;
}
读取文件函数:
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) {
return false;
}
}
scratch->clear();
record->clear();
bool in_fragmented_record = false;
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
uint64_t prospective_record_offset = 0;
Slice fragment;
while (true) {
const unsigned int record_type = ReadPhysicalRecord(&fragment);
// ReadPhysicalRecord may have only had an empty trailer remaining in its
// internal buffer. Calculate the offset of the next physical record now
// that it has returned, properly accounting for its header size.
uint64_t physical_record_offset =
end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
if (resyncing_) {
if (record_type == kMiddleType) {
continue;
} else if (record_type == kLastType) {
resyncing_ = false;
continue;
} else {
resyncing_ = false;
}
}
switch (record_type) {
case kFullType:
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (!scratch->empty()) {
ReportCorruption(scratch->size(), "partial record without end(1)");
}
}
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;
case kFirstType:
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (!scratch->empty()) {
ReportCorruption(scratch->size(), "partial record without end(2)");
}
}
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
scratch->append(fragment.data(), fragment.size());
}
break;
case kLastType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
return true;
}
break;
case kEof:
if (in_fragmented_record) {
// This can be caused by the writer dying immediately after
// writing a physical record but before completing the next; don't
// treat it as a corruption, just ignore the entire logical record.
scratch->clear();
}
return false;
case kBadRecord:
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear();
}
break;
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
buf);
in_fragmented_record = false;
scratch->clear();
break;
}
}
}
return false;
}
##
Skiplist
LevelDB在内存中存储数据的区域称为memtable
,它的底层是通过跳跃表skiplist实现的。
跳跃表的性质:
- 由多层结构组成
- 每一层都是一个有序链表,排列顺序为由高层到低层,都至少包含两个链表结点,分别是最前面的head结点和后面的nil结点
- 最底层的链表包含了所有的元素
- 如果一个元素出现在某一层链表之中,那么在该层之下的链表也全都会出现(即上一层元素是当前层元素的子集)
- 链表中的每个结点都包含两个指针,一个指向同一层的下一个链表结点,另一个指向下一层的同一个链表结点
由上图可以看出,该skiplist一共有4层,最上面的一层就是最高层(level 3),最下面的一层就是最底层(level 0),然后每一列中的链表结点中的值都是相同的,它们之间用指针进行连接。此外,跳跃表的层数与结构中最高结点的高度是相同的。在理想情况下,跳跃表结构中的第一层存在所有结点,第二层只有一半的结点,而且是均匀间隔,第三层存在1/4
的结点且均匀分布…由此类推,可得知理想的层数就是logN
。
搜索
基本原理:从最高层的链表结点开始,如果比当前结点大,但比当前层(cur)的下一个结点(next)小,那么就往下一层(down)寻找,即和当前层的下一层(down)结点的下一个(next)结点进行比较,以此类推,一直找到最底层的一个结点(即p->down==nil
)。如果找到,就返回,否则返回空。
伪代码实现:
Search(x) {
p=top;
while(true) {
while(x > p->next->key)
p = p->next;
if(p->down == nil)
return p->next;
p = p->down;
}
}
插入
在插入之前,需要确定插入的层数。一般有两种方法:
- 抛硬币。只要得到的是正面就累加,直到遇到反面才停止,最后记录正面的次数,并将其作为要添加的新元素的层。
- 进行概率统计。先给定一个概率
p
,产生一个0
到1
之间的随机数,如果这个随机数小于p
,则将高度加1
,直到产生的随机数大于概率p
才停止。根据结论,可知当概率为1/2
或者1/4
的时候,整体的性能会比较好(当p=1/2
时,也就是抛硬币的方法)。
当确定好要插入元素的层数之后,只需要将元素都插入到从最底层到第k层即可。
删除
在各层中找到包含指定值的结点,然后将结点从链表中删除即可。如果删除以后只剩下头、尾两个结点,则直接删除这一层。
内存屏障(memory barrier)
内存屏障类AtomPointer
(即原子操作)的实现为:
class AtomicPointer {
private:
void* rep_; //原子指针
public:
AtomicPointer() { }
explicit AtomicPointer(void* p) : rep_(p) {}
inline void* NoBarrier_Load() const { return rep_; }
inline void NoBarrier_Store(void* v) { rep_ = v; }
inline void* Acquire_Load() const {
void* result = rep_;
MemoryBarrier();
return result;
}
inline void Release_Store(void* v) {
MemoryBarrier();
rep_ = v;
}
};
内存屏障主要用处就是保证内存数据和处理器寄存器和缓存数据一致性。因为当某个处理器上改变某个变量x
时,那么其他处理器上的x
的副本都必须失效,否则将会读取错误值。
skiplist结点
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
Key const key;
// Accessors/mutators for links. Wrapped in methods so we can
// add the appropriate barriers as necessary.
Node* Next(int n) {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_[n].Acquire_Load());
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].Release_Store(x);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].NoBarrier_Store(x);
}
private:
// Array of length equal to the node height. next_[0] is lowest level link.
//用于表示结点的层数
port::AtomicPointer next_[1];
};
skiplist具体实现
skiplist成员变量的如下:
private:
//定义skiplist链表结点的最大的高度
enum { kMaxHeight = 12 };
// Immutable after construction
Comparator const compare_;
//内存池,从memtable传来,用于构造skiplist结点
Arena* const arena_; // Arena used for allocations of nodes
//skiplist头结点
Node* const head_;
// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
port::AtomicPointer max_height_; // Height of the entire list
// Read/written only by Insert().
//在插入结点时,随机化处该结点的插入层
Random rnd_;
其构造函数为:
//其中,cmp和arena_都由调用者传入,head_头结点的key初始化为0,高度初始化为链表的高度上限12。max_height初始化为1,并将每一层头结点下一个结点设置为NULL(即设置尾结点,初始化之后每一层就头、尾两个结点)
template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
: compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
max_height_(reinterpret_cast<void*>(1)),
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) {
head_->SetNext(i, nullptr);
}
}
skiplist用于分配一个新结点的NewNode
:
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
//直接从内存池中进行分配。
//sizeof(port::AtomicPointer) * (height - 1)是为了兼容性变长数组声明为 port::AtomicPointer next_[1](也可以用next[0],但是有些编译器不支持)
//由于Node自身已经占用了一个,所以只需要在添加(height-1)个即可。如果是next[0]的话,就不能减1了
char* mem = arena_->AllocateAligned(
sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
return new (mem) Node(key);
}
在进行插入结点时,需要先随机化处一个插入高度height
,接着 再找到此结点的前height
个结点,然后进行插入:
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized
//kMaxHeight个前结点,因为还不知道插入结点的高度,所以先设置为最大值12
Node* prev[kMaxHeight];
//查找键值为key的结点的前GetMaxHeight()个结点
Node* x = FindGreaterOrEqual(key, prev);
//不允许重复插入
assert(x == nullptr || !Equal(key, x->key));
//随机化一个结点高度
int height = RandomHeight();
//如果需要插入结点的高度大于最高结点的的高度,则高出部分的前结点都将成为头结点
if (height > GetMaxHeight()) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
//fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
// new level pointers from head_ (nullptr), or a new value set in
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
}
x = NewNode(key, height);
for (int i = 0; i < height; i++) {
// NoBarrier_SetNext() suffices since we will add a barrier when
// we publish a pointer to "x" in prev[i].
x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
prev[i]->SetNext(i, x);
}
}