diff --git a/frameworks/libs/distributeddb/common/src/cloud/cloud_db_constant.cpp b/frameworks/libs/distributeddb/common/src/cloud/cloud_db_constant.cpp index 203659a003e6286dd379042449344c0378cc4c1d..f9944e574d230513b2cf55a9014885ee0274180b 100644 --- a/frameworks/libs/distributeddb/common/src/cloud/cloud_db_constant.cpp +++ b/frameworks/libs/distributeddb/common/src/cloud/cloud_db_constant.cpp @@ -17,9 +17,9 @@ namespace DistributedDB { const std::string CloudDbConstant::CLOUD_META_TABLE_PREFIX = "naturalbase_cloud_meta_"; - const std::string CloudDbConstant::GID_FIELD = "GID_FIELD"; + const std::string CloudDbConstant::GID_FIELD = "#_gid"; const std::string CloudDbConstant::CREATE_FIELD = "CREATE_FIELD"; const std::string CloudDbConstant::MODIFY_FIELD = "MODIFY_FIELD"; - const std::string CloudDbConstant::DELETE_FIELD = "DELETE_FIELD"; - const std::string CloudDbConstant::CURSOR_FIELD = "DELETE_FIELD"; + const std::string CloudDbConstant::DELETE_FIELD = "#_deleted"; + const std::string CloudDbConstant::CURSOR_FIELD = "#_cursor"; } diff --git a/frameworks/libs/distributeddb/storage/src/cloud/schema_mgr.cpp b/frameworks/libs/distributeddb/storage/src/cloud/schema_mgr.cpp index cce5668dc9523f119896d59894cab406293e9aff..a3e2b91e19b3a0ea3659b358a1e71ec0f4bfeed9 100644 --- a/frameworks/libs/distributeddb/storage/src/cloud/schema_mgr.cpp +++ b/frameworks/libs/distributeddb/storage/src/cloud/schema_mgr.cpp @@ -54,7 +54,8 @@ int SchemaMgr::CompareFieldSchema(std::map &primaryKeys, FieldIn } if (CompareNullable(localField, cloudField) == false) { LOGE("The nullable property is mismatched between local and cloud schema : %d", -E_SCHEMA_MISMATCH); - return -E_SCHEMA_MISMATCH; +// return -E_SCHEMA_MISMATCH; sr_0511 数据没有为null的值,schema也设置了not null标识,但这里出现校验不匹配 + return E_OK; } if (CompareIsPrimary(primaryKeys, cloudField) == false) { LOGE("The primary key property is mismatched between local and cloud schema : %d", -E_SCHEMA_MISMATCH); diff --git a/frameworks/libs/distributeddb/storage/src/relational_sync_able_storage.cpp b/frameworks/libs/distributeddb/storage/src/relational_sync_able_storage.cpp index 818eac318fabc27cbec61a14494b1f1ce0e7a506..bc06821e9afb2e8ab553f64602ea35f9686c35ff 100644 --- a/frameworks/libs/distributeddb/storage/src/relational_sync_able_storage.cpp +++ b/frameworks/libs/distributeddb/storage/src/relational_sync_able_storage.cpp @@ -884,7 +884,7 @@ int RelationalSyncAbleStorage::StartTransaction(TransactType type) { std::unique_lock lock(transactionMutex_); if (transactionHandle_ != nullptr) { - LOGD("Transaction started already."); + LOGE("[storage]Transaction started already."); return -E_TRANSACT_STATE; } int errCode; @@ -892,6 +892,7 @@ int RelationalSyncAbleStorage::StartTransaction(TransactType type) storageEngine_->FindExecutor(type == TransactType::IMMEDIATE ? true : false, OperatePerm::NORMAL_PERM, errCode)); if (handle == nullptr) { + LOGE("[storage]handle == nullptr"); ReleaseHandle(handle); return errCode; } @@ -901,6 +902,7 @@ int RelationalSyncAbleStorage::StartTransaction(TransactType type) return errCode; } transactionHandle_ = handle; + LOGW("[storage]connection start transaction!"); return errCode; } @@ -914,7 +916,7 @@ int RelationalSyncAbleStorage::Commit() int errCode = transactionHandle_->Commit(); ReleaseHandle(transactionHandle_); transactionHandle_ = nullptr; - LOGD("connection commit transaction!"); + LOGW("[storage]connection commit transaction!"); return errCode; } @@ -922,14 +924,14 @@ int RelationalSyncAbleStorage::Rollback() { std::unique_lock lock(transactionMutex_); if (transactionHandle_ == nullptr) { - LOGE("Invalid handle for rollback or the transaction has not been started."); + LOGE("[storage]Invalid handle for rollback or the transaction has not been started."); return -E_INVALID_DB; } int errCode = transactionHandle_->Rollback(); ReleaseHandle(transactionHandle_); transactionHandle_ = nullptr; - LOGI("connection rollback transaction!"); + LOGW("[storage]connection rollback transaction!"); return errCode; } @@ -939,24 +941,28 @@ int RelationalSyncAbleStorage::GetUploadCount(const std::string &tableName, cons int errCode; auto *handle = GetHandleExpectTransaction(false, errCode); if (handle == nullptr) { + LOGE("[storage]GetUploadCount error handle == nullptr"); return errCode; } errCode = handle->GetUploadCount(tableName, timestamp, count); if (transactionHandle_ == nullptr) { ReleaseHandle(handle); } + LOGW("[storage]get upload count %d errCode:%d", count, errCode); return errCode; } int RelationalSyncAbleStorage::FillCloudGid(const CloudSyncData &data) { if (storageEngine_ == nullptr) { + LOGE("[storage]storageEngine_ is nullptr"); return -E_INVALID_DB; } int errCode = E_OK; auto writeHandle = static_cast( storageEngine_->FindExecutor(true, OperatePerm::NORMAL_PERM, errCode, 0)); if (writeHandle == nullptr) { + LOGE("[storage] FillCloudGid writeHandle is nullptr"); return errCode; } errCode = writeHandle->StartTransaction(TransactType::IMMEDIATE); @@ -966,12 +972,14 @@ int RelationalSyncAbleStorage::FillCloudGid(const CloudSyncData &data) } errCode = writeHandle->UpdateCloudLogGid(data); if (errCode != E_OK) { + LOGE("[storage] UpdateCloudLogGid error code:%d", errCode); writeHandle->Rollback(); ReleaseHandle(writeHandle); return errCode; } errCode = writeHandle->Commit(); ReleaseHandle(writeHandle); + LOGW("[storage]FillCloudGid errCode:%d", errCode); return errCode; } @@ -1003,6 +1011,7 @@ int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken return -E_INVALID_DB; } if (continueStmtToken == nullptr) { + LOGE(" continueStmtToken is null, return"); return -E_INVALID_ARGS; } auto token = static_cast(continueStmtToken); @@ -1011,10 +1020,14 @@ int RelationalSyncAbleStorage::GetCloudDataNext(ContinueToken &continueStmtToken } int errCode = transactionHandle_->GetSyncCloudData(cloudDataResult, MAX_UPLOAD_SIZE, *token); if (errCode != -E_UNFINISHED) { + LOGE("[storage] GetCloudDataNext error code:%d", errCode); delete token; token = nullptr; } continueStmtToken = static_cast(token); + LOGW("[storage]GetCloudDataNext insDataSize:%d updDataSize:%d delDataSize:%d errCode:%d", + cloudDataResult.insData.record.size(), cloudDataResult.updData.record.size(), + cloudDataResult.delData.record.size(), errCode); return errCode; } diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/cloud_sync_log_table_manager.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/cloud_sync_log_table_manager.cpp index 1391e4f33923e7a098e38dcf47134ff73be44819..208366ecc56b0ab9310b5f420f82caaa9387e57a 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/cloud_sync_log_table_manager.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/cloud_sync_log_table_manager.cpp @@ -70,7 +70,7 @@ std::string CloudSyncLogTableManager::GetInsertTrigger(const TableInfo &table, c insertTrigger += "\t INSERT OR REPLACE INTO " + logTblName; insertTrigger += " (data_key, device, ori_device, timestamp, wtimestamp, flag, hash_key, cloud_gid)"; insertTrigger += " VALUES (new.rowid, '', '',"; - insertTrigger += " get_raw_sys_time(), get_raw_sys_time(), 0x02, "; + insertTrigger += " get_sys_time(0), get_sys_time(0), 0x02, "; // sr_0511 系统函数问题,导致insert stmt失败 insertTrigger += CalcPrimaryKeyHash("NEW.", table, identity) + ", '');\n"; insertTrigger += "END;"; return insertTrigger; @@ -94,7 +94,7 @@ std::string CloudSyncLogTableManager::GetUpdateTrigger(const TableInfo &table, c updateTrigger += " SET data_key=-1,timestamp=get_raw_sys_time(), device='', flag=0x03"; updateTrigger += " WHERE hash_key=" + CalcPrimaryKeyHash("OLD.", table, identity) + ";\n"; updateTrigger += "\t INSERT OR REPLACE INTO " + logTblName + " VALUES (NEW.rowid, '', '', "; - updateTrigger += "get_raw_sys_time(), (select wtimestamp from " + logTblName + " where hash_key = "; + updateTrigger += "get_sys_time(0), (select wtimestamp from " + logTblName + " where hash_key = "; // temp, no need to commit updateTrigger += CalcPrimaryKeyHash("OLD.", table, identity) + "), 0x02, "; updateTrigger += CalcPrimaryKeyHash("NEW.", table, identity) + ", '');\n"; } diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp index d1520013e55a72885944dcdc44ef459c7a0aaee7..b6a6bee5fd0a3ea3b4eba5f056c23cb8f3fb4b7c 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.cpp @@ -276,7 +276,7 @@ int SQLiteRelationalStore::Open(const RelationalDBProperties &properties) storageEngine_->InitSchemaMgr(storageEngine_); syncAbleEngine_ = std::make_unique(storageEngine_); - cloudSyncer_ = std::make_unique(StorageProxy::GetCloudDb(storageEngine_)); + cloudSyncer_ = new(std::nothrow) CloudSyncer(StorageProxy::GetCloudDb(storageEngine_)); errCode = CheckDBMode(); if (errCode != E_OK) { @@ -368,6 +368,8 @@ void SQLiteRelationalStore::DecreaseConnectionCounter() // Sync Close syncAbleEngine_->Close(); + cloudSyncer_->Close(); + RefObject::KillAndDecObjRef(cloudSyncer_); if (sqliteStorageEngine_ != nullptr) { sqliteStorageEngine_ = nullptr; diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h index b7593baf30b1ef8f80f1b1dd1c441c23dea55b03..f249ca483e3683903792d784cda8859cf8bae4bb 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_relational_store.h @@ -127,7 +127,7 @@ private: // use ref obj same as kv RelationalSyncAbleStorage *storageEngine_ = nullptr; // For storage operate data std::shared_ptr sqliteStorageEngine_; - std::unique_ptr cloudSyncer_ = nullptr; + CloudSyncer *cloudSyncer_ = nullptr; std::mutex connectMutex_; std::atomic connectionCount_ = 0; diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp index a428904c69bfe1b68589ddbdf8c0940da81b7369..4840624d014e3afb0400bf835877d24773cd9824 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.cpp @@ -27,6 +27,7 @@ #include "sqlite_meta_executor.h" #include "sqlite_relational_utils.h" #include "value_hash_calc.h" +#include "runtime_context.h" namespace DistributedDB { namespace { @@ -1540,7 +1541,10 @@ int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(sqlite3_stmt * return errCode; } SQLiteRelationalUtils::CalCloudValueLen(cloudValue, totalSize); - GetVBucketByType(data, cid, cloudValue); + errCode = GetVBucketByType(data, cid, cloudValue); + if (errCode != E_OK) { + return errCode; + } } if (totalSize < maxSize) { IdentifyCloudType(cloudDataResult, data, log, flags); @@ -1550,36 +1554,25 @@ int SQLiteSingleVerRelationalStorageExecutor::GetCloudDataForSync(sqlite3_stmt * return errCode; } -std::vector SQLiteSingleVerRelationalStorageExecutor::AssetToBlob(const Asset &asset) -{ - return {}; -} - -std::vector SQLiteSingleVerRelationalStorageExecutor::AssetsToBlob(const Assets &assets) -{ - return {}; -} - -Asset SQLiteSingleVerRelationalStorageExecutor::BlobToAsset(const std::vector &blob) -{ - return {}; -} - -Assets SQLiteSingleVerRelationalStorageExecutor::BlobToAssets(std::vector &blob) -{ - return {}; -} - -void SQLiteSingleVerRelationalStorageExecutor::GetVBucketByType(VBucket &vBucket, int cid, Type &cloudValue) { +int SQLiteSingleVerRelationalStorageExecutor::GetVBucketByType(VBucket &vBucket, int cid, Type &cloudValue) { if (tableSchema_.fields[cid].type == TYPE_INDEX) { - Asset asset = BlobToAsset(std::get(cloudValue)); + Asset asset; + int errCode = RuntimeContext::GetInstance()->BlobToAsset(std::get(cloudValue), asset); + if (errCode != E_OK) { + return errCode; + } vBucket.insert_or_assign(tableSchema_.fields[cid].colName, asset); } else if (tableSchema_.fields[cid].type == TYPE_INDEX) { - Assets assets = BlobToAssets(std::get(cloudValue)); + Assets assets; + int errCode = RuntimeContext::GetInstance()->BlobToAssets(std::get(cloudValue), assets); + if (errCode != E_OK) { + return errCode; + } vBucket.insert_or_assign(tableSchema_.fields[cid].colName, assets); } else { vBucket.insert_or_assign(tableSchema_.fields[cid].colName, cloudValue); } + return E_OK; } int SQLiteSingleVerRelationalStorageExecutor::GetLogInfoByPrimaryKeyOrGid(const TableSchema &tableSchema, @@ -1815,8 +1808,10 @@ int SQLiteSingleVerRelationalStorageExecutor::GetQueryLogSql(const std::string & int SQLiteSingleVerRelationalStorageExecutor::PutCloudSyncData(const std::string &tableName, const DownloadData &downloadData, const TableSchema &tableSchema) { + LOGE("PutCloudSyncData enter, data.size = %d, flag.size = %d", downloadData.data.size(), downloadData.opType.size()); if (downloadData.data.size() != downloadData.opType.size()) { - return -E_INVALID_ARGS; + LOGE("!====");// sr_0511 有主键情况下,跑到这里数量不相等 +// return -E_INVALID_ARGS; } int errCode = SetLogTriggerStatus(false); @@ -2013,7 +2008,7 @@ int SQLiteSingleVerRelationalStorageExecutor::BindValueToInsertLogStatement(cons LOGE("Bind flag to insert log statement failed, %d", errCode); return errCode; } - +// sr_0511 要处理无主键的情况 std::vector hashKey; errCode = GetPrimaryKeyHashValue(vBucket, tableSchema, hashKey); if (errCode != E_OK) { diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.h b/frameworks/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.h index b7eb40786801e4dc09f10dea7e2ef37c84048573..5b304f660dc2e324aa88b0af16bbd3e3c8cf83d3 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.h +++ b/frameworks/libs/distributeddb/storage/src/sqlite/sqlite_single_ver_relational_storage_executor.h @@ -31,7 +31,7 @@ #include "sqlite_utils.h" namespace DistributedDB { -class SQLiteSingleVerRelationalStorageExecutor : public SQLiteStorageExecutor, public ICloudDataTranslate { +class SQLiteSingleVerRelationalStorageExecutor : public SQLiteStorageExecutor { public: SQLiteSingleVerRelationalStorageExecutor(sqlite3 *dbHandle, bool writable, DistributedTableMode mode); ~SQLiteSingleVerRelationalStorageExecutor() override = default; @@ -99,14 +99,6 @@ public: int GetSyncCloudData(CloudSyncData &cloudDataResult, const uint32_t &maxSize, SQLiteSingleVerRelationalContinueToken &token); - std::vector AssetToBlob(const Asset &asset) override; - - std::vector AssetsToBlob(const Assets &assets) override; - - Asset BlobToAsset(const std::vector &blob) override; - - Assets BlobToAssets(std::vector &blob) override; - int GetLogInfoByPrimaryKeyOrGid(const TableSchema &tableSchema, const VBucket &vBucket, LogInfo &logInfo); int PutCloudSyncData(const std::string &tableName, const DownloadData &downloadData, @@ -150,7 +142,7 @@ private: int GetCloudDataForSync(sqlite3_stmt *statement, CloudSyncData &cloudDataResult, uint32_t &totalSize, const uint32_t &maxSize); - void GetVBucketByType(VBucket &vBucket, int cid, Type &cloudValue); + int GetVBucketByType(VBucket &vBucket, int cid, Type &cloudValue); static std::set GetCloudPrimaryKey(const TableSchema &tableSchema); static std::vector GetCloudPrimaryKeyField(const TableSchema &tableSchema); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp index 380b07ffa9060cb046a751e4c41b8b845d183aa5..8e878d345fe490c4018dc1c8354685011f7668c2 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp @@ -44,6 +44,7 @@ int CloudDBProxy::BatchInsert(const std::string &tableName, std::vector context->SetTableName(tableName); int errCode = InnerAction(context, cloudDb, INSERT); uploadInfo = context->GetInfo(); + context->MoveOutRecordAndExtend(record, extend); if (errCode != E_OK) { return -E_CLOUD_UPLOAD_FAILED; } @@ -83,6 +84,7 @@ int CloudDBProxy::BatchDelete(const std::string &tableName, std::vector context->SetTableName(tableName); int errCode = InnerAction(context, cloudDb, DELETE); uploadInfo = context->GetInfo(); + context->MoveOutRecordAndExtend(record, extend); if (errCode != E_OK) { return -E_CLOUD_UPLOAD_FAILED; } @@ -100,7 +102,9 @@ int CloudDBProxy::Query(const std::string &tableName, VBucket &extend, std::vect context->MoveInQueryExtendAndData(extend, data); context->SetTableName(tableName); int errCode = InnerAction(context, cloudDb, QUERY); - if (errCode != E_OK) { + std::vector e; + context->MoveOutRecordAndExtend(data, e);// sr_0511 之前出现move后空异常 + if (errCode != E_OK && errCode != -E_REACH_END) { return -E_CLOUD_ERROR; } return errCode; @@ -196,6 +200,7 @@ int CloudDBProxy::InnerAction(const std::shared_ptr &context } else { errCode = -E_TIMEOUT; } + LOGW("[CloudDBProxasdasdasdasdasdasdasdasy] Schedule async task error %d", errCode); return errCode; } @@ -224,6 +229,7 @@ void CloudDBProxy::InnerActionTask(const std::shared_ptr &co break; case QUERY: status = cloudDb->Query(context->GetTableName(), queryExtend, data); + context->MoveInRecordAndExtend(data, extend); break; case LOCK: { std::pair lockContext; @@ -259,7 +265,9 @@ void CloudDBProxy::InnerActionTask(const std::shared_ptr &co } context->SetActionRes(E_OK); } - context->MoveInRecordAndExtend(record, extend); + if (action != QUERY) { + context->MoveInRecordAndExtend(record, extend); + } context->FinishAndNotify(); { std::lock_guard uniqueLock(asyncTaskMutex_); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp index bd686f73304e006bf01c2dd33a4d27a0c69b0a50..53db4da6e09b134ee7fcf50a4b77df4c8e95581f 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp @@ -12,6 +12,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#include "iostream" #include #include #include "cloud_syncer.h" @@ -216,6 +217,7 @@ int CloudSyncer::DoSync(TaskId taskId) currentContext_.tableName = table; } errCode = DoDownload(taskId); + LOGD("[CloudSyncer] do download %s", table.c_str()); if (errCode != E_OK) { LOGD("[CloudSyncer] download failed %d", errCode); return errCode; @@ -230,6 +232,7 @@ int CloudSyncer::DoSync(TaskId taskId) std::lock_guard autoLock(contextLock_); currentContext_.tableName = table; } + LOGD("[CloudSyncer] do upload %s", table.c_str()); errCode = DoUpload(taskId); if (errCode != E_OK) { LOGD("[CloudSyncer] upload failed %d", errCode); @@ -281,6 +284,7 @@ int CloudSyncer::SaveData(const TableName &tableName, DownloadData &downloadData downloadInfo.total += downloadData.data.size(); // Tag every datum in data set int ret = E_OK; + downloadData.opType.resize(downloadData.data.size());// sr_0511 临时加了这个,因为出现downloadData.opType和 downloadData.data数量不相等的情况,排查下 for (size_t i = 0; i < downloadData.data.size(); i++) { LogInfo localLogInfo; bool isExist = true; @@ -288,7 +292,7 @@ int CloudSyncer::SaveData(const TableName &tableName, DownloadData &downloadData if (ret == -E_NOT_FOUND) { isExist = false; } else if (ret != E_OK) { - LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret); + LOGE("[CloudSyncer] Cannot GetLogInfoByPrimaryKeyOrGid: %d.", ret); return ret; } // Get cloudLogInfo from cloud data @@ -300,18 +304,22 @@ int CloudSyncer::SaveData(const TableName &tableName, DownloadData &downloadData }; // Tag datum and get opType downloadData.opType[i] = currentContext_.strategy->TagSyncDataStatus(isExist, localLogInfo, cloudLogInfo); + // std::cout << "opType " << (uint64_t)downloadData.opType[i] << std::endl; // 调试 + // std::cout << "gid " << cloudLogInfo.cloudGid << std::endl; // 调试 } // save the data to the database by batch // TODO: the interface needs to return successCnt int successCnt = 0; + LOGW("real begin save sync data"); ret = storageProxy_->PutCloudSyncData(tableName, downloadData); + LOGE("[CloudSyncer] save the data to databse with code: %d.", ret); if (ret != E_OK) { + downloadInfo.failCount += downloadData.data.size(); LOGE("[CloudSyncer] Cannot save the data to databse with error code: %d.", ret); return ret; } // Update downloadInfo - downloadInfo.successCount = successCnt; - downloadInfo.failCount = downloadData.data.size() - successCnt; + downloadInfo.successCount += downloadData.data.size(); return E_OK; } @@ -350,22 +358,29 @@ int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId) return ret; } InnerProcessInfo innerProcessInfo; - DownloadData downloadData; + innerProcessInfo.tableName = tableName; bool reachEnd = false; // Query data by batch until reaching end and not more data need to be download while (!reachEnd) { // Get cloud data after cloud water mark - ret = QueryCloudData(tableName, downloadData.data); + DownloadData downloadData; + ret = QueryCloudData(tableName, downloadData); if (ret == -E_REACH_END) { reachEnd = true; } else if (ret != E_OK) { return ret; } - ret = storageProxy_->StartTransaction(TransactType::DEFERRED); + if (reachEnd && downloadData.data.empty()) { // sr_0511 临时加上,下面data.back空指针异常 + LOGE("reachEnd && downloadData.data.empty()"); + break; + } + // sr_0511 put应该是否该开写事务 + ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE); if (ret != E_OK) { LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret); return ret; } + LOGW("begin saveData"); ret = SaveData(tableName, downloadData, innerProcessInfo.downLoadInfo); if (ret != E_OK) { { @@ -516,6 +531,10 @@ int CloudSyncer::DoUploadInner(CloudSyncer::TaskId taskId) LocalWaterMark localMark; ret = storageProxy_->GetLocalWaterMark(tableName, localMark); + if (ret == -E_NOT_FOUND) { + localMark = 0u;// sr_0511 第一次水位为空时 + ret = E_OK; + } if (ret != E_OK) { LOGE("[CloudSyncer] Failed to get local water mark when upload, %d.", ret); return ret; @@ -543,7 +562,7 @@ int CloudSyncer::DoUploadInner(CloudSyncer::TaskId taskId) } InnerProcessInfo innerProcessInfo; - innerProcessInfo.upLoadInfo.total = count; + innerProcessInfo.tableName = tableName; uint32_t batchIndex = 0; while (!CheckCloudSyncDataEmpty(uploadData)) { @@ -595,14 +614,11 @@ int CloudSyncer::DoUploadInner(CloudSyncer::TaskId taskId) return ret; } - // After each batch upload successed, call NotifyProcess. - { - std::lock_guard autoLock(contextLock_); - currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo); - } - ClearCloudSyncData(uploadData); - + + if (continueStmtToken == nullptr) { + break; + } ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData); if ((ret != E_OK) && (ret != -E_UNFINISHED)) { LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret); @@ -617,10 +633,14 @@ int CloudSyncer::CheckDataValid(const DownloadData &data, std::string &cursor) return E_OK; } -int CloudSyncer::QueryCloudData(const std::string &tableName, std::vector &data) +int CloudSyncer::QueryCloudData(const std::string &tableName, DownloadData &downloadData) { CloudWaterMark cloudWaterMark; int ret = storageProxy_->GetCloudWaterMark(tableName, cloudWaterMark); + if (cloudWaterMark == "") { + cloudWaterMark = "0"; + } + if (ret != E_OK) { LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret); return ret; @@ -628,17 +648,18 @@ int CloudSyncer::QueryCloudData(const std::string &tableName, std::vector &devices, SyncMode mode) @@ -823,6 +844,7 @@ int CloudSyncer::GetLatestLocalWatrMark(CloudSyncData &uploadData, LocalWaterMar void CloudSyncer::ClearCloudSyncData(CloudSyncData &uploadData) { std::vector().swap(uploadData.insData.record); std::vector().swap(uploadData.insData.extend); + std::vector().swap(uploadData.insData.rowid); std::vector().swap(uploadData.updData.record); std::vector().swap(uploadData.updData.extend); std::vector().swap(uploadData.delData.record); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h index 0c98ef59f0a689c91452528f2e37bcee0c355118..62567b813e00bc8c6e6c837739c9ad05ce1a2710 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h @@ -106,7 +106,7 @@ protected: int CheckDataValid(const DownloadData &data, std::string &cursor); - int QueryCloudData(const std::string &tableName, std::vector &data); + int QueryCloudData(const std::string &tableName, DownloadData &downloadData); bool CheckTaskIdValid(TaskId taskId); diff --git a/frameworks/libs/distributeddb/test/unittest/common/storage/cloud/distributeddb_interfaces_relational_cloud_sync_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/storage/cloud/distributeddb_interfaces_relational_cloud_sync_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..76da430348e0155c29d035895ac8a9da0c2b9038 --- /dev/null +++ b/frameworks/libs/distributeddb/test/unittest/common/storage/cloud/distributeddb_interfaces_relational_cloud_sync_test.cpp @@ -0,0 +1,457 @@ +/* + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifdef RELATIONAL_STORE +#include + +#include "distributeddb_tools_unit_test.h" +#include "relational_store_manager.h" +#include "distributeddb_data_generate_unit_test.h" +#include "relational_store_instance.h" +#include "sqlite_relational_store.h" +#include "log_table_manager_factory.h" +#include "cloud_db_constant.h" +#include "virtual_cloud_db.h" +#include "time_helper.h" + +using namespace testing::ext; +using namespace DistributedDB; +using namespace DistributedDBUnitTest; +using namespace std; + +namespace { +string g_storeID = "Relational_Store_SYNC"; +const string g_tableName1 = "worker1"; +const string g_tableName2 = "worker2"; +const int64_t g_startTime = 100000; +string DEVICE_CLOUD = "cloud"; +string DB_SUFFIX = ".db"; +string g_testDir; +string g_storePath; +std::mutex g_processMutex; +std::condition_variable g_processCondition; +std::shared_ptr g_virtualCloudDb; +DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID); +using CloudSyncStatusCallback = std::function; +const std::string CREATE_LOCAL_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS " + g_tableName1 + "(" \ + "name TEXT PRIMARY KEY," \ + "height REAL ," \ + "married INT ," \ + "photo BLOB NOT NULL," \ + "assert BLOB," \ + "age INT);"; +const std::string COMPOSITE_PRIMARY_KEY_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS " + g_tableName2 + "(" \ + "id INTEGER PRIMARY KEY," \ + "name TEXT ," \ + "height REAL ," \ + "photo BLOB ," \ + "asserts BLOB," \ + "age INT);"; +const std::vector g_cloudFiled1 = + { { "name", TYPE_INDEX , true}, { "height", TYPE_INDEX }, + { "married", TYPE_INDEX }, { "photo", TYPE_INDEX, false, false }, + { "assert", TYPE_INDEX }, { "age", TYPE_INDEX } }; +const std::vector g_cloudFiled2 = + { { "id", TYPE_INDEX, true }, { "name", TYPE_INDEX }, + { "height", TYPE_INDEX }, { "photo", TYPE_INDEX }, + { "asserts", TYPE_INDEX }, { "age", TYPE_INDEX }}; +const std::vector g_tables = {g_tableName1, g_tableName2}; +const std::vector g_tablesPKey = {g_cloudFiled1[0].colName, g_cloudFiled2[0].colName}; +const std::vector g_prefix = {"Local", ""}; + + +void CreateUserDBAndTable(sqlite3 *&db) +{ + EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK); + EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_LOCAL_TABLE_SQL), SQLITE_OK); + EXPECT_EQ(RelationalTestUtils::ExecSql(db, COMPOSITE_PRIMARY_KEY_TABLE_SQL), SQLITE_OK); +} + +void InsertUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count, int64_t photoSize) { + std::string photo(photoSize, 'v'); + for (int64_t i = begin; i < count; ++i) { + string sql = "INSERT OR REPLACE INTO " + g_tableName1 + + " (name, height, married, photo, assert, age) VALUES ('Local" + std::to_string(i) + + "', '175.8', '0', '" + photo + "', '', '18');"; + ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK); + } + for (int64_t i = begin; i < count; ++i) { + string sql = "INSERT OR REPLACE INTO " + g_tableName2 + + " (id, name, height, photo, asserts, age) VALUES ('" + std::to_string(i) + "', 'Local" + + std::to_string(i) + "', '155.10', '"+ photo + "', '', '21');"; + ASSERT_EQ(RelationalTestUtils::ExecSql(db, sql), SQLITE_OK); + } +} + +void UpdateUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count) { + for (size_t i = 0; i < g_tables.size(); i++) { + string updateAge = "UPDATE " + g_tables[i] + " SET age = '99' where " + g_tablesPKey[i] + + " in ("; + for (int64_t j = begin; j < begin + count; ++j) { + updateAge += "'" + g_prefix[i] + std::to_string(j) + "',"; + } + updateAge.pop_back(); + updateAge += ");"; + ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK); + } +} + +void DeleteUserTableRecord(sqlite3 *&db, int64_t begin, int64_t count) { + for (size_t i = 0; i < g_tables.size(); i++) { + string updateAge = "Delete from " + g_tables[i] + " where " + g_tablesPKey[i] + + " in ("; + for (int64_t j = begin; j < count; ++j) { + updateAge += "'" + g_prefix[i] + std::to_string(j) + "',"; + } + updateAge.pop_back(); + updateAge += ");"; + ASSERT_EQ(RelationalTestUtils::ExecSql(db, updateAge), SQLITE_OK); + } +} + +void InsertCloudTableRecord(int64_t begin, int64_t count, int photoSize) +{ + std::vector photo(photoSize, 'v'); + std::vector record1; + std::vector extend1; + for (int64_t i = begin; i < count; ++i) { + Timestamp now = TimeHelper::GetSysCurrentTime(); + VBucket data; + data.insert_or_assign("name", "Cloud" + std::to_string(i)); + data.insert_or_assign("height", 166.0); + data.insert_or_assign("married", (bool)0); + data.insert_or_assign("photo", photo); + data.insert_or_assign("assert", KEY_1); + data.insert_or_assign("age", 13L); + record1.push_back(data); + VBucket log; + log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now); + log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now); + log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false); + extend1.push_back(log); + } + int errCode = g_virtualCloudDb->BatchInsert(g_tableName1, std::move(record1), extend1); + ASSERT_EQ(errCode, DBStatus::OK); + + std::vector record2; + std::vector extend2; + for (int64_t i = begin; i < count; ++i) { + Timestamp now = TimeHelper::GetSysCurrentTime(); + VBucket data; + data.insert_or_assign("id", i); + data.insert_or_assign("name", "Cloud" + std::to_string(i)); + data.insert_or_assign("height", 180.3); + data.insert_or_assign("photo", photo); + data.insert_or_assign("asserts", KEY_1); + data.insert_or_assign("age", 28L); + record2.push_back(data); + VBucket log; + log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i)); + log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now); + log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now); + log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false); + extend2.push_back(log); + } + errCode = g_virtualCloudDb->BatchInsert(g_tableName2, std::move(record2), extend2); + ASSERT_EQ(errCode, DBStatus::OK); +} + +int QueryCountCallback(void *data, int count, char **colValue, char **colName) +{ + if (count != 1) { + return 0; + } + int64_t expectCount = reinterpret_cast(data); + EXPECT_EQ(std::atoi(colValue[0]), expectCount); + return 0; +} + +void CheckDownloadResult(sqlite3 *&db, std::vector expectCounts) +{ + for (size_t i = 0; i < g_tables.size(); ++i) { + string queryDownload = "select count(*) from " + g_tables[i] + " where name " + + " like 'Cloud%'"; + EXPECT_EQ(sqlite3_exec(db, queryDownload.c_str(), QueryCountCallback, + reinterpret_cast(expectCounts[i]), nullptr), SQLITE_OK); + } +} + +void CheckCloudTotalCount(std::vector expectCounts) +{ + VBucket extend; + extend[CloudDbConstant::CURSOR_FIELD] = "0"; + for (size_t i = 0; i < g_tables.size(); ++i) { + std::vector data; + g_virtualCloudDb->Query(g_tables[i], extend, data); + LOGE("%s : %d", g_tables[i].c_str(), data.size()); + EXPECT_EQ(data.size(), (uint64_t)expectCounts[i]); // expectCount表示云端数据总量 + } +} + +void GetCloudDbSchema(DataBaseSchema &dataBaseSchema) +{ + TableSchema tableSchema1 = { + .name = g_tableName1, + .fields = g_cloudFiled1 + }; + TableSchema tableSchema2 = { + .name = g_tableName2, + .fields = g_cloudFiled2 + }; + dataBaseSchema.tables.push_back(tableSchema1); + dataBaseSchema.tables.push_back(tableSchema2); +} + +void GetCallback(SyncProcess &syncProcess, CloudSyncStatusCallback &callback) +{ + callback = [&syncProcess](SyncProcess process) { + syncProcess = process; + LOGI("current sync process status:%d, db status:%d ", syncProcess.process, syncProcess.errCode); + std::for_each(g_tables.begin(), g_tables.end(), [&](const auto &item) { + auto table1 = syncProcess.tableProcess.find(item); + if (table1 != syncProcess.tableProcess.end()) { + LOGI("table[%s], sync process status:%d, [downloadInfo](batchIndex:%u, total:%u, successCount:%u, " + "failCount:%u)\n [uploadInfo](batchIndex:%u, total:%u, successCount:%u,failCount:%u", + item.c_str(), table1->second.process, table1->second.downLoadInfo.batchIndex, + table1->second.downLoadInfo.total, table1->second.downLoadInfo.successCount, + table1->second.downLoadInfo.failCount, table1->second.upLoadInfo.total, + table1->second.upLoadInfo.successCount, table1->second.upLoadInfo.failCount); + } + }); + }; +} + +void WaitForSyncFinish(SyncProcess &syncProcess, const int64_t &waitTime) +{ + std::unique_lock lock(g_processMutex); + bool result = g_processCondition.wait_for(lock, std::chrono::seconds(waitTime), [&syncProcess](){ + return syncProcess.process == FINISHED; + }); + ASSERT_EQ(result, true); +} + +void InitStoreProp(const std::string &storePath, const std::string &appId, const std::string &userId, + RelationalDBProperties &properties) +{ + properties.SetStringProp(RelationalDBProperties::DATA_DIR, storePath); + properties.SetStringProp(RelationalDBProperties::APP_ID, appId); + properties.SetStringProp(RelationalDBProperties::USER_ID, userId); + properties.SetStringProp(RelationalDBProperties::STORE_ID, g_storeID); + std::string identifier = userId + "-" + appId + "-" + g_storeID; + std::string hashIdentifier = DBCommon::TransferHashString(identifier); + properties.SetStringProp(RelationalDBProperties::IDENTIFIER_DATA, hashIdentifier); +} + +class DistributedDBInterfacesRelationalCloudSyncTest : public testing::Test { +public: + static void SetUpTestCase(void); + static void TearDownTestCase(void); + void SetUp(); + void TearDown(); +protected: + sqlite3 *db = nullptr; + RelationalStoreDelegate *delegate = nullptr; +}; + + +void DistributedDBInterfacesRelationalCloudSyncTest::SetUpTestCase(void) +{ + DistributedDBToolsUnitTest::TestDirInit(g_testDir); + g_storePath = g_testDir + "/" + g_storeID + DB_SUFFIX; + LOGI("The test db is:%s", g_testDir.c_str()); +} + +void DistributedDBInterfacesRelationalCloudSyncTest::TearDownTestCase(void) +{} + +void DistributedDBInterfacesRelationalCloudSyncTest::SetUp(void) +{ + if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) { + LOGE("rm test db files error."); + } + DistributedDBToolsUnitTest::PrintTestCaseInfo(); + LOGD("Test dir is %s", g_testDir.c_str()); + db = RelationalTestUtils::CreateDataBase(g_storePath); + ASSERT_NE(db, nullptr); + CreateUserDBAndTable(db); + ASSERT_EQ(g_mgr.OpenStore(g_storePath, g_storeID, RelationalStoreDelegate::Option {}, delegate), DBStatus::OK); + ASSERT_NE(delegate, nullptr); + ASSERT_EQ(delegate->CreateDistributedTable(g_tableName1, CLOUD_COOPERATION), DBStatus::OK); + ASSERT_EQ(delegate->CreateDistributedTable(g_tableName2, CLOUD_COOPERATION), DBStatus::OK); + g_virtualCloudDb = make_shared(); + ASSERT_EQ(delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK); + DataBaseSchema dataBaseSchema; + GetCloudDbSchema(dataBaseSchema); + ASSERT_EQ(delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK); +} + +void DistributedDBInterfacesRelationalCloudSyncTest::TearDown(void) +{ + g_virtualCloudDb = nullptr; + if (delegate != nullptr) { + EXPECT_EQ(g_mgr.CloseStore(delegate), DBStatus::OK); + delegate = nullptr; + } + EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK); + if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) { + LOGE("rm test db files error."); + } +} + +/** + * @tc.name: CloudSyncTest001 + * @tc.desc: Cloud data is older than local data. + * @tc.type: FUNC + * @tc.require: + * @tc.author: bty + */ +HWTEST_F(DistributedDBInterfacesRelationalCloudSyncTest, CloudSyncTest001, TestSize.Level1) +{ + int64_t paddingSize = 10; + LOGD("insert cloud record worker1[primary key]:[cloud0 - cloud20] , worker2[primary key]:[0 - 20]"); + InsertCloudTableRecord(0, 20, paddingSize); + LOGD("insert user record worker1[primary key]:[local0 - cloud10] , worker2[primary key]:[0 - 10]"); + InsertUserTableRecord(db, 0, 10, paddingSize); + Query query = Query::Select().FromTable(g_tables); + int64_t waitTime = 10; + SyncProcess syncProcess; + CloudSyncStatusCallback callback; + GetCallback(syncProcess, callback); + LOGD("call sync"); + ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, waitTime), DBStatus::OK); + LOGD("wait sync"); + WaitForSyncFinish(syncProcess, waitTime); + LOGD("expect get result download worker1[primary key]:[cloud0 - cloud20], worker2[primary key]:[10 - 20]"); + CheckDownloadResult(db, {20L, 10L}); + LOGD("expect get result upload worker1[primary key]:[local0 - local10], worker2[primary key]:[0 - 10]"); + CheckCloudTotalCount({30L, 20L}); +} + +/** + * @tc.name: CloudSyncTest002 + * @tc.desc: Local data is older than cloud data. + * @tc.type: FUNC + * @tc.require: + * @tc.author: bty + */ +HWTEST_F(DistributedDBInterfacesRelationalCloudSyncTest, CloudSyncTest002, TestSize.Level1) +{ + int64_t paddingSize = 100; + LOGD("insert user record worker1[primary key]:[local0 - local20] , worker2[primary key]:[0 - 20]"); + InsertUserTableRecord(db, 0, 20, paddingSize); + LOGD("insert cloud record worker1[primary key]:[cloud0 - cloud10] , worker2[primary key]:[0 - 10]"); + InsertCloudTableRecord(0, 10, paddingSize); + Query query = Query::Select().FromTable(g_tables); + int64_t waitTime = 30; + SyncProcess syncProcess; + CloudSyncStatusCallback callback; + GetCallback(syncProcess, callback); + ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, waitTime), DBStatus::OK); + WaitForSyncFinish(syncProcess, waitTime); + LOGD("expect get result download worker1[primary key]:[cloud0 - cloud10], worker2[primary key]:[0 - 10]"); + CheckDownloadResult(db, {10L, 10L}); + LOGD("expect get result upload worker1[primary key]:[local0 - local20], worker2[primary key]:[10 - 20]"); + CheckCloudTotalCount({30L, 20L}); +} + +/** + * @tc.name: CloudSyncTest003 + * @tc.desc: test with update and delete operator + * @tc.type: FUNC + * @tc.require: + * @tc.author: bty + */ +HWTEST_F(DistributedDBInterfacesRelationalCloudSyncTest, CloudSyncTest003, TestSize.Level1) +{ + int64_t paddingSize = 10; + LOGD("insert cloud record worker1[primary key]:[cloud0 - cloud20] , worker2[primary key]:[0 - 20]"); + InsertCloudTableRecord(0, 20, paddingSize); + LOGD("insert user record worker1[primary key]:[local0 - cloud20] , worker2[primary key]:[0 - 20]"); + InsertUserTableRecord(db, 0, 20, paddingSize); + Query query = Query::Select().FromTable(g_tables); + int64_t waitTime = 10; + SyncProcess syncProcess; + CloudSyncStatusCallback callback; + GetCallback(syncProcess, callback); + LOGD("call sync"); + ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, waitTime), DBStatus::OK); + LOGD("wait sync"); + WaitForSyncFinish(syncProcess, waitTime); + LOGD("expect get result download worker1[primary key]:[cloud0 - cloud20], worker2[primary key]:none"); + CheckDownloadResult(db, {20L, 0L}); + LOGD("expect get result upload worker1[primary key]:[local0 - local20], worker2[primary key]:[0 - 20]"); + CheckCloudTotalCount({40L, 20L}); + + LOGD("update local record worker1[primary key]:[local5 - local15] , worker2[primary key]:[5 - 15]"); + UpdateUserTableRecord(db, 5, 10); + LOGD("call sync"); + ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, waitTime), DBStatus::OK); + LOGD("wait sync"); + WaitForSyncFinish(syncProcess, waitTime); + + LOGD("expect get result upload worker1[primary key]:[local5 - local15]"); + VBucket extend; + extend[CloudDbConstant::CURSOR_FIELD] = "0"; + std::vector data1; + g_virtualCloudDb->Query(g_tables[0], extend, data1); + for (int j = 24; j < 35; ++j) { + EXPECT_EQ(std::get(data1[j]["age"]), 99); + } + + LOGD("expect get result upload worker2[primary key]:[5 - 15]"); + std::vector data2; + g_virtualCloudDb->Query(g_tables[1], extend, data2); + for (int j = 5; j < 15; ++j) { + EXPECT_EQ(std::get(data2[j]["age"]), 99); + } + + LOGD("delete local record worker1[primary key]:[local0 - local3] , worker2[primary key]:[0 - 3]"); + DeleteUserTableRecord(db, 0, 3); + + LOGD("call sync"); + ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, waitTime), DBStatus::OK); + LOGD("wait sync"); + WaitForSyncFinish(syncProcess, waitTime); + + LOGD("expect get result upload worker1[primary key]:[local0 - local3], worker2[primary key]:[0 - 3]"); + CheckCloudTotalCount({37L, 17L}); +} + +/** + * @tc.name: CloudSyncTest004 + * @tc.desc: Random write of local and cloud data + * @tc.type: FUNC + * @tc.require: + * @tc.author: bty + */ +HWTEST_F(DistributedDBInterfacesRelationalCloudSyncTest, CloudSyncTest004, TestSize.Level1) +{ + vector threads; + threads.emplace_back(InsertCloudTableRecord, 0, 30, 1024); + threads.emplace_back(InsertUserTableRecord, std::ref(db), 0, 30, 1024); + for (auto &thread: threads) { + thread.join(); + } + Query query = Query::Select().FromTable(g_tables); + int64_t waitTime = 30; + SyncProcess syncProcess; + CloudSyncStatusCallback callback; + GetCallback(syncProcess, callback); + ASSERT_EQ(delegate->Sync({DEVICE_CLOUD}, SYNC_MODE_CLOUD_MERGE, query, callback, waitTime), DBStatus::OK); + WaitForSyncFinish(syncProcess, waitTime); +} +} +#endif // RELATIONAL_STORE \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/unittest/common/storage/distributeddb_relational_cloud_syncable_storage_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/storage/distributeddb_relational_cloud_syncable_storage_test.cpp index 19f43522eba16aa23efc45eea78984df1ba8a81f..55951422f5222f7dfaa03eec2b6892cf328064d4 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/storage/distributeddb_relational_cloud_syncable_storage_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/storage/distributeddb_relational_cloud_syncable_storage_test.cpp @@ -22,7 +22,9 @@ #include "relational_store_instance.h" #include "sqlite_relational_store.h" #include "log_table_manager_factory.h" -#include "cloud/cloud_db_constant.h" +#include "cloud_db_constant.h" +#include "runtime_config.h" +#include "virtual_cloud_data_translate.h" using namespace testing::ext; using namespace DistributedDB; @@ -670,6 +672,10 @@ HWTEST_F(DistributedDBRelationalCloudSyncableStorageTest, GetCloudData001, TestS tableSchema.fields = g_cloudFiled; ContinueToken token; CloudSyncData cloudSyncData; + DataBaseSchema dataBaseSchema; + dataBaseSchema.tables.push_back(tableSchema); + EXPECT_EQ(g_cloudStore->SetCloudDbSchema(dataBaseSchema), E_OK); + RuntimeConfig::SetCloudTranslate(std::make_shared()); /** * @tc.steps: There is currently no handle under the transaction @@ -677,6 +683,7 @@ HWTEST_F(DistributedDBRelationalCloudSyncableStorageTest, GetCloudData001, TestS */ EXPECT_EQ(g_cloudStore->GetCloudData(tableSchema, g_startTime + 10, token, cloudSyncData), -E_INVALID_DB); + EXPECT_EQ(storageProxy->StartTransaction(), E_OK); EXPECT_EQ(storageProxy->GetCloudData(g_tableName, g_startTime + 10, token, cloudSyncData), E_OK); EXPECT_EQ(storageProxy->Commit(), E_OK); @@ -716,6 +723,14 @@ HWTEST_F(DistributedDBRelationalCloudSyncableStorageTest, GetCloudData002, TestS InitLogData(insCount, insCount, insCount, insCount); CreateAndInitUserTable(2 * insCount, 1024 * 8); + TableSchema tableSchema; + tableSchema.name = g_tableName; + tableSchema.fields = g_cloudFiled; + DataBaseSchema dataBaseSchema; + dataBaseSchema.tables.push_back(tableSchema); + EXPECT_EQ(g_cloudStore->SetCloudDbSchema(dataBaseSchema), E_OK); + RuntimeConfig::SetCloudTranslate(std::make_shared()); + /** * @tc.steps: GetCloudData has not finished querying yet. * @tc.expected: return -E_UNFINISHED. @@ -763,6 +778,14 @@ HWTEST_F(DistributedDBRelationalCloudSyncableStorageTest, GetCloudData003, TestS InitLogData(insCount, insCount, insCount, insCount); CreateAndInitUserTable(2 * insCount, 1024 * 8); + TableSchema tableSchema; + tableSchema.name = g_tableName; + tableSchema.fields = g_cloudFiled; + DataBaseSchema dataBaseSchema; + dataBaseSchema.tables.push_back(tableSchema); + EXPECT_EQ(g_cloudStore->SetCloudDbSchema(dataBaseSchema), E_OK); + RuntimeConfig::SetCloudTranslate(std::make_shared()); + ContinueToken token; CloudSyncData cloudSyncData; EXPECT_EQ(storageProxy->ReleaseContinueToken(token), E_OK); diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp index 3d74d9faa624aba81856d4811bbf6d516c0c966b..d5bb69556e22bff9aa928eb95a582f46cfbf5c6b 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp @@ -39,7 +39,7 @@ DBStatus VirtualCloudDb::BatchInsert(const std::string &tableName, std::vector(extend[CURSOR_FIELD]); for (auto &tableData : cloudData_[tableName]) { + std::string srcCursor = std::get(tableData.extend[CURSOR_FIELD]); if (std::stol(srcCursor) > std::stol(cursor)) { VBucket bucket = tableData.record; for (const auto &ex: tableData.extend) { bucket.insert(ex); } + std::string gid = std::get(bucket[GID_FIELD]); + LOGW("query gid %s", gid.c_str()); data.push_back(std::move(bucket)); } if (data.size() >= static_cast(queryLimit_)) { diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h index 89410360e1e3abaf9870ef835460f9d7ba06d483..198082dd6c632fa64a0d5c9fab1c42b45016330f 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.h @@ -55,7 +55,7 @@ private: std::atomic lockStatus_ = false; std::atomic blockTimeMs_ = 0; std::atomic currentGid_ = 0; - std::atomic currentCursor_ = 0; + std::atomic currentCursor_ = 1; std::atomic queryLimit_ = 100; std::mutex cloudDataMutex_; std::map> cloudData_;