From 7a19a6f269bf8bcf7869006d8bf7e4035dbbb673 Mon Sep 17 00:00:00 2001 From: zqq Date: Tue, 15 Nov 2022 16:14:48 +0800 Subject: [PATCH 01/29] add communication optimization interface Signed-off-by: zqq --- .../distributeddb/common/include/db_common.h | 2 + .../common/include/runtime_context.h | 6 + .../distributeddb/common/src/db_common.cpp | 9 + .../common/src/runtime_context_impl.cpp | 35 ++- .../common/src/runtime_context_impl.h | 9 + .../include/communicator_aggregator.h | 10 +- .../communicator/include/db_status_adapter.h | 66 ++++++ .../include/icommunicator_aggregator.h | 3 +- .../src/communicator_aggregator.cpp | 76 ++++-- .../communicator/src/communicator_linker.cpp | 69 +++++- .../communicator/src/communicator_linker.h | 10 +- .../communicator/src/db_status_adapter.cpp | 224 ++++++++++++++++++ .../libs/distributeddb/distributeddb.gni | 1 + .../interfaces/include/db_info_handle.h | 34 +++ .../interfaces/include/runtime_config.h | 5 + .../interfaces/include/store_types.h | 9 + .../interfaces/src/runtime_config.cpp | 10 + .../storage/include/sync_generic_interface.h | 10 +- frameworks/libs/distributeddb/test/BUILD.gn | 1 + .../distributeddb_communicator_common.cpp | 2 +- ...tributeddb_interfaces_auto_launch_test.cpp | 2 +- .../virtual_communicator_aggregator.cpp | 3 +- .../syncer/virtual_communicator_aggregator.h | 2 +- 23 files changed, 557 insertions(+), 41 deletions(-) create mode 100644 frameworks/libs/distributeddb/communicator/include/db_status_adapter.h create mode 100644 frameworks/libs/distributeddb/communicator/src/db_status_adapter.cpp create mode 100644 frameworks/libs/distributeddb/interfaces/include/db_info_handle.h diff --git a/frameworks/libs/distributeddb/common/include/db_common.h b/frameworks/libs/distributeddb/common/include/db_common.h index 2cdb848cf2e..8f1951cfb4e 100644 --- a/frameworks/libs/distributeddb/common/include/db_common.h +++ b/frameworks/libs/distributeddb/common/include/db_common.h @@ -86,6 +86,8 @@ public: static bool CaseInsensitiveCompare(std::string first, std::string second); static bool CheckIsAlnumAndUnderscore(const std::string &text); + + static std::string GenerateHashLabel(const DBInfo &dbInfo); }; // Define short macro substitute for original long expression for convenience of using diff --git a/frameworks/libs/distributeddb/common/include/runtime_context.h b/frameworks/libs/distributeddb/common/include/runtime_context.h index c7fa00cc924..ed2cf757324 100644 --- a/frameworks/libs/distributeddb/common/include/runtime_context.h +++ b/frameworks/libs/distributeddb/common/include/runtime_context.h @@ -23,6 +23,7 @@ #include "auto_launch.h" #include "auto_launch_export.h" #include "cloud/icloud_data_translate.h" +#include "db_info_handle.h" #include "icommunicator_aggregator.h" #include "iprocess_system_api_adapter.h" #include "ithread_pool.h" @@ -30,6 +31,7 @@ #include "kvdb_properties.h" #include "macro_utils.h" #include "notification_chain.h" +#include "query_sync_object.h" #include "types_export.h" namespace DistributedDB { @@ -146,6 +148,10 @@ public: virtual void StopTimeTickMonitorIfNeed() = 0; + virtual void SetDBInfoHandle(const std::shared_ptr &handle) = 0; + + virtual void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) = 0; + virtual void SetTranslateToDeviceIdCallback(const TranslateToDeviceIdCallback &callback) = 0; virtual int TranslateDeviceId(const std::string &deviceId, diff --git a/frameworks/libs/distributeddb/common/src/db_common.cpp b/frameworks/libs/distributeddb/common/src/db_common.cpp index c60311cbb5f..ac7dcea628a 100644 --- a/frameworks/libs/distributeddb/common/src/db_common.cpp +++ b/frameworks/libs/distributeddb/common/src/db_common.cpp @@ -439,4 +439,13 @@ bool DBCommon::CheckIsAlnumAndUnderscore(const std::string &text) }); return iter == text.end(); } + +std::string DBCommon::GenerateHashLabel(const DBInfo &dbInfo) +{ + if (dbInfo.syncDualTupleMode) { + return DBCommon::TransferHashString(dbInfo.appId + "-" + dbInfo.storeId); + } else { + return DBCommon::TransferHashString(dbInfo.userId + "-" + dbInfo.appId + "-" + dbInfo.storeId); + } +} } // namespace DistributedDB diff --git a/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp b/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp index 8c249fd702d..675b8c29b07 100644 --- a/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp +++ b/frameworks/libs/distributeddb/common/src/runtime_context_impl.cpp @@ -32,7 +32,8 @@ RuntimeContextImpl::RuntimeContextImpl() timeTickMonitor_(nullptr), systemApiAdapter_(nullptr), lockStatusObserver_(nullptr), - currentSessionId_(1) + currentSessionId_(1), + dbStatusAdapter_(nullptr) { } @@ -59,6 +60,7 @@ RuntimeContextImpl::~RuntimeContextImpl() delete lockStatusObserver_; lockStatusObserver_ = nullptr; userChangeMonitor_ = nullptr; + dbStatusAdapter_ = nullptr; SetThreadPool(nullptr); } @@ -96,6 +98,7 @@ int RuntimeContextImpl::SetCommunicatorAdapter(IAdapter *adapter) int RuntimeContextImpl::GetCommunicatorAggregator(ICommunicatorAggregator *&outAggregator) { outAggregator = nullptr; + const std::shared_ptr statusAdapter = GetDBStatusAdapter(); std::lock_guard lock(communicatorLock_); if (communicatorAggregator_ != nullptr) { outAggregator = communicatorAggregator_; @@ -113,7 +116,7 @@ int RuntimeContextImpl::GetCommunicatorAggregator(ICommunicatorAggregator *&outA return -E_OUT_OF_MEMORY; } - int errCode = communicatorAggregator_->Initialize(adapter_); + int errCode = communicatorAggregator_->Initialize(adapter_, statusAdapter); if (errCode != E_OK) { LOGE("CommunicatorAggregator init failed, err = %d!", errCode); RefObject::KillAndDecObjRef(communicatorAggregator_); @@ -758,6 +761,34 @@ void RuntimeContextImpl::StopTimeTickMonitorIfNeed() } } +void RuntimeContextImpl::SetDBInfoHandle(const std::shared_ptr &handle) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr) { + dbStatusAdapter->SetDBInfoHandle(handle); + } +} + +void RuntimeContextImpl::NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) +{ + std::shared_ptr dbStatusAdapter = GetDBStatusAdapter(); + if (dbStatusAdapter != nullptr) { + dbStatusAdapter->NotifyDBInfos(devInfos, dbInfos); + } +} + +std::shared_ptr RuntimeContextImpl::GetDBStatusAdapter() +{ + std::lock_guard autoLock(statusAdapterMutex_); + if (dbStatusAdapter_ == nullptr) { + dbStatusAdapter_ = std::make_unique(); + } + if (dbStatusAdapter_ == nullptr) { + LOGE("[RuntimeContextImpl] DbStatusAdapter create failed!"); + } + return dbStatusAdapter_; +} + void RuntimeContextImpl::SetTranslateToDeviceIdCallback(const TranslateToDeviceIdCallback &callback) { std::lock_guard autoLock(translateToDeviceIdLock_); diff --git a/frameworks/libs/distributeddb/common/src/runtime_context_impl.h b/frameworks/libs/distributeddb/common/src/runtime_context_impl.h index 5dfc7187f33..0705af15b9c 100644 --- a/frameworks/libs/distributeddb/common/src/runtime_context_impl.h +++ b/frameworks/libs/distributeddb/common/src/runtime_context_impl.h @@ -21,6 +21,7 @@ #include #include "auto_launch.h" +#include "db_status_adapter.h" #include "evloop/src/ievent.h" #include "evloop/src/ievent_loop.h" #include "icommunicator_aggregator.h" @@ -131,6 +132,10 @@ public: void StopTimeTickMonitorIfNeed() override; + void SetDBInfoHandle(const std::shared_ptr &handle) override; + + void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos) override; + void SetTranslateToDeviceIdCallback(const TranslateToDeviceIdCallback &callback) override; int TranslateDeviceId(const std::string &deviceId, @@ -155,6 +160,7 @@ private: int PrepareLoop(IEventLoop *&loop); int PrepareTaskPool(); int AllocTimerId(IEvent *evTimer, TimerId &timerId); + std::shared_ptr GetDBStatusAdapter(); int ScheduleTaskByThreadPool(const TaskAction &task) const; @@ -221,6 +227,9 @@ private: mutable std::shared_mutex permissionConditionLock_; PermissionConditionCallback permissionConditionCallback_; + mutable std::mutex statusAdapterMutex_; + std::shared_ptr dbStatusAdapter_; + mutable std::mutex translateToDeviceIdLock_; TranslateToDeviceIdCallback translateToDeviceIdCallback_; std::map> deviceIdCache_; // cache > diff --git a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h index ed30b0d6eb6..c94c66d2eec 100644 --- a/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/communicator_aggregator.h @@ -54,7 +54,7 @@ public: DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator); // See ICommunicatorAggregator for detail - int Initialize(IAdapter *inAdapter) override; + int Initialize(IAdapter *inAdapter, const std::shared_ptr &statusAdapter) override; // Must not call any other functions if Finalize had been called. In fact, Finalize has no chance to be called. void Finalize() override; @@ -135,6 +135,12 @@ private: // Record the protocol version of remote target. void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version); + void OnRemoteDBStatusChange(const std::string &devInfo, const std::vector &dbInfos); + + void NotifyConnectChange(const std::string &srcTarget, const std::map &changedLabels); + + void RegDBChangeCallback(); + void InitSendThread(); void SendOnceData(); @@ -184,6 +190,8 @@ private: Finalizer onConnectFinalizer_; mutable std::mutex onConnectMutex_; + std::shared_ptr dbStatusAdapter_; + std::atomic useExclusiveThread_ = false; bool sendTaskStart_ = false; mutable std::mutex scheduleSendTaskMutex_; diff --git a/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h new file mode 100644 index 00000000000..8d58470f6b9 --- /dev/null +++ b/frameworks/libs/distributeddb/communicator/include/db_status_adapter.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2022 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DB_STATUS_ADAPTER_H +#define DB_STATUS_ADAPTER_H + +#include "db_info_handle.h" +#include +#include "macro_utils.h" + +namespace DistributedDB { +using RemoteDBChangeCallback = std::function &dbInfos)>; +using LocalDBChangeCallback = std::function; +class DBStatusAdapter { +public: + DBStatusAdapter(); + ~DBStatusAdapter() = default; + DISABLE_COPY_ASSIGN_MOVE(DBStatusAdapter); + + void SetDBInfoHandle(const std::shared_ptr &dbInfoHandle); + bool ExistDBInfoHandle() const; + bool IsSupport(const std::string &devInfo); + int GetDBInfos(std::vector &dbInfos) const; + void SetDBStatusChangeCallback(const RemoteDBChangeCallback &remote, const LocalDBChangeCallback &local); + void NotifyDBInfos(const DeviceInfos &devInfos, const std::vector &dbInfos); + void TargetOffline(const std::string &device); +private: + std::shared_ptr GetDBInfoHandle() const; + bool LoadIntoCache(const DeviceInfos &devInfos, const std::vector &dbInfos, + std::vector &changeDbInfos); + void ClearAllCache(); + + static int GetLocalDeviceId(std::string &deviceId); + static bool IsLocalDeviceId(const std::string &deviceId); + static void MergeDBInfos(const std::vector &srcDbInfos, std::vector &dstDbInfos, + std::vector &changeDbInfos); + mutable std::mutex handleMutex_; + std::shared_ptr dbInfoHandle_ = nullptr; + + mutable std::mutex callbackMutex_; + RemoteDBChangeCallback remoteCallback_; + LocalDBChangeCallback localCallback_; + + mutable std::mutex localInfoMutex_; + std::vector localDBInfos_; + + mutable std::mutex remoteInfoMutex_; + std::map> remoteDBInfos_; + + mutable std::mutex supportMutex_; + std::map remoteSupportInfo_; +}; +} +#endif // DB_STATUS_ADAPTER_H diff --git a/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h b/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h index 06ae9e7c1e9..70ee63cf8f1 100644 --- a/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h +++ b/frameworks/libs/distributeddb/communicator/include/icommunicator_aggregator.h @@ -18,6 +18,7 @@ #include #include "communicator_type_define.h" +#include "db_status_adapter.h" #include "iadapter.h" #include "ref_object.h" @@ -32,7 +33,7 @@ public: // The caller is the owner of inAdapter and responsible for manage its lifecycle. // The ICommunicatorAggregator is only the user of inAdapter // If Initialize fail, the ICommunicatorAggregator will rollback what had done to inAdapter so it can be reuse. - virtual int Initialize(IAdapter *inAdapter) = 0; + virtual int Initialize(IAdapter *inAdapter, const std::shared_ptr &statusAdapter) = 0; // Call this method after Initialize successfully and before destroy the ICommunicatorAggregator // Emphasize again : DO NOT CALL Finalize IF Initialize FAIL. diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp index 0b1efd0abfa..aaa58387acf 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_aggregator.cpp @@ -50,7 +50,7 @@ CommunicatorAggregator::~CommunicatorAggregator() commLinker_ = nullptr; } -int CommunicatorAggregator::Initialize(IAdapter *inAdapter) +int CommunicatorAggregator::Initialize(IAdapter *inAdapter, const std::shared_ptr &statusAdapter) { if (inAdapter == nullptr) { return -E_INVALID_ARGS; @@ -62,7 +62,7 @@ int CommunicatorAggregator::Initialize(IAdapter *inAdapter) scheduler_.Initialize(); int errCode; - commLinker_ = new (std::nothrow) CommunicatorLinker(this); + commLinker_ = new (std::nothrow) CommunicatorLinker(this, statusAdapter); if (commLinker_ == nullptr) { errCode = -E_OUT_OF_MEMORY; goto ROLL_BACK; @@ -83,6 +83,8 @@ int CommunicatorAggregator::Initialize(IAdapter *inAdapter) shutdown_ = false; InitSendThread(); + dbStatusAdapter_ = statusAdapter; + RegDBChangeCallback(); return E_OK; ROLL_BACK: UnRegCallbackFromAdapter(); @@ -127,6 +129,7 @@ void CommunicatorAggregator::Finalize() commLinker_ = nullptr; retainer_.Finalize(); combiner_.Finalize(); + dbStatusAdapter_ = nullptr; } ICommunicator *CommunicatorAggregator::AllocCommunicator(uint64_t commLabel, int &outErrorNo) @@ -611,24 +614,7 @@ int CommunicatorAggregator::OnCommLayerFrameReceive(const std::string &srcTarget LOGE("[CommAggr][CommReceive] Receive LabelExchange Fail."); return errCode; } - if (!commLinker_->IsRemoteTargetOnline(srcTarget)) { - LOGW("[CommAggr][CommReceive] Receive LabelExchange from offline target=%s{private}.", srcTarget.c_str()); - for (const auto &entry : changedLabels) { - LOGW("[CommAggr][CommReceive] REMEMBER: label=%.3s, inOnline=%d.", VEC_TO_STR(entry.first), - entry.second); - } - return E_OK; - } - // Do target change notify - std::lock_guard commMapLockGuard(commMapMutex_); - for (auto &entry : changedLabels) { - // Ignore nonactivated communicator - if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) { - LOGI("[CommAggr][CommReceive] label=%.3s, srcTarget=%s{private}, isOnline=%d.", - VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second); - commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second); - } - } + NotifyConnectChange(srcTarget, changedLabels); } return E_OK; } @@ -763,6 +749,7 @@ void CommunicatorAggregator::UnRegCallbackFromAdapter() adapterHandle_->RegBytesReceiveCallback(nullptr, nullptr); adapterHandle_->RegTargetChangeCallback(nullptr, nullptr); adapterHandle_->RegSendableCallback(nullptr, nullptr); + dbStatusAdapter_->SetDBStatusChangeCallback(nullptr, nullptr); } void CommunicatorAggregator::GenerateLocalSourceId() @@ -874,6 +861,55 @@ std::shared_ptr CommunicatorAggregator::GetExtendHeaderHandl return adapterHandle_->GetExtendHeaderHandle(paramInfo); } +void CommunicatorAggregator::OnRemoteDBStatusChange(const std::string &devInfo, const std::vector &dbInfos) +{ + std::map changedLabels; + for (const auto &dbInfo: dbInfos) { + std::string label = DBCommon::GenerateHashLabel(dbInfo); + LabelType labelType(label.begin(), label.end()); + changedLabels[labelType] = dbInfo.isNeedSync; + } + if (commLinker_ != nullptr) { + commLinker_->UpdateOnlineLabels(devInfo, changedLabels); + } + NotifyConnectChange(devInfo, changedLabels); +} + +void CommunicatorAggregator::NotifyConnectChange(const std::string &srcTarget, + const std::map &changedLabels) +{ + if (!commLinker_->IsRemoteTargetOnline(srcTarget)) { + LOGW("[CommAggr][NotifyConnectChange] from offline target=%s{private}.", srcTarget.c_str()); + for (const auto &entry : changedLabels) { + LOGW("[CommAggr] REMEMBER: label=%s, inOnline=%d.", VEC_TO_STR(entry.first), entry.second); + } + } + // Do target change notify + std::lock_guard commMapLockGuard(commMapMutex_); + for (auto &entry : changedLabels) { + // Ignore nonactivated communicator + if (commMap_.count(entry.first) != 0 && commMap_.at(entry.first).second) { + LOGI("[CommAggr][NotifyConnectChange] label=%s, srcTarget=%s{private}, isOnline=%d.", + VEC_TO_STR(entry.first), srcTarget.c_str(), entry.second); + commMap_.at(entry.first).first->OnConnectChange(srcTarget, entry.second); + } + } +} + +void CommunicatorAggregator::RegDBChangeCallback() +{ + if (dbStatusAdapter_ != nullptr) { + dbStatusAdapter_->SetDBStatusChangeCallback( + [this](const std::string &devInfo, const std::vector &dbInfos) { + OnRemoteDBStatusChange(devInfo, dbInfos); + }, + [this](){ + if (commLinker_ != nullptr) { + (void)commLinker_->TriggerLabelExchangeEvent(); + } + }); + } +} void CommunicatorAggregator::InitSendThread() { if (RuntimeContext::GetInstance()->GetThreadPool() != nullptr) { diff --git a/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp b/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp index ede623f08af..48dd32acb86 100644 --- a/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp +++ b/frameworks/libs/distributeddb/communicator/src/communicator_linker.cpp @@ -14,7 +14,10 @@ */ #include "communicator_linker.h" + +#include #include "communicator_aggregator.h" +#include "db_common.h" #include "db_errno.h" #include "hash.h" #include "log_print.h" @@ -29,10 +32,12 @@ constexpr uint32_t RETRANSMIT_LIMIT = 20; // Currently we do at most 20 retransm constexpr uint32_t RETRANSMIT_LIMIT_EQUAL_INTERVAL = 5; // First 5 retransmission will be equal interval } -CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator) +CommunicatorLinker::CommunicatorLinker(CommunicatorAggregator *inAggregator, + std::shared_ptr statusAdapter) : incSequenceId_(0), incAckTriggerId_(0) { aggregator_ = inAggregator; + statusAdapter_ = std::move(statusAdapter); RefObject::IncObjRef(aggregator_); // The linker rely on CommunicatorAggregator } @@ -40,6 +45,7 @@ CommunicatorLinker::~CommunicatorLinker() { RefObject::DecObjRef(aggregator_); // The linker no longer rely on CommunicatorAggregator aggregator_ = nullptr; + statusAdapter_ = nullptr; } void CommunicatorLinker::Initialize() @@ -75,6 +81,9 @@ int CommunicatorLinker::TargetOnline(const std::string &inTarget, std::set