版本信息有什麼用?先來簡要說明三個類的具體用途:
有了上面的描述,再來看版本信息到底有什麼用呢?
如果還不能給出答案,將上述三個類當作一個整體,再來看Version類組到底包含了哪些信息:
關於版本信息到底有什麼用這個話題暫時先放一放,來看具體類。
VersionSet(維護了一份Version列表,包含當前Alive的所有Version信息,列表中第一個代表數據庫的當前版本)
VersionSet類只有一個實例,在DBImpl(數據庫實現類)類中,維護所有活動的Version對象,來看VersionSet的所有語境:
1. 數據庫啟動時:通過Current文件加載Manifset文件,讀取Manifest文件完成版本信息恢復
1 Status VersionSet::Recover() {
2 ......
3 // Read "CURRENT" file, which contains a pointer to the current manifest file
4 std::string current;
5 Status s = ReadFileToString(env_, CurrentFileName(dbname_), ¤t);
6 ......
7 current.resize(current.size() - 1);
8
9 std::string dscname = dbname_ + "/" + current;
10 SequentialFile* file;
11 s = env_->NewSequentialFile(dscname, &file); //manifest文件
12 ......
13
14 bool have_log_number = false;
15 bool have_prev_log_number = false;
16 bool have_next_file = false;
17 bool have_last_sequence = false;
18 uint64_t next_file = 0;
19 uint64_t last_sequence = 0;
20 uint64_t log_number = 0;
21 uint64_t prev_log_number = 0;
22 VersionSet::Builder builder(this, current_);
23
24 {
25 LogReporter reporter;
26 reporter.status = &s;
27 log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
28 Slice record;
29 std::string scratch;
30 while (reader.ReadRecord(&record, &scratch) && s.ok()) { //依次讀取manifest中的VersionEdit信息,構建VersionSet
31 VersionEdit edit;
32 s = edit.DecodeFrom(record);
33 if (s.ok()) { //Comparator不一致時,返回錯誤信息
34 if (edit.has_comparator_ &&
35 edit.comparator_ != icmp_.user_comparator()->Name()) {
36 s = Status::InvalidArgument(
37 edit.comparator_ + "does not match existing comparator ",
38 icmp_.user_comparator()->Name());
39 //實際上,這裡可以直接break
40 }
41 }
42
43 if (s.ok()) {
44 builder.Apply(&edit); //構建當前Version
45 }
46
47 if (edit.has_log_number_) {
48 log_number = edit.log_number_;
49 have_log_number = true;
50 }
51
52 if (edit.has_prev_log_number_) {
53 prev_log_number = edit.prev_log_number_;
54 have_prev_log_number = true;
55 }
56
57 if (edit.has_next_file_number_) {
58 next_file = edit.next_file_number_;
59 have_next_file = true;
60 }
61
62 if (edit.has_last_sequence_) {
63 last_sequence = edit.last_sequence_;
64 have_last_sequence = true;
65 }
66 }
67 }
68 delete file;
69 file = NULL;
70
71 ...... 89
90 if (s.ok()) {
91 Version* v = new Version(this);
92 builder.SaveTo(v);
93 // Install recovered version
94 Finalize(v); //計算下次執行壓縮的Level
95 AppendVersion(v);
96 manifest_file_number_ = next_file;
97 next_file_number_ = next_file + 1;
98 last_sequence_ = last_sequence;
99 log_number_ = log_number;
100 prev_log_number_ = prev_log_number;
101 }
102
103 return s;
104 }
105
Recover通過Manifest恢復VersionSet及Current Version信息,恢復完畢後Alive的Version列表中僅包含當Current Version對象。
2. Compaction時:Compaction(壓縮)應該是LevelDB中最為復雜的功能,它需要Version類組的深度介入。來看VersionSet中所有和Compaction相關的接口聲明:
1 // Apply *edit to the current version to form a new descriptor that
2 // is both saved to persistent state and installed as the new
3 // current version. Will release *mu while actually writing to the file.
4 // REQUIRES: *mu is held on entry.
5 // REQUIRES: no other thread concurrently calls LogAndApply()
6 Status LogAndApply(VersionEdit* edit, port::Mutex* mu);
7
8 // Pick level and inputs for a new compaction.
9 // Returns NULL if there is no compaction to be done.
10 // Otherwise returns a pointer to a heap-allocated object that
11 // describes the compaction. Caller should delete the result.
12 Compaction* PickCompaction();
13
14 // Return a compaction object for compacting the range [begin,end] in
15 // the specified level. Returns NULL if there is nothing in that
16 // level that overlaps the specified range. Caller should delete
17 // the result.
18 Compaction* CompactRange(
19 int level,
20 const InternalKey* begin,
21 const InternalKey* end);
22
23 // Create an iterator that reads over the compaction inputs for "*c".
24 // The caller should delete the iterator when no longer needed.
25 Iterator* MakeInputIterator(Compaction* c);
26
27 // Returns true iff some level needs a compaction.
28 bool NeedsCompaction() const {
29 Version* v = current_;
30 return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
31 }
32
33 // Add all files listed in any live version to *live.
34 // May also mutate some internal state.
35 void AddLiveFiles(std::set<uint64_t>* live);
數據庫的讀、寫操作都可能觸發Compaction,通過調用NeedCompaction判定是否需要執行Compaction,如需Compaction則調用PickCompaction獲取Compactoin信息。
其他幾個方法也和Compaction操作相關,其中LogAndApply非常重要,它將VersionEdit應用於Current Version、VersoinEdit持久化到Manifest文件、將新的Version做為Current Version。
1 Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
2 if (edit->has_log_number_) {
3 assert(edit->log_number_ >= log_number_);
4 assert(edit->log_number_ < next_file_number_);
5 }
6 else {
7 edit->SetLogNumber(log_number_);
8 }
9
10 if (!edit->has_prev_log_number_) {
11 edit->SetPrevLogNumber(prev_log_number_);
12 }
13
14 edit->SetNextFile(next_file_number_);
15 edit->SetLastSequence(last_sequence_);
16
17 //1. New Version = Current Version + VersionEdit
18 Version* v = new Version(this);
19 {
20 Builder builder(this, current_);
21 builder.Apply(edit);
22 builder.SaveTo(v);
23 }
24 //2. 重新計算Compaction Level\Compaction Score
25 Finalize(v);
26
27 //3. 打開數據庫時,創建新的Manifest並保存當前版本信息
28 // Initialize new descriptor log file if necessary by creating
29 // a temporary file that contains a snapshot of the current version.
30 std::string new_manifest_file;
31 Status s;
32 if (descriptor_log_ == NULL) {
33 // No reason to unlock *mu here since we only hit this path in the
34 // first call to LogAndApply (when opening the database).
35 assert(descriptor_file_ == NULL);
36 new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
37 edit->SetNextFile(next_file_number_);
38 s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
39 if (s.ok()) {
40 descriptor_log_ = new log::Writer(descriptor_file_);
41 s = WriteSnapshot(descriptor_log_); //當前版本信息
42 }
43 }
44
45 //4. 保存增量信息,即VersionEdit信息
46 // Unlock during expensive MANIFEST log write
47 {
48 mu->Unlock();
49
50 // Write new record to MANIFEST log
51 if (s.ok()) {
52 std::string record;
53 edit->EncodeTo(&record);
54 s = descriptor_log_->AddRecord(record);
55 if (s.ok()) {
56 s = descriptor_file_->Sync();
57 }
58 }
59
60 // If we just created a new descriptor file, install it by writing a
61 // new CURRENT file that points to it.
62 if (s.ok() && !new_manifest_file.empty()) {
63 s = SetCurrentFile(env_, dbname_, manifest_file_number_);
64 }
65
66 mu->Lock();
67 }
68
69 //5. 將新的版本添加到Alive版本列表,並將其做為Current Version
70 // Install the new version
71 if (s.ok()) {
72 AppendVersion(v);
73 log_number_ = edit->log_number_;
74 prev_log_number_ = edit->prev_log_number_;
75 }
76 else {
77 delete v;
78 if (!new_manifest_file.empty()) {
79 delete descriptor_log_;
80 delete descriptor_file_;
81 descriptor_log_ = NULL;
82 descriptor_file_ = NULL;
83 env_->DeleteFile(new_manifest_file);
84 }
85 }
86
87 return s;
88 }
3. 讀取數據時:LevelDB通過VersionSet中的TableCache對象完成數據讀取。
TableCache是SSTable的緩存類,NewIterator方法通過傳入指定的文件編號返回該文件的Iterator供外部使用。
1 class TableCache {
2 public:
3 TableCache(const std::string& dbname, const Options* options, int entries);
4 ~TableCache();
5
6 // Return an iterator for the specified file number (the corresponding
7 // file length must be exactly "file_size" bytes). If "tableptr" is
8 // non-NULL, also sets "*tableptr" to point to the Table object
9 // underlying the returned iterator, or NULL if no Table object underlies
10 // the returned iterator. The returned "*tableptr" object is owned by
11 // the cache and should not be deleted, and is valid for as long as the
12 // returned iterator is live.
13 Iterator* NewIterator(const ReadOptions& options,
14 uint64_t file_number,
15 uint64_t file_size,
16 Table** tableptr = NULL);
17
18 // Evict any entry for the specified file number
19 void Evict(uint64_t file_number);
20
21 private:
22 Env* const env_;
23 const std::string dbname_;
24 const Options* options_;
25 Cache* cache_;
26 };
緩存機制主要通過Cache對象實現,關於Cache的備忘下節會講。
Version
Version維護了一份當前版本的SSTable的元數據,其對外暴露的接口大部分也和元數據相關:
1 void GetOverlappingInputs(
2 int level,
3 const InternalKey* begin, // NULL means before all keys
4 const InternalKey* end, // NULL means after all keys
5 std::vector<FileMetaData*>* inputs);
6
7 // Returns true iff some file in the specified level overlaps
8 // some part of [*smallest_user_key,*largest_user_key].
9 // smallest_user_key==NULL represents a key smaller than all keys in the DB.
10 // largest_user_key==NULL represents a key largest than all keys in the DB.
11 bool OverlapInLevel(int level,
12 const Slice* smallest_user_key,
13 const Slice* largest_user_key);
14
15 // Return the level at which we should place a new memtable compaction
16 // result that covers the range [smallest_user_key,largest_user_key].
17 int PickLevelForMemTableOutput(const Slice& smallest_user_key,
18 const Slice& largest_user_key);
19
20 int NumFiles(int level) const { return files_[level].size(); }
還有兩個數據庫讀取操作相關的方法Get、UpdateStats,來看Get:
1 Status Version::Get(const ReadOptions& options,
2 const LookupKey& k,
3 std::string* value,
4 GetStats* stats)
5 {
6 Slice ikey = k.internal_key();
7 Slice user_key = k.user_key();
8 const Comparator* ucmp = vset_->icmp_.user_comparator();
9 Status s;
10
11 stats->seek_file = NULL;
12 stats->seek_file_level = -1;
13 FileMetaData* last_file_read = NULL;
14 int last_file_read_level = -1;
15
16 // We can search level-by-level since entries never hop across
17 // levels. Therefore we are guaranteed that if we find data
18 // in an smaller level, later levels are irrelevant.
19 std::vector<FileMetaData*> tmp;
20 FileMetaData* tmp2;
21
22 //1. 查找包含指定Key的所有文件
23 for (int level = 0; level < config::kNumLevels; level++) {
24 size_t num_files = files_[level].size();
25 if (num_files == 0) continue;
26
27 // Get the list of files to search in this level
28 FileMetaData* const* files = &files_[level][0];
29 if (level == 0) { //1.1 Level-0可能存在多個文件均包含該Key
30 // Level-0 files may overlap each other. Find all files that
31 // overlap user_key and process them in order from newest to oldest.
32 tmp.reserve(num_files);
33 for (uint32_t i = 0; i < num_files; i++) {
34 FileMetaData* f = files[i];
35 if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
36 ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
37 tmp.push_back(f);
38 }
39 }
40 if (tmp.empty()) continue;
41
42 std::sort(tmp.begin(), tmp.end(), NewestFirst); //將文件按更新順序排列
43 files = &tmp[0];
44 num_files = tmp.size();
45 }
46 else { //1.2 Level-0之上,一個Key只可能存在於一個文件中
47 // Binary search to find earliest index whose largest key >= ikey.
48 uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
49 if (index >= num_files) {
50 files = NULL;
51 num_files = 0;
52 }
53 else {
54 tmp2 = files[index];
55 if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
56 // All of "tmp2" is past any data for user_key
57 files = NULL;
58 num_files = 0;
59 }
60 else {
61 files = &tmp2;
62 num_files = 1;
63 }
64 }
65 }
66
67 //2. 遍歷所有文件,查找Key值數據。
68 for (uint32_t i = 0; i < num_files; ++i) {
69 if (last_file_read != NULL && stats->seek_file == NULL) {
70 // We have had more than one seek for this read. Charge the 1st file.
71 stats->seek_file = last_file_read;
72 stats->seek_file_level = last_file_read_level;
73 }
74
75 FileMetaData* f = files[i];
76 last_file_read = f;
77 last_file_read_level = level;
78
79 //2.1 SSTable迭代器
80 Iterator* iter = vset_->table_cache_->NewIterator(
81 options,
82 f->number,
83 f->file_size);
84 iter->Seek(ikey); //2.2 查找指定Key
85 const bool done = GetValue(iter, user_key, value, &s); //2.3 Get Value
86 if (!iter->status().ok()) {
87 s = iter->status();
88 delete iter;
89 return s;
90 }
91 else {
92 delete iter;
93 if (done) {
94 return s;
95 }
96 }
97 }
98 }
99
100 return Status::NotFound(Slice()); // Use an empty error message for speed
101 }
VersionEdit
版本建變化除運行期編號修改外,最主要的是SSTable文件的增刪信息。當Compaction執行時,必然會出現部分SSTable無效被移除,合並生成的新SSTable被加入到數據庫中。VersionEdit提供AddFile、DeleteFile完成變更標識。
VersionEdit提供的另外一個主要功能接口聲明如下:
1 void EncodeTo(std::string* dst) const; 2 Status DecodeFrom(const Slice& src);
這是一組序列化、反序列化方法,序列化文件為Manifest文件。
1 void VersionEdit::EncodeTo(std::string* dst) const {
2 //1. 序列化比較器
3 if (has_comparator_) {
4 PutVarint32(dst, kComparator);
5 PutLengthPrefixedSlice(dst, comparator_);
6 }
7 //2. 序列化運行期編號信息
8 if (has_log_number_) {
9 PutVarint32(dst, kLogNumber);
10 PutVarint64(dst, log_number_);
11 }
12 if (has_prev_log_number_) {
13 PutVarint32(dst, kPrevLogNumber);
14 PutVarint64(dst, prev_log_number_);
15 }
16 if (has_next_file_number_) {
17 PutVarint32(dst, kNextFileNumber);
18 PutVarint64(dst, next_file_number_);
19 }
20 if (has_last_sequence_) {
21 PutVarint32(dst, kLastSequence);
22 PutVarint64(dst, last_sequence_);
23 }
24 //3. 序列化Compact Pointer
25 for (size_t i = 0; i < compact_pointers_.size(); i++) {
26 PutVarint32(dst, kCompactPointer);
27 PutVarint32(dst, compact_pointers_[i].first); // level
28 PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
29 }
30
31 //4. 序列化本次版本變化的SSTable文件列表
32 for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
33 iter != deleted_files_.end();
34 ++iter) {
35 PutVarint32(dst, kDeletedFile);
36 PutVarint32(dst, iter->first); // level
37 PutVarint64(dst, iter->second); // file number
38 }
39
40 for (size_t i = 0; i < new_files_.size(); i++) {
41 const FileMetaData& f = new_files_[i].second;
42 PutVarint32(dst, kNewFile);
43 PutVarint32(dst, new_files_[i].first); // level
44 PutVarint64(dst, f.number);
45 PutVarint64(dst, f.file_size);
46 PutLengthPrefixedSlice(dst, f.smallest.Encode());
47 PutLengthPrefixedSlice(dst, f.largest.Encode());
48 }
49 }
回到最開始的問題:版本信息由什麼用?
每個LevelDB有一個Current File,Current File內唯一的信息為:當前數據庫的Manifest文件名。Manifest中包含了上次運行後全部的版本信息,LevelDB通過Manifest文件恢復版本信息。
LevelDB的版本信息為富語義功能組,它所包含的信息已經大大超出了版本定義本身。如果將Version類封裝為結構體、VersionSet僅僅為Version列表、VersionEdit也是單純的結構數據,再為上述結構提供多套功能類應該更為合理。目前來看,這應當算作LevelDB實現的一處臭味。