11import pickle
2- from typing import Dict , Optional , TypeVar , Union
2+ from typing import Any , Dict , Optional , TypeVar , Union
33
4- from redis .asyncio import ConnectionPool , Redis
4+ from redis .asyncio import BlockingConnectionPool , Redis
55from redis .asyncio .cluster import RedisCluster
66from taskiq import AsyncResultBackend
77from taskiq .abc .result_backend import TaskiqResult
@@ -24,6 +24,8 @@ def __init__(
2424 keep_results : bool = True ,
2525 result_ex_time : Optional [int ] = None ,
2626 result_px_time : Optional [int ] = None ,
27+ max_connection_pool_size : Optional [int ] = None ,
28+ ** connection_kwargs : Any ,
2729 ) -> None :
2830 """
2931 Constructs a new result backend.
@@ -32,13 +34,19 @@ def __init__(
3234 :param keep_results: flag to not remove results from Redis after reading.
3335 :param result_ex_time: expire time in seconds for result.
3436 :param result_px_time: expire time in milliseconds for result.
37+ :param max_connection_pool_size: maximum number of connections in pool.
38+ :param connection_kwargs: additional arguments for redis BlockingConnectionPool.
3539
3640 :raises DuplicateExpireTimeSelectedError: if result_ex_time
3741 and result_px_time are selected.
3842 :raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time
3943 and result_px_time are equal zero.
4044 """
41- self .redis_pool = ConnectionPool .from_url (redis_url )
45+ self .redis_pool = BlockingConnectionPool .from_url (
46+ url = redis_url ,
47+ max_connections = max_connection_pool_size ,
48+ ** connection_kwargs ,
49+ )
4250 self .keep_results = keep_results
4351 self .result_ex_time = result_ex_time
4452 self .result_px_time = result_px_time
@@ -146,6 +154,7 @@ def __init__(
146154 keep_results : bool = True ,
147155 result_ex_time : Optional [int ] = None ,
148156 result_px_time : Optional [int ] = None ,
157+ ** connection_kwargs : Any ,
149158 ) -> None :
150159 """
151160 Constructs a new result backend.
@@ -154,13 +163,17 @@ def __init__(
154163 :param keep_results: flag to not remove results from Redis after reading.
155164 :param result_ex_time: expire time in seconds for result.
156165 :param result_px_time: expire time in milliseconds for result.
166+ :param connection_kwargs: additional arguments for RedisCluster.
157167
158168 :raises DuplicateExpireTimeSelectedError: if result_ex_time
159169 and result_px_time are selected.
160170 :raises ExpireTimeMustBeMoreThanZeroError: if result_ex_time
161171 and result_px_time are equal zero.
162172 """
163- self .redis : RedisCluster [bytes ] = RedisCluster .from_url (redis_url )
173+ self .redis : RedisCluster [bytes ] = RedisCluster .from_url (
174+ redis_url ,
175+ ** connection_kwargs ,
176+ )
164177 self .keep_results = keep_results
165178 self .result_ex_time = result_ex_time
166179 self .result_px_time = result_px_time
0 commit comments