diff --git a/build.sh b/build.sh index bff92d3e..59169430 100755 --- a/build.sh +++ b/build.sh @@ -17,6 +17,7 @@ cmake .. -G "Unix Makefiles" -DCMAKE_TOOLCHAIN_FILE=$GENERATORS_DIR/conan_toolch -DPRIVMX_BUILD_ENDPOINT_INTERFACE=ON \ -DPRIVMX_ENABLE_TESTS=ON \ -DPRIVMX_ENABLE_TESTS_E2E=ON \ - -DPRIVMX_BUILD_DEBUG=OFF + -DPRIVMX_BUILD_DEBUG=OFF \ + -DPRIVMX_BUILD_SEARCH_BENCHMARK=ON cmake --build . -- -j20 source $GENERATORS_DIR/deactivate_conanbuild.sh diff --git a/endpoint/CMakeLists.txt b/endpoint/CMakeLists.txt index d0bce7e9..abd80e0d 100644 --- a/endpoint/CMakeLists.txt +++ b/endpoint/CMakeLists.txt @@ -35,6 +35,10 @@ if(PRIVMX_BUILD_BENCHMARK AND PRIVMX_BUILD_ENDPOINT_ENDPOINT) add_subdirectory(programs/benchmark) endif() +if(PRIVMX_BUILD_SEARCH_BENCHMARK AND PRIVMX_BUILD_ENDPOINT_ENDPOINT) + add_subdirectory(programs/search_bench ) +endif() + if(PRIVMX_BUILD_STREAM_TESTING AND PRIVMX_BUILD_ENDPOINT_ENDPOINT) add_subdirectory(programs/stream_testing) endif() \ No newline at end of file diff --git a/endpoint/programs/search_bench/CMakeLists.txt b/endpoint/programs/search_bench/CMakeLists.txt new file mode 100644 index 00000000..5955fbc5 --- /dev/null +++ b/endpoint/programs/search_bench/CMakeLists.txt @@ -0,0 +1,12 @@ +target_include_directories(privmxendpointcrypto PRIVATE ${INCLUDE_DIRS}) +target_include_directories(privmxendpointcore PRIVATE ${INCLUDE_DIRS}) +target_include_directories(privmxendpointevent PRIVATE ${INCLUDE_DIRS}) +target_include_directories(privmxendpointthread PRIVATE ${INCLUDE_DIRS}) +target_include_directories(privmxendpointstore PRIVATE ${INCLUDE_DIRS}) +target_include_directories(privmxendpointinbox PRIVATE ${INCLUDE_DIRS}) +target_include_directories(privmxendpointkvdb PRIVATE ${INCLUDE_DIRS}) +target_include_directories(privmxendpointsearch PRIVATE ${INCLUDE_DIRS}) + +# search_bench +add_executable(search_bench ${CMAKE_CURRENT_SOURCE_DIR}/main.cpp) +target_link_libraries(search_bench privmx privmxendpointcore privmxendpointcrypto privmxendpointstore privmxendpointkvdb privmxendpointsearch) diff --git a/endpoint/programs/search_bench/common_words_500.txt b/endpoint/programs/search_bench/common_words_500.txt new file mode 100644 index 00000000..e2f59176 --- /dev/null +++ b/endpoint/programs/search_bench/common_words_500.txt @@ -0,0 +1,500 @@ +a +about +above +across +act +action +activity +add +after +again +against +age +ago +air +all +allow +almost +alone +along +already +also +always +among +an +and +animal +another +answer +any +appear +apply +are +area +around +arrive +art +as +ask +at +away +back +bad +ball +bank +base +be +beat +beautiful +because +become +bed +been +before +begin +behind +believe +best +better +between +big +bird +black +blue +body +book +both +box +boy +bring +brother +build +business +but +buy +by +call +camera +can +car +care +carry +case +cat +cause +center +certain +chair +change +check +child +children +city +class +clean +clear +close +cold +college +color +come +common +community +company +complete +computer +consider +continue +control +cook +copy +corner +could +country +course +cover +create +cross +cut +dark +data +day +decide +deep +develop +did +die +difference +different +direct +do +doctor +dog +door +down +draw +dream +drive +drop +during +each +early +earth +east +easy +eat +education +effect +effort +eight +either +end +enough +enter +environment +especially +even +evening +event +ever +every +example +experience +eye +face +fact +family +far +farm +fast +father +feel +few +field +figure +fill +final +find +fine +finish +fire +first +fish +five +floor +follow +food +for +force +form +four +free +friend +from +front +full +game +garden +general +get +girl +give +go +good +government +great +green +ground +group +grow +guess +had +half +hand +happen +hard +has +have +he +head +hear +help +her +here +high +him +his +history +hold +home +horse +hot +hour +house +how +however +human +hundred +idea +if +important +in +include +increase +information +interest +into +is +issue +it +its +job +join +just +keep +kind +king +knew +know +land +language +large +last +late +later +laugh +law +lead +learn +leave +left +less +let +letter +life +light +like +line +list +listen +little +live +local +long +look +lot +love +low +machine +made +main +make +man +many +map +market +may +me +mean +measure +meet +member +men +message +might +mile +mind +minute +miss +money +month +more +morning +most +mother +mountain +move +much +music +must +my +name +nation +near +need +never +new +next +night +no +north +not +note +nothing +notice +now +number +of +off +offer +office +often +old +on +once +one +only +open +operation +opportunity +or +order +other +our +out +over +own +page +paper +part +party +pass +past +pay +people +perhaps +person +picture +piece +place +plan +plant +play +point +policy +poor +position +possible +power +present +pressure +pretty +problem +process +produce +product +program +project +provide +public +put +question +quick +quite +race +radio +rain +raise +reach +read +ready +real +reason +receive +record +red +remain +remember +report +research +result +return +right +river +road +rock +room +run +same +save +say +school +science +sea +season +second +see +seem +self +send +sense +service +set +seven +several +she +ship +short +should +show +side +simple +since +single +sister +sit +six +size +skill +small +social +some +someone +something +sometimes +song +soon +sound +south +space +speak +special +spring +stand +star +start +state +stay +step +still +stop +story +street +strong +student +study +such +summer +support +sure +system +table +take +talk +task +teach +team +technology +tell +ten +test +than +that +the +their +them +then +there +these +they +thing diff --git a/endpoint/programs/search_bench/main.cpp b/endpoint/programs/search_bench/main.cpp new file mode 100644 index 00000000..03e8ea02 --- /dev/null +++ b/endpoint/programs/search_bench/main.cpp @@ -0,0 +1,256 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +// #include +#include + +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace privmx::endpoint; +namespace fs = std::filesystem; + +static vector getParamsList(int argc, char* argv[]) { + vector args(argv + 1, argv + argc); + return args; +} + +static std::vector readTextFileLines(const std::string& filePath) { + std::ifstream file(filePath); + if (!file.is_open()) { + throw std::runtime_error("Unable to open file: " + filePath); + } + + std::vector lines; + std::string line; + while (std::getline(file, line)) { + if (!line.empty() && line.back() == '\r') { + line.pop_back(); + } + if (line.empty()) { + continue; + } + lines.push_back(line); + } + + return lines; +} + +static void processAllTxtFiles( + const std::string& directoryPath, + const std::function& processContent +) { + if (!fs::exists(directoryPath)) { + throw std::runtime_error("Directory does not exist: " + directoryPath); + } + if (!fs::is_directory(directoryPath)) { + throw std::runtime_error("Path is not a directory: " + directoryPath); + } + + for (const auto& entry : fs::directory_iterator(directoryPath)) { + if (!entry.is_regular_file()) { + continue; + } + + std::ifstream file(entry.path()); + if (!file.is_open()) { + throw std::runtime_error("Unable to open file: " + entry.path().string()); + } + + std::ostringstream content; + content << file.rdbuf(); + processContent(content.str()); + } +} + +static std::vector generateMessages(const std::vector& words, int num) { + if (words.empty()) { + throw std::runtime_error("Words list cannot be empty"); + } + if (num < 0) { + throw std::runtime_error("Number of messages cannot be negative"); + } + + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution messageLengthDist(5, 20); + std::uniform_int_distribution wordIndexDist(0, words.size() - 1); + + std::vector messages; + messages.reserve(num); + + for (int i = 0; i < num; ++i) { + const int messageLength = messageLengthDist(gen); + std::ostringstream message; + + for (int j = 0; j < messageLength; ++j) { + if (j > 0) { + message << ' '; + } + message << words[wordIndexDist(gen)]; + } + + messages.push_back(message.str()); + } + + return messages; +} + +static std::string extractMessageDataFromJson(const std::string& json) { + auto jsonObject = privmx::utils::Utils::parseJsonObject(json); + if (!jsonObject->has("data")) { + throw std::runtime_error("JSON does not contain 'data' field"); + } + return jsonObject->getValue("data"); +} + +int main(int argc, char** argv) { + auto params = getParamsList(argc, argv); + if(params.size() != 5) { + std::cout << "Invalid params. Required params are: 'PrivKey', 'SolutionId', 'BridgeUrl', 'ContextId', 'docs_dir_path'" << std::endl; + return -1; + } + std::string privKey = {params[0].begin(), params[0].end()}; + std::string solutionId = {params[1].begin(), params[1].end()}; + std::string bridgeUrl = {params[2].begin(), params[2].end()}; + std::string contextId = {params[3].begin(), params[3].end()}; + std::string docsDir = {params[4].begin(), params[4].end()}; + + try { + std::cout << "Reading words from: " << docsDir << std::endl; + auto words = readTextFileLines(docsDir + "/" + "common_words_500.txt"); + std::cout << "Words count: " << words.size() << std::endl; + + core::Connection connection = core::Connection::connect( + privKey, + solutionId, + bridgeUrl + ); + kvdb::KvdbApi kvdb_api = kvdb::KvdbApi::create(connection); + store::StoreApi store_api = store::StoreApi::create(connection); + search::SearchApi search_api = search::SearchApi::create(connection, store_api, kvdb_api); + + + auto contextUsersInfo = connection.listContextUsers(contextId, {0, 100, "asc"}); + std::vector usersWithPubKey = {}; + for(const auto& userInfo : contextUsersInfo.readItems) { + usersWithPubKey.push_back(userInfo.user); + } + + auto index {search_api.createSearchIndex(contextId, usersWithPubKey, usersWithPubKey, {}, {}, search::IndexMode::WITH_CONTENT) }; + auto indexHandle {search_api.openSearchIndex(index)}; + + std::cout << "Adding docs from: " << docsDir << " to the index..." << std::endl; + int id = 1; + + std::vector documentsToAdd {}; + processAllTxtFiles(docsDir + "/msgs", [&](std::string content) { + if (id > 100) return; + std::string name = "name_" + std::to_string(id++); + documentsToAdd.push_back({name, extractMessageDataFromJson(content)}); + }); + std::cout << "Adding " << documentsToAdd.size() << " documents..." << std::endl; + + const int batchCount = 1; + const int messagesPerBatch = 100; + long long totalBatchAddDurationMs = 0; + const auto searchStart100 = std::chrono::steady_clock::now(); + // for (int i = 0; i < batchCount; i++) { + // // auto randomMessages = generateMessages(words, messagesPerBatch); + // const auto searchStart = std::chrono::steady_clock::now(); + // for (auto message : documentsToAdd) { + // id++; + // search_api.addDocument(indexHandle, message.name, message.content); + // } + // const auto searchEnd = std::chrono::steady_clock::now(); + // const auto searchDurationMs = std::chrono::duration_cast(searchEnd - searchStart).count(); + // totalBatchAddDurationMs += searchDurationMs; + // const auto totalElapsedMs = std::chrono::duration_cast(searchEnd - searchStart100).count(); + // std::cout << "Adding " << messagesPerBatch << " messages - took: " << searchDurationMs << " ms" << std::endl; + // std::cout << "Average add time per "<< messagesPerBatch<<"-message batch after batch " << (i + 1) << ": " + // << (static_cast(totalBatchAddDurationMs) / (i + 1)) << " ms" << std::endl; + // std::cout << "Average total operation time per "<(totalElapsedMs) / (i + 1)) << " ms" << std::endl; + // + // } + // const auto searchEnd100 = std::chrono::steady_clock::now(); + // const auto searchDurationMs100 = std::chrono::duration_cast(searchEnd100 - searchStart100).count(); + // std::cout << "Adding "<<(messagesPerBatch * batchCount) <<" messages - took: " << searchDurationMs100 << " ms" << std::endl; + // std::cout << "Average add time per "<(totalBatchAddDurationMs) / batchCount) << " ms" << std::endl; + // std::cout << "Average total operation time per "<(searchDurationMs100) / batchCount) << " ms" << std::endl; + + + + + // auto randomMessagesBatch = generateMessages(words, messagesPerBatch); + // std::vector documentsToAdd {}; + // int nameId = 1; + // for (auto doc : randomMessagesBatch) { + // std::string name = "name_" + std::to_string(nameId++); + // documentsToAdd.push_back({name, doc}); + // } + const auto batchAddStart = std::chrono::steady_clock::now(); + search_api.addDocuments(indexHandle, documentsToAdd); + + const auto batchAddEnd = std::chrono::steady_clock::now(); + const auto batchAddDurationMs = std::chrono::duration_cast(batchAddEnd - batchAddStart).count(); + + std::cout << "Adding " << messagesPerBatch << " messages (as batch) - took: " << batchAddDurationMs << " ms" << std::endl; + + auto added = search_api.listDocuments(indexHandle, {0, 100, "desc"}); + + // std::cout << "Added docs:" << std::endl; + // for (auto doc : added.readItems) { + // std::cout << doc.name << ": " << doc.content << std::endl; + // } + // std::cout << "Added docs count: " << added.readItems.size() << std::endl; + search_api.closeSearchIndex(indexHandle); + + // get existing index + auto existingIndexes = search_api.listSearchIndexes(contextId, {0, 1, "desc"}); + auto existingIndex = existingIndexes.readItems[0]; + auto indexHandle2 = search_api.openSearchIndex(existingIndex.indexId); + + const auto searchStart = std::chrono::steady_clock::now(); + auto result = search_api.searchDocuments(indexHandle2, "is", {.skip = 0, .limit = 100, .sortOrder = "asc"}); + const auto searchEnd = std::chrono::steady_clock::now(); + const auto searchDurationMs = std::chrono::duration_cast(searchEnd - searchStart).count(); + std::cout << "Search query took: " << searchDurationMs << " ms" << std::endl; + for (const auto& doc : result.readItems) { + std::cout << doc.name << ": " << doc.content << std::endl; + } + search_api.closeSearchIndex(indexHandle2); + std::cout << "Done!" << std::endl; + + } catch (const core::Exception& e) { + cerr << e.getFull() << endl; + } catch (const privmx::utils::PrivmxException& e) { + cerr << e.what() << endl; + cerr << e.getData() << endl; + cerr << e.getCode() << endl; + } catch (const exception& e) { + cerr << e.what() << endl; + } catch (...) { + cerr << "Error" << endl; + } + + return 0; +} diff --git a/endpoint/programs/search_bench/run.sh b/endpoint/programs/search_bench/run.sh new file mode 100755 index 00000000..a2033c49 --- /dev/null +++ b/endpoint/programs/search_bench/run.sh @@ -0,0 +1,8 @@ +#!/bin/bash +PRIV_KEY="L3ycXibEzJm9t9swoJ4KtSmJsenHmmgRnYY79Q2TqfJMwTGaWfA7" +CONTEXT_ID="edfcb422-6428-48c1-b52b-7be4b673a170" +SOLUTION_ID="ab7ac88f-f611-4cb2-b163-5308a05b67dc" +BRIDGE_URL="http://localhost:9111" +DOCS_DIR="/home/zurek/search_bench" +cd ../../../build/endpoint/programs/search_bench || exit +./search_bench $PRIV_KEY $SOLUTION_ID $BRIDGE_URL $CONTEXT_ID $DOCS_DIR \ No newline at end of file diff --git a/endpoint/search/include/privmx/endpoint/search/FullTextSearch.hpp b/endpoint/search/include/privmx/endpoint/search/FullTextSearch.hpp index 274047d6..f82f5f58 100644 --- a/endpoint/search/include/privmx/endpoint/search/FullTextSearch.hpp +++ b/endpoint/search/include/privmx/endpoint/search/FullTextSearch.hpp @@ -28,8 +28,9 @@ class FullTextSearch { public: static std::shared_ptr openDb(const std::string& filename, const IndexMode mode); - FullTextSearch(std::shared_ptr db, const IndexMode mode); + FullTextSearch(std::shared_ptr db, std::string filename, const IndexMode mode); int64_t addDocument(const std::string& name, const std::string& content); + std::vector addDocuments(const std::vector& documents); Document getDocument(const int64_t documentId); core::PagingList listDocuments(const core::PagingQuery& pagingQuery); void updateDocument(const Document& document); @@ -44,6 +45,7 @@ class FullTextSearch int64_t getCountOfAll(); std::shared_ptr _db; + std::string _filename; IndexMode _mode; }; diff --git a/endpoint/search/include/privmx/endpoint/search/PrivmxFS.hpp b/endpoint/search/include/privmx/endpoint/search/PrivmxFS.hpp index e8caac34..2ebc58b3 100644 --- a/endpoint/search/include/privmx/endpoint/search/PrivmxFS.hpp +++ b/endpoint/search/include/privmx/endpoint/search/PrivmxFS.hpp @@ -12,8 +12,11 @@ limitations under the License. #define _PRIVMXLIB_ENDPOINT_SEARCH_PRIVMXFS_HPP_ #include +#include +#include #include #include +#include #include #include "privmx/endpoint/core/Connection.hpp" @@ -76,6 +79,20 @@ class PrivmxFile { public: PrivmxFile(std::shared_ptr session, const std::string& fileId, const std::string& path); + struct MemoryFileState + { + std::string data; + LockLevel lockLevel = LockLevel::NONE; + bool reservedLock = false; + }; + + PrivmxFile( + std::shared_ptr session, + const std::string& fileId, + const std::string& path, + bool memoryOnly, + std::shared_ptr memoryFileState + ); void open(); void close(); privmx::endpoint::core::Buffer read(int64_t size, int64_t offset); @@ -93,21 +110,31 @@ class PrivmxFile int64_t fh = -1; Writer writer; LockSession lockSession; + bool memoryOnly = false; + std::shared_ptr memoryFileState; }; class PrivmxFS { public: static std::shared_ptr create(std::shared_ptr session); + static void beginDbOperation(const std::string& fullPath); + static void endDbOperation(const std::string& fullPath); PrivmxFS(const std::shared_ptr& session); std::shared_ptr openFile(const std::string& path); bool access(const std::string& path); void deleteFile(const std::string& path); private: + bool isJournalPath(const std::string& path) const; + std::string getCachedFileId(const std::string& name); std::string getFileId(const std::string& name); std::shared_ptr _session; + mutable std::mutex _fileIdCacheMutex; + std::unordered_map _fileIdCache; + mutable std::mutex _memoryFileMutex; + std::unordered_map> _memoryFiles; }; class PrivmxExtFS diff --git a/endpoint/search/include/privmx/endpoint/search/SearchApiImpl.hpp b/endpoint/search/include/privmx/endpoint/search/SearchApiImpl.hpp index 1fb8ecc2..157c367f 100644 --- a/endpoint/search/include/privmx/endpoint/search/SearchApiImpl.hpp +++ b/endpoint/search/include/privmx/endpoint/search/SearchApiImpl.hpp @@ -89,6 +89,7 @@ class SearchApiImpl : public privmx::utils::ManualManagedClass void closeSearchIndex(const int64_t indexHandle); int64_t addDocument(const int64_t indexHandle, const std::string& name, const std::string& content); + std::vector addDocuments(const int64_t indexHandle, const std::vector& documents); void updateDocument(const int64_t indexHandle, const Document& document); diff --git a/endpoint/search/include/privmx/endpoint/search/VarDeserializer.hpp b/endpoint/search/include/privmx/endpoint/search/VarDeserializer.hpp index b3a725e2..a84dbd20 100644 --- a/endpoint/search/include/privmx/endpoint/search/VarDeserializer.hpp +++ b/endpoint/search/include/privmx/endpoint/search/VarDeserializer.hpp @@ -25,6 +25,9 @@ namespace core { template<> search::IndexMode VarDeserializer::deserialize(const Poco::Dynamic::Var& val, const std::string& name); +template<> +search::NewDocument VarDeserializer::deserialize(const Poco::Dynamic::Var& val, const std::string& name); + template<> search::Document VarDeserializer::deserialize(const Poco::Dynamic::Var& val, const std::string& name); diff --git a/endpoint/search/include/privmx/endpoint/search/VarSerializer.hpp b/endpoint/search/include/privmx/endpoint/search/VarSerializer.hpp index 4a946018..50a8a640 100644 --- a/endpoint/search/include/privmx/endpoint/search/VarSerializer.hpp +++ b/endpoint/search/include/privmx/endpoint/search/VarSerializer.hpp @@ -35,6 +35,9 @@ Poco::Dynamic::Var VarSerializer::serialize(const search::Ind template<> Poco::Dynamic::Var VarSerializer::serialize(const search::SearchIndex& val); +template<> +Poco::Dynamic::Var VarSerializer::serialize(const search::NewDocument& val); + template<> Poco::Dynamic::Var VarSerializer::serialize(const search::Document& val); diff --git a/endpoint/search/include/privmx/endpoint/search/varinterface/SearchApiVarInterface.hpp b/endpoint/search/include/privmx/endpoint/search/varinterface/SearchApiVarInterface.hpp index f284b8db..334c8234 100644 --- a/endpoint/search/include/privmx/endpoint/search/varinterface/SearchApiVarInterface.hpp +++ b/endpoint/search/include/privmx/endpoint/search/varinterface/SearchApiVarInterface.hpp @@ -34,11 +34,12 @@ class SearchApiVarInterface { OpenSearchIndex = 6, CloseSearchIndex = 7, AddDocument = 8, - UpdateDocument = 9, - DeleteDocument = 10, - GetDocument = 11, - ListDocuments = 12, - SearchDocuments = 13, + AddDocuments = 9, + UpdateDocument = 10, + DeleteDocument = 11, + GetDocument = 12, + ListDocuments = 13, + SearchDocuments = 14, }; SearchApiVarInterface(core::Connection connection, store::StoreApi storeApi, kvdb::KvdbApi kvdbApi, const core::VarSerializer& serializer) @@ -53,6 +54,7 @@ class SearchApiVarInterface { Poco::Dynamic::Var openSearchIndex(const Poco::Dynamic::Var& args); Poco::Dynamic::Var closeSearchIndex(const Poco::Dynamic::Var& args); Poco::Dynamic::Var addDocument(const Poco::Dynamic::Var& args); + Poco::Dynamic::Var addDocuments(const Poco::Dynamic::Var& args); Poco::Dynamic::Var updateDocument(const Poco::Dynamic::Var& args); Poco::Dynamic::Var deleteDocument(const Poco::Dynamic::Var& args); Poco::Dynamic::Var getDocument(const Poco::Dynamic::Var& args); diff --git a/endpoint/search/include_pub/privmx/endpoint/search/SearchApi.hpp b/endpoint/search/include_pub/privmx/endpoint/search/SearchApi.hpp index 9a42109f..c0cedf86 100644 --- a/endpoint/search/include_pub/privmx/endpoint/search/SearchApi.hpp +++ b/endpoint/search/include_pub/privmx/endpoint/search/SearchApi.hpp @@ -137,6 +137,15 @@ class SearchApi : public privmx::endpoint::core::ExtendedPointer */ int64_t addDocument(const int64_t indexHandle, const std::string& name, const std::string& content); + /** + * Adds multiple new documents to the Search Index in a single batch. + * + * @param indexHandle Handle of the Index to add the documents to + * @param documents Documents to add + * @return IDs of the newly added documents in input order + */ + std::vector addDocuments(const int64_t indexHandle, const std::vector& documents); + /** * Updates an existing document in the Search Index. * diff --git a/endpoint/search/include_pub/privmx/endpoint/search/Types.hpp b/endpoint/search/include_pub/privmx/endpoint/search/Types.hpp index 504a2316..1d3818fe 100644 --- a/endpoint/search/include_pub/privmx/endpoint/search/Types.hpp +++ b/endpoint/search/include_pub/privmx/endpoint/search/Types.hpp @@ -130,7 +130,23 @@ struct SearchIndex }; /** - * A structure representing a document for indexing. + * A structure representing a new document for indexing. + */ +struct NewDocument +{ + /** + * Document name + */ + std::string name; + + /** + * Document content + */ + std::string content; +}; + +/** + * A structure representing a document stored in the index. */ struct Document { diff --git a/endpoint/search/src/FullTextSearch.cpp b/endpoint/search/src/FullTextSearch.cpp index 258474bf..c4433b2c 100644 --- a/endpoint/search/src/FullTextSearch.cpp +++ b/endpoint/search/src/FullTextSearch.cpp @@ -12,6 +12,7 @@ limitations under the License. #include #include "privmx/endpoint/search/FullTextSearch.hpp" +#include "privmx/endpoint/search/PrivmxFS.hpp" #include "privmx/endpoint/search/PrivmxSqliteVFS.hpp" #include "privmx/endpoint/search/SearchException.hpp" #include @@ -42,30 +43,81 @@ std::shared_ptr FullTextSearch::openDb(const std::string& filena throw DatabaseAttachException(sqlite3_errmsg(db)); } - return std::make_shared(db2, mode); + return std::make_shared(db2, filename, mode); } -FullTextSearch::FullTextSearch(std::shared_ptr db, const IndexMode mode) : _db(std::move(db)), _mode(mode) {} +FullTextSearch::FullTextSearch(std::shared_ptr db, std::string filename, const IndexMode mode) + : _db(std::move(db)), _filename(std::move(filename)), _mode(mode) {} int64_t FullTextSearch::addDocument(const std::string& name, const std::string& content) { - const char* insertSql = "INSERT INTO pmx.documents (name, content) VALUES (?, ?);"; - sqlite3_stmt* stmt; - if (sqlite3_prepare_v2(_db.get(), insertSql, -1, &stmt, nullptr) != SQLITE_OK) { - throw InsertPrepareException(sqlite3_errmsg(_db.get())); + NewDocument document; + document.name = name; + document.content = content; + auto result = addDocuments({document}); + return result.front(); +} + +std::vector FullTextSearch::addDocuments(const std::vector& documents) { + if (documents.empty()) { + return {}; } - sqlite3_bind_text(stmt, 1, name.c_str(), -1, SQLITE_TRANSIENT); - sqlite3_bind_text(stmt, 2, content.c_str(), -1, SQLITE_TRANSIENT); + PrivmxFS::beginDbOperation(_filename); + const char* beginSql = "BEGIN IMMEDIATE;"; + const char* commitSql = "COMMIT;"; + const char* rollbackSql = "ROLLBACK;"; + const char* insertSql = "INSERT INTO pmx.documents (name, content) VALUES (?, ?);"; + sqlite3_stmt* stmt = nullptr; + bool transactionStarted = false; + std::vector rowIds; + rowIds.reserve(documents.size()); + try { + if (sqlite3_exec(_db.get(), beginSql, nullptr, nullptr, nullptr) != SQLITE_OK) { + throw InsertExecuteException(sqlite3_errmsg(_db.get())); + } + transactionStarted = true; - int status = sqlite3_step(stmt); - if (status != SQLITE_DONE) { - sqlite3_finalize(stmt); - throw InsertExecuteException(sqlite3_errmsg(_db.get())); - } + if (sqlite3_prepare_v2(_db.get(), insertSql, -1, &stmt, nullptr) != SQLITE_OK) { + throw InsertPrepareException(sqlite3_errmsg(_db.get())); + } - sqlite3_finalize(stmt); + for (const auto& document : documents) { + sqlite3_bind_text(stmt, 1, document.name.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text(stmt, 2, document.content.c_str(), -1, SQLITE_TRANSIENT); + + int status = sqlite3_step(stmt); + if (status != SQLITE_DONE) { + sqlite3_finalize(stmt); + stmt = nullptr; + throw InsertExecuteException(sqlite3_errmsg(_db.get())); + } + + rowIds.push_back(sqlite3_last_insert_rowid(_db.get())); + if (sqlite3_reset(stmt) != SQLITE_OK) { + sqlite3_finalize(stmt); + stmt = nullptr; + throw InsertExecuteException(sqlite3_errmsg(_db.get())); + } + sqlite3_clear_bindings(stmt); + } - return sqlite3_last_insert_rowid(_db.get()); + sqlite3_finalize(stmt); + stmt = nullptr; + if (sqlite3_exec(_db.get(), commitSql, nullptr, nullptr, nullptr) != SQLITE_OK) { + throw InsertExecuteException(sqlite3_errmsg(_db.get())); + } + PrivmxFS::endDbOperation(_filename); + return rowIds; + } catch (...) { + if (stmt != nullptr) { + sqlite3_finalize(stmt); + } + if (transactionStarted) { + sqlite3_exec(_db.get(), rollbackSql, nullptr, nullptr, nullptr); + } + PrivmxFS::endDbOperation(_filename); + throw; + } } Document FullTextSearch::getDocument(const int64_t documentId) { diff --git a/endpoint/search/src/PrivmxFS.cpp b/endpoint/search/src/PrivmxFS.cpp index 9c33efa1..c8b66685 100644 --- a/endpoint/search/src/PrivmxFS.cpp +++ b/endpoint/search/src/PrivmxFS.cpp @@ -1,14 +1,65 @@ #include "privmx/endpoint/search/PrivmxFS.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "privmx/endpoint/core/ConvertedExceptions.hpp" +#include "privmx/endpoint/core/ExceptionConverter.hpp" #include "privmx/endpoint/search/DynamicTypes.hpp" #include "privmx/endpoint/search/SearchException.hpp" -#include "privmx/endpoint/core/ExceptionConverter.hpp" -#include "privmx/endpoint/core/ConvertedExceptions.hpp" -#include static const privmx::endpoint::core::Buffer META = privmx::endpoint::core::Buffer::from("{}"); using namespace privmx::endpoint::search; +namespace { + +struct ScopedPathInfo { + std::string sessionId; + std::string path; +}; + +struct ScopedDbLockState { + LockSession lockSession; + std::uint64_t refCount = 0; + bool locked = false; +}; + +std::mutex g_scopedDbLockMutex; +std::unordered_map> g_scopedDbLocks; + +std::string makeScopedDbLockKey(const std::string& sessionId, const std::string& path) { + return sessionId + ":" + path; +} + +ScopedPathInfo parseScopedPath(const std::string& fullPath) { + Poco::Path path; + path.parse(fullPath); + if (path.depth() >= 2 && path[0] == "pmx") { + ScopedPathInfo result{.sessionId = path[1], .path = std::string()}; + path.popFrontDirectory(); + path.popFrontDirectory(); + result.path = path.toString(); + return result; + } + throw std::runtime_error("Invalid PrivmxFS scoped path: " + fullPath); +} + +bool isScopedDbLockActive(const std::shared_ptr& session, const std::string& path) { + std::lock_guard lock(g_scopedDbLockMutex); + auto it = g_scopedDbLocks.find(makeScopedDbLockKey(session->id, path)); + return it != g_scopedDbLocks.end() && it->second->locked; +} + +} // namespace + std::shared_ptr SessionManager::_singleton; std::shared_ptr SessionManager::get() { @@ -66,14 +117,33 @@ int64_t Writer::size() { } PrivmxFile::PrivmxFile(std::shared_ptr session, const std::string& fileId, const std::string& path) - : session(session), fileId(fileId), path(path), lockSession(session->kvdbApi, session->kvdbId, path) {} + : PrivmxFile(session, fileId, path, false, nullptr) {} + +PrivmxFile::PrivmxFile( + std::shared_ptr session, + const std::string& fileId, + const std::string& path, + bool memoryOnly, + std::shared_ptr memoryFileState +) : session(session), + fileId(fileId), + path(path), + lockSession(session->kvdbApi, session->kvdbId, path), + memoryOnly(memoryOnly), + memoryFileState(memoryFileState) {} void PrivmxFile::open() { + if (memoryOnly) { + return; + } LOG_TRACE("PrivmxFile::open - ", fileId) fh = session->storeApi.openFile(fileId); } void PrivmxFile::close() { + if (memoryOnly) { + return; + } if (fh != -1) { session->storeApi.closeFile(fh); fh = -1; @@ -81,6 +151,17 @@ void PrivmxFile::close() { } privmx::endpoint::core::Buffer PrivmxFile::read(int64_t size, int64_t offset) { + if (memoryOnly) { + if (offset < 0) { + return privmx::endpoint::core::Buffer::from("", 0); + } + if (static_cast(offset) >= memoryFileState->data.size()) { + return privmx::endpoint::core::Buffer::from("", 0); + } + const auto availableSize = memoryFileState->data.size() - static_cast(offset); + const auto readSize = std::min(static_cast(size), availableSize); + return privmx::endpoint::core::Buffer::from(memoryFileState->data.data() + offset, readSize); + } sync(); session->storeApi.seekInFile(fh, offset); auto res = session->storeApi.readFromFile(fh, size); @@ -88,6 +169,21 @@ privmx::endpoint::core::Buffer PrivmxFile::read(int64_t size, int64_t offset) { } void PrivmxFile::write(const privmx::endpoint::core::Buffer& data, int64_t offset) { + if (memoryOnly) { + if (offset < 0) { + throw std::runtime_error("Invalid write offset"); + } + const auto writeOffset = static_cast(offset); + if (memoryFileState->data.size() < writeOffset) { + memoryFileState->data.resize(writeOffset, '\0'); + } + const auto writeSize = data.size(); + if (memoryFileState->data.size() < writeOffset + writeSize) { + memoryFileState->data.resize(writeOffset + writeSize, '\0'); + } + std::memcpy(memoryFileState->data.data() + writeOffset, data.data(), writeSize); + return; + } int64_t of; std::string res; tie(of, res) = writer.write(offset, data.stdString()); @@ -98,11 +194,21 @@ void PrivmxFile::write(const privmx::endpoint::core::Buffer& data, int64_t offse } void PrivmxFile::truncate(int64_t size) { + if (memoryOnly) { + if (size < 0) { + throw std::runtime_error("Invalid truncate size"); + } + memoryFileState->data.resize(static_cast(size), '\0'); + return; + } session->storeApi.seekInFile(fh, size); session->storeApi.writeToFile(fh, privmx::endpoint::core::Buffer::from("", 0), true); } void PrivmxFile::sync() { + if (memoryOnly) { + return; + } int64_t of; std::string res; tie(of, res) = writer.write(-1, ""); @@ -113,14 +219,26 @@ void PrivmxFile::sync() { } int64_t PrivmxFile::getFileSize() { + if (memoryOnly) { + return static_cast(memoryFileState->data.size()); + } auto fileInfo = session->storeApi.getFile(fileId); if(fileInfo.statusCode != 0) { - throw MalformedInternalFileException(); + throw MalformedInternalFileException(); } return fileInfo.size; } bool PrivmxFile::lock(LockLevel level) { + const std::string lockStatName = memoryOnly ? "lock_journal" : "lock_db"; + if (memoryOnly) { + memoryFileState->lockLevel = level; + memoryFileState->reservedLock = level >= LockLevel::RESERVED; + return true; + } + if (isScopedDbLockActive(session, path)) { + return true; + } bool val = lockSession.lock(level); if (val) { session->storeApi.syncFile(fh); @@ -129,10 +247,24 @@ bool PrivmxFile::lock(LockLevel level) { } bool PrivmxFile::unlock(LockLevel level) { + if (memoryOnly) { + memoryFileState->lockLevel = level; + memoryFileState->reservedLock = level >= LockLevel::RESERVED; + return true; + } + if (isScopedDbLockActive(session, path)) { + return true; + } return lockSession.unlock(level); } bool PrivmxFile::checkReservedLock() { + if (memoryOnly) { + return memoryFileState->reservedLock; + } + if (isScopedDbLockActive(session, path)) { + return true; + } return lockSession.checkReservedLock(); } @@ -143,19 +275,119 @@ std::shared_ptr PrivmxFS::create( return res; } +void PrivmxFS::beginDbOperation(const std::string& fullPath) { + const auto parsed = parseScopedPath(fullPath); + if (parsed.path.size() >= 8 && parsed.path.compare(parsed.path.size() - 8, 8, "-journal") == 0) { + return; + } + + const auto session = SessionManager::get()->getSession(parsed.sessionId); + const auto key = makeScopedDbLockKey(parsed.sessionId, parsed.path); + + std::shared_ptr state; + bool shouldAcquire = false; + { + std::lock_guard lock(g_scopedDbLockMutex); + auto& entry = g_scopedDbLocks[key]; + if (!entry) { + entry = std::make_shared(ScopedDbLockState{ + .lockSession = LockSession(session->kvdbApi, session->kvdbId, parsed.path), + .refCount = 0, + .locked = false + }); + } + state = entry; + shouldAcquire = state->refCount == 0; + ++state->refCount; + } + + if (!shouldAcquire) { + return; + } + + try { + if (!state->lockSession.lock(LockLevel::EXCLUSIVE)) { + throw std::runtime_error("Unable to acquire scoped db lock"); + } + std::lock_guard lock(g_scopedDbLockMutex); + state->locked = true; + } catch (...) { + std::lock_guard lock(g_scopedDbLockMutex); + auto it = g_scopedDbLocks.find(key); + if (it != g_scopedDbLocks.end()) { + if (it->second->refCount > 0) { + --it->second->refCount; + } + if (it->second->refCount == 0) { + g_scopedDbLocks.erase(it); + } + } + throw; + } +} + +void PrivmxFS::endDbOperation(const std::string& fullPath) { + const auto parsed = parseScopedPath(fullPath); + if (parsed.path.size() >= 8 && parsed.path.compare(parsed.path.size() - 8, 8, "-journal") == 0) { + return; + } + + const auto key = makeScopedDbLockKey(parsed.sessionId, parsed.path); + std::shared_ptr state; + bool shouldRelease = false; + { + std::lock_guard lock(g_scopedDbLockMutex); + auto it = g_scopedDbLocks.find(key); + if (it == g_scopedDbLocks.end()) { + return; + } + state = it->second; + if (state->refCount > 0) { + --state->refCount; + } + shouldRelease = state->refCount == 0 && state->locked; + if (state->refCount == 0) { + g_scopedDbLocks.erase(it); + } + } + + if (shouldRelease) { + state->lockSession.unlock(LockLevel::NONE); + } +} + std::shared_ptr PrivmxFS::openFile(const std::string& path) { - std::string fileId = getFileId(path); + if (isJournalPath(path)) { + std::lock_guard lock(_memoryFileMutex); + auto& memoryFileState = _memoryFiles[path]; + if (!memoryFileState) { + memoryFileState = std::make_shared(); + } + std::shared_ptr result = std::make_shared(_session, "", path, true, memoryFileState); + result->open(); + return result; + } + std::string fileId = getCachedFileId(path); std::shared_ptr result = std::make_shared(_session, fileId, path); result->open(); return result; } bool PrivmxFS::access(const std::string& path) { + if (isJournalPath(path)) { + std::lock_guard lock(_memoryFileMutex); + return _memoryFiles.find(path) != _memoryFiles.end(); + } LOG_TRACE("PrivmxFS::access - ", path, " | kvdbId: ",_session->kvdbId) return _session->kvdbApi.hasEntry(_session->kvdbId, path); } void PrivmxFS::deleteFile(const std::string& path) { + if (isJournalPath(path)) { + std::lock_guard lock(_memoryFileMutex); + _memoryFiles.erase(path); + return; + } LOG_TRACE("PrivmxFS::deleteFile - ", path, " | kvdbId: ",_session->kvdbId) privmx::endpoint::kvdb::KvdbEntry kvdbEntry = _session->kvdbApi.getEntry(_session->kvdbId, path); std::string fileId = ""; @@ -165,17 +397,39 @@ void PrivmxFS::deleteFile(const std::string& path) { _session->kvdbApi.deleteEntry(_session->kvdbId, path); _session->storeApi.deleteFile(fileId); LockSession::destroyLock(_session->kvdbApi, _session->kvdbId, path); + std::lock_guard lock(_fileIdCacheMutex); + _fileIdCache.erase(path); } PrivmxFS::PrivmxFS( const std::shared_ptr& session ) : _session(session) {} +bool PrivmxFS::isJournalPath(const std::string& path) const { + return path.size() >= 8 && path.compare(path.size() - 8, 8, "-journal") == 0; +} + +std::string PrivmxFS::getCachedFileId(const std::string& name) { + { + std::lock_guard lock(_fileIdCacheMutex); + auto it = _fileIdCache.find(name); + if (it != _fileIdCache.end()) { + return it->second; + } + } + + std::string fileId = getFileId(name); + + std::lock_guard lock(_fileIdCacheMutex); + _fileIdCache[name] = fileId; + return fileId; +} + std::string PrivmxFS::getFileId(const std::string& name) { LOG_TRACE("PrivmxFS::getFileId - ", name, " | kvdbId: ",_session->kvdbId) try { privmx::endpoint::kvdb::KvdbEntry kvdbEntry = _session->kvdbApi.getEntry(_session->kvdbId, name); if(kvdbEntry.statusCode != 0) { - throw MalformedInternalFileIdException(); + throw MalformedInternalFileIdException(); } std::string fileId = kvdbEntry.data.stdString(); return fileId; diff --git a/endpoint/search/src/SearchApi.cpp b/endpoint/search/src/SearchApi.cpp index 8acad909..738467b6 100644 --- a/endpoint/search/src/SearchApi.cpp +++ b/endpoint/search/src/SearchApi.cpp @@ -156,6 +156,16 @@ int64_t SearchApi::addDocument(const int64_t indexHandle, const std::string& nam } } +std::vector SearchApi::addDocuments(const int64_t indexHandle, const std::vector& documents) { + auto impl = getImpl(); + try { + return impl->addDocuments(indexHandle, documents); + } catch (const privmx::utils::PrivmxException& e) { + core::ExceptionConverter::rethrowAsCoreException(e); + throw core::Exception("ExceptionConverter rethrow error"); + } +} + void SearchApi::updateDocument(const int64_t indexHandle, const Document& document) { auto impl = getImpl(); try { diff --git a/endpoint/search/src/SearchApiImpl.cpp b/endpoint/search/src/SearchApiImpl.cpp index 615b42c7..7d8ab563 100644 --- a/endpoint/search/src/SearchApiImpl.cpp +++ b/endpoint/search/src/SearchApiImpl.cpp @@ -20,6 +20,7 @@ limitations under the License. #include "privmx/endpoint/store/StoreApiImpl.hpp" #include "privmx/endpoint/kvdb/KvdbApiImpl.hpp" +#include using namespace privmx::endpoint; using namespace privmx::endpoint::search; @@ -97,7 +98,7 @@ core::PagingList SearchApiImpl::listSearchIndexes(const std::string int64_t SearchApiImpl::openSearchIndex(const std::string& indexId) { auto data = getIndexData(indexId); auto session = SessionManager::get()->addSession(_connection, _storeApi, _kvdbApi, indexId, data.storeId()); - std::string filename = "/pmx/" + session->id + "/index.db"; + std::string filename = "/pmx/" + session->id + "/index_db"; auto fts = FullTextSearch::openDb(filename, (IndexMode)data.mode()); fts->ensureTableCreated(); return _fts.add(fts); @@ -114,6 +115,11 @@ int64_t SearchApiImpl::addDocument(const int64_t indexHandle, const std::string& return fts->addDocument(name, content); } +std::vector SearchApiImpl::addDocuments(const int64_t indexHandle, const std::vector& documents) { + auto fts = _fts.get(indexHandle); + return fts->addDocuments(documents); +} + void SearchApiImpl::updateDocument(const int64_t indexHandle, const Document& document) { auto fts = _fts.get(indexHandle); fts->updateDocument(document); diff --git a/endpoint/search/src/VarDeserializer.cpp b/endpoint/search/src/VarDeserializer.cpp index ecb1ca3c..b7c2c2b8 100644 --- a/endpoint/search/src/VarDeserializer.cpp +++ b/endpoint/search/src/VarDeserializer.cpp @@ -36,3 +36,10 @@ search::Document VarDeserializer::deserialize(const Poco::Dyna .name = deserialize(obj->get("name"), name + ".name"), .content = deserialize(obj->get("content"), name + ".content")}; } + +template<> +search::NewDocument VarDeserializer::deserialize(const Poco::Dynamic::Var& val, const std::string& name) { + Poco::JSON::Object::Ptr obj = val.extract(); + return {.name = deserialize(obj->get("name"), name + ".name"), + .content = deserialize(obj->get("content"), name + ".content")}; +} diff --git a/endpoint/search/src/VarSerializer.cpp b/endpoint/search/src/VarSerializer.cpp index c0288a3d..a6619197 100644 --- a/endpoint/search/src/VarSerializer.cpp +++ b/endpoint/search/src/VarSerializer.cpp @@ -82,3 +82,14 @@ Poco::Dynamic::Var VarSerializer::serialize(const search::Docu obj->set("content", serialize(val.content)); return obj; } + +template<> +Poco::Dynamic::Var VarSerializer::serialize(const search::NewDocument& val) { + Poco::JSON::Object::Ptr obj = new Poco::JSON::Object(); + if (_options.addType) { + obj->set("__type", "search$NewDocument"); + } + obj->set("name", serialize(val.name)); + obj->set("content", serialize(val.content)); + return obj; +} diff --git a/endpoint/search/src/varinterface/SearchApiVarInterface.cpp b/endpoint/search/src/varinterface/SearchApiVarInterface.cpp index fafdeb89..927fc316 100644 --- a/endpoint/search/src/varinterface/SearchApiVarInterface.cpp +++ b/endpoint/search/src/varinterface/SearchApiVarInterface.cpp @@ -29,6 +29,7 @@ std::map(argsArr->get(0), "indexHandle"); + auto documents = _deserializer.deserializeVector(argsArr->get(1), "documents"); + auto result = _searchApi.addDocuments(indexHandle, documents); + return _serializer.serialize(result); +} + Poco::Dynamic::Var SearchApiVarInterface::updateDocument(const Poco::Dynamic::Var& args) { auto argsArr = core::VarInterfaceUtil::validateAndExtractArray(args, 2); auto indexHandle = _deserializer.deserialize(argsArr->get(0), "indexHandle"); diff --git a/endpoint/store/src/ServerApi.cpp b/endpoint/store/src/ServerApi.cpp index d9a18a0e..7ccb3a5f 100644 --- a/endpoint/store/src/ServerApi.cpp +++ b/endpoint/store/src/ServerApi.cpp @@ -52,10 +52,12 @@ server::StoreFileCreateResult ServerApi::storeFileCreate(const server::StoreFile } server::StoreFileReadResult ServerApi::storeFileRead(const server::StoreFileReadModel& model) { + std::cout << "storeFileRead" << std::endl; return request("storeFileRead", model); } void ServerApi::storeFileWrite(const server::StoreFileWriteModel& model) { + std::cout << "storeFileWrite" << std::endl; request("storeFileWrite", model); }