67 lines
2.1 KiB
Python
67 lines
2.1 KiB
Python
|
|
import logging
|
||
|
|
import queue
|
||
|
|
import threading
|
||
|
|
from typing import Callable, Optional
|
||
|
|
|
||
|
|
from .config import settings
|
||
|
|
|
||
|
|
|
||
|
|
class SimpleTaskQueue:
    """Minimal in-process task queue served by daemon worker threads.

    Tasks are plain callables placed on a ``queue.Queue`` and dequeued in FIFO
    order by the threads created in :meth:`start`. Tasks submitted through
    :meth:`enqueue_unique` carry a key; while a task with that key is pending
    or running, further submissions with the same key are rejected.
    """

    def __init__(self, name: str, maxsize: int = 0):
        """Create an idle queue; no threads exist until :meth:`start` is called.

        Args:
            name: Label used in worker thread names and failure log messages.
            maxsize: Capacity handed to ``queue.Queue``; ``0`` means unbounded.
        """
        self._name = name
        # Each item is (dedupe key or None, callable, positional args, keyword args).
        self._queue: "queue.Queue[tuple[Optional[str], Callable, tuple, dict]]" = queue.Queue(maxsize=maxsize)
        self._threads: list[threading.Thread] = []
        self._started = False
        # Guards _started / _threads so start() only spawns workers once.
        self._lock = threading.Lock()
        # Keys of unique tasks that are queued or currently executing.
        self._inflight: set[str] = set()
        self._inflight_lock = threading.Lock()

    def start(self, workers: int) -> None:
        """Spawn the worker threads; calls after the first one are no-ops.

        Args:
            workers: Requested number of threads; values below 1 are raised to 1.
        """
        with self._lock:
            if self._started:
                return
            self._started = True
            for idx in range(max(1, workers)):
                thread = threading.Thread(
                    target=self._worker,
                    name=f"{self._name}-worker-{idx + 1}",
                    daemon=True,
                )
                thread.start()
                self._threads.append(thread)

    def enqueue(self, func: Callable, *args, **kwargs) -> None:
        """Queue ``func(*args, **kwargs)`` for execution without de-duplication.

        Blocks while a bounded queue is full (default ``queue.Queue.put``
        behaviour).
        """
        self._queue.put((None, func, args, kwargs))

    def enqueue_unique(self, key: str, func: Callable, *args, **kwargs) -> bool:
        """Queue ``func(*args, **kwargs)`` unless ``key`` is already in flight.

        Args:
            key: De-duplication key. A falsy key (``""`` / ``None``) disables
                de-duplication and the task is queued unconditionally.
            func: Callable to run on a worker thread.

        Returns:
            ``True`` if the task was queued, ``False`` if a task with the same
            key is still pending or running.
        """
        if not key:
            self.enqueue(func, *args, **kwargs)
            return True
        with self._inflight_lock:
            if key in self._inflight:
                return False
            self._inflight.add(key)
        # put() happens outside _inflight_lock: on a full bounded queue it may
        # block, and workers need that lock to release keys in the meantime.
        try:
            self._queue.put((key, func, args, kwargs))
        except BaseException:
            # The task never reached the queue, so no worker will ever release
            # the key; undo the reservation or the key stays rejected forever.
            with self._inflight_lock:
                self._inflight.discard(key)
            raise
        return True

    def _worker(self) -> None:
        """Worker loop: run queued tasks forever, logging any task failure."""
        while True:
            key, func, args, kwargs = self._queue.get()
            try:
                func(*args, **kwargs)
            except Exception:
                # A failing task must not take the worker thread down with it.
                logging.exception("%s worker task failed", self._name)
            finally:
                # Release the key before task_done() so that anyone returning
                # from Queue.join() can immediately re-submit the same key.
                if key:
                    with self._inflight_lock:
                        self._inflight.discard(key)
                self._queue.task_done()
|
||
|
|
|
||
|
|
|
||
|
|
# Module-level task queue labelled "sing". Constructing it starts no threads;
# workers are launched by start_sing_workers().
sing_task_queue = SimpleTaskQueue("sing")
|
||
|
|
|
||
|
|
|
||
|
|
def start_sing_workers():
    """Start the workers of ``sing_task_queue`` using the configured concurrency.

    Reads ``settings.SING_MERGE_MAX_CONCURRENCY``; a falsy value (``None`` or
    ``0``) is treated as 1, and the resulting worker count is never below 1.
    """
    configured = settings.SING_MERGE_MAX_CONCURRENCY or 1
    sing_task_queue.start(max(1, configured))
|