13 changes: 13 additions & 0 deletions vectordb_bench/backend/clients/__init__.py
@@ -55,6 +55,7 @@ class DB(Enum):
TencentElasticsearch = "TencentElasticsearch"
AliSQL = "AlibabaCloudRDSMySQL"
Doris = "Doris"
TurboPuffer = "TurpoBuffer"

@property
def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915
@@ -187,6 +188,10 @@ def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915
from .doris.doris import Doris

return Doris
if self == DB.TurboPuffer:
from .turbopuffer.turbopuffer import TurboPuffer

return TurboPuffer

if self == DB.Test:
from .test.test import Test
@@ -357,6 +362,10 @@ def config_cls(self) -> type[DBConfig]: # noqa: PLR0911, PLR0912, C901, PLR0915
from .doris.config import DorisConfig

return DorisConfig
if self == DB.TurboPuffer:
from .turbopuffer.config import TurboPufferConfig

return TurboPufferConfig

if self == DB.Test:
from .test.config import TestConfig
@@ -537,6 +546,10 @@ def case_config_cls( # noqa: C901, PLR0911, PLR0912, PLR0915
from .doris.config import DorisCaseConfig

return DorisCaseConfig
if self == DB.TurboPuffer:
from .turbopuffer.config import TurboPufferIndexConfig

return TurboPufferIndexConfig

# DB.Pinecone, DB.Chroma, DB.Redis
return EmptyDBCaseConfig
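
For orientation, a small sketch of how the new enum member dispatches to the classes added in this PR; it assumes init_cls and config_cls are plain properties on DB, as the @property decorator above init_cls suggests:

from vectordb_bench.backend.clients import DB

# Lazily imports and returns the classes registered for TurboPuffer above.
client_cls = DB.TurboPuffer.init_cls    # -> TurboPuffer wrapper class
config_cls = DB.TurboPuffer.config_cls  # -> TurboPufferConfig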
2 changes: 1 addition & 1 deletion vectordb_bench/backend/clients/cockroachdb/cockroachdb.py
@@ -35,7 +35,7 @@ class CockroachDB(VectorDB):
FilterOp.StrEqual,
]

def __init__(
def __init__( # noqa: PLR0915
self,
dim: int,
db_config: dict,
37 changes: 37 additions & 0 deletions vectordb_bench/backend/clients/turbopuffer/config.py
@@ -0,0 +1,37 @@
from pydantic import BaseModel, SecretStr

from ..api import DBCaseConfig, DBConfig, MetricType


class TurboPufferConfig(DBConfig):
api_key: SecretStr
api_base_url: str
namespace: str = "vdbbench_test"

def to_dict(self) -> dict:
return {
"api_key": self.api_key.get_secret_value(),
"api_base_url": self.api_base_url,
"namespace": self.namespace,
}


class TurboPufferIndexConfig(BaseModel, DBCaseConfig):
metric_type: MetricType | None = None
use_multi_ns_for_filter: bool = False
time_wait_warmup: int = 60 * 1 # 1min

def parse_metric(self) -> str:
if self.metric_type == MetricType.COSINE:
return "cosine_distance"
if self.metric_type == MetricType.L2:
return "euclidean_squared"

msg = f"Not Support {self.metric_type}"
raise ValueError(msg)

def index_param(self) -> dict:
return {}

def search_param(self) -> dict:
return {}
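
As a reference for reviewers, a minimal sketch of constructing these configs outside the harness; the API key and base URL are placeholders (not values from this PR), and it assumes the inherited DBConfig fields all have defaults:

from vectordb_bench.backend.clients.api import MetricType
from vectordb_bench.backend.clients.turbopuffer.config import TurboPufferConfig, TurboPufferIndexConfig

db_config = TurboPufferConfig(
    api_key="tpuf_xxx",                          # placeholder secret
    api_base_url="https://api.turbopuffer.com",  # placeholder endpoint
    namespace="vdbbench_test",
)
case_config = TurboPufferIndexConfig(metric_type=MetricType.COSINE)

db_config.to_dict()         # {"api_key": "tpuf_xxx", "api_base_url": "...", "namespace": "vdbbench_test"}
case_config.parse_metric()  # "cosine_distance"
case_config.index_param()   # {}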
122 changes: 122 additions & 0 deletions vectordb_bench/backend/clients/turbopuffer/turbopuffer.py
@@ -0,0 +1,122 @@
"""Wrapper around the Pinecone vector database over VectorDB"""

import logging
import time
from contextlib import contextmanager

import turbopuffer as tpuf

from vectordb_bench.backend.clients.turbopuffer.config import TurboPufferIndexConfig
from vectordb_bench.backend.filter import Filter, FilterOp

from ..api import VectorDB

log = logging.getLogger(__name__)


class TurboPuffer(VectorDB):
supported_filter_types: list[FilterOp] = [
FilterOp.NonFilter,
FilterOp.NumGE,
FilterOp.StrEqual,
]

def __init__(
self,
dim: int,
db_config: dict,
db_case_config: TurboPufferIndexConfig,
drop_old: bool = False,
with_scalar_labels: bool = False,
**kwargs,
):
"""Initialize wrapper around the milvus vector database."""
self.api_key = db_config.get("api_key", "")
self.api_base_url = db_config.get("api_base_url", "")
self.namespace = db_config.get("namespace", "")
self.db_case_config = db_case_config
self.metric = db_case_config.parse_metric()

self._vector_field = "vector"
self._scalar_id_field = "id"
self._scalar_label_field = "label"

self.with_scalar_labels = with_scalar_labels
if drop_old:
log.info(f"Drop old. delete the namespace: {self.namespace}")
tpuf.api_key = self.api_key
tpuf.api_base_url = self.api_base_url
ns = tpuf.Namespace(self.namespace)
try:
ns.delete_all()
except Exception as e:
log.warning(f"Failed to delete all. Error: {e}")

@contextmanager
def init(self):
tpuf.api_key = self.api_key
tpuf.api_base_url = self.api_base_url
self.ns = tpuf.Namespace(self.namespace)
yield

def optimize(self, data_size: int | None = None):
# turbopuffer responds to the request
# once the cache warming operation has been started.
# It does not wait for the operation to complete,
# which can take multiple minutes for large namespaces.
self.ns.hint_cache_warm()
log.info(f"warming up but no api waiting for complete. just sleep {self.db_case_config.time_wait_warmup}s")
time.sleep(self.db_case_config.time_wait_warmup)

def insert_embeddings(
self,
embeddings: list[list[float]],
metadata: list[int],
labels_data: list[str] | None = None,
**kwargs,
) -> tuple[int, Exception]:
try:
if self.with_scalar_labels:
self.ns.write(
upsert_columns={
self._scalar_id_field: metadata,
self._vector_field: embeddings,
self._scalar_label_field: labels_data,
},
distance_metric=self.metric,
)
else:
self.ns.write(
upsert_columns={
self._scalar_id_field: metadata,
self._vector_field: embeddings,
},
distance_metric=self.metric,
)
        except Exception as e:
            log.warning(f"Failed to insert. Error: {e}")
            return 0, e
        return len(embeddings), None

def search_embedding(
self,
query: list[float],
k: int = 100,
timeout: int | None = None,
) -> list[int]:
res = self.ns.query(
rank_by=["vector", "ANN", query],
top_k=k,
filters=self.expr,
)
return [row.id for row in res.rows]

def prepare_filter(self, filters: Filter):
if filters.type == FilterOp.NonFilter:
self.expr = None
elif filters.type == FilterOp.NumGE:
self.expr = [self._scalar_id_field, "Gte", filters.int_value]
elif filters.type == FilterOp.StrEqual:
self.expr = [self._scalar_label_field, "Eq", filters.label_value]
else:
msg = f"Not support Filter for TurboPuffer - {filters}"
raise ValueError(msg)
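
A minimal end-to-end sketch of driving the wrapper with only the methods defined above; credentials, dimensions, and vectors are placeholders, and a real API key and endpoint are needed for the calls to succeed:

from vectordb_bench.backend.clients.api import MetricType
from vectordb_bench.backend.clients.turbopuffer.config import TurboPufferIndexConfig
from vectordb_bench.backend.clients.turbopuffer.turbopuffer import TurboPuffer

client = TurboPuffer(
    dim=4,
    db_config={
        "api_key": "tpuf_xxx",                          # placeholder
        "api_base_url": "https://api.turbopuffer.com",  # placeholder
        "namespace": "vdbbench_test",
    },
    db_case_config=TurboPufferIndexConfig(metric_type=MetricType.L2),
)

with client.init():
    client.expr = None  # no filter; normally set via prepare_filter()
    count, err = client.insert_embeddings(
        embeddings=[[0.1, 0.2, 0.3, 0.4], [0.5, 0.6, 0.7, 0.8]],
        metadata=[1, 2],
    )
    ids = client.search_embedding([0.1, 0.2, 0.3, 0.4], k=1)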
3 changes: 1 addition & 2 deletions vectordb_bench/backend/runner/rate_runner.py
@@ -7,7 +7,6 @@

from vectordb_bench import config
from vectordb_bench.backend.clients import api
from vectordb_bench.backend.clients.doris.doris import Doris
from vectordb_bench.backend.dataset import DataSetIterator
from vectordb_bench.backend.utils import time_it

@@ -54,7 +53,7 @@ def _insert_embeddings(db: api.VectorDB, emb: list[list[float]], metadata: list[
db_copy = deepcopy(db)
with db_copy.init():
_insert_embeddings(db_copy, emb, metadata, retry_idx=0)
elif isinstance(db, Doris):
elif db.name == "Doris":
# DorisVectorClient is not thread-safe. Similar to pgvector, create a per-thread client
# by deep-copying the wrapper and forcing lazy re-init inside the thread.
db_copy = deepcopy(db)
1 change: 1 addition & 0 deletions vectordb_bench/frontend/config/styles.py
@@ -69,6 +69,7 @@ def getPatternShape(i):
DB.S3Vectors: "https://assets.zilliz.com/s3_vectors_daf370b4e5.png",
DB.Hologres: "https://img.alicdn.com/imgextra/i3/O1CN01d9qrry1i6lTNa2BRa_!!6000000004364-2-tps-218-200.png",
DB.Doris: "https://doris.apache.org/images/logo.svg",
DB.TurboPuffer: "https://turbopuffer.com/logo2.png",
}

# RedisCloud color: #0D6EFD