xinli/rag-python/event_store.py

30 lines
755 B
Python
Raw Normal View History

from __future__ import annotations
from collections import deque
from datetime import datetime, timezone
from threading import Lock
from typing import Any, Dict, List, Optional
# Maximum number of events kept in memory; once the buffer is full, each new
# append makes the deque discard its oldest entry.
_MAX_EVENTS = 2000

# Held by record_event() and get_events() while they touch `_events`.
_lock = Lock()

# In-memory event buffer: appended on the right, so the oldest event is first
# and the newest is last.
_events: "deque[Dict[str, Any]]" = deque(maxlen=_MAX_EVENTS)
def record_event(event_type: str, **payload: Any) -> None:
    """Append one timestamped event to the module-level ``_events`` buffer.

    Args:
        event_type: Label stored under the ``"type"`` key of the event.
        **payload: Arbitrary keyword arguments, stored together as a dict
            under the ``"payload"`` key. The values are kept by reference,
            not copied.

    The event also carries a ``"ts"`` key holding the current UTC time as an
    ISO 8601 string.
    """
    # The timestamp is taken before the lock is acquired, so it reflects when
    # the call was made rather than when the lock became available.
    stamp = datetime.now(timezone.utc).isoformat()
    entry: Dict[str, Any] = {"ts": stamp, "type": event_type, "payload": payload}

    with _lock:
        _events.append(entry)
def get_events(limit: Optional[int] = None, newest_first: bool = True) -> List[Dict[str, Any]]:
    """Return a snapshot of the events currently held in ``_events``.

    Args:
        limit: Maximum number of events to return, counted from the start of
            the chosen ordering. ``None`` returns everything; a negative
            value is treated as 0 and yields an empty list.
        newest_first: When true, the most recently recorded event comes
            first; otherwise events are in insertion order.

    Returns:
        A new list. The event dicts inside it are the same objects that are
        stored in the buffer (the copy is shallow).
    """
    # Copy under the lock so the buffer cannot change mid-iteration; the
    # slicing below works on the private copy and needs no lock.
    with _lock:
        snapshot = list(reversed(_events)) if newest_first else list(_events)

    if limit is None:
        return snapshot
    return snapshot[: max(0, limit)]