程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> C語言 >> C++ >> C++入門知識 >> LevelDB源碼之五Current文件\Manifest文件\版本信息,leveldbmanifest

LevelDB源碼之五Current文件\Manifest文件\版本信息,leveldbmanifest

編輯:C++入門知識

LevelDB源碼之五Current文件\Manifest文件\版本信息,leveldbmanifest


版本信息有什麼用?先來簡要說明三個類的具體用途:

  • Version:代表了某一時刻的數據庫版本信息,版本信息的主要內容是當前各個Level的SSTable數據文件列表。
  • VersionSet:維護了一份Version列表,包含當前Alive的所有Version信息,列表中第一個代表數據庫的當前版本。
  • VersionEdit:表示Version之間的變化,相當於delta 增量,表示有增加了多少文件,刪除了文件。Version0 +VersionEdit-->Version1。VersionEdit會保存到MANIFEST文件中,當做數據恢復時就會從MANIFEST文件中讀出來重建數據。那麼,何時會觸發版本變遷呢?Compaction。

有了上面的描述,再來看版本信息到底有什麼用呢?

如果還不能給出答案,將上述三個類當作一個整體,再來看Version類組到底包含了哪些信息:

  • 運行信息
    • 運行期各種遞增ID值:log number(log編號)、next file number(下一個文件編號)、last sequence(單條write操作遞增該編號,可認為是版本號)、prev log number(目前已棄用)。
    • 比較器名稱
  • 數據庫元信息
    • 各Level的SSTable文件列表
    • SSTable緩存
  • Compaction信息、
    • Compaction Pointer
    • 通過Seek觸發Compaction信息(文件名、Level);通過Compaction觸發Compaction信息(score、level)

關於版本信息到底有什麼用這個話題暫時先放一放,來看具體類。

VersionSet(維護了一份Version列表,包含當前Alive的所有Version信息,列表中第一個代表數據庫的當前版本)

VersionSet類只有一個實例,在DBImpl(數據庫實現類)類中,維護所有活動的Version對象,來看VersionSet的所有語境:

1. 數據庫啟動時:通過Current文件加載Manifset文件,讀取Manifest文件完成版本信息恢復

  1     Status VersionSet::Recover() {
  2         ......
  3         // Read "CURRENT" file, which contains a pointer to the current manifest file
  4         std::string current;
  5         Status s = ReadFileToString(env_, CurrentFileName(dbname_), &current);
  6         ......
  7         current.resize(current.size() - 1);
  8 
  9         std::string dscname = dbname_ + "/" + current;
 10         SequentialFile* file;
 11         s = env_->NewSequentialFile(dscname, &file);    //manifest文件
 12         ......
 13 
 14         bool have_log_number = false;
 15         bool have_prev_log_number = false;
 16         bool have_next_file = false;
 17         bool have_last_sequence = false;
 18         uint64_t next_file = 0;
 19         uint64_t last_sequence = 0;
 20         uint64_t log_number = 0;
 21         uint64_t prev_log_number = 0;
 22         VersionSet::Builder builder(this, current_);
 23 
 24         {
 25             LogReporter reporter;
 26             reporter.status = &s;
 27             log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
 28             Slice record;
 29             std::string scratch;
 30             while (reader.ReadRecord(&record, &scratch) && s.ok()) {    //依次讀取manifest中的VersionEdit信息,構建VersionSet
 31                 VersionEdit edit;
 32                 s = edit.DecodeFrom(record);
 33                 if (s.ok()) {        //Comparator不一致時,返回錯誤信息
 34                     if (edit.has_comparator_ &&
 35                         edit.comparator_ != icmp_.user_comparator()->Name()) {
 36                         s = Status::InvalidArgument(
 37                             edit.comparator_ + "does not match existing comparator ",
 38                             icmp_.user_comparator()->Name());
 39                         //實際上,這裡可以直接break
 40                     }
 41                 }
 42 
 43                 if (s.ok()) {
 44                     builder.Apply(&edit);    //構建當前Version
 45                 }
 46 
 47                 if (edit.has_log_number_) {
 48                     log_number = edit.log_number_;
 49                     have_log_number = true;
 50                 }
 51 
 52                 if (edit.has_prev_log_number_) {
 53                     prev_log_number = edit.prev_log_number_;
 54                     have_prev_log_number = true;
 55                 }
 56 
 57                 if (edit.has_next_file_number_) {
 58                     next_file = edit.next_file_number_;
 59                     have_next_file = true;
 60                 }
 61 
 62                 if (edit.has_last_sequence_) {
 63                     last_sequence = edit.last_sequence_;
 64                     have_last_sequence = true;
 65                 }
 66             }
 67         }
 68         delete file;
 69         file = NULL;
 70 
 71         ...... 89 
 90         if (s.ok()) {
 91             Version* v = new Version(this);
 92             builder.SaveTo(v);
 93             // Install recovered version
 94             Finalize(v);    //計算下次執行壓縮的Level
 95             AppendVersion(v);
 96             manifest_file_number_ = next_file;
 97             next_file_number_ = next_file + 1;
 98             last_sequence_ = last_sequence;
 99             log_number_ = log_number;
100             prev_log_number_ = prev_log_number;
101         }
102 
103         return s;
104     }
105         

