Skip to content

Commit bb38ff0

Browse files
authored
Added policies parsing and policies resolving (#3805)
* Added poilcy resolution method * Moved main command proceessing on top * Fixed return type and keyless detection * Added Dynamic and Static policies * Added coverage for policy resolvers * Removed all policies except search (phase 1) * Applied comments
1 parent 028245c commit bb38ff0

File tree

5 files changed

+458
-2
lines changed

5 files changed

+458
-2
lines changed

redis/_parsers/commands.py

Lines changed: 202 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,43 @@
1+
from dataclasses import dataclass
2+
from enum import Enum
13
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
24

3-
from redis.exceptions import RedisError, ResponseError
5+
from redis.exceptions import RedisError, ResponseError, IncorrectPolicyType
46
from redis.utils import str_if_bytes
57

68
if TYPE_CHECKING:
79
from redis.asyncio.cluster import ClusterNode
810

11+
class RequestPolicy(Enum):
12+
ALL_NODES = 'all_nodes'
13+
ALL_SHARDS = 'all_shards'
14+
MULTI_SHARD = 'multi_shard'
15+
SPECIAL = 'special'
16+
DEFAULT_KEYLESS = 'default_keyless'
17+
DEFAULT_KEYED = 'default_keyed'
18+
19+
class ResponsePolicy(Enum):
20+
ONE_SUCCEEDED = 'one_succeeded'
21+
ALL_SUCCEEDED = 'all_succeeded'
22+
AGG_LOGICAL_AND = 'agg_logical_and'
23+
AGG_LOGICAL_OR = 'agg_logical_or'
24+
AGG_MIN = 'agg_min'
25+
AGG_MAX = 'agg_max'
26+
AGG_SUM = 'agg_sum'
27+
SPECIAL = 'special'
28+
DEFAULT_KEYLESS = 'default_keyless'
29+
DEFAULT_KEYED = 'default_keyed'
30+
31+
class CommandPolicies:
32+
def __init__(
33+
self,
34+
request_policy: RequestPolicy = RequestPolicy.DEFAULT_KEYLESS,
35+
response_policy: ResponsePolicy = ResponsePolicy.DEFAULT_KEYLESS
36+
):
37+
self.request_policy = request_policy
38+
self.response_policy = response_policy
39+
40+
PolicyRecords = dict[str, dict[str, CommandPolicies]]
941

1042
class AbstractCommandsParser:
1143
def _get_pubsub_keys(self, *args):
@@ -64,7 +96,8 @@ class CommandsParser(AbstractCommandsParser):
6496

6597
def __init__(self, redis_connection):
6698
self.commands = {}
67-
self.initialize(redis_connection)
99+
self.redis_connection = redis_connection
100+
self.initialize(self.redis_connection)
68101

69102
def initialize(self, r):
70103
commands = r.command()
@@ -169,6 +202,173 @@ def _get_moveable_keys(self, redis_conn, *args):
169202
raise e
170203
return keys
171204

205+
def _is_keyless_command(self, command_name: str, subcommand_name: Optional[str]=None) -> bool:
206+
"""
207+
Determines whether a given command or subcommand is considered "keyless".
208+
209+
A keyless command does not operate on specific keys, which is determined based
210+
on the first key position in the command or subcommand details. If the command
211+
or subcommand's first key position is zero or negative, it is treated as keyless.
212+
213+
Parameters:
214+
command_name: str
215+
The name of the command to check.
216+
subcommand_name: Optional[str], default=None
217+
The name of the subcommand to check, if applicable. If not provided,
218+
the check is performed only on the command.
219+
220+
Returns:
221+
bool
222+
True if the specified command or subcommand is considered keyless,
223+
False otherwise.
224+
225+
Raises:
226+
ValueError
227+
If the specified subcommand is not found within the command or the
228+
specified command does not exist in the available commands.
229+
"""
230+
if subcommand_name:
231+
for subcommand in self.commands.get(command_name)['subcommands']:
232+
if str_if_bytes(subcommand[0]) == subcommand_name:
233+
parsed_subcmd = self.parse_subcommand(subcommand)
234+
return parsed_subcmd['first_key_pos'] <= 0
235+
raise ValueError(f"Subcommand {subcommand_name} not found in command {command_name}")
236+
else:
237+
command_details = self.commands.get(command_name, None)
238+
if command_details is not None:
239+
return command_details['first_key_pos'] <= 0
240+
241+
raise ValueError(f"Command {command_name} not found in commands")
242+
243+
def get_command_policies(self) -> PolicyRecords:
244+
"""
245+
Retrieve and process the command policies for all commands and subcommands.
246+
247+
This method traverses through commands and subcommands, extracting policy details
248+
from associated data structures and constructing a dictionary of commands with their
249+
associated policies. It supports nested data structures and handles both main commands
250+
and their subcommands.
251+
252+
Returns:
253+
PolicyRecords: A collection of commands and subcommands associated with their
254+
respective policies.
255+
256+
Raises:
257+
IncorrectPolicyType: If an invalid policy type is encountered during policy extraction.
258+
"""
259+
command_with_policies = {}
260+
261+
def extract_policies(data, module_name, command_name):
262+
"""
263+
Recursively extract policies from nested data structures.
264+
265+
Args:
266+
data: The data structure to search (can be list, dict, str, bytes, etc.)
267+
command_name: The command name to associate with found policies
268+
"""
269+
if isinstance(data, (str, bytes)):
270+
# Decode bytes to string if needed
271+
policy = str_if_bytes(data.decode())
272+
273+
# Check if this is a policy string
274+
if policy.startswith('request_policy') or policy.startswith('response_policy'):
275+
if policy.startswith('request_policy'):
276+
policy_type = policy.split(':')[1]
277+
278+
try:
279+
command_with_policies[module_name][command_name].request_policy = RequestPolicy(policy_type)
280+
except ValueError:
281+
raise IncorrectPolicyType(f"Incorrect request policy type: {policy_type}")
282+
283+
if policy.startswith('response_policy'):
284+
policy_type = policy.split(':')[1]
285+
286+
try:
287+
command_with_policies[module_name][command_name].response_policy = ResponsePolicy(policy_type)
288+
except ValueError:
289+
raise IncorrectPolicyType(f"Incorrect response policy type: {policy_type}")
290+
291+
elif isinstance(data, list):
292+
# For lists, recursively process each element
293+
for item in data:
294+
extract_policies(item, module_name, command_name)
295+
296+
elif isinstance(data, dict):
297+
# For dictionaries, recursively process each value
298+
for value in data.values():
299+
extract_policies(value, module_name, command_name)
300+
301+
for command, details in self.commands.items():
302+
# Check whether the command has keys
303+
is_keyless = self._is_keyless_command(command)
304+
305+
if is_keyless:
306+
default_request_policy = RequestPolicy.DEFAULT_KEYLESS
307+
default_response_policy = ResponsePolicy.DEFAULT_KEYLESS
308+
else:
309+
default_request_policy = RequestPolicy.DEFAULT_KEYED
310+
default_response_policy = ResponsePolicy.DEFAULT_KEYED
311+
312+
# Check if it's a core or module command
313+
split_name = command.split('.')
314+
315+
if len(split_name) > 1:
316+
module_name = split_name[0]
317+
command_name = split_name[1]
318+
else:
319+
module_name = 'core'
320+
command_name = split_name[0]
321+
322+
# Create a CommandPolicies object with default policies on the new command.
323+
if command_with_policies.get(module_name, None) is None:
324+
command_with_policies[module_name] = {command_name: CommandPolicies(
325+
request_policy=default_request_policy,
326+
response_policy=default_response_policy
327+
)}
328+
else:
329+
command_with_policies[module_name][command_name] = CommandPolicies(
330+
request_policy=default_request_policy,
331+
response_policy=default_response_policy
332+
)
333+
334+
tips = details.get('tips')
335+
subcommands = details.get('subcommands')
336+
337+
# Process tips for the main command
338+
if tips:
339+
extract_policies(tips, module_name, command_name)
340+
341+
# Process subcommands
342+
if subcommands:
343+
for subcommand_details in subcommands:
344+
# Get the subcommand name (first element)
345+
subcmd_name = subcommand_details[0]
346+
if isinstance(subcmd_name, bytes):
347+
subcmd_name = subcmd_name.decode()
348+
349+
# Check whether the subcommand has keys
350+
is_keyless = self._is_keyless_command(command, subcmd_name)
351+
352+
if is_keyless:
353+
default_request_policy = RequestPolicy.DEFAULT_KEYLESS
354+
default_response_policy = ResponsePolicy.DEFAULT_KEYLESS
355+
else:
356+
default_request_policy = RequestPolicy.DEFAULT_KEYED
357+
default_response_policy = ResponsePolicy.DEFAULT_KEYED
358+
359+
subcmd_name = subcmd_name.replace('|', ' ')
360+
361+
# Create a CommandPolicies object with default policies on the new command.
362+
command_with_policies[module_name][subcmd_name] = CommandPolicies(
363+
request_policy=default_request_policy,
364+
response_policy=default_response_policy
365+
)
366+
367+
# Recursively extract policies from the rest of the subcommand details
368+
for subcommand_detail in subcommand_details[1:]:
369+
extract_policies(subcommand_detail, module_name, subcmd_name)
370+
371+
return command_with_policies
172372

173373
class AsyncCommandsParser(AbstractCommandsParser):
174374
"""

redis/commands/policies.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Optional
3+
4+
from redis._parsers.commands import CommandPolicies, PolicyRecords, RequestPolicy, ResponsePolicy, CommandsParser
5+
6+
STATIC_POLICIES: PolicyRecords = {
7+
'ft': {
8+
'explaincli': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
9+
'suglen': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYED, response_policy=ResponsePolicy.DEFAULT_KEYED),
10+
'profile': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
11+
'dropindex': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
12+
'aliasupdate': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
13+
'alter': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
14+
'aggregate': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
15+
'syndump': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
16+
'create': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
17+
'explain': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
18+
'sugget': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYED, response_policy=ResponsePolicy.DEFAULT_KEYED),
19+
'dictdel': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
20+
'aliasadd': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
21+
'dictadd': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
22+
'synupdate': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
23+
'drop': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
24+
'info': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
25+
'sugadd': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYED, response_policy=ResponsePolicy.DEFAULT_KEYED),
26+
'dictdump': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
27+
'cursor': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
28+
'search': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
29+
'tagvals': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
30+
'aliasdel': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
31+
'sugdel': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYED, response_policy=ResponsePolicy.DEFAULT_KEYED),
32+
'spellcheck': CommandPolicies(request_policy=RequestPolicy.DEFAULT_KEYLESS, response_policy=ResponsePolicy.DEFAULT_KEYLESS),
33+
}
34+
}
35+
36+
class PolicyResolver(ABC):
37+
38+
@abstractmethod
39+
def resolve(self, command_name: str) -> CommandPolicies:
40+
"""
41+
Resolves the command name and determines the associated command policies.
42+
43+
Args:
44+
command_name: The name of the command to resolve.
45+
46+
Returns:
47+
CommandPolicies: The policies associated with the specified command.
48+
"""
49+
pass
50+
51+
@abstractmethod
52+
def with_fallback(self, fallback: "PolicyResolver") -> "PolicyResolver":
53+
"""
54+
Factory method to instantiate a policy resolver with a fallback resolver.
55+
56+
Args:
57+
fallback: Fallback resolver
58+
59+
Returns:
60+
PolicyResolver: Returns a new policy resolver with the specified fallback resolver.
61+
"""
62+
pass
63+
64+
class BasePolicyResolver(PolicyResolver):
65+
"""
66+
Base class for policy resolvers.
67+
"""
68+
def __init__(self, policies: PolicyRecords, fallback: Optional[PolicyResolver] = None) -> None:
69+
self._policies = policies
70+
self._fallback = fallback
71+
72+
def resolve(self, command_name: str) -> CommandPolicies:
73+
parts = command_name.split(".")
74+
75+
if len(parts) > 2:
76+
raise ValueError(f"Wrong command or module name: {command_name}")
77+
78+
module, command = parts if len(parts) == 2 else ("core", parts[0])
79+
80+
if self._policies.get(module, None) is None:
81+
if self._fallback is not None:
82+
return self._fallback.resolve(command_name)
83+
else:
84+
raise ValueError(f"Module {module} not found")
85+
86+
if self._policies.get(module).get(command, None) is None:
87+
if self._fallback is not None:
88+
return self._fallback.resolve(command_name)
89+
else:
90+
raise ValueError(f"Command {command} not found in module {module}")
91+
92+
return self._policies.get(module).get(command)
93+
94+
@abstractmethod
95+
def with_fallback(self, fallback: "PolicyResolver") -> "PolicyResolver":
96+
pass
97+
98+
99+
class DynamicPolicyResolver(BasePolicyResolver):
100+
"""
101+
Resolves policy dynamically based on the COMMAND output.
102+
"""
103+
def __init__(self, commands_parser: CommandsParser, fallback: Optional[PolicyResolver] = None) -> None:
104+
"""
105+
Parameters:
106+
commands_parser (CommandsParser): COMMAND output parser.
107+
fallback (Optional[PolicyResolver]): An optional resolver to be used when the
108+
primary policies cannot handle a specific request.
109+
"""
110+
self._commands_parser = commands_parser
111+
super().__init__(commands_parser.get_command_policies(), fallback)
112+
113+
def with_fallback(self, fallback: "PolicyResolver") -> "PolicyResolver":
114+
return DynamicPolicyResolver(self._commands_parser, fallback)
115+
116+
117+
class StaticPolicyResolver(BasePolicyResolver):
118+
"""
119+
Resolves policy from a static list of policy records.
120+
"""
121+
def __init__(self, fallback: Optional[PolicyResolver] = None) -> None:
122+
"""
123+
Parameters:
124+
fallback (Optional[PolicyResolver]): An optional fallback policy resolver
125+
used for resolving policies if static policies are inadequate.
126+
"""
127+
super().__init__(STATIC_POLICIES, fallback)
128+
129+
def with_fallback(self, fallback: "PolicyResolver") -> "PolicyResolver":
130+
return StaticPolicyResolver(fallback)

redis/exceptions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,3 +245,9 @@ class InvalidPipelineStack(RedisClusterException):
245245
"""
246246

247247
pass
248+
249+
class IncorrectPolicyType(Exception):
250+
"""
251+
Raised when a policy type isn't matching to any known policy types.
252+
"""
253+
pass

0 commit comments

Comments
 (0)