felicity-lims/felicity/core/events.py
Aurthur Musendame 594471fb43 pytype fixes
2024-09-24 18:12:10 +02:00

124 lines
3.9 KiB
Python

import asyncio
import inspect
import logging
import traceback
from asyncio import create_task, gather, Lock as ALock
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from typing import Callable, List, Dict, Any, Tuple
# Configure root logging at INFO level (runs at import time) and create the
# logger used by every function in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Handler registry: maps an event type name to the list of callables
# (regular functions or coroutine functions) subscribed to that event.
subscribers: Dict[str, List[Callable[..., Any]]] = {}
# threading.Lock taken by subscribe / unsubscribe / post_event around
# accesses to `subscribers`.
sync_lock = Lock()
# asyncio.Lock taken by asubscribe / aunsubscribe / apost_event around
# accesses to `subscribers`.
async_lock = ALock()
def partition_functions(
    functions: List[Callable[..., Any]],
) -> Tuple[List[Callable[..., Any]], List[Callable[..., Any]]]:
    """Split callables into coroutine functions and everything else.

    Args:
        functions: Callables to classify.

    Returns:
        A ``(async_funcs, sync_funcs)`` pair. Each list keeps the relative
        order the callables had in ``functions``.
    """
    coroutine_fns: List[Callable[..., Any]] = []
    regular_fns: List[Callable[..., Any]] = []
    for candidate in functions:
        # Classification relies solely on inspect.iscoroutinefunction.
        bucket = coroutine_fns if inspect.iscoroutinefunction(candidate) else regular_fns
        bucket.append(candidate)
    return coroutine_fns, regular_fns
def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
    """Return the current thread's event loop, creating one if needed.

    Returns:
        The loop reported by ``asyncio.get_event_loop()``. If that call
        raises ``RuntimeError``, a new loop is created, installed as the
        current loop and returned instead.
    """
    try:
        existing_loop = asyncio.get_event_loop()
    except RuntimeError:
        # No usable loop for this thread: build one and register it.
        fresh_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(fresh_loop)
        return fresh_loop
    return existing_loop
# Synchronous Event System
def subscribe(event_type: str, fn: Callable[..., Any]) -> None:
    """Register ``fn`` as a handler for ``event_type``.

    Args:
        event_type: Name of the event to listen for.
        fn: Callable to add to that event's handler list. No de-duplication
            is performed, so registering twice stores it twice.
    """
    with sync_lock:
        handlers = subscribers.setdefault(event_type, [])
        handlers.append(fn)
def unsubscribe(event_type: str, fn: Callable[..., Any]) -> None:
    """Remove one registration of ``fn`` from ``event_type``.

    Unknown event types are ignored silently. If the event type exists but
    ``fn`` is not registered for it, an error is logged instead of raised.

    Args:
        event_type: Name of the event the handler was registered for.
        fn: The handler to remove.
    """
    with sync_lock:
        if event_type not in subscribers:
            return
        handlers = subscribers[event_type]
        try:
            handlers.remove(fn)
        except ValueError:
            # list.remove raises ValueError when fn is absent.
            logger.error(
                f"Function not found in subscribers for event type: {event_type}\n{traceback.format_exc()}"
            )
# Strong references to handler tasks scheduled on a running loop. The event
# loop only keeps weak references to tasks, so without this set a
# fire-and-forget task could be garbage-collected before it completes.
_background_tasks: set = set()


def post_event(event_type: str, **kwargs: Any) -> None:
    """Deliver an event to every subscriber registered for ``event_type``.

    Coroutine-function subscribers are scheduled as tasks when this is called
    from inside a running event loop (fire-and-forget); otherwise each one is
    run to completion with ``asyncio.run``. Regular subscribers are executed
    on a thread pool, and this call blocks until all of them have finished.
    A failing subscriber is logged and does not stop delivery to the others.

    Args:
        event_type: Name of the event being published.
        **kwargs: Payload forwarded as keyword arguments to each subscriber.
    """
    with sync_lock:
        # Copy under the lock: iterating the live list after releasing it
        # would race with concurrent subscribe()/unsubscribe() calls.
        current_subscribers = list(subscribers.get(event_type, ()))
    if not current_subscribers:
        return

    # Separate sync from async functions
    async_funcs, sync_funcs = partition_functions(current_subscribers)

    # Handle async functions
    if async_funcs:
        try:
            # Only a loop running in *this* thread can accept new tasks.
            running_loop = asyncio.get_running_loop()
        except RuntimeError:
            # Not inside a loop. asyncio.run() manages its own temporary
            # loop, so no loop is created here that would be left unclosed.
            running_loop = None

        for fn in async_funcs:
            # asafe_execute logs handler errors, mirroring safe_execute for
            # sync handlers, so one bad subscriber cannot abort the rest.
            if running_loop is not None:
                task = running_loop.create_task(asafe_execute(fn, **kwargs))
                _background_tasks.add(task)
                task.add_done_callback(_background_tasks.discard)
            else:
                asyncio.run(asafe_execute(fn, **kwargs))

    # Handle sync functions
    if sync_funcs:
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(safe_execute, fn, **kwargs) for fn in sync_funcs
            ]
            for future in as_completed(futures):
                try:
                    future.result()  # Re-raise any exception from the executed function
                except Exception as e:
                    logger.error(
                        f"Error executing event handler: {e}\n{traceback.format_exc()}"
                    )
