Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions vectordb_bench/backend/clients/oss_opensearch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,17 @@ class OSSOpenSearchTypedDict(TypedDict):
),
]

# CLI flag `--on-disk`: a boolean switch that stays False unless the flag is passed.
on_disk: Annotated[
    bool,
    click.option(
        "--on-disk",
        default=False,
        required=False,
        is_flag=True,
        help=(
            "Enable on-disk vector storage mode only for faiss engine "
            "(The on_disk mode only works with the float data type.)"
        ),
    ),
]


# Parameter schema used to type the keyword arguments of the `OSSOpenSearch` command.
# It combines the fields of CommonTypedDict, OSSOpenSearchTypedDict and HNSWFlavor1
# and declares no fields of its own.
class OSSOpenSearchHNSWTypedDict(CommonTypedDict, OSSOpenSearchTypedDict, HNSWFlavor1): ...

Expand Down Expand Up @@ -150,6 +161,7 @@ def OSSOpenSearch(**parameters: Unpack[OSSOpenSearchHNSWTypedDict]):
M=parameters["m"],
engine=OSSOS_Engine(parameters["engine"]),
quantization_type=OSSOpenSearchQuantization(parameters["quantization_type"]),
on_disk=parameters["on_disk"],
),
**parameters,
)
3 changes: 3 additions & 0 deletions vectordb_bench/backend/clients/oss_opensearch/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class OSSOpenSearchIndexConfig(BaseModel, DBCaseConfig):
force_merge_enabled: bool | None = True
flush_threshold_size: str | None = "5120mb"
index_thread_qty_during_force_merge: int = 8
on_disk: bool = False
cb_threshold: str | None = "50%"
number_of_indexing_clients: int | None = 1
use_routing: bool = False # for label-filter cases
Expand Down Expand Up @@ -107,6 +108,7 @@ def __eq__(self, obj: any):
and self.replication_type == obj.replication_type
and self.knn_derived_source_enabled == obj.knn_derived_source_enabled
and self.memory_optimized_search == obj.memory_optimized_search
and self.on_disk == obj.on_disk
)

def __hash__(self) -> int:
Expand All @@ -123,6 +125,7 @@ def __hash__(self) -> int:
self.replication_type,
self.knn_derived_source_enabled,
self.memory_optimized_search,
self.on_disk,
)
)

Expand Down
32 changes: 31 additions & 1 deletion vectordb_bench/backend/clients/oss_opensearch/oss_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@
},
]

# Rules for vector-field mapping properties that depend on the cluster version.
# Each rule is a dict with three entries:
#   "name"    - key under which the property is emitted,
#   "applies" - predicate (version, case_config) -> bool that gates the rule,
#   "value"   - callable (case_config) -> value stored under "name".
VERSION_SPECIFIC_PROPERTIES_RULES = [
    {
        "name": "mode",
        # Only emitted for the faiss engine on version 2.17 or later.
        "applies": lambda version, case_config: (
            version >= Version("2.17") and case_config.engine == OSSOS_Engine.faiss
        ),
        "value": lambda case_config: "in_memory" if not case_config.on_disk else "on_disk",
    },
]


class OpenSearchError(Exception):
"""Custom exception for OpenSearch operations."""
Expand Down Expand Up @@ -274,6 +285,18 @@ def _get_version_specific_settings(self, cluster_version: Version) -> dict:
value = setting["value"](self.case_config)
version_specific_settings[name] = value
return version_specific_settings

def _get_version_specific_properties(self, cluster_version: Version) -> dict:
    """Build the version-specific mapping properties that apply to this cluster.

    Every rule in ``VERSION_SPECIFIC_PROPERTIES_RULES`` is evaluated against
    ``cluster_version`` and ``self.case_config``.

    Args:
        cluster_version: Version handed to each rule's ``applies`` predicate.

    Returns:
        A dict mapping each applicable rule's ``name`` to the result of calling
        its ``value`` with ``self.case_config``; empty when no rule applies.
    """
    # The iteration variable is `rule` rather than `property` so the builtin
    # of that name is not shadowed inside the method.
    return {
        rule["name"]: rule["value"](self.case_config)
        for rule in VERSION_SPECIFIC_PROPERTIES_RULES
        if rule["applies"](cluster_version, self.case_config)
    }

