diff --git a/redis/_parsers/commands.py b/redis/_parsers/commands.py
index a7571ac195..cff2296b27 100644
--- a/redis/_parsers/commands.py
+++ b/redis/_parsers/commands.py
@@ -11,10 +11,12 @@ class RequestPolicy(Enum):
     ALL_NODES = 'all_nodes'
     ALL_SHARDS = 'all_shards'
+    ALL_REPLICAS = 'all_replicas'
     MULTI_SHARD = 'multi_shard'
     SPECIAL = 'special'
     DEFAULT_KEYLESS = 'default_keyless'
     DEFAULT_KEYED = 'default_keyed'
+    DEFAULT_NODE = 'default_node'
 
 
 class ResponsePolicy(Enum):
     ONE_SUCCEEDED = 'one_succeeded'
@@ -162,7 +164,9 @@ def get_keys(self, redis_conn, *args):
             for subcmd in command["subcommands"]:
                 if str_if_bytes(subcmd[0]) == subcmd_name:
                     command = self.parse_subcommand(subcmd)
-                    is_subcmd = True
+
+                    if command['first_key_pos'] > 0:
+                        is_subcmd = True
 
         # The command doesn't have keys in it
         if not is_subcmd:
diff --git a/redis/cluster.py b/redis/cluster.py
index 839721edf1..8fc7ef5ef7 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -11,12 +11,14 @@
 from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
 
 from redis._parsers import CommandsParser, Encoder
+from redis._parsers.commands import RequestPolicy, CommandPolicies, ResponsePolicy
 from redis._parsers.helpers import parse_scan
 from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
 from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
 from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
 from redis.commands import READ_COMMANDS, RedisClusterCommands
 from redis.commands.helpers import list_or_args
+from redis.commands.policies import PolicyResolver, StaticPolicyResolver
 from redis.connection import (
     Connection,
     ConnectionPool,
@@ -531,6 +533,7 @@ def __init__(
         cache: Optional[CacheInterface] = None,
         cache_config: Optional[CacheConfig] = None,
         event_dispatcher: Optional[EventDispatcher] = None,
+        policy_resolver: PolicyResolver = StaticPolicyResolver(),
         **kwargs,
     ):
         """
@@ -712,7 +715,34 @@ def __init__(
         )
         self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
 
+        # For backward compatibility, map the existing node-flag values to the new policies
+        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
+            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
+            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
+            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
+            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
+            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
+            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
+        }
+
+        self._policies_callback_mapping: dict[Union[RequestPolicy, ResponsePolicy], Callable] = {
+            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [self.get_random_primary_or_all_nodes(command_name)],
+            RequestPolicy.DEFAULT_KEYED: lambda command, *args: self.get_nodes_from_slot(command, *args),
+            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
+            RequestPolicy.ALL_SHARDS: self.get_primaries,
+            RequestPolicy.ALL_NODES: self.get_nodes,
+            RequestPolicy.ALL_REPLICAS: self.get_replicas,
+            RequestPolicy.MULTI_SHARD: lambda *args, **kwargs: self._split_multi_shard_command(*args, **kwargs),
+            RequestPolicy.SPECIAL: self.get_special_nodes,
+            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
+            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
+        }
+
+        self._policy_resolver = policy_resolver
         self.commands_parser = CommandsParser(self)
+
+        # Nodes on which the last FT.AGGREGATE command was executed.
+        self._aggregate_nodes = None
         self._lock = threading.RLock()
 
     def __enter__(self):
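A minimal usage sketch of the new `policy_resolver` constructor argument; the connection details below are illustrative and not part of this diff:

```python
from redis.cluster import RedisCluster
from redis.commands.policies import StaticPolicyResolver

# Host/port are placeholders; any reachable cluster node works.
rc = RedisCluster(host="localhost", port=16379)  # defaults to StaticPolicyResolver()

# An explicit resolver can be passed to customize how command policies are resolved.
rc = RedisCluster(
    host="localhost",
    port=16379,
    policy_resolver=StaticPolicyResolver(),
)
```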
@@ -775,6 +805,15 @@ def get_replicas(self):
 
     def get_random_node(self):
         return random.choice(list(self.nodes_manager.nodes_cache.values()))
 
+    def get_random_primary_or_all_nodes(self, command_name):
+        """
+        Returns a random primary node, or any random node when READONLY routing applies to the given command.
+        """
+        if self.read_from_replicas and command_name in READ_COMMANDS:
+            return self.get_random_node()
+
+        return self.get_random_primary_node()
+
     def get_nodes(self):
         return list(self.nodes_manager.nodes_cache.values())
@@ -804,6 +843,74 @@ def get_default_node(self):
         """
         return self.nodes_manager.default_node
 
+    def get_nodes_from_slot(self, command: str, *args):
+        """
+        Returns a list with the node that holds the given keys' slot.
+        """
+        # get the node that holds the key's slot
+        slot = self.determine_slot(*args)
+        node = self.nodes_manager.get_node_from_slot(
+            slot,
+            self.read_from_replicas and command in READ_COMMANDS,
+            self.load_balancing_strategy if command in READ_COMMANDS else None,
+        )
+        return [node]
+
+    def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
+        """
+        Splits a command with the multi-shard policy into multiple single-key commands.
+        """
+        keys = self._get_command_keys(*args)
+        commands = []
+
+        for key in keys:
+            commands.append({
+                'args': (args[0], key),
+                'kwargs': kwargs,
+            })
+
+        return commands
+
+    def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
+        """
+        Returns a list of nodes for commands with a special policy.
+        """
+        if not self._aggregate_nodes:
+            raise RedisClusterException('Cannot execute FT.CURSOR commands without FT.AGGREGATE')
+
+        return self._aggregate_nodes
+
+    def get_random_primary_node(self) -> "ClusterNode":
+        """
+        Returns a random primary node.
+        """
+        return random.choice(self.get_primaries())
+
+    def _evaluate_all_succeeded(self, res):
+        """
+        Evaluates the result of a command executed with ResponsePolicy.ALL_SUCCEEDED.
+        """
+        first_successful_response = None
+
+        if isinstance(res, dict):
+            for key, value in res.items():
+                if value:
+                    if first_successful_response is None:
+                        first_successful_response = {key: value}
+                else:
+                    return {key: False}
+        else:
+            for response in res:
+                if response:
+                    if first_successful_response is None:
+                        # Dynamically resolve type
+                        first_successful_response = type(response)(response)
+                else:
+                    return type(response)(False)
+
+        return first_successful_response
+
+
     def set_default_node(self, node):
         """
         Set the default node of the cluster.
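A standalone sketch of what the multi-shard split above produces; `DEL` and the key list are illustrative, and key extraction (done by `_get_command_keys` in the real method) is stubbed out here:

```python
def split_multi_shard(args, keys, **kwargs):
    # One single-key command per key, mirroring the {'args': ..., 'kwargs': ...} shape above.
    return [{"args": (args[0], key), "kwargs": kwargs} for key in keys]

print(split_multi_shard(("DEL", "k1", "k2"), keys=["k1", "k2"]))
# [{'args': ('DEL', 'k1'), 'kwargs': {}}, {'args': ('DEL', 'k2'), 'kwargs': {}}]
```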
+ """ command = args[0].upper() if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: command = f"{args[0]} {args[1]}".upper() @@ -967,32 +1075,25 @@ def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]: else: # get the nodes group for this command if it was predefined command_flag = self.command_flags.get(command) - if command_flag == self.__class__.RANDOM: - # return a random node - return [self.get_random_node()] - elif command_flag == self.__class__.PRIMARIES: - # return all primaries - return self.get_primaries() - elif command_flag == self.__class__.REPLICAS: - # return all replicas - return self.get_replicas() - elif command_flag == self.__class__.ALL_NODES: - # return all nodes - return self.get_nodes() - elif command_flag == self.__class__.DEFAULT_NODE: - # return the cluster's default node - return [self.nodes_manager.default_node] - elif command in self.__class__.SEARCH_COMMANDS[0]: - return [self.nodes_manager.default_node] + + if command_flag in self._command_flags_mapping: + request_policy = self._command_flags_mapping[command_flag] + + policy_callback = self._policies_callback_mapping[request_policy] + + if request_policy == RequestPolicy.DEFAULT_KEYED: + nodes = policy_callback(command, *args) + elif request_policy == RequestPolicy.MULTI_SHARD: + nodes = policy_callback(*args, **kwargs) + elif request_policy == RequestPolicy.DEFAULT_KEYLESS: + nodes = policy_callback(args[0]) else: - # get the node that holds the key's slot - slot = self.determine_slot(*args) - node = self.nodes_manager.get_node_from_slot( - slot, - self.read_from_replicas and command in READ_COMMANDS, - self.load_balancing_strategy if command in READ_COMMANDS else None, - ) - return [node] + nodes = policy_callback() + + if args[0].lower() == "ft.aggregate": + self._aggregate_nodes = nodes + + return nodes def _should_reinitialized(self): # To reinitialize the cluster on every MOVED error, @@ -1142,6 +1243,35 @@ def _internal_execute_command(self, *args, **kwargs): is_default_node = False target_nodes = None passed_targets = kwargs.pop("target_nodes", None) + command_policies = self._policy_resolver.resolve(args[0].lower()) + + if not command_policies: + command = args[0].upper() + if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags: + command = f"{args[0]} {args[1]}".upper() + + # We only could resolve key properties if command is not + # in a list of pre-defined request policies + command_flag = self.command_flags.get(command) + if not command_flag: + # Fallback to default policy + if not self.get_default_node(): + keys = None + else: + keys = self._get_command_keys(*args) + if not keys or len(keys) == 0: + command_policies = CommandPolicies() + else: + command_policies = CommandPolicies( + request_policy=RequestPolicy.DEFAULT_KEYED, + response_policy=ResponsePolicy.DEFAULT_KEYED, + ) + else: + if command_flag in self._command_flags_mapping: + command_policies = CommandPolicies(request_policy=self._command_flags_mapping[command_flag]) + else: + command_policies = CommandPolicies() + if passed_targets is not None and not self._is_nodes_flag(passed_targets): target_nodes = self._parse_target_nodes(passed_targets) target_nodes_specified = True @@ -1162,7 +1292,7 @@ def _internal_execute_command(self, *args, **kwargs): if not target_nodes_specified: # Determine the nodes to execute the command on target_nodes = self._determine_nodes( - *args, **kwargs, nodes_flag=passed_targets + *args, request_policy=command_policies.request_policy, 
@@ -1142,6 +1243,35 @@ def _internal_execute_command(self, *args, **kwargs):
         is_default_node = False
         target_nodes = None
         passed_targets = kwargs.pop("target_nodes", None)
+        command_policies = self._policy_resolver.resolve(args[0].lower())
+
+        if not command_policies:
+            command = args[0].upper()
+            if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
+                command = f"{args[0]} {args[1]}".upper()
+
+            # We can only fall back to key-based resolution if the command is not
+            # in the list of pre-defined request policies
+            command_flag = self.command_flags.get(command)
+            if not command_flag:
+                # Fallback to default policy
+                if not self.get_default_node():
+                    keys = None
+                else:
+                    keys = self._get_command_keys(*args)
+                if not keys or len(keys) == 0:
+                    command_policies = CommandPolicies()
+                else:
+                    command_policies = CommandPolicies(
+                        request_policy=RequestPolicy.DEFAULT_KEYED,
+                        response_policy=ResponsePolicy.DEFAULT_KEYED,
+                    )
+            else:
+                if command_flag in self._command_flags_mapping:
+                    command_policies = CommandPolicies(request_policy=self._command_flags_mapping[command_flag])
+                else:
+                    command_policies = CommandPolicies()
+
         if passed_targets is not None and not self._is_nodes_flag(passed_targets):
             target_nodes = self._parse_target_nodes(passed_targets)
             target_nodes_specified = True
@@ -1162,7 +1292,7 @@
                 if not target_nodes_specified:
                     # Determine the nodes to execute the command on
                     target_nodes = self._determine_nodes(
-                        *args, **kwargs, nodes_flag=passed_targets
+                        *args, request_policy=command_policies.request_policy, nodes_flag=passed_targets
                     )
                     if not target_nodes:
                         raise RedisClusterException(
@@ -1175,8 +1305,12 @@
                     is_default_node = True
                 for node in target_nodes:
                     res[node.name] = self._execute_command(node, *args, **kwargs)
+
+                    if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
+                        break
+
                 # Return the processed result
-                return self._process_result(args[0], res, **kwargs)
+                return self._process_result(args[0], res, response_policy=command_policies.response_policy, **kwargs)
             except Exception as e:
                 if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                     if is_default_node:
@@ -1316,7 +1450,7 @@ def close(self) -> None:
             # RedisCluster's __init__ can fail before nodes_manager is set
             pass
 
-    def _process_result(self, command, res, **kwargs):
+    def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
         """
         Process the result of the executed command.
         The function would return a dict or a single value.
@@ -1328,13 +1462,13 @@ ...
             Dict
         """
         if command in self.result_callbacks:
-            return self.result_callbacks[command](command, res, **kwargs)
+            res = self.result_callbacks[command](command, res, **kwargs)
         elif len(res) == 1:
             # When we execute the command on a single node, we can
             # remove the dictionary and return a single response
-            return list(res.values())[0]
-        else:
-            return res
+            res = list(res.values())[0]
+
+        return self._policies_callback_mapping[response_policy](res)
 
     def load_external_module(self, funcname, func):
         """
@@ -2155,6 +2289,7 @@ def __init__(
         retry: Optional[Retry] = None,
         lock=None,
         transaction=False,
+        policy_resolver: PolicyResolver = StaticPolicyResolver(),
         **kwargs,
     ):
         """ """
@@ -2193,6 +2328,31 @@ def __init__(
             PipelineStrategy(self) if not transaction else TransactionStrategy(self)
         )
 
+        # For backward compatibility, map the existing node-flag values to the new policies
+        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
+            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
+            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
+            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
+            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
+            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
+            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
+        }
+
+        self._policies_callback_mapping: dict[Union[RequestPolicy, ResponsePolicy], Callable] = {
+            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [self.get_random_primary_or_all_nodes(command_name)],
+            RequestPolicy.DEFAULT_KEYED: lambda command, *args: self.get_nodes_from_slot(command, *args),
+            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
+            RequestPolicy.ALL_SHARDS: self.get_primaries,
+            RequestPolicy.ALL_NODES: self.get_nodes,
+            RequestPolicy.ALL_REPLICAS: self.get_replicas,
+            RequestPolicy.MULTI_SHARD: lambda *args, **kwargs: self._split_multi_shard_command(*args, **kwargs),
+            RequestPolicy.SPECIAL: self.get_special_nodes,
+            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
+            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
+        }
+
+        self._policy_resolver = policy_resolver
+
     def __repr__(self):
         """ """
         return f"{type(self).__name__}"
@@ -2771,6 +2931,35 @@ def _send_cluster_commands(
         # we figure out the slot number that command maps to, then from
         # the slot determine the node.
         for c in attempt:
+            command_policies = self._pipe._policy_resolver.resolve(c.args[0].lower())
+
+            if not command_policies:
+                command = c.args[0].upper()
+                if len(c.args) >= 2 and f"{c.args[0]} {c.args[1]}".upper() in self._pipe.command_flags:
+                    command = f"{c.args[0]} {c.args[1]}".upper()
+
+                # We can only fall back to key-based resolution if the command is not
+                # in the list of pre-defined request policies
+                command_flag = self._pipe.command_flags.get(command)
+                if not command_flag:
+                    # Fallback to default policy
+                    if not self._pipe.get_default_node():
+                        keys = None
+                    else:
+                        keys = self._pipe._get_command_keys(*c.args)
+                    if not keys or len(keys) == 0:
+                        command_policies = CommandPolicies()
+                    else:
+                        command_policies = CommandPolicies(
+                            request_policy=RequestPolicy.DEFAULT_KEYED,
+                            response_policy=ResponsePolicy.DEFAULT_KEYED,
+                        )
+                else:
+                    if command_flag in self._pipe._command_flags_mapping:
+                        command_policies = CommandPolicies(request_policy=self._pipe._command_flags_mapping[command_flag])
+                    else:
+                        command_policies = CommandPolicies()
+
             while True:
                 # refer to our internal node -> slot table that
                 # tells us where a given command should route to.
@@ -2781,7 +2970,7 @@
                     target_nodes = self._parse_target_nodes(passed_targets)
                 else:
                     target_nodes = self._determine_nodes(
-                        *c.args, node_flag=passed_targets
+                        *c.args, request_policy=command_policies.request_policy, node_flag=passed_targets
                     )
                     if not target_nodes:
                         raise RedisClusterException(
@@ -2944,7 +3133,7 @@ def _parse_target_nodes(self, target_nodes):
             )
         return nodes
 
-    def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
+    def _determine_nodes(self, *args, request_policy: RequestPolicy, **kwargs) -> List["ClusterNode"]:
         # Determine which nodes should be executed the command on.
         # Returns a list of target nodes.
         command = args[0].upper()
@@ -2961,34 +3150,25 @@ def _determine_nodes(self, *args, **kwargs) -> List["ClusterNode"]:
         else:
             # get the nodes group for this command if it was predefined
             command_flag = self._pipe.command_flags.get(command)
-        if command_flag == self._pipe.RANDOM:
-            # return a random node
-            return [self._pipe.get_random_node()]
-        elif command_flag == self._pipe.PRIMARIES:
-            # return all primaries
-            return self._pipe.get_primaries()
-        elif command_flag == self._pipe.REPLICAS:
-            # return all replicas
-            return self._pipe.get_replicas()
-        elif command_flag == self._pipe.ALL_NODES:
-            # return all nodes
-            return self._pipe.get_nodes()
-        elif command_flag == self._pipe.DEFAULT_NODE:
-            # return the cluster's default node
-            return [self._nodes_manager.default_node]
-        elif command in self._pipe.SEARCH_COMMANDS[0]:
-            return [self._nodes_manager.default_node]
+
+        if command_flag in self._pipe._command_flags_mapping:
+            request_policy = self._pipe._command_flags_mapping[command_flag]
+
+        policy_callback = self._pipe._policies_callback_mapping[request_policy]
+
+        if request_policy == RequestPolicy.DEFAULT_KEYED:
+            nodes = policy_callback(command, *args)
+        elif request_policy == RequestPolicy.MULTI_SHARD:
+            nodes = policy_callback(*args, **kwargs)
+        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
+            nodes = policy_callback(args[0])
         else:
-            # get the node that holds the key's slot
-            slot = self._pipe.determine_slot(*args)
-            node = self._nodes_manager.get_node_from_slot(
-                slot,
-                self._pipe.read_from_replicas and command in READ_COMMANDS,
-                self._pipe.load_balancing_strategy
-                if command in READ_COMMANDS
-                else None,
-            )
-            return [node]
+            nodes = policy_callback()
+
+        if args[0].lower() == "ft.aggregate":
+            self._aggregate_nodes = nodes
+
+        return nodes
 
     def multi(self):
         raise RedisClusterException(
diff --git a/redis/commands/policies.py b/redis/commands/policies.py
index a2f7f45924..7413f3f68d 100644
--- a/redis/commands/policies.py
+++ b/redis/commands/policies.py
@@ -24,12 +24,15 @@
         'info': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
         'sugadd': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYED, response_policy=ResponsePolicy.DEFAULT_KEYED),
         'dictdump': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
-        'cursor': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
+        'cursor': CommandPolicies(request_policy=RequestPolicy.SPECIAL, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
         'search': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
         'tagvals': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
         'aliasdel': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
         'sugdel': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYED, response_policy=ResponsePolicy.DEFAULT_KEYED),
         'spellcheck': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
+    },
+    'core': {
+        'command': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
     }
 }
 
@@ -69,7 +72,7 @@ def __init__(self, policies: PolicyRecords, fallback: Optional[PolicyResolver] =
         self._policies = policies
         self._fallback = fallback
 
-    def resolve(self, command_name: str) -> CommandPolicies:
+    def resolve(self, command_name: str) -> Optional[CommandPolicies]:
         parts = command_name.split(".")
 
         if len(parts) > 2:
@@ -81,13 +84,13 @@ def resolve(self, command_name: str) -> CommandPolicies:
             if self._fallback is not None:
                 return self._fallback.resolve(command_name)
             else:
-                raise ValueError(f"Module {module} not found")
+                return None
 
         if self._policies.get(module).get(command, None) is None:
             if self._fallback is not None:
                 return self._fallback.resolve(command_name)
             else:
-                raise ValueError(f"Command {command} not found in module {module}")
+                return None
 
         return self._policies.get(module).get(command)
 
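With `resolve()` now returning `None` for unknown modules or commands, fallback chaining behaves as exercised by the updated tests; a small sketch (the assertions mirror the test expectations):

```python
from redis.commands.policies import StaticPolicyResolver

resolver = StaticPolicyResolver()

# Unknown module or command: no ValueError anymore, just None, so callers can
# fall back to key-based routing.
assert resolver.resolve("foo.bar") is None
assert resolver.resolve("core.foo") is None

# Chained resolvers consult the fallback only when the first one returns None.
chained = StaticPolicyResolver().with_fallback(resolver)
assert chained.resolve("foo.bar") is None
```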
diff --git a/tests/conftest.py b/tests/conftest.py
index 7eaccb1acb..9c174974ef 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -30,7 +30,7 @@ from tests.ssl_utils import get_tls_certificates
 
 REDIS_INFO = {}
-default_redis_url = "redis://localhost:6379/0"
+default_redis_url = "redis://localhost:16379/0"
 default_protocol = "2"
 default_redismod_url = "redis://localhost:6479"
 
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 2936bb0024..759c93ffc6 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -207,7 +207,28 @@ def cmd_init_mock(self, r):
                     "first_key_pos": 1,
                     "last_key_pos": 1,
                     "step_count": 1,
-                }
+                },
+                "cluster delslots": {
+                    "name": "cluster delslots",
+                    "flags": ["readonly", "fast"],
+                    "first_key_pos": 0,
+                    "last_key_pos": 0,
+                    "step_count": 0,
+                },
+                "cluster delslotsrange": {
+                    "name": "cluster delslotsrange",
+                    "flags": ["readonly", "fast"],
+                    "first_key_pos": 0,
+                    "last_key_pos": 0,
+                    "step_count": 0,
+                },
+                "cluster addslots": {
+                    "name": "cluster addslots",
+                    "flags": ["readonly", "fast"],
+                    "first_key_pos": 0,
+                    "last_key_pos": 0,
+                    "step_count": 0,
+                },
             }
 
         cmd_parser_initialize.side_effect = cmd_init_mock
diff --git a/tests/test_command_policies.py b/tests/test_command_policies.py
index c0d057f0b0..ca3ecb1036 100644
--- a/tests/test_command_policies.py
+++ b/tests/test_command_policies.py
@@ -1,11 +1,15 @@
-from unittest.mock import Mock
+import random
+from unittest.mock import Mock, patch
 
 import pytest
 
+from redis import ResponseError
+
 from redis._parsers import CommandsParser
 from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
 from redis.commands.policies import DynamicPolicyResolver, StaticPolicyResolver
-
+from redis.commands.search.aggregation import AggregateRequest
+from redis.commands.search.field import TextField, NumericField
 
 @pytest.mark.onlycluster
 class TestBasePolicyResolver:
@@ -28,11 +32,8 @@ def test_resolve(self):
         with pytest.raises(ValueError, match="Wrong command or module name: foo.bar.baz"):
             dynamic_resolver.resolve('foo.bar.baz')
 
-        with pytest.raises(ValueError, match="Module foo not found"):
-            dynamic_resolver.resolve('foo.bar')
-
-        with pytest.raises(ValueError, match="Command foo not found in module core"):
-            dynamic_resolver.resolve('core.foo')
+        assert dynamic_resolver.resolve('foo.bar') is None
+        assert dynamic_resolver.resolve('core.foo') is None
 
         # Test that policy fallback correctly
         static_resolver = StaticPolicyResolver()
@@ -54,4 +55,94 @@
         with_fallback_static_resolver = static_resolver.with_fallback(another_dynamic_resolver)
         with_double_fallback_dynamic_resolver = dynamic_resolver.with_fallback(with_fallback_static_resolver)
 
-        assert with_double_fallback_dynamic_resolver.resolve('foo.bar') == foo_bar_policy
\ No newline at end of file
+        assert with_double_fallback_dynamic_resolver.resolve('foo.bar') == foo_bar_policy
+
+@pytest.mark.onlycluster
+class TestClusterWithPolicies:
+    def test_resolves_correctly_policies(self, r, monkeypatch):
+        # Original node selection method
+        determine_nodes = r._determine_nodes
+        determined_nodes = []
+        primary_nodes = r.get_primaries()
+        calls = iter(list(range(len(primary_nodes))))
+
+        def wrapper(*args, request_policy: RequestPolicy, **kwargs):
+            nonlocal determined_nodes
+            determined_nodes = determine_nodes(*args, request_policy=request_policy, **kwargs)
+            return determined_nodes
+
+        # Mock random.choice to always return a pre-defined sequence of nodes
+        monkeypatch.setattr(random, "choice", lambda seq: seq[next(calls)])
+
+        with patch.object(r, '_determine_nodes', side_effect=wrapper, autospec=True):
+            # Routed to a random primary node
+            r.ft().create_index(
+                (
+                    NumericField("random_num"),
+                    TextField("title"),
+                    TextField("body"),
+                    TextField("parent"),
+                )
+            )
+            assert determined_nodes[0] == primary_nodes[0]
+
+            # Routed to another random primary node
+            info = r.ft().info()
+            assert info['index_name'] == 'idx'
+            assert determined_nodes[0] == primary_nodes[1]
+
+            expected_node = r.get_nodes_from_slot('ft.suglen', *['FT.SUGLEN', 'foo'])
+            r.ft().suglen('foo')
+            assert determined_nodes[0] == expected_node[0]
+
+            # Index a few documents
+            r.hset(
+                "search",
+                mapping={
+                    "title": "RediSearch",
+                    "body": "Redisearch implements a search engine on top of redis",
+                    "parent": "redis",
+                    "random_num": 10,
+                },
+            )
+            r.hset(
+                "ai",
+                mapping={
+                    "title": "RedisAI",
+                    "body": "RedisAI executes Deep Learning/Machine Learning models and managing their data.",  # noqa
+                    "parent": "redis",
+                    "random_num": 3,
+                },
+            )
+            r.hset(
+                "json",
+                mapping={
+                    "title": "RedisJson",
+                    "body": "RedisJSON implements ECMA-404 The JSON Data Interchange Standard as a native data type.",  # noqa
+                    "parent": "redis",
+                    "random_num": 8,
+                },
+            )
+
+            req = AggregateRequest("redis").group_by(
+                "@parent"
+            ).cursor(1)
+            cursor = r.ft().aggregate(req).cursor
+
+            # Ensure that the aggregate node was cached.
+            assert determined_nodes[0] == r._aggregate_nodes[0]
+
+            r.ft().aggregate(cursor)
+
+            # Verify that FT.CURSOR was dispatched to the same node.
+            assert determined_nodes[0] == r._aggregate_nodes[0]
+
+            # The error propagates to the user
+            with pytest.raises(ResponseError, match="Cursor not found, id: 0"):
+                r.ft().aggregate(cursor)
+
+            assert determined_nodes[0] == primary_nodes[2]
+
+            # Core commands are also randomly distributed across the primaries
+            r.randomkey()
+            assert determined_nodes[0] == primary_nodes[0]
\ No newline at end of file
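An end-to-end sketch of the cursor routing exercised by the new test, assuming a running cluster with the Search module loaded; connection details, index schema and query are illustrative:

```python
from redis.cluster import RedisCluster
from redis.commands.search.aggregation import AggregateRequest
from redis.commands.search.field import TextField

rc = RedisCluster(host="localhost", port=16379)
rc.ft().create_index((TextField("parent"),))

# FT.AGGREGATE ... WITHCURSOR runs on a randomly chosen primary; that node is
# remembered so the follow-up FT.CURSOR READ is routed to the same shard
# (RequestPolicy.SPECIAL for 'cursor').
req = AggregateRequest("*").group_by("@parent").cursor(1)
res = rc.ft().aggregate(req)
cursor = res.cursor

rc.ft().aggregate(cursor)  # served by the node that created the cursor
```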