11from __future__ import annotations
22
33import abc
4- import asyncio
4+ from asyncio import (
5+ FIRST_COMPLETED ,
6+ Event ,
7+ Queue ,
8+ Task ,
9+ create_task ,
10+ gather ,
11+ get_running_loop ,
12+ wait ,
13+ )
514from collections import Counter
615from collections .abc import Iterator
716from contextlib import AsyncExitStack
@@ -41,6 +50,7 @@ class Layout:
4150 "root" ,
4251 "_event_handlers" ,
4352 "_rendering_queue" ,
53+ "_render_tasks" ,
4454 "_root_life_cycle_state_id" ,
4555 "_model_states_by_life_cycle_state_id" ,
4656 )
@@ -58,6 +68,7 @@ def __init__(self, root: ComponentType) -> None:
5868 async def __aenter__ (self ) -> Layout :
5969 # create attributes here to avoid access before entering context manager
6070 self ._event_handlers : EventHandlerDict = {}
71+ self ._render_tasks : set [Task [LayoutUpdateMessage ]] = set ()
6172
6273 self ._rendering_queue : _ThreadSafeQueue [_LifeCycleStateId ] = _ThreadSafeQueue ()
6374 root_model_state = _new_root_model_state (self .root , self ._rendering_queue .put )
@@ -72,6 +83,7 @@ async def __aenter__(self) -> Layout:
7283 async def __aexit__ (self , * exc : Any ) -> None :
7384 root_csid = self ._root_life_cycle_state_id
7485 root_model_state = self ._model_states_by_life_cycle_state_id [root_csid ]
86+ await gather (* self ._render_tasks , return_exceptions = True )
7587 await self ._unmount_model_states ([root_model_state ])
7688
7789 # delete attributes here to avoid access after exiting context manager
@@ -102,21 +114,35 @@ async def deliver(self, event: LayoutEventMessage) -> None:
102114 async def render (self ) -> LayoutUpdateMessage :
103115 """Await the next available render. This will block until a component is updated"""
104116 while True :
105- model_state_id = await self ._rendering_queue .get ()
106- try :
107- model_state = self ._model_states_by_life_cycle_state_id [model_state_id ]
108- except KeyError :
109- logger .debug (
110- "Did not render component with model state ID "
111- f"{ model_state_id !r} - component already unmounted"
112- )
117+ render_completed = (
118+ create_task (wait (self ._render_tasks , return_when = FIRST_COMPLETED ))
119+ if self ._render_tasks
120+ else get_running_loop ().create_future ()
121+ )
122+ await wait (
123+ (create_task (self ._rendering_queue .ready ()), render_completed ),
124+ return_when = FIRST_COMPLETED ,
125+ )
126+ if render_completed .done ():
127+ done , _ = await render_completed
128+ update_task : Task [LayoutUpdateMessage ] = done .pop ()
129+ self ._render_tasks .remove (update_task )
130+ return update_task .result ()
113131 else :
114- update = await self ._create_layout_update (model_state )
115- if REACTPY_CHECK_VDOM_SPEC .current :
116- root_id = self ._root_life_cycle_state_id
117- root_model = self ._model_states_by_life_cycle_state_id [root_id ]
118- validate_vdom_json (root_model .model .current )
119- return update
132+ model_state_id = await self ._rendering_queue .get ()
133+ try :
134+ model_state = self ._model_states_by_life_cycle_state_id [
135+ model_state_id
136+ ]
137+ except KeyError :
138+ logger .debug (
139+ "Did not render component with model state ID "
140+ f"{ model_state_id !r} - component already unmounted"
141+ )
142+ else :
143+ self ._render_tasks .add (
144+ create_task (self ._create_layout_update (model_state ))
145+ )
120146
121147 async def _create_layout_update (
122148 self , old_state : _ModelState
@@ -127,6 +153,9 @@ async def _create_layout_update(
127153 async with AsyncExitStack () as exit_stack :
128154 await self ._render_component (exit_stack , old_state , new_state , component )
129155
156+ if REACTPY_CHECK_VDOM_SPEC .current :
157+ validate_vdom_json (new_state .model .current )
158+
130159 return {
131160 "type" : "layout-update" ,
132161 "path" : new_state .patch_path ,
@@ -540,6 +569,7 @@ class _ModelState:
540569 __slots__ = (
541570 "__weakref__" ,
542571 "_parent_ref" ,
572+ "_render_semaphore" ,
543573 "children_by_key" ,
544574 "index" ,
545575 "key" ,
@@ -651,24 +681,27 @@ class _LifeCycleState(NamedTuple):
651681
652682
653683class _ThreadSafeQueue (Generic [_Type ]):
654- __slots__ = "_loop" , "_queue" , "_pending"
655-
656684 def __init__ (self ) -> None :
657- self ._loop = asyncio . get_running_loop ()
658- self ._queue : asyncio . Queue [_Type ] = asyncio . Queue ()
685+ self ._loop = get_running_loop ()
686+ self ._queue : Queue [_Type ] = Queue ()
659687 self ._pending : set [_Type ] = set ()
688+ self ._ready = Event ()
660689
661690 def put (self , value : _Type ) -> None :
662691 if value not in self ._pending :
663692 self ._pending .add (value )
664693 self ._loop .call_soon_threadsafe (self ._queue .put_nowait , value )
694+ self ._ready .set ()
695+
696+ async def ready (self ) -> None :
697+ """Return when the next value is available"""
698+ await self ._ready .wait ()
665699
666700 async def get (self ) -> _Type :
667- while True :
668- value = await self ._queue .get ()
669- if value in self ._pending :
670- break
701+ value = await self ._queue .get ()
671702 self ._pending .remove (value )
703+ if not self ._pending :
704+ self ._ready .clear ()
672705 return value
673706
674707
0 commit comments