From ef2d701d8d750e68dea74b676a41e86c7a5ca3ab Mon Sep 17 00:00:00 2001 From: zqq Date: Tue, 15 Nov 2022 16:14:48 +0800 Subject: [PATCH 1/8] add communication optimization interface Signed-off-by: zqq --- .../interfaces/include/db_info_handle.h | 46 +++++++++++++++++++ .../include/relational/runtime_config.h | 5 ++ .../src/relational/runtime_config.cpp | 11 +++++ 3 files changed, 62 insertions(+) create mode 100644 frameworks/libs/distributeddb/interfaces/include/db_info_handle.h diff --git a/frameworks/libs/distributeddb/interfaces/include/db_info_handle.h b/frameworks/libs/distributeddb/interfaces/include/db_info_handle.h new file mode 100644 index 00000000000..2df00f91ca4 --- /dev/null +++ b/frameworks/libs/distributeddb/interfaces/include/db_info_handle.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DB_INFO_HANDLE_H +#define DB_INFO_HANDLE_H + +#include "iprocess_communicator.h" + +namespace DistributedDB { +struct DBInfo { + std::string userId; + std::string appId; + std::string storeId; + bool syncDualTupleMode = false; + bool isNeedSync = false; +}; + +// For all functions with returnType DBStatus: +// return DBStatus::OK if successful, otherwise DBStatus::DB_ERROR if anything wrong. +class DBInfoHandle { +public: + DBInfoHandle() = default; + virtual ~DBInfoHandle() = default; + // return true if you can notify with RuntimeConfig::NotifyDBInfo, + // otherwise return false, and then we will call GetDBInfos for exchange local dbInfo with other devices. + virtual bool IsSupport(const DeviceInfos &devInfo) = 0; + + // This function will be call when IsSupport return false + // The dbInfos will be sent to online devices for auto sync and subscribe if return OK + // Otherwise will send the dbInfo which is opened + virtual DBStatus GetDBInfos(std::vector &dbInfos) = 0; +}; +} +#endif // DB_INFO_HANDLE_H diff --git a/frameworks/libs/distributeddb/interfaces/include/relational/runtime_config.h b/frameworks/libs/distributeddb/interfaces/include/relational/runtime_config.h index 1386e47410a..5675d20353b 100644 --- a/frameworks/libs/distributeddb/interfaces/include/relational/runtime_config.h +++ b/frameworks/libs/distributeddb/interfaces/include/relational/runtime_config.h @@ -19,6 +19,7 @@ #include #include +#include "db_info_handle.h" #include "iprocess_communicator.h" #include "iprocess_system_api_adapter.h" #include "store_types.h" @@ -50,6 +51,10 @@ public: DB_API static bool IsProcessSystemApiAdapterValid(); + DB_API static void SetDBInfoHandle(const std::shared_ptr &handle); + + DB_API static void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos); + private: static std::mutex communicatorMutex_; static std::mutex multiUserMutex_; diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/runtime_config.cpp b/frameworks/libs/distributeddb/interfaces/src/relational/runtime_config.cpp index 1dbabc517ce..7c25863b700 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/runtime_config.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/relational/runtime_config.cpp @@ -128,5 +128,16 @@ DBStatus RuntimeConfig::SetPermissionConditionCallback(const PermissionCondition int errCode = RuntimeContext::GetInstance()->SetPermissionConditionCallback(callback); return TransferDBErrno(errCode); } + +void RuntimeConfig::SetDBInfoHandle(const std::shared_ptr &handle) +{ + (void)handle; +} + +void RuntimeConfig::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) +{ + (void)devInfos; + (void)dbInfos; +} } // namespace DistributedDB #endif \ No newline at end of file -- Gitee From cae1af6893d1622db11be08de0ca0dadabcc9f9b Mon Sep 17 00:00:00 2001 From: zqq Date: Tue, 15 Nov 2022 20:35:16 +0800 Subject: [PATCH 2/8] add dbStatusAdapter Signed-off-by: zqq --- frameworks/libs/distributeddb/BUILD.gn | 1 + .../common/include/runtime_context.h | 5 + .../common/src/runtime_context_impl.cpp | 32 +++- .../common/src/runtime_context_impl.h | 9 + .../communicator/include/db_status_adapter.h | 62 +++++++ .../communicator/src/db_status_adapter.cpp | 171 ++++++++++++++++++ .../src/relational/runtime_config.cpp | 5 +- frameworks/libs/distributeddb/test/BUILD.gn | 1 + 8 files changed, 282 insertions(+), 4 deletions(-) create mode 100644 frameworks/libs/distributeddb/communicator/include/db_status_adapter.h create mode 100644 frameworks/libs/distributeddb/communicator/src/db_status_adapter.cpp diff --git a/frameworks/libs/distributeddb/BUILD.gn b/frameworks/libs/distributeddb/BUILD.gn index 4308a9b12c8..7b18ec0ef9f 100644 --- a/frameworks/libs/distributeddb/BUILD.gn +++ b/frameworks/libs/distributeddb/BUILD.gn @@ -118,6 +118,7 @@ ohos_shared_library("distributeddb") { "communicator/src/communicator.cpp", "communicator/src/communicator_aggregator.cpp", "communicator/src/communicator_linker.cpp", + "communicator/src/db_status_adapter.cpp", "communicator/src/frame_combiner.cpp", "communicator/src/frame_retainer.cpp", "communicator/src/header_converter.cpp", diff --git a/frameworks/libs/distributeddb/common/include/runtime_context.h b/frameworks/libs/distributeddb/common/include/runtime_context.h index 85c22557ed5..094316ce4e8 100644 --- a/frameworks/libs/distributeddb/common/include/runtime_context.h +++ b/frameworks/libs/distributeddb/common/include/runtime_context.h @@ -22,6 +22,7 @@ #include "auto_launch.h" #include "auto_launch_export.h" +#include "db_info_handle.h" #include "icommunicator_aggregator.h" #include "iprocess_system_api_adapter.h" #include "kv_store_observer.h" @@ -139,6 +140,10 @@ public: virtual std::map GetPermissionCheckParam(const DBProperties &properties) = 0; virtual void StopTaskPool() = 0; + + virtual void SetDBInfoHandle(const std::shared_ptr &handle) = 0; + + virtual void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) = 0; protected: RuntimeContext() = default; virtual ~RuntimeContext() {} diff --git a/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp b/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp index 8398c979b1f..f00d3f24372 100644 --- a/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp +++ b/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp @@ -31,7 +31,8 @@ RuntimeContextImpl::RuntimeContextImpl() timeTickMonitor_(nullptr), systemApiAdapter_(nullptr), lockStatusObserver_(nullptr), - currentSessionId_(1) + currentSessionId_(1), + dbStatusAdapter_(nullptr) { } @@ -58,6 +59,7 @@ RuntimeContextImpl::~RuntimeContextImpl() delete lockStatusObserver_; lockStatusObserver_ = nullptr; userChangeMonitor_ = nullptr; + dbStatusAdapter_ = nullptr; } // Set the label of this process. @@ -733,4 +735,32 @@ void RuntimeContextImpl::StopTaskPool() taskPool_ = nullptr; } } + +void RuntimeContextImpl::SetDBInfoHandle(const std::shared_ptr &handle) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr) { + dbStatusAdapter->SetDBInfoHandle(handle); + } +} + +void RuntimeContextImpl::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr) { + dbStatusAdapter->NotifyDBInfos(devInfos, dbInfos); + } +} + +std::shared_ptr RuntimeContextImpl::GetDBStatusAdapter() +{ + std::lock_guard autoLock(statusAdapterMutex_); + if (dbStatusAdapter_ == nullptr) { + dbStatusAdapter_ = std::make_unique(); + } + if (dbStatusAdapter_ == nullptr) { + LOGE("[RuntimeContextImpl] DbStatusAdapter create failed!"); + } + return dbStatusAdapter_; +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/common/src/runtime_context_impl.h b/frameworks/libs/distributeddb/common/src/runtime_context_impl.h index e79d23eee3d..cacf9a8cd08 100644 --- a/frameworks/libs/distributeddb/common/src/runtime_context_impl.h +++ b/frameworks/libs/distributeddb/common/src/runtime_context_impl.h @@ -21,6 +21,7 @@ #include #include "auto_launch.h" +#include "db_status_adapter.h" #include "evloop/src/ievent.h" #include "evloop/src/ievent_loop.h" #include "icommunicator_aggregator.h" @@ -127,6 +128,10 @@ public: std::map GetPermissionCheckParam(const DBProperties &properties) override; void StopTaskPool() override; + + void SetDBInfoHandle(const std::shared_ptr &handle) override; + + void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) override; private: static constexpr int MAX_TP_THREADS = 10; // max threads of the task pool. static constexpr int MIN_TP_THREADS = 1; // min threads of the task pool. @@ -135,6 +140,7 @@ private: int PrepareLoop(IEventLoop *&loop); int PrepareTaskPool(); int AllocTimerId(IEvent *evTimer, TimerId &timerId); + std::shared_ptr GetDBStatusAdapter(); // Context fields mutable std::mutex labelMutex_; @@ -189,6 +195,9 @@ private: // Get map from this callback, use for run permission check in remote device mutable std::shared_mutex permissionConditionLock_; PermissionConditionCallback permissionConditionCallback_; + + mutable std::mutex statusAdapterMutex_; + std::shared_ptr dbStatusAdapter_; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h new file mode 100644 index 00000000000..5d82ebcbf6c --- /dev/null +++ b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DB_STATUS_ADAPTER_H +#define DB_STATUS_ADAPTER_H + +#include "db_info_handle.h" +#include +#include "icommunicator_aggregator.h" +#include "macro_utils.h" + +namespace DistributedDB { +using DBChangeCallback = std::function &dbInfos)>; +class DBStatusAdapter { +public: + DBStatusAdapter(); + ~DBStatusAdapter() = default; + DISABLE_COPY_ASSIGN_MOVE(DBStatusAdapter); + + void SetDBInfoHandle(const std::shared_ptr &dbInfoHandle); + bool ExistDBInfoHandle(); + bool IsSupport(const DeviceInfos &devInfo); + int GetDBInfos(std::vector &dbInfo); + void SetDBChangeCallback(const DBChangeCallback &callback); + void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos); +private: + std::shared_ptr GetDBInfoHandle(); + bool LoadIntoCache(const DeviceInfos &devInfos, const std::vector &dbInfos); + + static int GetLocalDeviceId(std::string &deviceId); + static bool IsLocalDeviceId(const std::string &deviceId); + static void MergeDBInfos(const std::vector &srcDbInfos, std::vector &dstDbInfos); + mutable std::mutex handleMutex_; + std::shared_ptr dbInfoHandle_ = nullptr; + + mutable std::mutex callbackMutex_; + DBChangeCallback dbChangeCallback_; + + mutable std::mutex localInfoMutex_; + bool cacheLocalInfo; + std::vector localDBInfos_; + + mutable std::mutex remoteInfoMutex_; + std::map> remoteDBInfos_; + + mutable std::mutex supportMutex_; + std::map remoteSupportInfo_; +}; +} +#endif // DB_STATUS_ADAPTER_H diff --git a/frameworks/libs/distributeddb/communicator/src/db_status_adapter.cpp b/frameworks/libs/distributeddb/communicator/src/db_status_adapter.cpp new file mode 100644 index 00000000000..b66f625a0d8 --- /dev/null +++ b/frameworks/libs/distributeddb/communicator/src/db_status_adapter.cpp @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "db_status_adapter.h" + +#include "db_errno.h" +#include "runtime_context.h" +#include "log_print.h" + +namespace DistributedDB { +DBStatusAdapter::DBStatusAdapter() + : dbInfoHandle_(nullptr), + cacheLocalInfo(false) +{ +} + +void DBStatusAdapter::SetDBInfoHandle(const std::shared_ptr &dbInfoHandle) +{ + std::lock_guard autoLock(handleMutex_); + dbInfoHandle_ = dbInfoHandle; + LOGI("[DBStatusAdapter] SetDBInfoHandle Finish"); +} + +bool DBStatusAdapter::ExistDBInfoHandle() +{ + std::lock_guard autoLock(handleMutex_); + return dbInfoHandle_ == nullptr; +} + +bool DBStatusAdapter::IsSupport(const DeviceInfos &devInfo) +{ + std::shared_ptr dbInfoHandle = GetDBInfoHandle(); + if (dbInfoHandle == nullptr) { + LOGD("[DBStatusAdapter] dbInfoHandle not set"); + return false; + } + { + std::lock_guard autoLock(supportMutex_); + if (remoteSupportInfo_.find(devInfo.identifier) != remoteSupportInfo_.end()) { + return remoteSupportInfo_[devInfo.identifier]; + } + } + bool res = dbInfoHandle->IsSupport(devInfo); + std::lock_guard autoLock(supportMutex_); + remoteSupportInfo_[devInfo.identifier] = res; + return res; +} + +int DBStatusAdapter::GetDBInfos(std::vector &dbInfo) +{ + std::shared_ptr dbInfoHandle = GetDBInfoHandle(); + if (dbInfoHandle == nullptr) { + LOGD("[DBStatusAdapter][GetDBInfos] handle not set"); + return -E_NOT_SUPPORT; + } + { + std::lock_guard autoLock(localInfoMutex_); + if (cacheLocalInfo) { + dbInfo = localDBInfos_; + return E_OK; + } + } + DBStatus status = dbInfoHandle->GetDBInfos(dbInfo); + if (status != OK) { + LOGW("[DBStatusAdapter] GetDBInfos return error %d", status); + return -E_PERIPHERAL_INTERFACE_FAIL; + } + DeviceInfos devInfo; + int errCode = GetLocalDeviceId(devInfo.identifier); + if (errCode != E_OK) { + return errCode; + } + (void)LoadIntoCache(devInfo, dbInfo); + std::lock_guard autoLock(localInfoMutex_); + cacheLocalInfo = true; + return E_OK; +} + +void DBStatusAdapter::SetDBChangeCallback(const DBChangeCallback &callback) +{ + { + std::lock_guard autoLock(callbackMutex_); + dbChangeCallback_ = callback; + } + // avoid notify before set callback + std::map> remoteDBInfos; + { + std::lock_guard autoLock(remoteInfoMutex_); + remoteDBInfos = remoteDBInfos_; + } + std::lock_guard autoLock(callbackMutex_); + for (const auto &[devInfo, dbInfos]: remoteDBInfos) { + dbChangeCallback_({ devInfo }, dbInfos); + } +} + +void DBStatusAdapter::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) +{ + bool isRemote = LoadIntoCache(devInfos, dbInfos); + std::lock_guard autoLock(callbackMutex_); + if (isRemote && dbChangeCallback_ != nullptr) { + dbChangeCallback_(devInfos, dbInfos); + } +} + +std::shared_ptr DBStatusAdapter::GetDBInfoHandle() +{ + std::lock_guard autoLock(handleMutex_); + return dbInfoHandle_; +} + +bool DBStatusAdapter::LoadIntoCache(const DeviceInfos &devInfos, const std::vector &dbInfos) +{ + if (IsLocalDeviceId(devInfos.identifier)) { + std::lock_guard autoLock(localInfoMutex_); + MergeDBInfos(dbInfos, localDBInfos_); + return false; + } + std::lock_guard autoLock(remoteInfoMutex_); + if (remoteDBInfos_.find(devInfos.identifier) == remoteDBInfos_.end()) { + remoteDBInfos_.insert({devInfos.identifier, {}}); + } + MergeDBInfos(dbInfos, remoteDBInfos_[devInfos.identifier]); + return true; +} + +void DBStatusAdapter::MergeDBInfos(const std::vector &srcDbInfos, std::vector &dstDbInfos) +{ + for (const auto &srcInfo: srcDbInfos) { + auto res = std::find_if(dstDbInfos.begin(), dstDbInfos.end(), [&srcInfo](const DBInfo &dstInfo) { + return srcInfo.appId == dstInfo.appId && srcInfo.userId == dstInfo.userId + && srcInfo.storeId == dstInfo.storeId && srcInfo.syncDualTupleMode == dstInfo.syncDualTupleMode; + }); + if (res == dstDbInfos.end()) { + dstDbInfos.push_back(srcInfo); + } else { + res->isNeedSync = srcInfo.isNeedSync; + } + } +} + +int DBStatusAdapter::GetLocalDeviceId(std::string &deviceId) +{ + ICommunicatorAggregator *communicatorAggregator = nullptr; + int errCode = RuntimeContext::GetInstance()->GetCommunicatorAggregator(communicatorAggregator); + if (errCode != E_OK) { + return errCode; + } + return communicatorAggregator->GetLocalIdentity(deviceId); +} + +bool DBStatusAdapter::IsLocalDeviceId(const std::string &deviceId) +{ + std::string localId; + if (GetLocalDeviceId(localId) != E_OK) { + return false; + } + return deviceId == localId; +} +} \ No newline at end of file diff --git a/frameworks/libs/distributeddb/interfaces/src/relational/runtime_config.cpp b/frameworks/libs/distributeddb/interfaces/src/relational/runtime_config.cpp index 7c25863b700..373884b90b5 100644 --- a/frameworks/libs/distributeddb/interfaces/src/relational/runtime_config.cpp +++ b/frameworks/libs/distributeddb/interfaces/src/relational/runtime_config.cpp @@ -131,13 +131,12 @@ DBStatus RuntimeConfig::SetPermissionConditionCallback(const PermissionCondition void RuntimeConfig::SetDBInfoHandle(const std::shared_ptr &handle) { - (void)handle; + RuntimeContext::GetInstance()->SetDBInfoHandle(handle); } void RuntimeConfig::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) { - (void)devInfos; - (void)dbInfos; + RuntimeContext::GetInstance()->NotifyDBInfos(devInfos, dbInfos); } } // namespace DistributedDB #endif \ No newline at end of file diff --git a/frameworks/libs/distributeddb/test/BUILD.gn b/frameworks/libs/distributeddb/test/BUILD.gn index 2dd72ada340..9b21a578b77 100644 --- a/frameworks/libs/distributeddb/test/BUILD.gn +++ b/frameworks/libs/distributeddb/test/BUILD.gn @@ -119,6 +119,7 @@ ohos_source_set("src_file") { "../communicator/src/communicator.cpp", "../communicator/src/communicator_aggregator.cpp", "../communicator/src/communicator_linker.cpp", + "../communicator/src/db_status_adapter.cpp", "../communicator/src/frame_combiner.cpp", "../communicator/src/frame_retainer.cpp", "../communicator/src/header_converter.cpp", -- Gitee From 75bc99ecae807283d3c94ffe4cb1c60793009c7f Mon Sep 17 00:00:00 2001 From: zqq Date: Wed, 16 Nov 2022 17:57:31 +0800 Subject: [PATCH 3/8] notify change Signed-off-by: zqq --- .../distributeddb/common/include/db_common.h | 2 + .../common/src/runtime_context_impl.cpp | 3 +- .../include/communicator_aggregator.h | 8 ++- .../communicator/include/db_status_adapter.h | 1 + .../include/icommunicator_aggregator.h | 3 +- .../src/communicator_aggregator.cpp | 65 ++++++++++++++----- .../communicator/src/communicator_linker.cpp | 16 +++++ .../communicator/src/communicator_linker.h | 2 + .../communicator/src/db_status_adapter.cpp | 14 ++++ .../distributeddb_communicator_common.cpp | 2 +- 10 files changed, 94 insertions(+), 22 deletions(-) diff --git a/frameworks/libs/distributeddb/common/include/db_common.h b/frameworks/libs/distributeddb/common/include/db_common.h index 2b51e51e45d..1743bc44470 100644 --- a/frameworks/libs/distributeddb/common/include/db_common.h +++ b/frameworks/libs/distributeddb/common/include/db_common.h @@ -70,6 +70,8 @@ public: static bool IsSameCipher(CipherType srcType, CipherType inputType); static bool CheckIsAlnumAndUnderscore(const std::string &text); + + static void GenerateHashLabel(std::string userId, std::string appId, std::string storeId, bool syncDualTupleMode) }; // Define short macro substitute for original long expression for convenience of using diff --git a/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp b/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp index f00d3f24372..334b4b4a6e9 100644 --- a/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp +++ b/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp @@ -96,6 +96,7 @@ int RuntimeContextImpl::SetCommunicatorAdapter(IAdapter *adapter) int RuntimeContextImpl::GetCommunicatorAggregator(ICommunicatorAggregator *&outAggregator) { outAggregator = nullptr; + const std::shared_ptr statusAdapter = GetDBStatusAdapter(); std::lock_guard lock(communicatorLock_); if (communicatorAggregator_ != nullptr) { outAggregator = communicatorAggregator_; @@ -113,7 +114,7 @@ int RuntimeContextImpl::GetCommunicatorAggregator(ICommunicatorAggregator *&outA return -E_OUT_OF_MEMORY; } - int errCode = communicatorAggregator_->Initialize(adapter_); + int errCode = communicatorAggregator_->Initialize(adapter_, statusAdapter); if (errCode != E_OK) { LOGE("CommunicatorAggregator init failed, err = %d!", errCode); RefObject::KillAndDecObjRef(communicatorAggregator_); diff --git a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h index c3a31cdc127..cfc798508e9 100644 --- a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h @@ -54,7 +54,7 @@ public: DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator); // See ICommunicatorAggregator for detail - int Initialize(IAdapter *inAdapter) override; + int Initialize(IAdapter *inAdapter, const std::shared_ptr &statusAdapter) override; // Must not call any other functions if Finalize had been called. In fact, Finalize has no chance to be called. void Finalize() override; @@ -135,6 +135,10 @@ private: // Record the protocol version of remote target. void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version); + void OnDBStatusChange(const DeviceInfos &devInfo, const std::vector &dbInfos); + + void NotifyConnectChange(const std::string &srcTarget, const std::map &changedLabels); + DECLARE_OBJECT_TAG(CommunicatorAggregator); static std::atomic isCommunicatorNotFoundFeedbackEnable_; @@ -175,6 +179,8 @@ private: OnConnectCallback onConnectHandle_; Finalizer onConnectFinalizer_; mutable std::mutex onConnectMutex_; + + std::shared_ptr dbStatusAdapter_; }; } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h index 5d82ebcbf6c..063ad8b1a50 100644 --- a/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h +++ b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h @@ -35,6 +35,7 @@ public: int GetDBInfos(std::vector &dbInfo); void SetDBChangeCallback(const DBChangeCallback &callback); void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos); + void TargetOffline(const std::string &device); private: std::shared_ptr GetDBInfoHandle(); bool LoadIntoCache(const DeviceInfos &devInfos, const std::vector &dbInfos); diff --git a/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h index 06ae9e7c1e9..70ee63cf8f1 100644 --- a/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h @@ -18,6 +18,7 @@ #include #include "communicator_type_define.h" +#include "db_status_adapter.h" #include "iadapter.h" #include "ref_object.h" @@ -32,7 +33,7 @@ public: // The caller is the owner of inAdapter and responsible for manage its lifecycle. // The ICommunicatorAggregator is only the user of inAdapter // If Initialize fail, the ICommunicatorAggregator will rollback what had done to inAdapter so it can be reuse. - virtual int Initialize(IAdapter *inAdapter) = 0; + virtual int Initialize(IAdapter *inAdapter, const std::shared_ptr &statusAdapter) = 0; // Call this method after Initialize successfully and before destroy the ICommunicatorAggregator // Emphasize again : DO NOT CALL Finalize IF Initialize FAIL. diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp index 3cb9f9aab44..68fb0ceeb92 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp @@ -50,7 +50,7 @@ CommunicatorAggregator::~CommunicatorAggregator() commLinker_ = nullptr; } -int CommunicatorAggregator::Initialize(IAdapter *inAdapter) +int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_ptr &statusAdapter) { if (inAdapter == nullptr) { return -E_INVALID_ARGS; @@ -83,6 +83,12 @@ int CommunicatorAggregator::Initialize(IAdapter *inAdapter) shutdown_ = false; exclusiveThread_ = std::thread(&CommunicatorAggregator::SendDataRoutine, this); + if (dbStatusAdapter_ != nullptr) { + dbStatusAdapter_->SetDBChangeCallback([this](const DeviceInfos &devInfo, const std::vector &dbInfos) { + OnDBStatusChange(devInfo, dbInfos); + }); + } + dbStatusAdapter_ = statusAdapter; return E_OK; ROLL_BACK: UnRegCallbackFromAdapter(); @@ -617,23 +623,7 @@ int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail."); return errCode; } - if (!commLinker_->IsRemoteTargetOnline(srcTarget)) { - LOGW("[CommAggr][CommReceive] Receive LabelExchange from offline target=%s{private}.", srcTarget.c_str()); - for (const auto &entry : changedLabels) { - LOGW("[CommAggr][CommReceive] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second); - } - return E_OK; - } - // Do target change notify - std::lock_guard commMapLockGuard(commMapMutex_); - for (auto &entry : changedLabels) { - // Ignore nonactivated communicator - if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) { - LOGI("[CommAggr][CommReceive] label=%s, srcTarget=%s{private}, isOnline=%d.", - VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second); - commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second); - } - } + NotifyConnectChange(srcTarget, changedLabels); } return E_OK; } @@ -879,5 +869,44 @@ std::shared_ptr CommunicatorAggregator::GetExtendHeaderHandl return adapterHandle_->GetExtendHeaderHandle(paramInfo); } +void CommunicatorAggregator::OnDBStatusChange(const DeviceInfos &devInfo, const std::vector &dbInfos) +{ + std::map changedLabels; + for (const auto &dbInfo: dbInfos) { + std::string label; + if (dbInfo.syncDualTupleMode) { + label = DBCommon::TransferHashString(dbInfo.appId + "-" + dbInfo.storeId); + } else { + label = DBCommon::TransferHashString(dbInfo.userId + "-" + dbInfo.appId + "-" + dbInfo.storeId); + } + LabelType labelType(label.begin(), label.end()); + changedLabels[labelType] = dbInfo.isNeedSync; + } + if (commLinker_ != nullptr) { + commLinker_->UpdateOnlineLabels(devInfo.identifier, changedLabels); + } + NotifyConnectChange(devInfo.identifier, changedLabels); +} + +void CommunicatorAggregator::NotifyConnectChange(const std::string &srcTarget, + const std::map &changedLabels) +{ + if (!commLinker_->IsRemoteTargetOnline(srcTarget)) { + LOGW("[CommAggr][NotifyConnectChange] from offline target=%s{private}.", srcTarget.c_str()); + for (const auto &entry : changedLabels) { + LOGW("[CommAggr] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second); + } + } + // Do target change notify + std::lock_guard commMapLockGuard(commMapMutex_); + for (auto &entry : changedLabels) { + // Ignore nonactivated communicator + if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) { + LOGI("[CommAggr][NotifyConnectChange] label=%s, srcTarget=%s{private}, isOnline=%d.", + VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second); + commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second); + } + } +} DEFINE_OBJECT_TAG_FACILITIES(CommunicatorAggregator) } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp index ab51b370d34..39cb0303ac4 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp @@ -268,6 +268,10 @@ void CommunicatorLinker::DetectDistinctValueChange(const std::string &inTarget, return; } + if (inDistinctValue == UINT64_MAX) { + return; + } + // DistinctValue change detected !!! This must be caused by malfunctioning of underlayer communication component. LOGE("[Linker][Detect] ######## DISTINCT VALUE CHANGE DETECTED : %" PRIu64 " VS %" PRIu64 " ########", ULL(inDistinctValue), ULL(targetDistinctValue_[inTarget])); @@ -466,5 +470,17 @@ void CommunicatorLinker::SendLabelExchangeAck(const std::string &toTarget, Seria } } +void CommunicatorLinker::UpdateOnlineLabels(const std::string &device, const std::map &labels) +{ + std::lock_guard entireInfoLockGuard(entireInfoMutex_); + for (const auto &[label, isOnline]: labels) { + if (isOnline) { + targetMapOnlineLabels_[device].insert(label); + } else { + targetMapOnlineLabels_[device].erase(label); + } + } +} + DEFINE_OBJECT_TAG_FACILITIES(CommunicatorLinker) } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_linker.h b/frameworks/libs/distributeddb/communicator/src/communicator_linker.h index 3d0d701d312..e899f96befa 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_linker.h +++ b/frameworks/libs/distributeddb/communicator/src/communicator_linker.h @@ -71,6 +71,8 @@ public: std::set GetOnlineRemoteTarget() const; bool IsRemoteTargetOnline(const std::string &inTarget) const; + + void UpdateOnlineLabels(const std::string &device, const std::map &labels); private: DECLARE_OBJECT_TAG(CommunicatorLinker); diff --git a/frameworks/libs/distributeddb/communicator/src/db_status_adapter.cpp b/frameworks/libs/distributeddb/communicator/src/db_status_adapter.cpp index b66f625a0d8..b44ef69dfac 100644 --- a/frameworks/libs/distributeddb/communicator/src/db_status_adapter.cpp +++ b/frameworks/libs/distributeddb/communicator/src/db_status_adapter.cpp @@ -114,6 +114,20 @@ void DBStatusAdapter::NotifyDBInfos(const DeviceInfos &devInfos, const std::vect } } +void DBStatusAdapter::TargetOffline(const std::string &device) +{ + std::shared_ptr dbInfoHandle = GetDBInfoHandle(); + if (dbInfoHandle == nullptr) { + return; + } + { + std::lock_guard autoLock(supportMutex_); + if (remoteSupportInfo_.find(device) != remoteSupportInfo_.end()) { + remoteSupportInfo_.erase(device); + } + } +} + std::shared_ptr DBStatusAdapter::GetDBInfoHandle() { std::lock_guard autoLock(handleMutex_); diff --git a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_common.cpp b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_common.cpp index 27d86ff996b..211fcb38c90 100644 --- a/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_common.cpp +++ b/frameworks/libs/distributeddb/test/unittest/common/communicator/distributeddb_communicator_common.cpp @@ -42,7 +42,7 @@ bool SetUpEnv(EnvHandle &inEnv, const string &inName) return false; } - int errCode = inEnv.commAggrHandle->Initialize(inEnv.adapterHandle); + int errCode = inEnv.commAggrHandle->Initialize(inEnv.adapterHandle, nullptr); if (errCode != E_OK) { LOGI("[UT][Common][SetUp] Init CommunicatorAggregator fail for %s", inName.c_str()); return false; -- Gitee From e2e3f6ac0738bdd25d6596dd5b14c08b364b983e Mon Sep 17 00:00:00 2001 From: zqq Date: Wed, 16 Nov 2022 19:22:35 +0800 Subject: [PATCH 4/8] intercept communication frame sending Signed-off-by: zqq --- .../distributeddb/common/include/db_common.h | 2 +- .../distributeddb/common/src/db_common.cpp | 9 ++++++ .../communicator/include/db_status_adapter.h | 2 +- .../src/communicator_aggregator.cpp | 9 ++---- .../communicator/src/communicator_linker.cpp | 31 +++++++++++++++---- .../communicator/src/communicator_linker.h | 6 +++- .../communicator/src/db_status_adapter.cpp | 10 +++--- .../interfaces/include/db_info_handle.h | 9 +----- .../interfaces/include/store_types.h | 8 +++++ 9 files changed, 57 insertions(+), 29 deletions(-) diff --git a/frameworks/libs/distributeddb/common/include/db_common.h b/frameworks/libs/distributeddb/common/include/db_common.h index 1743bc44470..3fdb3f4b245 100644 --- a/frameworks/libs/distributeddb/common/include/db_common.h +++ b/frameworks/libs/distributeddb/common/include/db_common.h @@ -71,7 +71,7 @@ public: static bool CheckIsAlnumAndUnderscore(const std::string &text); - static void GenerateHashLabel(std::string userId, std::string appId, std::string storeId, bool syncDualTupleMode) + static std::string GenerateHashLabel(const DBInfo &dbInfo); }; // Define short macro substitute for original long expression for convenience of using diff --git a/frameworks/libs/distributeddb/common/src/db_common.cpp b/frameworks/libs/distributeddb/common/src/db_common.cpp index 9b51158ddca..359a93090b3 100644 --- a/frameworks/libs/distributeddb/common/src/db_common.cpp +++ b/frameworks/libs/distributeddb/common/src/db_common.cpp @@ -403,4 +403,13 @@ bool DBCommon::CheckIsAlnumAndUnderscore(const std::string &text) }); return iter == text.end(); } + +std::string DBCommon::GenerateHashLabel(const DBInfo &dbInfo) +{ + if (dbInfo.syncDualTupleMode) { + return DBCommon::TransferHashString(dbInfo.appId + "-" + dbInfo.storeId); + } else { + return DBCommon::TransferHashString(dbInfo.userId + "-" + dbInfo.appId + "-" + dbInfo.storeId); + } +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h index 063ad8b1a50..2dbc378350e 100644 --- a/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h +++ b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h @@ -31,7 +31,7 @@ public: void SetDBInfoHandle(const std::shared_ptr &dbInfoHandle); bool ExistDBInfoHandle(); - bool IsSupport(const DeviceInfos &devInfo); + bool IsSupport(const std::string &devInfo); int GetDBInfos(std::vector &dbInfo); void SetDBChangeCallback(const DBChangeCallback &callback); void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos); diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp index 68fb0ceeb92..e47091b9984 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp @@ -62,7 +62,7 @@ int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_pt scheduler_.Initialize(); int errCode; - commLinker_ = new (std::nothrow) CommunicatorLinker(this); + commLinker_ = new (std::nothrow) CommunicatorLinker(this, statusAdapter); if (commLinker_ == nullptr) { errCode = -E_OUT_OF_MEMORY; goto ROLL_BACK; @@ -873,12 +873,7 @@ void CommunicatorAggregator::OnDBStatusChange(const DeviceInfos &devInfo, const { std::map changedLabels; for (const auto &dbInfo: dbInfos) { - std::string label; - if (dbInfo.syncDualTupleMode) { - label = DBCommon::TransferHashString(dbInfo.appId + "-" + dbInfo.storeId); - } else { - label = DBCommon::TransferHashString(dbInfo.userId + "-" + dbInfo.appId + "-" + dbInfo.storeId); - } + std::string label = DBCommon::GenerateHashLabel(dbInfo); LabelType labelType(label.begin(), label.end()); changedLabels[labelType] = dbInfo.isNeedSync; } diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp index 39cb0303ac4..2599b98e45c 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp @@ -14,7 +14,10 @@ */ #include "communicator_linker.h" + +#include #include "communicator_aggregator.h" +#include "db_common.h" #include "db_errno.h" #include "hash.h" #include "log_print.h" @@ -29,10 +32,12 @@ constexpr uint32_t RETRANSMIT_LIMIT = 20; // Currently we do at most 20 retransm constexpr uint32_t RETRANSMIT_LIMIT_EQUAL_INTERVAL = 5; // First 5 retransmission will be equal interval } -CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator) +CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator, + std::shared_ptr statusAdapter) : incSequenceId_(0), incAckTriggerId_(0) { aggregator_ = inAggregator; + statusAdapter_ = std::move(statusAdapter); RefObject::IncObjRef(aggregator_); // The linker rely on CommunicatorAggregator } @@ -75,6 +80,9 @@ int CommunicatorLinker::TargetOnline(const std::string &inTarget, std::set