Major Compaction
What is Major Compaction
?
- This is the actual process we commonly refer to as Compaction
.
- Unlike Minor Compaction
, which moves data from memory to disk, it merges data within the disk.
- It merges each data and moves it to a lower level.
Why use Major Compaction
?
- The most obvious benefit of using Major Compaction
is to clean up duplicate data.
- If the same key exists in sst files of different levels, you can delete the data (old data) at the lower level.
- Since previously written data might be needed, data to be deleted is recorded sequentially, and the latest data is updated to save disk space.
- Since Level 0 may not have ordered data files, merging them into Level 1 sorts the data, making it easier to find files and improving read efficiency.
When does Major Compaction
occur?
- It occurs when the files at each level accumulate to a threshold and there is no immutable present.
Overall Major Compaction Code Flow
Let's take a look at the overall Major Compaction Code Flow.
When the files at each level accumulate to a threshold, MaybeScheduleCompaction
determines whether a merge is needed. If a merge is necessary, it checks for the presence of immutable
and, if absent, calls PickCompaction
to gather information for the merge, and Major Compaction proceeds in DoCompactionWork
.
Let's delve into the process of Major compaction through the source code.
Overall Process
- The number of files at a certain level reaches a threshold.
MaybeScheduleCompaction
determines if a merge is needed.BackgroundCompaction
checks for the presence of immutable and, if absent, calls functions for Major Compaction.PickCompaction
stores the information needed for the merge.- With the stored information, the actual merge is carried out in
DoCompactionWork
. - In
DoCompactionWork
,OpenCompactionOutputFile
,FinishCompactionOutputFile
, andInstallCompactionResults
are executed to create sst files, insert merged records, check for errors, and place the files in the respective level. - Afterward, the version is updated, and finally,
CleanupCompaction
is called to delete unnecessary sst files, completing the Major Compaction.
MaybeScheduleCompaction
Determines if a merge is needed.
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
//(TeamCompaction)If a merger is already in progress, DB is being deleted, or if there is an error, nothing will happen.
if (background_compaction_scheduled_) {
} else if (shutting_down_.load(std::memory_order_acquire)) {
} else if (!bg_error_.ok()) {
//(TeamCompaction)If immutable does not exist, manual compaction do not exist, and mergers at each level are not required, nothing will happen.
} else if (imm_ == nullptr && manual_compaction_ == nullptr &&
!versions_->NeedsCompaction()) {
//(TeamCompaction)In other cases, since a merger occurs, call 'BGWork' to proceed with the merger.
} else {
background_compaction_scheduled_ = true;
env_->Schedule(&DBImpl::BGWork, this);
}
}
- If a merge is already in progress, the DB is being deleted, or there is an error, nothing happens.
- If immutable does not exist, manual compaction does not exist, and compaction is not needed at each level (
NeedCompaction
), nothing happens. (Manual compaction will be explained in the BackgroundCompaction section.) - In other cases, since a merge is needed,
BGWork
is called to proceed with the merge.
BackgroundCompaction
Determines Major Compaction or Minor Compaction based on the presence of immutable. Also, if the user wants manual compaction, it proceeds here.
void DBImpl::BackgroundCompaction() {
mutex_.AssertHeld();
//(TeamCompaction)If imutable exists, proceed with Minor Compact through the CompactMemtable function.
if (imm_ != nullptr) {
CompactMemTable();
return;
}
Compaction* c;
//(TeamCompaction)If you want to merge manually, proceed with the manual merger with true (most of them are automatically merged, so they are not used well)
bool is_manual = (manual_compaction_ != nullptr);
InternalKey manual_end;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == nullptr);
if (c != nullptr) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
}
Log(options_.info_log,
"Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
(m->end ? m->end->DebugString().c_str() : "(end)"),
(m->done ? "(end)" : manual_end.DebugString().c_str()));
} //(TeamCompaction)Store information that needs to be merged if immutable does not exist
else {
c = versions_->PickCompaction();
}
//...skipped
Status status;
//(TeamCompaction)Nothing happens if there is no information to merge
if (c == nullptr) {
} else if (!is_manual && c->IsTrivialMove()) {
//...skipped (TeamCompaciton) Manual Compaction progression part
} //(TeamCompaction)With information to merge, proceed with Major Compaction.
else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
if (!status.ok()) {
RecordBackgroundError(status);
}
//(TeamCompaction)If there is no problem after completing the merger completely, remove the sst files (files collected after the merger) that are now unnecessary
CleanupCompaction(compact);
//(TeamCompaction)Remove the sst files that were saved for the merger
c->ReleaseInputs();
RemoveObsoleteFiles();
}
//...skipped
- If immutable exists, Minor Compaction (Flush) is performed through
CompactMemtable
. - If immutable does not exist and manual compaction is not desired, information for compaction is stored through
PickCompaction
. - If there is no information to merge, nothing happens. If there is, Major Compaction is carried out in
DoCompactionWork
based on that information. - After the merge is completed in
DoCompactionWork
and the state is intact, unnecessary sst files from before the merge are removed throughCleanupCompaction
to finish.
(Additional Explanation)
Manual Compaction vs Automatic Compaction
Manual Compaction
- Rarely used and mainly for debugging purposes.
- To use, set a key range in the benchmark and execute it, setting the manual compaction boolean to true to perform manual compaction.
Automatic Compaction
- Most of the compactions we use are performed automatically.
DoCompactionWork
The actual Major Compaction is carried out.
Status DBImpl::DoCompactionWork(CompactionState* compact) {
//...skipped
//(TeamCompaction)Create an iter for the index block and data block of each sst file from level 0, 1 to N, and use it to find each key.
//(TeamCompaction)After that, arrange the created iters so that they can be listed in the order of level 0, 1 to N, and store them in input (see Compaction-Iter.md for details)
Iterator* input = versions_->MakeInputIterator(compact->compaction);
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
//(TeamCompaction)Position the pointer position of the created iter first
input->SeekToFirst();
Status status;
//(TeamCompaction)Parse the internal key and divide it into user key, sequence number, and type
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key = false;
//(TeamCompaction)Set the latest key to the highest value
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
//(TeamCompaction)The process of repeatedly finding and processing the key/value that needs to be merged through iter stored in the input
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
//...skipped
//(TeamCompaction)Obtain the key of the current corresponding sst file
Slice key = input->key();
//(TeamCompaction)Check if you need an sst file to put the key in, and if there is an sst file to put in, the merger is completed
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
}
//(TeamCompaction)Compare different sequences for the same key to obtain the latest user key and delete records from different user keys that were the same
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
last_sequence_for_key = ikey.sequence;
}
//...skipped
if (!drop) {
//(TeamCompaction)Create new sst file if necessary
if (compact->builder == nullptr) {
status = OpenCompactionOutputFile(compact);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
//(TeamCompaction)Add records from the latest user key to the sst file
compact->builder->Add(key, input->value());
//(TeamCompaction)If data accumulates in the sst file and exceeds the maximum file size, the merger of the sst file is completed
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}
//(TeamCompaction)Among the sst files arranged in order of level 0, 1 to N in the input variable, the next sst file is moved on
input->Next();
}
if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::IOError("Deleting DB during compaction");
}
//(TeamCompaction)If the sst file does not have an error and the sst file exists, the merger is completed
if (status.ok() && compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input);
}
if (status.ok()) {
status = input->status();
}
//...skipped
//(TeamCompaction)Move the merged sst file to its level
if (status.ok()) {
status = InstallCompactionResults(compact);
}
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
return status;
}
- Create an iter for the
index block
anddata block
of each sst file from level 0, 1 to N, and use it to find each key. Then, arrange the created iters so that they can be listed in the order of level 0, 1 to N, and store them ininput
. - Position the pointer of the iter to the first position and parse the
internalkey
into user key, sequence, and type. - Obtain the key of the current corresponding sst file using
key()
. - Compare different sequences for the same key to obtain the latest user key's record and delete records from different user keys that were the same.
- If an sst file is needed, create a new one and insert the latest user key's record.
- If data accumulates in the sst file and exceeds the maximum file size, complete the merge and move to the next sst file.
- Repeat steps 2-6 to complete the merge of all sst files, check the overall status, and if there are no errors, move the merged sst file to its level.
OpenCompactionOutputFile
Creates a new sst file to insert the merged user key records.
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
//...skipped
//(TeamCompaction)Set the number of the newly created sst file
std::string fname = TableFileName(dbname_, file_number);
//(TeamCompaction)Numbered and temporarily merged records in writablefile
Status s = env_->NewWritableFile(fname, &compact->outfile);
//(TeamCompaction)If the record is in good condition, create a sst file and add it to the builder
if (s.ok()) {
compact->builder = new TableBuilder(options_, compact->outfile);
}
return s;
}
- Assign a number to the newly created sst file.
- Numbered and temporarily merged records in
WriteableFile
. - If the record is in good condition, create an sst file and add it to the builder.
FinishCompactionOutputFile
Checks for errors in the merged iter and sst file.
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
Iterator* input) {
//...skipped
//(TeamCompaction)Check for errors for iter
Status s = input->status();
const uint64_t current_entries = compact->builder->NumEntries();
if (s.ok()) {
s = compact->builder->Finish();
} else {
compact->builder->Abandon();
}
//...skipped
//(TeamCompaction)Check and finalize errors for the sst file itself
if (s.ok()) {
s = compact->outfile->Sync();
}
if (s.ok()) {
s = compact->outfile->Close();
}
delete compact->outfile;
compact->outfile = nullptr;
//...skipped
}
- Check for errors in the iter.
- Check and finalize errors for the sst file itself.
InstallCompactionResults
Moves the merged sst file to its level and updates the version.
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
//...skipped
//(TeamCompaction) Moved merged sst files to that level
compact->compaction->AddInputDeletions(compact->compaction->edit());
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
}
//(TeamCompaction)Update version
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
- Move the merged sst file to its level.
- Update the version to complete.
CleanupCompaction
Removes unnecessary sst files (sst files before the merge) after the merge is completed.
void DBImpl::CleanupCompaction(CompactionState* compact) {
mutex_.AssertHeld();
//(TeamCompaction)Remove files if they exist in the builder that contains the sst files that need to be merged
if (compact->builder != nullptr) {
// May happen if we get a shutdown call in the middle of compaction
compact->builder->Abandon();
delete compact->builder;
} else {
assert(compact->outfile == nullptr);
}
delete compact->outfile;
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
pending_outputs_.erase(out.number);
}
delete compact;
}
- Since the latest sst file has been updated, if there are files in the
builder
that contains the sst files to be merged, they are removed as they are no longer needed. - Finalize the Major Compaction.