1+ import asyncio
12import pickle
23from logging import getLogger
3- from typing import Any , AsyncGenerator , Callable , Optional , TypeVar
4+ from typing import Any , Callable , Coroutine , Optional , TypeVar
45
56from redis .asyncio import ConnectionPool , Redis
67from taskiq .abc .broker import AsyncBroker
@@ -49,7 +50,7 @@ def __init__(
4950
5051 async def shutdown (self ) -> None :
5152 """Closes redis connection pool."""
52- self .connection_pool .disconnect ()
53+ await self .connection_pool .disconnect ()
5354
5455 async def kick (self , message : BrokerMessage ) -> None :
5556 """
@@ -69,26 +70,30 @@ async def kick(self, message: BrokerMessage) -> None:
6970 pickle .dumps (message ),
7071 )
7172
72- async def listen (self ) -> AsyncGenerator [BrokerMessage , None ]:
73+ async def listen (
74+ self ,
75+ callback : Callable [[BrokerMessage ], Coroutine [Any , Any , None ]],
76+ ) -> None :
7377 """
7478 Listen redis list for new messages.
7579
76- This function listens to list and yields new messages.
80+ This function listens to list calls callback on
81+ new messages.
7782
78- :yields: parsed broker messages .
83+ :param callback: function to call on new message .
7984 """
85+ loop = asyncio .get_event_loop ()
8086 async with Redis (connection_pool = self .connection_pool ) as redis_conn :
8187 redis_pubsub_channel = redis_conn .pubsub ()
8288 await redis_pubsub_channel .subscribe (self .redis_pubsub_channel )
83- while True :
84- redis_pickled_message = await redis_pubsub_channel .get_message ()
85- if redis_pickled_message :
89+ async for message in redis_pubsub_channel .listen ():
90+ if message :
8691 try :
8792 redis_message = pickle .loads (
88- redis_pickled_message ["data" ],
93+ message ["data" ],
8994 )
9095 if isinstance (redis_message , BrokerMessage ):
91- yield redis_message
96+ loop . create_task ( callback ( redis_message ))
9297 except (
9398 TypeError ,
9499 AttributeError ,
0 commit comments