|
54 | 54 | MultipleMessagesError, |
55 | 55 | ) |
56 | 56 | from .models import MessagesResponse |
57 | | -from .utils import get_message_type_value |
| 57 | +from .utils import check_unix_socket_valid, get_message_type_value |
58 | 58 |
|
59 | 59 | logger = logging.getLogger(__name__) |
60 | 60 |
|
@@ -94,14 +94,14 @@ def func_caller(*args, **kwargs): |
94 | 94 |
|
95 | 95 |
|
96 | 96 | async def run_async_watcher( |
97 | | - *args, output_queue: queue.Queue, api_server: str, **kwargs |
| 97 | + *args, output_queue: queue.Queue, api_server: Optional[str], **kwargs |
98 | 98 | ): |
99 | 99 | async with AlephClient(api_server=api_server) as session: |
100 | 100 | async for message in session.watch_messages(*args, **kwargs): |
101 | 101 | output_queue.put(message) |
102 | 102 |
|
103 | 103 |
|
104 | | -def watcher_thread(output_queue: queue.Queue, api_server: str, args, kwargs): |
| 104 | +def watcher_thread(output_queue: queue.Queue, api_server: Optional[str], args, kwargs): |
105 | 105 | asyncio.run( |
106 | 106 | run_async_watcher( |
107 | 107 | output_queue=output_queue, api_server=api_server, *args, **kwargs |
@@ -443,9 +443,39 @@ class AlephClient: |
443 | 443 | api_server: str |
444 | 444 | http_session: aiohttp.ClientSession |
445 | 445 |
|
446 | | - def __init__(self, api_server: str): |
447 | | - self.api_server = api_server |
448 | | - self.http_session = aiohttp.ClientSession(base_url=api_server) |
| 446 | + def __init__( |
| 447 | + self, |
| 448 | + api_server: Optional[str], |
| 449 | + api_unix_socket: Optional[str] = None, |
| 450 | + allow_unix_sockets: bool = True, |
| 451 | + timeout: Optional[aiohttp.ClientTimeout] = None, |
| 452 | + ): |
| 453 | + """AlephClient can use HTTP(S) or HTTP over Unix sockets. |
| 454 | + Unix sockets are used when running inside a virtual machine, |
| 455 | + and can be shared across containers in a more secure way than TCP ports. |
| 456 | + """ |
| 457 | + self.api_server = api_server or settings.API_HOST |
| 458 | + if not self.api_server: |
| 459 | + raise ValueError("Missing API host") |
| 460 | + |
| 461 | + unix_socket_path = api_unix_socket or settings.API_UNIX_SOCKET |
| 462 | + if unix_socket_path and allow_unix_sockets: |
| 463 | + check_unix_socket_valid(unix_socket_path) |
| 464 | + connector = aiohttp.UnixConnector(path=unix_socket_path) |
| 465 | + else: |
| 466 | + connector = None |
| 467 | + |
| 468 | + # ClientSession timeout defaults to a private sentinel object and may not be None. |
| 469 | + self.http_session = ( |
| 470 | + aiohttp.ClientSession( |
| 471 | + base_url=self.api_server, connector=connector, timeout=timeout |
| 472 | + ) |
| 473 | + if timeout |
| 474 | + else aiohttp.ClientSession( |
| 475 | + base_url=self.api_server, |
| 476 | + connector=connector, |
| 477 | + ) |
| 478 | + ) |
449 | 479 |
|
450 | 480 | def __enter__(self) -> UserSessionSync: |
451 | 481 | return UserSessionSync(async_session=self) |
@@ -825,8 +855,20 @@ class AuthenticatedAlephClient(AlephClient): |
825 | 855 | "channel", |
826 | 856 | } |
827 | 857 |
|
828 | | - def __init__(self, account: Account, api_server: str): |
829 | | - super().__init__(api_server=api_server) |
| 858 | + def __init__( |
| 859 | + self, |
| 860 | + account: Account, |
| 861 | + api_server: Optional[str], |
| 862 | + api_unix_socket: Optional[str] = None, |
| 863 | + allow_unix_sockets: bool = True, |
| 864 | + timeout: Optional[aiohttp.ClientTimeout] = None, |
| 865 | + ): |
| 866 | + super().__init__( |
| 867 | + api_server=api_server, |
| 868 | + api_unix_socket=api_unix_socket, |
| 869 | + allow_unix_sockets=allow_unix_sockets, |
| 870 | + timeout=timeout, |
| 871 | + ) |
830 | 872 | self.account = account |
831 | 873 |
|
832 | 874 | def __enter__(self) -> "AuthenticatedUserSessionSync": |
|
0 commit comments