From 993b05fe08623dcd30bb0a6a133687881fa14f7d Mon Sep 17 00:00:00 2001 From: shawn Date: Thu, 13 Nov 2025 15:25:16 +0800 Subject: [PATCH] Optimize upload data exception handling Signed-off-by: shawn --- .../common/include/cloud/cloud_db_types.h | 1 + .../distributeddb/common/include/db_errno.h | 1 + .../interfaces/include/store_types.h | 1 + .../include/cloud/cloud_storage_utils.h | 3 ++ .../storage/src/cloud/cloud_storage_utils.cpp | 13 +++++++ ...ver_relational_storage_extend_executor.cpp | 3 ++ .../syncer/src/cloud/cloud_db_proxy.cpp | 2 + .../syncer/src/cloud/cloud_syncer.cpp | 4 +- .../syncer/src/cloud/cloud_syncer.h | 4 ++ .../syncer/src/cloud/cloud_syncer_extend.cpp | 19 +++++++--- .../src/cloud/cloud_syncer_extend_extend.cpp | 23 ++++++++++++ .../syncer/src/cloud/icloud_syncer.h | 1 + .../rdb/distributeddb_basic_rdb_test.cpp | 37 +++++++++++++++++++ .../common/syncer/cloud/virtual_cloud_db.cpp | 20 +++++++++- .../common/syncer/cloud/virtual_cloud_db.h | 3 ++ 15 files changed, 125 insertions(+), 10 deletions(-) diff --git a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_types.h b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_types.h index a5b1f60032d..105e5d1e150 100644 --- a/frameworks/libs/distributeddb/common/include/cloud/cloud_db_types.h +++ b/frameworks/libs/distributeddb/common/include/cloud/cloud_db_types.h @@ -34,6 +34,7 @@ struct CloudSyncBatch { std::vector timestamp; std::vector assets; std::vector hashKey; + bool abnormal = false; }; struct ReviseModTimeInfo { diff --git a/frameworks/libs/distributeddb/common/include/db_errno.h b/frameworks/libs/distributeddb/common/include/db_errno.h index 66d7d8b7a53..62e9431760c 100644 --- a/frameworks/libs/distributeddb/common/include/db_errno.h +++ b/frameworks/libs/distributeddb/common/include/db_errno.h @@ -153,6 +153,7 @@ constexpr const int E_CLOUD_SYNC_TASK_MERGED = (E_BASE + 128); // sync task is m constexpr const int E_SQLITE_CANT_OPEN = (E_BASE + 129); // the sqlite cannot open. constexpr const int E_LOCAL_ASSET_NOT_FOUND = (E_BASE + 130); // local asset not found. constexpr const int E_ASSET_NOT_FOUND_FOR_DOWN_ONLY = (E_BASE + 131); // asset not found for download asset only. +constexpr const int E_FILE_NOT_FOUND = (E_BASE + 132); // local asset file abnormality // Num 150+ is reserved for schema related errno, since it may be added regularly constexpr const int E_JSON_PARSE_FAIL = (E_BASE + 150); // Parse json fail in grammatical level constexpr const int E_JSON_INSERT_PATH_EXIST = (E_BASE + 151); // Path already exist before insert diff --git a/frameworks/libs/distributeddb/interfaces/include/store_types.h b/frameworks/libs/distributeddb/interfaces/include/store_types.h index bbf20a7c82a..6395a9be59a 100644 --- a/frameworks/libs/distributeddb/interfaces/include/store_types.h +++ b/frameworks/libs/distributeddb/interfaces/include/store_types.h @@ -97,6 +97,7 @@ enum DBStatus { LOW_VERSION_TARGET, // The target device is a low version device NEED_CORRECT_TARGET_USER, // The target user ID is incorrect and needs to be re-obtained CLOUD_ASSET_NOT_FOUND, // The cloud download asset return 404 error + FILE_NOT_FOUND, // local asset file abnormality BUTT_STATUS = 27394048 // end of status }; diff --git a/frameworks/libs/distributeddb/storage/include/cloud/cloud_storage_utils.h b/frameworks/libs/distributeddb/storage/include/cloud/cloud_storage_utils.h index addeeb3cb22..7e90cfa024b 100644 --- a/frameworks/libs/distributeddb/storage/include/cloud/cloud_storage_utils.h +++ b/frameworks/libs/distributeddb/storage/include/cloud/cloud_storage_utils.h @@ -81,6 +81,9 @@ public: static int FillAssetForUploadFailed(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType); static void FillAssetsForUploadFailed(Assets &assets, Assets &dbAssets, const std::map &assetOpTypeMap); + static int FillAssetForAbnormal(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType); + static void FillAssetsForAbnormal(Assets &assets, Assets &dbAssets, + const std::map &assetOpTypeMap); static void PrepareToFillAssetFromVBucket(VBucket &vBucket, std::function fillAsset); static void FillAssetFromVBucketFinish(const AssetOperationUtils::RecordAssetOpType &assetOpType, VBucket &vBucket, VBucket &dbAssets, std::function fillAsset, diff --git a/frameworks/libs/distributeddb/storage/src/cloud/cloud_storage_utils.cpp b/frameworks/libs/distributeddb/storage/src/cloud/cloud_storage_utils.cpp index d3dacdd31da..646529b4aed 100644 --- a/frameworks/libs/distributeddb/storage/src/cloud/cloud_storage_utils.cpp +++ b/frameworks/libs/distributeddb/storage/src/cloud/cloud_storage_utils.cpp @@ -755,6 +755,19 @@ void CloudStorageUtils::FillAssetsForUploadFailed(Assets &assets, Assets &dbAsse MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForUploadFailed); } +int CloudStorageUtils::FillAssetForAbnormal(Asset &asset, Asset &dbAsset, + AssetOperationUtils::AssetOpType assetOpType) +{ + dbAsset.status = AssetStatus::ABNORMAL; + return E_OK; +} + +void CloudStorageUtils::FillAssetsForAbnormal(Assets &assets, Assets &dbAssets, + const std::map &assetOpTypeMap) +{ + MergeAssetWithFillFunc(assets, dbAssets, assetOpTypeMap, FillAssetForAbnormal); +} + int CloudStorageUtils::FillAssetAfterDownloadFail(Asset &asset, Asset &dbAsset, AssetOperationUtils::AssetOpType assetOpType) { diff --git a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp index 6d67e5a4df8..2a6aef81fed 100644 --- a/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp +++ b/frameworks/libs/distributeddb/storage/src/sqlite/relational/sqlite_single_ver_relational_storage_extend_executor.cpp @@ -354,6 +354,9 @@ int SQLiteSingleVerRelationalStorageExecutor::InitFillUploadAssetStatement(OpTyp if (DBCommon::IsRecordError(data.extend.at(index)) || DBCommon::IsRecordAssetsMissing(data.extend.at(index))) { CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets, CloudStorageUtils::FillAssetForUploadFailed, CloudStorageUtils::FillAssetsForUploadFailed); + } else if(data.abnormal) { + CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets, + CloudStorageUtils::FillAssetForAbnormal, CloudStorageUtils::FillAssetsForAbnormal); } else { CloudStorageUtils::FillAssetFromVBucketFinish(assetOpType, vBucket, dbAssets, CloudStorageUtils::FillAssetForUpload, CloudStorageUtils::FillAssetsForUpload); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp index 27a685682b4..beee686b5f2 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_db_proxy.cpp @@ -502,6 +502,8 @@ int CloudDBProxy::GetInnerErrorCode(DBStatus status) return -E_CLOUD_DISABLED; case CLOUD_ASSET_NOT_FOUND: return -E_CLOUD_ASSET_NOT_FOUND; + case FILE_NOT_FOUND: + return -E_FILE_NOT_FOUND; default: return -E_CLOUD_ERROR; } diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp index 789262c17fd..7d1efca3915 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.cpp @@ -1231,7 +1231,6 @@ int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadPar } // After each batch upload successed, call NotifyProcess NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch); - // if batch upload successed, update local water mark // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here. errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam); @@ -1962,8 +1961,9 @@ void CloudSyncer::ClearContextAndNotify(TaskId taskId, int errCode) LOGW("[CloudSyncer] clear unlocking status failed! errCode = %d", err); } contextCv_.notify_all(); + // input errCode have higher priority info.tempErrCode if (info.errCode == E_OK) { - info.errCode = errCode; + info.errCode = errCode == E_OK ? info.tempErrCode : errCode; } LOGI("[CloudSyncer] finished storeId %.3s taskId %" PRIu64 " errCode %d", info.storeId.c_str(), taskId, info.errCode); diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h index d6bc53b30a7..e4cf4f23977 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer.h @@ -556,6 +556,10 @@ protected: bool IsCloudForcePush(TaskId taskId); + TaskId GetCurrentTaskId(); + + int SetIfLocalFileNotFound(TaskId taskId); + mutable std::mutex dataLock_; TaskId lastTaskId_; std::multimap> taskQueue_; diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp index b64e84de116..9cd87c7ede0 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend.cpp @@ -187,6 +187,15 @@ int CloudSyncer::BatchInsertOrUpdate(Info &uploadInfo, CloudSyncData &uploadData } innerProcessInfo.upLoadInfo.successCount += uploadInfo.successCount; innerProcessInfo.upLoadInfo.failCount += uploadInfo.failCount; + if (errCode == -E_FILE_NOT_FOUND) { + TaskId currentTaskId = GetCurrentTaskId(); + errCode = SetIfLocalFileNotFound(currentTaskId); + if (isInsert) { + uploadData.insData.abnormal = true; + } else { + uploadData.updData.abnormal = true; + } + } if (errCode == -E_CLOUD_VERSION_CONFLICT) { ProcessVersionConflictInfo(innerProcessInfo, retryCount); } @@ -723,6 +732,10 @@ int CloudSyncer::BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerP innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount; innerProcessInfo.upLoadInfo.deleteCount += deleteInfo.successCount; innerProcessInfo.upLoadInfo.failCount += deleteInfo.failCount; + if (errCode == -E_FILE_NOT_FOUND) { + TaskId currentTaskId = GetCurrentTaskId(); + errCode = SetIfLocalFileNotFound(currentTaskId); + } if (errCode == -E_CLOUD_VERSION_CONFLICT) { ProcessVersionConflictInfo(innerProcessInfo, retryCount); } @@ -2142,10 +2155,4 @@ int CloudSyncer::ClearCloudWatermark(std::function &clearFunc) std::lock_guard lock(syncMutex_); return clearFunc(); } - -bool CloudSyncer::IsCloudForcePush(TaskId taskId) -{ - std::lock_guard autoLock(dataLock_); - return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH; -} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp index bd6735be435..215fea99b74 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp +++ b/frameworks/libs/distributeddb/syncer/src/cloud/cloud_syncer_extend_extend.cpp @@ -33,6 +33,12 @@ #include "version.h" namespace DistributedDB { +bool CloudSyncer::IsCloudForcePush(TaskId taskId) +{ + std::lock_guard autoLock(dataLock_); + return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH; +} + int CloudSyncer::HandleDownloadResultForAsyncDownload(const DownloadItem &downloadItem, InnerProcessInfo &info, DownloadCommitList &commitList, uint32_t &successCount) { @@ -118,4 +124,21 @@ void CloudSyncer::ExecuteAsyncDownloadAssets(TaskId taskId) } asyncTaskCv_.notify_all(); } + +TaskId CloudSyncer::GetCurrentTaskId() +{ + std::lock_guard guard(dataLock_); + return currentContext_.currentTaskId; +} + +int CloudSyncer::SetIfLocalFileNotFound(TaskId taskId) +{ + std::lock_guard guard(dataLock_); + auto it = cloudTaskInfos_.find(taskId); + if (it == cloudTaskInfos_.end()) { + return -E_INTERNAL_ERROR; + } + (it->second).tempErrCode = -E_FILE_NOT_FOUND; + return E_OK; +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/cloud/icloud_syncer.h b/frameworks/libs/distributeddb/syncer/src/cloud/icloud_syncer.h index 92faf8133c3..d95b559d438 100644 --- a/frameworks/libs/distributeddb/syncer/src/cloud/icloud_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/cloud/icloud_syncer.h @@ -37,6 +37,7 @@ public: bool merge = false; bool asyncDownloadAssets = false; int errCode = 0; + int tempErrCode = 0; SyncMode mode = SyncMode::SYNC_MODE_PUSH_ONLY; ProcessStatus status = ProcessStatus::PREPARED; LockAction lockAction = LockAction::INSERT; diff --git a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_basic_rdb_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_basic_rdb_test.cpp index c81788f59cb..0683197ad7d 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_basic_rdb_test.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/store_test/rdb/distributeddb_basic_rdb_test.cpp @@ -200,6 +200,43 @@ HWTEST_F(DistributedDBBasicRDBTest, RdbCloudSyncExample002, TestSize.Level0) RDBGeneralUt::CloudBlockSync(info1, query); EXPECT_EQ(RDBGeneralUt::CountTableData(info1, g_defaultTable1), 20); } + +/** + * @tc.name: RdbCloudSyncExample003 + * @tc.desc: Test upload failed, when return FILE_NOT_FOUND + * @tc.type: FUNC + * @tc.require: + * @tc.author: xiefengzhu + */ +HWTEST_F(DistributedDBBasicRDBTest, RdbCloudSyncExample003, TestSize.Level0) +{ + RelationalStoreDelegate::Option option; + option.tableMode = DistributedTableMode::COLLABORATION; + SetOption(option); + auto info1 = GetStoreInfo1(); + ASSERT_EQ(BasicUnitTest::InitDelegate(info1, "dev1"), E_OK); + InsertLocalDBData(0, 2, info1); + EXPECT_EQ(RDBGeneralUt::CountTableData(info1, g_defaultTable1), 2); + + std::shared_ptr virtualCloudDb = RDBGeneralUt::GetVirtualCloudDb(); + ASSERT_NE(virtualCloudDb, nullptr); + virtualCloudDb->SetLocalFileNotFound(true); + + ASSERT_EQ(SetDistributedTables(info1, {g_defaultTable1}, TableSyncType::CLOUD_COOPERATION), E_OK); + RDBGeneralUt::SetCloudDbConfig(info1); + Query query = Query::Select().FromTable({g_defaultTable1}); + RDBGeneralUt::CloudBlockSync(info1, query, OK, static_cast(-E_FILE_NOT_FOUND)); + + std::string sql = "UPDATE " + g_defaultTable1 + " SET name='update'"; + EXPECT_EQ(ExecuteSQL(sql, info1), E_OK); + EXPECT_EQ(RDBGeneralUt::CountTableData(info1, g_defaultTable1), 2); + RDBGeneralUt::CloudBlockSync(info1, query, OK, static_cast(-E_FILE_NOT_FOUND)); + + sql = "DELETE FROM " + g_defaultTable1 + " WHERE name='update'"; + EXPECT_EQ(ExecuteSQL(sql, info1), E_OK); + EXPECT_EQ(RDBGeneralUt::CountTableData(info1, g_defaultTable1), 0); + RDBGeneralUt::CloudBlockSync(info1, query, OK, static_cast(-E_FILE_NOT_FOUND)); +} #endif // USE_DISTRIBUTEDDB_CLOUD /** diff --git a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp index 8ffb2ac7bee..9ae85cd4747 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/syncer/cloud/virtual_cloud_db.cpp @@ -63,6 +63,9 @@ DBStatus VirtualCloudDb::BatchInsert(const std::string &tableName, std::vector &extend) @@ -176,7 +183,16 @@ DBStatus VirtualCloudDb::BatchDelete(const std::string &tableName, std::vector &insertCheckFunc); + + void SetLocalFileNotFound(bool isLocalFileAbnormal); private: DBStatus InnerBatchInsert(const std::string &tableName, std::vector &&record, std::vector &extend); @@ -132,6 +134,7 @@ private: std::atomic heartbeatError_ = false; std::atomic lockStatus_ = false; std::atomic conflictInUpload_ = false; + std::atomic localFileAbnormal_ = false; std::atomic blockTimeMs_ = 0; std::atomic heartbeatBlockTimeMs_ = 0; std::atomic currentGid_ = 0; -- Gitee