diff --git a/cmake/common.cmake b/cmake/common.cmake index 546ce4e81f0..a1dea572ed2 100644 --- a/cmake/common.cmake +++ b/cmake/common.cmake @@ -200,6 +200,7 @@ function(_ydb_sdk_add_library Tgt) ) target_compile_definitions(${Tgt} ${includeMode} YDB_SDK_USE_STD_STRING + YDB_TOPIC_DISABLE_COUNTERS ) endfunction() diff --git a/include/ydb-cpp-sdk/client/extension_common/extension.h b/include/ydb-cpp-sdk/client/extension_common/extension.h index 23861eecdc7..047142bb85d 100644 --- a/include/ydb-cpp-sdk/client/extension_common/extension.h +++ b/include/ydb-cpp-sdk/client/extension_common/extension.h @@ -65,8 +65,9 @@ void TDriver::AddExtension(typename TExtension::TParams params) { typename TExtension::IApi* api = TExtension::IApi::Create(*this); auto extension = new TExtension(params, api); extension->SelfRegister(*this); - if (api) + if (api) { api->SelfRegister(*this); + } } } // namespace NYdb diff --git a/include/ydb-cpp-sdk/client/extensions/solomon_stats/pull_client.h b/include/ydb-cpp-sdk/client/extensions/solomon_stats/pull_client.h index c2f1775f6a3..f27f0da6f11 100644 --- a/include/ydb-cpp-sdk/client/extensions/solomon_stats/pull_client.h +++ b/include/ydb-cpp-sdk/client/extensions/solomon_stats/pull_client.h @@ -18,23 +18,22 @@ class TSolomonStatPullExtension: public NYdb::IExtension { friend class TSolomonStatPullExtension; public: - TParams(const std::string& host - , ui16 port - , const std::string& project - , const std::string& service - , const std::string& cluster - , const std::vector>& labels = {}); + TParams(const std::string& host, + std::uint16_t port, + const std::string& project, + const std::string& service, + const std::string& cluster, + const std::vector>& labels = {}); NMonitoring::TLabels GetLabels() const; private: const std::string Host_; - ui16 Port_; + std::uint16_t Port_; NMonitoring::TLabels Labels_; }; TSolomonStatPullExtension(const TParams& params, IApi* api); - ~TSolomonStatPullExtension(); private: class TSolomonStatPage: public NMonitoring::IMonPage { diff --git a/include/ydb-cpp-sdk/client/extensions/solomon_stats/pull_connector.h b/include/ydb-cpp-sdk/client/extensions/solomon_stats/pull_connector.h index a33121063b9..62b8e63d966 100644 --- a/include/ydb-cpp-sdk/client/extensions/solomon_stats/pull_connector.h +++ b/include/ydb-cpp-sdk/client/extensions/solomon_stats/pull_connector.h @@ -45,24 +45,30 @@ class TMetricRegistryConnector: public NYdb::IExtension { }; inline void AddMetricRegistry(NYdb::TDriver& driver, NMonitoring::IMetricRegistry* ptr) { - if (!ptr) + if (!ptr) { return; + } + using TConnector = TMetricRegistryConnector; driver.AddExtension(TConnector::TParams(ptr)); } inline void AddMetricRegistry(NYdb::TDriver& driver, std::shared_ptr ptr) { - if (!ptr) + if (!ptr) { return; + } + using TConnector = TMetricRegistryConnector>; driver.AddExtension(TConnector::TParams(ptr)); } inline void AddMetricRegistry(NYdb::TDriver& driver, TAtomicSharedPtr ptr) { - if (!ptr) + if (!ptr) { return; + } + using TConnector = TMetricRegistryConnector>; driver.AddExtension(TConnector::TParams(ptr)); diff --git a/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h b/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h index 5bf3f4c4ede..ab3eb910dcb 100644 --- a/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h +++ b/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h @@ -479,8 +479,10 @@ class IFederatedReadSession { //! TSessionClosedEvent arrives. virtual bool Close(TDuration timeout = TDuration::Max()) = 0; +#ifndef YDB_TOPIC_DISABLE_COUNTERS //! Reader counters with different stats (see TReaderConuters). virtual NTopic::TReaderCounters::TPtr GetCounters() const = 0; +#endif //! Get unique identifier of read session. virtual std::string GetSessionId() const = 0; diff --git a/include/ydb-cpp-sdk/client/topic/read_session.h b/include/ydb-cpp-sdk/client/topic/read_session.h index b7d4e698411..7d815a5e37d 100644 --- a/include/ydb-cpp-sdk/client/topic/read_session.h +++ b/include/ydb-cpp-sdk/client/topic/read_session.h @@ -1,10 +1,13 @@ #pragma once -#include "counters.h" #include "executor.h" #include "read_events.h" #include "retry_policy.h" +#ifndef YDB_TOPIC_DISABLE_COUNTERS +#include "counters.h" +#endif + #include #include #include @@ -183,10 +186,12 @@ struct TReadSessionSettings: public TRequestSettings { //! If not set, default executor will be used. FLUENT_SETTING(IExecutor::TPtr, DecompressionExecutor); +#ifndef YDB_TOPIC_DISABLE_COUNTERS //! Counters. //! If counters are not provided explicitly, //! they will be created inside session (without link with parent counters). FLUENT_SETTING(TReaderCounters::TPtr, Counters); +#endif FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30)); @@ -244,8 +249,10 @@ class IReadSession { //! TSessionClosedEvent arrives. virtual bool Close(TDuration timeout = TDuration::Max()) = 0; +#ifndef YDB_TOPIC_DISABLE_COUNTERS //! Reader counters with different stats (see TReaderConuters). virtual TReaderCounters::TPtr GetCounters() const = 0; +#endif //! Get unique identifier of read session. virtual std::string GetSessionId() const = 0; diff --git a/include/ydb-cpp-sdk/client/topic/write_session.h b/include/ydb-cpp-sdk/client/topic/write_session.h index 68525b87fef..50fa2af71b5 100644 --- a/include/ydb-cpp-sdk/client/topic/write_session.h +++ b/include/ydb-cpp-sdk/client/topic/write_session.h @@ -1,11 +1,14 @@ #pragma once #include "codecs.h" -#include "counters.h" #include "executor.h" #include "retry_policy.h" #include "write_events.h" +#ifndef YDB_TOPIC_DISABLE_COUNTERS +#include "counters.h" +#endif + #include #include #include @@ -91,7 +94,9 @@ struct TWriteSessionSettings : public TRequestSettings { FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30)); +#ifndef YDB_TOPIC_DISABLE_COUNTERS FLUENT_SETTING_OPTIONAL(TWriterCounters::TPtr, Counters); +#endif //! Executor for compression tasks. //! If not set, default executor will be used. @@ -222,7 +227,9 @@ class ISimpleBlockingWriteSession : public TThrRefBase { //! Returns true if write session is alive and acitve. False if session was closed. virtual bool IsAlive() const = 0; +#ifndef YDB_TOPIC_DISABLE_COUNTERS virtual TWriterCounters::TPtr GetCounters() = 0; +#endif //! Close immediately and destroy, don't wait for anything. virtual ~ISimpleBlockingWriteSession() = default; @@ -269,8 +276,10 @@ class IWriteSession { //! Return true if all writes were completed and acked, false if timeout was reached and some writes were aborted. virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0; +#ifndef YDB_TOPIC_DISABLE_COUNTERS //! Writer counters with different stats (see TWriterConuters). virtual TWriterCounters::TPtr GetCounters() = 0; +#endif //! Close() with timeout = 0 and destroy everything instantly. virtual ~IWriteSession() = default; diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index e7f448e8675..c9762f0b14e 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -12,6 +12,7 @@ add_subdirectory(iam) add_subdirectory(iam_private) add_subdirectory(impl) add_subdirectory(import) +add_subdirectory(metrics_providers) add_subdirectory(monitoring) add_subdirectory(operation) add_subdirectory(params) diff --git a/src/client/extension_common/CMakeLists.txt b/src/client/extension_common/CMakeLists.txt index e099b35a031..0ec5f4f1c96 100644 --- a/src/client/extension_common/CMakeLists.txt +++ b/src/client/extension_common/CMakeLists.txt @@ -3,6 +3,7 @@ _ydb_sdk_add_library(client-extension_common) target_link_libraries(client-extension_common PUBLIC yutil monlib-metrics + client-metrics_providers-monlib client-ydb_driver ) diff --git a/src/client/extension_common/extension.cpp b/src/client/extension_common/extension.cpp index 98d3fcfe6df..8579ddbb169 100644 --- a/src/client/extension_common/extension.cpp +++ b/src/client/extension_common/extension.cpp @@ -2,9 +2,11 @@ #define INCLUDE_YDB_INTERNAL_H #include -#include #undef INCLUDE_YDB_INTERNAL_H +#include + + namespace NYdb::inline V3 { void IExtension::SelfRegister(TDriver driver) { @@ -15,6 +17,34 @@ void IExtensionApi::SelfRegister(TDriver driver) { CreateInternalInterface(driver)->RegisterExtensionApi(this); } +class TStatsExtractor: public NSdkStats::IStatApi { +public: + TStatsExtractor(std::shared_ptr client) + : Client_(client) + {} + + virtual void SetMetricRegistry(::NMonitoring::IMetricRegistry* sensorsRegistry) override { + auto strong = Client_.lock(); + if (strong) { + strong->StartStatCollecting(NMetrics::CreateMonlibMetricsProvider(sensorsRegistry)); + } + } + + void Accept(NMonitoring::IMetricConsumer* consumer) const override { + auto strong = Client_.lock(); + if (strong) { + auto sensorsRegistry = strong->GetMetricRegistry(); + auto monlibRegistry = NMetrics::TryGetUnderlyingMetricsRegistry(sensorsRegistry); + Y_ABORT_UNLESS(monlibRegistry, "IMetricRegistry is not a TMonlibMetricsProvider in Stats Extractor"); + monlibRegistry->Accept(TInstant::Zero(), consumer); + } else { + throw NSdkStats::DestroyedClientException(); + } + } +private: + std::weak_ptr Client_; +}; + namespace NSdkStats { IStatApi* IStatApi::Create(TDriver driver) { diff --git a/src/client/extensions/solomon_stats/pull_client.cpp b/src/client/extensions/solomon_stats/pull_client.cpp index fbb6129061a..42e4ca44a34 100644 --- a/src/client/extensions/solomon_stats/pull_client.cpp +++ b/src/client/extensions/solomon_stats/pull_client.cpp @@ -2,19 +2,21 @@ namespace NSolomonStatExtension::inline V3 { -TSolomonStatPullExtension::TParams::TParams(const std::string& host - , ui16 port - , const std::string& project - , const std::string& service - , const std::string& cluster - , const std::vector>& labels) - : Host_(host), Port_(port), Labels_() +TSolomonStatPullExtension::TParams::TParams(const std::string& host, + std::uint16_t port, + const std::string& project, + const std::string& service, + const std::string& cluster, + const std::vector>& labels) + : Host_(host) + , Port_(port) + , Labels_() { Labels_.Add("project", project); Labels_.Add("service", service); Labels_.Add("cluster", cluster); for (const auto& label: labels) { - Labels_.Add(label.first, label.second); + Labels_.Add(label.first, label.second); } } @@ -22,10 +24,10 @@ NMonitoring::TLabels TSolomonStatPullExtension::TParams::GetLabels() const { return Labels_; } - TSolomonStatPullExtension::TSolomonStatPage::TSolomonStatPage(const std::string& title, const std::string& path, IApi* api) - : NMonitoring::IMonPage(TString(title), TString(path)), Api_(api) - { } + : NMonitoring::IMonPage(TString(title), TString(path)) + , Api_(api) +{} void TSolomonStatPullExtension::TSolomonStatPage::Output(NMonitoring::IMonHttpRequest& request) { request.Output() << NMonitoring::HTTPOKJSON; @@ -35,13 +37,12 @@ void TSolomonStatPullExtension::TSolomonStatPage::Output(NMonitoring::IMonHttpRe TSolomonStatPullExtension::TSolomonStatPullExtension(const TSolomonStatPullExtension::TParams& params, IApi* api) : MetricRegistry_(new NMonitoring::TMetricRegistry(params.GetLabels())) - , MonService_(params.Port_, TString(params.Host_), 0), Page_( new TSolomonStatPage("stats", "Statistics", api) ) { - api->SetMetricRegistry(MetricRegistry_.get()); - MonService_.Register(Page_); - MonService_.StartOrThrow(); - } - -TSolomonStatPullExtension::~TSolomonStatPullExtension() - { } + , MonService_(params.Port_, TString(params.Host_), 0) + , Page_(new TSolomonStatPage("stats", "Statistics", api)) +{ + api->SetMetricRegistry(MetricRegistry_.get()); + MonService_.Register(Page_); + MonService_.StartOrThrow(); +} } // NSolomonStatExtension diff --git a/src/client/federated_topic/impl/federated_read_session.h b/src/client/federated_topic/impl/federated_read_session.h index c93b9722bda..fee6ae8b9e4 100644 --- a/src/client/federated_topic/impl/federated_read_session.h +++ b/src/client/federated_topic/impl/federated_read_session.h @@ -151,9 +151,11 @@ class TFederatedReadSessionImpl : public NTopic::TEnableSelfContextGetSessionId(); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS inline NTopic::TReaderCounters::TPtr GetCounters() const override { return TryGetImpl()->GetCounters(); } +#endif private: void Start() { diff --git a/src/client/federated_topic/impl/federated_write_session.h b/src/client/federated_topic/impl/federated_write_session.h index 31c0ed289dc..1cbd80a66d7 100644 --- a/src/client/federated_topic/impl/federated_write_session.h +++ b/src/client/federated_topic/impl/federated_write_session.h @@ -196,8 +196,9 @@ class TFederatedWriteSession : public NTopic::IWriteSession, bool Close(TDuration timeout) override { return TryGetImpl()->Close(timeout); } - +#ifndef YDB_TOPIC_DISABLE_COUNTERS inline NTopic::TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; +#endif private: diff --git a/src/client/impl/endpoints/endpoints.cpp b/src/client/impl/endpoints/endpoints.cpp index 670874a45e3..edadc808d25 100644 --- a/src/client/impl/endpoints/endpoints.cpp +++ b/src/client/impl/endpoints/endpoints.cpp @@ -1,6 +1,5 @@ #include "endpoints.h" -#include #include #include @@ -80,7 +79,7 @@ std::vector TEndpointElectorSafe::SetNewState(std::vector + #include #include #include #include #include -#include namespace NYdb::inline V3 { @@ -124,9 +125,10 @@ class TEndpointElectorSafe { std::unordered_map KnownEndpointsByNodeId_; std::int32_t BestK_ = -1; std::atomic_int PessimizationRatio_ = 0; - NSdkStats::TAtomicCounter<::NMonitoring::TIntGauge> EndpointCountGauge_; - NSdkStats::TAtomicCounter<::NMonitoring::TIntGauge> PessimizationRatioGauge_; - NSdkStats::TAtomicCounter<::NMonitoring::TIntGauge> EndpointActiveGauge_; + + NSdkStats::TAtomicCounter EndpointCountGauge_; + NSdkStats::TAtomicCounter PessimizationRatioGauge_; + NSdkStats::TAtomicCounter EndpointActiveGauge_; }; // Used to track object diff --git a/src/client/impl/internal/db_driver_state/state.cpp b/src/client/impl/internal/db_driver_state/state.cpp index ffaaf2f0e11..98f76c5020c 100644 --- a/src/client/impl/internal/db_driver_state/state.cpp +++ b/src/client/impl/internal/db_driver_state/state.cpp @@ -271,7 +271,7 @@ NThreading::TFuture TDbDriverStateTracker::SendNotification( return NThreading::WaitExceptionOrAll(results); } -void TDbDriverStateTracker::SetMetricRegistry(NMonitoring::TMetricRegistry *sensorsRegistry) { +void TDbDriverStateTracker::SetMetricRegistry(std::shared_ptr sensorsRegistry) { std::vector> states; { std::shared_lock lock(Lock_); diff --git a/src/client/impl/internal/db_driver_state/state.h b/src/client/impl/internal/db_driver_state/state.h index ce72760bf65..f44c7ec1281 100644 --- a/src/client/impl/internal/db_driver_state/state.h +++ b/src/client/impl/internal/db_driver_state/state.h @@ -2,10 +2,11 @@ #include "endpoint_pool.h" -#include +#include +#include #include -#include + #include namespace NYdb::inline V3 { @@ -97,7 +98,7 @@ class TDbDriverStateTracker { ); NThreading::TFuture SendNotification( TDbDriverState::ENotifyType type); - void SetMetricRegistry(::NMonitoring::TMetricRegistry *sensorsRegistry); + void SetMetricRegistry(std::shared_ptr sensorsRegistry); private: IInternalClient* DiscoveryClient_; std::unordered_map, TStateKeyHash> States_; diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.cpp b/src/client/impl/internal/grpc_connections/grpc_connections.cpp index aa5bfc21b4d..492116eb50b 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.cpp +++ b/src/client/impl/internal/grpc_connections/grpc_connections.cpp @@ -386,25 +386,21 @@ TBalancingPolicy::TImpl TGRpcConnectionsImpl::GetBalancingSettings() const { return BalancingSettings_; } -bool TGRpcConnectionsImpl::StartStatCollecting(NMonitoring::IMetricRegistry* sensorsRegistry) { +bool TGRpcConnectionsImpl::StartStatCollecting(std::shared_ptr metricsProvider) { { std::lock_guard lock(ExtensionsLock_); if (MetricRegistryPtr_) { return false; } - if (auto ptr = dynamic_cast(sensorsRegistry)) { - MetricRegistryPtr_ = ptr; - } else { - std::cerr << "Unknown IMetricRegistry impl" << std::endl; - return false; - } + + MetricRegistryPtr_ = std::move(metricsProvider); } StateTracker_.SetMetricRegistry(MetricRegistryPtr_); return true; } -NMonitoring::TMetricRegistry* TGRpcConnectionsImpl::GetMetricRegistry() { +std::shared_ptr TGRpcConnectionsImpl::GetMetricRegistry() { std::lock_guard lock(ExtensionsLock_); return MetricRegistryPtr_; } diff --git a/src/client/impl/internal/grpc_connections/grpc_connections.h b/src/client/impl/internal/grpc_connections/grpc_connections.h index 8008cb7c834..99f5c40c313 100644 --- a/src/client/impl/internal/grpc_connections/grpc_connections.h +++ b/src/client/impl/internal/grpc_connections/grpc_connections.h @@ -646,8 +646,8 @@ class TGRpcConnectionsImpl bool GetDrainOnDtors() const; TBalancingPolicy::TImpl GetBalancingSettings() const override; - bool StartStatCollecting(::NMonitoring::IMetricRegistry* sensorsRegistry) override; - ::NMonitoring::TMetricRegistry* GetMetricRegistry() override; + bool StartStatCollecting(std::shared_ptr metricsProvider) override; + std::shared_ptr GetMetricRegistry() override; void RegisterExtension(IExtension* extension); void RegisterExtensionApi(IExtensionApi* api); void SetDiscoveryMutator(IDiscoveryMutatorApi::TMutatorCb&& cb); @@ -748,7 +748,7 @@ class TGRpcConnectionsImpl private: std::mutex ExtensionsLock_; - ::NMonitoring::TMetricRegistry* MetricRegistryPtr_ = nullptr; + std::shared_ptr MetricRegistryPtr_; const size_t ClientThreadsNum_; std::unique_ptr ResponseQueue_; diff --git a/src/client/impl/internal/internal_client/client.h b/src/client/impl/internal/internal_client/client.h index f53015355aa..6acde432a52 100644 --- a/src/client/impl/internal/internal_client/client.h +++ b/src/client/impl/internal/internal_client/client.h @@ -1,24 +1,25 @@ #pragma once -#include +#include +#include #include -#include + #include #include #include -namespace NMonitoring { - class IMetricRegistry; - class TMetricRegistry; -} namespace NYdb::inline V3 { class TDbDriverState; struct TListEndpointsResult; +namespace NMetrics { + class IMetricsProvider; +} + class IInternalClient { public: virtual NThreading::TFuture GetEndpoints(std::shared_ptr dbState) = 0; @@ -27,8 +28,8 @@ class IInternalClient { virtual void DeleteChannels(const std::vector& endpoints) = 0; #endif virtual TBalancingPolicy::TImpl GetBalancingSettings() const = 0; - virtual bool StartStatCollecting(::NMonitoring::IMetricRegistry* sensorsRegistry) = 0; - virtual ::NMonitoring::TMetricRegistry* GetMetricRegistry() = 0; + virtual bool StartStatCollecting(std::shared_ptr metricsProvider) = 0; + virtual std::shared_ptr GetMetricRegistry() = 0; virtual const TLog& GetLog() const = 0; }; diff --git a/src/client/impl/internal/metrics/metrics.h b/src/client/impl/internal/metrics/metrics.h new file mode 100644 index 00000000000..34879abe880 --- /dev/null +++ b/src/client/impl/internal/metrics/metrics.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include +#include + +#include + + +namespace NYdb::inline V3::NMetrics { + +struct TLabel { + std::string Name; + std::string Value; +}; + +using TLabels = std::vector; +using TBucketBounds = std::vector; + +class IIntGauge { +public: + virtual void Add(std::int64_t value) = 0; + virtual void Set(std::int64_t value) = 0; + + virtual ~IIntGauge() = default; +}; + +class ICounter { +public: + virtual void Add(std::uint64_t value) = 0; + + virtual ~ICounter() = default; +}; + +class IRate { +public: + virtual void Add(std::uint64_t value) = 0; + + virtual ~IRate() = default; +}; + +class IHistogram { +public: + virtual void Record(double value) = 0; + virtual ~IHistogram() = default; +}; + +class IMetricsProvider { +public: + virtual std::shared_ptr IntGauge(TLabels labels) = 0; + virtual std::shared_ptr Counter(TLabels labels) = 0; + + virtual std::shared_ptr Rate(TLabels labels) = 0; + + virtual std::shared_ptr HistogramCounter(TLabels labels, TBucketBounds bounds) = 0; + + virtual std::shared_ptr HistogramRate(TLabels labels, TBucketBounds bounds) = 0; + + virtual ~IMetricsProvider() = default; +}; + +} diff --git a/src/client/impl/internal/stats_extractor/extractor.h b/src/client/impl/internal/stats_extractor/extractor.h deleted file mode 100644 index 6ebc84a8f65..00000000000 --- a/src/client/impl/internal/stats_extractor/extractor.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once - -#include - -#include - -#include - -#include - -namespace NYdb::inline V3 { - -class TStatsExtractor: public NSdkStats::IStatApi { -public: - - TStatsExtractor(std::shared_ptr client) - : Client_(client) - { } - - virtual void SetMetricRegistry(::NMonitoring::IMetricRegistry* sensorsRegistry) override { - auto strong = Client_.lock(); - if (strong) { - strong->StartStatCollecting(sensorsRegistry); - } else { - return; - } - } - - void Accept(NMonitoring::IMetricConsumer* consumer) const override { - - auto strong = Client_.lock(); - if (strong) { - auto sensorsRegistry = strong->GetMetricRegistry(); - Y_ABORT_UNLESS(sensorsRegistry, "TMetricRegistry is null in Stats Extractor"); - sensorsRegistry->Accept(TInstant::Zero(), consumer); - } else { - throw NSdkStats::DestroyedClientException(); - } - } -private: - std::weak_ptr Client_; -}; - -} // namespace NYdb diff --git a/src/client/impl/session/session_pool.cpp b/src/client/impl/session/session_pool.cpp index e91861a4a97..6580c236733 100644 --- a/src/client/impl/session/session_pool.cpp +++ b/src/client/impl/session/session_pool.cpp @@ -14,7 +14,7 @@ namespace NSessionPool { using namespace NThreading; -constexpr ui64 KEEP_ALIVE_RANDOM_FRACTION = 4; +constexpr std::uint64_t KEEP_ALIVE_RANDOM_FRACTION = 4; static const TStatus CLIENT_RESOURCE_EXHAUSTED_ACTIVE_SESSION_LIMIT = TStatus( TPlainStatus( EStatus::CLIENT_RESOURCE_EXHAUSTED, @@ -33,10 +33,10 @@ TStatus GetStatus(const TStatus& status) { TDuration RandomizeThreshold(TDuration duration) { TDuration::TValue value = duration.GetValue(); if (KEEP_ALIVE_RANDOM_FRACTION) { - const i64 randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION; + const std::int64_t randomLimit = value / KEEP_ALIVE_RANDOM_FRACTION; if (randomLimit < 2) return duration; - value += static_cast(RandomNumber(randomLimit)); + value += static_cast(RandomNumber(randomLimit)); } return TDuration::FromValue(value); } @@ -53,7 +53,7 @@ bool IsSessionCloseRequested(const TStatus& status) { return false; } -TSessionPool::TWaitersQueue::TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout) +TSessionPool::TWaitersQueue::TWaitersQueue(std::uint32_t maxQueueSize, TDuration maxWaitSessionTimeout) : MaxQueueSize_(maxQueueSize) , MaxWaitSessionTimeout_(maxWaitSessionTimeout) { @@ -89,12 +89,12 @@ void TSessionPool::TWaitersQueue::GetOld(TInstant now, std::vector weakC return periodicCb; } -i64 TSessionPool::GetActiveSessions() const { +std::int64_t TSessionPool::GetActiveSessions() const { std::lock_guard guard(Mtx_); return ActiveSessions_; } -i64 TSessionPool::GetActiveSessionsLimit() const { +std::int64_t TSessionPool::GetActiveSessionsLimit() const { return MaxActiveSessions_; } -i64 TSessionPool::GetCurrentPoolSize() const { +std::int64_t TSessionPool::GetCurrentPoolSize() const { std::lock_guard guard(Mtx_); return Sessions_.size(); } diff --git a/src/client/impl/session/session_pool.h b/src/client/impl/session/session_pool.h index 9533460efd3..40767c3057b 100644 --- a/src/client/impl/session/session_pool.h +++ b/src/client/impl/session/session_pool.h @@ -22,7 +22,7 @@ class IGetSessionCtx : private TNonCopyable { //How often run session pool keep alive check constexpr TDuration PERIODIC_ACTION_INTERVAL = TDuration::Seconds(5); constexpr TDuration MAX_WAIT_SESSION_TIMEOUT = TDuration::Seconds(5); //Max time to wait session -constexpr ui64 PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval +constexpr std::uint64_t PERIODIC_ACTION_BATCH_SIZE = 10; //Max number of tasks to perform during one interval constexpr TDuration CREATE_SESSION_INTERNAL_TIMEOUT = TDuration::Seconds(2); //Timeout for createSession call inside session pool TStatus GetStatus(const TOperation& operation); @@ -87,24 +87,24 @@ class TSessionPool : public IServerCloseHandler { private: class TWaitersQueue { public: - TWaitersQueue(ui32 maxQueueSize, TDuration maxWaitSessionTimeout=MAX_WAIT_SESSION_TIMEOUT); + TWaitersQueue(std::uint32_t maxQueueSize, TDuration maxWaitSessionTimeout=MAX_WAIT_SESSION_TIMEOUT); // returns true and gets ownership if queue size less than limit // otherwise returns false and doesn't not touch ctx bool TryPush(std::unique_ptr& p); std::unique_ptr TryGet(); void GetOld(TInstant now, std::vector>& oldWaiters); - ui32 Size() const; + std::uint32_t Size() const; private: - const ui32 MaxQueueSize_; + const std::uint32_t MaxQueueSize_; const TDuration MaxWaitSessionTimeout_; std::multimap> Waiters_; }; public: using TKeepAliveCmd = std::function; using TDeletePredicate = std::function; - TSessionPool(ui32 maxActiveSessions); + TSessionPool(std::uint32_t maxActiveSessions); // Extracts session from pool or creates new one ising given ctx void GetSession(std::unique_ptr ctx); @@ -117,9 +117,9 @@ class TSessionPool : public IServerCloseHandler { bool CheckAndFeedWaiterNewSession(bool active); TPeriodicCb CreatePeriodicTask(std::weak_ptr weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); - i64 GetActiveSessions() const; - i64 GetActiveSessionsLimit() const; - i64 GetCurrentPoolSize() const; + std::int64_t GetActiveSessions() const; + std::int64_t GetActiveSessionsLimit() const; + std::int64_t GetCurrentPoolSize() const; void DecrementActiveCounter(); void IncrementActiveCounterUnsafe(); @@ -138,12 +138,12 @@ class TSessionPool : public IServerCloseHandler { std::multimap> Sessions_; TWaitersQueue WaitersQueue_; - i64 ActiveSessions_; - const ui32 MaxActiveSessions_; + std::int64_t ActiveSessions_; + const std::uint32_t MaxActiveSessions_; NSdkStats::TSessionCounter ActiveSessionsCounter_; NSdkStats::TSessionCounter InPoolSessionsCounter_; NSdkStats::TSessionCounter SessionWaiterCounter_; - NSdkStats::TAtomicCounter<::NMonitoring::TRate> FakeSessionsCounter_; + NSdkStats::TAtomicCounter FakeSessionsCounter_; }; } diff --git a/src/client/impl/stats/stats.cpp b/src/client/impl/stats/stats.cpp index 4139ba9dd93..bf46744ff0d 100644 --- a/src/client/impl/stats/stats.cpp +++ b/src/client/impl/stats/stats.cpp @@ -5,39 +5,74 @@ namespace NYdb::inline V3 { namespace NSdkStats { -using std::string; +const NMetrics::TLabel SESSIONS_ON_KQP_HOST_LABEL = NMetrics::TLabel {"sensor", "SessionsByYdbHost"}; +const NMetrics::TLabel TRANSPORT_ERRORS_BY_HOST_LABEL = NMetrics::TLabel {"sensor", "TransportErrorsByYdbHost"}; +const NMetrics::TLabel GRPC_INFLIGHT_BY_HOST_LABEL = NMetrics::TLabel {"sensor", "Grpc/InFlightByYdbHost"}; -const NMonitoring::TLabel SESSIONS_ON_KQP_HOST_LABEL = NMonitoring::TLabel {"sensor", "SessionsByYdbHost"}; -const NMonitoring::TLabel TRANSPORT_ERRORS_BY_HOST_LABEL = NMonitoring::TLabel {"sensor", "TransportErrorsByYdbHost"}; -const NMonitoring::TLabel GRPC_INFLIGHT_BY_HOST_LABEL = NMonitoring::TLabel {"sensor", "Grpc/InFlightByYdbHost"}; +std::string UnderscoreToUpperCamel(const std::string& in) { + std::string result; + result.reserve(in.size()); -void TStatCollector::IncSessionsOnHost(const string& host) { - if (TMetricRegistry* ptr = MetricRegistryPtr_.Get()) { - ptr->IntGauge({ DatabaseLabel_, SESSIONS_ON_KQP_HOST_LABEL, {"YdbHost", host} })->Inc(); + if (in.empty()) { + return {}; + } + + result.push_back(toupper(in[0])); + + size_t i = 1; + + while (i < in.size()) { + if (in[i] == '_') { + if (++i < in.size()) { + result.push_back(toupper(in[i++])); + } else { + break; + } + } else { + result.push_back(tolower(in[i++])); + } + } + return result; +} + +NMetrics::TBucketBounds ExponentialHistogram(std::uint32_t bucketsCount, double base, double scale) { + NMetrics::TBucketBounds bounds; + bounds.reserve(bucketsCount); + + for (std::uint32_t i = 0; i < bucketsCount; ++i) { + bounds.push_back(scale * std::pow(base, i)); + } + + return bounds; +} + +void TStatCollector::IncSessionsOnHost(const std::string& host) { + if (auto ptr = MetricRegistryPtr_.Get()) { + ptr->IntGauge({ DatabaseLabel_, SESSIONS_ON_KQP_HOST_LABEL, {"YdbHost", host} })->Add(1); } } -void TStatCollector::DecSessionsOnHost(const string& host) { - if (TMetricRegistry* ptr = MetricRegistryPtr_.Get()) { - ptr->IntGauge({ DatabaseLabel_, SESSIONS_ON_KQP_HOST_LABEL, {"YdbHost", host} })->Dec(); +void TStatCollector::DecSessionsOnHost(const std::string& host) { + if (auto ptr = MetricRegistryPtr_.Get()) { + ptr->IntGauge({ DatabaseLabel_, SESSIONS_ON_KQP_HOST_LABEL, {"YdbHost", host} })->Add(-1); } } -void TStatCollector::IncTransportErrorsByHost(const string& host) { - if (TMetricRegistry* ptr = MetricRegistryPtr_.Get()) { - ptr->Rate({ DatabaseLabel_, TRANSPORT_ERRORS_BY_HOST_LABEL, {"YdbHost", host} })->Inc(); +void TStatCollector::IncTransportErrorsByHost(const std::string& host) { + if (auto ptr = MetricRegistryPtr_.Get()) { + ptr->Rate({ DatabaseLabel_, TRANSPORT_ERRORS_BY_HOST_LABEL, {"YdbHost", host} })->Add(1); } } -void TStatCollector::IncGRpcInFlightByHost(const string& host) { - if (TMetricRegistry* ptr = MetricRegistryPtr_.Get()) { - ptr->IntGauge({ DatabaseLabel_, GRPC_INFLIGHT_BY_HOST_LABEL, {"YdbHost", host} })->Inc(); +void TStatCollector::IncGRpcInFlightByHost(const std::string& host) { + if (auto ptr = MetricRegistryPtr_.Get()) { + ptr->IntGauge({ DatabaseLabel_, GRPC_INFLIGHT_BY_HOST_LABEL, {"YdbHost", host} })->Add(1); } } -void TStatCollector::DecGRpcInFlightByHost(const string& host) { - if (TMetricRegistry* ptr = MetricRegistryPtr_.Get()) { - ptr->IntGauge({ DatabaseLabel_, GRPC_INFLIGHT_BY_HOST_LABEL, {"YdbHost", host} })->Dec(); +void TStatCollector::DecGRpcInFlightByHost(const std::string& host) { + if (auto ptr = MetricRegistryPtr_.Get()) { + ptr->IntGauge({ DatabaseLabel_, GRPC_INFLIGHT_BY_HOST_LABEL, {"YdbHost", host} })->Add(-1); } } diff --git a/src/client/impl/stats/stats.h b/src/client/impl/stats/stats.h index d545764c887..f88f3a16034 100644 --- a/src/client/impl/stats/stats.h +++ b/src/client/impl/stats/stats.h @@ -2,48 +2,25 @@ #include +#include #include -#include -#include #include -#include + namespace NYdb::inline V3 { namespace NSdkStats { // works only for case normal (foo_bar) underscore -inline std::string UnderscoreToUpperCamel(const std::string& in) { - std::string result; - result.reserve(in.size()); - - if (in.empty()) - return {}; - - result.push_back(toupper(in[0])); +std::string UnderscoreToUpperCamel(const std::string& in); - size_t i = 1; - - while (i < in.size()) { - if (in[i] == '_') { - if (++i < in.size()) { - result.push_back(toupper(in[i++])); - } else { - break; - } - } else { - result.push_back(tolower(in[i++])); - } - } - return result; -} +NMetrics::TBucketBounds ExponentialHistogram(std::uint32_t bucketsCount, double base, double scale); template class TAtomicPointer { public: - - TAtomicPointer(TPointer* pointer = nullptr) { + TAtomicPointer(std::shared_ptr pointer = nullptr) { Set(pointer); } @@ -56,71 +33,71 @@ class TAtomicPointer { return *this; } - TPointer* Get() const { + std::shared_ptr Get() const { return Pointer_.load(); } - void Set(TPointer* pointer) { + void Set(std::shared_ptr pointer) { Pointer_.store(pointer); } private: - std::atomic Pointer_; + std::atomic> Pointer_; }; template -class TAtomicCounter: public TAtomicPointer { - public: - void Add(ui64 value) { - if (auto counter = this->Get()) { - counter->Add(value); - } +class TAtomicCounter : public TAtomicPointer { +public: + void Add(std::uint64_t value) { + if (auto counter = this->Get()) { + counter->Add(value); } + } - void Inc() { - if (auto counter = this->Get()) { - counter->Inc(); - } + void Inc() { + if (auto counter = this->Get()) { + counter->Add(1); } + } - void Dec() { - if (auto counter = this->Get()) { - counter->Dec(); - } + void Dec() { + if (auto counter = this->Get()) { + counter->Add(-1); } + } - void SetValue(ui64 value) { - if (auto counter = this->Get()) { - counter->Set(value); - } + void SetValue(std::uint64_t value) { + if (auto counter = this->Get()) { + counter->Set(value); } + } }; template -class FastLocalCounter { +class TFastLocalCounter { public: - FastLocalCounter(TAtomicCounter& counter) - : Counter(counter), Value(0) - { } + TFastLocalCounter(TAtomicCounter& counter) + : Counter(counter) + , Value(0) + {} - ~FastLocalCounter() { + ~TFastLocalCounter() { Counter.Add(Value); } - FastLocalCounter& operator++ () { + TFastLocalCounter& operator++ () { ++Value; return *this; } TAtomicCounter& Counter; - ui64 Value; + std::uint64_t Value; }; template class TAtomicHistogram: public TAtomicPointer { public: - - void Record(i64 value) { + void Record(std::int64_t value) { if (auto histogram = this->Get()) { histogram->Record(value); } @@ -134,11 +111,10 @@ class TAtomicHistogram: public TAtomicPointer { // Sessions count for all clients // Every client has 3 TSessionCounter for active, in session pool, in settler sessions // TSessionCounters in different clients with same role share one sensor -class TSessionCounter: public TAtomicPointer<::NMonitoring::TIntGauge> { +class TSessionCounter : public TAtomicPointer { public: - // Call with mutex - void Apply(i64 newValue) { + void Apply(std::int64_t newValue) { if (auto gauge = this->Get()) { gauge->Add(newValue - oldValue); oldValue = newValue; @@ -146,111 +122,110 @@ class TSessionCounter: public TAtomicPointer<::NMonitoring::TIntGauge> { } ~TSessionCounter() { - ::NMonitoring::TIntGauge* gauge = this->Get(); + auto gauge = this->Get(); if (gauge) { gauge->Add(-oldValue); } } private: - i64 oldValue = 0; + std::int64_t oldValue = 0; }; struct TStatCollector { - using TMetricRegistry = ::NMonitoring::TMetricRegistry; - public: struct TEndpointElectorStatCollector { - - TEndpointElectorStatCollector(::NMonitoring::TIntGauge* endpointCount = nullptr - , ::NMonitoring::TIntGauge* pessimizationRatio = nullptr - , ::NMonitoring::TIntGauge* activeEndpoints = nullptr) - : EndpointCount(endpointCount) - , PessimizationRatio(pessimizationRatio) - , EndpointActive(activeEndpoints) - { } - - ::NMonitoring::TIntGauge* EndpointCount; - ::NMonitoring::TIntGauge* PessimizationRatio; - ::NMonitoring::TIntGauge* EndpointActive; + TEndpointElectorStatCollector(std::shared_ptr endpointCount = nullptr, + std::shared_ptr pessimizationRatio = nullptr, + std::shared_ptr activeEndpoints = nullptr) + : EndpointCount(endpointCount) + , PessimizationRatio(pessimizationRatio) + , EndpointActive(activeEndpoints) + {} + + std::shared_ptr EndpointCount; + std::shared_ptr PessimizationRatio; + std::shared_ptr EndpointActive; }; struct TSessionPoolStatCollector { - TSessionPoolStatCollector(::NMonitoring::TIntGauge* activeSessions = nullptr - , ::NMonitoring::TIntGauge* inPoolSessions = nullptr - , ::NMonitoring::TRate* fakeSessions = nullptr - , ::NMonitoring::TIntGauge* waiters = nullptr) - : ActiveSessions(activeSessions) - , InPoolSessions(inPoolSessions) - , FakeSessions(fakeSessions) - , Waiters(waiters) - { } - - ::NMonitoring::TIntGauge* ActiveSessions; - ::NMonitoring::TIntGauge* InPoolSessions; - ::NMonitoring::TRate* FakeSessions; - ::NMonitoring::TIntGauge* Waiters; + TSessionPoolStatCollector(std::shared_ptr activeSessions = nullptr, + std::shared_ptr inPoolSessions = nullptr, + std::shared_ptr fakeSessions = nullptr, + std::shared_ptr waiters = nullptr) + : ActiveSessions(activeSessions) + , InPoolSessions(inPoolSessions) + , FakeSessions(fakeSessions) + , Waiters(waiters) + {} + + std::shared_ptr ActiveSessions; + std::shared_ptr InPoolSessions; + std::shared_ptr FakeSessions; + std::shared_ptr Waiters; }; struct TClientRetryOperationStatCollector { + TClientRetryOperationStatCollector() + : MetricRegistry_() + , Database_() + {} - TClientRetryOperationStatCollector() : MetricRegistry_(), Database_() {} - - TClientRetryOperationStatCollector(::NMonitoring::TMetricRegistry* registry, + TClientRetryOperationStatCollector(std::shared_ptr registry, const std::string& database, const std::string& clientType) : MetricRegistry_(registry) , Database_(database) , ClientType_(clientType) - { } + {} void IncSyncRetryOperation(const EStatus& status) { - if (auto registry = MetricRegistry_.Get()) { + if (MetricRegistry_) { std::string statusName = TStringBuilder() << status; std::string sensor = TStringBuilder() << "RetryOperation/" << UnderscoreToUpperCamel(statusName); - registry->Rate({ {"database", Database_}, {"ydb_client", ClientType_}, {"sensor", sensor} })->Inc(); + MetricRegistry_->Rate({ {"database", Database_}, {"ydb_client", ClientType_}, {"sensor", sensor} })->Add(1); } } void IncAsyncRetryOperation(const EStatus& status) { - if (auto registry = MetricRegistry_.Get()) { + if (auto registry = MetricRegistry_) { std::string statusName = TStringBuilder() << status; std::string sensor = TStringBuilder() << "RetryOperation/" << UnderscoreToUpperCamel(statusName); - registry->Rate({ {"database", Database_}, {"ydb_client", ClientType_}, {"sensor", sensor} })->Inc(); + MetricRegistry_->Rate({ {"database", Database_}, {"ydb_client", ClientType_}, {"sensor", sensor} })->Add(1); } } private: - TAtomicPointer<::NMonitoring::TMetricRegistry> MetricRegistry_; + std::shared_ptr MetricRegistry_; std::string Database_; std::string ClientType_; }; struct TClientStatCollector { + TClientStatCollector(std::shared_ptr cacheMiss = nullptr, + std::shared_ptr querySize = nullptr, + std::shared_ptr paramsSize = nullptr, + std::shared_ptr sessionRemoved = nullptr, + std::shared_ptr requestMigrated = nullptr, + TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector()) + : CacheMiss(cacheMiss) + , QuerySize(querySize) + , ParamsSize(paramsSize) + , SessionRemovedDueBalancing(sessionRemoved) + , RequestMigrated(requestMigrated) + , RetryOperationStatCollector(retryOperationStatCollector) + {} + + std::shared_ptr CacheMiss; + std::shared_ptr QuerySize; + std::shared_ptr ParamsSize; + std::shared_ptr SessionRemovedDueBalancing; + std::shared_ptr RequestMigrated; - TClientStatCollector(::NMonitoring::TRate* cacheMiss = nullptr - , ::NMonitoring::THistogram* querySize = nullptr - , ::NMonitoring::THistogram* paramsSize = nullptr - , ::NMonitoring::TRate* sessionRemoved = nullptr - , ::NMonitoring::TRate* requestMigrated = nullptr - , TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector()) - : CacheMiss(cacheMiss) - , QuerySize(querySize) - , ParamsSize(paramsSize) - , SessionRemovedDueBalancing(sessionRemoved) - , RequestMigrated(requestMigrated) - , RetryOperationStatCollector(retryOperationStatCollector) - { } - - ::NMonitoring::TRate* CacheMiss; - ::NMonitoring::THistogram* QuerySize; - ::NMonitoring::THistogram* ParamsSize; - ::NMonitoring::TRate* SessionRemovedDueBalancing; - ::NMonitoring::TRate* RequestMigrated; TClientRetryOperationStatCollector RetryOperationStatCollector; }; - TStatCollector(const std::string& database, TMetricRegistry* sensorsRegistry) + TStatCollector(const std::string& database, std::shared_ptr sensorsRegistry) : Database_(database) , DatabaseLabel_({"database", database}) { @@ -259,7 +234,7 @@ struct TStatCollector { } } - void SetMetricRegistry(TMetricRegistry* sensorsRegistry) { + void SetMetricRegistry(std::shared_ptr sensorsRegistry) { Y_ABORT_UNLESS(sensorsRegistry, "TMetricRegistry is null in stats collector."); MetricRegistryPtr_.Set(sensorsRegistry); DiscoveryDuePessimization_.Set(sensorsRegistry->Rate({ DatabaseLabel_, {"sensor", "Discovery/TooManyBadEndpoints"} })); @@ -272,9 +247,9 @@ struct TStatCollector { GRpcInFlight_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Grpc/InFlight"} })); RequestLatency_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/Latency"} }, - ::NMonitoring::ExponentialHistogram(20, 2, 1))); + ExponentialHistogram(20, 2, 1))); ResultSize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/ResultSize"} }, - ::NMonitoring::ExponentialHistogram(20, 2, 32))); + ExponentialHistogram(20, 2, 32))); } void IncDiscoveryDuePessimization() { @@ -311,19 +286,19 @@ struct TStatCollector { void IncCounter(const std::string& sensor) { if (auto registry = MetricRegistryPtr_.Get()) { - registry->Counter({ {"database", Database_}, {"sensor", sensor} })->Inc(); + registry->Counter({ {"database", Database_}, {"sensor", sensor} })->Add(1); } } - void SetSessionCV(ui32 cv) { + void SetSessionCV(std::uint32_t cv) { SessionCV_.SetValue(cv); } - void IncGRpcInFlight () { + void IncGRpcInFlight() { GRpcInFlight_.Inc(); } - void DecGRpcInFlight () { + void DecGRpcInFlight() { GRpcInFlight_.Dec(); } @@ -357,9 +332,9 @@ struct TStatCollector { TClientStatCollector GetClientStatCollector(const std::string& clientType) { if (auto registry = MetricRegistryPtr_.Get()) { - ::NMonitoring::TRate* cacheMiss = nullptr; - ::NMonitoring::TRate* sessionRemovedDueBalancing = nullptr; - ::NMonitoring::TRate* requestMigrated = nullptr; + std::shared_ptr cacheMiss = nullptr; + std::shared_ptr sessionRemovedDueBalancing = nullptr; + std::shared_ptr requestMigrated = nullptr; if (clientType == "Table") { cacheMiss = registry->Rate({ DatabaseLabel_, {"ydb_client", clientType}, @@ -371,9 +346,9 @@ struct TStatCollector { } auto querySize = registry->HistogramRate({ DatabaseLabel_, {"ydb_client", clientType}, - {"sensor", "Request/QuerySize"} }, ::NMonitoring::ExponentialHistogram(20, 2, 32)); + {"sensor", "Request/QuerySize"} }, ExponentialHistogram(20, 2, 32)); auto paramsSize = registry->HistogramRate({ DatabaseLabel_, {"ydb_client", clientType}, - {"sensor", "Request/ParamsSize"} }, ::NMonitoring::ExponentialHistogram(10, 2, 32)); + {"sensor", "Request/ParamsSize"} }, ExponentialHistogram(10, 2, 32)); return TClientStatCollector(cacheMiss, querySize, paramsSize, sessionRemovedDueBalancing, requestMigrated, TClientRetryOperationStatCollector(MetricRegistryPtr_.Get(), Database_, clientType)); @@ -396,18 +371,20 @@ struct TStatCollector { private: const std::string Database_; - const ::NMonitoring::TLabel DatabaseLabel_; - TAtomicPointer MetricRegistryPtr_; - TAtomicCounter<::NMonitoring::TRate> DiscoveryDuePessimization_; - TAtomicCounter<::NMonitoring::TRate> DiscoveryDueExpiration_; - TAtomicCounter<::NMonitoring::TRate> RequestFailDueQueueOverflow_; - TAtomicCounter<::NMonitoring::TRate> RequestFailDueNoEndpoint_; - TAtomicCounter<::NMonitoring::TRate> RequestFailDueTransportError_; - TAtomicCounter<::NMonitoring::TRate> DiscoveryFailDueTransportError_; - TAtomicCounter<::NMonitoring::TIntGauge> SessionCV_; - TAtomicCounter<::NMonitoring::TIntGauge> GRpcInFlight_; - TAtomicHistogram<::NMonitoring::THistogram> RequestLatency_; - TAtomicHistogram<::NMonitoring::THistogram> ResultSize_; + const NMetrics::TLabel DatabaseLabel_; + + TAtomicPointer MetricRegistryPtr_; + + TAtomicCounter DiscoveryDuePessimization_; + TAtomicCounter DiscoveryDueExpiration_; + TAtomicCounter RequestFailDueQueueOverflow_; + TAtomicCounter RequestFailDueNoEndpoint_; + TAtomicCounter RequestFailDueTransportError_; + TAtomicCounter DiscoveryFailDueTransportError_; + TAtomicCounter SessionCV_; + TAtomicCounter GRpcInFlight_; + TAtomicHistogram RequestLatency_; + TAtomicHistogram ResultSize_; }; } // namespace NSdkStats diff --git a/src/client/metrics_providers/CMakeLists.txt b/src/client/metrics_providers/CMakeLists.txt new file mode 100644 index 00000000000..c0a963e6e97 --- /dev/null +++ b/src/client/metrics_providers/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(monlib) diff --git a/src/client/metrics_providers/monlib/CMakeLists.txt b/src/client/metrics_providers/monlib/CMakeLists.txt new file mode 100644 index 00000000000..265ba0b9873 --- /dev/null +++ b/src/client/metrics_providers/monlib/CMakeLists.txt @@ -0,0 +1,11 @@ +_ydb_sdk_add_library(client-metrics_providers-monlib) + +target_link_libraries(client-metrics_providers-monlib PUBLIC + yutil + monlib-metrics +) + +target_sources(client-metrics_providers-monlib + PRIVATE + provider.cpp +) diff --git a/src/client/metrics_providers/monlib/provider.cpp b/src/client/metrics_providers/monlib/provider.cpp new file mode 100644 index 00000000000..2dcd50d931a --- /dev/null +++ b/src/client/metrics_providers/monlib/provider.cpp @@ -0,0 +1,128 @@ +#include "provider.h" + +#include +#include + + +namespace NYdb::inline V3::NMetrics { + +class TMonlibIntGauge: public NMetrics::IIntGauge { +public: + TMonlibIntGauge(::NMonitoring::IIntGauge* gauge) + : Gauge_(gauge) + {} + + void Add(std::int64_t value) override { + Gauge_->Add(value); + } + + void Set(std::int64_t value) override { + Gauge_->Set(value); + } + +private: + ::NMonitoring::IIntGauge* Gauge_; +}; + +class TMonlibCounter: public NMetrics::ICounter { +public: + TMonlibCounter(::NMonitoring::ICounter* counter) + : Counter_(counter) + {} + + void Add(std::uint64_t value) override { + Counter_->Add(value); + } + +private: + ::NMonitoring::ICounter* Counter_; +}; + +class TMonlibRate: public NMetrics::IRate { +public: + TMonlibRate(::NMonitoring::IRate* rate) + : Rate_(rate) + {} + + void Add(std::uint64_t value) override { + Rate_->Add(value); + } + +private: + ::NMonitoring::IRate* Rate_; +}; + +class TMonlibHistogram: public NMetrics::IHistogram { +public: + TMonlibHistogram(::NMonitoring::IHistogram* histogram) + : Histogram_(histogram) + {} + + void Record(double value) override { + Histogram_->Record(value); + } + +private: + ::NMonitoring::IHistogram* Histogram_; +}; + +class TMonlibMetricsProvider: public NMetrics::IMetricsProvider { +public: + TMonlibMetricsProvider(NMonitoring::IMetricRegistry* registry) + : Registry_(registry) + {} + + ::NMonitoring::IMetricRegistry* GetRegistry() const { + return Registry_; + } + + std::shared_ptr IntGauge(TLabels labels) override { + return std::make_shared(Registry_->IntGauge(MakeLabels(labels))); + } + + std::shared_ptr Counter(TLabels labels) override { + return std::make_shared(Registry_->Counter(MakeLabels(labels))); + } + + std::shared_ptr Rate(TLabels labels) override { + return std::make_shared(Registry_->Rate(MakeLabels(labels))); + } + + std::shared_ptr HistogramCounter(TLabels labels, TBucketBounds bounds) override { + return std::make_shared( + Registry_->HistogramCounter(MakeLabels(labels), MakeHistogramCollector(bounds))); + } + + std::shared_ptr HistogramRate(TLabels labels, TBucketBounds bounds) override { + return std::make_shared( + Registry_->HistogramRate(MakeLabels(labels), MakeHistogramCollector(bounds))); + } + +private: + ::NMonitoring::ILabelsPtr MakeLabels(TLabels labels) { + auto monlibLabels = MakeIntrusive<::NMonitoring::TLabels>(); + for (const auto& label : labels) { + monlibLabels->Add(TStringBuf(label.Name), TStringBuf(label.Value)); + } + return monlibLabels; + } + + ::NMonitoring::IHistogramCollectorPtr MakeHistogramCollector(TBucketBounds bounds) { + return ::NMonitoring::ExplicitHistogram(TVector(bounds.begin(), bounds.end())); + } + + NMonitoring::IMetricRegistry* Registry_; +}; + +std::shared_ptr CreateMonlibMetricsProvider(::NMonitoring::IMetricRegistry* registry) { + return std::make_shared(registry); +} + +::NMonitoring::IMetricRegistry* TryGetUnderlyingMetricsRegistry(std::shared_ptr metricsProvider) { + if (auto monlibMetricsProvider = std::dynamic_pointer_cast(metricsProvider)) { + return monlibMetricsProvider->GetRegistry(); + } + return nullptr; +} + +} diff --git a/src/client/metrics_providers/monlib/provider.h b/src/client/metrics_providers/monlib/provider.h new file mode 100644 index 00000000000..d0d405b2e4e --- /dev/null +++ b/src/client/metrics_providers/monlib/provider.h @@ -0,0 +1,13 @@ +#pragma once + +#include + +#include + +namespace NYdb::inline V3::NMetrics { + +std::shared_ptr CreateMonlibMetricsProvider(::NMonitoring::IMetricRegistry* registry); + +::NMonitoring::IMetricRegistry* TryGetUnderlyingMetricsRegistry(std::shared_ptr metricsProvider); + +} diff --git a/src/client/persqueue_public/impl/aliases.h b/src/client/persqueue_public/impl/aliases.h index 96de4ce1c8c..c4c842f851b 100644 --- a/src/client/persqueue_public/impl/aliases.h +++ b/src/client/persqueue_public/impl/aliases.h @@ -32,8 +32,10 @@ using NTopic::Cancel; using NTopic::IsErrorMessage; using NTopic::MakeErrorFromProto; +#ifndef YDB_TOPIC_DISABLE_COUNTERS // counters_logger using TCountersLogger = NTopic::TCountersLogger; +#endif // read_session_impl using NTopic::HasNullCounters; diff --git a/src/client/persqueue_public/impl/read_session.cpp b/src/client/persqueue_public/impl/read_session.cpp index 3ded33a2798..841c898f075 100644 --- a/src/client/persqueue_public/impl/read_session.cpp +++ b/src/client/persqueue_public/impl/read_session.cpp @@ -45,7 +45,9 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy(); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS MakeCountersIfNeeded(); +#endif } TReadSession::~TReadSession() { @@ -64,9 +66,11 @@ TReadSession::~TReadSession() { for (const auto& ctx : CbContexts) { ctx->Cancel(); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS if (DumpCountersContext) { DumpCountersContext->Cancel(); } +#endif } Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest TReadSession::MakeClusterDiscoveryRequest() const { @@ -174,7 +178,9 @@ void TReadSession::ProceedWithoutClusterDiscovery() { clusterSessionInfo.Topics = Settings.Topics_; CreateClusterSessionsImpl(deferred); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS SetupCountersLogger(); +#endif } void TReadSession::CreateClusterSessionsImpl(TDeferredActions& deferred) { @@ -226,7 +232,9 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu } if (!status.IsSuccess()) { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; +#endif if (!ClusterDiscoveryRetryState) { ClusterDiscoveryRetryState = Settings.RetryPolicy_->CreateRetryState(); } @@ -249,7 +257,9 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu // Init ClusterSessions. if (static_cast(result.read_sessions_clusters_size()) != Settings.Topics_.size()) { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; +#endif AbortImpl(EStatus::INTERNAL_ERROR, TStringBuilder() << "Unexpected reply from cluster discovery. Sizes of topics arrays don't match: " << result.read_sessions_clusters_size() << " vs " << Settings.Topics_.size(), deferred); return; @@ -309,14 +319,18 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu } if (issues) { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; +#endif AbortImpl(errorStatus, std::move(issues), deferred); return; } CreateClusterSessionsImpl(deferred); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS SetupCountersLogger(); +#endif } void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions& deferred) { @@ -345,14 +359,13 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions bool TReadSession::Close(TDuration timeout) { LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); - - +#ifndef YDB_TOPIC_DISABLE_COUNTERS // the program may not have reached SetupCountersLogger if (CountersLogger) { // Log final counters. CountersLogger->Stop(); } - +#endif std::vector sessions; NThreading::TPromise promise = NThreading::NewPromise(); std::shared_ptr> count = std::make_shared>(0); @@ -412,7 +425,9 @@ bool TReadSession::Close(TDuration timeout) { issues.AddIssue("Session was gracefully closed"); EventsQueue->Close(TSessionClosedEvent(EStatus::SUCCESS, std::move(issues)), deferred); } else { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; +#endif for (const auto& session : sessions) { session->Abort(); } @@ -438,10 +453,12 @@ void TReadSession::AbortImpl(TDeferredActions&) { ClusterDiscoveryDelayContext->Cancel(); ClusterDiscoveryDelayContext.reset(); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS if (DumpCountersContext) { DumpCountersContext->Cancel(); DumpCountersContext.reset(); } +#endif for (auto& [cluster, sessionInfo] : ClusterSessions) { if (sessionInfo.Session) { sessionInfo.Session->Abort(); @@ -526,6 +543,7 @@ void TReadSession::ResumeReadingData() { } } +#ifndef YDB_TOPIC_DISABLE_COUNTERS void TReadSession::MakeCountersIfNeeded() { if (!Settings.Counters_ || HasNullCounters(*Settings.Counters_)) { TReaderCounters::TPtr counters = MakeIntrusive(); @@ -544,6 +562,7 @@ void TReadSession::SetupCountersLogger() { DumpCountersContext = CountersLogger->MakeCallbackContext(); CountersLogger->Start(); } +#endif //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NPersQueue::TReadSessionEvent diff --git a/src/client/persqueue_public/impl/read_session.h b/src/client/persqueue_public/impl/read_session.h index df67c60d214..21b4b750237 100644 --- a/src/client/persqueue_public/impl/read_session.h +++ b/src/client/persqueue_public/impl/read_session.h @@ -48,9 +48,11 @@ class TReadSession : public IReadSession, return SessionId; } +#ifndef YDB_TOPIC_DISABLE_COUNTERS TReaderCounters::TPtr GetCounters() const override { return Settings.Counters_; // Always not nullptr. } +#endif void AddTopic(const TTopicReadSettings& topicReadSettings) /*override*/ { Y_UNUSED(topicReadSettings); @@ -97,8 +99,10 @@ class TReadSession : public IReadSession, void AbortImpl(EStatus statusCode, NYdb::NIssue::TIssues&& issues, TDeferredActions& deferred); void AbortImpl(EStatus statusCode, const std::string& message, TDeferredActions& deferred); +#ifndef YDB_TOPIC_DISABLE_COUNTERS void MakeCountersIfNeeded(); void SetupCountersLogger(); +#endif private: TReadSessionSettings Settings; @@ -121,8 +125,10 @@ class TReadSession : public IReadSession, std::vector CbContexts; +#ifndef YDB_TOPIC_DISABLE_COUNTERS std::shared_ptr CountersLogger; std::shared_ptr> DumpCountersContext; +#endif }; } // namespace NYdb::NPersQueue \ No newline at end of file diff --git a/src/client/persqueue_public/impl/write_session.cpp b/src/client/persqueue_public/impl/write_session.cpp index efafbfa2d5d..d44033fc05e 100644 --- a/src/client/persqueue_public/impl/write_session.cpp +++ b/src/client/persqueue_public/impl/write_session.cpp @@ -135,9 +135,11 @@ std::optional TSimpleBlockingWriteSession::WaitForToken(cons return std::nullopt; } +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr TSimpleBlockingWriteSession::GetCounters() { return Writer->GetCounters(); } +#endif bool TSimpleBlockingWriteSession::IsAlive() const { diff --git a/src/client/persqueue_public/impl/write_session.h b/src/client/persqueue_public/impl/write_session.h index 7fa9f4c2c61..6277e97fcd9 100644 --- a/src/client/persqueue_public/impl/write_session.h +++ b/src/client/persqueue_public/impl/write_session.h @@ -46,8 +46,9 @@ class TWriteSession : public IWriteSession, // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. bool Close(TDuration closeTimeout = TDuration::Max()) override; - +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; +#endif ~TWriteSession(); // will not call close - destroy everything without acks @@ -82,7 +83,9 @@ class TSimpleBlockingWriteSession : public ISimpleBlockingWriteSession { bool Close(TDuration closeTimeout = TDuration::Max()) override; bool IsAlive() const override; +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr GetCounters() override; +#endif protected: std::shared_ptr Writer; diff --git a/src/client/persqueue_public/impl/write_session_impl.cpp b/src/client/persqueue_public/impl/write_session_impl.cpp index 022bea0714e..354e5cdb645 100644 --- a/src/client/persqueue_public/impl/write_session_impl.cpp +++ b/src/client/persqueue_public/impl/write_session_impl.cpp @@ -43,12 +43,13 @@ TWriteSessionImpl::TWriteSessionImpl( TargetCluster = *Settings.PreferredCluster_; NUtils::ToLower(TargetCluster); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS if (Settings.Counters_.has_value()) { Counters = *Settings.Counters_; } else { Counters = MakeIntrusive(new ::NMonitoring::TDynamicCounters()); } - +#endif } void TWriteSessionImpl::Start(const TDuration& delay) { @@ -391,7 +392,9 @@ void TWriteSessionImpl::Write( TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStatus&& status) { Y_ABORT_UNLESS(Lock.IsLocked()); +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->Errors)++; +#endif auto result = RestartImpl(status); if (result.DoStop) { CloseImpl(status.Status, std::move(status.Issues)); @@ -828,28 +831,38 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 id) { size = front.Data.size(); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->MessagesWritten) += front.MessageCount; (*Counters->MessagesInflight) -= front.MessageCount; (*Counters->BytesWritten) += front.OriginalSize; +#endif SentPackedMessage.pop(); } else { size = sentFront.Size; +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesWritten) += sentFront.Size; (*Counters->MessagesWritten)++; (*Counters->MessagesInflight)--; +#endif } +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesInflightCompressed) -= compressedSize; (*Counters->BytesWrittenCompressed) += compressedSize; (*Counters->BytesInflightUncompressed) -= size; +#endif +#ifndef YDB_TOPIC_DISABLE_COUNTERS Y_ABORT_UNLESS(Counters->BytesInflightCompressed->Val() >= 0); Y_ABORT_UNLESS(Counters->BytesInflightUncompressed->Val() >= 0); +#endif Y_ABORT_UNLESS(sentFront.Id == id); +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesInflightTotal) = MemoryUsage; +#endif SentOriginalMessages.pop(); return result; } @@ -945,8 +958,10 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) { UpdateTimedCountersImpl(); Y_ABORT_UNLESS(block.Valid); auto memoryUsage = OnMemoryUsageChangedImpl(static_cast(block.Data.size()) - block.OriginalMemoryUsage); +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesInflightUncompressed) -= block.OriginalSize; (*Counters->BytesInflightCompressed) += block.Data.size(); +#endif PackedMessagesToSend.emplace(std::move(block)); SendImpl(); @@ -1041,9 +1056,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() { } size += datum.size(); UpdateTimedCountersImpl(); +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesInflightUncompressed) += datum.size(); (*Counters->MessagesInflight)++; OriginalMessagesToSend.emplace(id, createTs, datum.size()); +#endif } block.Data = std::move(CurrentBatch.Data); if (skipCompression) { @@ -1313,11 +1330,13 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { auto delta = (now - LastCountersUpdateTs).MilliSeconds(); double percent = 100.0 / Settings.MaxMemoryUsage_; +#ifndef YDB_TOPIC_DISABLE_COUNTERS Counters->TotalBytesInflightUsageByTime->Collect(*Counters->BytesInflightTotal * percent, delta); Counters->UncompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightUncompressed * percent, delta); Counters->CompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightCompressed * percent, delta); *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - SessionStartedTs).MilliSeconds(); +#endif LastCountersUpdateTs = now; if (LastCountersLogTs == TInstant::Zero() || TInstant::Now() - LastCountersLogTs > TDuration::Seconds(60)) { LastCountersLogTs = TInstant::Now(); @@ -1326,7 +1345,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { << " " Y_STRINGIZE(counter) ": " \ << Counters->counter->Val() \ /**/ - +#ifndef YDB_TOPIC_DISABLE_COUNTERS LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Counters: {" LOG_COUNTER(Errors) @@ -1340,7 +1359,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { LOG_COUNTER(MessagesInflight) << " }" ); - +#endif #undef LOG_COUNTER } } diff --git a/src/client/persqueue_public/impl/write_session_impl.h b/src/client/persqueue_public/impl/write_session_impl.h index 483570c944c..0f4f34b9f1e 100644 --- a/src/client/persqueue_public/impl/write_session_impl.h +++ b/src/client/persqueue_public/impl/write_session_impl.h @@ -312,7 +312,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. bool Close(TDuration closeTimeout = TDuration::Max()); +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; +#endif const TWriteSessionSettings& GetSettings() const { return Settings; @@ -433,7 +435,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, TInstant SessionStartedTs; TInstant LastCountersUpdateTs = TInstant::Zero(); TInstant LastCountersLogTs; +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr Counters; +#endif TDuration WakeupInterval; std::string StateStr; diff --git a/src/client/persqueue_public/include/aliases.h b/src/client/persqueue_public/include/aliases.h index 0cd8c0c8354..2a162408ca2 100644 --- a/src/client/persqueue_public/include/aliases.h +++ b/src/client/persqueue_public/include/aliases.h @@ -1,12 +1,15 @@ #pragma once #include -#include #include #include #include #include +#ifndef YDB_TOPIC_DISABLE_COUNTERS +#include +#endif + namespace NYdb::inline V3::NPersQueue { // codecs @@ -17,12 +20,14 @@ using NTopic::TGzipCodec; using NTopic::TZstdCodec; using NTopic::TUnsupportedCodec; +#ifndef YDB_TOPIC_DISABLE_COUNTERS // counters using NTopic::TCounterPtr; using NTopic::TReaderCounters; using NTopic::TWriterCounters; using NTopic::MakeCountersNotNull; using NTopic::HasNullCounters; +#endif // errors // using NTopic::GetRetryErrorClass; diff --git a/src/client/persqueue_public/include/read_session.h b/src/client/persqueue_public/include/read_session.h index 9d80184840f..66530b9d816 100644 --- a/src/client/persqueue_public/include/read_session.h +++ b/src/client/persqueue_public/include/read_session.h @@ -173,10 +173,12 @@ struct TReadSessionSettings : public TRequestSettings { //! If not set, default executor will be used. FLUENT_SETTING(IExecutor::TPtr, DecompressionExecutor); +#ifndef YDB_TOPIC_DISABLE_COUNTERS //! Counters. //! If counters are not provided explicitly, //! they will be created inside session (without link with parent counters). FLUENT_SETTING(TReaderCounters::TPtr, Counters); +#endif //! Read only original topic instance, don't read mirrored. //! @@ -250,8 +252,10 @@ class IReadSession { //! TSessionClosedEvent arrives. virtual bool Close(TDuration timeout = TDuration::Max()) = 0; +#ifndef YDB_TOPIC_DISABLE_COUNTERS //! Reader counters with different stats (see TReaderConuters). virtual TReaderCounters::TPtr GetCounters() const = 0; +#endif //! Get unique identifier of read session. virtual std::string GetSessionId() const = 0; diff --git a/src/client/persqueue_public/include/write_session.h b/src/client/persqueue_public/include/write_session.h index 98852c46488..8f9bc1ba268 100644 --- a/src/client/persqueue_public/include/write_session.h +++ b/src/client/persqueue_public/include/write_session.h @@ -83,8 +83,9 @@ struct TWriteSessionSettings : public TRequestSettings { FLUENT_SETTING_OPTIONAL(ui64, BatchFlushSizeBytes); FLUENT_SETTING_DEFAULT(TDuration, ConnectTimeout, TDuration::Seconds(30)); - +#ifndef YDB_TOPIC_DISABLE_COUNTERS FLUENT_SETTING_OPTIONAL(TWriterCounters::TPtr, Counters); +#endif //! Executor for compression tasks. //! If not set, default executor will be used. @@ -157,7 +158,9 @@ class ISimpleBlockingWriteSession : public TThrRefBase { //! Returns true if write session is alive and acitve. False if session was closed. virtual bool IsAlive() const = 0; +#ifndef YDB_TOPIC_DISABLE_COUNTERS virtual TWriterCounters::TPtr GetCounters() = 0; +#endif //! Close immediately and destroy, don't wait for anything. virtual ~ISimpleBlockingWriteSession() = default; @@ -195,8 +198,10 @@ class IWriteSession { //! return - true if all writes were completed and acked. false if timeout was reached and some writes were aborted. virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0; +#ifndef YDB_TOPIC_DISABLE_COUNTERS //! Writer counters with different stats (see TWriterConuters). virtual TWriterCounters::TPtr GetCounters() = 0; +#endif //! Close() with timeout = 0 and destroy everything instantly. virtual ~IWriteSession() = default; diff --git a/src/client/query/client.cpp b/src/client/query/client.cpp index 118c96f1624..f023d86c0fc 100644 --- a/src/client/query/client.cpp +++ b/src/client/query/client.cpp @@ -539,8 +539,8 @@ class TQueryClient::TImpl: public TClientImplCommon, public private: NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector_; - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram_; - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram_; + NSdkStats::TAtomicHistogram QuerySizeHistogram_; + NSdkStats::TAtomicHistogram ParamsSizeHistogram_; TClientSettings Settings_; NSessionPool::TSessionPool SessionPool_; diff --git a/src/client/table/impl/table_client.cpp b/src/client/table/impl/table_client.cpp index 1df7dc34ee5..502016ba6dc 100644 --- a/src/client/table/impl/table_client.cpp +++ b/src/client/table/impl/table_client.cpp @@ -1190,7 +1190,7 @@ void TTableClient::TImpl::SetParams( void TTableClient::TImpl::CollectParams( ::google::protobuf::Map* params, - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram) + NSdkStats::TAtomicHistogram histgoram) { if (params && histgoram.IsCollecting()) { @@ -1204,7 +1204,7 @@ void TTableClient::TImpl::CollectParams( void TTableClient::TImpl::CollectParams( const ::google::protobuf::Map& params, - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram) + NSdkStats::TAtomicHistogram histgoram) { if (histgoram.IsCollecting()) { @@ -1216,13 +1216,13 @@ void TTableClient::TImpl::CollectParams( } } -void TTableClient::TImpl::CollectQuerySize(const std::string& query, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>& querySizeHistogram) { +void TTableClient::TImpl::CollectQuerySize(const std::string& query, NSdkStats::TAtomicHistogram& querySizeHistogram) { if (querySizeHistogram.IsCollecting()) { querySizeHistogram.Record(query.size()); } } -void TTableClient::TImpl::CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&) {} +void TTableClient::TImpl::CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram&) {} void TTableClient::TImpl::SetTxSettings(const TTxSettings& txSettings, Ydb::Table::TransactionSettings* proto) { diff --git a/src/client/table/impl/table_client.h b/src/client/table/impl/table_client.h index e26ef4d4784..5c4685e24f4 100644 --- a/src/client/table/impl/table_client.h +++ b/src/client/table/impl/table_client.h @@ -166,15 +166,15 @@ class TTableClient::TImpl: public TClientImplCommon, public static void CollectParams( ::google::protobuf::Map* params, - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram); + NSdkStats::TAtomicHistogram histgoram); static void CollectParams( const ::google::protobuf::Map& params, - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> histgoram); + NSdkStats::TAtomicHistogram histgoram); - static void CollectQuerySize(const std::string& query, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>& querySizeHistogram); + static void CollectQuerySize(const std::string& query, NSdkStats::TAtomicHistogram& querySizeHistogram); - static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<::NMonitoring::THistogram>&); + static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram&); template TAsyncDataQueryResult ExecuteDataQueryImpl(const TSession& session, const TQueryType& query, @@ -319,11 +319,11 @@ class TTableClient::TImpl: public TClientImplCommon, public static std::optional GetQueryText(const TDataQuery& queryData); public: - NSdkStats::TAtomicCounter<::NMonitoring::TRate> CacheMissCounter; + NSdkStats::TAtomicCounter CacheMissCounter; NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector; - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> QuerySizeHistogram; - NSdkStats::TAtomicHistogram<::NMonitoring::THistogram> ParamsSizeHistogram; - NSdkStats::TAtomicCounter<::NMonitoring::TRate> SessionRemovedDueBalancing; + NSdkStats::TAtomicHistogram QuerySizeHistogram; + NSdkStats::TAtomicHistogram ParamsSizeHistogram; + NSdkStats::TAtomicCounter SessionRemovedDueBalancing; private: NSessionPool::TSessionPool SessionPool_; diff --git a/src/client/topic/impl/counters_logger.h b/src/client/topic/impl/counters_logger.h index 9edc26a6b20..d2c1707c3fb 100644 --- a/src/client/topic/impl/counters_logger.h +++ b/src/client/topic/impl/counters_logger.h @@ -1,6 +1,7 @@ #pragma once #include + #include #include diff --git a/src/client/topic/impl/direct_reader.cpp b/src/client/topic/impl/direct_reader.cpp index c58d7de977a..656a9604b3f 100644 --- a/src/client/topic/impl/direct_reader.cpp +++ b/src/client/topic/impl/direct_reader.cpp @@ -610,7 +610,9 @@ void TDirectReadSession::OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t c } if (!errorStatus.Ok()) { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ReadSessionSettings.Counters_->Errors->Inc(); +#endif if (!Reconnect(errorStatus)) { with_lock (Lock) { @@ -908,7 +910,9 @@ void TDirectReadSession::OnConnect( } if (!status.Ok()) { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ReadSessionSettings.Counters_->Errors->Inc(); +#endif if (!Reconnect(status)) { with_lock (Lock) { AbortImpl(TPlainStatus( diff --git a/src/client/topic/impl/direct_reader.h b/src/client/topic/impl/direct_reader.h index 2a25df56b4c..adc7d09e62d 100644 --- a/src/client/topic/impl/direct_reader.h +++ b/src/client/topic/impl/direct_reader.h @@ -242,8 +242,7 @@ class TDirectReadSessionManager { void Close(); private: - - using TNodeSessionsMap = TMap; + using TNodeSessionsMap = std::map; TDirectReadSessionContextPtr CreateDirectReadSession(TNodeId); void DeletePartitionSession(TPartitionSessionId id, TNodeSessionsMap::iterator it); @@ -258,7 +257,7 @@ class TDirectReadSessionManager { IDirectReadSessionControlCallbacks::TPtr ControlCallbacks; TNodeSessionsMap NodeSessions; - TMap Locations; + std::map Locations; TLog Log; }; diff --git a/src/client/topic/impl/read_session.cpp b/src/client/topic/impl/read_session.cpp index 79dbe2e41e8..276e0430e2b 100644 --- a/src/client/topic/impl/read_session.cpp +++ b/src/client/topic/impl/read_session.cpp @@ -33,7 +33,9 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy(); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS MakeCountersIfNeeded(); +#endif } TReadSession::~TReadSession() { @@ -45,9 +47,11 @@ TReadSession::~TReadSession() { if (CbContext) { CbContext->Cancel(); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS if (DumpCountersContext) { DumpCountersContext->Cancel(); } +#endif } void TReadSession::Start() { @@ -67,7 +71,9 @@ void TReadSession::Start() { Topics = Settings.Topics_; CreateClusterSessionsImpl(deferred); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS SetupCountersLogger(); +#endif } void TReadSession::CreateClusterSessionsImpl(TDeferredActions& deferred) { @@ -175,6 +181,7 @@ std::optional TReadSession::GetEvent(const TReadSessi bool TReadSession::Close(TDuration timeout) { LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); +#ifndef YDB_TOPIC_DISABLE_COUNTERS // Log final counters. if (CountersLogger) { CountersLogger->Stop(); @@ -184,6 +191,7 @@ bool TReadSession::Close(TDuration timeout) { DumpCountersContext->Cancel(); } } +#endif TSingleClusterReadSessionImpl::TPtr session; NThreading::TPromise promise = NThreading::NewPromise(); @@ -232,7 +240,9 @@ bool TReadSession::Close(TDuration timeout) { issues.AddIssue("Session was gracefully closed"); EventsQueue->Close(TSessionClosedEvent(EStatus::SUCCESS, std::move(issues)), deferred); } else { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; +#endif session->Abort(); NYdb::NIssue::TIssues issues; @@ -254,6 +264,7 @@ TStringBuilder TReadSession::GetLogPrefix() const { return TStringBuilder() << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] "; } +#ifndef YDB_TOPIC_DISABLE_COUNTERS void TReadSession::MakeCountersIfNeeded() { if (!Settings.Counters_ || HasNullCounters(*Settings.Counters_)) { TReaderCounters::TPtr counters = MakeIntrusive(); @@ -274,15 +285,18 @@ void TReadSession::SetupCountersLogger() { DumpCountersContext = CountersLogger->MakeCallbackContext(); CountersLogger->Start(); } +#endif void TReadSession::AbortImpl(TDeferredActions&) { Y_ABORT_UNLESS(Lock.IsLocked()); if (!Aborting) { Aborting = true; +#ifndef YDB_TOPIC_DISABLE_COUNTERS if (DumpCountersContext) { DumpCountersContext->Cancel(); } +#endif if (CbContext) { CbContext->TryGet()->Abort(); } diff --git a/src/client/topic/impl/read_session.h b/src/client/topic/impl/read_session.h index 21cf15479cf..29b45fd54ba 100644 --- a/src/client/topic/impl/read_session.h +++ b/src/client/topic/impl/read_session.h @@ -34,9 +34,11 @@ class TReadSession : public IReadSession { return SessionId; } +#ifndef YDB_TOPIC_DISABLE_COUNTERS inline TReaderCounters::TPtr GetCounters() const override { return Settings.Counters_; // Always not nullptr. } +#endif void Abort(TSessionClosedEvent&& closeEvent); @@ -50,8 +52,10 @@ class TReadSession : public IReadSession { void CreateClusterSessionsImpl(TDeferredActions& deferred); +#ifndef YDB_TOPIC_DISABLE_COUNTERS void MakeCountersIfNeeded(); void SetupCountersLogger(); +#endif // Shutdown. void Abort(EStatus statusCode, NYdb::NIssue::TIssues&& issues); @@ -77,8 +81,10 @@ class TReadSession : public IReadSession { std::shared_ptr>> CbContext; std::vector Topics; +#ifndef YDB_TOPIC_DISABLE_COUNTERS std::shared_ptr> CountersLogger; std::shared_ptr>> DumpCountersContext; +#endif // Exiting. bool Aborting = false; diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 4eeb0b3ba3d..935a4c7c886 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -432,7 +432,9 @@ void TSingleClusterReadSessionImpl::OnConnectTimeout(const } } +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; +#endif TStringBuilder description; description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone; if (!Reconnect(TPlainStatus(EStatus::TIMEOUT, description))) { @@ -469,7 +471,9 @@ void TSingleClusterReadSessionImpl::OnConnect( } if (!st.Ok()) { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; +#endif if (!Reconnect(st)) { AbortSession( st.Status, MakeIssueWithSubIssues(TStringBuilder() << "Failed to establish connection to server \"" @@ -832,9 +836,11 @@ void TSingleClusterReadSessionImpl::OnUserRetrievedEvent(i << decompressedSize << " bytes"); +#ifndef YDB_TOPIC_DISABLE_COUNTERS *Settings.Counters_->MessagesInflight -= messagesCount; *Settings.Counters_->BytesInflightTotal -= decompressedSize; *Settings.Counters_->BytesInflightUncompressed -= decompressedSize; +#endif TDeferredActions deferred; std::lock_guard guard(Lock); @@ -992,8 +998,9 @@ void TSingleClusterReadSessionImpl::OnReadDone(NYdbGrpc::T } } if (!errorStatus.Ok()) { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; - +#endif if (!Reconnect(errorStatus)) { AbortSession(std::move(errorStatus)); } @@ -1030,7 +1037,9 @@ inline void TSingleClusterReadSessionImpl::OnReadDoneImpl( for (TPartitionData& partitionData : *msg.mutable_partition_data()) { auto partitionStreamIt = PartitionStreams.find(partitionData.cookie().assign_id()); if (partitionStreamIt == PartitionStreams.end()) { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; +#endif BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, TStringBuilder() << "Got unexpected partition stream data message. Topic: " @@ -1063,9 +1072,11 @@ inline void TSingleClusterReadSessionImpl::OnReadDoneImpl( partitionStream->UpdateMaxReadOffset(currentOffset); const i64 messageSize = static_cast(messageData.data().size()); CompressedDataSize += messageSize; +#ifndef YDB_TOPIC_DISABLE_COUNTERS *Settings.Counters_->BytesInflightTotal += messageSize; *Settings.Counters_->BytesInflightCompressed += messageSize; ++*Settings.Counters_->MessagesInflight; +#endif } } if (firstOffset == std::numeric_limits::max()) { @@ -1283,7 +1294,9 @@ inline void TSingleClusterReadSessionImpl::OnReadDoneImpl( LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() << "Got unexpected partition stream data message. PartitionSessionId: " << partitionData.partition_session_id()); continue; } else { +#ifndef YDB_TOPIC_DISABLE_COUNTERS ++*Settings.Counters_->Errors; +#endif BreakConnectionAndReconnectImpl(EStatus::INTERNAL_ERROR, TStringBuilder() << "Got unexpected partition stream data message. " << "PartitionSessionId: " << partitionData.partition_session_id(), @@ -1313,9 +1326,11 @@ inline void TSingleClusterReadSessionImpl::OnReadDoneImpl( partitionStream->UpdateMaxReadOffset(currentOffset); const i64 messageSize = static_cast(messageData.data().size()); CompressedDataSize += messageSize; +#ifndef YDB_TOPIC_DISABLE_COUNTERS *Settings.Counters_->BytesInflightTotal += messageSize; *Settings.Counters_->BytesInflightCompressed += messageSize; ++*Settings.Counters_->MessagesInflight; +#endif } } if (firstOffset == std::numeric_limits::max()) { @@ -1783,11 +1798,12 @@ void TSingleClusterReadSessionImpl::OnCreateNewDecompressi template void TSingleClusterReadSessionImpl::OnDecompressionInfoDestroy(i64 compressedSize, i64 decompressedSize, i64 messagesCount, i64 serverBytesSize) { - +#ifndef YDB_TOPIC_DISABLE_COUNTERS *Settings.Counters_->MessagesInflight -= messagesCount; *Settings.Counters_->BytesInflightUncompressed -= decompressedSize; *Settings.Counters_->BytesInflightCompressed -= compressedSize; *Settings.Counters_->BytesInflightTotal -= (compressedSize + decompressedSize); +#endif TDeferredActions deferred; std::lock_guard guard(Lock); @@ -1813,12 +1829,14 @@ void TSingleClusterReadSessionImpl::OnDataDecompressed(i64 Y_ABORT_UNLESS(DecompressionTasksInflight > 0); --DecompressionTasksInflight; +#ifndef YDB_TOPIC_DISABLE_COUNTERS *Settings.Counters_->BytesRead += decompressedSize; *Settings.Counters_->BytesReadCompressed += sourceSize; *Settings.Counters_->MessagesRead += messagesCount; *Settings.Counters_->BytesInflightUncompressed += decompressedSize; *Settings.Counters_->BytesInflightCompressed -= sourceSize; *Settings.Counters_->BytesInflightTotal += (decompressedSize - sourceSize); +#endif std::lock_guard guard(Lock); UpdateMemoryUsageStatisticsImpl(); @@ -1971,9 +1989,11 @@ void TSingleClusterReadSessionImpl::UpdateMemoryUsageStati UsageStatisticsLastUpdateTime = now; const double percent = 100.0 / static_cast(Settings.MaxMemoryUsageBytes_); +#ifndef YDB_TOPIC_DISABLE_COUNTERS Settings.Counters_->TotalBytesInflightUsageByTime->Collect((DecompressedDataSize + CompressedDataSize) * percent, delta); Settings.Counters_->UncompressedBytesInflightUsageByTime->Collect(DecompressedDataSize * percent, delta); Settings.Counters_->CompressedBytesInflightUsageByTime->Collect(CompressedDataSize * percent, delta); +#endif } template diff --git a/src/client/topic/impl/write_session.cpp b/src/client/topic/impl/write_session.cpp index 5f2f54605b7..a109b610316 100644 --- a/src/client/topic/impl/write_session.cpp +++ b/src/client/topic/impl/write_session.cpp @@ -165,9 +165,11 @@ std::optional TSimpleBlockingWriteSession::WaitForToken(cons return std::nullopt; } +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr TSimpleBlockingWriteSession::GetCounters() { return Writer->GetCounters(); } +#endif bool TSimpleBlockingWriteSession::IsAlive() const { return !Closed.load(); diff --git a/src/client/topic/impl/write_session.h b/src/client/topic/impl/write_session.h index 3112fd1361f..de9a4cdec09 100644 --- a/src/client/topic/impl/write_session.h +++ b/src/client/topic/impl/write_session.h @@ -46,9 +46,9 @@ class TWriteSession : public IWriteSession, // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. bool Close(TDuration closeTimeout = TDuration::Max()) override; - +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; - +#endif ~TWriteSession(); // will not call close - destroy everything without acks private: @@ -78,7 +78,9 @@ class TSimpleBlockingWriteSession : public ISimpleBlockingWriteSession { bool Close(TDuration closeTimeout = TDuration::Max()) override; bool IsAlive() const override; +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr GetCounters() override; +#endif protected: std::shared_ptr Writer; diff --git a/src/client/topic/impl/write_session_impl.cpp b/src/client/topic/impl/write_session_impl.cpp index 774bf0c1302..0345c2c18ac 100644 --- a/src/client/topic/impl/write_session_impl.cpp +++ b/src/client/topic/impl/write_session_impl.cpp @@ -93,11 +93,13 @@ TWriteSessionImpl::TWriteSessionImpl( if (!Settings.RetryPolicy_) { Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy(); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS if (Settings.Counters_.has_value()) { Counters = *Settings.Counters_; } else { Counters = MakeIntrusive(new ::NMonitoring::TDynamicCounters()); } +#endif } void TWriteSessionImpl::Start(const TDuration& delay) { @@ -679,7 +681,9 @@ void TWriteSessionImpl::WriteEncoded(TContinuationToken&& token, TWriteMessage&& TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStatus&& status) { Y_ABORT_UNLESS(Lock.IsLocked()); +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->Errors)++; +#endif auto result = RestartImpl(status); if (result.DoStop) { CloseImpl(status.Status, std::move(status.Issues)); @@ -1192,28 +1196,35 @@ bool TWriteSessionImpl::CleanupOnAcknowledgedImpl(uint64_t id) { size = front.Data.size(); } +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->MessagesWritten) += front.MessageCount; (*Counters->MessagesInflight) -= front.MessageCount; (*Counters->BytesWritten) += front.OriginalSize; - +#endif SentPackedMessage.pop(); } else { size = sentFront.Size; +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesWritten) += sentFront.Size; (*Counters->MessagesWritten)++; (*Counters->MessagesInflight)--; +#endif } +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesInflightCompressed) -= compressedSize; (*Counters->BytesWrittenCompressed) += compressedSize; (*Counters->BytesInflightUncompressed) -= size; Y_ABORT_UNLESS(Counters->BytesInflightCompressed->Val() >= 0); Y_ABORT_UNLESS(Counters->BytesInflightUncompressed->Val() >= 0); +#endif Y_ABORT_UNLESS(sentFront.Id == id); +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesInflightTotal) = MemoryUsage; +#endif SentOriginalMessages.pop(); WrittenInTx.erase(id); @@ -1312,8 +1323,10 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) { UpdateTimedCountersImpl(); Y_ABORT_UNLESS(block.Valid); auto memoryUsage = OnMemoryUsageChangedImpl(static_cast(block.Data.size()) - block.OriginalMemoryUsage); +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesInflightUncompressed) -= block.OriginalSize; (*Counters->BytesInflightCompressed) += block.Data.size(); +#endif PackedMessagesToSend.emplace(std::move(block)); @@ -1422,8 +1435,10 @@ size_t TWriteSessionImpl::WriteBatchImpl() { } size += datum.size(); UpdateTimedCountersImpl(); +#ifndef YDB_TOPIC_DISABLE_COUNTERS (*Counters->BytesInflightUncompressed) += datum.size(); (*Counters->MessagesInflight)++; +#endif if (!currMessage.MessageMeta.empty()) { OriginalMessagesToSend.emplace(id, createTs, datum.size(), std::move(currMessage.MessageMeta), @@ -1661,11 +1676,13 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { auto delta = (now - LastCountersUpdateTs).MilliSeconds(); double percent = 100.0 / Settings.MaxMemoryUsage_; +#ifndef YDB_TOPIC_DISABLE_COUNTERS Counters->TotalBytesInflightUsageByTime->Collect(*Counters->BytesInflightTotal * percent, delta); Counters->UncompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightUncompressed * percent, delta); Counters->CompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightCompressed * percent, delta); *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - SessionStartedTs).MilliSeconds(); +#endif LastCountersUpdateTs = now; if (LastCountersLogTs == TInstant::Zero() || TInstant::Now() - LastCountersLogTs > TDuration::Seconds(60)) { LastCountersLogTs = TInstant::Now(); @@ -1674,7 +1691,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { << " " Y_STRINGIZE(counter) ": " \ << Counters->counter->Val() \ /**/ - +#ifndef YDB_TOPIC_DISABLE_COUNTERS LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefixImpl() << "Counters: {" LOG_COUNTER(Errors) @@ -1688,7 +1705,7 @@ void TWriteSessionImpl::UpdateTimedCountersImpl() { LOG_COUNTER(MessagesInflight) << " }" ); - +#endif #undef LOG_COUNTER } } diff --git a/src/client/topic/impl/write_session_impl.h b/src/client/topic/impl/write_session_impl.h index 0434ed07e67..5b5e9f63a29 100644 --- a/src/client/topic/impl/write_session_impl.h +++ b/src/client/topic/impl/write_session_impl.h @@ -358,7 +358,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. bool Close(TDuration closeTimeout = TDuration::Max()); +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; +#endif const TWriteSessionSettings& GetSettings() const { return Settings; @@ -485,7 +487,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer, TInstant SessionStartedTs; TInstant LastCountersUpdateTs = TInstant::Zero(); TInstant LastCountersLogTs; +#ifndef YDB_TOPIC_DISABLE_COUNTERS TWriterCounters::TPtr Counters; +#endif TDuration WakeupInterval; // Set by the write session, if Settings.DirectWriteToPartition is true and Settings.PartitionId is unset. Otherwise ignored. diff --git a/tests/integration/topic/direct_read.cpp b/tests/integration/topic/direct_read.cpp index 2e3d1debb76..9bba4ce7218 100644 --- a/tests/integration/topic/direct_read.cpp +++ b/tests/integration/topic/direct_read.cpp @@ -723,8 +723,10 @@ TDirectReadSessionImplTestSetup::TDirectReadSessionImplTestSetup() { // .DirectRead(true) .AppendTopics({"TestTopic"}) .ConsumerName("TestConsumer") - .RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10))) - .Counters(MakeIntrusive(MakeIntrusive<::NMonitoring::TDynamicCounters>())); +#ifndef YDB_TOPIC_DISABLE_COUNTERS + .Counters(MakeIntrusive(MakeIntrusive<::NMonitoring::TDynamicCounters>())) +#endif + .RetryPolicy(NYdb::NTopic::IRetryPolicy::GetFixedIntervalPolicy(TDuration::MilliSeconds(10))); Log.SetFormatter(GetPrefixLogFormatter("")); }