Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions fs/cache/full_file_cache/cache_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ ICacheStore* FileCachePool::do_open(std::string_view pathname, int flags, mode_t
find->second->openCount++;
}

auto lruEntry = find->second.get();
if (lruEntry->openCount == 1) {
SCOPED_LOCK(lruEntry->lock_);
lruEntry->rw_lock_ = std::make_unique<photon::rwlock>();
}

return new FileCacheStore(this, localFile, refillUnit_, find);
}

Expand Down Expand Up @@ -136,7 +142,9 @@ int FileCachePool::evict(std::string_view filename) {
}
int err = 0;
{
photon::scoped_rwlock rl(lruEntry->rw_lock_, photon::WLOCK);
SCOPED_LOCK(lruEntry->lock_);
if (lruEntry->rw_lock_) lruEntry->rw_lock_->lock(photon::WLOCK);
DEFER(if (lruEntry->rw_lock_) lruEntry->rw_lock_->unlock(););
err = mediaFs_->truncate(filePath.data(), 0);
lruEntry->truncate_done = false;
}
Expand Down Expand Up @@ -165,7 +173,12 @@ bool FileCachePool::isFull() {
}

void FileCachePool::removeOpenFile(FileNameMap::iterator iter) {
iter->second->openCount--;
auto lruEntry = iter->second.get();
lruEntry->openCount--;
if (lruEntry->openCount == 0) {
SCOPED_LOCK(lruEntry->lock_);
lruEntry->rw_lock_.reset();
}
}

