surfsense_backend/app/connectors/composio_google_drive_connector.py (157 additions, 10 deletions)
@@ -4,6 +4,8 @@
Provides Google Drive specific methods for data retrieval and indexing via Composio.
"""

import hashlib
import json
import logging
import os
import tempfile
@@ -464,6 +466,55 @@ async def check_document_by_unique_identifier(
return existing_doc_result.scalars().first()


async def check_document_by_content_hash(
session: AsyncSession, content_hash: str
) -> Document | None:
"""Check if a document with the given content hash already exists.

This is used to prevent duplicate content from being indexed, regardless
of which connector originally indexed it.
"""
from sqlalchemy.future import select

existing_doc_result = await session.execute(
select(Document).where(Document.content_hash == content_hash)
)
return existing_doc_result.scalars().first()


async def check_document_by_google_drive_file_id(
session: AsyncSession, file_id: str, search_space_id: int
) -> Document | None:
"""Check if a document with this Google Drive file ID exists (from any connector).

This checks both metadata key formats:
- 'google_drive_file_id' (normal Google Drive connector)
- 'file_id' (Composio Google Drive connector)

This allows detecting duplicates BEFORE downloading/ETL, saving expensive API calls.
"""
from sqlalchemy import String, cast, or_
from sqlalchemy.future import select

# When casting JSON to String, the result includes quotes: "value" instead of value
# So we need to compare with the quoted version
quoted_file_id = f'"{file_id}"'

existing_doc_result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
or_(
# Normal Google Drive connector format
cast(Document.document_metadata["google_drive_file_id"], String)
== quoted_file_id,
# Composio Google Drive connector format
cast(Document.document_metadata["file_id"], String) == quoted_file_id,
),
)
)
return existing_doc_result.scalars().first()
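
A side note on the quoted comparison above: it relies on the textual form produced by casting a JSON element to String. If `document_metadata` is mapped with the PostgreSQL JSONB type (an assumption, not something this diff confirms), the `.astext` accessor expresses the same filter without manual quoting. A minimal alternative sketch, not part of the PR, reusing the `Document` model and `AsyncSession` already used in this module:

```python
# Alternative sketch, NOT part of this PR: assumes Document.document_metadata
# is a PostgreSQL JSONB column, whose indexed access exposes .astext.
from sqlalchemy import or_
from sqlalchemy.future import select


async def check_by_drive_file_id_astext(
    session: AsyncSession, file_id: str, search_space_id: int
) -> Document | None:
    # .astext extracts the JSON element as plain text (no surrounding quotes),
    # so no manual '"..."' wrapping is needed.
    result = await session.execute(
        select(Document).where(
            Document.search_space_id == search_space_id,
            or_(
                Document.document_metadata["google_drive_file_id"].astext == file_id,
                Document.document_metadata["file_id"].astext == file_id,
            ),
        )
    )
    return result.scalars().first()
```

Both forms compare string-valued metadata keys; the difference is purely readability.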


async def update_connector_last_indexed(
session: AsyncSession,
connector,
@@ -477,6 +528,33 @@ async def update_connector_last_indexed(
logger.info(f"Updated last_indexed_at to {connector.last_indexed_at}")


def generate_indexing_settings_hash(
selected_folders: list[dict],
selected_files: list[dict],
indexing_options: dict,
) -> str:
"""Generate a hash of indexing settings to detect configuration changes.

This hash is used to determine if indexing settings have changed since
the last index, which would require a full re-scan instead of delta sync.

Args:
selected_folders: List of {id, name} for folders to index
selected_files: List of {id, name} for individual files to index
indexing_options: Dict with max_files_per_folder, include_subfolders, etc.

Returns:
MD5 hash string of the settings
"""
settings = {
"folders": sorted([f.get("id", "") for f in selected_folders]),
"files": sorted([f.get("id", "") for f in selected_files]),
"include_subfolders": indexing_options.get("include_subfolders", True),
"max_files_per_folder": indexing_options.get("max_files_per_folder", 100),
}
return hashlib.md5(json.dumps(settings, sort_keys=True).encode()).hexdigest()
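
A quick illustration of how this hash drives the re-scan decision (the folder IDs below are made up): because the IDs are sorted before hashing, re-ordering the selection never changes the hash, while toggling an option such as `include_subfolders` does.

```python
# Illustration only; folder IDs are hypothetical.
base = generate_indexing_settings_hash(
    selected_folders=[{"id": "folder-a"}, {"id": "folder-b"}],
    selected_files=[],
    indexing_options={"include_subfolders": True, "max_files_per_folder": 100},
)
reordered = generate_indexing_settings_hash(
    selected_folders=[{"id": "folder-b"}, {"id": "folder-a"}],
    selected_files=[],
    indexing_options={"include_subfolders": True, "max_files_per_folder": 100},
)
no_subfolders = generate_indexing_settings_hash(
    selected_folders=[{"id": "folder-a"}, {"id": "folder-b"}],
    selected_files=[],
    indexing_options={"include_subfolders": False, "max_files_per_folder": 100},
)
assert base == reordered       # selection order alone never forces a full re-scan
assert base != no_subfolders   # changing subfolder handling does
```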


async def index_composio_google_drive(
session: AsyncSession,
connector,
@@ -487,12 +565,16 @@ async def index_composio_google_drive(
log_entry,
update_last_indexed: bool = True,
max_items: int = 1000,
) -> tuple[int, str]:
) -> tuple[int, int, str | None]:
"""Index Google Drive files via Composio with delta sync support.

Returns:
Tuple of (documents_indexed, documents_skipped, error_message or None)

Delta Sync Flow:
1. First sync: Full scan + get initial page token
2. Subsequent syncs: Use LIST_CHANGES to process only changed files
(unless settings changed or incremental_sync is disabled)

Supports folder/file selection via connector config:
- selected_folders: List of {id, name} for folders to index
@@ -508,12 +590,42 @@
selected_files = connector_config.get("selected_files", [])
indexing_options = connector_config.get("indexing_options", {})

max_files_per_folder = indexing_options.get("max_files_per_folder", 100)
include_subfolders = indexing_options.get("include_subfolders", True)
incremental_sync = indexing_options.get("incremental_sync", True)

# Generate current settings hash to detect configuration changes
current_settings_hash = generate_indexing_settings_hash(
selected_folders, selected_files, indexing_options
)
last_settings_hash = connector_config.get("last_indexed_settings_hash")

# Detect if settings changed since last index
settings_changed = (
last_settings_hash is not None
and current_settings_hash != last_settings_hash
)

if settings_changed:
logger.info(
f"Indexing settings changed for connector {connector_id}. "
f"Will perform full re-scan to apply new configuration."
)

# Check for stored page token for delta sync
stored_page_token = connector_config.get("drive_page_token")
use_delta_sync = stored_page_token and connector.last_indexed_at

max_files_per_folder = indexing_options.get("max_files_per_folder", 100)
include_subfolders = indexing_options.get("include_subfolders", True)
# Determine whether to use delta sync:
# - Must have a stored page token
# - Must have been indexed before (last_indexed_at exists)
# - User must have incremental_sync enabled
# - Settings must not have changed (folder/subfolder config)
use_delta_sync = (
incremental_sync
and stored_page_token
and connector.last_indexed_at
and not settings_changed
)

# Route to delta sync or full scan
if use_delta_sync:
@@ -588,6 +700,14 @@ async def index_composio_google_drive(
elif token_error:
logger.warning(f"Failed to get new page token: {token_error}")

# Save current settings hash for future change detection
# This allows detecting when folder/subfolder settings change
if not connector.config:
connector.config = {}
connector.config["last_indexed_settings_hash"] = current_settings_hash
flag_modified(connector, "config")
logger.info(f"Saved indexing settings hash for connector {connector_id}")

# CRITICAL: Always update timestamp so Electric SQL syncs and UI shows indexed status
await update_connector_last_indexed(session, connector, update_last_indexed)

Expand Down Expand Up @@ -628,11 +748,11 @@ async def index_composio_google_drive(
},
)

return documents_indexed, error_message
return documents_indexed, documents_skipped, error_message

except Exception as e:
logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True)
return 0, f"Failed to index Google Drive via Composio: {e!s}"
return 0, 0, f"Failed to index Google Drive via Composio: {e!s}"


async def _index_composio_drive_delta_sync(
@@ -953,13 +1073,28 @@ async def _process_single_drive_file(
"""
processing_errors = []

# ========== EARLY DUPLICATE CHECK BY FILE ID ==========
# Check if this Google Drive file was already indexed by ANY connector
# This happens BEFORE download/ETL to save expensive API calls
existing_by_file_id = await check_document_by_google_drive_file_id(
session, file_id, search_space_id
)
if existing_by_file_id:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): already indexed "
f"by {existing_by_file_id.document_type.value} as '{existing_by_file_id.title}' "
f"(saved download & ETL cost)"
)
return 0, 1, processing_errors # Skip - NO download, NO ETL!
# ======================================================

# Generate unique identifier hash
document_type = DocumentType(TOOLKIT_TO_DOCUMENT_TYPE["googledrive"])
unique_identifier_hash = generate_unique_identifier_hash(
document_type, f"drive_{file_id}", search_space_id
)

# Check if document exists
# Check if document exists by unique identifier (same connector, same file)
existing_document = await check_document_by_unique_identifier(
session, unique_identifier_hash
)
Expand Down Expand Up @@ -1000,7 +1135,7 @@ async def _process_single_drive_file(

if existing_document:
if existing_document.content_hash == content_hash:
return 0, 1, processing_errors # Skipped
return 0, 1, processing_errors # Skipped - unchanged

# Update existing document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
Expand Down Expand Up @@ -1039,7 +1174,19 @@ async def _process_single_drive_file(
existing_document.chunks = chunks
existing_document.updated_at = get_current_timestamp()

return 1, 0, processing_errors # Indexed
return 1, 0, processing_errors # Indexed - updated

# Check if content_hash already exists (from any connector)
# This prevents duplicate content and avoids IntegrityError on unique constraint
existing_by_content_hash = await check_document_by_content_hash(
session, content_hash
)
if existing_by_content_hash:
logger.info(
f"Skipping file {file_name} (file_id={file_id}): identical content "
f"already indexed as '{existing_by_content_hash.title}'"
)
return 0, 1, processing_errors # Skipped - duplicate content

# Create new document
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
Expand Down Expand Up @@ -1085,7 +1232,7 @@ async def _process_single_drive_file(
)
session.add(document)

return 1, 0, processing_errors # Indexed
return 1, 0, processing_errors # Indexed - new


async def _fetch_folder_files_recursively(
surfsense_backend/app/routes/search_source_connectors_routes.py (15 additions, 1 deletion)
@@ -1180,7 +1180,8 @@ async def _run_indexing_with_notifications(
)

# Run the indexing function
documents_processed, error_or_warning = await indexing_function(
# Some indexers return (indexed, error), others return (indexed, skipped, error)
result = await indexing_function(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
@@ -1190,6 +1191,13 @@
update_last_indexed=False,
)

# Handle both 2-tuple and 3-tuple returns for backwards compatibility
if len(result) == 3:
documents_processed, documents_skipped, error_or_warning = result
else:
documents_processed, error_or_warning = result
documents_skipped = None

# Update connector timestamp if function provided and indexing was successful
if documents_processed > 0 and update_timestamp_func:
# Update notification to storing stage
@@ -1216,6 +1224,7 @@
notification=notification,
indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed
skipped_count=documents_skipped,
)
await (
session.commit()
@@ -1242,6 +1251,7 @@
notification=notification,
indexed_count=documents_processed,
error_message=error_or_warning, # Show errors even if some documents were indexed
skipped_count=documents_skipped,
)
await (
session.commit()
@@ -1283,6 +1293,7 @@ async def _run_indexing_with_notifications(
indexed_count=0,
error_message=notification_message, # Pass as warning, not error
is_warning=True, # Flag to indicate this is a warning, not an error
skipped_count=documents_skipped,
)
await (
session.commit()
@@ -1298,6 +1309,7 @@
notification=notification,
indexed_count=0,
error_message=error_or_warning,
skipped_count=documents_skipped,
)
await (
session.commit()
@@ -1319,6 +1331,7 @@
notification=notification,
indexed_count=0,
error_message=None, # No error - sync succeeded
skipped_count=documents_skipped,
)
await (
session.commit()
@@ -1336,6 +1349,7 @@
notification=notification,
indexed_count=0,
error_message=str(e),
skipped_count=None, # Unknown on exception
)
except Exception as notif_error:
logger.error(f"Failed to update notification: {notif_error!s}")