diff --git a/vectordb_bench/backend/clients/__init__.py b/vectordb_bench/backend/clients/__init__.py index 3605113a7..e96622970 100644 --- a/vectordb_bench/backend/clients/__init__.py +++ b/vectordb_bench/backend/clients/__init__.py @@ -55,6 +55,7 @@ class DB(Enum): TencentElasticsearch = "TencentElasticsearch" AliSQL = "AlibabaCloudRDSMySQL" Doris = "Doris" + TurboPuffer = "TurboPuffer" @property def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915 @@ -187,6 +188,10 @@ def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915 from .doris.doris import Doris return Doris + if self == DB.TurboPuffer: + from .turbopuffer.turbopuffer import TurboPuffer + + return TurboPuffer if self == DB.Test: from .test.test import Test @@ -357,6 +362,10 @@ def config_cls(self) -> type[DBConfig]: # noqa: PLR0911, PLR0912, C901, PLR0915 from .doris.config import DorisConfig return DorisConfig + if self == DB.TurboPuffer: + from .turbopuffer.config import TurboPufferConfig + + return TurboPufferConfig if self == DB.Test: from .test.config import TestConfig @@ -537,6 +546,10 @@ def case_config_cls( # noqa: C901, PLR0911, PLR0912, PLR0915 from .doris.config import DorisCaseConfig return DorisCaseConfig + if self == DB.TurboPuffer: + from .turbopuffer.config import TurboPufferIndexConfig + + return TurboPufferIndexConfig # DB.Pinecone, DB.Chroma, DB.Redis return EmptyDBCaseConfig diff --git a/vectordb_bench/backend/clients/cockroachdb/cockroachdb.py b/vectordb_bench/backend/clients/cockroachdb/cockroachdb.py index dd4016d7f..1d20a350d 100644 --- a/vectordb_bench/backend/clients/cockroachdb/cockroachdb.py +++ b/vectordb_bench/backend/clients/cockroachdb/cockroachdb.py @@ -35,7 +35,7 @@ class CockroachDB(VectorDB): FilterOp.StrEqual, ] - def __init__( + def __init__( # noqa: PLR0915 self, dim: int, db_config: dict, diff --git a/vectordb_bench/backend/clients/turbopuffer/config.py b/vectordb_bench/backend/clients/turbopuffer/config.py new file 
mode 100644 index 000000000..c552299e3 --- /dev/null +++ b/vectordb_bench/backend/clients/turbopuffer/config.py @@ -0,0 +1,37 @@ +from pydantic import BaseModel, SecretStr + +from ..api import DBCaseConfig, DBConfig, MetricType + + +class TurboPufferConfig(DBConfig): + api_key: SecretStr + api_base_url: str + namespace: str = "vdbbench_test" + + def to_dict(self) -> dict: + return { + "api_key": self.api_key.get_secret_value(), + "api_base_url": self.api_base_url, + "namespace": self.namespace, + } + + +class TurboPufferIndexConfig(BaseModel, DBCaseConfig): + metric_type: MetricType | None = None + use_multi_ns_for_filter: bool = False + time_wait_warmup: int = 60 * 1 # 1min + + def parse_metric(self) -> str: + if self.metric_type == MetricType.COSINE: + return "cosine_distance" + if self.metric_type == MetricType.L2: + return "euclidean_squared" + + msg = f"Not Support {self.metric_type}" + raise ValueError(msg) + + def index_param(self) -> dict: + return {} + + def search_param(self) -> dict: + return {} diff --git a/vectordb_bench/backend/clients/turbopuffer/turbopuffer.py b/vectordb_bench/backend/clients/turbopuffer/turbopuffer.py new file mode 100644 index 000000000..449682e8c --- /dev/null +++ b/vectordb_bench/backend/clients/turbopuffer/turbopuffer.py @@ -0,0 +1,122 @@ +"""Wrapper around the TurboPuffer vector database over VectorDB""" + +import logging +import time +from contextlib import contextmanager + +import turbopuffer as tpuf + +from vectordb_bench.backend.clients.turbopuffer.config import TurboPufferIndexConfig +from vectordb_bench.backend.filter import Filter, FilterOp + +from ..api import VectorDB + +log = logging.getLogger(__name__) + + +class TurboPuffer(VectorDB): + supported_filter_types: list[FilterOp] = [ + FilterOp.NonFilter, + FilterOp.NumGE, + FilterOp.StrEqual, + ] + + def __init__( + self, + dim: int, + db_config: dict, + db_case_config: TurboPufferIndexConfig, + drop_old: bool = False, + with_scalar_labels: bool = False, + **kwargs, + ): 
+ """Initialize wrapper around the milvus vector database.""" + self.api_key = db_config.get("api_key", "") + self.api_base_url = db_config.get("api_base_url", "") + self.namespace = db_config.get("namespace", "") + self.db_case_config = db_case_config + self.metric = db_case_config.parse_metric() + + self._vector_field = "vector" + self._scalar_id_field = "id" + self._scalar_label_field = "label" + + self.with_scalar_labels = with_scalar_labels + if drop_old: + log.info(f"Drop old. delete the namespace: {self.namespace}") + tpuf.api_key = self.api_key + tpuf.api_base_url = self.api_base_url + ns = tpuf.Namespace(self.namespace) + try: + ns.delete_all() + except Exception as e: + log.warning(f"Failed to delete all. Error: {e}") + + @contextmanager + def init(self): + tpuf.api_key = self.api_key + tpuf.api_base_url = self.api_base_url + self.ns = tpuf.Namespace(self.namespace) + yield + + def optimize(self, data_size: int | None = None): + # turbopuffer responds to the request + # once the cache warming operation has been started. + # It does not wait for the operation to complete, + # which can take multiple minutes for large namespaces. + self.ns.hint_cache_warm() + log.info(f"warming up but no api waiting for complete. just sleep {self.db_case_config.time_wait_warmup}s") + time.sleep(self.db_case_config.time_wait_warmup) + + def insert_embeddings( + self, + embeddings: list[list[float]], + metadata: list[int], + labels_data: list[str] | None = None, + **kwargs, + ) -> tuple[int, Exception]: + try: + if self.with_scalar_labels: + self.ns.write( + upsert_columns={ + self._scalar_id_field: metadata, + self._vector_field: embeddings, + self._scalar_label_field: labels_data, + }, + distance_metric=self.metric, + ) + else: + self.ns.write( + upsert_columns={ + self._scalar_id_field: metadata, + self._vector_field: embeddings, + }, + distance_metric=self.metric, + ) + except Exception as e: + log.warning(f"Failed to insert. 
Error: {e}") + return len(embeddings), None + + def search_embedding( + self, + query: list[float], + k: int = 100, + timeout: int | None = None, + ) -> list[int]: + res = self.ns.query( + rank_by=["vector", "ANN", query], + top_k=k, + filters=self.expr, + ) + return [row.id for row in res.rows] + + def prepare_filter(self, filters: Filter): + if filters.type == FilterOp.NonFilter: + self.expr = None + elif filters.type == FilterOp.NumGE: + self.expr = [self._scalar_id_field, "Gte", filters.int_value] + elif filters.type == FilterOp.StrEqual: + self.expr = [self._scalar_label_field, "Eq", filters.label_value] + else: + msg = f"Not support Filter for TurboPuffer - {filters}" + raise ValueError(msg) diff --git a/vectordb_bench/backend/runner/rate_runner.py b/vectordb_bench/backend/runner/rate_runner.py index ecfc53862..c56c8ca61 100644 --- a/vectordb_bench/backend/runner/rate_runner.py +++ b/vectordb_bench/backend/runner/rate_runner.py @@ -7,7 +7,6 @@ from vectordb_bench import config from vectordb_bench.backend.clients import api -from vectordb_bench.backend.clients.doris.doris import Doris from vectordb_bench.backend.dataset import DataSetIterator from vectordb_bench.backend.utils import time_it @@ -54,7 +53,7 @@ def _insert_embeddings(db: api.VectorDB, emb: list[list[float]], metadata: list[ db_copy = deepcopy(db) with db_copy.init(): _insert_embeddings(db_copy, emb, metadata, retry_idx=0) - elif isinstance(db, Doris): + elif db.name == "Doris": # DorisVectorClient is not thread-safe. Similar to pgvector, create a per-thread client # by deep-copying the wrapper and forcing lazy re-init inside the thread. 
db_copy = deepcopy(db) diff --git a/vectordb_bench/frontend/config/styles.py b/vectordb_bench/frontend/config/styles.py index cbc199cdb..65e891217 100644 --- a/vectordb_bench/frontend/config/styles.py +++ b/vectordb_bench/frontend/config/styles.py @@ -69,6 +69,7 @@ def getPatternShape(i): DB.S3Vectors: "https://assets.zilliz.com/s3_vectors_daf370b4e5.png", DB.Hologres: "https://img.alicdn.com/imgextra/i3/O1CN01d9qrry1i6lTNa2BRa_!!6000000004364-2-tps-218-200.png", DB.Doris: "https://doris.apache.org/images/logo.svg", + DB.TurboPuffer: "https://turbopuffer.com/logo2.png", } # RedisCloud color: #0D6EFD