def safe_execute(fn: Callable[..., Any], **kwargs: Any) -> None:
    """Call ``fn(**kwargs)`` and log, rather than propagate, any exception.

    Args:
        fn: The subscriber to invoke.
        **kwargs: Keyword arguments forwarded to ``fn``.
    """
    try:
        fn(**kwargs)
    except Exception as e:
        # functools.partial objects and callable instances have no __name__;
        # fall back to repr so the error handler itself cannot raise.
        fn_name = getattr(fn, "__name__", repr(fn))
        logger.error(
            f"Error in event subscriber {fn_name}: {e}\n{traceback.format_exc()}"
        )
# Asynchronous Event System
async def asubscribe(event_type: str, fn: Callable[..., Any]) -> None:
    """Register ``fn`` as a handler for ``event_type`` from async code.

    Args:
        event_type: Name of the event to listen for.
        fn: Callable (regular or coroutine function) to add to that event's
            handler list. No de-duplication is performed.
    """
    async with async_lock:
        # The sync API guards the same registry with sync_lock; take it too
        # so both APIs actually exclude each other. The critical section has
        # no await and is a single list append, so the hold time is minimal.
        with sync_lock:
            subscribers.setdefault(event_type, []).append(fn)
async def aunsubscribe(event_type: str, fn: Callable[..., Any]) -> None:
    """Remove one registration of ``fn`` from ``event_type`` from async code.

    Unknown event types are ignored silently. If the event type exists but
    ``fn`` is not registered for it, an error is logged instead of raised.

    Args:
        event_type: Name of the event the handler was registered for.
        fn: The handler to remove.
    """
    async with async_lock:
        # The sync API guards the same registry with sync_lock; take it too
        # so both APIs actually exclude each other. Nothing awaits while the
        # thread lock is held.
        with sync_lock:
            if event_type in subscribers:
                try:
                    subscribers[event_type].remove(fn)
                except ValueError:
                    logger.error(
                        f"Function not found in subscribers for event type: {event_type}\n{traceback.format_exc()}"
                    )
async def apost_event(event_type: str, **kwargs: Any) -> None:
    """Deliver an event to all subscribers of ``event_type`` and wait for them.

    Every subscriber is wrapped in ``asafe_execute`` and scheduled as a task
    on the running loop; this coroutine returns once all of them finished.

    Args:
        event_type: Name of the event being published.
        **kwargs: Payload forwarded as keyword arguments to each subscriber.
    """
    async with async_lock:
        # Also take sync_lock (used by the sync API for the same registry)
        # and copy the list, so later iteration cannot race with concurrent
        # subscribe/unsubscribe calls mutating it.
        with sync_lock:
            current_subscribers = list(subscribers.get(event_type, ()))
    tasks = [create_task(asafe_execute(fn, **kwargs)) for fn in current_subscribers]
    await gather(*tasks)
async def asafe_execute(fn: Callable[..., Any], **kwargs: Any) -> None:
    """Invoke a subscriber from async code, logging any exception it raises.

    ``fn`` is called with ``**kwargs``; if the call returns an awaitable
    (coroutine function, partial of one, or an object with an async
    ``__call__``) it is awaited. Regular callables run inline.

    Args:
        fn: The subscriber to invoke.
        **kwargs: Keyword arguments forwarded to ``fn``.
    """
    try:
        outcome = fn(**kwargs)
        # Checking the result instead of inspect.iscoroutinefunction(fn)
        # also covers async callables that check does not recognise, whose
        # coroutine would otherwise never be awaited.
        if inspect.isawaitable(outcome):
            await outcome
    except Exception as e:
        # Not every callable has __name__ (partials, callable instances);
        # fall back to repr so the error handler itself cannot raise.
        fn_name = getattr(fn, "__name__", repr(fn))
        logger.error(
            f"Error in event subscriber {fn_name}: {e}\n{traceback.format_exc()}"
        )