Ai_GirlFriend/lover/task_queue.py

67 lines
2.1 KiB
Python
Raw Normal View History

2026-01-31 19:15:41 +08:00
import logging
import queue
import threading
from typing import Callable, Optional
from .config import settings
class SimpleTaskQueue:
def __init__(self, name: str, maxsize: int = 0):
self._name = name
self._queue: "queue.Queue[tuple[Optional[str], Callable, tuple, dict]]" = queue.Queue(maxsize=maxsize)
self._threads: list[threading.Thread] = []
self._started = False
self._lock = threading.Lock()
self._inflight: set[str] = set()
self._inflight_lock = threading.Lock()
def start(self, workers: int):
with self._lock:
if self._started:
return
self._started = True
for idx in range(max(1, workers)):
thread = threading.Thread(
target=self._worker,
name=f"{self._name}-worker-{idx + 1}",
daemon=True,
)
thread.start()
self._threads.append(thread)
def enqueue(self, func: Callable, *args, **kwargs) -> None:
self._queue.put((None, func, args, kwargs))
def enqueue_unique(self, key: str, func: Callable, *args, **kwargs) -> bool:
if not key:
self.enqueue(func, *args, **kwargs)
return True
with self._inflight_lock:
if key in self._inflight:
return False
self._inflight.add(key)
self._queue.put((key, func, args, kwargs))
return True
def _worker(self):
while True:
key, func, args, kwargs = self._queue.get()
try:
func(*args, **kwargs)
except Exception:
logging.exception("%s worker task failed", self._name)
finally:
if key:
with self._inflight_lock:
self._inflight.discard(key)
self._queue.task_done()
sing_task_queue = SimpleTaskQueue("sing")
def start_sing_workers():
workers = max(1, settings.SING_MERGE_MAX_CONCURRENCY or 1)
sing_task_queue.start(workers)