22import json
33import logging
44import os
5- import threading
6- import time
75import typing
8- from contextlib import suppress
9- from functools import lru_cache
106from typing import Callable
117
128import aiohttp
@@ -63,79 +59,60 @@ async def fetch(
6359 kwargs = {"params" : params , "json" : data }
6460 if data :
6561 kwargs ["json" ].update (data )
66- response = await self .get_session ().request (
67- method , url , ** kwargs , timeout = TIMEOUT
68- ) # noqa
69- if not response .ok :
70- logger .error (response .text )
71- buffer = ""
72- line_groups = b""
73- decoder = json .JSONDecoder ()
74- data_received = False
75- async for line in response .content .iter_any ():
76- line_groups += line
77- try :
78- buffer += line_groups .decode ("utf-8" )
79- line_groups = b""
80- except UnicodeDecodeError :
81- continue
82- while buffer :
83- try :
84- if buffer .startswith (self .DELIMITER ):
85- buffer = buffer [self .DELIMITER_LEN :]
86- json_obj , index = decoder .raw_decode (buffer )
87- if not annotation_is_valid (json_obj ):
88- logger .warning (
89- f"Invalid JSON detected in small annotations stream process, json: { json_obj } ."
90- )
91- if data_received :
92- raise AppException (
93- "Invalid JSON detected in small annotations stream process."
94- )
95- else :
96- self .rest_session ()
97- raise BackendError (
98- "Invalid JSON detected at the start of the small annotations stream process."
99- )
100- data_received = True
101- yield json_obj
102- if len (buffer [index :]) >= self .DELIMITER_LEN :
103- buffer = buffer [index + self .DELIMITER_LEN :]
104- else :
105- buffer = buffer [index :]
106- break
107- except json .decoder .JSONDecodeError as e :
108- logger .debug (
109- f"Failed to parse buffer, buffer_len: { len (buffer )} || start buffer:"
110- f" { buffer [:50 ]} || buffer_end: ...{ buffer [- 50 :]} || error: { e } "
111- )
112- break
113-
114- @lru_cache (maxsize = 32 )
115- def _get_session (self , thread_id , ttl = None ): # noqa
116- del ttl
117- del thread_id
118- session = AIOHttpSession (
62+ async with AIOHttpSession (
11963 headers = self ._headers ,
12064 timeout = TIMEOUT ,
12165 connector = aiohttp .TCPConnector (
12266 ssl = self .VERIFY_SSL , keepalive_timeout = 2 ** 32
12367 ),
12468 raise_for_status = True ,
125- )
126- self ._active_sessions .add (session )
127- return session
128-
129- def get_session (self ):
130- return self ._get_session (
131- thread_id = threading .get_ident (), ttl = round (time .time () / 360 )
132- )
133-
134- def rest_session (self ):
135- for s in self ._active_sessions :
136- with suppress (Exception ):
137- s .close ()
138- self ._get_session .cache_clear ()
69+ ) as session :
70+ response = await session .request (
71+ method , url , ** kwargs , timeout = TIMEOUT
72+ ) # noqa
73+ if not response .ok :
74+ logger .error (response .text )
75+ buffer = ""
76+ line_groups = b""
77+ decoder = json .JSONDecoder ()
78+ data_received = False
79+ async for line in response .content .iter_any ():
80+ line_groups += line
81+ try :
82+ buffer += line_groups .decode ("utf-8" )
83+ line_groups = b""
84+ except UnicodeDecodeError :
85+ continue
86+ while buffer :
87+ try :
88+ if buffer .startswith (self .DELIMITER ):
89+ buffer = buffer [self .DELIMITER_LEN :]
90+ json_obj , index = decoder .raw_decode (buffer )
91+ if not annotation_is_valid (json_obj ):
92+ logger .warning (
93+ f"Invalid JSON detected in small annotations stream process, json: { json_obj } ."
94+ )
95+ if data_received :
96+ raise AppException (
97+ "Invalid JSON detected in small annotations stream process."
98+ )
99+ else :
100+ raise BackendError (
101+ "Invalid JSON detected at the start of the small annotations stream process."
102+ )
103+ data_received = True
104+ yield json_obj
105+ if len (buffer [index :]) >= self .DELIMITER_LEN :
106+ buffer = buffer [index + self .DELIMITER_LEN :]
107+ else :
108+ buffer = buffer [index :]
109+ break
110+ except json .decoder .JSONDecodeError as e :
111+ logger .debug (
112+ f"Failed to parse buffer, buffer_len: { len (buffer )} || start buffer:"
113+ f" { buffer [:50 ]} || buffer_end: ...{ buffer [- 50 :]} || error: { e } "
114+ )
115+ break
139116
140117 async def list_annotations (
141118 self ,
@@ -197,7 +174,4 @@ def _store_annotation(path, annotation: dict, callback: Callable = None):
197174 def _process_data (self , data ):
198175 if data and self ._map_function :
199176 return self ._map_function (data )
200- return data
201-
202- def __del__ (self ):
203- self .rest_session ()
177+ return data
0 commit comments