"""Small thread-backed task queue with optional de-duplication by key."""

import logging
import queue
import threading
from typing import Callable, Optional

from .config import settings


class SimpleTaskQueue:
    """FIFO task queue drained by a fixed set of daemon worker threads.

    A task is a callable plus its positional and keyword arguments. A task
    may be submitted under a key via ``enqueue_unique``; while a task with
    that key is queued or running, further submissions with the same key
    are rejected.
    """

    def __init__(self, name: str, maxsize: int = 0):
        """Create an idle queue; no threads run until ``start`` is called.

        Args:
            name: Label used in worker thread names and failure log lines.
            maxsize: Forwarded to ``queue.Queue``. ``0`` means unbounded;
                otherwise ``put`` blocks while the queue is full.
        """
        self._name = name
        # Each item is (dedup key or None, callable, positional args, kwargs).
        self._queue: "queue.Queue[tuple[Optional[str], Callable, tuple, dict]]" = (
            queue.Queue(maxsize=maxsize)
        )
        self._threads: list[threading.Thread] = []
        self._started = False
        # Guards the one-time transition of ``_started``.
        self._lock = threading.Lock()
        # Keys of unique tasks that are currently queued or executing.
        self._inflight: set[str] = set()
        self._inflight_lock = threading.Lock()

    def start(self, workers: int):
        """Spawn the worker threads; calls after the first one are no-ops.

        Args:
            workers: Number of threads to start. Values below 1 are
                treated as 1.
        """
        with self._lock:
            if self._started:
                return
            self._started = True
        for idx in range(max(1, workers)):
            thread = threading.Thread(
                target=self._worker,
                name=f"{self._name}-worker-{idx + 1}",
                daemon=True,
            )
            thread.start()
            self._threads.append(thread)

    def enqueue(self, func: Callable, *args, **kwargs) -> None:
        """Queue ``func(*args, **kwargs)`` without any de-duplication."""
        self._queue.put((None, func, args, kwargs))

    def enqueue_unique(self, key: str, func: Callable, *args, **kwargs) -> bool:
        """Queue a task unless one with the same key is already in flight.

        Args:
            key: De-duplication key. A falsy key disables de-duplication and
                the task is queued unconditionally.
            func: Callable to run on a worker thread.
            *args: Positional arguments for ``func``.
            **kwargs: Keyword arguments for ``func``.

        Returns:
            ``True`` if the task was queued, ``False`` if a task with the
            same key is still queued or running.
        """
        if not key:
            self.enqueue(func, *args, **kwargs)
            return True
        with self._inflight_lock:
            if key in self._inflight:
                return False
            self._inflight.add(key)
        try:
            self._queue.put((key, func, args, kwargs))
        except BaseException:
            # The task never reached the queue, so no worker will release the
            # key. Roll it back here, otherwise this key would be rejected
            # forever.
            with self._inflight_lock:
                self._inflight.discard(key)
            raise
        return True

    def _worker(self):
        """Worker loop: take tasks forever, run them, and log any failure."""
        while True:
            key, func, args, kwargs = self._queue.get()
            try:
                func(*args, **kwargs)
            except Exception:
                # A failing task must not take the worker thread down with it.
                logging.exception("%s worker task failed", self._name)
            finally:
                # Release the key even on failure so the task can be re-queued.
                if key:
                    with self._inflight_lock:
                        self._inflight.discard(key)
                self._queue.task_done()


# Module-level queue instance named "sing".
sing_task_queue = SimpleTaskQueue("sing")


def start_sing_workers():
    """Start the workers of ``sing_task_queue``.

    The worker count comes from ``settings.SING_MERGE_MAX_CONCURRENCY``;
    a falsy value or one below 1 results in a single worker.
    """
    workers = max(1, settings.SING_MERGE_MAX_CONCURRENCY or 1)
    sing_task_queue.start(workers)