Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions vectordb_bench/backend/clients/cockroachdb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,25 @@ class CockroachDBTypedDict(CommonTypedDict):
str,
click.option("--db-name", type=str, help="Database name", required=True),
]
sslmode: Annotated[
str,
click.option(
"--sslmode",
type=str,
help="SSL mode (disable, require, verify-ca, verify-full)",
default="disable",
show_default=True,
),
]
sslrootcert: Annotated[
str | None,
click.option(
"--sslrootcert",
type=str,
help="Path to SSL root certificate (required for verify-ca, verify-full)",
default=None,
),
]
min_partition_size: Annotated[
int | None,
click.option(
Expand Down Expand Up @@ -99,6 +118,8 @@ def CockroachDB(
host=parameters["host"],
port=parameters["port"],
db_name=parameters["db_name"],
sslmode=parameters.get("sslmode", "disable"),
sslrootcert=parameters.get("sslrootcert"),
),
db_case_config=CockroachDBVectorIndexConfig(
metric_type=metric_type,
Expand Down
146 changes: 115 additions & 31 deletions vectordb_bench/backend/clients/cockroachdb/cockroachdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,14 @@ def __init__( # noqa: PLR0915
"user": db_config.get("user_name", "root"),
"password": db_config.get("password", ""),
}
# Add sslmode if specified, otherwise default to disable for local dev
# Add SSL configuration if specified
conn_params["sslmode"] = db_config.get("sslmode", "disable")
if db_config.get("sslrootcert"):
conn_params["sslrootcert"] = db_config["sslrootcert"]
if db_config.get("sslcert"):
conn_params["sslcert"] = db_config["sslcert"]
if db_config.get("sslkey"):
conn_params["sslkey"] = db_config["sslkey"]

self.connect_config = conn_params
self.pool_size = db_config.get("pool_size", 100)
Expand Down Expand Up @@ -117,8 +123,7 @@ def __init__( # noqa: PLR0915
cursor = conn.cursor()

if drop_old:
if self.case_config is not None:
self._drop_index() # Use SQLAlchemy
# DROP TABLE CASCADE will automatically drop indexes, no need to drop separately
self._drop_table(cursor, conn)
self._create_table(cursor, conn, dim)
if self.case_config is not None and self.case_config.create_index_before_load:
Expand All @@ -139,6 +144,12 @@ def _create_connection(**kwargs) -> tuple[Connection, Cursor]:
def _create_connection_pool(self) -> ConnectionPool:
"""Create connection pool with production settings."""
# Build connection info without 'options' parameter (not supported by psycopg_pool)
# Include vector_search_beam_size in connection options for performance
beam_size = 32
if self.case_config is not None:
search_param = self.case_config.search_param()
beam_size = search_param.get("vector_search_beam_size", 32)

conninfo = (
f"host={self.connect_config['host']} "
f"port={self.connect_config['port']} "
Expand All @@ -147,23 +158,23 @@ def _create_connection_pool(self) -> ConnectionPool:
f"password={self.connect_config['password']}"
)

# Add sslmode if present
# Add SSL configuration if present
if "sslmode" in self.connect_config:
conninfo += f" sslmode={self.connect_config['sslmode']}"
if "sslrootcert" in self.connect_config:
conninfo += f" sslrootcert={self.connect_config['sslrootcert']}"
if "sslcert" in self.connect_config:
conninfo += f" sslcert={self.connect_config['sslcert']}"
if "sslkey" in self.connect_config:
conninfo += f" sslkey={self.connect_config['sslkey']}"

# Add statement timeout for long-running vector index operations
conninfo += " options='-c statement_timeout=600s'"
# Add all settings in connection options to avoid per-connection overhead
conninfo += f" options='-c statement_timeout=600s -c vector_search_beam_size={beam_size}'"

# Configure each connection with vector support and search parameters
# Configure each connection with vector support (lightweight operation)
def configure_connection(conn: Connection) -> None:
register_vector(conn)
# Set vector_search_beam_size on every connection for index usage
if self.case_config is not None:
search_param = self.case_config.search_param()
beam_size = search_param.get("vector_search_beam_size", 32)
with conn.cursor() as cur:
cur.execute(f"SET vector_search_beam_size = {beam_size}")
conn.commit()
# No need to set beam_size here - it's in connection options

return ConnectionPool(
conninfo=conninfo,
Expand Down Expand Up @@ -211,9 +222,50 @@ def init(self) -> Generator[None, None, None]:
self.conn = None
self.pool = None

def _cancel_running_schema_jobs(self):
    """Cancel running or pending schema change jobs that mention this table.

    CockroachDB-specific: running CREATE INDEX jobs block DROP TABLE, so
    they are canceled before the table is dropped.

    This is best-effort. A failure to list jobs, or to cancel an individual
    job, is logged as a warning and never propagated to the caller.
    """
    import psycopg

    try:
        # Context managers release the cursor and the connection even when
        # SHOW JOBS or fetchall() raises, so no connection is leaked on the
        # error path. Autocommit is required because this is job control,
        # not a transactional statement.
        with psycopg.connect(**self.connect_config, autocommit=True) as conn, conn.cursor() as cursor:
            # Find running schema change jobs for our table.
            # NOTE(review): `_` and `%` inside the table name act as LIKE
            # wildcards, so this can match jobs of similarly named tables —
            # confirm this is acceptable for the benchmark environment.
            cursor.execute(
                """
                SELECT job_id
                FROM [SHOW JOBS]
                WHERE status IN ('running', 'pending')
                AND job_type = 'NEW SCHEMA CHANGE'
                AND description LIKE %s
                """,
                (f"%{self.table_name}%",),
            )
            jobs = cursor.fetchall()

            for (job_id, *_rest) in jobs:
                log.warning(f"{self.name} canceling schema job {job_id} before dropping table")
                try:
                    # int() guarantees only a numeric id is ever interpolated
                    # into the statement text.
                    cursor.execute(f"CANCEL JOB {int(job_id)}")
                    log.info(f"Canceled job {job_id}")
                except Exception as e:
                    # One job failing to cancel must not stop the others.
                    log.warning(f"Failed to cancel job {job_id}: {e}")
    except Exception as e:
        log.warning(f"Failed to check/cancel running jobs: {e}")

@db_retry(max_attempts=3, initial_delay=0.5, backoff_factor=2.0)
def _drop_table(self, cursor: Cursor, conn: Connection):
"""Drop table with retry logic."""
"""Drop table with retry logic.
Note: CockroachDB-specific - must cancel running schema jobs first."""
# Cancel any running schema change jobs that would block DROP
self._cancel_running_schema_jobs()

log.info(f"{self.name} dropping table: {self.table_name}")
cursor.execute(
sql.SQL("DROP TABLE IF EXISTS {table_name} CASCADE").format(
Expand All @@ -223,7 +275,8 @@ def _drop_table(self, cursor: Cursor, conn: Connection):
conn.commit()

def _drop_index(self):
"""Drop CockroachDB vector index if it exists (DDL with autocommit)."""
"""Drop CockroachDB vector index if it exists (DDL with autocommit).
Note: This is typically not needed as DROP TABLE CASCADE handles it."""
log.info(f"{self.name} dropping index: {self._index_name}")
conn = psycopg.connect(**self.connect_config)
conn.autocommit = True
Expand Down Expand Up @@ -397,25 +450,46 @@ def optimize(self, data_size: int | None = None):
start_time = time.time()
connection_closed = False

# Try to create index
# Try to create index - use a connection without statement_timeout
try:
with self.pool.connection() as conn:
# Create connection without statement_timeout for long-running index creation
import psycopg

conninfo_no_timeout = (
f"host={self.connect_config['host']} "
f"port={self.connect_config['port']} "
f"dbname={self.connect_config['dbname']} "
f"user={self.connect_config['user']} "
f"password={self.connect_config['password']}"
)
if "sslmode" in self.connect_config:
conninfo_no_timeout += f" sslmode={self.connect_config['sslmode']}"
if "sslrootcert" in self.connect_config:
conninfo_no_timeout += f" sslrootcert={self.connect_config['sslrootcert']}"
if "sslcert" in self.connect_config:
conninfo_no_timeout += f" sslcert={self.connect_config['sslcert']}"
if "sslkey" in self.connect_config:
conninfo_no_timeout += f" sslkey={self.connect_config['sslkey']}"

with psycopg.connect(conninfo_no_timeout, autocommit=True) as conn:
register_vector(conn)
conn.autocommit = True
cursor = conn.cursor()
try:
with conn.cursor() as cursor:
cursor.execute(sql_str)
elapsed = time.time() - start_time
log.info(f"{self.name} index created successfully in {elapsed:.1f}s")
return # Success!
finally:
cursor.close()
except Exception as e:
elapsed = time.time() - start_time
# Check if this is the expected 30s timeout on multi-node clusters
if "server closed the connection" in str(e) or "connection" in str(e).lower():
log.warning(f"Connection closed after {elapsed:.1f}s during index creation: {e}")
log.info("This is expected on multi-node clusters - checking if index was created...")
error_msg = str(e)
# Check for timeout or connection issues
if (
"server closed the connection" in error_msg
or "statement timeout" in error_msg.lower()
or "query execution canceled" in error_msg.lower()
or "connection" in error_msg.lower()
):
log.warning(f"Timeout/connection issue after {elapsed:.1f}s during index creation: {e}")
log.info("This is expected on large datasets - checking if index is being created in background...")
connection_closed = True
else:
# Unexpected error, re-raise
Expand Down Expand Up @@ -495,8 +569,7 @@ def search_embedding(
**kwargs: Any,
) -> list[int]:
"""Search for k nearest neighbors using vector index."""
assert self.conn is not None, "Connection is not initialized"
assert self.cursor is not None, "Cursor is not initialized"
assert self.pool is not None, "Connection pool is not initialized"

# Use default L2 distance if no case_config provided
if self.case_config is not None:
Expand All @@ -520,5 +593,16 @@ def search_embedding(
metric_op=sql.SQL(metric_op),
)

result = self.cursor.execute(full_sql, (q, k), prepare=True, binary=True)
return [int(i[0]) for i in result.fetchall()]
# Get a connection from the pool for this query (enables true concurrency)
# Pool returns already-configured connections with vector support
with self.pool.connection() as conn, conn.cursor() as cursor:
try:
result = cursor.execute(full_sql, (q, k), prepare=True, binary=True)
return [int(i[0]) for i in result.fetchall()]
except Exception as e:
# If transaction is aborted, rollback and retry
if "transaction is aborted" in str(e).lower():
conn.rollback()
result = cursor.execute(full_sql, (q, k), prepare=True, binary=True)
return [int(i[0]) for i in result.fetchall()]
raise
30 changes: 22 additions & 8 deletions vectordb_bench/backend/clients/cockroachdb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,34 @@ class CockroachDBConfig(DBConfig):
max_overflow: int = 100
pool_recycle: int = 3600
connect_timeout: int = 10
sslmode: str = "disable" # Options: disable, require, verify-ca, verify-full
sslrootcert: str | None = None # Path to CA cert (for verify-ca, verify-full)
sslcert: str | None = None # Path to client cert (for mutual TLS)
sslkey: str | None = None # Path to client key (for mutual TLS)

def to_dict(self) -> CockroachDBConfigDict:
user_str = self.user_name.get_secret_value() if isinstance(self.user_name, SecretStr) else self.user_name
pwd_str = self.password.get_secret_value() if self.password else ""

connect_config = {
"host": self.host,
"port": self.port,
"dbname": self.db_name,
"user": user_str,
"password": pwd_str,
"sslmode": self.sslmode,
}

# Add SSL certificate paths if provided
if self.sslrootcert:
connect_config["sslrootcert"] = self.sslrootcert
if self.sslcert:
connect_config["sslcert"] = self.sslcert
if self.sslkey:
connect_config["sslkey"] = self.sslkey

return {
"connect_config": {
"host": self.host,
"port": self.port,
"dbname": self.db_name,
"user": user_str,
"password": pwd_str,
"sslmode": "disable", # Default for local dev; production should override
},
"connect_config": connect_config,
"table_name": self.table_name,
"pool_size": self.pool_size,
"max_overflow": self.max_overflow,
Expand Down
2 changes: 2 additions & 0 deletions vectordb_bench/backend/clients/cockroachdb/db_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
"no such host",
"initial connection heartbeat failed",
"sending to all replicas failed",
"transaction is aborted",
"commands ignored until end of transaction block",
"40001",
"40003",
)
Expand Down
Loading