Recover通過Manifest恢復VersionSet及Current Version信息,恢復完畢後Alive的Version列表中僅包含當Current Version對象。

2. Compaction時:Compaction(壓縮)應該是LevelDB中最為復雜的功能,它需要Version類組的深度介入。來看VersionSet中所有和Compaction相關的接口聲明:

 1         // Apply *edit to the current version to form a new descriptor that
 2         // is both saved to persistent state and installed as the new
 3         // current version.  Will release *mu while actually writing to the file.
 4         // REQUIRES: *mu is held on entry.
 5         // REQUIRES: no other thread concurrently calls LogAndApply()
 6         Status LogAndApply(VersionEdit* edit, port::Mutex* mu);
 7 
 8         // Pick level and inputs for a new compaction.
 9         // Returns NULL if there is no compaction to be done.
10         // Otherwise returns a pointer to a heap-allocated object that
11         // describes the compaction.  Caller should delete the result.
12         Compaction* PickCompaction();
13 
14         // Return a compaction object for compacting the range [begin,end] in
15         // the specified level.  Returns NULL if there is nothing in that
16         // level that overlaps the specified range.  Caller should delete
17         // the result.
18         Compaction* CompactRange(
19             int level,
20             const InternalKey* begin,
21             const InternalKey* end);
22 
23         // Create an iterator that reads over the compaction inputs for "*c".
24         // The caller should delete the iterator when no longer needed.
25         Iterator* MakeInputIterator(Compaction* c);
26 
27         // Returns true iff some level needs a compaction.
28         bool NeedsCompaction() const {
29             Version* v = current_;
30             return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
31         }
32 
33         // Add all files listed in any live version to *live.
34         // May also mutate some internal state.
35         void AddLiveFiles(std::set<uint64_t>* live);

數據庫的讀、寫操作都可能觸發Compaction,通過調用NeedCompaction判定是否需要執行Compaction,如需Compaction則調用PickCompaction獲取Compactoin信息。

其他幾個方法也和Compaction操作相關,其中LogAndApply非常重要,它將VersionEdit應用於Current Version、VersoinEdit持久化到Manifest文件、將新的Version做為Current Version。

 1     Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
 2         if (edit->has_log_number_) {
 3             assert(edit->log_number_ >= log_number_);
 4             assert(edit->log_number_ < next_file_number_);
 5         }
 6         else {
 7             edit->SetLogNumber(log_number_);
 8         }
 9 
