From 02521078a52e8b9f2cb23c9232a6a03e09a95d7b Mon Sep 17 00:00:00 2001 From: liao-yonghuang Date: Tue, 11 Nov 2025 21:17:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9B=E5=BB=BA=E5=88=86=E5=B8=83=E5=BC=8F?= =?UTF-8?q?=E8=A1=A8=E6=97=B6=E5=8F=AF=E4=BB=A5=E6=89=93=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: liao-yonghuang --- .../distributeddb/common/include/db_errno.h | 1 + .../interfaces/include/store_types.h | 1 + .../interfaces/src/kv_store_errno.cpp | 1 + .../relational_store_delegate_impl.cpp | 2 +- .../relational/sqlite_relational_store.cpp | 10 +- .../relational/sqlite_relational_utils.h | 2 + .../sqlite_relational_utils_client.cpp | 6 +- ...qlite_single_relational_storage_engine.cpp | 148 ++++++++- .../sqlite_single_relational_storage_engine.h | 20 ++ ...single_ver_relational_storage_executor.cpp | 18 +- ...e_single_ver_relational_storage_executor.h | 10 +- frameworks/libs/distributeddb/test/BUILD.gn | 4 + ...istributeddb_interfaces_stop_task_test.cpp | 310 ++++++++++++++++++ 13 files changed, 517 insertions(+), 16 deletions(-) create mode 100644 frameworks/libs/distributeddb/test/unittest/common/interfaces/distributeddb_interfaces_stop_task_test.cpp diff --git a/frameworks/libs/distributeddb/common/include/db_errno.h b/frameworks/libs/distributeddb/common/include/db_errno.h index 62e9431760c..b8fb1674979 100644 --- a/frameworks/libs/distributeddb/common/include/db_errno.h +++ b/frameworks/libs/distributeddb/common/include/db_errno.h @@ -195,6 +195,7 @@ constexpr const int E_FEEDBACK_DB_CLOSING = (E_BASE + 208); // Db was closing fe // The target user ID is incorrect and needs to be re-obtained constexpr const int E_NEED_CORRECT_TARGET_USER = (E_BASE + 209); constexpr const int E_CLOUD_ASSET_NOT_FOUND = (E_BASE + 210); // Cloud download asset return 404 error +constexpr const int E_TASK_INTERRUPTED = (E_BASE + 211); // Task(cloud sync, generate log) interrupted } // namespace DistributedDB #endif // DISTRIBUTEDDB_ERRNO_H diff --git a/frameworks/libs/distributeddb/interfaces/include/store_types.h b/frameworks/libs/distributeddb/interfaces/include/store_types.h index 6395a9be59a..cd870a085f0 100644 --- a/frameworks/libs/distributeddb/interfaces/include/store_types.h +++ b/frameworks/libs/distributeddb/interfaces/include/store_types.h @@ -98,6 +98,7 @@ enum DBStatus { NEED_CORRECT_TARGET_USER, // The target user ID is incorrect and needs to be re-obtained CLOUD_ASSET_NOT_FOUND, // The cloud download asset return 404 error FILE_NOT_FOUND, // local asset file abnormality + TASK_INTERRUPTED, // Task(cloud sync, generate log) interrupted BUTT_STATUS = 27394048 // end of status }; diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp b/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp index 554d827fd28..61ccbb5ebb6 100644 --- a/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_errno.cpp @@ -88,6 +88,7 @@ namespace { { -E_CLOUD_DISABLED, CLOUD_DISABLED }, { -E_DISTRIBUTED_FIELD_DECREASE, DISTRIBUTED_FIELD_DECREASE }, { -E_CLOUD_ASSET_NOT_FOUND, CLOUD_ASSET_NOT_FOUND }, + { -E_TASK_INTERRUPTED, TASK_INTERRUPTED }, }; } diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp index b8650d981cc..e0adffca8a0 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/relational/relational_store_delegate_impl.cpp @@ -658,7 +658,7 @@ DBStatus RelationalStoreDelegateImpl::StopTask(TaskType type) LOGE("[RelationalStore Delegate] Invalid connection for stop task."); return DB_ERROR; } - return NOT_SUPPORT; + return TransferDBErrno(conn_->StopTask(type)); } } // namespace DistributedDB #endif \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp index cb27be62cce..370326b3b5f 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp @@ -443,6 +443,7 @@ void SQLiteRelationalStore::WakeUpSyncer() int SQLiteRelationalStore::CreateDistributedTable(const std::string &tableName, TableSyncType syncType, bool trackerSchemaChanged) { + sqliteStorageEngine_->StartGenLogTask(); RelationalSchemaObject localSchema = sqliteStorageEngine_->GetSchema(); TableInfo tableInfo = localSchema.GetTable(tableName); if (!tableInfo.Empty()) { @@ -2082,9 +2083,14 @@ void SQLiteRelationalStore::StopAllBackgroundTask() #ifdef USE_DISTRIBUTEDDB_CLOUD if (cloudSyncer_ == nullptr) { LOGW("[RelationalStore] cloudSyncer was not initialized when stop all background task"); - return; + } else { + (void) cloudSyncer_->StopSyncTask(nullptr); + } + if (sqliteStorageEngine_ == nullptr) { + LOGW("[RelationalStore] Storage engine was not initialized when stop all background task"); + } else { + sqliteStorageEngine_->StopGenLogTask(); } - (void) cloudSyncer_->StopSyncTask(nullptr); #endif } } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h index 8b63f15df3d..c8d81a5c9d5 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils.h @@ -78,10 +78,12 @@ public: static int InitKnowledgeTableTypeToMeta(sqlite3 *db, bool isMemory, const std::string &tableName); static int SetLogTriggerStatus(sqlite3 *db, bool status); + static constexpr const uint32_t BATCH_GEN_LOG_SIZE = 1000; struct GenLogParam { sqlite3 *db = nullptr; bool isMemory = false; bool isTrackerTable = false; + uint32_t batchLimit = 0; }; static int GeneTimeStrForLog(const TableInfo &tableInfo, GenLogParam ¶m, std::string &timeStr); diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils_client.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils_client.cpp index 56f327707b8..fcd69f246bd 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils_client.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_utils_client.cpp @@ -169,9 +169,13 @@ int SQLiteRelationalUtils::GeneLogInfoForExistedData(const std::string &identity sql += GetExtendValue(tableInfo.GetTrackerTable()); sql += ", 0, '', '', 0 FROM '" + tableName + "' AS a "; if (param.isTrackerTable) { - sql += " WHERE 1 = 1;"; + sql += "WHERE 1 = 1;"; } else { sql += "WHERE NOT EXISTS (SELECT 1 FROM " + logTable + " WHERE data_key = a._rowid_);"; + if (param.batchLimit > 0) { + sql.pop_back(); + sql += " LIMIT " + std::to_string(param.batchLimit) + ";"; + } } errCode = trackerTable.ReBuildTempTrigger(param.db, TriggerMode::TriggerModeEnum::INSERT, [db = param.db, &sql]() { int ret = SQLiteUtils::ExecuteRawSQL(db, sql); diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp index 150d6a23638..7a232a55a30 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.cpp @@ -17,10 +17,10 @@ #include "db_common.h" #include "db_errno.h" +#include "log_table_manager_factory.h" #include "res_finalizer.h" #include "sqlite_relational_database_upgrader.h" #include "sqlite_single_ver_relational_storage_executor.h" -#include "sqlite_relational_utils.h" namespace DistributedDB { @@ -233,6 +233,14 @@ int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::strin LOGE("CreateDistributedTable failed. %d", errCode); return errCode; } + if (syncType == CLOUD_COOPERATION && schema.GetTable(tableName).GetTrackerTable().GetTableName().empty()) { + errCode = GenCloudLogInfo(tableName, schema, identity, isUpgraded); + if (errCode != E_OK) { + LOGE("Generate cloud log info failed. %d", errCode); + return errCode; + } + SetSchema(schema); + } if (isUpgraded && (schemaChanged || trackerSchemaChanged)) { // Used for upgrading the stock data of the trackerTable errCode = GenLogInfoForUpgrade(tableName, schema, schemaChanged); @@ -303,6 +311,9 @@ int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(const std::strin return errCode; } errCode = handle->Commit(); + if (tableSyncType == CLOUD_COOPERATION && table.GetTrackerTable().GetTableName().empty()) { + return errCode; + } if (errCode == E_OK) { SetSchema(schema); } @@ -330,6 +341,10 @@ int SQLiteSingleRelationalStorageEngine::CreateDistributedTable(SQLiteSingleVerR // update table if tableName changed schema.RemoveRelationalTable(tableName); schema.AddRelationalTable(table); + if (table.GetTableSyncType() == CLOUD_COOPERATION && !table.GetSharedTableMark() && + table.GetTrackerTable().GetTableName().empty()) { + return E_OK; + } errCode = SaveSchemaToMetaTable(handle, schema); if (errCode != E_OK) { LOGE("Save schema to meta table for create distributed table failed. %d", errCode); @@ -991,6 +1006,76 @@ int SQLiteSingleRelationalStorageEngine::CleanTrackerDeviceTable(const std::vect return errCode; } +int SQLiteSingleRelationalStorageEngine::GenCloudLogInfo(const std::string &tableName, + const RelationalSchemaObject &schema, const std::string &identity, bool isForUpgrade) +{ + TableInfo table = schema.GetTable(tableName); + GenerateLogInfo info = { + .tableName = tableName, .identity = identity + }; + int errCode = GenLogInfo(info, schema); + if (errCode != E_OK) { + LOGE("Generate log for exist data failed: %d", errCode); + return errCode; + } + errCode = SaveInfoToMetaData(schema, tableName, CLOUD_COOPERATION); + if (errCode != E_OK) { + LOGE("Save info to meta table failed: %d", errCode); + } + return errCode; +} + +int SQLiteSingleRelationalStorageEngine::GenLogInfo(const GenerateLogInfo &info, const RelationalSchemaObject &schema) +{ + int errCode = E_OK; + auto *handle = static_cast(FindExecutor(true, OperatePerm::NORMAL_PERM, + errCode)); + if (handle == nullptr) { + return errCode; + } + ResFinalizer finalizer([handle, &errCode, this] { + SQLiteSingleVerRelationalStorageExecutor *releaseHandle = handle; + this->ReleaseExecutor(releaseHandle); + }); + + errCode = handle->StartTransaction(TransactType::IMMEDIATE); + if (errCode != E_OK) { + return errCode; + } + + errCode = GenLogInfoInTransaction(info, schema, handle); + if (errCode != E_OK && errCode != -E_TASK_INTERRUPTED) { + (void)handle->Rollback(); + return errCode; + } + int ret = handle->SetLogTriggerStatus(true); + if (ret != E_OK) { + (void)handle->Rollback(); + return ret; + } + (void)handle->Commit(); + return errCode; +} + +int SQLiteSingleRelationalStorageEngine::GenLogInfoInTransaction(const GenerateLogInfo &info, + const RelationalSchemaObject &schema, SQLiteSingleVerRelationalStorageExecutor *&handle) +{ + TableInfo table = schema.GetTable(info.tableName); + sqlite3 *db = nullptr; + if (handle->GetDbHandle(db) != E_OK) { + LOGE("[GenLogInfoInTransaction] invalid db"); + return -E_INVALID_DB; + } + auto mode = GetRelationalProperties().GetDistributedTableMode(); + std::unique_ptr tableManager = + LogTableManagerFactory::GetTableManager(table, mode, table.GetTableSyncType()); + SQLiteRelationalUtils::GenLogParam param = { + db, handle->IsMemory(), false, SQLiteRelationalUtils::BATCH_GEN_LOG_SIZE + }; + return GeneLogInfoForExistedDataInBatch(info.identity, table, tableManager, param); +} + + int SQLiteSingleRelationalStorageEngine::GenLogInfoForUpgrade(const std::string &tableName, RelationalSchemaObject &schema, bool schemaChanged) { @@ -1017,6 +1102,55 @@ int SQLiteSingleRelationalStorageEngine::GenLogInfoForUpgrade(const std::string return handle->Commit(); } +int SQLiteSingleRelationalStorageEngine::GeneLogInfoForExistedDataInBatch(const std::string &identity, + const TableInfo &tableInfo, std::unique_ptr &logMgrPtr, + SQLiteRelationalUtils::GenLogParam ¶m) +{ + int changedCount = 0; + do { + if (isGenLogStop_) { + LOGI("gen log task interrupted."); + return -E_TASK_INTERRUPTED; + } + int errCode = SQLiteRelationalUtils::GeneLogInfoForExistedData(identity, tableInfo, logMgrPtr, param); + if (errCode != E_OK) { + LOGE("[GeneLogInfoForExistedDataInBatch] Generate one batch log failed: %d", errCode); + return errCode; + } + changedCount = sqlite3_changes(param.db); + } while (changedCount != 0); + return E_OK; +} + +int SQLiteSingleRelationalStorageEngine::SaveInfoToMetaData(const RelationalSchemaObject &schema, + const std::string &tableName, TableSyncType syncType) +{ + int errCode = E_OK; + auto *handle = static_cast(FindExecutor(true, OperatePerm::NORMAL_PERM, + errCode)); + if (handle == nullptr) { + return errCode; + } + ResFinalizer finalizer([handle, &errCode, this] { + SQLiteSingleVerRelationalStorageExecutor *releaseHandle = handle; + if (errCode != E_OK) { + (void)releaseHandle->Rollback(); + } else { + (void)releaseHandle->Commit(); + } + this->ReleaseExecutor(releaseHandle); + }); + errCode = handle->StartTransaction(TransactType::IMMEDIATE); + if (errCode != E_OK) { + return errCode; + } + errCode = SaveSchemaToMetaTable(handle, schema); + if (errCode != E_OK) { + return errCode; + } + return SaveSyncTableTypeAndDropFlagToMeta(handle, tableName, syncType); +} + std::map> SQLiteSingleRelationalStorageEngine::GetReachableWithShared( const std::map> &reachableReference, const std::map &tableToShared) @@ -1195,5 +1329,17 @@ int SQLiteSingleRelationalStorageEngine::SetDistributedSchemaInTraction(Relation } return errCode; } + +void SQLiteSingleRelationalStorageEngine::StartGenLogTask() +{ + LOGI("Start gen log task"); + isGenLogStop_ = false; +} + +void SQLiteSingleRelationalStorageEngine::StopGenLogTask() +{ + LOGI("Stop gen log task"); + isGenLogStop_ = true; +} } #endif \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h index 61588301fa7..8150576a6d2 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_relational_storage_engine.h @@ -18,11 +18,17 @@ #include "macro_utils.h" #include "relationaldb_properties.h" +#include "sqlite_relational_utils.h" #include "sqlite_storage_engine.h" #include "sqlite_single_ver_relational_storage_executor.h" #include "tracker_table.h" namespace DistributedDB { +struct GenerateLogInfo { + std::string tableName; + std::string identity; +}; + class SQLiteSingleRelationalStorageEngine : public SQLiteStorageEngine { public: explicit SQLiteSingleRelationalStorageEngine(RelationalDBProperties properties); @@ -66,6 +72,9 @@ public: std::pair SetDistributedSchema(const DistributedSchema &schema, const std::string &localIdentity, bool isForceUpgrade); + + void StartGenLogTask(); + void StopGenLogTask(); protected: StorageExecutor *NewSQLiteStorageExecutor(sqlite3 *dbHandle, bool isWrite, bool isMemDb) override; int Upgrade(sqlite3 *db) override; @@ -91,7 +100,16 @@ private: int CleanTrackerDeviceTable(const std::vector &tableNames, RelationalSchemaObject &trackerSchemaObj, SQLiteSingleVerRelationalStorageExecutor *&handle); + int GenCloudLogInfo(const std::string &tableName, const RelationalSchemaObject &schema, + const std::string &identity, bool isForUpgrade); + int GenLogInfo(const GenerateLogInfo &info, const RelationalSchemaObject &schema); + int GenLogInfoInTransaction(const GenerateLogInfo &info, const RelationalSchemaObject &schema, + SQLiteSingleVerRelationalStorageExecutor *&handle); int GenLogInfoForUpgrade(const std::string &tableName, RelationalSchemaObject &schema, bool schemaChanged); + int GeneLogInfoForExistedDataInBatch(const std::string &identity, const TableInfo &tableInfo, + std::unique_ptr &logMgrPtr, SQLiteRelationalUtils::GenLogParam ¶m); + int SaveInfoToMetaData(const RelationalSchemaObject &schema, const std::string &tableName, + TableSyncType syncType); static std::map> GetReachableWithShared( const std::map> &reachableReference, @@ -139,6 +157,8 @@ private: RelationalDBProperties properties_; std::mutex createDistributedTableMutex_; mutable std::mutex propertiesMutex_; + + std::atomic isGenLogStop_ = false; }; } // namespace DistributedDB #endif diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp index 1a49c5d4e5b..b35b99c2978 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.cpp @@ -152,7 +152,7 @@ int SQLiteSingleVerRelationalStorageExecutor::GeneLogInfoForExistedData(sqlite3 return SQLiteRelationalUtils::GeneLogInfoForExistedData(identity, tableInfo, logMgrPtr, param); } -int SQLiteSingleVerRelationalStorageExecutor::ResetLogStatus(std::string &tableName) +int SQLiteSingleVerRelationalStorageExecutor::ResetLogStatus(const std::string &tableName) { int errCode = SetLogTriggerStatus(false); if (errCode != E_OK) { @@ -230,6 +230,16 @@ int SQLiteSingleVerRelationalStorageExecutor::CreateRelationalLogTable(Distribut LOGE("[CreateDistributedTable] create log table failed"); return errCode; } + // add trigger + errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity); + if (errCode != E_OK) { + LOGE("[CreateDistributedTable] Add relational log table trigger failed."); + return errCode; + } + if (table.GetTableSyncType() == CLOUD_COOPERATION && !table.GetSharedTableMark() && + table.GetTrackerTable().GetTableName().empty()) { + return SetLogTriggerStatus(true); + } std::string tableName = table.GetTableName(); bool isOnceDropped = false; (void)IsTableOnceDropped(tableName, isOnceDropped); @@ -254,12 +264,6 @@ int SQLiteSingleVerRelationalStorageExecutor::CreateRelationalLogTable(Distribut return errCode; } - // add trigger - errCode = tableManager->AddRelationalLogTableTrigger(dbHandle_, table, identity); - if (errCode != E_OK) { - LOGE("[CreateDistributedTable] Add relational log table trigger failed."); - return errCode; - } return SetLogTriggerStatus(true); } diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h index cd4d9eaed48..6c58f4a732b 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_executor.h @@ -58,7 +58,7 @@ public: // Delete the copy and assign constructors DISABLE_COPY_ASSIGN_MOVE(SQLiteSingleVerRelationalStorageExecutor); - int ResetLogStatus(std::string &tableName); + int ResetLogStatus(const std::string &tableName); int CreateRelationalLogTable(DistributedTableMode mode, bool isUpgraded, const std::string &identity, TableInfo &table); @@ -281,6 +281,9 @@ public: const std::set &extendColNames, const std::string &sql); int ConvertLogToLocal(const std::string &tableName, const std::vector &gids); + + int UpdateTrackerTable(sqlite3 *db, const std::string &identity, const TableInfo &tableInfo, + std::unique_ptr &logMgrPtr, bool isTimestampOnly); private: int UpdateHashKeyWithOutPk(DistributedTableMode mode, const TableInfo &tableInfo, TableSyncType syncType, const std::string &localIdentity); @@ -429,9 +432,6 @@ private: int GetDeleteStatementForCloudSync(const TableSchema &tableSchema, const std::set &pkSet, const VBucket &vBucket, sqlite3_stmt *&deleteStmt); - int UpdateTrackerTable(sqlite3 *db, const std::string &identity, const TableInfo &tableInfo, - std::unique_ptr &logMgrPtr, bool isTimestampOnly); - int DeleteCloudData(const std::string &tableName, const VBucket &vBucket, const TableSchema &tableSchema, const TrackerTable &trackerTable); @@ -570,6 +570,8 @@ private: std::atomic maxUploadCount_; std::atomic maxUploadSize_; + + std::atomic isGenLogStop_ = false; }; } // namespace DistributedDB #endif diff --git a/frameworks/libs/distributeddb/test/BUILD.gn b/frameworks/libs/distributeddb/test/BUILD.gn index 9ae348db727..ce84eca4f81 100644 --- a/frameworks/libs/distributeddb/test/BUILD.gn +++ b/frameworks/libs/distributeddb/test/BUILD.gn @@ -913,6 +913,10 @@ distributeddb_unittest("DistributedDBKvPermissionSyncTest") { ] } +distributeddb_unittest("DistributedDBInterfacesStopTaskTest") { + sources = [ "unittest/common/interfaces/distributeddb_interfaces_stop_task_test.cpp.cpp" ] +} + distributeddb_unittest("DistributedDBRDBPermissionSyncTest") { sources = [ "unittest/common/store_test/rdb/distributeddb_rdb_permission_sync_test.cpp", diff --git a/frameworks/libs/distributeddb/test/unittest/common/interfaces/distributeddb_interfaces_stop_task_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/interfaces/distributeddb_interfaces_stop_task_test.cpp new file mode 100644 index 00000000000..5af73ca6bb4 --- /dev/null +++ b/frameworks/libs/distributeddb/test/unittest/common/interfaces/distributeddb_interfaces_stop_task_test.cpp @@ -0,0 +1,310 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef USE_DISTRIBUTEDDB_CLOUD +#include +#include + +#include "db_common.h" +#include "distributeddb_data_generate_unit_test.h" +#include "distributeddb_tools_unit_test.h" +#include "log_print.h" +#include "relational_store_manager.h" +#include "runtime_config.h" +#include "virtual_cloud_db.h" + +using namespace testing::ext; +using namespace DistributedDB; +using namespace DistributedDBUnitTest; +using namespace std; + +const int ONE_BATCH_NUM = 100; +constexpr const char* DB_SUFFIX = ".db"; +constexpr const char* STORE_ID = "RelationalAsyncCreateDistributedDBTableTest"; +const std::string NORMAL_CREATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS test_table(" \ + "id INT PRIMARY KEY," \ + "name TEXT," \ + "number INT NOT NULL);"; +const std::vector CLOUD_FIELDS = { + {.colName = "id", .type = TYPE_INDEX, .primary = true}, + {.colName = "name", .type = TYPE_INDEX}, + {.colName = "number", .type = TYPE_INDEX, .nullable = false} +}; +std::string g_testDir; +std::string g_dbDir; +const std::string NORMAL_TABLE = "test_table"; +const std::string NORMAL_SHARED_TABLE = "test_table_shared"; +const std::string META_TABLE = "naturalbase_rdb_aux_metadata"; +sqlite3 *g_db = nullptr; +DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID); +RelationalStoreDelegate *g_delegate = nullptr; +std::shared_ptr g_virtualCloudDb; +SyncProcess g_syncProcess; + +class DistributedDBInterfacesStopTaskTest : public testing::Test { +public: + static void SetUpTestCase(void); + static void TearDownTestCase(void); + void SetUp(); + void TearDown(); +protected: + void InsertData(int startId, int dataNum, const std::string &tableName = NORMAL_TABLE); + void CallSync(DBStatus expectResult = OK); + int GetDataCount(const std::string &tableName, int &count); +}; + +void GetCloudDbSchema(DataBaseSchema &dataBaseSchema) +{ + TableSchema assetsTableSchema = {.name = NORMAL_TABLE, .sharedTableName = NORMAL_TABLE + "_shared", + .fields = CLOUD_FIELDS}; + dataBaseSchema.tables.push_back(assetsTableSchema); +} + +void DistributedDBInterfacesStopTaskTest::SetUpTestCase(void) +{ + DistributedDBToolsUnitTest::TestDirInit(g_testDir); + LOGD("Test dir is %s", g_testDir.c_str()); + g_dbDir = g_testDir + "/"; + DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir); +} + +void DistributedDBInterfacesStopTaskTest::TearDownTestCase(void) +{ +} + +void DistributedDBInterfacesStopTaskTest::SetUp(void) +{ + DistributedDBToolsUnitTest::PrintTestCaseInfo(); + g_db = RelationalTestUtils::CreateDataBase(g_dbDir + STORE_ID + DB_SUFFIX); + ASSERT_NE(g_db, nullptr); + EXPECT_EQ(RelationalTestUtils::ExecSql(g_db, "PRAGMA journal_mode=WAL;"), SQLITE_OK); + EXPECT_EQ(RelationalTestUtils::ExecSql(g_db, NORMAL_CREATE_TABLE_SQL), SQLITE_OK); + + DBStatus status = g_mgr.OpenStore(g_dbDir + STORE_ID + DB_SUFFIX, STORE_ID, {}, g_delegate); + EXPECT_EQ(status, OK); + ASSERT_NE(g_delegate, nullptr); + g_virtualCloudDb = make_shared(); + ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK); +} + +void DistributedDBInterfacesStopTaskTest::TearDown(void) +{ + if (g_db != nullptr) { + EXPECT_EQ(sqlite3_close_v2(g_db), SQLITE_OK); + g_db = nullptr; + } + if (g_delegate != nullptr) { + DBStatus status = g_mgr.CloseStore(g_delegate); + g_delegate = nullptr; + EXPECT_EQ(status, OK); + } + g_virtualCloudDb = nullptr; + DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir); +} + +void DistributedDBInterfacesStopTaskTest::InsertData(int startId, int dataNum, const std::string &tableName) +{ + ASSERT_NE(g_db, nullptr); + SQLiteUtils::BeginTransaction(g_db, TransactType::IMMEDIATE); + sqlite3_stmt *stmt = nullptr; + std::string sql = "INSERT INTO " + tableName + "(id, name, number) VALUES(?, ?, ?);"; + EXPECT_EQ(SQLiteUtils::GetStatement(g_db, sql, stmt), E_OK); + int resetRet = E_OK; + for (int i = startId; i < startId + dataNum; i++) { + EXPECT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, i), E_OK); // 1st is id + EXPECT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, "name_" + std::to_string(i)), E_OK); // 2nd is name + EXPECT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 3, i + i), E_OK); // 3rd is number + EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)); + SQLiteUtils::ResetStatement(stmt, false, resetRet); + } + SQLiteUtils::ResetStatement(stmt, true, resetRet); + SQLiteUtils::CommitTransaction(g_db); +} + +void DistributedDBInterfacesStopTaskTest::CallSync(DBStatus expectResult) +{ + CloudSyncOption option; + option.devices = { "CLOUD" }; + option.query = Query::Select().FromTable({NORMAL_TABLE}); + option.mode = SYNC_MODE_CLOUD_MERGE; + std::mutex dataMutex; + std::condition_variable cv; + bool finish = false; + SyncProcess last; + auto callback = [&last, &cv, &dataMutex, &finish](const std::map &process) { + for (const auto &item: process) { + if (item.second.process == DistributedDB::FINISHED) { + { + std::lock_guard autoLock(dataMutex); + finish = true; + last = item.second; + } + cv.notify_one(); + } + } + }; + ASSERT_EQ(g_delegate->Sync(option, callback), expectResult); + if (expectResult == OK) { + std::unique_lock uniqueLock(dataMutex); + cv.wait(uniqueLock, [&finish]() { + return finish; + }); + } + g_syncProcess = last; +} + +int DistributedDBInterfacesStopTaskTest::GetDataCount(const std::string &tableName, int &count) +{ + std::string sql = "select count(*) from " + tableName; + return SQLiteUtils::GetCountBySql(g_db, sql, count); +} + +/** + * @tc.name: AbortCreateDistributedDBTableTest001 + * @tc.desc: Test abort create distributed table. + * @tc.type: FUNC + * @tc.require: + * @tc.author: liaoyonghuang + */ +HWTEST_F(DistributedDBInterfacesStopTaskTest, AbortCreateDistributedDBTableTest001, TestSize.Level0) +{ + /** + * @tc.steps:step1. Prepare data + * @tc.expected: step1. Return OK. + */ + DataBaseSchema dataBaseSchema; + GetCloudDbSchema(dataBaseSchema); + ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK); + int dataCount = ONE_BATCH_NUM * 100; + InsertData(0, dataCount); + /** + * @tc.steps:step2. Create distributed table and stop task. + * @tc.expected: step2. Return OK. + */ + std::thread subThread([&]() { + EXPECT_EQ(g_delegate->CreateDistributedTable(NORMAL_TABLE, CLOUD_COOPERATION), TASK_INTERRUPTED); + }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + g_delegate->StopTask(TaskType::BACKGROUND_TASK); + subThread.join(); + int logCount = 0; + EXPECT_EQ(GetDataCount(DBCommon::GetLogTableName(NORMAL_TABLE), logCount), E_OK); + EXPECT_TRUE(logCount < dataCount); + /** + * @tc.steps:step3. Create distributed table again. + * @tc.expected: step3. Return OK. + */ + int newDataCount = 1; + InsertData(dataCount, newDataCount); + EXPECT_EQ(g_delegate->CreateDistributedTable(NORMAL_TABLE, CLOUD_COOPERATION), OK); + EXPECT_EQ(GetDataCount(DBCommon::GetLogTableName(NORMAL_TABLE), logCount), E_OK); + EXPECT_EQ(logCount, dataCount + newDataCount); +} + +/** + * @tc.name: AbortCreateDistributedDBTableTest002 + * @tc.desc: Test sync after abort create distributed table. + * @tc.type: FUNC + * @tc.require: + * @tc.author: liaoyonghuang + */ +HWTEST_F(DistributedDBInterfacesStopTaskTest, AbortCreateDistributedDBTableTest002, TestSize.Level0) +{ + /** + * @tc.steps:step1. Prepare data + * @tc.expected: step1. Return OK. + */ + DataBaseSchema dataBaseSchema; + GetCloudDbSchema(dataBaseSchema); + ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK); + int dataCount = ONE_BATCH_NUM * 100; + InsertData(0, dataCount); + /** + * @tc.steps:step2. Create distributed table and stop task. + * @tc.expected: step2. Return OK. + */ + std::thread subThread([&]() { + EXPECT_EQ(g_delegate->CreateDistributedTable(NORMAL_TABLE, CLOUD_COOPERATION), TASK_INTERRUPTED); + }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + g_delegate->StopTask(TaskType::BACKGROUND_TASK); + subThread.join(); + int logCount = 0; + EXPECT_EQ(GetDataCount(DBCommon::GetLogTableName(NORMAL_TABLE), logCount), E_OK); + EXPECT_TRUE(logCount < dataCount); + /** + * @tc.steps:step3. Sync to cloud. + * @tc.expected: step3. Return SCHEMA_MISMATCH. + */ + CallSync(SCHEMA_MISMATCH); + /** + * @tc.steps:step3. Create distributed table again and sync. + * @tc.expected: step3. Return OK. + */ + EXPECT_EQ(g_delegate->CreateDistributedTable(NORMAL_TABLE, CLOUD_COOPERATION), OK); + EXPECT_EQ(GetDataCount(DBCommon::GetLogTableName(NORMAL_TABLE), logCount), E_OK); + EXPECT_EQ(logCount, dataCount); + CallSync(); +} + +/** + * @tc.name: AbortCreateDistributedDBTableTest003 + * @tc.desc: Test update/delete after abort create distributed table. + * @tc.type: FUNC + * @tc.require: + * @tc.author: liaoyonghuang + */ +HWTEST_F(DistributedDBInterfacesStopTaskTest, AbortCreateDistributedDBTableTest003, TestSize.Level0) +{ + /** + * @tc.steps:step1. Prepare data + * @tc.expected: step1. Return OK. + */ + DataBaseSchema dataBaseSchema; + GetCloudDbSchema(dataBaseSchema); + ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK); + int dataCount = ONE_BATCH_NUM * 100; + InsertData(0, dataCount); + /** + * @tc.steps:step2. Create distributed table and stop task. + * @tc.expected: step2. Return OK. + */ + std::thread subThread([&]() { + EXPECT_EQ(g_delegate->CreateDistributedTable(NORMAL_TABLE, CLOUD_COOPERATION), TASK_INTERRUPTED); + }); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + g_delegate->StopTask(TaskType::BACKGROUND_TASK); + subThread.join(); + int logCount = 0; + EXPECT_EQ(GetDataCount(DBCommon::GetLogTableName(NORMAL_TABLE), logCount), E_OK); + EXPECT_TRUE(logCount < dataCount); + /** + * @tc.steps:step3. update/delete data and create distributed table again. + * @tc.expected: step3. Return OK. + */ + std::string sql = "delete from " + NORMAL_TABLE + " where id >= 0 and id < 10;"; + EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(g_db, sql), E_OK); + sql = "update " + NORMAL_TABLE + " set number = number + 1 where id >= 10 and id < 20;"; + EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(g_db, sql), E_OK); + EXPECT_EQ(g_delegate->CreateDistributedTable(NORMAL_TABLE, CLOUD_COOPERATION), OK); + EXPECT_EQ(GetDataCount(DBCommon::GetLogTableName(NORMAL_TABLE), logCount), E_OK); + EXPECT_EQ(logCount, dataCount); + int actualDeleteCount = 0; + int expectDeleteCount = 10; + sql = "select count(*) from " + DBCommon::GetLogTableName(NORMAL_TABLE) + " where data_key = -1"; + EXPECT_EQ(SQLiteUtils::GetCountBySql(g_db, sql, actualDeleteCount), E_OK); + EXPECT_EQ(actualDeleteCount, expectDeleteCount); +} +#endif -- Gitee