2026-01-31 19:15:41 +08:00
|
|
|
import logging
|
|
|
|
|
import queue
|
|
|
|
|
import threading
|
|
|
|
|
from typing import Callable, Optional
|
|
|
|
|
|
|
|
|
|
from .config import settings
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SimpleTaskQueue:
|
|
|
|
|
def __init__(self, name: str, maxsize: int = 0):
|
|
|
|
|
self._name = name
|
|
|
|
|
self._queue: "queue.Queue[tuple[Optional[str], Callable, tuple, dict]]" = queue.Queue(maxsize=maxsize)
|
|
|
|
|
self._threads: list[threading.Thread] = []
|
|
|
|
|
self._started = False
|
|
|
|
|
self._lock = threading.Lock()
|
|
|
|
|
self._inflight: set[str] = set()
|
|
|
|
|
self._inflight_lock = threading.Lock()
|
|
|
|
|
|
|
|
|
|
def start(self, workers: int):
|
2026-02-02 20:08:28 +08:00
|
|
|
import logging
|
|
|
|
|
logger = logging.getLogger(self._name)
|
2026-01-31 19:15:41 +08:00
|
|
|
with self._lock:
|
|
|
|
|
if self._started:
|
2026-02-02 20:08:28 +08:00
|
|
|
logger.warning(f"{self._name} 队列已经启动,跳过")
|
2026-01-31 19:15:41 +08:00
|
|
|
return
|
|
|
|
|
self._started = True
|
2026-02-02 20:08:28 +08:00
|
|
|
logger.info(f"启动 {workers} 个 {self._name} worker 线程")
|
2026-01-31 19:15:41 +08:00
|
|
|
for idx in range(max(1, workers)):
|
|
|
|
|
thread = threading.Thread(
|
|
|
|
|
target=self._worker,
|
|
|
|
|
name=f"{self._name}-worker-{idx + 1}",
|
|
|
|
|
daemon=True,
|
|
|
|
|
)
|
|
|
|
|
thread.start()
|
|
|
|
|
self._threads.append(thread)
|
2026-02-02 20:08:28 +08:00
|
|
|
logger.info(f"Worker 线程 {self._name}-worker-{idx + 1} 已启动")
|
|
|
|
|
logger.info(f"{self._name} 队列启动完成,共 {len(self._threads)} 个 worker")
|
2026-01-31 19:15:41 +08:00
|
|
|
|
|
|
|
|
def enqueue(self, func: Callable, *args, **kwargs) -> None:
|
|
|
|
|
self._queue.put((None, func, args, kwargs))
|
|
|
|
|
|
|
|
|
|
def enqueue_unique(self, key: str, func: Callable, *args, **kwargs) -> bool:
|
|
|
|
|
if not key:
|
|
|
|
|
self.enqueue(func, *args, **kwargs)
|
|
|
|
|
return True
|
|
|
|
|
with self._inflight_lock:
|
|
|
|
|
if key in self._inflight:
|
|
|
|
|
return False
|
|
|
|
|
self._inflight.add(key)
|
|
|
|
|
self._queue.put((key, func, args, kwargs))
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
def _worker(self):
|
2026-02-02 20:08:28 +08:00
|
|
|
import logging
|
|
|
|
|
logger = logging.getLogger(self._name)
|
|
|
|
|
logger.info(f"Worker 线程 {threading.current_thread().name} 开始运行")
|
2026-01-31 19:15:41 +08:00
|
|
|
while True:
|
|
|
|
|
try:
|
2026-02-02 20:08:28 +08:00
|
|
|
logger.debug(f"Worker {threading.current_thread().name} 等待任务...")
|
|
|
|
|
key, func, args, kwargs = self._queue.get()
|
|
|
|
|
logger.info(f"Worker {threading.current_thread().name} 获取到任务: {func.__name__}, key={key}")
|
|
|
|
|
try:
|
|
|
|
|
func(*args, **kwargs)
|
|
|
|
|
logger.info(f"Worker {threading.current_thread().name} 任务完成: {func.__name__}")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception(f"{self._name} worker 任务失败: {e}")
|
|
|
|
|
finally:
|
|
|
|
|
if key:
|
|
|
|
|
with self._inflight_lock:
|
|
|
|
|
self._inflight.discard(key)
|
|
|
|
|
self._queue.task_done()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.exception(f"Worker {threading.current_thread().name} 发生异常: {e}")
|
2026-01-31 19:15:41 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
sing_task_queue = SimpleTaskQueue("sing")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def start_sing_workers():
|
2026-02-02 20:08:28 +08:00
|
|
|
import logging
|
|
|
|
|
logger = logging.getLogger("sing")
|
2026-01-31 19:15:41 +08:00
|
|
|
workers = max(1, settings.SING_MERGE_MAX_CONCURRENCY or 1)
|
2026-02-02 20:08:28 +08:00
|
|
|
logger.info(f"启动 {workers} 个唱歌任务 worker 线程")
|
2026-01-31 19:15:41 +08:00
|
|
|
sing_task_queue.start(workers)
|
2026-02-02 20:08:28 +08:00
|
|
|
logger.info("唱歌任务 worker 线程启动完成")
|