void FileCachePool::forceRecycle() {
Expand Down Expand Up @@ -257,7 +270,9 @@ void FileCachePool::eviction() {
}

{
photon::scoped_rwlock rl(lruEntry->rw_lock_, photon::WLOCK);
SCOPED_LOCK(lruEntry->lock_);
if (lruEntry->rw_lock_) lruEntry->rw_lock_->lock(photon::WLOCK);
DEFER(if (lruEntry->rw_lock_) lruEntry->rw_lock_->unlock(););
err = mediaFs_->truncate(fileName.data(), 0);
lruEntry->truncate_done = false;
}
Expand Down
3 changes: 2 additions & 1 deletion fs/cache/full_file_cache/cache_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class FileCachePool : public photon::fs::ICachePool {
uint32_t lruIter;
int openCount;
uint64_t size;
photon::rwlock rw_lock_;
std::unique_ptr<photon::rwlock> rw_lock_;
photon::spinlock lock_; // to protect rw_lock_
bool truncate_done;
};

Expand Down
8 changes: 4 additions & 4 deletions fs/cache/full_file_cache/cache_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ FileCacheStore::~FileCacheStore() {

ICacheStore::try_preadv_result FileCacheStore::try_preadv2(const struct iovec *iov, int iovcnt,
off_t offset, int flags) {
auto lruEntry = static_cast<FileCachePool::LruEntry *>(iterator_->second.get());
photon::scoped_rwlock rl(lruEntry->rw_lock_, photon::RLOCK);
auto lruEntry = iterator_->second.get();
photon::scoped_rwlock rl(*lruEntry->rw_lock_, photon::RLOCK);
return this->ICacheStore::try_preadv2(iov, iovcnt, offset, flags);
}

Expand All @@ -70,8 +70,8 @@ ssize_t FileCacheStore::do_pwritev(const struct iovec *iov, int iovcnt, off_t of
{
ssize_t ret;
iovector_view view((iovec*)iov, iovcnt);
auto lruEntry = static_cast<FileCachePool::LruEntry *>(iterator_->second.get());
photon::scoped_rwlock rl(lruEntry->rw_lock_, photon::RLOCK);
auto lruEntry = iterator_->second.get();
photon::scoped_rwlock rl(*lruEntry->rw_lock_, photon::RLOCK);
if (!lruEntry->truncate_done) {
// May repeated ftruncate() here, but it doesn't matter
ret = localFile_->ftruncate(actual_size_);
Expand Down
8 changes: 6 additions & 2 deletions fs/cache/full_file_cache/quota_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,9 @@ int QuotaFilePool::evict(std::string_view filename) {
int err;
auto lruEntry = static_cast<QuotaLruEntry*>(fileIter->second.get());
{
photon::scoped_rwlock rl(lruEntry->rw_lock_, photon::WLOCK);
SCOPED_LOCK(lruEntry->lock_);
if (lruEntry->rw_lock_) lruEntry->rw_lock_->lock(photon::WLOCK);
DEFER(if (lruEntry->rw_lock_) lruEntry->rw_lock_->unlock(););
lru.mark_key_cleared(lruEntry->QuotaLruIter);
err = mediaFs_->truncate(filePath.data(), 0);
if (err) {
Expand Down Expand Up @@ -235,7 +237,9 @@ void QuotaFilePool::dirEviction() {
int err;
bool flags_dir_delete = false;
{
photon::scoped_rwlock rl(lruEntry->rw_lock_, photon::WLOCK);
SCOPED_LOCK(lruEntry->lock_);
if (lruEntry->rw_lock_) lruEntry->rw_lock_->lock(photon::WLOCK);
DEFER(if (lruEntry->rw_lock_) lruEntry->rw_lock_->unlock(););
if (lruEntry->openCount==0){
dir->lru.mark_key_cleared(lruEntry->QuotaLruIter);
} else {
Expand Down
5 changes: 3 additions & 2 deletions fs/cache/full_file_cache/quota_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ ssize_t QuotaFileStore::do_pwritev2(const struct iovec *iov, int iovcnt, off_t o
}
ssize_t ret;
{
auto lruEntry = static_cast<FileCachePool::LruEntry*>(iterator_->second.get());
photon::scoped_rwlock wl(lruEntry->rw_lock_, photon::WLOCK);
auto lruEntry = iterator_->second.get();
SCOPED_LOCK(lruEntry->lock_);
photon::scoped_rwlock wl(*lruEntry->rw_lock_, photon::WLOCK);
ret = localFile_->pwritev(iov, iovcnt, offset);
}
if (ret < 0 && ENOSPC == errno) {
Expand Down
36 changes: 36 additions & 0 deletions fs/cache/test/cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,42 @@ TEST(CachePool, random_evict_file) {
if (cacheStore) cacheStore->release();
}

TEST(CachePool, do_open_same_file) {
std::string root = "/tmp/ease/cache/do_open_same_file/";
SetupTestDir(root);
auto mediaFs = new_localfs_adaptor(root.c_str(), ioengine_libaio);
auto alignFs = new_aligned_fs_adaptor(mediaFs, 4 * 1024, true, true);
auto cacheAllocator = new AlignedAlloc(4 * 1024);
auto roCachedFs = new_full_file_cached_fs(nullptr, alignFs, 1024 * 1024,
1, 1000 * 1000 * 1, 128ul * 1024 * 1024, cacheAllocator, 0);
auto cachePool = roCachedFs->get_pool();
DEFER({ delete cacheAllocator; delete roCachedFs; });

auto fileName = "/testDir/testfile";
std::vector<ICacheStore*> cacheStores;
for (int i = 0; i < 10; i++) {
cacheStores.push_back(cachePool->do_open(fileName, O_CREAT | O_RDWR, 0644));
ASSERT_NE(nullptr, cacheStores.back());
}

const size_t bufSize = 1024 * 1024;
IOVector buffer(*cacheAllocator);
buffer.push_back(bufSize);

uint64_t written = 0;

for (auto cacheStore : cacheStores) {
if (rand() % 2 == 0) {
cacheStore->release();
} else {
auto ret = cacheStore->do_pwritev2(buffer.iovec(), buffer.iovcnt(), written, 0);
EXPECT_EQ(ret, (ssize_t)bufSize);
written += ret;
cacheStore->release();
}
}
}

}
}
int main(int argc, char** argv) {
Expand Down
Loading