2424 no_type_check ,
2525)
2626
27- import aioredis
28- from aioredis .client import Pipeline
27+ from more_itertools import ichunked
2928from pydantic import BaseModel , validator
3029from pydantic .fields import FieldInfo as PydanticFieldInfo
3130from pydantic .fields import ModelField , Undefined , UndefinedType
3534from typing_extensions import Protocol , get_args , get_origin
3635from ulid import ULID
3736
37+ from .. import redis
3838from ..checks import has_redis_json , has_redisearch
3939from ..connections import get_redis_connection
40- from ..unasync_util import ASYNC_MODE
40+ from ..util import ASYNC_MODE
4141from .encoders import jsonable_encoder
4242from .render_tree import render_tree
4343from .token_escaper import TokenEscaper
@@ -760,6 +760,9 @@ async def all(self, batch_size=DEFAULT_PAGE_SIZE):
760760 return await query .execute ()
761761 return await self .execute ()
762762
763+ async def page (self , offset = 0 , limit = 10 ):
764+ return await self .copy (offset = offset , limit = limit ).execute ()
765+
763766 def sort_by (self , * fields : str ):
764767 if not fields :
765768 return self
@@ -975,7 +978,7 @@ class BaseMeta(Protocol):
975978 global_key_prefix : str
976979 model_key_prefix : str
977980 primary_key_pattern : str
978- database : aioredis .Redis
981+ database : redis .Redis
979982 primary_key : PrimaryKey
980983 primary_key_creator_cls : Type [PrimaryKeyCreator ]
981984 index_name : str
@@ -994,7 +997,7 @@ class DefaultMeta:
994997 global_key_prefix : Optional [str ] = None
995998 model_key_prefix : Optional [str ] = None
996999 primary_key_pattern : Optional [str ] = None
997- database : Optional [aioredis .Redis ] = None
1000+ database : Optional [redis .Redis ] = None
9981001 primary_key : Optional [PrimaryKey ] = None
9991002 primary_key_creator_cls : Optional [Type [PrimaryKeyCreator ]] = None
10001003 index_name : Optional [str ] = None
@@ -1115,9 +1118,17 @@ def key(self):
11151118 return self .make_primary_key (pk )
11161119
11171120 @classmethod
1118- async def delete (cls , pk : Any ) -> int :
1121+ async def _delete (cls , db , * pks ):
1122+ return await db .delete (* pks )
1123+
1124+ @classmethod
1125+ async def delete (
1126+ cls , pk : Any , pipeline : Optional [redis .client .Pipeline ] = None
1127+ ) -> int :
11191128 """Delete data at this key."""
1120- return await cls .db ().delete (cls .make_primary_key (pk ))
1129+ db = cls ._get_db (pipeline )
1130+
1131+ return await cls ._delete (db , cls .make_primary_key (pk ))
11211132
11221133 @classmethod
11231134 async def get (cls , pk : Any ) -> "RedisModel" :
@@ -1127,14 +1138,15 @@ async def update(self, **field_values):
11271138 """Update this model instance with the specified key-value pairs."""
11281139 raise NotImplementedError
11291140
1130- async def save (self , pipeline : Optional [Pipeline ] = None ) -> "RedisModel" :
1141+ async def save (
1142+ self , pipeline : Optional [redis .client .Pipeline ] = None
1143+ ) -> "RedisModel" :
11311144 raise NotImplementedError
11321145
1133- async def expire (self , num_seconds : int , pipeline : Optional [Pipeline ] = None ):
1134- if pipeline is None :
1135- db = self .db ()
1136- else :
1137- db = pipeline
1146+ async def expire (
1147+ self , num_seconds : int , pipeline : Optional [redis .client .Pipeline ] = None
1148+ ):
1149+ db = self ._get_db (pipeline )
11381150
11391151 # TODO: Wrap any Redis response errors in a custom exception?
11401152 await db .expire (self .make_primary_key (self .pk ), num_seconds )
@@ -1223,19 +1235,10 @@ def get_annotations(cls):
12231235 async def add (
12241236 cls ,
12251237 models : Sequence ["RedisModel" ],
1226- pipeline : Optional [Pipeline ] = None ,
1238+ pipeline : Optional [redis . client . Pipeline ] = None ,
12271239 pipeline_verifier : Callable [..., Any ] = verify_pipeline_response ,
12281240 ) -> Sequence ["RedisModel" ]:
1229- if pipeline is None :
1230- # By default, send commands in a pipeline. Saving each model will
1231- # be atomic, but Redis may process other commands in between
1232- # these saves.
1233- db = cls .db ().pipeline (transaction = False )
1234- else :
1235- # If the user gave us a pipeline, add our commands to that. The user
1236- # will be responsible for executing the pipeline after they've accumulated
1237- # the commands they want to send.
1238- db = pipeline
1241+ db = cls ._get_db (pipeline , bulk = True )
12391242
12401243 for model in models :
12411244 # save() just returns the model, we don't need that here.
@@ -1249,6 +1252,31 @@ async def add(
12491252
12501253 return models
12511254
1255+ @classmethod
1256+ def _get_db (
1257+ self , pipeline : Optional [redis .client .Pipeline ] = None , bulk : bool = False
1258+ ):
1259+ if pipeline is not None :
1260+ return pipeline
1261+ elif bulk :
1262+ return self .db ().pipeline (transaction = False )
1263+ else :
1264+ return self .db ()
1265+
1266+ @classmethod
1267+ async def delete_many (
1268+ cls ,
1269+ models : Sequence ["RedisModel" ],
1270+ pipeline : Optional [redis .client .Pipeline ] = None ,
1271+ ) -> int :
1272+ db = cls ._get_db (pipeline )
1273+
1274+ for chunk in ichunked (models , 100 ):
1275+ pks = [cls .make_primary_key (model .pk ) for model in chunk ]
1276+ await cls ._delete (db , * pks )
1277+
1278+ return len (models )
1279+
12521280 @classmethod
12531281 def redisearch_schema (cls ):
12541282 raise NotImplementedError
@@ -1283,11 +1311,11 @@ def __init_subclass__(cls, **kwargs):
12831311 f"HashModels cannot index dataclass fields. Field: { name } "
12841312 )
12851313
1286- def dict (self ) -> Dict [str , Any ]:
1314+ def dict (self ) -> Dict [str , Any ]: # type: ignore
12871315 # restore none values
12881316 return dict (self )
12891317
1290- async def save (self , pipeline : Optional [Pipeline ] = None ) -> "HashModel" :
1318+ async def save (self , pipeline : Optional [redis . client . Pipeline ] = None ) -> "HashModel" :
12911319 self .check ()
12921320 if pipeline is None :
12931321 db = self .db ()
@@ -1461,12 +1489,12 @@ def __init__(self, *args, **kwargs):
14611489 )
14621490 super ().__init__ (* args , ** kwargs )
14631491
1464- async def save (self , pipeline : Optional [Pipeline ] = None ) -> "JsonModel" :
1492+ async def save (
1493+ self , pipeline : Optional [redis .client .Pipeline ] = None
1494+ ) -> "JsonModel" :
14651495 self .check ()
1466- if pipeline is None :
1467- db = self .db ()
1468- else :
1469- db = pipeline
1496+ db = self ._get_db (pipeline )
1497+
14701498 # TODO: Wrap response errors in a custom exception?
14711499 await db .execute_command ("JSON.SET" , self .key (), "." , self .json ())
14721500 return self
0 commit comments