10         if (!edit->has_prev_log_number_) {
11             edit->SetPrevLogNumber(prev_log_number_);
12         }
13 
14         edit->SetNextFile(next_file_number_);
15         edit->SetLastSequence(last_sequence_);
16 
17         //1. New Version = Current Version + VersionEdit
18         Version* v = new Version(this);    
19         {
20             Builder builder(this, current_);
21             builder.Apply(edit);
22             builder.SaveTo(v);
23         }
24         //2. 重新計算Compaction Level\Compaction Score
25         Finalize(v);    
26 
27         //3. 打開數據庫時,創建新的Manifest並保存當前版本信息
28         // Initialize new descriptor log file if necessary by creating
29         // a temporary file that contains a snapshot of the current version.
30         std::string new_manifest_file;
31         Status s;
32         if (descriptor_log_ == NULL) {
33             // No reason to unlock *mu here since we only hit this path in the
34             // first call to LogAndApply (when opening the database).
35             assert(descriptor_file_ == NULL);
36             new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
37             edit->SetNextFile(next_file_number_);
38             s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
39             if (s.ok()) {
40                 descriptor_log_ = new log::Writer(descriptor_file_);
41                 s = WriteSnapshot(descriptor_log_);    //當前版本信息
42             }
43         }
44 
45         //4. 保存增量信息,即VersionEdit信息
46         // Unlock during expensive MANIFEST log write
47         {
48             mu->Unlock();
49 
50             // Write new record to MANIFEST log
51             if (s.ok()) {
52                 std::string record;
53                 edit->EncodeTo(&record);
54                 s = descriptor_log_->AddRecord(record);
55                 if (s.ok()) {
56                     s = descriptor_file_->Sync();
57                 }
58             }
59 
60             // If we just created a new descriptor file, install it by writing a
61             // new CURRENT file that points to it.
62             if (s.ok() && !new_manifest_file.empty()) {
63                 s = SetCurrentFile(env_, dbname_, manifest_file_number_);
64             }
65 
66             mu->Lock();
67         }
68 
69         //5. 將新的版本添加到Alive版本列表,並將其做為Current Version
70         // Install the new version
71         if (s.ok()) {
72             AppendVersion(v);
73             log_number_ = edit->log_number_;
74             prev_log_number_ = edit->prev_log_number_;
75         }
76         else {
77             delete v;
78             if (!new_manifest_file.empty()) {
79                 delete descriptor_log_;
80                 delete descriptor_file_;
81                 descriptor_log_ = NULL;
82                 descriptor_file_ = NULL;
83                 env_->DeleteFile(new_manifest_file);
84             }
85         }
86 
87         return s;
88     }

3. 讀取數據時:LevelDB通過VersionSet中的TableCache對象完成數據讀取。

TableCache是SSTable的緩存類,NewIterator方法通過傳入指定的文件編號返回該文件的Iterator供外部使用。

 1     class TableCache {
 2     public:
 3         TableCache(const std::string& dbname, const Options* options, int entries);
 4         ~TableCache();
 5 
 6         // Return an iterator for the specified file number (the corresponding
 7         // file length must be exactly "file_size" bytes).  If "tableptr" is
 8         // non-NULL, also sets "*tableptr" to point to the Table object
 9         // underlying the returned iterator, or NULL if no Table object underlies
10         // the returned iterator.  The returned "*tableptr" object is owned by
11         // the cache and should not be deleted, and is valid for as long as the
12         // returned iterator is live.
13         Iterator* NewIterator(const ReadOptions& options,
14             uint64_t file_number,
15             uint64_t file_size,
16             Table** tableptr = NULL);
17 
18         // Evict any entry for the specified file number
19         void Evict(uint64_t file_number);
20 
21     private:
22         Env* const env_;
23         const std::string dbname_;
24         const Options* options_;
25         Cache* cache_;
26     };

緩存機制主要通過Cache對象實現,關於Cache的備忘下節會講。

Version

