@@ -165,6 +165,7 @@ def __init__(
165165 consumer_id : str = "$" ,
166166 mkstream : bool = True ,
167167 xread_block : int = 10000 ,
168+ maxlen : Optional [int ] = None ,
168169 additional_streams : Optional [Dict [str , str ]] = None ,
169170 ** connection_kwargs : Any ,
170171 ) -> None :
@@ -184,6 +185,8 @@ def __init__(
184185 :param mkstream: create stream if it does not exist.
185186 :param xread_block: block time in ms for xreadgroup.
186187 Better to set it to a bigger value, to avoid unnecessary calls.
188+ :param maxlen: sets the maximum length of the stream
189+ trims (the old values of) the stream each time a new element is added
187190 :param additional_streams: additional streams to read from.
188191 Each key is a stream name, value is a consumer id.
189192 """
@@ -200,6 +203,7 @@ def __init__(
200203 self .consumer_id = consumer_id
201204 self .mkstream = mkstream
202205 self .block = xread_block
206+ self .maxlen = maxlen
203207 self .additional_streams = additional_streams or {}
204208
205209 async def _declare_consumer_group (self ) -> None :
@@ -235,7 +239,11 @@ async def kick(self, message: BrokerMessage) -> None:
235239 :param message: message to append.
236240 """
237241 async with Redis (connection_pool = self .connection_pool ) as redis_conn :
238- await redis_conn .xadd (self .queue_name , {b"data" : message .message })
242+ await redis_conn .xadd (
243+ self .queue_name ,
244+ {b"data" : message .message },
245+ maxlen = self .maxlen ,
246+ )
239247
240248 def _ack_generator (self , id : str ) -> Callable [[], Awaitable [None ]]:
241249 async def _ack () -> None :
0 commit comments