From b7a916bb32a866e0099b44979e3e5e7a03ce6e0b Mon Sep 17 00:00:00 2001 From: zqq Date: Thu, 11 Sep 2025 17:02:57 +0800 Subject: [PATCH] support remote pull notify Signed-off-by: zqq --- .../libs/distributeddb/include/types_export.h | 9 ++ .../interfaces/include/kv_store_nb_delegate.h | 9 ++ .../src/kv_store_nb_delegate_impl.cpp | 9 ++ .../src/kv_store_nb_delegate_impl.h | 2 + .../storage/include/ikvdb_connection.h | 2 + .../src/kv/generic_kvdb_connection.cpp | 6 + .../storage/src/kv/generic_kvdb_connection.h | 2 + .../storage/src/kv/sync_able_kvdb.cpp | 5 + .../storage/src/kv/sync_able_kvdb.h | 2 + .../src/kv/sync_able_kvdb_connection.cpp | 9 ++ .../src/kv/sync_able_kvdb_connection.h | 2 + .../distributeddb/syncer/include/isyncer.h | 2 + .../syncer/include/syncer_proxy.h | 2 + .../syncer/src/device/generic_syncer.cpp | 20 +++ .../syncer/src/device/generic_syncer.h | 2 + .../syncer/src/device/isync_engine.h | 2 + .../syncer/src/device/isync_task_context.h | 2 + .../device/singlever/single_ver_data_sync.cpp | 1 - .../single_ver_sync_state_machine.cpp | 1 + .../syncer/src/device/sync_engine.cpp | 60 ++++++++ .../syncer/src/device/sync_engine.h | 8 +- .../syncer/src/device/sync_task_context.cpp | 20 +++ .../syncer/src/device/sync_task_context.h | 7 + .../syncer/src/device/syncer_proxy.cpp | 8 ++ frameworks/libs/distributeddb/test/BUILD.gn | 7 +- .../unittest/common/common/kv_general_ut.cpp | 7 +- .../unittest/common/common/kv_general_ut.h | 1 + .../kv/distributeddb_kv_notify_test.cpp | 132 ++++++++++++++++++ 28 files changed, 335 insertions(+), 4 deletions(-) create mode 100644 frameworks/libs/distributeddb/test/unittest/common/store_test/kv/distributeddb_kv_notify_test.cpp diff --git a/frameworks/libs/distributeddb/include/types_export.h b/frameworks/libs/distributeddb/include/types_export.h index d471aa1be07..7bdac3a8b3e 100644 --- a/frameworks/libs/distributeddb/include/types_export.h +++ b/frameworks/libs/distributeddb/include/types_export.h @@ -244,5 +244,14 @@ struct StoreInfo { } }; using TranslateToDeviceIdCallback = std::function; + +struct DeviceSyncNotifyInfo { + std::string deviceId; +}; + +enum class DeviceSyncEvent : int { + REMOTE_PULL_STARTED = 0 +}; +using DeviceSyncNotifier = std::function; } // namespace DistributedDB #endif // DISTRIBUTEDDB_TYPES_EXPORT_H diff --git a/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h b/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h index 8c63e761a10..24bb48050ed 100644 --- a/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h +++ b/frameworks/libs/distributeddb/interfaces/include/kv_store_nb_delegate.h @@ -332,6 +332,15 @@ public: { return OK; } + + // Set a notifier callback, it will be called when event {@link DeviceSyncEvent} happened. + // If set nullptr, means unregister the notifier. + // If repeat set, subject to the last time. + DB_API virtual DBStatus SetDeviceSyncNotify([[gnu::unused]] DeviceSyncEvent event, + [[gnu::unused]] const DeviceSyncNotifier ¬ifier) + { + return OK; + } }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp index 1fdf381d191..defc53f1bca 100644 --- a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.cpp @@ -1437,4 +1437,13 @@ void KvStoreNbDelegateImpl::SetHandle(void *handle) dlHandle_ = handle; #endif } + +DBStatus KvStoreNbDelegateImpl::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + if (conn_ == nullptr) { + LOGE("%s", INVALID_CONNECTION); + return DB_ERROR; + } + return TransferDBErrno(conn_->SetDeviceSyncNotify(event, notifier)); +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h index 88323d87213..ff3acc5c1ef 100644 --- a/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h +++ b/frameworks/libs/distributeddb/interfaces/src/kv_store_nb_delegate_impl.h @@ -201,6 +201,8 @@ public: DBStatus OperateDataStatus(uint32_t dataOperator) override; + DBStatus SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; + void SetHandle(void *handle); private: diff --git a/frameworks/libs/distributeddb/storage/include/ikvdb_connection.h b/frameworks/libs/distributeddb/storage/include/ikvdb_connection.h index b102ffb925a..3ad0172cc6d 100644 --- a/frameworks/libs/distributeddb/storage/include/ikvdb_connection.h +++ b/frameworks/libs/distributeddb/storage/include/ikvdb_connection.h @@ -174,6 +174,8 @@ public: virtual int ClearCloudWatermark() = 0; virtual int OperateDataStatus(uint32_t dataOperator) = 0; + + virtual int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) = 0; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.cpp b/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.cpp index 957d0dc8d2a..ca3566fbce9 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.cpp +++ b/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.cpp @@ -466,4 +466,10 @@ int GenericKvDBConnection::OperateDataStatus([[gnu::unused]] uint32_t dataOperat { return -E_NOT_SUPPORT; } + +int GenericKvDBConnection::SetDeviceSyncNotify([[gnu::unused]] DeviceSyncEvent event, + [[gnu::unused]] const DeviceSyncNotifier ¬ifier) +{ + return -E_NOT_SUPPORT; +} } \ No newline at end of file diff --git a/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.h b/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.h index fe37fc7718f..194dc3f0677 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.h +++ b/frameworks/libs/distributeddb/storage/src/kv/generic_kvdb_connection.h @@ -116,6 +116,8 @@ public: int ClearCloudWatermark() override; int OperateDataStatus(uint32_t dataOperator) override; + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; protected: // Get the stashed 'KvDB_ pointer' without ref. template diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.cpp b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.cpp index a624def667c..20436b6e991 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.cpp +++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.cpp @@ -738,4 +738,9 @@ bool SyncAbleKvDB::CheckSchemaSupportForCloudSync() const return true; // default is valid } #endif + +int SyncAbleKvDB::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + return syncer_.SetDeviceSyncNotify(event, notifier); +} } diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.h b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.h index ae32b9bc3d9..6d137fc95de 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.h +++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb.h @@ -119,6 +119,8 @@ public: void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback); #endif + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier); protected: virtual IKvDBSyncInterface *GetSyncInterface() = 0; diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp index 6c7b10775b5..719ae4bf46e 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp +++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.cpp @@ -457,4 +457,13 @@ int SyncAbleKvDBConnection::SetReceiveDataInterceptor(const DataInterceptor &int kvDB->SetReceiveDataInterceptor(interceptor); return E_OK; } + +int SyncAbleKvDBConnection::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + auto *kvDB = GetDB(); + if (kvDB == nullptr) { + return -E_INVALID_CONNECTION; + } + return kvDB->SetDeviceSyncNotify(event, notifier); +} } diff --git a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h index 4594b3dd9b4..441e12efd21 100644 --- a/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h +++ b/frameworks/libs/distributeddb/storage/src/kv/sync_able_kvdb_connection.h @@ -50,6 +50,8 @@ public: void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback) override; #endif + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; protected: int DisableManualSync(); diff --git a/frameworks/libs/distributeddb/syncer/include/isyncer.h b/frameworks/libs/distributeddb/syncer/include/isyncer.h index 426f83df086..cfa1108e490 100644 --- a/frameworks/libs/distributeddb/syncer/include/isyncer.h +++ b/frameworks/libs/distributeddb/syncer/include/isyncer.h @@ -142,6 +142,8 @@ public: virtual int32_t GetTaskCount() = 0; virtual bool ExchangeClosePending(bool expected) = 0; + + virtual int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) = 0; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h b/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h index 481d592d395..4ed470732fe 100644 --- a/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h +++ b/frameworks/libs/distributeddb/syncer/include/syncer_proxy.h @@ -123,6 +123,8 @@ public: int32_t GetTaskCount() override; bool ExchangeClosePending(bool expected) override; + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; private: std::mutex syncerLock_; std::shared_ptr syncer_; diff --git a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp index 426d110a290..7a104156cfd 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.cpp @@ -1314,4 +1314,24 @@ bool GenericSyncer::ExchangeClosePending(bool expected) RefObject::DecObjRef(syncEngine); return res; } + +int GenericSyncer::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + if (event != DeviceSyncEvent::REMOTE_PULL_STARTED) { + LOGE("[GenericSyncer] Invalid device sync event[%d]", static_cast(event)); + return -E_INVALID_ARGS; + } + ISyncEngine *syncEngine = nullptr; + { + std::lock_guard lock(syncerLock_); + if (syncEngine_ == nullptr) { + return -E_NOT_INIT; + } + syncEngine = syncEngine_; + RefObject::IncObjRef(syncEngine); + } + syncEngine->SetRemotePullStartNotify(notifier); + RefObject::DecObjRef(syncEngine); + return E_OK; +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h index 0f002124c18..b8f1c74f2b5 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h +++ b/frameworks/libs/distributeddb/syncer/src/device/generic_syncer.h @@ -120,6 +120,8 @@ public: int32_t GetTaskCount() override; bool ExchangeClosePending(bool expected) override; + + int SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) override; protected: // trigger query auto sync or auto subscribe diff --git a/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h index 0c652a39858..a599a68a398 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h +++ b/frameworks/libs/distributeddb/syncer/src/device/isync_engine.h @@ -108,6 +108,8 @@ public: virtual int32_t GetRemoteQueryTaskCount() = 0; virtual bool ExchangeClosePending(bool expected) = 0; + + virtual void SetRemotePullStartNotify(const DeviceSyncNotifier ¬ifier) = 0; protected: ~ISyncEngine() override {}; }; diff --git a/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h index 0ad79138a64..f1bc040dffb 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h +++ b/frameworks/libs/distributeddb/syncer/src/device/isync_task_context.h @@ -196,6 +196,8 @@ public: virtual bool IsRetryTask() const = 0; virtual bool IsSavingTask(uint32_t timeout) const = 0; + + virtual void RegOnRemotePullStart(const std::function &callback) = 0; protected: virtual ~ISyncTaskContext() {}; }; diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync.cpp index 9d9d10afbaa..72537fd5c00 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_data_sync.cpp @@ -1024,7 +1024,6 @@ int SingleVerDataSync::DataRequestRecvInner(SingleVerSyncTaskContext *context, c return errCode; } SingleVerDataSyncUtils::UpdateSyncProcess(context, packet); - if (pullEndWatermark > 0 && !storage_->IsReadable()) { // pull mode pullEndWatermark = 0; errCode = SendDataAck(context, message, -E_EKEYREVOKED, dataTime.endTime); diff --git a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp index 0b8e449df97..d182f79e090 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/singlever/single_ver_sync_state_machine.cpp @@ -1023,6 +1023,7 @@ void SingleVerSyncStateMachine::AddPullResponseTarget(const Message *inMsg, Wate LOGE("[StateMachine][AddPullResponseTarget] pullEndWatermark is 0!"); return; } + context_->NotifyRemotePullStart(); if (context_->GetResponseSessionId() == sessionId || context_->FindResponseSyncTarget(sessionId)) { LOGI("[StateMachine][AddPullResponseTarget] task is already running"); return; diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp index 4b29db84ba7..f8c4da533a2 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.cpp @@ -683,6 +683,9 @@ ISyncTaskContext *SyncEngine::GetSyncTaskContext(const DeviceSyncTarget &target, storage->DecRefCount(); }); context->RegOnSyncTask([this, context] { return ExecSyncTask(context); }); + context->RegOnRemotePullStart([this](const std::string &dev) { + NotifyRemotePullStart(dev); + }); return context; } @@ -1497,4 +1500,61 @@ void SyncEngine::CorrectTargetUserId(std::mapSetTargetUserId(newTargetUserId); syncTaskContextMap_[{targetDev, newTargetUserId}] = context; } + +void SyncEngine::SetRemotePullStartNotify(const DeviceSyncNotifier ¬ifier) +{ + ExtendInfo extendInfo = GetExtendInfo(); + std::lock_guard autoLock(pullStartNotifierMutex_); + pullStartNotifier_ = notifier; + LOGI("[SyncEngine] Set remote pull notify finished appId[%s] userId[%s] storeId[%s]", + DBCommon::StringMiddleMasking(extendInfo.appId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.userId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.storeId).c_str()); +} + +void SyncEngine::NotifyRemotePullStart(const std::string &dev) +{ + ExtendInfo extendInfo = GetExtendInfo(); + RefObject::IncObjRef(this); + int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, dev, extendInfo]() { + DeviceSyncNotifier notifier; + { + std::lock_guard autoLock(pullStartNotifierMutex_); + if (pullStartNotifier_ == nullptr) { + RefObject::DecObjRef(this); + return; + } + notifier = pullStartNotifier_; + } + notifier({dev}); + LOGI("[SyncEngine] Notifier remote pull start appId[%s] userId[%s] storeId[%s]", + DBCommon::StringMiddleMasking(extendInfo.appId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.userId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.storeId).c_str()); + RefObject::DecObjRef(this); + }); + if (errCode != E_OK) { + RefObject::DecObjRef(this); + } + LOGI("[SyncEngine] Schedule notifier remote pull errCode[%d] appId[%s] userId[%s] storeId[%s]", errCode, + DBCommon::StringMiddleMasking(extendInfo.appId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.userId).c_str(), + DBCommon::StringMiddleMasking(extendInfo.storeId).c_str()); +} + +ExtendInfo SyncEngine::GetExtendInfo() const +{ + ExtendInfo extendInfo; + std::lock_guard storeLock(storageMutex_); + if (syncInterface_ == nullptr) { + LOGE("[SyncEngine] Storage is null"); + return extendInfo; + } + DBProperties properties = syncInterface_->GetDbProperties(); + extendInfo.appId = properties.GetStringProp(DBProperties::APP_ID, ""); + extendInfo.userId = properties.GetStringProp(DBProperties::USER_ID, ""); + extendInfo.storeId = properties.GetStringProp(DBProperties::STORE_ID, ""); + extendInfo.subUserId = properties.GetStringProp(DBProperties::SUB_USER, ""); + return extendInfo; +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h index 474bca03004..854c5bf3739 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_engine.h @@ -141,6 +141,8 @@ public: int32_t GetRemoteQueryTaskCount() override; bool ExchangeClosePending(bool expected) override; + + void SetRemotePullStartNotify(const DeviceSyncNotifier ¬ifier) override; protected: // Create a context virtual ISyncTaskContext *CreateSyncTaskContext(const ISyncInterface &syncInterface) = 0; @@ -156,14 +158,18 @@ protected: void SetSyncInterface(ISyncInterface *syncInterface); ISyncTaskContext *GetSyncTaskContext(const DeviceSyncTarget &target, int &errCode); + void NotifyRemotePullStart(const std::string &dev); + ExtendInfo GetExtendInfo() const; - std::mutex storageMutex_; + mutable std::mutex storageMutex_; ISyncInterface *syncInterface_; // Used to store all send sync task infos (such as pull sync response, and push sync request) std::map syncTaskContextMap_; std::mutex contextMapLock_; std::shared_ptr subManager_; std::function queryAutoSyncCallback_; + std::mutex pullStartNotifierMutex_; + DeviceSyncNotifier pullStartNotifier_; private: diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp index 6c18cf3c049..f4401277f39 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.cpp @@ -888,4 +888,24 @@ void SyncTaskContext::SetErrCodeWhenWaitTimeOut(int errCode) SetCommFailErrCode(errCode); } } + +void SyncTaskContext::RegOnRemotePullStart(const std::function &callback) +{ + std::lock_guard autoLock(remotePullMutex_); + remotePullNotifier_ = callback; +} + +void SyncTaskContext::NotifyRemotePullStart() +{ + std::function notifier; + { + std::lock_guard autoLock(remotePullMutex_); + if (remotePullNotifier_ == nullptr) { + LOGE("[SyncTaskContext] Notifier is null when remote pull start"); + return; + } + notifier = remotePullNotifier_; + } + notifier(deviceId_); +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h index 7885b91fea9..4b3df0daa4f 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h +++ b/frameworks/libs/distributeddb/syncer/src/device/sync_task_context.h @@ -224,6 +224,10 @@ public: int GetCommErrCode() const; void SetCommFailErrCode(int errCode); + + void RegOnRemotePullStart(const std::function &callback) override; + + void NotifyRemotePullStart(); protected: const static int KILL_WAIT_SECONDS = INT32_MAX; @@ -302,6 +306,9 @@ protected: volatile uint32_t negotiationCount_; volatile bool isAutoSubscribe_; + mutable std::mutex remotePullMutex_; + std::function remotePullNotifier_; + // For global ISyncTaskContext Set, used by CommErrCallback. static std::mutex synTaskContextSetLock_; static std::set synTaskContextSet_; diff --git a/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp b/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp index cabd4bf1056..bb23c405678 100644 --- a/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp +++ b/frameworks/libs/distributeddb/syncer/src/device/syncer_proxy.cpp @@ -297,4 +297,12 @@ bool SyncerProxy::ExchangeClosePending(bool expected) } return syncer_->ExchangeClosePending(expected); } + +int SyncerProxy::SetDeviceSyncNotify(DeviceSyncEvent event, const DeviceSyncNotifier ¬ifier) +{ + if (syncer_ == nullptr) { + return -E_NOT_INIT; + } + return syncer_->SetDeviceSyncNotify(event, notifier); +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/test/BUILD.gn b/frameworks/libs/distributeddb/test/BUILD.gn index 1cdb63578fe..49290b71ded 100644 --- a/frameworks/libs/distributeddb/test/BUILD.gn +++ b/frameworks/libs/distributeddb/test/BUILD.gn @@ -963,6 +963,10 @@ distributeddb_unittest("DistributedDBKvCloudSyncTest") { [ "unittest/common/store_test/kv/distributeddb_kv_cloud_sync_test.cpp" ] } +distributeddb_unittest("DistributedDBKVNotifyTest") { + sources = [ "unittest/common/store_test/kv/distributeddb_kv_notify_test.cpp" ] +} + config("tokenizer_config_unittest") { visibility = [ ":*" ] include_dirs = [ "//foundation/distributeddatamgr/kv_store/frameworks/libs/distributeddb/common/include" ] @@ -1238,6 +1242,7 @@ group("unittest") { ":DistributedDBJsonPrecheckUnitTest", ":DistributedDBKVCompressTest", ":DistributedDBKVDataStatusTest", + ":DistributedDBKVNotifyTest", ":DistributedDBKvCloudSyncTest", ":DistributedDBKvDeviceSyncTest", ":DistributedDBKvMultiUserSyncTest", @@ -1342,8 +1347,8 @@ group("distributeddatamgr_fuzztest") { "fuzztest/kvdelegatemanager_fuzzer:fuzztest", "fuzztest/kvstoreresultset_fuzzer:fuzztest", "fuzztest/nbdelegate_fuzzer:fuzztest", - "fuzztest/parseckeck_fuzzer:fuzztest", "fuzztest/parsecheckfield_fuzzer:fuzztest", + "fuzztest/parseckeck_fuzzer:fuzztest", "fuzztest/query_fuzzer:fuzztest", "fuzztest/querycompare_fuzzer:fuzztest", "fuzztest/queryin_fuzzer:fuzztest", diff --git a/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.cpp b/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.cpp index ba3041fb91e..af120332fb2 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.cpp @@ -124,13 +124,18 @@ KvStoreNbDelegate *KVGeneralUt::GetDelegate(const DistributedDB::StoreInfo &info } void KVGeneralUt::BlockPush(const StoreInfo &from, const StoreInfo &to, DBStatus expectRet) +{ + BlockDeviceSync(from, to, SyncMode::SYNC_MODE_PUSH_ONLY, expectRet); +} + +void KVGeneralUt::BlockDeviceSync(const StoreInfo &from, const StoreInfo &to, SyncMode mode, DBStatus expectRet) { auto fromStore = GetDelegate(from); ASSERT_NE(fromStore, nullptr); auto toDevice = GetDevice(to); ASSERT_FALSE(toDevice.empty()); std::map syncRet; - tool_.SyncTest(fromStore, {toDevice}, SyncMode::SYNC_MODE_PUSH_ONLY, syncRet); + tool_.SyncTest(fromStore, {toDevice}, mode, syncRet); for (const auto &item : syncRet) { EXPECT_EQ(item.second, expectRet); } diff --git a/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.h b/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.h index d80558cf993..52eead0cc11 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.h +++ b/frameworks/libs/distributeddb/test/unittest/common/common/kv_general_ut.h @@ -31,6 +31,7 @@ protected: void SetOption(const KvStoreNbDelegate::Option &option); KvStoreNbDelegate *GetDelegate(const StoreInfo &info) const; void BlockPush(const StoreInfo &from, const StoreInfo &to, DBStatus expectRet = DBStatus::OK); + void BlockDeviceSync(const StoreInfo &from, const StoreInfo &to, SyncMode mode, DBStatus expectRet); DBStatus SetCloud(KvStoreNbDelegate *&delegate, bool invalidSchema = false); static DataBaseSchema GetDataBaseSchema(bool invalidSchema); DBStatus GetDeviceEntries(KvStoreNbDelegate *delegate, const std::string &deviceId, bool isSelfDevice, diff --git a/frameworks/libs/distributeddb/test/unittest/common/store_test/kv/distributeddb_kv_notify_test.cpp b/frameworks/libs/distributeddb/test/unittest/common/store_test/kv/distributeddb_kv_notify_test.cpp new file mode 100644 index 00000000000..61f723916af --- /dev/null +++ b/frameworks/libs/distributeddb/test/unittest/common/store_test/kv/distributeddb_kv_notify_test.cpp @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "kv_general_ut.h" + +namespace DistributedDB { +using namespace testing::ext; +class DistributedDBKVNotifyTest : public KVGeneralUt { +public: + void SetUp() override; +protected: + void NotifySyncTest(SyncMode mode, int expectCount); +}; + +void DistributedDBKVNotifyTest::SetUp() +{ + KVGeneralUt::SetUp(); + auto storeInfo1 = GetStoreInfo1(); + ASSERT_EQ(BasicUnitTest::InitDelegate(storeInfo1, "dev1"), E_OK); + auto storeInfo2 = GetStoreInfo2(); + ASSERT_EQ(BasicUnitTest::InitDelegate(storeInfo2, "dev2"), E_OK); +} + +void DistributedDBKVNotifyTest::NotifySyncTest(SyncMode mode, int expectCount) +{ + /** + * @tc.steps: step1. store1 register pull notify + * @tc.expected: step1. register ok. + */ + auto storeInfo1 = GetStoreInfo1(); + auto store1 = GetDelegate(storeInfo1); + ASSERT_NE(store1, nullptr); + std::atomic count = 0; + int ret = store1->SetDeviceSyncNotify(DeviceSyncEvent::REMOTE_PULL_STARTED, + [&count](const DeviceSyncNotifyInfo &info) { + EXPECT_EQ(info.deviceId, "dev2"); + count++; + }); + ASSERT_EQ(ret, OK); + /** + * @tc.steps: step2. store2 pull store1 + * @tc.expected: step2. pull ok and notify was triggered. + */ + auto storeInfo2 = GetStoreInfo2(); + BlockDeviceSync(storeInfo2, storeInfo1, mode, OK); + EXPECT_EQ(count, expectCount); +} + +/** + * @tc.name: NotifySync001 + * @tc.desc: Test notify when pull sync. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBKVNotifyTest, NotifySync001, TestSize.Level0) +{ + ASSERT_NO_FATAL_FAILURE(NotifySyncTest(SyncMode::SYNC_MODE_PULL_ONLY, 1)); +} + +/** + * @tc.name: NotifySync002 + * @tc.desc: Test notify when push pull sync. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBKVNotifyTest, NotifySync002, TestSize.Level0) +{ + ASSERT_NO_FATAL_FAILURE(NotifySyncTest(SyncMode::SYNC_MODE_PUSH_PULL, 1)); +} + +/** + * @tc.name: NotifySync003 + * @tc.desc: Test notify when push sync. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBKVNotifyTest, NotifySync003, TestSize.Level0) +{ + ASSERT_NO_FATAL_FAILURE(NotifySyncTest(SyncMode::SYNC_MODE_PUSH_ONLY, 0)); +} + +/** + * @tc.name: NotifySync005 + * @tc.desc: Test cancel notify when pull sync. + * @tc.type: FUNC + * @tc.author: zqq + */ +HWTEST_F(DistributedDBKVNotifyTest, NotifySync004, TestSize.Level0) +{ + /** + * @tc.steps: step1. store1 register pull notify + * @tc.expected: step1. register ok. + */ + auto storeInfo = GetStoreInfo1(); + auto store = GetDelegate(storeInfo); + ASSERT_NE(store, nullptr); + std::atomic count = 0; + int ret = store->SetDeviceSyncNotify(DeviceSyncEvent::REMOTE_PULL_STARTED, + [&count](const DeviceSyncNotifyInfo &info) { + EXPECT_EQ(info.deviceId, "dev2"); + count++; + }); + ASSERT_EQ(ret, OK); + /** + * @tc.steps: step2. store2 pull store + * @tc.expected: step2. pull ok and notify was triggered. + */ + auto storeInfo2 = GetStoreInfo2(); + BlockDeviceSync(storeInfo2, storeInfo, SyncMode::SYNC_MODE_PULL_ONLY, OK); + EXPECT_EQ(count, 1); + /** + * @tc.steps: step3. store2 pull store after store cancel notify + * @tc.expected: step3. pull ok and notify was not triggered. + */ + count = 0; + ASSERT_EQ(store->SetDeviceSyncNotify(DeviceSyncEvent::REMOTE_PULL_STARTED, nullptr), OK); + BlockDeviceSync(storeInfo2, storeInfo, SyncMode::SYNC_MODE_PULL_ONLY, OK); + EXPECT_EQ(count, 0); +} +} \ No newline at end of file -- Gitee