Version維護了一份當前版本的SSTable的元數據,其對外暴露的接口大部分也和元數據相關:

 1         void GetOverlappingInputs(
 2             int level,
 3             const InternalKey* begin,         // NULL means before all keys
 4             const InternalKey* end,           // NULL means after all keys
 5             std::vector<FileMetaData*>* inputs);
 6 
 7         // Returns true iff some file in the specified level overlaps
 8         // some part of [*smallest_user_key,*largest_user_key].
 9         // smallest_user_key==NULL represents a key smaller than all keys in the DB.
10         // largest_user_key==NULL represents a key largest than all keys in the DB.
11         bool OverlapInLevel(int level,
12             const Slice* smallest_user_key,
13             const Slice* largest_user_key);
14 
15         // Return the level at which we should place a new memtable compaction
16         // result that covers the range [smallest_user_key,largest_user_key].
17         int PickLevelForMemTableOutput(const Slice& smallest_user_key,
18             const Slice& largest_user_key);
19 
20         int NumFiles(int level) const { return files_[level].size(); }

還有兩個數據庫讀取操作相關的方法Get、UpdateStats,來看Get:

 

  1     Status Version::Get(const ReadOptions& options,
  2         const LookupKey& k,
  3         std::string* value,
  4         GetStats* stats)
  5     {
  6         Slice ikey = k.internal_key();
  7         Slice user_key = k.user_key();
  8         const Comparator* ucmp = vset_->icmp_.user_comparator();
  9         Status s;
 10 
 11         stats->seek_file = NULL;
 12         stats->seek_file_level = -1;
 13         FileMetaData* last_file_read = NULL;
 14         int last_file_read_level = -1;
 15 
 16         // We can search level-by-level since entries never hop across
 17         // levels.  Therefore we are guaranteed that if we find data
 18         // in an smaller level, later levels are irrelevant.
 19         std::vector<FileMetaData*> tmp;
 20         FileMetaData* tmp2;
 21 
 22         //1. 查找包含指定Key的所有文件
 23         for (int level = 0; level < config::kNumLevels; level++) {
 24             size_t num_files = files_[level].size();
 25             if (num_files == 0) continue;
 26 
 27             // Get the list of files to search in this level
 28             FileMetaData* const* files = &files_[level][0];
 29             if (level == 0) {    //1.1 Level-0可能存在多個文件均包含該Key
 30                 // Level-0 files may overlap each other.  Find all files that
 31                 // overlap user_key and process them in order from newest to oldest.
 32                 tmp.reserve(num_files);
 33                 for (uint32_t i = 0; i < num_files; i++) {
 34                     FileMetaData* f = files[i];
 35                     if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
 36                         ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
 37                         tmp.push_back(f);
 38                     }
 39                 }
 40                 if (tmp.empty()) continue;
 41 
 42                 std::sort(tmp.begin(), tmp.end(), NewestFirst);    //將文件按更新順序排列
 43                 files = &tmp[0];
 44                 num_files = tmp.size();
 45             }
 46             else {            //1.2 Level-0之上,一個Key只可能存在於一個文件中
 47                 // Binary search to find earliest index whose largest key >= ikey.
 48                 uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
 49                 if (index >= num_files) {
 50                     files = NULL;
 51                     num_files = 0;
 52                 }
 53                 else {
 54                     tmp2 = files[index];
 55                     if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
 56                         // All of "tmp2" is past any data for user_key
 57                         files = NULL;
 58                         num_files = 0;
 59                     }
 60                     else {
 61                         files = &tmp2;
 62                         num_files = 1;
 63                     }
 64                 }
 65             }
 66 
 67             //2. 遍歷所有文件,查找Key值數據。
 68             for (uint32_t i = 0; i < num_files; ++i) {
 69                 if (last_file_read != NULL && stats->seek_file == NULL) {
 70                     // We have had more than one seek for this read.  Charge the 1st file.
 71                     stats->seek_file = last_file_read;
 72                     stats->seek_file_level = last_file_read_level;
 73                 }
 74 
 75                 FileMetaData* f = files[i];
 76                 last_file_read = f;
 77                 last_file_read_level = level;
 78 
 79                 //2.1 SSTable迭代器
 80                 Iterator* iter = vset_->table_cache_->NewIterator(
 81                     options,
 82                     f->number,
 83                     f->file_size);
 84                 iter->Seek(ikey);    //2.2 查找指定Key
 85                 const bool done = GetValue(iter, user_key, value, &s);    //2.3 Get Value
 86                 if (!iter->status().ok()) {
 87                     s = iter->status();
 88                     delete iter;
 89                     return s;
 90                 }
 91                 else {
 92                     delete iter;
 93                     if (done) {
 94                         return s;
 95                     }
 96                 }
 97             }
 98         }
 99 
