diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index e3b988676..5b8c4b993 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -4,6 +4,8 @@ Provides Google Drive specific methods for data retrieval and indexing via Composio. """ +import hashlib +import json import logging import os import tempfile @@ -464,6 +466,55 @@ async def check_document_by_unique_identifier( return existing_doc_result.scalars().first() +async def check_document_by_content_hash( + session: AsyncSession, content_hash: str +) -> Document | None: + """Check if a document with the given content hash already exists. + + This is used to prevent duplicate content from being indexed, regardless + of which connector originally indexed it. + """ + from sqlalchemy.future import select + + existing_doc_result = await session.execute( + select(Document).where(Document.content_hash == content_hash) + ) + return existing_doc_result.scalars().first() + + +async def check_document_by_google_drive_file_id( + session: AsyncSession, file_id: str, search_space_id: int +) -> Document | None: + """Check if a document with this Google Drive file ID exists (from any connector). + + This checks both metadata key formats: + - 'google_drive_file_id' (normal Google Drive connector) + - 'file_id' (Composio Google Drive connector) + + This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls. + """ + from sqlalchemy import String, cast, or_ + from sqlalchemy.future import select + + # When casting JSON to String, the result includes quotes: "value" instead of value + # So we need to compare with the quoted version + quoted_file_id = f'"{file_id}"' + + existing_doc_result = await session.execute( + select(Document).where( + Document.search_space_id == search_space_id, + or_( + # Normal Google Drive connector format + cast(Document.document_metadata["google_drive_file_id"], String) + == quoted_file_id, + # Composio Google Drive connector format + cast(Document.document_metadata["file_id"], String) == quoted_file_id, + ), + ) + ) + return existing_doc_result.scalars().first() + + async def update_connector_last_indexed( session: AsyncSession, connector, @@ -477,6 +528,33 @@ async def update_connector_last_indexed( logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}") +def generate_indexing_settings_hash( + selected_folders: list[dict], + selected_files: list[dict], + indexing_options: dict, +) -> str: + """Generate a hash of indexing settings to detect configuration changes. + + This hash is used to determine if indexing settings have changed since + the last index, which would require a full re-scan instead of delta sync. + + Args: + selected_folders: List of {id, name} for folders to index + selected_files: List of {id, name} for individual files to index + indexing_options: Dict with max_files_per_folder, include_subfolders, etc. 
+ + Returns: + MD5 hash string of the settings + """ + settings = { + "folders": sorted([f.get("id", "") for f in selected_folders]), + "files": sorted([f.get("id", "") for f in selected_files]), + "include_subfolders": indexing_options.get("include_subfolders", True), + "max_files_per_folder": indexing_options.get("max_files_per_folder", 100), + } + return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest() + + async def index_composio_google_drive( session: AsyncSession, connector, @@ -487,12 +565,16 @@ async def index_composio_google_drive( log_entry, update_last_indexed: bool = True, max_items: int = 1000, -) -> tuple[int, str]: +) -> tuple[int, int, str | None]: """Index Google Drive files via Composio with delta sync support. + Returns: + Tuple of (documents_indexed, documents_skipped, error_message or None) + Delta Sync Flow: 1. First sync: Full scan + get initial page token 2. Subsequent syncs: Use LIST_CHANGES to process only changed files + (unless settings changed or incremental_sync is disabled) Supports folder/file selection via connector config: - selected_folders: List of {id, name} for folders to index @@ -508,12 +590,42 @@ async def index_composio_google_drive( selected_files = connector_config.get("selected_files", []) indexing_options = connector_config.get("indexing_options", {}) + max_files_per_folder = indexing_options.get("max_files_per_folder", 100) + include_subfolders = indexing_options.get("include_subfolders", True) + incremental_sync = indexing_options.get("incremental_sync", True) + + # Generate current settings hash to detect configuration changes + current_settings_hash = generate_indexing_settings_hash( + selected_folders, selected_files, indexing_options + ) + last_settings_hash = connector_config.get("last_indexed_settings_hash") + + # Detect if settings changed since last index + settings_changed = ( + last_settings_hash is not None + and current_settings_hash != last_settings_hash + ) + + if settings_changed: + logger.info( + f"Indexing settings changed for connector {connector_id}. " + f"Will perform full re-scan to apply new configuration." 
+ ) + # Check for stored page token for delta sync stored_page_token = connector_config.get("drive_page_token") - use_delta_sync = stored_page_token and connector.last_indexed_at - max_files_per_folder = indexing_options.get("max_files_per_folder", 100) - include_subfolders = indexing_options.get("include_subfolders", True) + # Determine whether to use delta sync: + # - Must have a stored page token + # - Must have been indexed before (last_indexed_at exists) + # - User must have incremental_sync enabled + # - Settings must not have changed (folder/subfolder config) + use_delta_sync = ( + incremental_sync + and stored_page_token + and connector.last_indexed_at + and not settings_changed + ) # Route to delta sync or full scan if use_delta_sync: @@ -588,6 +700,14 @@ async def index_composio_google_drive( elif token_error: logger.warning(f"Failed to get new page token: {token_error}") + # Save current settings hash for future change detection + # This allows detecting when folder/subfolder settings change + if not connector.config: + connector.config = {} + connector.config["last_indexed_settings_hash"] = current_settings_hash + flag_modified(connector, "config") + logger.info(f"Saved indexing settings hash for connector {connector_id}") + # CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status await update_connector_last_indexed(session, connector, update_last_indexed) @@ -628,11 +748,11 @@ async def index_composio_google_drive( }, ) - return documents_indexed, error_message + return documents_indexed, documents_skipped, error_message except Exception as e: logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True) - return 0, f"Failed to index Google Drive via Composio: {e!s}" + return 0, 0, f"Failed to index Google Drive via Composio: {e!s}" async def _index_composio_drive_delta_sync( @@ -953,13 +1073,28 @@ async def _process_single_drive_file( """ processing_errors = [] + # ========== EARLY DUPLICATE CHECK BY FILE ID ========== + # Check if this Google Drive file was already indexed by ANY connector + # This happens BEFORE download/ETL to save expensive API calls + existing_by_file_id = await check_document_by_google_drive_file_id( + session, file_id, search_space_id + ) + if existing_by_file_id: + logger.info( + f"Skipping file {file_name} (file_id={file_id}): already indexed " + f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' " + f"(saved download & ETL cost)" + ) + return 0, 1, processing_errors # Skip - NO download, NO ETL! 
+ # ====================================================== + # Generate unique identifier hash document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"]) unique_identifier_hash = generate_unique_identifier_hash( document_type, f"drive_{file_id}", search_space_id ) - # Check if document exists + # Check if document exists by unique identifier (same connector, same file) existing_document = await check_document_by_unique_identifier( session, unique_identifier_hash ) @@ -1000,7 +1135,7 @@ async def _process_single_drive_file( if existing_document: if existing_document.content_hash == content_hash: - return 0, 1, processing_errors # Skipped + return 0, 1, processing_errors # Skipped - unchanged # Update existing document user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -1039,7 +1174,19 @@ async def _process_single_drive_file( existing_document.chunks = chunks existing_document.updated_at = get_current_timestamp() - return 1, 0, processing_errors # Indexed + return 1, 0, processing_errors # Indexed - updated + + # Check if content_hash already exists (from any connector) + # This prevents duplicate content and avoids IntegrityError on unique constraint + existing_by_content_hash = await check_document_by_content_hash( + session, content_hash + ) + if existing_by_content_hash: + logger.info( + f"Skipping file {file_name} (file_id={file_id}): identical content " + f"already indexed as '{existing_by_content_hash.title}'" + ) + return 0, 1, processing_errors # Skipped - duplicate content # Create new document user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -1085,7 +1232,7 @@ async def _process_single_drive_file( ) session.add(document) - return 1, 0, processing_errors # Indexed + return 1, 0, processing_errors # Indexed - new async def _fetch_folder_files_recursively( diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 191c6f954..2237ddfa8 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1180,7 +1180,8 @@ async def _run_indexing_with_notifications( ) # Run the indexing function - documents_processed, error_or_warning = await indexing_function( + # Some indexers return (indexed, error), others return (indexed, skipped, error) + result = await indexing_function( session=session, connector_id=connector_id, search_space_id=search_space_id, @@ -1190,6 +1191,13 @@ async def _run_indexing_with_notifications( update_last_indexed=False, ) + # Handle both 2-tuple and 3-tuple returns for backwards compatibility + if len(result) == 3: + documents_processed, documents_skipped, error_or_warning = result + else: + documents_processed, error_or_warning = result + documents_skipped = None + # Update connector timestamp if function provided and indexing was successful if documents_processed > 0 and update_timestamp_func: # Update notification to storing stage @@ -1216,6 +1224,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed + skipped_count=documents_skipped, ) await ( session.commit() @@ -1242,6 +1251,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed + 
skipped_count=documents_skipped, ) await ( session.commit() @@ -1283,6 +1293,7 @@ async def _run_indexing_with_notifications( indexed_count=0, error_message=notification_message, # Pass as warning, not error is_warning=True, # Flag to indicate this is a warning, not an error + skipped_count=documents_skipped, ) await ( session.commit() @@ -1298,6 +1309,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=error_or_warning, + skipped_count=documents_skipped, ) await ( session.commit() @@ -1319,6 +1331,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=None, # No error - sync succeeded + skipped_count=documents_skipped, ) await ( session.commit() @@ -1336,6 +1349,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=str(e), + skipped_count=None, # Unknown on exception ) except Exception as notif_error: logger.error(f"Failed to update notification: {notif_error!s}") diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 04f39d8ef..34acbad88 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -336,6 +336,7 @@ async def notify_indexing_completed( indexed_count: int, error_message: str | None = None, is_warning: bool = False, + skipped_count: int | None = None, ) -> Notification: """ Update notification when connector indexing completes. @@ -346,6 +347,7 @@ async def notify_indexing_completed( indexed_count: Total number of items indexed error_message: Error message if indexing failed, or warning message (optional) is_warning: If True, treat error_message as a warning (success case) rather than an error + skipped_count: Number of items skipped (e.g., duplicates) - optional Returns: Updated notification @@ -354,6 +356,14 @@ async def notify_indexing_completed( "connector_name", "Connector" ) + # Build the skipped text if there are skipped items + skipped_text = "" + if skipped_count and skipped_count > 0: + skipped_item_text = "item" if skipped_count == 1 else "items" + skipped_text = ( + f" ({skipped_count} {skipped_item_text} skipped - already indexed)" + ) + # If there's an error message but items were indexed, treat it as a warning (partial success) # If is_warning is True, treat it as success even with 0 items (e.g., duplicates found) # Otherwise, treat it as a failure @@ -362,12 +372,12 @@ async def notify_indexing_completed( # Partial success with warnings (e.g., duplicate content from other connectors) title = f"Ready: {connector_name}" item_text = "item" if indexed_count == 1 else "items" - message = f"Now searchable! {indexed_count} {item_text} synced. Note: {error_message}" + message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}" status = "completed" elif is_warning: # Warning case (e.g., duplicates found) - treat as success title = f"Ready: {connector_name}" - message = f"Sync completed. {error_message}" + message = f"Sync completed{skipped_text}. {error_message}" status = "completed" else: # Complete failure @@ -377,14 +387,21 @@ async def notify_indexing_completed( else: title = f"Ready: {connector_name}" if indexed_count == 0: - message = "Already up to date! No new items to sync." + if skipped_count and skipped_count > 0: + skipped_item_text = "item" if skipped_count == 1 else "items" + message = f"Already up to date! 
{skipped_count} {skipped_item_text} skipped (already indexed)." + else: + message = "Already up to date! No new items to sync." else: item_text = "item" if indexed_count == 1 else "items" - message = f"Now searchable! {indexed_count} {item_text} synced." + message = ( + f"Now searchable! {indexed_count} {item_text} synced{skipped_text}." + ) status = "completed" metadata_updates = { "indexed_count": indexed_count, + "skipped_count": skipped_count or 0, "sync_stage": "completed" if (not error_message or is_warning or indexed_count > 0) else "failed", diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index f97652114..ffc4a1f27 100644 --- a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -86,7 +86,7 @@ async def index_composio_connector( end_date: str | None = None, update_last_indexed: bool = True, max_items: int = 1000, -) -> tuple[int, str]: +) -> tuple[int, int, str | None]: """ Index content from a Composio connector. @@ -104,7 +104,7 @@ async def index_composio_connector( max_items: Maximum number of items to fetch Returns: - Tuple of (number_of_indexed_items, error_message or None) + Tuple of (number_of_indexed_items, number_of_skipped_items, error_message or None) """ task_logger = TaskLoggingService(session, search_space_id) @@ -132,14 +132,14 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "InvalidConnectorType"} ) - return 0, error_msg + return 0, 0, error_msg if not connector: error_msg = f"Composio connector with ID {connector_id} not found" await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "ConnectorNotFound"} ) - return 0, error_msg + return 0, 0, error_msg # Get toolkit ID from config toolkit_id = connector.config.get("toolkit_id") @@ -150,7 +150,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "MissingToolkitId"} ) - return 0, error_msg + return 0, 0, error_msg # Check if toolkit is indexable if toolkit_id not in INDEXABLE_TOOLKITS: @@ -158,7 +158,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "ToolkitNotIndexable"} ) - return 0, error_msg + return 0, 0, error_msg # Get indexer function from registry try: @@ -167,7 +167,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, str(e), {"error_type": "NoIndexerImplemented"} ) - return 0, str(e) + return 0, 0, str(e) # Build kwargs for the indexer function kwargs = { @@ -199,7 +199,7 @@ async def index_composio_connector( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -209,4 +209,4 @@ async def index_composio_connector( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True) - return 0, f"Failed to index Composio connector: {e!s}" + return 0, 0, f"Failed to index Composio connector: {e!s}" diff --git a/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json b/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json index 9c8585a0f..b729c3f8b 100644 --- 
a/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json +++ b/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json @@ -24,11 +24,6 @@ "enabled": true, "status": "warning", "statusMessage": "Some requests may be blocked if not using Firecrawl." - }, - "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": { - "enabled": false, - "status": "disabled", - "statusMessage": "Not available yet." } }, "globalSettings": {
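
For reviewers skimming the diff, here is a minimal standalone sketch of the two behavioral changes above: the settings-hash gate that decides between delta sync and a full re-scan, and the 2-/3-tuple normalization in the route handler. The helper names (`should_use_delta_sync`, `unpack_indexer_result`) and the sample inputs are hypothetical and for illustration only; the real logic lives in `index_composio_google_drive` and `_run_indexing_with_notifications`.

    # Sketch only - mirrors the PR's approach, not a drop-in replacement.
    import hashlib
    import json


    def settings_hash(selected_folders: list[dict], selected_files: list[dict],
                      indexing_options: dict) -> str:
        # Mirror of generate_indexing_settings_hash: only the options that affect
        # which files get scanned are hashed, so unrelated config edits do not
        # invalidate the stored hash.
        settings = {
            "folders": sorted(f.get("id", "") for f in selected_folders),
            "files": sorted(f.get("id", "") for f in selected_files),
            "include_subfolders": indexing_options.get("include_subfolders", True),
            "max_files_per_folder": indexing_options.get("max_files_per_folder", 100),
        }
        return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest()


    def should_use_delta_sync(config: dict, last_indexed_at, stored_page_token) -> bool:
        # Delta sync only when incremental sync is enabled, a page token and a
        # prior index exist, and the folder/file selection has not changed
        # since the last run (hypothetical helper name).
        current = settings_hash(
            config.get("selected_folders", []),
            config.get("selected_files", []),
            config.get("indexing_options", {}),
        )
        last = config.get("last_indexed_settings_hash")
        settings_changed = last is not None and current != last
        incremental = config.get("indexing_options", {}).get("incremental_sync", True)
        return bool(incremental and stored_page_token and last_indexed_at
                    and not settings_changed)


    def unpack_indexer_result(result):
        # Older indexers return (indexed, error); the Composio indexers now
        # return (indexed, skipped, error). Normalize to a 3-tuple with
        # skipped=None when the indexer does not report it.
        if len(result) == 3:
            return result
        indexed, error = result
        return indexed, None, error


    if __name__ == "__main__":
        h1 = settings_hash([{"id": "folderA"}], [], {"include_subfolders": True})
        h2 = settings_hash([{"id": "folderA"}], [], {"include_subfolders": False})
        print(h1 != h2)  # True: toggling include_subfolders forces a full re-scan
        print(unpack_indexer_result((5, "partial failure")))  # (5, None, 'partial failure')
        print(unpack_indexer_result((5, 2, None)))            # (5, 2, None)

One design note implied by the diff: hashing only the scan-affecting options means a changed `max_files_per_folder` or folder selection triggers a full re-scan, while toggles that do not alter which files are visited can keep using the stored `drive_page_token`.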