Add UseDetectedLocalDC balancing policy #564
| //! location is a name of datacenter (VLA, MAN), if location is nullopt local datacenter is used
| static TBalancingPolicy UsePreferableLocation(const std::optional<std::string>& location = {});
|
| //! Use detected local dc
Please describe in a bit more detail how this option differs from the previous one, and what caveats apply to using it (if any).
| std::vector<std::uint64_t> timings;
| timings.reserve(PING_COUNT);
| for (size_t i = 0; i < PING_COUNT; ++i) {
|     const auto& ep = endpoints[i % endpoints.size()].get();
It seems simpler here to ping each endpoint PING_COUNT times; that would be more readable.
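A minimal sketch of the loop shape suggested above, where every endpoint is pinged PING_COUNT times in a row instead of cycling through endpoints with `i % endpoints.size()`. The names `TPingFn` and `CollectTimings`, the use of plain strings for endpoints, and the value of `PING_COUNT` are assumptions made for illustration; the PR itself works with `Ydb::Discovery::EndpointInfo`. Note that this changes the total number of pings from PING_COUNT to PING_COUNT per endpoint.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for the pinger used in the PR; endpoints are plain
// strings here so that the sketch stays self-contained.
using TPingFn = std::function<std::chrono::microseconds(const std::string&)>;

constexpr std::size_t PING_COUNT = 2; // assumed value, for illustration only

// Ping every endpoint PING_COUNT times in a row and collect all timings.
std::vector<std::chrono::microseconds> CollectTimings(
    const std::vector<std::string>& endpoints, const TPingFn& ping)
{
    std::vector<std::chrono::microseconds> timings;
    timings.reserve(endpoints.size() * PING_COUNT);
    for (const auto& endpoint : endpoints) {
        for (std::size_t attempt = 0; attempt < PING_COUNT; ++attempt) {
            timings.push_back(ping(endpoint));
        }
    }
    return timings;
}
```

The nested loop makes the number of measurements per endpoint explicit, which is the readability gain the comment asks for.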
| static constexpr std::uint32_t PING_TIMEOUT_SECONDS = 5;
|
| TDuration PingEndpoint(const Ydb::Discovery::EndpointInfo& endpoint);
It would be better to use std::chrono::duration here; we are gradually moving away from TDuration and TInstant in the SDK.
| namespace NYdb::inline V3 {
|
| static constexpr std::uint32_t PING_TIMEOUT_SECONDS = 5;
It would be better to use a time type here rather than storing a raw integer.
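One way the two comments about time types could be addressed, sketched with standard `std::chrono` types only. `PING_TIMEOUT`, `TPingDuration`, and `IsWithinTimeout` are hypothetical names, not identifiers from the PR.

```cpp
#include <cassert>
#include <chrono>

// Typed timeout instead of a raw std::uint32_t count of seconds.
static constexpr std::chrono::seconds PING_TIMEOUT{5};

// Hypothetical result type for a ping, replacing TDuration in this sketch.
using TPingDuration = std::chrono::microseconds;

// True when a measured round trip fits within the timeout.
// std::chrono converts between units in the comparison, so no manual
// multiplication by 1'000'000 is needed.
constexpr bool IsWithinTimeout(TPingDuration rtt) {
    return rtt <= PING_TIMEOUT;
}
```

With typed durations the unit is part of the type, so a caller cannot accidentally pass milliseconds where seconds are expected.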
| #include <src/client/impl/internal/internal_header.h>
| #include <src/api/protos/ydb_discovery.pb.h>
|
| #include <util/network/sock.h>
This include can be moved to the .cpp file.
| std::vector<std::string> removed;
| if (result.DiscoveryStatus.Status == EStatus::SUCCESS) {
|     if (BalancingPolicy_.PolicyType == TBalancingPolicy::TImpl::EPolicyType::UseDetectedLocalDC) {
|         LocalDCDetector_.DetectLocalDC(result.Result);
This is a really heavy operation, and we are doing it synchronously. It could slow discovery down considerably; measurements would help here. In my view it would be better to run the pings in the background, although that will be harder to write.
| std::string FindNearestLocation(const TEndpointsByLocation& endpointsByLocation);
|
| private:
| static constexpr std::size_t MAX_ENDPOINTS_PER_LOCATION = 3;
On the one hand, this is rather few nodes, but with more of them the operation becomes much heavier. Maybe it really does need to be asynchronous.
| timings.reserve(PING_COUNT);
| for (size_t i = 0; i < PING_COUNT; ++i) {
|     const auto& ep = endpoints[i % endpoints.size()].get();
|     timings.push_back(PingEndpoint_(ep).MicroSeconds());
These are synchronous pings from a single thread, right? At first glance we cannot do it this way:
First, 2*3 means 6 pings one after another.
Second, 3 endpoints per DC may be too few.
Can we send the pings in parallel? It would be strange if we could not. And if we can, then we should probably ping asynchronously and cover a larger number of nodes (perhaps once each) for accuracy.
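A rough sketch of what sending the pings in parallel might look like, using `std::async` from the standard library. This is not the PR's implementation: `TPingFn`, `PingAllParallel`, and string endpoints are assumptions for illustration, and a real SDK would more likely reuse its own thread pool or asynchronous I/O rather than spawn a thread per ping.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <string>
#include <vector>

// Hypothetical pinger signature; endpoints are plain strings in this sketch.
using TPingFn = std::function<std::chrono::microseconds(const std::string&)>;

// Launch one ping per endpoint concurrently, then wait for all of them.
// Results are returned in the same order as the input endpoints.
std::vector<std::chrono::microseconds> PingAllParallel(
    const std::vector<std::string>& endpoints, const TPingFn& ping)
{
    std::vector<std::future<std::chrono::microseconds>> pending;
    pending.reserve(endpoints.size());
    for (const auto& endpoint : endpoints) {
        // Capturing by reference is safe: every future is joined before return.
        pending.push_back(std::async(std::launch::async,
                                     [&ping, &endpoint] { return ping(endpoint); }));
    }

    std::vector<std::chrono::microseconds> timings;
    timings.reserve(pending.size());
    for (auto& result : pending) {
        timings.push_back(result.get());
    }
    return timings;
}
```

With this shape the wall-clock cost is roughly that of the slowest ping rather than the sum of all pings, so more endpoints per location can be sampled. The caller still blocks until every ping completes, so the concern about slowing down discovery is only partly addressed; moving detection fully into the background would need additional design. Some toolchains require linking with a threading flag such as `-pthread`.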
| return measures;
| }
|
| Y_UNIT_TEST_SUITE(LocalDCDetectionTest) {
We write new tests with googletest; we are gradually moving away from unittest.
Pull Request Overview
This PR adds a new UseDetectedLocalDC balancing policy that automatically detects the nearest datacenter by measuring endpoint latencies through network pings. The implementation includes:
- A local DC detector that pings endpoints across different locations and identifies the nearest one based on median RTT measurements
- Integration with the existing balancing policy framework
- Unit tests covering basic detection, single location scenarios, unavailable DC handling, and offline DC scenarios
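The median-based selection described in the overview can be sketched as follows. This is an illustration under stated assumptions, not the detector from the PR: `TRtt`, `Median`, and `PickNearestLocation` are hypothetical names, and the input is a plain map from location name to RTT samples rather than the PR's `TEndpointsByLocation`.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <map>
#include <optional>
#include <string>
#include <vector>

using TRtt = std::chrono::microseconds;

// Median of a non-empty sample (the upper median for even sizes).
TRtt Median(std::vector<TRtt> samples) {
    auto mid = samples.begin() + samples.size() / 2;
    std::nth_element(samples.begin(), mid, samples.end());
    return *mid;
}

// Return the location with the smallest median RTT,
// or std::nullopt when no location has any samples.
std::optional<std::string> PickNearestLocation(
    const std::map<std::string, std::vector<TRtt>>& rttByLocation)
{
    std::optional<std::string> best;
    TRtt bestMedian = TRtt::max();
    for (const auto& [location, samples] : rttByLocation) {
        if (samples.empty()) {
            continue; // nothing was measured for this location
        }
        const TRtt median = Median(samples);
        if (!best || median < bestMedian) {
            best = location;
            bestMedian = median;
        }
    }
    return best;
}
```

Using the median rather than the mean keeps a single slow or timed-out ping from dominating a location's score.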
Reviewed Changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 15 comments.
| File | Description |
|---|---|
| include/ydb-cpp-sdk/client/types/ydb.h | Added public API declaration for UseDetectedLocalDC() |
| src/client/types/ydb.cpp | Implemented the UseDetectedLocalDC() factory method |
| src/client/impl/internal/common/balancing_policies.h | Added UseDetectedLocalDC enum value to policy types |
| src/client/impl/internal/common/balancing_policies.cpp | Implemented internal policy initialization for detected local DC |
| src/client/impl/internal/local_dc_detector/local_dc_detector.h | Defined detector class interface with configurable pinger |
| src/client/impl/internal/local_dc_detector/local_dc_detector.cpp | Implemented DC detection logic using sampling and RTT measurement |
| src/client/impl/internal/local_dc_detector/pinger.h | Declared endpoint ping function |
| src/client/impl/internal/local_dc_detector/pinger.cpp | Implemented TCP connection-based ping with timeout handling |
| src/client/impl/internal/db_driver_state/endpoint_pool.h | Added LocalDCDetector_ member to endpoint pool |
| src/client/impl/internal/db_driver_state/endpoint_pool.cpp | Integrated detector into endpoint preference evaluation |
| src/client/table/impl/table_client.cpp | Extended foreign location scanning to include detected local DC policy |
| tests/unit/client/local_dc_detector/local_dc_detector_ut.cpp | Added comprehensive unit tests with mocked ping functionality |
| CMake files | Added build configuration for the new local_dc_detector component |
| explicit TMockedPinger(std::unordered_map<std::string, std::vector<TDuration>> measuresByAdress) {
|     EndpointByAdress_.reserve(measuresByAdress.size());
|
|     for (auto& [adress, measures] : measuresByAdress) {
|         EndpointByAdress_.emplace(std::move(adress), std::move(measures));
|     }
| }
|
| TDuration operator()(const Ydb::Discovery::EndpointInfo& endpoint) const {
|     auto it = EndpointByAdress_.find(endpoint.address());
|     if (it == EndpointByAdress_.end() || Blacklist_.contains(endpoint.address())) {
|         return TDuration::Max();
|     }
|     return it->second.Ping();
| }
|
| void BanEndpoint(const std::string& adress) {
|     Blacklist_.insert(adress);
| }
|
| void UnbanEndpoint(const std::string& adress) {
|     Blacklist_.erase(adress);
| }
|
| private:
Copilot AI, Oct 27, 2025
Corrected spelling of 'measuresByAdress' to 'measuresByAddress'.
| - explicit TMockedPinger(std::unordered_map<std::string, std::vector<TDuration>> measuresByAdress) {
| -     EndpointByAdress_.reserve(measuresByAdress.size());
| -     for (auto& [adress, measures] : measuresByAdress) {
| -         EndpointByAdress_.emplace(std::move(adress), std::move(measures));
| -     }
| - }
| - TDuration operator()(const Ydb::Discovery::EndpointInfo& endpoint) const {
| -     auto it = EndpointByAdress_.find(endpoint.address());
| -     if (it == EndpointByAdress_.end() || Blacklist_.contains(endpoint.address())) {
| -         return TDuration::Max();
| -     }
| -     return it->second.Ping();
| - }
| - void BanEndpoint(const std::string& adress) {
| -     Blacklist_.insert(adress);
| - }
| - void UnbanEndpoint(const std::string& adress) {
| -     Blacklist_.erase(adress);
| - }
| - private:
| + explicit TMockedPinger(std::unordered_map<std::string, std::vector<TDuration>> measuresByAddress) {
| +     EndpointByAddress_.reserve(measuresByAddress.size());
| +     for (auto& [address, measures] : measuresByAddress) {
| +         EndpointByAddress_.emplace(std::move(address), std::move(measures));
| +     }
| + }
| + TDuration operator()(const Ydb::Discovery::EndpointInfo& endpoint) const {
| +     auto it = EndpointByAddress_.find(endpoint.address());
| +     if (it == EndpointByAddress_.end() || Blacklist_.contains(endpoint.address())) {
| +         return TDuration::Max();
| +     }
| +     return it->second.Ping();
| + }
| + void BanEndpoint(const std::string& address) {
| +     Blacklist_.insert(address);
| + }
| + void UnbanEndpoint(const std::string& address) {
| +     Blacklist_.erase(address);
| + }
| + private:
| + // Update the member variable name below from EndpointByAdress_ to EndpointByAddress_
| }
|
| TDuration operator()(const Ydb::Discovery::EndpointInfo& endpoint) const {
|     auto it = EndpointByAdress_.find(endpoint.address());
Copilot AI, Oct 27, 2025
Corrected spelling of 'EndpointByAdress_' to 'EndpointByAddress_'.
| const std::vector<std::string> endpointsB = {"B1", "B2", "B3"};
| const std::vector<std::string> endpointsC = {"C1", "C2", "C3", "C4", "C5", "C6", "C7"};
|
| const std::size_t epoches = 3;
Copilot AI, Oct 27, 2025
Corrected spelling of 'epoches' to 'epochs'.
| std::function<TDuration(const Ydb::Discovery::EndpointInfo& endpoint)> pinger = TMockedPinger(mockData);
| TLocalDCDetector detector(pinger);
|
| for (std::size_t i = 0; i < epoches; ++i) {
Copilot AI, Oct 27, 2025
Corrected spelling of 'epoches' to 'epochs'.
| const std::size_t epoches = 3;
| const std::size_t measuresAmount = 10 * epoches;
Copilot AI, Oct 27, 2025
Corrected spelling of 'epoches' to 'epochs'.
| - const std::size_t epoches = 3;
| - const std::size_t measuresAmount = 10 * epoches;
| + const std::size_t epochs = 3;
| + const std::size_t measuresAmount = 10 * epochs;
No description provided.