Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/version_edit.cc",
"db/version_edit_handler.cc",
"db/version_set.cc",
"db/version_util.cc",
"db/wal_edit.cc",
"db/wal_manager.cc",
"db/wide/wide_column_serialization.cc",
Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ set(SOURCES
db/version_edit.cc
db/version_edit_handler.cc
db/version_set.cc
db/version_util.cc
db/wal_edit.cc
db/wal_manager.cc
db/wide/wide_column_serialization.cc
Expand Down
9 changes: 9 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4349,6 +4349,15 @@ unsigned char rocksdb_options_get_paranoid_checks(rocksdb_options_t* opt) {
return opt->rep.paranoid_checks;
}

// Sets DBOptions::open_files_async on the wrapped options. Nonzero
// enables scheduling background work at DB open that opens and
// validates SST files asynchronously (see DBImpl::ScheduleAsyncFileOpening).
void rocksdb_options_set_open_files_async(rocksdb_options_t* opt,
                                          unsigned char v) {
  opt->rep.open_files_async = v;
}

// Returns the current value of DBOptions::open_files_async
// (companion reader for rocksdb_options_set_open_files_async).
unsigned char rocksdb_options_get_open_files_async(rocksdb_options_t* opt) {
  return opt->rep.open_files_async;
}

void rocksdb_options_set_db_paths(rocksdb_options_t* opt,
const rocksdb_dbpath_t** dbpath_values,
size_t num_paths) {
Expand Down
11 changes: 6 additions & 5 deletions db/compaction/compaction_picker_fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
FileMetaData* f = *ritr;
assert(f);
if (f->fd.table_reader && f->fd.table_reader->GetTableProperties()) {
TableReader* t = f->fd.table_reader.load(std::memory_order_acquire);
if (t && t->GetTableProperties()) {
uint64_t newest_key_time = f->TryGetNewestKeyTime();
uint64_t creation_time =
f->fd.table_reader->GetTableProperties()->creation_time;
uint64_t creation_time = t->GetTableProperties()->creation_time;
uint64_t est_newest_key_time = newest_key_time == kUnknownNewestKeyTime
? creation_time
: newest_key_time;
Expand Down Expand Up @@ -166,8 +166,9 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
assert(f);
uint64_t newest_key_time = f->TryGetNewestKeyTime();
uint64_t creation_time = 0;
if (f->fd.table_reader && f->fd.table_reader->GetTableProperties()) {
creation_time = f->fd.table_reader->GetTableProperties()->creation_time;
TableReader* t = f->fd.table_reader.load(std::memory_order_acquire);
if (t && t->GetTableProperties()) {
creation_time = t->GetTableProperties()->creation_time;
}
uint64_t est_newest_key_time = newest_key_time == kUnknownNewestKeyTime
? creation_time
Expand Down
8 changes: 5 additions & 3 deletions db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ class CompactionPickerTestBase : public testing::Test {
}
TableProperties tp;
tp.newest_key_time = newest_key_time;
f->fd.table_reader = new mock::MockTableReader(mock::KVVector{}, tp);
f->fd.table_reader.store(new mock::MockTableReader(mock::KVVector{}, tp),
std::memory_order_release);

vstorage->AddFile(level, f);
files_.emplace_back(f);
Expand Down Expand Up @@ -272,8 +273,9 @@ class CompactionPickerTestBase : public testing::Test {

void ClearFiles() {
for (auto& file : files_) {
if (file->fd.table_reader != nullptr) {
delete file->fd.table_reader;
TableReader* t = file->fd.table_reader.load();
if (t != nullptr) {
delete t;
}
}
files_.clear();
Expand Down
37 changes: 24 additions & 13 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11485,20 +11485,27 @@ TEST_F(DBCompactionTest, RecordNewestKeyTimeForTtlCompaction) {
std::vector<FileMetaData*> file_metadatas = GetLevelFileMetadatas(0);
ASSERT_EQ(file_metadatas.size(), 4);
uint64_t first_newest_key_time =
file_metadatas[0]->fd.table_reader->GetTableProperties()->newest_key_time;
file_metadatas[0]
->fd.table_reader.load(std::memory_order_acquire)
->GetTableProperties()
->newest_key_time;
ASSERT_NE(first_newest_key_time, kUnknownNewestKeyTime);
// Check that the newest_key_times are in expected ordering
uint64_t prev_newest_key_time = first_newest_key_time;
for (size_t idx = 1; idx < file_metadatas.size(); idx++) {
uint64_t newest_key_time = file_metadatas[idx]
->fd.table_reader->GetTableProperties()
->newest_key_time;
uint64_t newest_key_time =
file_metadatas[idx]
->fd.table_reader.load(std::memory_order_acquire)
->GetTableProperties()
->newest_key_time;

ASSERT_LT(newest_key_time, prev_newest_key_time);
prev_newest_key_time = newest_key_time;
ASSERT_EQ(newest_key_time, file_metadatas[idx]
->fd.table_reader->GetTableProperties()
->creation_time);
ASSERT_EQ(newest_key_time,
file_metadatas[idx]
->fd.table_reader.load(std::memory_order_acquire)
->GetTableProperties()
->creation_time);
}
// The delta between the first and last newest_key_times is 15s
uint64_t last_newest_key_time = prev_newest_key_time;
Expand All @@ -11512,14 +11519,18 @@ TEST_F(DBCompactionTest, RecordNewestKeyTimeForTtlCompaction) {
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
file_metadatas = GetLevelFileMetadatas(0);
ASSERT_EQ(file_metadatas.size(), 1);
ASSERT_EQ(
file_metadatas[0]->fd.table_reader->GetTableProperties()->newest_key_time,
first_newest_key_time);
ASSERT_EQ(file_metadatas[0]
->fd.table_reader.load(std::memory_order_acquire)
->GetTableProperties()
->newest_key_time,
first_newest_key_time);
// Contrast newest_key_time with creation_time, which records the oldest
// ancestor time (15s older than newest_key_time)
ASSERT_EQ(
file_metadatas[0]->fd.table_reader->GetTableProperties()->creation_time,
last_newest_key_time);
ASSERT_EQ(file_metadatas[0]
->fd.table_reader.load(std::memory_order_acquire)
->GetTableProperties()
->creation_time,
last_newest_key_time);
ASSERT_EQ(file_metadatas[0]->oldest_ancester_time, last_newest_key_time);

// Make sure TTL of 5s causes compaction
Expand Down
9 changes: 5 additions & 4 deletions db/db_impl/compacted_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ Status CompactedDBImpl::Get(const ReadOptions& _read_options,
/*b_has_ts=*/false) < 0) {
return Status::NotFound();
}
Status s = f.fd.table_reader->Get(read_options, lkey.internal_key(),
&get_context, nullptr);
TableReader* t = f.fd.table_reader.load(std::memory_order_acquire);
Status s = t->Get(read_options, lkey.internal_key(), &get_context, nullptr);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
Expand Down Expand Up @@ -164,8 +164,9 @@ void CompactedDBImpl::MultiGet(const ReadOptions& _read_options,
/*b_has_ts=*/false) < 0) {
reader_list.push_back(nullptr);
} else {
f.fd.table_reader->Prepare(lkey.internal_key());
reader_list.push_back(f.fd.table_reader);
TableReader* t = f.fd.table_reader.load(std::memory_order_acquire);
t->Prepare(lkey.internal_key());
reader_list.push_back(t);
}
}
for (size_t i = 0; i < num_keys; ++i) {
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ Status DBImpl::CloseHelper() {
// Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_ || bg_purge_scheduled_ ||
pending_purge_obsolete_files_ ||
bg_async_file_open_scheduled_ || pending_purge_obsolete_files_ ||
error_handler_.IsRecoveryInProgress()) {
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
bg_cv_.Wait();
Expand Down
10 changes: 10 additions & 0 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,13 @@ class DBImpl : public DB {
// recovery.
Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);

// Schedule background work to open and validate SST files asynchronously.
// Called when open_files_async is enabled.
void ScheduleAsyncFileOpening();

// Background work function for async file opening.
static void BGWorkAsyncFileOpen(void* arg);

void InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();

// Return true to proceed with current WAL record whose content is stored in
Expand Down Expand Up @@ -3062,6 +3069,9 @@ class DBImpl : public DB {
// number of background obsolete file purge jobs, submitted to the HIGH pool
int bg_purge_scheduled_ = 0;

// number of background async file opening jobs, submitted to the HIGH pool
int bg_async_file_open_scheduled_ = 0;

std::deque<ManualCompactionState*> manual_compaction_dequeue_;

// shall we disable deletion of obsolete files
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
state.manifest_delete_files.size());
// We may ignore the dbname when generating the file names.
for (auto& file : state.sst_delete_files) {
(void)file.metadata->fd.table_reader.load(std::memory_order_acquire);
auto* handle = file.metadata->table_reader_handle;
if (file.only_delete_metadata) {
if (handle) {
Expand Down
106 changes: 106 additions & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/periodic_task_scheduler.h"
#include "db/version_util.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h"
#include "file/read_write_util.h"
Expand Down Expand Up @@ -294,6 +295,12 @@ Status DBImpl::ValidateOptions(const DBOptions& db_options) {
return Status::InvalidArgument(
"write_dbid_to_manifest and write_identity_file cannot both be false");
}

if (db_options.open_files_async && db_options.max_open_files != -1) {
return Status::InvalidArgument(
"open_files_async is not useful except when max_open_files = -1.");
}

return Status::OK();
}

Expand Down Expand Up @@ -2657,6 +2664,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
impl->DeleteObsoleteFiles();
TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
impl->MaybeScheduleFlushOrCompaction();

if (impl->immutable_db_options_.open_files_async) {
impl->ScheduleAsyncFileOpening();
}
impl->mutex_.Unlock();
}

Expand Down Expand Up @@ -2705,4 +2716,99 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
return s;
}

// Context handed to the async file-opening background job
// (DBImpl::BGWorkAsyncFileOpen). It carries one reference on each
// Version in `versions`, taken by ScheduleAsyncFileOpening; the
// destructor releases those references. The deleter in
// BGWorkAsyncFileOpen destroys this object while holding the DB mutex
// (Version::Unref presumably requires it -- confirm against version_set).
struct AsyncFileOpenContext {
  DBImpl* db = nullptr;  // default-init: never leave a raw pointer indeterminate
  FileOptions file_options;
  std::vector<Version*> versions;  // each element holds one ref

  AsyncFileOpenContext() = default;

  // Non-copyable/non-movable: a copy would Unref each version twice.
  AsyncFileOpenContext(const AsyncFileOpenContext&) = delete;
  AsyncFileOpenContext& operator=(const AsyncFileOpenContext&) = delete;

  ~AsyncFileOpenContext() {
    for (auto* v : versions) {
      v->Unref();
    }
  }
};

void DBImpl::ScheduleAsyncFileOpening() {
mutex_.AssertHeld();

auto* ctx = new AsyncFileOpenContext();
ctx->db = this;
ctx->file_options = versions_->file_options();

for (auto cfd : *versions_->GetColumnFamilySet()) {
assert(!cfd->IsDropped());
Version* current = cfd->current();
VersionStorageInfo* vstorage = current->storage_info();
bool has_files = false;
for (int level = 0; level < vstorage->num_levels() && !has_files; level++) {
has_files = !vstorage->LevelFiles(level).empty();
}
if (has_files) {
current->Ref();
ctx->versions.push_back(current);
}
}

bg_async_file_open_scheduled_++;

// since this is a one time job, best to schedule it with high priority
env_->Schedule(&DBImpl::BGWorkAsyncFileOpen, ctx, Env::Priority::HIGH,
nullptr);
}

void DBImpl::BGWorkAsyncFileOpen(void* arg) {
TEST_SYNC_POINT("DBImpl::BGWorkAsyncFileOpen::Start");

AsyncFileOpenContext* raw_ctx = reinterpret_cast<AsyncFileOpenContext*>(arg);
DBImpl* db = raw_ctx->db;

auto deleter = [db](AsyncFileOpenContext* p) {
InstrumentedMutexLock l(&db->mutex_);
delete p;
db->bg_async_file_open_scheduled_--;
db->bg_cv_.SignalAll();
};
std::unique_ptr<AsyncFileOpenContext, decltype(deleter)> ctx(raw_ctx,
deleter);

if (ctx->versions.empty()) {
return;
}
ReadOptions ro;

for (auto* version : ctx->versions) {
ColumnFamilyData* cfd = version->cfd();
VersionStorageInfo* vstorage = version->storage_info();

const MutableCFOptions& mutable_cf_options =
cfd->GetLatestMutableCFOptions();
size_t max_file_size_for_l0_meta_pin =
MaxFileSizeForL0MetaPin(mutable_cf_options);

std::vector<std::pair<FileMetaData*, int>> files_meta;
for (int level = 0; level < vstorage->num_levels(); level++) {
for (FileMetaData* file_meta : vstorage->LevelFiles(level)) {
files_meta.emplace_back(file_meta, level);
}
}

Status s = LoadTableHandlersHelper(
files_meta, cfd->table_cache(), ctx->file_options,
*vstorage->InternalComparator(), cfd->internal_stats(),
db->immutable_db_options_.max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */, mutable_cf_options,
max_file_size_for_l0_meta_pin, ro, &db->shutting_down_);
if (!s.ok()) {
ROCKS_LOG_ERROR(
db->immutable_db_options_.info_log,
"BGWorkAsyncFileOpen: LoadTableHandlers failed for CF %s: "
"%s",
cfd->GetName().c_str(), s.ToString().c_str());
}
}
TEST_SYNC_POINT("DBImpl::BGWorkAsyncFileOpen:Done");
}

} // namespace ROCKSDB_NAMESPACE
Loading
Loading