From 7ad180d3f25d7f970e1bc2b28ef744bd7f0702c5 Mon Sep 17 00:00:00 2001 From: viragtripathi Date: Fri, 28 Nov 2025 00:31:21 -0500 Subject: [PATCH] feat(cockroachdb): Add complete UI configuration and cloud support - Add frontend Streamlit UI configuration - Logo, parameter types, and input configurations - Loading and performance config for all index parameters - Add SSL/TLS support for CockroachDB Cloud clusters - Support verify-full, verify-ca, require, disable SSL modes - CLI parameters: --sslmode, --sslrootcert - SSL certificates passed to connection pool and optimize connections - Optimize connection handling for multi-node clusters - Connection pooling for concurrent searches (enables true parallelism) - Settings in connection string to reduce per-connection overhead - Dedicated connection without statement_timeout for index creation - Handles background index creation with timeout detection and polling - Handle CockroachDB-specific schema changes - Cancel running schema jobs before DROP TABLE (prevents blocking) - Transaction conflict retry logic for aborted transactions - All linting checks passing --- .../backend/clients/cockroachdb/cli.py | 21 +++ .../clients/cockroachdb/cockroachdb.py | 146 ++++++++++++++---- .../backend/clients/cockroachdb/config.py | 30 +++- .../backend/clients/cockroachdb/db_retry.py | 2 + .../frontend/config/dbCaseConfigs.py | 77 +++++++++ vectordb_bench/frontend/config/styles.py | 1 + vectordb_bench/models.py | 6 + 7 files changed, 244 insertions(+), 39 deletions(-) diff --git a/vectordb_bench/backend/clients/cockroachdb/cli.py b/vectordb_bench/backend/clients/cockroachdb/cli.py index 1a21d7921..a32dfe255 100644 --- a/vectordb_bench/backend/clients/cockroachdb/cli.py +++ b/vectordb_bench/backend/clients/cockroachdb/cli.py @@ -39,6 +39,25 @@ class CockroachDBTypedDict(CommonTypedDict): str, click.option("--db-name", type=str, help="Database name", required=True), ] + sslmode: Annotated[ + str, + click.option( + "--sslmode", + type=str, + help="SSL mode (disable, require, verify-ca, verify-full)", + default="disable", + show_default=True, + ), + ] + sslrootcert: Annotated[ + str | None, + click.option( + "--sslrootcert", + type=str, + help="Path to SSL root certificate (required for verify-ca, verify-full)", + default=None, + ), + ] min_partition_size: Annotated[ int | None, click.option( @@ -99,6 +118,8 @@ def CockroachDB( host=parameters["host"], port=parameters["port"], db_name=parameters["db_name"], + sslmode=parameters.get("sslmode", "disable"), + sslrootcert=parameters.get("sslrootcert"), ), db_case_config=CockroachDBVectorIndexConfig( metric_type=metric_type, diff --git a/vectordb_bench/backend/clients/cockroachdb/cockroachdb.py b/vectordb_bench/backend/clients/cockroachdb/cockroachdb.py index d76d75557..83fbcaf6b 100644 --- a/vectordb_bench/backend/clients/cockroachdb/cockroachdb.py +++ b/vectordb_bench/backend/clients/cockroachdb/cockroachdb.py @@ -64,8 +64,14 @@ def __init__( # noqa: PLR0915 "user": db_config.get("user_name", "root"), "password": db_config.get("password", ""), } - # Add sslmode if specified, otherwise default to disable for local dev + # Add SSL configuration if specified conn_params["sslmode"] = db_config.get("sslmode", "disable") + if db_config.get("sslrootcert"): + conn_params["sslrootcert"] = db_config["sslrootcert"] + if db_config.get("sslcert"): + conn_params["sslcert"] = db_config["sslcert"] + if db_config.get("sslkey"): + conn_params["sslkey"] = db_config["sslkey"] self.connect_config = conn_params self.pool_size = db_config.get("pool_size", 100) @@ -117,8 +123,7 @@ def __init__( # noqa: PLR0915 cursor = conn.cursor() if drop_old: - if self.case_config is not None: - self._drop_index() # Use SQLAlchemy + # DROP TABLE CASCADE will automatically drop indexes, no need to drop separately self._drop_table(cursor, conn) self._create_table(cursor, conn, dim) if self.case_config is not None and self.case_config.create_index_before_load: @@ -139,6 +144,12 @@ def _create_connection(**kwargs) -> tuple[Connection, Cursor]: def _create_connection_pool(self) -> ConnectionPool: """Create connection pool with production settings.""" # Build connection info without 'options' parameter (not supported by psycopg_pool) + # Include vector_search_beam_size in connection options for performance + beam_size = 32 + if self.case_config is not None: + search_param = self.case_config.search_param() + beam_size = search_param.get("vector_search_beam_size", 32) + conninfo = ( f"host={self.connect_config['host']} " f"port={self.connect_config['port']} " @@ -147,23 +158,23 @@ def _create_connection_pool(self) -> ConnectionPool: f"password={self.connect_config['password']}" ) - # Add sslmode if present + # Add SSL configuration if present if "sslmode" in self.connect_config: conninfo += f" sslmode={self.connect_config['sslmode']}" + if "sslrootcert" in self.connect_config: + conninfo += f" sslrootcert={self.connect_config['sslrootcert']}" + if "sslcert" in self.connect_config: + conninfo += f" sslcert={self.connect_config['sslcert']}" + if "sslkey" in self.connect_config: + conninfo += f" sslkey={self.connect_config['sslkey']}" - # Add statement timeout for long-running vector index operations - conninfo += " options='-c statement_timeout=600s'" + # Add all settings in connection options to avoid per-connection overhead + conninfo += f" options='-c statement_timeout=600s -c vector_search_beam_size={beam_size}'" - # Configure each connection with vector support and search parameters + # Configure each connection with vector support (lightweight operation) def configure_connection(conn: Connection) -> None: register_vector(conn) - # Set vector_search_beam_size on every connection for index usage - if self.case_config is not None: - search_param = self.case_config.search_param() - beam_size = search_param.get("vector_search_beam_size", 32) - with conn.cursor() as cur: - cur.execute(f"SET vector_search_beam_size = {beam_size}") - conn.commit() + # No need to set beam_size here - it's in connection options return ConnectionPool( conninfo=conninfo, @@ -211,9 +222,50 @@ def init(self) -> Generator[None, None, None]: self.conn = None self.pool = None + def _cancel_running_schema_jobs(self): + """Cancel any running schema change jobs for this table. + CockroachDB-specific: Running CREATE INDEX jobs block DROP TABLE.""" + import psycopg + + try: + conn = psycopg.connect(**self.connect_config) + conn.autocommit = True + cursor = conn.cursor() + + # Find running schema change jobs for our table + cursor.execute( + """ + SELECT job_id + FROM [SHOW JOBS] + WHERE status IN ('running', 'pending') + AND job_type = 'NEW SCHEMA CHANGE' + AND description LIKE %s + """, + (f"%{self.table_name}%",), + ) + jobs = cursor.fetchall() + + for job in jobs: + job_id = job[0] + log.warning(f"{self.name} canceling schema job {job_id} before dropping table") + try: + cursor.execute(f"CANCEL JOB {job_id}") + log.info(f"Canceled job {job_id}") + except Exception as e: + log.warning(f"Failed to cancel job {job_id}: {e}") + + cursor.close() + conn.close() + except Exception as e: + log.warning(f"Failed to check/cancel running jobs: {e}") + @db_retry(max_attempts=3, initial_delay=0.5, backoff_factor=2.0) def _drop_table(self, cursor: Cursor, conn: Connection): - """Drop table with retry logic.""" + """Drop table with retry logic. + Note: CockroachDB-specific - must cancel running schema jobs first.""" + # Cancel any running schema change jobs that would block DROP + self._cancel_running_schema_jobs() + log.info(f"{self.name} dropping table: {self.table_name}") cursor.execute( sql.SQL("DROP TABLE IF EXISTS {table_name} CASCADE").format( @@ -223,7 +275,8 @@ def _drop_table(self, cursor: Cursor, conn: Connection): conn.commit() def _drop_index(self): - """Drop CockroachDB vector index if it exists (DDL with autocommit).""" + """Drop CockroachDB vector index if it exists (DDL with autocommit). + Note: This is typically not needed as DROP TABLE CASCADE handles it.""" log.info(f"{self.name} dropping index: {self._index_name}") conn = psycopg.connect(**self.connect_config) conn.autocommit = True @@ -397,25 +450,46 @@ def optimize(self, data_size: int | None = None): start_time = time.time() connection_closed = False - # Try to create index + # Try to create index - use a connection without statement_timeout try: - with self.pool.connection() as conn: + # Create connection without statement_timeout for long-running index creation + import psycopg + + conninfo_no_timeout = ( + f"host={self.connect_config['host']} " + f"port={self.connect_config['port']} " + f"dbname={self.connect_config['dbname']} " + f"user={self.connect_config['user']} " + f"password={self.connect_config['password']}" + ) + if "sslmode" in self.connect_config: + conninfo_no_timeout += f" sslmode={self.connect_config['sslmode']}" + if "sslrootcert" in self.connect_config: + conninfo_no_timeout += f" sslrootcert={self.connect_config['sslrootcert']}" + if "sslcert" in self.connect_config: + conninfo_no_timeout += f" sslcert={self.connect_config['sslcert']}" + if "sslkey" in self.connect_config: + conninfo_no_timeout += f" sslkey={self.connect_config['sslkey']}" + + with psycopg.connect(conninfo_no_timeout, autocommit=True) as conn: register_vector(conn) - conn.autocommit = True - cursor = conn.cursor() - try: + with conn.cursor() as cursor: cursor.execute(sql_str) elapsed = time.time() - start_time log.info(f"{self.name} index created successfully in {elapsed:.1f}s") return # Success! - finally: - cursor.close() except Exception as e: elapsed = time.time() - start_time - # Check if this is the expected 30s timeout on multi-node clusters - if "server closed the connection" in str(e) or "connection" in str(e).lower(): - log.warning(f"Connection closed after {elapsed:.1f}s during index creation: {e}") - log.info("This is expected on multi-node clusters - checking if index was created...") + error_msg = str(e) + # Check for timeout or connection issues + if ( + "server closed the connection" in error_msg + or "statement timeout" in error_msg.lower() + or "query execution canceled" in error_msg.lower() + or "connection" in error_msg.lower() + ): + log.warning(f"Timeout/connection issue after {elapsed:.1f}s during index creation: {e}") + log.info("This is expected on large datasets - checking if index is being created in background...") connection_closed = True else: # Unexpected error, re-raise @@ -495,8 +569,7 @@ def search_embedding( **kwargs: Any, ) -> list[int]: """Search for k nearest neighbors using vector index.""" - assert self.conn is not None, "Connection is not initialized" - assert self.cursor is not None, "Cursor is not initialized" + assert self.pool is not None, "Connection pool is not initialized" # Use default L2 distance if no case_config provided if self.case_config is not None: @@ -520,5 +593,16 @@ def search_embedding( metric_op=sql.SQL(metric_op), ) - result = self.cursor.execute(full_sql, (q, k), prepare=True, binary=True) - return [int(i[0]) for i in result.fetchall()] + # Get a connection from the pool for this query (enables true concurrency) + # Pool returns already-configured connections with vector support + with self.pool.connection() as conn, conn.cursor() as cursor: + try: + result = cursor.execute(full_sql, (q, k), prepare=True, binary=True) + return [int(i[0]) for i in result.fetchall()] + except Exception as e: + # If transaction is aborted, rollback and retry + if "transaction is aborted" in str(e).lower(): + conn.rollback() + result = cursor.execute(full_sql, (q, k), prepare=True, binary=True) + return [int(i[0]) for i in result.fetchall()] + raise diff --git a/vectordb_bench/backend/clients/cockroachdb/config.py b/vectordb_bench/backend/clients/cockroachdb/config.py index 79e2ff912..0d608da8f 100644 --- a/vectordb_bench/backend/clients/cockroachdb/config.py +++ b/vectordb_bench/backend/clients/cockroachdb/config.py @@ -34,20 +34,34 @@ class CockroachDBConfig(DBConfig): max_overflow: int = 100 pool_recycle: int = 3600 connect_timeout: int = 10 + sslmode: str = "disable" # Options: disable, require, verify-ca, verify-full + sslrootcert: str | None = None # Path to CA cert (for verify-ca, verify-full) + sslcert: str | None = None # Path to client cert (for mutual TLS) + sslkey: str | None = None # Path to client key (for mutual TLS) def to_dict(self) -> CockroachDBConfigDict: user_str = self.user_name.get_secret_value() if isinstance(self.user_name, SecretStr) else self.user_name pwd_str = self.password.get_secret_value() if self.password else "" + connect_config = { + "host": self.host, + "port": self.port, + "dbname": self.db_name, + "user": user_str, + "password": pwd_str, + "sslmode": self.sslmode, + } + + # Add SSL certificate paths if provided + if self.sslrootcert: + connect_config["sslrootcert"] = self.sslrootcert + if self.sslcert: + connect_config["sslcert"] = self.sslcert + if self.sslkey: + connect_config["sslkey"] = self.sslkey + return { - "connect_config": { - "host": self.host, - "port": self.port, - "dbname": self.db_name, - "user": user_str, - "password": pwd_str, - "sslmode": "disable", # Default for local dev; production should override - }, + "connect_config": connect_config, "table_name": self.table_name, "pool_size": self.pool_size, "max_overflow": self.max_overflow, diff --git a/vectordb_bench/backend/clients/cockroachdb/db_retry.py b/vectordb_bench/backend/clients/cockroachdb/db_retry.py index 3481c321b..a838c029c 100644 --- a/vectordb_bench/backend/clients/cockroachdb/db_retry.py +++ b/vectordb_bench/backend/clients/cockroachdb/db_retry.py @@ -39,6 +39,8 @@ "no such host", "initial connection heartbeat failed", "sending to all replicas failed", + "transaction is aborted", + "commands ignored until end of transaction block", "40001", "40003", ) diff --git a/vectordb_bench/frontend/config/dbCaseConfigs.py b/vectordb_bench/frontend/config/dbCaseConfigs.py index 6dd8a6e19..e37d1570d 100644 --- a/vectordb_bench/frontend/config/dbCaseConfigs.py +++ b/vectordb_bench/frontend/config/dbCaseConfigs.py @@ -1531,6 +1531,64 @@ class CaseConfigInput(BaseModel): }, ) +# CockroachDB configs +CaseConfigParamInput_IndexType_CockroachDB = CaseConfigInput( + label=CaseConfigParamType.IndexType, + inputHelp="Select Index Type (all use C-SPANN vector index)", + inputType=InputType.Option, + inputConfig={ + "options": [ + IndexType.Flat.value, + IndexType.HNSW.value, + IndexType.IVFFlat.value, + ], + }, +) + +CaseConfigParamInput_MinPartitionSize_CockroachDB = CaseConfigInput( + label=CaseConfigParamType.min_partition_size, + inputHelp="Minimum partition size (1-1024)", + inputType=InputType.Number, + inputConfig={ + "min": 1, + "max": 1024, + "value": 16, + }, +) + +CaseConfigParamInput_MaxPartitionSize_CockroachDB = CaseConfigInput( + label=CaseConfigParamType.max_partition_size, + inputHelp="Maximum partition size", + inputType=InputType.Number, + inputConfig={ + "min": 1, + "max": 10000, + "value": 128, + }, +) + +CaseConfigParamInput_BuildBeamSize_CockroachDB = CaseConfigInput( + label=CaseConfigParamType.build_beam_size, + inputHelp="Build beam size for index creation", + inputType=InputType.Number, + inputConfig={ + "min": 1, + "max": 100, + "value": 8, + }, +) + +CaseConfigParamInput_VectorSearchBeamSize_CockroachDB = CaseConfigInput( + label=CaseConfigParamType.vector_search_beam_size, + inputHelp="Vector search beam size", + inputType=InputType.Number, + inputConfig={ + "min": 1, + "max": 1000, + "value": 32, + }, +) + CaseConfigParamInput_IndexType_MariaDB = CaseConfigInput( label=CaseConfigParamType.IndexType, inputHelp="Select Index Type", @@ -2051,6 +2109,21 @@ class CaseConfigInput(BaseModel): CaseConfigParamInput_MongoDBNumCandidatesRatio, ] +CockroachDBLoadingConfig = [ + CaseConfigParamInput_IndexType_CockroachDB, + CaseConfigParamInput_MinPartitionSize_CockroachDB, + CaseConfigParamInput_MaxPartitionSize_CockroachDB, + CaseConfigParamInput_BuildBeamSize_CockroachDB, + CaseConfigParamInput_VectorSearchBeamSize_CockroachDB, +] +CockroachDBPerformanceConfig = [ + CaseConfigParamInput_IndexType_CockroachDB, + CaseConfigParamInput_MinPartitionSize_CockroachDB, + CaseConfigParamInput_MaxPartitionSize_CockroachDB, + CaseConfigParamInput_BuildBeamSize_CockroachDB, + CaseConfigParamInput_VectorSearchBeamSize_CockroachDB, +] + MariaDBLoadingConfig = [ CaseConfigParamInput_IndexType_MariaDB, CaseConfigParamInput_StorageEngine_MariaDB, @@ -2314,6 +2387,10 @@ class CaseConfigInput(BaseModel): CaseLabel.Load: AliSQLLoadingConfig, CaseLabel.Performance: AliSQLPerformanceConfig, }, + DB.CockroachDB: { + CaseLabel.Load: CockroachDBLoadingConfig, + CaseLabel.Performance: CockroachDBPerformanceConfig, + }, } diff --git a/vectordb_bench/frontend/config/styles.py b/vectordb_bench/frontend/config/styles.py index 65e891217..bce4561fd 100644 --- a/vectordb_bench/frontend/config/styles.py +++ b/vectordb_bench/frontend/config/styles.py @@ -70,6 +70,7 @@ def getPatternShape(i): DB.Hologres: "https://img.alicdn.com/imgextra/i3/O1CN01d9qrry1i6lTNa2BRa_!!6000000004364-2-tps-218-200.png", DB.Doris: "https://doris.apache.org/images/logo.svg", DB.TurboPuffer: "https://turbopuffer.com/logo2.png", + DB.CockroachDB: "https://raw.githubusercontent.com/cockroachdb/cockroach/master/docs/media/cockroach_db.png", } # RedisCloud color: #0D6EFD diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py index 5ffd0e70f..34ab2398a 100644 --- a/vectordb_bench/models.py +++ b/vectordb_bench/models.py @@ -129,6 +129,12 @@ class CaseConfigParamType(Enum): replication_type = "replication_type" cache_size = "cache_size" + # CockroachDB parameters + min_partition_size = "min_partition_size" + max_partition_size = "max_partition_size" + build_beam_size = "build_beam_size" + vector_search_beam_size = "vector_search_beam_size" + dataset_with_size_type = "dataset_with_size_type" filter_rate = "filter_rate" insert_rate = "insert_rate"