30 lines
755 B
Python
30 lines
755 B
Python
from __future__ import annotations
|
|
|
|
from collections import deque
|
|
from datetime import datetime, timezone
|
|
from threading import Lock
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
# Module-wide, in-memory buffer of recorded events. Because the deque is
# bounded (maxlen=2000), appending to a full buffer silently discards the
# entry at the opposite end, so only the most recent 2000 events are kept.
_events: "deque[Dict[str, Any]]" = deque(maxlen=2000)
|
|
# Mutex held by record_event() and get_events() while they touch `_events`,
# so an append and a snapshot copy never interleave.
_lock = Lock()
|
|
|
|
|
|
def record_event(event_type: str, **payload: Any) -> None:
    """Append one timestamped entry to the module-level event buffer.

    Args:
        event_type: Label stored under the entry's ``"type"`` key.
        **payload: Arbitrary keyword data; the resulting dict is stored
            as-is under the entry's ``"payload"`` key.

    The entry's ``"ts"`` key holds the current UTC time rendered as an
    ISO 8601 string.
    """
    # Take the timestamp before acquiring the lock so the critical section
    # is nothing more than the append itself.
    stamp = datetime.now(timezone.utc).isoformat()
    entry = {"ts": stamp, "type": event_type, "payload": payload}
    with _lock:
        _events.append(entry)
|
|
|
|
|
|
def get_events(limit: Optional[int] = None, newest_first: bool = True) -> List[Dict[str, Any]]:
    """Return a snapshot list of the buffered events.

    Args:
        limit: Maximum number of entries to return. ``None`` returns all
            of them; zero or a negative value returns an empty list.
        newest_first: When true, the most recently appended entry comes
            first; otherwise entries are in insertion order.

    Returns:
        A new list, so callers may reorder or slice it freely. The event
        dicts inside it are the stored objects themselves, not copies.
    """
    # Only the copy happens under the lock; ordering and truncation work on
    # the private snapshot.
    with _lock:
        snapshot = list(_events)

    ordered = snapshot[::-1] if newest_first else snapshot
    if limit is None:
        return ordered
    # Clamp so a negative limit yields nothing rather than slicing from the end.
    return ordered[: max(limit, 0)]
|