diff --git a/libs/admin-api-lib/.openapi-generator-ignore b/libs/admin-api-lib/.openapi-generator-ignore index e4cf10f9..7a090d4f 100644 --- a/libs/admin-api-lib/.openapi-generator-ignore +++ b/libs/admin-api-lib/.openapi-generator-ignore @@ -32,3 +32,9 @@ setup.cfg .gitignore docker-compose.yaml .openapi-generator/FILES + +# These files are hand-maintained in this repo (DI wiring and impl loading) +src/admin_api_lib/main.py +src/admin_api_lib/apis/admin_api.py +src/admin_api_lib/apis/admin_api_base.py +src/admin_api_lib/security_api.py diff --git a/libs/admin-api-lib/openapi.yaml b/libs/admin-api-lib/openapi.yaml index e847c5b2..b8b73f16 100644 --- a/libs/admin-api-lib/openapi.yaml +++ b/libs/admin-api-lib/openapi.yaml @@ -1,378 +1,366 @@ -openapi: 3.1.0 +openapi: 3.0.2 info: - title: admin-api-lib - version: 1.0.0 - description: >- - The API is used for the communication between the admin frontend and the admin backend in the - rag project. + description: The API is used for the communication between the admin frontend and + the admin backend in the rag project. + title: admin-api-lib + version: 1.0.0 servers: - - - url: /api +- url: /api paths: - '/delete_document/{identification}': - delete: - tags: - - admin - parameters: - - - style: simple - explode: false - name: identification - schema: - title: Identification - description: '' - type: string - in: path - required: true - responses: - '200': - content: - application/json: - schema: {} - description: Deleted - '422': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' - description: Validation Error - '500': - description: Internal server error - operationId: delete_document - summary: Delete Document - description: |- - Asynchronously deletes a document based on the provided identification. + /delete_document/{identification}: + delete: + description: |- + Asynchronously deletes a document based on the provided identification. - Parameters - ---------- - identification : str - The unique identifier of the document to be deleted. + If an upload/ingestion for this document is currently running (status `PROCESSING`), + the backend will request cancellation and perform best-effort cleanup so the document + can be re-uploaded. Cancellation is cooperative: a currently running extraction step + may still finish, but cancelled uploads will not publish new READY/ERROR status or + re-upload data after cancellation. - Returns - ------- - None - '/document_reference/{identification}': - get: - tags: - - admin - parameters: - - - style: simple - explode: false - name: identification - description: Identifier of the document. - schema: - title: Identification - description: Identifier of the document. - type: string - in: path - required: true - responses: - '200': - content: - application/json: - schema: - format: binary - title: Response 200 Document Reference Document Reference Identification Get - type: string - description: Returns the pdf in binary form. - '400': - content: - application/json: - schema: - title: Response 400 Document Reference Document Reference Identification Get - type: string - description: Bad request - '404': - content: - application/json: - schema: - title: Response 404 Document Reference Document Reference Identification Get - type: string - description: Document not found. 
- '422': - content: - application/json: - schema: - $ref: '#/components/schemas/HTTPValidationError' - description: Validation Error - '500': - content: - application/json: - schema: - title: Response 500 Document Reference Document Reference Identification Get - type: string - description: Internal server error - operationId: document_reference - summary: Document Reference Id Get - description: |- - Asynchronously retrieve a document reference by its identification. + Parameters + ---------- + identification : str + The unique identifier of the document to be deleted. - Parameters - ---------- - identification : str - The unique identifier for the document reference. + Returns + ------- + None + operationId: delete_document + parameters: + - explode: false + in: path + name: identification + required: true + schema: + description: "" + title: Identification + type: string + style: simple + responses: + "200": + content: + application/json: + schema: {} + description: Deleted + "422": + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + "500": + description: Internal server error + summary: Delete Document + tags: + - admin + /document_reference/{identification}: + get: + description: |- + Asynchronously retrieve a document reference by its identification. - Returns - ------- - Response - The response object containing the document reference details. - /all_documents_status: - get: - tags: - - admin - responses: - '200': - content: - application/json: - schema: - type: array - items: - $ref: '#/components/schemas/DocumentStatus' - description: List of document links - '500': - description: Internal server error - operationId: get_all_documents_status - summary: Get All Documents Status - description: |- - Asynchronously retrieves the status of all documents. + Parameters + ---------- + identification : str + The unique identifier for the document reference. - Returns - ------- - list[DocumentStatus] - A list containing the status of all documents. - /upload_file: - post: - requestBody: - content: - multipart/form-data: - schema: - $ref: '#/components/schemas/Body_upload_file_upload_file_post' - required: true - tags: - - admin - responses: - '200': - content: - application/json: - schema: {} - description: ok - '400': - description: Bad request - '422': - description: Unprocessable Content - '500': - description: Internal server error - operationId: upload_file - summary: Upload File - description: Uploads user selected sources. - /upload_source: - post: - requestBody: - content: - application/json: - schema: - description: '' - type: array - items: - $ref: '#/components/schemas/KeyValuePair' - tags: - - admin - parameters: - - - style: form - explode: true - name: source_type - schema: - title: Type - description: '' - type: string - in: query - required: false - - - style: form - explode: true - name: name - schema: - title: Name - description: '' - type: string - in: query - required: false - responses: - '200': - content: - application/json: - schema: {} - description: ok - '400': - description: Bad request - '422': - description: Unprocessable Content - '500': - description: Internal server error - operationId: upload_source - summary: Upload Source - description: Uploads user selected sources. + Returns + ------- + Response + The response object containing the document reference details. + operationId: document_reference + parameters: + - description: Identifier of the document. 
+ explode: false + in: path + name: identification + required: true + schema: + description: Identifier of the document. + title: Identification + type: string + style: simple + responses: + "200": + content: + application/json: + schema: + format: binary + title: Response 200 Document Reference Document Reference Identification Get + type: string + description: Returns the pdf in binary form. + "400": + content: + application/json: + schema: + title: Response 400 Document Reference Document Reference Identification Get + type: string + description: Bad request + "404": + content: + application/json: + schema: + title: Response 404 Document Reference Document Reference Identification Get + type: string + description: Document not found. + "422": + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' + description: Validation Error + "500": + content: + application/json: + schema: + title: Response 500 Document Reference Document Reference Identification Get + type: string + description: Internal server error + summary: Document Reference Id Get + tags: + - admin + /all_documents_status: + get: + description: |- + Asynchronously retrieves the status of all documents. + + Returns + ------- + list[DocumentStatus] + A list containing the status of all documents. + operationId: get_all_documents_status + responses: + "200": + content: + application/json: + schema: + items: + $ref: '#/components/schemas/DocumentStatus' + type: array + description: List of document links + "500": + description: Internal server error + summary: Get All Documents Status + tags: + - admin + /upload_file: + post: + description: Uploads user selected sources. + operationId: upload_file + requestBody: + content: + multipart/form-data: + schema: + $ref: '#/components/schemas/Body_upload_file_upload_file_post' + required: true + responses: + "200": + content: + application/json: + schema: {} + description: ok + "400": + description: Bad request + "422": + description: Unprocessable Content + "500": + description: Internal server error + summary: Upload File + tags: + - admin + /upload_source: + post: + description: Uploads user selected sources. 
+ operationId: upload_source + parameters: + - explode: true + in: query + name: source_type + required: false + schema: + description: "" + title: Type + type: string + style: form + - explode: true + in: query + name: name + required: false + schema: + description: "" + title: Name + type: string + style: form + requestBody: + content: + application/json: + schema: + description: "" + items: + $ref: '#/components/schemas/KeyValuePair' + type: array + responses: + "200": + content: + application/json: + schema: {} + description: ok + "400": + description: Bad request + "422": + description: Unprocessable Content + "500": + description: Internal server error + summary: Upload Source + tags: + - admin components: - schemas: - Body_upload_file_upload_file_post: - title: Body_upload_file_upload_file_post - required: - - file - properties: - file: - format: binary - title: File - type: string - DocumentStatus: - title: DocumentStatus - description: DocumentStatus - required: - - name - - status - properties: - name: - title: Name - type: string - status: - $ref: '#/components/schemas/Status' - example: - name: name - status: UPLOADING - HTTPValidationError: - title: HTTPValidationError - description: HTTPValidationError - properties: - detail: - nullable: true - title: detail - type: array - items: - $ref: '#/components/schemas/ValidationError' - example: - detail: - - - msg: msg - loc: - - - anyof_schema_1_validator: anyof_schema_1_validator - actual_instance: '' - any_of_schemas: - - any_of_schemas - - any_of_schemas - anyof_schema_2_validator: 0 - - - anyof_schema_1_validator: anyof_schema_1_validator - actual_instance: '' - any_of_schemas: - - any_of_schemas - - any_of_schemas - anyof_schema_2_validator: 0 - type: type - - - msg: msg - loc: - - - anyof_schema_1_validator: anyof_schema_1_validator - actual_instance: '' - any_of_schemas: - - any_of_schemas - - any_of_schemas - anyof_schema_2_validator: 0 - - - anyof_schema_1_validator: anyof_schema_1_validator - actual_instance: '' - any_of_schemas: - - any_of_schemas - - any_of_schemas - anyof_schema_2_validator: 0 - type: type - KeyValuePair: - title: KeyValuePair - description: KeyValuePair - required: - - key - - value - properties: - key: - title: Key - type: string - value: - title: Value - type: string - example: - value: value - key: key - Status: - title: Status - description: allowed enum values - enum: - - UPLOADING - - PROCESSING - - READY - - ERROR + schemas: + Body_upload_file_upload_file_post: + properties: + file: + format: binary + title: File + type: string + required: + - file + title: Body_upload_file_upload_file_post + DocumentStatus: + description: DocumentStatus + example: + name: name + status: UPLOADING + properties: + name: + title: Name + type: string + status: + $ref: '#/components/schemas/Status' + required: + - name + - status + title: DocumentStatus + HTTPValidationError: + description: HTTPValidationError + example: + detail: + - msg: msg + loc: + - anyof_schema_1_validator: anyof_schema_1_validator + actual_instance: "" + any_of_schemas: + - any_of_schemas + - any_of_schemas + anyof_schema_2_validator: 0 + - anyof_schema_1_validator: anyof_schema_1_validator + actual_instance: "" + any_of_schemas: + - any_of_schemas + - any_of_schemas + anyof_schema_2_validator: 0 + type: type + - msg: msg + loc: + - anyof_schema_1_validator: anyof_schema_1_validator + actual_instance: "" + any_of_schemas: + - any_of_schemas + - any_of_schemas + anyof_schema_2_validator: 0 + - anyof_schema_1_validator: 
anyof_schema_1_validator + actual_instance: "" + any_of_schemas: + - any_of_schemas + - any_of_schemas + anyof_schema_2_validator: 0 + type: type + properties: + detail: + items: + $ref: '#/components/schemas/ValidationError' + nullable: true + type: array + title: HTTPValidationError + KeyValuePair: + description: KeyValuePair + example: + value: value + key: key + properties: + key: + title: Key + type: string + value: + title: Value + type: string + required: + - key + - value + title: KeyValuePair + Status: + description: allowed enum values + enum: + - UPLOADING + - PROCESSING + - READY + - ERROR + title: Status + type: string + ValidationError: + description: ValidationError + example: + msg: msg + loc: + - anyof_schema_1_validator: anyof_schema_1_validator + actual_instance: "" + any_of_schemas: + - any_of_schemas + - any_of_schemas + anyof_schema_2_validator: 0 + - anyof_schema_1_validator: anyof_schema_1_validator + actual_instance: "" + any_of_schemas: + - any_of_schemas + - any_of_schemas + anyof_schema_2_validator: 0 + type: type + properties: + loc: + items: + $ref: '#/components/schemas/ValidationErrorLocInner' + type: array + msg: + title: Msg + type: string + type: + title: Type + type: string + required: + - loc + - msg + - type + title: ValidationError + ValidationErrorLocInner: + description: ValidationErrorLocInner + example: + anyof_schema_1_validator: anyof_schema_1_validator + actual_instance: "" + any_of_schemas: + - any_of_schemas + - any_of_schemas + anyof_schema_2_validator: 0 + properties: + anyof_schema_1_validator: + nullable: true + title: anyof_schema_1_validator + type: string + anyof_schema_2_validator: + nullable: true + title: anyof_schema_2_validator + type: integer + actual_instance: {} + any_of_schemas: + items: type: string - ValidationError: - title: ValidationError - description: ValidationError - required: - - loc - - msg - - type - properties: - loc: - title: loc - type: array - items: - $ref: '#/components/schemas/ValidationErrorLocInner' - msg: - title: Msg - type: string - type: - title: Type - type: string - example: - msg: msg - loc: - - - anyof_schema_1_validator: anyof_schema_1_validator - actual_instance: '' - any_of_schemas: - - any_of_schemas - - any_of_schemas - anyof_schema_2_validator: 0 - - - anyof_schema_1_validator: anyof_schema_1_validator - actual_instance: '' - any_of_schemas: - - any_of_schemas - - any_of_schemas - anyof_schema_2_validator: 0 - type: type - ValidationErrorLocInner: - title: ValidationErrorLocInner - description: ValidationErrorLocInner - properties: - anyof_schema_1_validator: - nullable: true - title: anyof_schema_1_validator - type: string - anyof_schema_2_validator: - nullable: true - title: anyof_schema_2_validator - type: integer - actual_instance: - title: actual_instance - any_of_schemas: - title: any_of_schemas - type: array - items: - type: string - example: - anyof_schema_1_validator: anyof_schema_1_validator - actual_instance: '' - any_of_schemas: - - any_of_schemas - - any_of_schemas - anyof_schema_2_validator: 0 + type: array + title: ValidationErrorLocInner diff --git a/libs/admin-api-lib/src/admin_api_lib/api_endpoints/file_uploader.py b/libs/admin-api-lib/src/admin_api_lib/api_endpoints/file_uploader.py index db55213c..8348dd8b 100644 --- a/libs/admin-api-lib/src/admin_api_lib/api_endpoints/file_uploader.py +++ b/libs/admin-api-lib/src/admin_api_lib/api_endpoints/file_uploader.py @@ -10,6 +10,17 @@ class FileUploader(UploaderBase): """File uploader endpoint of the admin API.""" + 
@abstractmethod + def cancel_upload(self, identification: str) -> None: + """ + Signal cancellation for an in-flight upload identified by document id. + + Parameters + ---------- + identification : str + Document identification (for example ``file:my_doc.pdf``). + """ + @abstractmethod async def upload_file( self, diff --git a/libs/admin-api-lib/src/admin_api_lib/api_endpoints/source_uploader.py b/libs/admin-api-lib/src/admin_api_lib/api_endpoints/source_uploader.py index 68365c89..e8887735 100644 --- a/libs/admin-api-lib/src/admin_api_lib/api_endpoints/source_uploader.py +++ b/libs/admin-api-lib/src/admin_api_lib/api_endpoints/source_uploader.py @@ -12,6 +12,17 @@ class SourceUploader(UploaderBase): """Abstract base class for source uploader API endpoints.""" + @abstractmethod + def cancel_upload(self, identification: str) -> None: + """ + Signal cancellation for an in-flight source upload. + + Parameters + ---------- + identification : str + Document identification (for example ``confluence:my_space``). + """ + @abstractmethod async def upload_source( self, diff --git a/libs/admin-api-lib/src/admin_api_lib/apis/admin_api.py b/libs/admin-api-lib/src/admin_api_lib/apis/admin_api.py index bc705761..ca5e7013 100644 --- a/libs/admin-api-lib/src/admin_api_lib/apis/admin_api.py +++ b/libs/admin-api-lib/src/admin_api_lib/apis/admin_api.py @@ -58,6 +58,12 @@ async def delete_document( """ Asynchronously deletes a document based on the provided identification. + If an upload/ingestion for this document is currently running (status `PROCESSING`), + the backend will request cancellation and perform best-effort cleanup so the document + can be re-uploaded. Cancellation is cooperative: a currently running extraction step + may still finish, but cancelled uploads will not publish new READY/ERROR status or + re-upload data after cancellation. + Parameters ---------- identification : str diff --git a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py index 236c2f38..035a5524 100644 --- a/libs/admin-api-lib/src/admin_api_lib/dependency_container.py +++ b/libs/admin-api-lib/src/admin_api_lib/dependency_container.py @@ -84,18 +84,22 @@ class DependencyContainer(DeclarativeContainer): chunker_selector_config = Configuration() # Settings - s3_settings = S3Settings() + # Instantiate lazily to avoid requiring S3 env vars during import/test collection. + s3_settings = Singleton(S3Settings) chunker_settings = ChunkerSettings() chunker_embedder_type_settings = EmbedderClassTypeSettings() stackit_chunker_embedder_settings = StackitEmbedderSettings() ollama_chunker_embedder_settings = OllamaEmbedderSettings() ollama_settings = OllamaSettings() - langfuse_settings = LangfuseSettings() - stackit_vllm_settings = StackitVllmSettings() + # Instantiate lazily: env vars are required and should not be needed during import/test collection. + langfuse_settings = Singleton(LangfuseSettings) + # Instantiate lazily: STACKIT_VLLM_API_KEY may not be set for unit tests. + stackit_vllm_settings = Singleton(StackitVllmSettings) document_extractor_settings = DocumentExtractorSettings() rag_class_type_settings = RAGClassTypeSettings() rag_api_settings = RAGAPISettings() - key_value_store_settings = KeyValueSettings() + # Instantiate lazily: USECASE_KEYVALUE_HOST / USECASE_KEYVALUE_PORT are required. 
+ key_value_store_settings = Singleton(KeyValueSettings) summarizer_settings = SummarizerSettings() source_uploader_settings = SourceUploaderSettings() retry_decorator_settings = RetryDecoratorSettings() @@ -167,9 +171,9 @@ class DependencyContainer(DeclarativeContainer): langfuse = Singleton( Langfuse, - public_key=langfuse_settings.public_key, - secret_key=langfuse_settings.secret_key, - host=langfuse_settings.host, + public_key=langfuse_settings.provided.public_key, + secret_key=langfuse_settings.provided.secret_key, + host=langfuse_settings.provided.host, ) summarizer_prompt = SUMMARIZE_PROMPT diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/admin_api.py b/libs/admin-api-lib/src/admin_api_lib/impl/admin_api.py index aabe1dfd..3c5ed602 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/admin_api.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/admin_api.py @@ -46,6 +46,8 @@ async def delete_document( self, identification: str, document_deleter: DocumentDeleter = Depends(Provide[DependencyContainer.document_deleter]), + file_uploader: FileUploader = Depends(Provide[DependencyContainer.file_uploader]), + source_uploader: SourceUploader = Depends(Provide[DependencyContainer.source_uploader]), ) -> None: """ Delete a document asynchronously. @@ -62,6 +64,8 @@ async def delete_document( ------- None """ + file_uploader.cancel_upload(identification) + source_uploader.cancel_upload(identification) await document_deleter.adelete_document(identification) @inject diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_file_uploader.py b/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_file_uploader.py index 3ec5536e..2ee3adf8 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_file_uploader.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_file_uploader.py @@ -20,13 +20,17 @@ from admin_api_lib.chunker.chunker import Chunker from admin_api_lib.models.status import Status from admin_api_lib.impl.key_db.file_status_key_value_store import FileStatusKeyValueStore +from admin_api_lib.impl.api_endpoints.upload_pipeline_mixin import ( + UploadCancelledError, + UploadPipelineMixin, +) from admin_api_lib.information_enhancer.information_enhancer import InformationEnhancer from admin_api_lib.utils.utils import sanitize_document_name logger = logging.getLogger(__name__) -class DefaultFileUploader(FileUploader): +class DefaultFileUploader(UploadPipelineMixin, FileUploader): """The DefaultFileUploader is responsible for adding a new source file document to the available content.""" def __init__( @@ -73,6 +77,13 @@ def __init__( self._background_tasks = [] self._file_service = file_service + def cancel_upload(self, identification: str) -> None: + """Mark an in-flight upload as cancelled.""" + self._key_value_store.cancel_run(identification) + if ":" not in identification: + self._key_value_store.cancel_run(f"file:{identification}") + logger.info("Cancellation requested for file upload: %s", identification) + async def upload_file( self, base_url: str, @@ -94,17 +105,30 @@ async def upload_file( """ self._prune_background_tasks() + source_name = "" + run_id: str | None = None + task_started = False try: file.filename = sanitize_document_name(file.filename) source_name = f"file:{sanitize_document_name(file.filename)}" self._check_if_already_in_processing(source_name) + run_id = self._key_value_store.start_run(source_name) self._key_value_store.upsert(source_name, Status.PROCESSING) content = await file.read() s3_path 
= await self._asave_new_document(content, file.filename, source_name) - task = asyncio.create_task(self._handle_source_upload(s3_path, source_name, file.filename, base_url)) + task = asyncio.create_task( + self._handle_source_upload( + s3_path, + source_name, + file.filename, + base_url, + run_id=run_id, + ) + ) task.add_done_callback(self._log_task_exception) self._background_tasks.append(task) + task_started = True except ValueError as e: self._key_value_store.upsert(source_name, Status.ERROR) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) @@ -112,6 +136,9 @@ async def upload_file( self._key_value_store.upsert(source_name, Status.ERROR) logger.exception("Error while uploading %s", source_name) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) + finally: + if run_id is not None and not task_started: + self._key_value_store.finish_run(source_name, run_id) def _log_task_exception(self, task: asyncio.Task) -> None: """ @@ -154,53 +181,68 @@ def _check_if_already_in_processing(self, source_name: str) -> None: if any(s == Status.PROCESSING for s in existing): raise ValueError(f"Document {source_name} is already in processing state") + async def _aextract_information_pieces(self, s3_path: Path, source_name: str) -> list: + """Extract information pieces for a file from the extractor service.""" + information_pieces = await asyncio.to_thread( + self._extractor_api.extract_from_file_post, + ExtractionRequest(path_on_s3=str(s3_path), document_name=source_name), + ) + if not information_pieces: + logger.error("No information pieces found in the document: %s", source_name) + raise RuntimeError("No information pieces found") + return information_pieces + async def _handle_source_upload( self, s3_path: Path, source_name: str, file_name: str, base_url: str, + run_id: str | None = None, ): + if run_id is None: + run_id = self._key_value_store.start_run(source_name) try: - # Run blocking extractor API call in thread pool to avoid blocking event loop - information_pieces = await asyncio.to_thread( - self._extractor_api.extract_from_file_post, - ExtractionRequest(path_on_s3=str(s3_path), document_name=source_name), - ) - - if not information_pieces: - self._key_value_store.upsert(source_name, Status.ERROR) - logger.error("No information pieces found in the document: %s", source_name) - raise Exception("No information pieces found") - documents: list[Document] = [] - for piece in information_pieces: - documents.append(self._information_mapper.extractor_information_piece2document(piece)) + self._assert_not_cancelled(source_name, run_id) + information_pieces = await self._aextract_information_pieces(s3_path, source_name) + self._assert_not_cancelled(source_name, run_id) - # Run blocking chunker call in thread pool to avoid blocking event loop - chunked_documents = await asyncio.to_thread(self._chunker.chunk, documents) + documents = self._map_information_pieces(information_pieces) + chunked_documents = await self._achunk_documents(documents) + self._assert_not_cancelled(source_name, run_id) enhanced_documents = await self._information_enhancer.ainvoke(chunked_documents) + self._assert_not_cancelled(source_name, run_id) self._add_file_url(file_name, base_url, enhanced_documents) rag_information_pieces = [ self._information_mapper.document2rag_information_piece(doc) for doc in enhanced_documents ] - # Replace old document - # deletion is allowed to fail - with suppress(Exception): - await self._document_deleter.adelete_document( - source_name, - 
remove_from_key_value_store=False, - remove_from_storage=False, - ) + await self._abest_effort_replace_existing(source_name) + self._assert_not_cancelled(source_name, run_id) # Run blocking RAG API call in thread pool to avoid blocking event loop await asyncio.to_thread(self._rag_api.upload_information_piece, rag_information_pieces) + + if self._key_value_store.is_cancelled_or_stale(source_name, run_id): + await self._abest_effort_cleanup_cancelled(source_name) + logger.info("Upload for %s finished after cancellation request; cleaned up artifacts.", source_name) + return + self._key_value_store.upsert(source_name, Status.READY) logger.info("Source uploaded successfully: %s", source_name) + except UploadCancelledError: + with suppress(Exception): + self._key_value_store.remove(source_name) + logger.info("Upload cancelled for %s.", source_name) except Exception: + if self._key_value_store.is_cancelled_or_stale(source_name, run_id): + logger.info("Upload for %s stopped because cancellation was requested.", source_name) + return self._key_value_store.upsert(source_name, Status.ERROR) logger.exception("Error while uploading %s", source_name) + finally: + self._key_value_store.finish_run(source_name, run_id) def _add_file_url(self, file_name: str, base_url: str, chunked_documents: list[Document]): document_url = f"{base_url.rstrip('/')}/document_reference/{urllib.parse.quote_plus(file_name)}" diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_source_uploader.py b/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_source_uploader.py index ab4fa2b1..f0e5d16b 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_source_uploader.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/default_source_uploader.py @@ -7,7 +7,6 @@ from pydantic import StrictStr from fastapi import status, HTTPException -from langchain_core.documents import Document from admin_api_lib.extractor_api_client.openapi_client.api.extractor_api import ExtractorApi from admin_api_lib.extractor_api_client.openapi_client.models.extraction_parameters import ExtractionParameters @@ -20,16 +19,17 @@ from admin_api_lib.chunker.chunker import Chunker from admin_api_lib.models.status import Status from admin_api_lib.impl.key_db.file_status_key_value_store import FileStatusKeyValueStore +from admin_api_lib.impl.api_endpoints.upload_pipeline_mixin import ( + UploadCancelledError, + UploadPipelineMixin, +) from admin_api_lib.information_enhancer.information_enhancer import InformationEnhancer from admin_api_lib.utils.utils import sanitize_document_name -from admin_api_lib.rag_backend_client.openapi_client.models.information_piece import ( - InformationPiece as RagInformationPiece, -) logger = logging.getLogger(__name__) -class DefaultSourceUploader(SourceUploader): +class DefaultSourceUploader(UploadPipelineMixin, SourceUploader): """The DefaultSourceUploader is responsible for uploading source files for content extraction.""" def __init__( @@ -74,6 +74,11 @@ def __init__( self._background_threads = [] self._settings = settings + def cancel_upload(self, identification: str) -> None: + """Mark an in-flight source upload as cancelled.""" + self._key_value_store.cancel_run(identification) + logger.info("Cancellation requested for source upload: %s", identification) + async def upload_source( self, source_type: StrictStr, @@ -101,13 +106,20 @@ async def upload_source( self._prune_background_threads() source_name = f"{source_type}:{sanitize_document_name(name)}" + run_id: str 
| None = None + thread_started = False try: self._check_if_already_in_processing(source_name) + run_id = self._key_value_store.start_run(source_name) self._key_value_store.upsert(source_name, Status.PROCESSING) - thread = Thread(target=self._thread_worker, args=(source_name, source_type, kwargs, self._settings.timeout)) + thread = Thread( + target=self._thread_worker, + args=(source_name, source_type, kwargs, self._settings.timeout, run_id), + ) thread.start() self._background_threads.append(thread) + thread_started = True except ValueError as e: self._key_value_store.upsert(source_name, Status.ERROR) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) @@ -115,6 +127,9 @@ async def upload_source( self._key_value_store.upsert(source_name, Status.ERROR) logger.exception("Error while uploading %s", source_name) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) + finally: + if run_id is not None and not thread_started: + self._key_value_store.finish_run(source_name, run_id) def _check_if_already_in_processing(self, source_name: str) -> None: """ @@ -138,17 +153,43 @@ def _check_if_already_in_processing(self, source_name: str) -> None: if any(s == Status.PROCESSING for s in existing): raise ValueError(f"Document {source_name} is already in processing state") - def _thread_worker(self, source_name, source_type, kwargs, timeout): + async def _aextract_information_pieces( + self, + source_name: str, + source_type: StrictStr, + kwargs: list[KeyValuePair], + ) -> list: + """Extract information pieces for a source from the extractor service.""" + information_pieces = await asyncio.to_thread( + self._extractor_api.extract_from_source, + ExtractionParameters( + source_type=source_type, document_name=source_name, kwargs=[x.to_dict() for x in kwargs] + ), + ) + if not information_pieces: + logger.error("No information pieces found in the document: %s", source_name) + raise RuntimeError("No information pieces found") + return information_pieces + + def _thread_worker(self, source_name, source_type, kwargs, timeout, run_id: str): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete( asyncio.wait_for( - self._handle_source_upload(source_name=source_name, source_type=source_type, kwargs=kwargs), + self._handle_source_upload( + source_name=source_name, + source_type=source_type, + kwargs=kwargs, + run_id=run_id, + ), timeout=timeout, ) ) except asyncio.TimeoutError: + if self._key_value_store.is_cancelled_or_stale(source_name, run_id): + logger.info("Timed out worker for %s ignored because upload was cancelled.", source_name) + return logger.error( "Upload of %s timed out after %s seconds (increase SOURCE_UPLOADER_TIMEOUT to allow longer ingestions)", source_name, @@ -156,57 +197,67 @@ def _thread_worker(self, source_name, source_type, kwargs, timeout): ) self._key_value_store.upsert(source_name, Status.ERROR) except Exception: + if self._key_value_store.is_cancelled_or_stale(source_name, run_id): + logger.info("Worker exception for %s ignored because upload was cancelled.", source_name) + return logger.exception("Error while uploading %s", source_name) self._key_value_store.upsert(source_name, Status.ERROR) finally: loop.close() + self._key_value_store.finish_run(source_name, run_id) async def _handle_source_upload( self, source_name: str, source_type: StrictStr, kwargs: list[KeyValuePair], + run_id: str | None = None, ): try: - # Run blocking extractor API call in thread pool to avoid blocking event loop - 
information_pieces = await asyncio.to_thread( - self._extractor_api.extract_from_source, - ExtractionParameters( - source_type=source_type, document_name=source_name, kwargs=[x.to_dict() for x in kwargs] - ), - ) - - if not information_pieces: - self._key_value_store.upsert(source_name, Status.ERROR) - logger.error("No information pieces found in the document: %s", source_name) - raise Exception("No information pieces found") - documents: list[Document] = [] - for piece in information_pieces: - documents.append(self._information_mapper.extractor_information_piece2document(piece)) + if run_id is None: + run_id = self._key_value_store.start_run(source_name) + self._assert_not_cancelled(source_name, run_id) + information_pieces = await self._aextract_information_pieces(source_name, source_type, kwargs) + self._assert_not_cancelled(source_name, run_id) - # Run blocking chunker call in thread pool to avoid blocking event loop - chunked_documents = await asyncio.to_thread(self._chunker.chunk, documents) + documents = self._map_information_pieces(information_pieces) + chunked_documents = await self._achunk_documents(documents) + self._assert_not_cancelled(source_name, run_id) # limit concurrency to avoid spawning multiple threads per call enhanced_documents = await self._information_enhancer.ainvoke( chunked_documents, config={"max_concurrency": 1} ) + self._assert_not_cancelled(source_name, run_id) - rag_information_pieces: list[RagInformationPiece] = [] - for doc in enhanced_documents: - rag_information_pieces.append(self._information_mapper.document2rag_information_piece(doc)) + rag_information_pieces = [ + self._information_mapper.document2rag_information_piece(doc) for doc in enhanced_documents + ] - with suppress(Exception): - await self._document_deleter.adelete_document( - source_name, - remove_from_key_value_store=False, - remove_from_storage=False, - ) + await self._abest_effort_replace_existing(source_name) + self._assert_not_cancelled(source_name, run_id) # Run blocking RAG API call in thread pool to avoid blocking event loop await asyncio.to_thread(self._rag_api.upload_information_piece, rag_information_pieces) + + if self._key_value_store.is_cancelled_or_stale(source_name, run_id): + await self._abest_effort_cleanup_cancelled(source_name) + logger.info("Upload for %s finished after cancellation request; cleaned up artifacts.", source_name) + return + self._key_value_store.upsert(source_name, Status.READY) logger.info("Source uploaded successfully: %s", source_name) + except UploadCancelledError: + with suppress(Exception): + self._key_value_store.remove(source_name) + logger.info("Upload cancelled for %s.", source_name) except Exception: + if self._key_value_store.is_cancelled_or_stale(source_name, run_id): + logger.info("Upload for %s stopped because cancellation was requested.", source_name) + return self._key_value_store.upsert(source_name, Status.ERROR) logger.exception("Error while uploading %s", source_name) + finally: + # Best-effort cleanup for direct calls/tests; thread worker also calls finish_run. 
+ if run_id is not None: + self._key_value_store.finish_run(source_name, run_id) diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/upload_pipeline_mixin.py b/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/upload_pipeline_mixin.py new file mode 100644 index 00000000..864a85f4 --- /dev/null +++ b/libs/admin-api-lib/src/admin_api_lib/impl/api_endpoints/upload_pipeline_mixin.py @@ -0,0 +1,66 @@ +"""Shared helpers for admin upload pipelines (file + source).""" + +from __future__ import annotations + +import asyncio +from contextlib import suppress + +from langchain_core.documents import Document + +from admin_api_lib.api_endpoints.document_deleter import DocumentDeleter +from admin_api_lib.chunker.chunker import Chunker +from admin_api_lib.impl.key_db.file_status_key_value_store import FileStatusKeyValueStore +from admin_api_lib.impl.mapper.informationpiece2document import InformationPiece2Document + + +class UploadCancelledError(Exception): + """Raised when a running upload/ingestion was cancelled or became stale.""" + + +class UploadPipelineMixin: + """ + Mixin that consolidates shared pipeline steps for file and source uploads. + + The concrete uploader must provide these attributes: + - `_key_value_store: FileStatusKeyValueStore` + - `_document_deleter: DocumentDeleter` + - `_information_mapper: InformationPiece2Document` + - `_chunker: Chunker` + """ + + _key_value_store: FileStatusKeyValueStore + _document_deleter: DocumentDeleter + _information_mapper: InformationPiece2Document + _chunker: Chunker + + def _assert_not_cancelled(self, identification: str, run_id: str) -> None: + if self._key_value_store.is_cancelled_or_stale(identification, run_id): + raise UploadCancelledError(f"Upload cancelled for {identification}") + + def _map_information_pieces(self, information_pieces: list) -> list[Document]: + """Map extractor information pieces to langchain documents.""" + return [self._information_mapper.extractor_information_piece2document(piece) for piece in information_pieces] + + async def _achunk_documents(self, documents: list[Document]) -> list[Document]: + """Chunk documents using the configured chunker in a thread pool.""" + return await asyncio.to_thread(self._chunker.chunk, documents) + + async def _abest_effort_replace_existing(self, identification: str) -> None: + """Best-effort delete of existing document chunks to support re-upload.""" + with suppress(Exception): + await self._document_deleter.adelete_document( + identification, + remove_from_key_value_store=False, + remove_from_storage=False, + ) + + async def _abest_effort_cleanup_cancelled(self, identification: str) -> None: + """Best-effort cleanup for cancelled uploads (status + vector-db artifacts).""" + with suppress(Exception): + await self._document_deleter.adelete_document( + identification, + remove_from_key_value_store=False, + remove_from_storage=False, + ) + with suppress(Exception): + self._key_value_store.remove(identification) diff --git a/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py b/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py index 826a4db9..947c9777 100644 --- a/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py +++ b/libs/admin-api-lib/src/admin_api_lib/impl/key_db/file_status_key_value_store.py @@ -2,6 +2,7 @@ import json import ssl +import uuid from typing import Any from redis import Redis @@ -32,6 +33,11 @@ class FileStatusKeyValueStore: INNER_FILENAME_KEY = "filename" 
INNER_STATUS_KEY = "status" + ACTIVE_RUN_PREFIX = "stackit-rag-template-active-run:" + CANCELLED_RUN_PREFIX = "stackit-rag-template-cancelled-run:" + CANCEL_TTL_SECONDS = 6 * 60 * 60 # keep cancel markers around for a while to stop late workers + ACTIVE_TTL_SECONDS = 24 * 60 * 60 # keep last run_id around so late workers can detect staleness + def __init__(self, settings: KeyValueSettings): """ Initialize the FileStatusKeyValueStore with the given settings. @@ -155,3 +161,45 @@ def get_all(self) -> list[tuple[str, Status]]: """ all_file_informations = list(self._redis.smembers(self.STORAGE_KEY)) return [FileStatusKeyValueStore._from_str(x) for x in all_file_informations] + + def start_run(self, identification: str) -> str: + """Start a new ingestion run for `identification` and return a run_id.""" + run_id = uuid.uuid4().hex + self._redis.set(self._active_run_key(identification), run_id) + self._redis.delete(self._cancelled_run_key(identification)) + return run_id + + def finish_run(self, identification: str, run_id: str) -> None: + """Finish a run, keeping the last run_id around so late workers can detect staleness.""" + active = self._redis.get(self._active_run_key(identification)) + if active == run_id: + self._redis.delete(self._cancelled_run_key(identification)) + # Keep the last run_id for a while: if a newer run was started+finished quickly, + # older workers must still see that they are stale and must not publish results. + self._redis.expire(self._active_run_key(identification), self.ACTIVE_TTL_SECONDS) + + def cancel_run(self, identification: str) -> None: + """Request cancellation of the currently active run for `identification`.""" + active = self._redis.get(self._active_run_key(identification)) + if not active: + return + self._redis.set(self._cancelled_run_key(identification), active, ex=self.CANCEL_TTL_SECONDS) + + def is_cancelled_or_stale(self, identification: str, run_id: str) -> bool: + """ + Return True if this run has been cancelled or is no longer the active run. + + This is what makes cancellation safe across multiple pods and multiple worker processes: + all processes consult the same Redis state. 
+ """ + cancelled = self._redis.get(self._cancelled_run_key(identification)) + if cancelled == run_id: + return True + active = self._redis.get(self._active_run_key(identification)) + return active is not None and active != run_id + + def _active_run_key(self, identification: str) -> str: + return f"{self.ACTIVE_RUN_PREFIX}{identification}" + + def _cancelled_run_key(self, identification: str) -> str: + return f"{self.CANCELLED_RUN_PREFIX}{identification}" diff --git a/libs/admin-api-lib/tests/admin_api_test.py b/libs/admin-api-lib/tests/admin_api_test.py new file mode 100644 index 00000000..29295ae1 --- /dev/null +++ b/libs/admin-api-lib/tests/admin_api_test.py @@ -0,0 +1,25 @@ +import pytest +from unittest.mock import AsyncMock, MagicMock + +from admin_api_lib.impl.admin_api import AdminApi + + +@pytest.mark.asyncio +async def test_delete_document_requests_cancellation_before_delete(): + admin_api = AdminApi() + document_deleter = MagicMock() + document_deleter.adelete_document = AsyncMock() + file_uploader = MagicMock() + source_uploader = MagicMock() + + identification = "file:doc.pdf" + await admin_api.delete_document( + identification=identification, + document_deleter=document_deleter, + file_uploader=file_uploader, + source_uploader=source_uploader, + ) + + file_uploader.cancel_upload.assert_called_once_with(identification) + source_uploader.cancel_upload.assert_called_once_with(identification) + document_deleter.adelete_document.assert_awaited_once_with(identification) diff --git a/libs/admin-api-lib/tests/default_file_uploader_test.py b/libs/admin-api-lib/tests/default_file_uploader_test.py index 79571767..e146f8fc 100644 --- a/libs/admin-api-lib/tests/default_file_uploader_test.py +++ b/libs/admin-api-lib/tests/default_file_uploader_test.py @@ -13,6 +13,10 @@ def mocks(): extractor_api = MagicMock() key_value_store = MagicMock() key_value_store.get_all.return_value = [] + key_value_store.start_run.return_value = "run-1" + key_value_store.finish_run.return_value = None + key_value_store.cancel_run.return_value = None + key_value_store.is_cancelled_or_stale.return_value = False information_enhancer = MagicMock() information_enhancer.ainvoke = AsyncMock() chunker = MagicMock() @@ -139,3 +143,44 @@ async def test_upload_file_starts_background_task(mocks): key_value_store.upsert.assert_any_call(source_name, Status.PROCESSING) assert len(uploader._background_tasks) == 1 assert uploader._background_tasks[0].get_name() # Task was created + + +@pytest.mark.asyncio +async def test_handle_file_upload_cancelled_skips_terminal_status(mocks): + extractor_api, key_value_store, information_enhancer, chunker, document_deleter, rag_api, information_mapper = mocks + # setup mocks for a successful path + dummy_piece = MagicMock() + extractor_api.extract_from_file_post.return_value = [dummy_piece] + dummy_doc = MagicMock() + information_mapper.extractor_information_piece2document.return_value = dummy_doc + chunker.chunk.return_value = [dummy_doc] + information_enhancer.ainvoke.return_value = [dummy_doc] + information_mapper.document2rag_information_piece.return_value = {"foo": "bar"} + + uploader = DefaultFileUploader( + extractor_api, + key_value_store, + information_enhancer, + chunker, + document_deleter, + rag_api, + information_mapper, + file_service=MagicMock(), + ) + + upload_filename = "file:cancelled-doc" + key_value_store.start_run.return_value = "run-cancelled" + key_value_store.is_cancelled_or_stale.return_value = True + + await uploader._handle_source_upload( + "s3path", + 
upload_filename, + "doc.txt", + "http://base", + run_id="run-cancelled", + ) + + status_updates = [call.args for call in key_value_store.upsert.call_args_list] + assert (upload_filename, Status.READY) not in status_updates + assert (upload_filename, Status.ERROR) not in status_updates + rag_api.upload_information_piece.assert_not_called() diff --git a/libs/admin-api-lib/tests/default_source_uploader_test.py b/libs/admin-api-lib/tests/default_source_uploader_test.py index 96fb65f1..c799af29 100644 --- a/libs/admin-api-lib/tests/default_source_uploader_test.py +++ b/libs/admin-api-lib/tests/default_source_uploader_test.py @@ -16,6 +16,10 @@ def mocks(): extractor_api = MagicMock() key_value_store = MagicMock() key_value_store.get_all.return_value = [] + key_value_store.start_run.return_value = "run-1" + key_value_store.finish_run.return_value = None + key_value_store.cancel_run.return_value = None + key_value_store.is_cancelled_or_stale.return_value = False information_enhancer = MagicMock() information_enhancer.ainvoke = AsyncMock() chunker = MagicMock() @@ -198,7 +202,7 @@ async def test_upload_source_timeout_error(mocks, monkeypatch): source_name = f"{source_type}:{sanitize_document_name(name)}" # monkey-patch the handler to sleep so that timeout triggers - async def fake_handle(self, source_name_arg, source_type_arg, kwargs_arg): + async def fake_handle(self, source_name_arg, source_type_arg, kwargs_arg, upload_version=None): await asyncio.sleep(3600) # patch handler and Thread to trigger timeout synchronously @@ -233,3 +237,52 @@ def is_alive(self): calls = [call.args for call in key_value_store.upsert.call_args_list] assert (source_name, Status.PROCESSING) in calls assert (source_name, Status.ERROR) in calls + + +@pytest.mark.asyncio +async def test_handle_source_upload_cancelled_skips_terminal_status(mocks): + ( + extractor_api, + key_value_store, + information_enhancer, + chunker, + document_deleter, + rag_api, + information_mapper, + settings, + ) = mocks + # Setup mocks for a successful path + dummy_piece = MagicMock() + extractor_api.extract_from_source.return_value = [dummy_piece] + dummy_doc = MagicMock() + information_mapper.extractor_information_piece2document.return_value = dummy_doc + chunker.chunk.return_value = [dummy_doc] + information_enhancer.ainvoke.return_value = [dummy_doc] + information_mapper.document2rag_information_piece.return_value = {"p": "v"} + + uploader = DefaultSourceUploader( + extractor_api, + key_value_store, + information_enhancer, + chunker, + document_deleter, + rag_api, + information_mapper, + settings=settings, + ) + + source_name = "confluence:cancelled-space" + key_value_store.start_run.return_value = "run-cancelled" + key_value_store.is_cancelled_or_stale.return_value = True + + await uploader._handle_source_upload( + source_name, + "confluence", + [], + run_id="run-cancelled", + ) + + status_updates = [call.args for call in key_value_store.upsert.call_args_list] + assert (source_name, Status.READY) not in status_updates + assert (source_name, Status.ERROR) not in status_updates + rag_api.upload_information_piece.assert_not_called() diff --git a/services/frontend/apps/admin-app-e2e/src/e2e/app.cy.ts b/services/frontend/apps/admin-app-e2e/src/e2e/app.cy.ts index 4438194c..0e18f88d 100644 --- a/services/frontend/apps/admin-app-e2e/src/e2e/app.cy.ts +++ b/services/frontend/apps/admin-app-e2e/src/e2e/app.cy.ts @@ -4,7 +4,7 @@ describe('When the App is loaded', () => { cy.get('[data-testid="document-view"]').should('exist'); }); - it('should disable 
delete for processing documents', () => { + it('should enable delete for processing and ready documents', () => { // Stub the backend response for documents cy.intercept('GET', '**/all_documents_status', [ { name: 'Doc-A.pdf', status: 'PROCESSING' }, @@ -16,11 +16,11 @@ describe('When the App is loaded', () => { // The list renders items with id equal to document name cy.get('#Doc-A.pdf').within(() => { - cy.get('[data-testid="document-delete-btn"]').should('be.disabled'); + cy.get('[data-testid="document-delete-btn"]').should('not.be.disabled'); }); cy.get('#Doc-B.pdf').within(() => { cy.get('[data-testid="document-delete-btn"]').should('not.be.disabled'); }); }); -}); \ No newline at end of file +}); diff --git a/services/frontend/apps/admin-app-e2e/src/e2e/delete-disable.cy.ts b/services/frontend/apps/admin-app-e2e/src/e2e/delete-disable.cy.ts index 3c636fda..b67ef09b 100644 --- a/services/frontend/apps/admin-app-e2e/src/e2e/delete-disable.cy.ts +++ b/services/frontend/apps/admin-app-e2e/src/e2e/delete-disable.cy.ts @@ -1,5 +1,5 @@ describe('Document deletion availability', () => { - it('disables delete for documents in PROCESSING and enables for READY', () => { + it('enables delete for documents in PROCESSING and READY', () => { cy.intercept('GET', '**/all_documents_status', [ { name: 'doc-processing.pdf', status: 'PROCESSING' }, { name: 'doc-ready.pdf', status: 'READY' }, @@ -8,12 +8,12 @@ describe('Document deletion availability', () => { cy.visit('/documents'); cy.wait('@docs'); - // Processing item should have disabled delete button + // Processing item should have enabled delete button cy.contains('h4', 'doc-processing.pdf') .parentsUntil('div') .parent() .find('[data-testid="document-delete-btn"]') - .should('be.disabled'); + .should('not.be.disabled'); // Ready item should have enabled delete button cy.contains('h4', 'doc-ready.pdf') diff --git a/services/frontend/apps/admin-app-e2e/src/e2e/delete-processing.cy.ts b/services/frontend/apps/admin-app-e2e/src/e2e/delete-processing.cy.ts index 4262d231..97480975 100644 --- a/services/frontend/apps/admin-app-e2e/src/e2e/delete-processing.cy.ts +++ b/services/frontend/apps/admin-app-e2e/src/e2e/delete-processing.cy.ts @@ -1,20 +1,22 @@ -describe('Document deletion guarded by processing state', () => { - it('disables delete for processing documents and enables for ready ones', () => { +describe('Document deletion for processing state', () => { + it('allows deleting processing documents', () => { cy.intercept('GET', '**/all_documents_status', [ { name: 'doc-processing.pdf', status: 'PROCESSING' }, { name: 'doc-ready.pdf', status: 'READY' } ]).as('getDocs'); + cy.intercept('DELETE', '**/delete_document/*', { statusCode: 200, body: {} }).as('deleteDocument'); cy.visit('/documents'); cy.wait('@getDocs'); - // Find the list items by their id (DocumentContainer sets :id to document.name) cy.get('#doc-processing.pdf').within(() => { - cy.get('[data-testid="document-delete-btn"]').should('be.disabled'); + cy.get('[data-testid="document-delete-btn"]').should('not.be.disabled').click(); }); - cy.get('#doc-ready.pdf').within(() => { - cy.get('[data-testid="document-delete-btn"]').should('not.be.disabled'); - }); + cy.get('.modal-action-button--delete').click(); + + cy.wait('@deleteDocument') + .its('request.url') + .should('include', 'doc-processing.pdf'); }); }); diff --git a/services/frontend/libs/admin-app/data-access/+state/documents.store.ts b/services/frontend/libs/admin-app/data-access/+state/documents.store.ts index cdd1b830..566f7aee 
100644 --- a/services/frontend/libs/admin-app/data-access/+state/documents.store.ts +++ b/services/frontend/libs/admin-app/data-access/+state/documents.store.ts @@ -137,11 +137,6 @@ export const useDocumentsStore = defineStore("chat", () => { const deleteDocument = async (documentId: string) => { try { - // Prevent deletion if the document is currently processing - const doc = allDocuments.value?.find((d) => d.name === documentId); - if (doc?.status === "PROCESSING") { - return; // No-op while processing - } await DocumentAPI.deleteDocument(documentId); await loadDocuments(); } catch (err) { diff --git a/services/frontend/libs/admin-app/ui/DocumentItem.vue b/services/frontend/libs/admin-app/ui/DocumentItem.vue index b36b9e93..b88e1774 100644 --- a/services/frontend/libs/admin-app/ui/DocumentItem.vue +++ b/services/frontend/libs/admin-app/ui/DocumentItem.vue @@ -21,11 +21,6 @@ const props = defineProps<{ deleteDocument: (documentId: string) => void; }>(); -// Deletion is not allowed while a document is in PROCESSING state -const PROCESSING_STATE = "PROCESSING"; -const isProcessing = computed(() => props.data.status === PROCESSING_STATE); -const canDelete = computed(() => !isProcessing.value); - const statusClasses = { UPLOADING: "text-info", PROCESSING: "text-warning", @@ -43,7 +38,6 @@ const statusText = { const documentIcon = computed(() => getDocumentIcon(props.data.name)); const confirmDelete = () => { - if (!canDelete.value) return; // Guard against accidental triggers if (typeof document !== "undefined") { previouslyFocusedElement.value = document.activeElement as HTMLElement | null; } @@ -144,13 +138,11 @@ watch(showDeleteModal, async (isOpen) => {
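Context for the frontend changes above: the store and DocumentItem no longer block deletion of PROCESSING documents; they simply issue DELETE /delete_document/{identification} and rely on the backend's new cancel-then-delete behaviour. As a hedged illustration (not part of the patch), the equivalent call from Python might look like the following; the base URL is a placeholder, since the OpenAPI spec only defines the relative server path /api.

import requests

BASE_URL = "http://localhost:8000/api"  # placeholder host; spec defines only the /api prefix

# Deleting a document that is still PROCESSING now requests cancellation server-side
# before the stored chunks are removed, so the document can be re-uploaded afterwards.
resp = requests.delete(f"{BASE_URL}/delete_document/my_doc.pdf", timeout=10)
resp.raise_for_status()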
diff --git a/tools/api-generator.sh b/tools/api-generator.sh index 6f212b16..988f51b4 100755 --- a/tools/api-generator.sh +++ b/tools/api-generator.sh @@ -1,47 +1,61 @@ -#!/bin/bash -echo "Chose which api and its corresponding client to recreate. Possible values are: 0, 1, 2" -echo 0: admin-api-lib -echo 1: rag-backend -echo 2: extractor-api-lib +#!/usr/bin/env bash +set -euo pipefail -read api_idx +echo "Choose which API and its corresponding client to recreate. Possible values are: 0, 1, 2" +echo "0: admin-api-lib" +echo "1: rag-backend" +echo "2: extractor-api-lib" -declare -A api_names=( ["0"]="admin-api-lib" ["1"]="rag-backend" ["2"]="extractor-api-lib" ) -api_name=${api_names[$api_idx]} +api_idx="${1:-}" +if [[ -z "${api_idx}" ]]; then + read -r api_idx +fi -echo $api_name +api_name="" +case "${api_idx}" in + 0) api_name="admin-api-lib" ;; + 1) api_name="rag-backend" ;; + 2) api_name="extractor-api-lib" ;; + *) echo "Invalid api index: ${api_idx}" ; exit 1 ;; +esac + +echo "${api_name}" + +format_with_black() { + local dir="$1" + if command -v black >/dev/null 2>&1; then + (cd "${dir}" && black .) + return 0 + fi + if command -v poetry >/dev/null 2>&1; then + # Poetry environments are not guaranteed to be installed locally; don't hard-fail on formatting. + (cd "${dir}" && poetry run black .) || true + return 0 + fi + echo "Warning: black not found; skipping formatting for ${dir}" >&2 +} -case $api_name in +case "${api_name}" in "admin-api-lib") docker run --user $(id -u):$(id -g) --rm -v $PWD:/local openapitools/openapi-generator-cli@sha256:b35aee2d0f6ffadadcdad9d8fc3c46e8d48360c20b5731a5f47c809d51f67a04 generate -i /local/libs/admin-api-lib/openapi.yaml -g python-fastapi -o /local/libs/admin-api-lib --additional-properties=packageName=admin_api_lib,generateSourceCodeOnly=True - rm -r libs/admin-api-lib/src/openapi_server - cd ./libs/admin-api-lib - black . - cd ../.. + rm -rf libs/admin-api-lib/src/openapi_server + format_with_black "./libs/admin-api-lib" ;; "rag-backend") docker run --user $(id -u):$(id -g) --rm -v $PWD:/local openapitools/openapi-generator-cli@sha256:b35aee2d0f6ffadadcdad9d8fc3c46e8d48360c20b5731a5f47c809d51f67a04 generate -i /local/libs/rag-core-api/openapi.yaml -g python-fastapi -o /local/libs/rag-core-api --additional-properties=packageName=rag_core_api,generateSourceCodeOnly=True - rm -r libs/rag-core-api/src/openapi_server - cd ./libs/rag-core-api - black . - cd ../.. + rm -rf libs/rag-core-api/src/openapi_server + format_with_black "./libs/rag-core-api" docker run --user $(id -u):$(id -g) --rm -v $PWD:/local openapitools/openapi-generator-cli@sha256:b35aee2d0f6ffadadcdad9d8fc3c46e8d48360c20b5731a5f47c809d51f67a04 generate -i /local/libs/rag-core-api/openapi.yaml -g python -o /local/libs/admin-api-lib/src --additional-properties=generateSourceCodeOnly=True,packageName=admin_api_lib.rag_backend_client.openapi_client - cd ./libs/admin-api-lib - black . - cd ../.. + format_with_black "./libs/admin-api-lib" ;; "extractor-api-lib") docker run --user $(id -u):$(id -g) --rm -v $PWD:/local openapitools/openapi-generator-cli@sha256:b35aee2d0f6ffadadcdad9d8fc3c46e8d48360c20b5731a5f47c809d51f67a04 generate -i /local/libs/extractor-api-lib/openapi.yaml -g python-fastapi -o /local/libs/extractor-api-lib --additional-properties=packageName=extractor_api_lib,generateSourceCodeOnly=True - rm -r libs/extractor-api-lib/src/openapi_server - cd ./libs/extractor-api-lib - black . - cd ../.. 
+ rm -rf libs/extractor-api-lib/src/openapi_server + format_with_black "./libs/extractor-api-lib" docker run --user $(id -u):$(id -g) --rm -v $PWD:/local openapitools/openapi-generator-cli@sha256:b35aee2d0f6ffadadcdad9d8fc3c46e8d48360c20b5731a5f47c809d51f67a04 generate -i /local/libs/extractor-api-lib/openapi.yaml -g python -o /local/libs/admin-api-lib/src --additional-properties=packageName=admin_api_lib.extractor_api_client.openapi_client,generateSourceCodeOnly=True,testOutput=false - rm -r libs/admin-api-lib/src/openapi_server + rm -rf libs/admin-api-lib/src/openapi_server find ./libs/admin-api-lib/src/admin_api_lib/extractor_api_client -type f -name '*.md' -delete - cd ./libs/admin-api-lib - black . - cd ../.. + format_with_black "./libs/admin-api-lib" ;; *) echo "Invalid api name"
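For reviewers, the cooperative-cancellation contract introduced in this patch reduces to a small run lifecycle on FileStatusKeyValueStore (start_run, cancel_run, is_cancelled_or_stale, finish_run), which the uploaders consult between pipeline stages. The sketch below is illustrative only and is not part of the diff: InMemoryRunStore is a hypothetical in-memory stand-in for the Redis-backed store, and the pipeline stages are placeholders.

import uuid


class InMemoryRunStore:
    """Hypothetical in-memory stand-in for FileStatusKeyValueStore's run-tracking methods."""

    def __init__(self) -> None:
        self._active: dict[str, str] = {}      # identification -> currently active run_id
        self._cancelled: dict[str, str] = {}   # identification -> run_id that was cancelled

    def start_run(self, identification: str) -> str:
        run_id = uuid.uuid4().hex
        self._active[identification] = run_id
        self._cancelled.pop(identification, None)
        return run_id

    def cancel_run(self, identification: str) -> None:
        if identification in self._active:
            self._cancelled[identification] = self._active[identification]

    def is_cancelled_or_stale(self, identification: str, run_id: str) -> bool:
        if self._cancelled.get(identification) == run_id:
            return True
        active = self._active.get(identification)
        return active is not None and active != run_id

    def finish_run(self, identification: str, run_id: str) -> None:
        if self._active.get(identification) == run_id:
            self._cancelled.pop(identification, None)


def run_pipeline(store: InMemoryRunStore, identification: str, stages: list) -> str:
    """Run placeholder stages, checking for cancellation between each one, as the uploaders do."""
    run_id = store.start_run(identification)
    try:
        for stage in stages:
            if store.is_cancelled_or_stale(identification, run_id):
                return "cancelled"  # cancelled runs publish no READY/ERROR status
            stage()
        return "ready"
    finally:
        store.finish_run(identification, run_id)


if __name__ == "__main__":
    store = InMemoryRunStore()
    doc = "file:my_doc.pdf"
    # The second stage requests cancellation, so the run stops before the third stage.
    print(run_pipeline(store, doc, [lambda: None, lambda: store.cancel_run(doc), lambda: None]))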