100         return Status::NotFound(Slice());  // Use an empty error message for speed
101     }

VersionEdit

版本建變化除運行期編號修改外,最主要的是SSTable文件的增刪信息。當Compaction執行時,必然會出現部分SSTable無效被移除,合並生成的新SSTable被加入到數據庫中。VersionEdit提供AddFile、DeleteFile完成變更標識。

VersionEdit提供的另外一個主要功能接口聲明如下:

1         void EncodeTo(std::string* dst) const;
2         Status DecodeFrom(const Slice& src);

這是一組序列化、反序列化方法,序列化文件為Manifest文件。

 1     void VersionEdit::EncodeTo(std::string* dst) const {
 2         //1. 序列化比較器
 3         if (has_comparator_) {
 4             PutVarint32(dst, kComparator);
 5             PutLengthPrefixedSlice(dst, comparator_);
 6         }
 7         //2. 序列化運行期編號信息
 8         if (has_log_number_) {
 9             PutVarint32(dst, kLogNumber);
10             PutVarint64(dst, log_number_);
11         }
12         if (has_prev_log_number_) {
13             PutVarint32(dst, kPrevLogNumber);
14             PutVarint64(dst, prev_log_number_);
15         }
16         if (has_next_file_number_) {
17             PutVarint32(dst, kNextFileNumber);
18             PutVarint64(dst, next_file_number_);
19         }
20         if (has_last_sequence_) {
21             PutVarint32(dst, kLastSequence);
22             PutVarint64(dst, last_sequence_);
23         }
24         //3. 序列化Compact Pointer
25         for (size_t i = 0; i < compact_pointers_.size(); i++) {
26             PutVarint32(dst, kCompactPointer);
27             PutVarint32(dst, compact_pointers_[i].first);  // level
28             PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
29         }
30 
31         //4. 序列化本次版本變化的SSTable文件列表
32         for (DeletedFileSet::const_iterator iter = deleted_files_.begin();
33         iter != deleted_files_.end();
34             ++iter) {
35             PutVarint32(dst, kDeletedFile);
36             PutVarint32(dst, iter->first);   // level
37             PutVarint64(dst, iter->second);  // file number
38         }
39 
40         for (size_t i = 0; i < new_files_.size(); i++) {
41             const FileMetaData& f = new_files_[i].second;
42             PutVarint32(dst, kNewFile);
43             PutVarint32(dst, new_files_[i].first);  // level
44             PutVarint64(dst, f.number);
45             PutVarint64(dst, f.file_size);
46             PutLengthPrefixedSlice(dst, f.smallest.Encode());
47             PutLengthPrefixedSlice(dst, f.largest.Encode());
48         }
49     }

回到最開始的問題:版本信息由什麼用?

每個LevelDB有一個Current File,Current File內唯一的信息為:當前數據庫的Manifest文件名。Manifest中包含了上次運行後全部的版本信息,LevelDB通過Manifest文件恢復版本信息。

LevelDB的版本信息為富語義功能組,它所包含的信息已經大大超出了版本定義本身。如果將Version類封裝為結構體、VersionSet僅僅為Version列表、VersionEdit也是單純的結構數據,再為上述結構提供多套功能類應該更為合理。目前來看,這應當算作LevelDB實現的一處臭味。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved