From d44c380735261df5c1c8eecedd96a72173e2086c Mon Sep 17 00:00:00 2001
From: hhhhsc <1710496817@qq.com>
Date: Fri, 23 Jan 2026 11:38:06 +0800
Subject: [PATCH] feat: switch database access to a singleton pattern
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../img_similar_images_cleaner/process.py     |  7 +-
 .../datamate/operator_runtime.py              | 26 +++---
 .../datamate/sql_manager/sql_manager.py       | 83 ++++++++++++-------
 3 files changed, 74 insertions(+), 42 deletions(-)

diff --git a/runtime/ops/filter/img_similar_images_cleaner/process.py b/runtime/ops/filter/img_similar_images_cleaner/process.py
index a6869cfd..0213adce 100644
--- a/runtime/ops/filter/img_similar_images_cleaner/process.py
+++ b/runtime/ops/filter/img_similar_images_cleaner/process.py
@@ -217,9 +217,10 @@ def determine_similar_images(self, file_features: List, p_hash: str, des_matrix:
         similarity = round(result, 2)
         if similarity >= self.similar_threshold:
             logger.info(
-                "fileName: %s, method: ImgSimilarCleaner, dataset: %s. This picture is similar to %s, "
-                "and the similarity is %.4f. The picture is filtered.", file_name, self.task_uuid,
-                file_name_decoded, similarity)
+                f"fileName: {file_name}, method: ImgSimilarCleaner, dataset: {self.task_uuid}. "
+                f"This picture is similar to {file_name_decoded}, "
+                f"and the similarity is {similarity:.4f}. The picture is filtered."
+            )
             return True
     return False
 

diff --git a/runtime/python-executor/datamate/operator_runtime.py b/runtime/python-executor/datamate/operator_runtime.py
index 648eeb71..37d49ca8 100644
--- a/runtime/python-executor/datamate/operator_runtime.py
+++ b/runtime/python-executor/datamate/operator_runtime.py
@@ -3,6 +3,7 @@
 import uvicorn
 import yaml
+from contextlib import asynccontextmanager
 from fastapi import FastAPI, Request
 from fastapi.responses import JSONResponse
 from jsonargparse import ArgumentParser
 
@@ -25,7 +26,20 @@
     enqueue=True
 )
 
-app = FastAPI()
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    try:
+        logger.info("Initializing background worker...")
+        start_auto_annotation_worker()
+        logger.info("Auto-annotation worker started successfully.")
+    except Exception as e:
+        logger.error("Failed to start auto-annotation worker: {}", e)
+
+    yield
+
+    logger.info("Shutting down background worker...")
+
+app = FastAPI(lifespan=lifespan)
 
 
 class APIException(Exception):
@@ -50,16 +64,6 @@ def to_dict(self) -> Dict[str, Any]:
         return result
 
 
-@app.on_event("startup")
-async def startup_event():
-    """Initialize the background auto-annotation worker on FastAPI startup."""
-
-    try:
-        start_auto_annotation_worker()
-    except Exception as e:  # pragma: no cover - defensive logging
-        logger.error("Failed to start auto-annotation worker: {}", e)
-
-
 @app.exception_handler(APIException)
 async def api_exception_handler(request: Request, exc: APIException):
     return JSONResponse(
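Note on the hunk above: FastAPI's lifespan parameter replaces the deprecated
@app.on_event("startup") hook; code before the yield runs once at startup,
code after it runs once at shutdown. For reference, a minimal self-contained
sketch of the same pattern (the worker call is replaced by a hypothetical
print placeholder; only fastapi and uvicorn are assumed):

    from contextlib import asynccontextmanager

    from fastapi import FastAPI


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup phase: runs once, before the app starts serving requests.
        print("starting background worker...")  # stand-in for start_auto_annotation_worker()
        yield
        # Shutdown phase: runs once, after the server stops accepting requests.
        print("stopping background worker...")


    app = FastAPI(lifespan=lifespan)

    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="127.0.0.1", port=8000)

Unlike the removed on_event hook, the post-yield cleanup gives the patch a
natural place to stop the worker later, which the old style never had.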
diff --git a/runtime/python-executor/datamate/sql_manager/sql_manager.py b/runtime/python-executor/datamate/sql_manager/sql_manager.py
index 087264e5..80f71f23 100644
--- a/runtime/python-executor/datamate/sql_manager/sql_manager.py
+++ b/runtime/python-executor/datamate/sql_manager/sql_manager.py
@@ -2,50 +2,77 @@
 import os
 import time
 from random import uniform
+from threading import Lock
 
 from loguru import logger
-from sqlalchemy import create_engine, inspect
+from sqlalchemy import create_engine
 from sqlalchemy.engine import URL
 
 
 class SQLManager:
+    _engine = None
+    _lock = Lock()  # ensure only one engine is ever created across threads
 
-    @staticmethod
-    def create_connect(max_retries=5, base_delay=1):
+    @classmethod
+    def _get_engine(cls):
         """
-        Connect to the MySQL database using SQLAlchemy and PyMySQL.
-        :param max_retries: maximum number of retries
-        :param base_delay: base delay between retries
-        :return: a SQLAlchemy connection object
+        Fetch the Engine as a singleton, guaranteeing a single global connection pool.
         """
+        if cls._engine is not None:
+            return cls._engine
 
-        connection_url = URL.create(
-            drivername="postgresql+psycopg2",  # key change: use the PostgreSQL driver
-            username=os.getenv("PG_USER", "postgres"),  # consider renaming the env var to match
-            password=os.getenv("PG_PASSWORD", "password"),
-            host=os.getenv("PG_HOST", "datamate-database"),
-            port=int(os.getenv("PG_PORT", 5432)),  # default port changed to 5432
-            database=os.getenv("PG_DATABASE", "datamate"),
-        )
+        with cls._lock:
+            if cls._engine is not None:
+                return cls._engine
 
-        attempt = 0
+            # Build the connection URL
+            connection_url = URL.create(
+                drivername="postgresql+psycopg2",
+                username=os.getenv("PG_USER", "postgres"),
+                password=os.getenv("PG_PASSWORD", "password"),
+                host=os.getenv("PG_HOST", "datamate-database"),
+                port=int(os.getenv("PG_PORT", 5432)),
+                database=os.getenv("PG_DATABASE", "datamate"),
+            )
+            # Create the engine (runs exactly once).
+            # Note: AUTOCOMMIT is convenient, but use it with care. If transactional
+            # control is needed (e.g. two tables that must be updated together),
+            # AUTOCOMMIT makes rollback impossible.
+            cls._engine = create_engine(
+                connection_url,
+                pool_pre_ping=True,
+                isolation_level="AUTOCOMMIT",
+                pool_size=5,        # explicit pool size
+                max_overflow=15,    # explicit overflow limit
+                pool_timeout=30,
+                pool_recycle=1800   # recycle connections after 30 minutes
+            )
+            logger.info("Database Engine initialized successfully.")
+            return cls._engine
+
+    @staticmethod
+    def create_connect(max_retries=5, base_delay=1):
+        """
+        Check out a connection from the shared Engine pool, with retry logic.
+        """
+        attempt = 0
         while True:
             try:
-                engine = create_engine(connection_url, pool_pre_ping=True, isolation_level="AUTOCOMMIT")
+                # 1. Get the global engine
+                engine = SQLManager._get_engine()
+                # 2. Check out a connection from the pool
                 return engine.connect()
             except Exception as e:
-                logger.error(f"Attempt {attempt + 1} failed with error: {str(e)}")
-                if attempt >= max_retries - 1:
-                    raise
-                wait_time = min(30, base_delay * (2 ** attempt))  # cap the delay at 30 seconds
-                jitter = uniform(-wait_time / 4, wait_time / 4)  # add a random jitter factor
-                time.sleep(wait_time + jitter)
                 attempt += 1
+                logger.error(f"Connection attempt {attempt} failed: {str(e)}")
+                if attempt >= max_retries:
+                    logger.error("Max retries reached. Could not connect to database.")
+                    raise
 
-
-if __name__ == "__main__":
-    with SQLManager.create_connect() as connection:
-        inspector = inspect(connection)
-        print(inspector.get_table_names())
-
+                # Back off before the next retry
+                wait_time = min(30, base_delay * (2 ** (attempt - 1)))
+                jitter = uniform(-wait_time / 4, wait_time / 4)
+                sleep_time = wait_time + jitter
+                logger.info(f"Retrying in {sleep_time:.2f} seconds...")
+                time.sleep(sleep_time)
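For reference, a minimal usage sketch of the singleton above, standing in for
the removed __main__ demo (the datamate.sql_manager.sql_manager import path
and a reachable Postgres instance are assumptions; text() is SQLAlchemy's
standard textual-SQL helper):

    from sqlalchemy import text

    from datamate.sql_manager.sql_manager import SQLManager

    # Both calls draw from the same process-wide pool, created lazily on first use.
    with SQLManager.create_connect() as conn:
        print(conn.execute(text("SELECT 1")).scalar())

    with SQLManager.create_connect() as conn:
        # Closing the connection (here via the with-block) returns it to the
        # pool instead of tearing down a per-call engine, as the old code did.
        print(conn.execute(text("SELECT current_database()")).scalar())

Because create_connect() now borrows from a shared pool, callers must close
(or with-manage) every connection they take, or the pool will eventually
exhaust its pool_size + max_overflow budget and block on pool_timeout.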