def _get_bulk_manager(self, client: OpenSearch) -> BulkInsertManager:
"""Get bulk insert manager for the given client."""
Expand All @@ -291,6 +314,8 @@ def _create_index(self, client: OpenSearch) -> None:
log.info(f"All case_config parameters: {self.case_config.__dict__}")

settings_manager = self._get_settings_manager(client)
cluster_version = self._get_cluster_version(client)

cluster_settings = {
"knn.algo_param.index_thread_qty": self.case_config.index_thread_qty,
"knn.memory.circuit_breaker.limit": self.case_config.cb_threshold,
Expand All @@ -311,13 +336,14 @@ def _create_index(self, client: OpenSearch) -> None:
}
settings["index"]["knn.algo_param.ef_search"] = ef_search_value

version_specific_settings = self._get_version_specific_settings(self._get_cluster_version(client))
version_specific_settings = self._get_version_specific_settings(cluster_version)
if version_specific_settings:
log.info(f"Applying version-dependent settings: {version_specific_settings}")
settings["index"].update(version_specific_settings)

# Build properties mapping, excluding _id which is automatically handled by OpenSearch
properties = {}
version_specific_properties = self._get_version_specific_properties(cluster_version)

# Only add id field to properties if it's not the special _id field
if self.id_col_name != "_id":
Expand All @@ -330,6 +356,10 @@ def _create_index(self, client: OpenSearch) -> None:
"method": self.case_config.index_param(),
}

# mode if supported by the version else ignore
if("mode" in version_specific_properties):
properties[self.vector_col_name]["mode"] = version_specific_properties["mode"]

mappings = {
"properties": properties,
}
Expand Down
11 changes: 11 additions & 0 deletions vectordb_bench/frontend/config/dbCaseConfigs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,15 @@ class CaseConfigInput(BaseModel):
},
)

# Boolean "On Disk" input, unchecked by default.
CaseConfigParamInput_ON_DISK_AWSOpensearch = CaseConfigInput(
    label=CaseConfigParamType.on_disk,
    inputType=InputType.Bool,
    displayLabel="On Disk",
    inputHelp=(
        "Enable on-disk vector storage mode "
        "(The on_disk mode only works with the float data type.) "
        "Supported by OpenSearch >=2.17"
    ),
    inputConfig={"value": False},
    # Shown only when the configured engine name equals "faiss" (case-insensitive);
    # a config without an engine name falls back to "" and hides the input.
    isDisplayed=lambda config: (
        config.get(CaseConfigParamType.engine_name, "").lower() == "faiss"
    ),
)

CaseConfigParamInput_NUMBER_OF_INDEXING_CLIENTS_AWSOpensearch = CaseConfigInput(
label=CaseConfigParamType.number_of_indexing_clients,
displayLabel="Number of Indexing Clients",
Expand Down Expand Up @@ -2337,6 +2346,7 @@ class CaseConfigInput(BaseModel):
CaseConfigParamInput_REPLICATION_TYPE_AWSOpensearch,
CaseConfigParamInput_MEMORY_OPTIMIZED_SEARCH_AWSOpensearch,
CaseConfigParamInput_INDEX_THREAD_QTY_DURING_FORCE_MERGE_AWSOpensearch,
CaseConfigParamInput_ON_DISK_AWSOpensearch,
]

AWSOpenSearchPerformanceConfig = [
Expand All @@ -2354,6 +2364,7 @@ class CaseConfigInput(BaseModel):
CaseConfigParamInput_REPLICATION_TYPE_AWSOpensearch,
CaseConfigParamInput_MEMORY_OPTIMIZED_SEARCH_AWSOpensearch,
CaseConfigParamInput_INDEX_THREAD_QTY_DURING_FORCE_MERGE_AWSOpensearch,
CaseConfigParamInput_ON_DISK_AWSOpensearch,
]

# Map DB to config
Expand Down
1 change: 1 addition & 0 deletions vectordb_bench/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class CaseConfigParamType(Enum):
num_sub_vectors = "num_sub_vectors"
sample_rate = "sample_rate"
index_thread_qty_during_force_merge = "index_thread_qty_during_force_merge"
on_disk = "on_disk"
number_of_indexing_clients = "number_of_indexing_clients"
number_of_shards = "number_of_shards"
number_of_replicas = "number_of_replicas"
Expand Down