with image milvusdb/milvus:latest to be ready ...\n"
+ ]
+ }
+ ],
+ "source": [
+ "if not db:\n",
+ " db = MilvusTestHelpers.start_db_container()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "id": "31496ee0-75a2-48ad-954e-9c4ae5abbf5e",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Configure connection parameters for both ingestion and search.\n",
+ "milvus_connection_config = MilvusConnectionConfig(\n",
+ " uri=db.uri, \n",
+ " user=db.user, \n",
+ " password=db.password, \n",
+ " db_name=db.db_id\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "configure_ingestion",
+ "metadata": {},
+ "source": [
+ "### Configure Milvus Ingestion\n",
+ "\n",
+ "We configure the Milvus I/O connector with:\n",
+ "- Connection settings\n",
+ "- Collection name and write batch size\n",
+ "- Embedding model for generating vectors"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "770t460e9yy",
+ "metadata": {},
+ "source": [
+ "### Create Collection Manually\n",
+ "\n",
+ "Before using the Apache Beam I/O connector, let's create the Milvus collection with proper schema and indexes using the pymilvus client directly."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "id": "rg52ap2nwu9",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Created collection 'beam_technical_docs' with schema:\n",
+ " - id: 21 (primary)\n",
+ " - embedding: 101 \n",
+ " - content: 21 \n",
+ " - metadata: 23 \n",
+ "Created COSINE similarity index for 'embedding' field\n",
+ "Collection 'beam_technical_docs' is ready for ingestion!\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Collection name for storing our vectors\n",
+ "collection_name = \"beam_technical_docs\"\n",
+ "\n",
+ "# Create Milvus client\n",
+ "client = MilvusClient(\n",
+ " uri=db.uri,\n",
+ " user=db.user,\n",
+ " password=db.password,\n",
+ " db_name=db.db_id\n",
+ ")\n",
+ "\n",
+ "# Define collection schema with fields that match our RAG use case\n",
+ "fields = [\n",
+ " FieldSchema(name=\"id\", dtype=DataType.VARCHAR, max_length=100, is_primary=True),\n",
+ " FieldSchema(name=\"embedding\", dtype=DataType.FLOAT_VECTOR, dim=384), # sentence-transformers/all-MiniLM-L6-v2 dimension\n",
+ " FieldSchema(name=\"content\", dtype=DataType.VARCHAR, max_length=2000),\n",
+ " # Metadata fields from our chunks\n",
+ " FieldSchema(name=\"metadata\", dtype=DataType.JSON) # Complete metadata as JSON\n",
+ "]\n",
+ "\n",
+ "schema = CollectionSchema(fields=fields, description=\"Technical documentation collection for RAG\")\n",
+ "\n",
+ "# Create collection if it doesn't exist\n",
+ "if client.has_collection(collection_name):\n",
+ " print(f\"Collection '{collection_name}' already exists\")\n",
+ " client.drop_collection(collection_name)\n",
+ " print(f\"Dropped existing collection '{collection_name}'\")\n",
+ "\n",
+ "client.create_collection(\n",
+ " collection_name=collection_name,\n",
+ " schema=schema\n",
+ ")\n",
+ "\n",
+ "print(f\"Created collection '{collection_name}' with schema:\")\n",
+ "for field in fields:\n",
+ " print(f\" - {field.name}: {field.dtype} {'(primary)' if field.is_primary else ''}\")\n",
+ "\n",
+ "# Create index for the vector field for efficient similarity search\n",
+ "index_params = IndexParams()\n",
+ "index_params.add_index(\n",
+ " field_name=\"embedding\",\n",
+ " index_type=\"IVF_FLAT\",\n",
+ " metric_type=\"COSINE\",\n",
+ " params={\"nlist\": 128}\n",
+ ")\n",
+ "\n",
+ "client.create_index(\n",
+ " collection_name=collection_name,\n",
+ " index_params=index_params\n",
+ ")\n",
+ "\n",
+ "print(f\"Created COSINE similarity index for 'embedding' field\")\n",
+ "print(f\"Collection '{collection_name}' is ready for ingestion!\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "id": "configure_milvus_writer",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Configure write settings (collection already created above).\n",
+ "write_config = MilvusWriteConfig(\n",
+ " collection_name=collection_name,\n",
+ " write_config=WriteConfig(write_batch_size=100)\n",
+ ")\n",
+ "\n",
+ "# Configure Milvus writer\n",
+ "milvus_config = MilvusVectorWriterConfig(\n",
+ " connection_params=milvus_connection_config,\n",
+ " write_config=write_config\n",
+ ")\n",
+ "\n",
+ "# Configure embedding model\n",
+ "model_name = 'sentence-transformers/all-MiniLM-L6-v2'\n",
+ "huggingface_embedder = HuggingfaceTextEmbeddings(model_name=model_name)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "run_ingestion_pipeline",
+ "metadata": {},
+ "source": [
+ "### Run Ingestion Pipeline\n",
+ "\n",
+ "This pipeline will:\n",
+ "1. Create chunks from our documents\n",
+ "2. Generate embeddings for each chunk\n",
+ "3. Write everything to Milvus"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "id": "run_ingestion",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Starting Milvus ingestion pipeline...\n"
+ ]
+ },
+ {
+ "data": {
+ "application/javascript": [
+ "\n",
+ " if (typeof window.interactive_beam_jquery == 'undefined') {\n",
+ " var jqueryScript = document.createElement('script');\n",
+ " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n",
+ " jqueryScript.type = 'text/javascript';\n",
+ " jqueryScript.onload = function() {\n",
+ " var datatableScript = document.createElement('script');\n",
+ " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n",
+ " datatableScript.type = 'text/javascript';\n",
+ " datatableScript.onload = function() {\n",
+ " window.interactive_beam_jquery = jQuery.noConflict(true);\n",
+ " window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ " });\n",
+ " }\n",
+ " document.head.appendChild(datatableScript);\n",
+ " };\n",
+ " document.head.appendChild(jqueryScript);\n",
+ " } else {\n",
+ " window.interactive_beam_jquery(document).ready(function($){\n",
+ " \n",
+ " });\n",
+ " }"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n",
+ "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Ingestion pipeline completed!\n"
+ ]
+ }
+ ],
+ "source": [
+ "print(\"Starting Milvus ingestion pipeline...\")\n",
+ "\n",
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Create Chunks\" >> beam.Create(chunks)\n",
+ " | \"Generate Embeddings\" >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n",
+ " .with_transform(huggingface_embedder)\n",
+ " | \"Write to Milvus\" >> milvus_config.create_write_transform()\n",
+ " )\n",
+ "\n",
+ "print(\"Ingestion pipeline completed!\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "verify_ingestion",
+ "metadata": {},
+ "source": [
+ "### Verify Ingestion\n",
+ "\n",
+ "Let's verify that our data was successfully ingested into Milvus."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "id": "verify_data",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Collection 'beam_technical_docs' exists with 6 entities\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Create a Milvus client to verify data\n",
+ "client = MilvusClient(**milvus_connection_config.__dict__)\n",
+ "\n",
+ "# Check if collection exists and get entity count\n",
+ "if client.has_collection(collection_name):\n",
+ " stats = client.get_collection_stats(collection_name)\n",
+ " print(f\"Collection '{collection_name}' exists with {stats['row_count']} entities\")\n",
+ "else:\n",
+ " print(f\"Collection '{collection_name}' not found\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "ea478136-2ca8-4fee-bb1e-6bfcc2e97c93",
+ "metadata": {},
+ "source": [
+ "# Part 2: Vector Search and Enrichment\n",
+ "\n",
+ "Now that we have ingested our documents into Milvus, we'll demonstrate different search capabilities using Apache Beam's enrichment transform."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "e9ad2509-3e5d-42e8-b565-ecccde38b8f4",
+ "metadata": {},
+ "source": [
+ "## Prepare Search Infrastructure\n",
+ "\n",
+ "We'll create helper classes and configure the enrichment transform for different types of searches."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "id": "4911e8cc-10f1-4d21-9251-1b756b61f2c1",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "class FormatAndPrintResults(beam.PTransform):\n",
+ " \"\"\"Transform to format and display search results.\"\"\"\n",
+ " \n",
+ " def expand(self, pcoll):\n",
+ " return pcoll | beam.Map(self.format_and_print)\n",
+ " \n",
+ " @staticmethod\n",
+ " def format_and_print(chunk):\n",
+ " # Create a clean structure to display\n",
+ " formatted_result = {\n",
+ " \"query\": chunk.content.text,\n",
+ " \"query_embedding_dimensions\": FormatAndPrintResults.get_embedding_count(chunk),\n",
+ " \"results\": []\n",
+ " }\n",
+ " \n",
+ " # Extract the enrichment data\n",
+ " enrichment_data = chunk.metadata.get('enrichment_data', defaultdict(list))\n",
+ " \n",
+ " # Format each result with its distance score\n",
+ " for i in range(len(enrichment_data.get('id', []))):\n",
+ " result = {\n",
+ " \"id\": enrichment_data['id'][i],\n",
+ " \"distance\": round(enrichment_data['distance'][i], 4),\n",
+ " \"fields\": enrichment_data['fields'][i] if i < len(enrichment_data.get('fields', [])) else {}\n",
+ " }\n",
+ " formatted_result[\"results\"].append(result)\n",
+ " \n",
+ " # Sort by distance in descending order (highest/best first)\n",
+ " formatted_result[\"results\"] = sorted(formatted_result[\"results\"], key=lambda x: x[\"distance\"], reverse=True)\n",
+ "\n",
+ " # Print the formatted JSON\n",
+ " print_json(data=formatted_result)\n",
+ " \n",
+ " # Return the original chunk for further processing if needed\n",
+ " return chunk\n",
+ "\n",
+ " @staticmethod\n",
+ " def get_embedding_count(chunk):\n",
+ " if chunk.embedding and chunk.embedding.dense_embedding:\n",
+ " return len(chunk.embedding.dense_embedding)\n",
+ " return 0"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "id": "setup_embedding_model",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Set up embedding model for query processing\n",
+ "model = SentenceTransformer(model_name)\n",
+ "\n",
+ "def get_default_device():\n",
+ " return \"cuda:0\" if cuda.is_available() else \"cpu\"\n",
+ "\n",
+ "def encode_embedding(text, device=get_default_device()):\n",
+ " return list(map(float, model.encode(text, device=device)))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "656110c9-1360-49fd-ba17-f55f2257f127",
+ "metadata": {},
+ "source": [
+ "## Vector Similarity Search\n",
+ "\n",
+ "First, let's perform a basic vector similarity search to find documents most similar to our query."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "id": "vector_search_example",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Running vector similarity search...\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"What is a distributed data processing framework?\",\n",
+ " \"query_embedding_dimensions\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"1_1\",\n",
+ " \"distance\": 0.5024,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ],\n",
+ " \"category\": \"framework\",\n",
+ " \"doc_id\": \"1\",\n",
+ " \"chunk_index\": 0\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"3_1\",\n",
+ " \"distance\": 0.4727,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Kafka: Distributed Streaming Platform\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Kafka\",\n",
+ " \"distributed systems\",\n",
+ " \"event streaming\",\n",
+ " \"message queue\",\n",
+ " \"real-time\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Streaming\",\n",
+ " \"Messaging\",\n",
+ " \"Distributed Systems\",\n",
+ " \"Open Source\",\n",
+ " \"Real-time\"\n",
+ " ],\n",
+ " \"category\": \"streaming-platform\",\n",
+ " \"doc_id\": \"3\",\n",
+ " \"chunk_index\": 0\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"1_2\",\n",
+ " \"distance\": 0.4306,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ],\n",
+ " \"category\": \"framework\",\n",
+ " \"doc_id\": \"1\",\n",
+ " \"chunk_index\": 1\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"What is a distributed data processing framework?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.5024\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4727\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Kafka: Distributed Streaming Platform\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Kafka\"\u001b[0m,\n",
+ " \u001b[32m\"distributed systems\"\u001b[0m,\n",
+ " \u001b[32m\"event streaming\"\u001b[0m,\n",
+ " \u001b[32m\"message queue\"\u001b[0m,\n",
+ " \u001b[32m\"real-time\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Messaging\"\u001b[0m,\n",
+ " \u001b[32m\"Distributed Systems\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Real-time\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"streaming-platform\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"3\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4306\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# Example query for vector search\n",
+ "query_text = \"What is a distributed data processing framework?\"\n",
+ "query_embedding = encode_embedding(query_text)\n",
+ "\n",
+ "# Configure vector search parameters\n",
+ "search_parameters = MilvusSearchParameters(\n",
+ " collection_name=collection_name,\n",
+ " search_strategy=VectorSearchParameters(limit=3, anns_field=\"embedding\"),\n",
+ " output_fields=[\"metadata\"]\n",
+ ")\n",
+ "\n",
+ "collection_load_parameters = MilvusCollectionLoadParameters()\n",
+ "\n",
+ "milvus_handler = MilvusSearchEnrichmentHandler(\n",
+ " connection_parameters=milvus_connection_config,\n",
+ " search_parameters=search_parameters,\n",
+ " collection_load_parameters=collection_load_parameters\n",
+ ")\n",
+ "\n",
+ "print(\"Running vector similarity search...\")\n",
+ "\n",
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Create Query\" >> beam.Create([\n",
+ " Chunk(\n",
+ " content=Content(text=query_text),\n",
+ " embedding=Embedding(dense_embedding=query_embedding)\n",
+ " )\n",
+ " ])\n",
+ " | \"Vector Search\" >> Enrichment(milvus_handler)\n",
+ " | \"Format Results\" >> FormatAndPrintResults()\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "metadata_filtering",
+ "metadata": {},
+ "source": [
+ "## Filtered Search (Metadata Filtering)\n",
+ "\n",
+ "Now let's perform a search with metadata filtering to find documents in a specific category."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 23,
+ "id": "filtered_search_example",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Running filtered search (category='framework')...\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"How to build data pipelines?\",\n",
+ " \"query_embedding_dimensions\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"1_2\",\n",
+ " \"distance\": 0.555,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ],\n",
+ " \"category\": \"framework\",\n",
+ " \"doc_id\": \"1\",\n",
+ " \"chunk_index\": 1\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"1_1\",\n",
+ " \"distance\": 0.5223,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ],\n",
+ " \"category\": \"framework\",\n",
+ " \"doc_id\": \"1\",\n",
+ " \"chunk_index\": 0\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"How to build data pipelines?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.555\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.5223\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# Filtered search - only looking for framework-related documents\n",
+ "filtered_query = \"How to build data pipelines?\"\n",
+ "filtered_embedding = encode_embedding(filtered_query)\n",
+ "\n",
+ "# Configure search with category filter\n",
+ "filtered_search_parameters = MilvusSearchParameters(\n",
+ " collection_name=collection_name,\n",
+ " search_strategy=VectorSearchParameters(\n",
+ " filter=\"metadata['category'] == 'framework'\", # Filter for framework documents\n",
+ " limit=2,\n",
+ " anns_field=\"embedding\"\n",
+ " ),\n",
+ " output_fields=[\"metadata\"]\n",
+ ")\n",
+ "\n",
+ "filtered_handler = MilvusSearchEnrichmentHandler(\n",
+ " connection_parameters=milvus_connection_config,\n",
+ " search_parameters=filtered_search_parameters,\n",
+ " collection_load_parameters=collection_load_parameters\n",
+ ")\n",
+ "\n",
+ "print(\"Running filtered search (category='framework')...\")\n",
+ "\n",
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Create Filtered Query\" >> beam.Create([\n",
+ " Chunk(\n",
+ " content=Content(text=filtered_query),\n",
+ " embedding=Embedding(dense_embedding=filtered_embedding)\n",
+ " )\n",
+ " ])\n",
+ " | \"Filtered Search\" >> Enrichment(filtered_handler)\n",
+ " | \"Format Filtered Results\" >> FormatAndPrintResults()\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "batch_search_example",
+ "metadata": {},
+ "source": [
+ "## Batch Search Processing\n",
+ "\n",
+ "Finally, let's demonstrate processing multiple queries in a single pipeline, which is useful for batch enrichment scenarios."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "id": "batch_processing",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Running batch search processing...\n"
+ ]
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"What is stream processing?\",\n",
+ " \"query_embedding_dimensions\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"1_1\",\n",
+ " \"distance\": 0.52,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ],\n",
+ " \"category\": \"framework\",\n",
+ " \"doc_id\": \"1\",\n",
+ " \"chunk_index\": 0\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"1_2\",\n",
+ " \"distance\": 0.5118,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ],\n",
+ " \"category\": \"framework\",\n",
+ " \"doc_id\": \"1\",\n",
+ " \"chunk_index\": 1\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"What is stream processing?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.52\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.5118\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"How to handle real-time data?\",\n",
+ " \"query_embedding_dimensions\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"3_1\",\n",
+ " \"distance\": 0.4392,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Kafka: Distributed Streaming Platform\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Kafka\",\n",
+ " \"distributed systems\",\n",
+ " \"event streaming\",\n",
+ " \"message queue\",\n",
+ " \"real-time\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Streaming\",\n",
+ " \"Messaging\",\n",
+ " \"Distributed Systems\",\n",
+ " \"Open Source\",\n",
+ " \"Real-time\"\n",
+ " ],\n",
+ " \"category\": \"streaming-platform\",\n",
+ " \"doc_id\": \"3\",\n",
+ " \"chunk_index\": 0\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"1_1\",\n",
+ " \"distance\": 0.4366,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ],\n",
+ " \"category\": \"framework\",\n",
+ " \"doc_id\": \"1\",\n",
+ " \"chunk_index\": 0\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"How to handle real-time data?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4392\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Kafka: Distributed Streaming Platform\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Kafka\"\u001b[0m,\n",
+ " \u001b[32m\"distributed systems\"\u001b[0m,\n",
+ " \u001b[32m\"event streaming\"\u001b[0m,\n",
+ " \u001b[32m\"message queue\"\u001b[0m,\n",
+ " \u001b[32m\"real-time\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Messaging\"\u001b[0m,\n",
+ " \u001b[32m\"Distributed Systems\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Real-time\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"streaming-platform\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"3\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4366\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"What are the benefits of cloud services?\",\n",
+ " \"query_embedding_dimensions\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"2_1\",\n",
+ " \"distance\": 0.4162,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Google Cloud Dataflow: Run Apache Beam in the Cloud\",\n",
+ " \"keywords\": [\n",
+ " \"Google Cloud\",\n",
+ " \"Dataflow\",\n",
+ " \"Apache Beam\",\n",
+ " \"serverless\",\n",
+ " \"stream and batch\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Cloud Computing\",\n",
+ " \"Data Pipelines\",\n",
+ " \"Google Cloud\",\n",
+ " \"Serverless\",\n",
+ " \"Enterprise\"\n",
+ " ],\n",
+ " \"category\": \"cloud-service\",\n",
+ " \"doc_id\": \"2\",\n",
+ " \"chunk_index\": 0\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"1_2\",\n",
+ " \"distance\": 0.3556,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ],\n",
+ " \"category\": \"framework\",\n",
+ " \"doc_id\": \"1\",\n",
+ " \"chunk_index\": 1\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"What are the benefits of cloud services?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"2_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4162\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Cloud Dataflow: Run Apache Beam in the Cloud\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Dataflow\"\u001b[0m,\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"serverless\"\u001b[0m,\n",
+ " \u001b[32m\"stream and batch\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Cloud Computing\"\u001b[0m,\n",
+ " \u001b[32m\"Data Pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"Google Cloud\"\u001b[0m,\n",
+ " \u001b[32m\"Serverless\"\u001b[0m,\n",
+ " \u001b[32m\"Enterprise\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"cloud-service\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"2\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.3556\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "{\n",
+ " \"query\": \"How to build distributed systems?\",\n",
+ " \"query_embedding_dimensions\": 384,\n",
+ " \"results\": [\n",
+ " {\n",
+ " \"id\": \"3_1\",\n",
+ " \"distance\": 0.4383,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Kafka: Distributed Streaming Platform\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Kafka\",\n",
+ " \"distributed systems\",\n",
+ " \"event streaming\",\n",
+ " \"message queue\",\n",
+ " \"real-time\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Streaming\",\n",
+ " \"Messaging\",\n",
+ " \"Distributed Systems\",\n",
+ " \"Open Source\",\n",
+ " \"Real-time\"\n",
+ " ],\n",
+ " \"category\": \"streaming-platform\",\n",
+ " \"doc_id\": \"3\",\n",
+ " \"chunk_index\": 0\n",
+ " }\n",
+ " }\n",
+ " },\n",
+ " {\n",
+ " \"id\": \"1_2\",\n",
+ " \"distance\": 0.3502,\n",
+ " \"fields\": {\n",
+ " \"metadata\": {\n",
+ " \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+ " \"keywords\": [\n",
+ " \"Apache Beam\",\n",
+ " \"stream processing\",\n",
+ " \"batch processing\",\n",
+ " \"data pipelines\",\n",
+ " \"SDK\"\n",
+ " ],\n",
+ " \"tags\": [\n",
+ " \"Data Engineering\",\n",
+ " \"Open Source\",\n",
+ " \"Streaming\",\n",
+ " \"Batch\",\n",
+ " \"Big Data\"\n",
+ " ],\n",
+ " \"category\": \"framework\",\n",
+ " \"doc_id\": \"1\",\n",
+ " \"chunk_index\": 1\n",
+ " }\n",
+ " }\n",
+ " }\n",
+ " ]\n",
+ "}\n",
+ "\n"
+ ],
+ "text/plain": [
+ "\u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"How to build distributed systems?\"\u001b[0m,\n",
+ " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n",
+ " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_1\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4383\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Kafka: Distributed Streaming Platform\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Kafka\"\u001b[0m,\n",
+ " \u001b[32m\"distributed systems\"\u001b[0m,\n",
+ " \u001b[32m\"event streaming\"\u001b[0m,\n",
+ " \u001b[32m\"message queue\"\u001b[0m,\n",
+ " \u001b[32m\"real-time\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Messaging\"\u001b[0m,\n",
+ " \u001b[32m\"Distributed Systems\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Real-time\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"streaming-platform\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"3\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m,\n",
+ " \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n",
+ " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.3502\u001b[0m,\n",
+ " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n",
+ " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n",
+ " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Apache Beam\"\u001b[0m,\n",
+ " \u001b[32m\"stream processing\"\u001b[0m,\n",
+ " \u001b[32m\"batch processing\"\u001b[0m,\n",
+ " \u001b[32m\"data pipelines\"\u001b[0m,\n",
+ " \u001b[32m\"SDK\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n",
+ " \u001b[32m\"Data Engineering\"\u001b[0m,\n",
+ " \u001b[32m\"Open Source\"\u001b[0m,\n",
+ " \u001b[32m\"Streaming\"\u001b[0m,\n",
+ " \u001b[32m\"Batch\"\u001b[0m,\n",
+ " \u001b[32m\"Big Data\"\u001b[0m\n",
+ " \u001b[1m]\u001b[0m,\n",
+ " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n",
+ " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n",
+ " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m}\u001b[0m\n",
+ " \u001b[1m]\u001b[0m\n",
+ "\u001b[1m}\u001b[0m\n"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# Multiple queries to process\n",
+ "batch_queries = [\n",
+ " \"What is stream processing?\",\n",
+ " \"How to handle real-time data?\",\n",
+ " \"What are the benefits of cloud services?\",\n",
+ " \"How to build distributed systems?\"\n",
+ "]\n",
+ "\n",
+ "def create_query_chunk(query_text: str) -> Chunk:\n",
+ " \"\"\"Create a chunk with embedded query for search.\"\"\"\n",
+ " embedding = encode_embedding(query_text)\n",
+ " return Chunk(\n",
+ " content=Content(text=query_text),\n",
+ " embedding=Embedding(dense_embedding=embedding),\n",
+ " metadata={\"query_type\": \"batch_search\"}\n",
+ " )\n",
+ "\n",
+ "# Configure search for batch processing\n",
+ "batch_search_parameters = MilvusSearchParameters(\n",
+ " collection_name=collection_name,\n",
+ " search_strategy=VectorSearchParameters(limit=2, anns_field=\"embedding\"),\n",
+ " output_fields=[\"metadata\"]\n",
+ ")\n",
+ "\n",
+ "batch_handler = MilvusSearchEnrichmentHandler(\n",
+ " connection_parameters=milvus_connection_config,\n",
+ " search_parameters=batch_search_parameters,\n",
+ " collection_load_parameters=collection_load_parameters\n",
+ ")\n",
+ "\n",
+ "print(\"Running batch search processing...\")\n",
+ "\n",
+ "with beam.Pipeline() as p:\n",
+ " _ = (\n",
+ " p\n",
+ " | \"Create Batch Queries\" >> beam.Create(batch_queries)\n",
+ " | \"Convert to Chunks\" >> beam.Map(create_query_chunk)\n",
+ " | \"Batch Search\" >> Enrichment(batch_handler)\n",
+ " | \"Format Batch Results\" >> FormatAndPrintResults()\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "performance_considerations",
+ "metadata": {},
+ "source": [
+ "## Performance Considerations\n",
+ "\n",
+ "When using Apache Beam with Milvus in production, consider:\n",
+ "\n",
+ "### Ingestion Performance\n",
+ "- **Batch Size**: Adjust `write_batch_size` in `MilvusWriteConfig` based on your data size and Milvus cluster capacity\n",
+ "- **Parallel Processing**: Use Apache Beam's natural parallelization for large document collections\n",
+ "- **Embedding Efficiency**: Consider using GPU-enabled workers for embedding generation\n",
+ "\n",
+ "### Search Performance\n",
+ "- **Collection Loading**: Ensure collections are loaded into memory for faster searches\n",
+ "- **Index Types**: Choose appropriate index types (IVF_FLAT, HNSW) based on your accuracy/speed requirements\n",
+ "- **Batch Searches**: Process multiple queries together to amortize connection overhead\n",
+ "\n",
+ "### Resource Management\n",
+ "- **Connection Pooling**: Configure connection parameters for production workloads\n",
+ "- **Memory Usage**: Monitor memory usage during embedding generation and vector storage\n",
+ "- **Network Optimization**: Place compute close to your Milvus cluster when possible"
+ ]
+ },
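+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "hnsw_index_sketch",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# A minimal, illustrative sketch (not executed above): swapping the IVF_FLAT\n",
+    "# index for HNSW, which typically trades extra memory for faster, more\n",
+    "# accurate search. The parameter values below are assumptions to tune for\n",
+    "# your workload, not recommended defaults.\n",
+    "hnsw_index_params = IndexParams()\n",
+    "hnsw_index_params.add_index(\n",
+    "    field_name=\"embedding\",\n",
+    "    index_type=\"HNSW\",\n",
+    "    metric_type=\"COSINE\",\n",
+    "    params={\"M\": 16, \"efConstruction\": 200}\n",
+    ")\n",
+    "\n",
+    "# Rebuilding requires dropping the existing index first, for example:\n",
+    "# client.drop_index(collection_name=collection_name, index_name=\"embedding\")\n",
+    "# client.create_index(collection_name=collection_name, index_params=hnsw_index_params)"
+   ]
+  },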
+ {
+ "cell_type": "markdown",
+ "id": "cleanup_section",
+ "metadata": {},
+ "source": [
+ "## Cleanup\n",
+ "\n",
+ "Don't forget to clean up resources when you're done."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "id": "cleanup_resources",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Milvus test container stopped\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Clean up the test database container\n",
+ "if db:\n",
+ " MilvusTestHelpers.stop_db_container(db)\n",
+ " db = None\n",
+ " print(\"Milvus test container stopped\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "next_steps",
+ "metadata": {},
+ "source": [
+ "## Resources\n",
+ "- [Apache Beam Milvus I/O Documentation](https://beam.apache.org/documentation/io/built-in/milvus/)\n",
+ "- [Apache Beam ML Transforms](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.transforms.html)\n",
+ "- [Milvus Documentation](https://milvus.io/docs)\n",
+ "- [Apache Beam RAG Package](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.html)"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.9.22"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/website/www/site/content/en/documentation/io/built-in/milvus.md b/website/www/site/content/en/documentation/io/built-in/milvus.md
new file mode 100644
index 000000000000..bb95cda1a6d8
--- /dev/null
+++ b/website/www/site/content/en/documentation/io/built-in/milvus.md
@@ -0,0 +1,319 @@
+---
+title: "Milvus I/O connector"
+---
+
+
+[Built-in I/O Transforms](/documentation/io/built-in/)
+
+# Milvus I/O connector
+
+The Beam SDKs include built-in transforms that can write data to [Milvus](https://milvus.io/) vector databases. Milvus is a high-performance, cloud-native vector database designed for machine learning and AI applications.
+
+## Before you start
+
+To use MilvusIO, you need to install the required dependencies. The Milvus I/O connector is part of the ML/RAG functionality in Apache Beam.
+
+```shell
+pip install apache-beam[milvus,gcp]
+```
+
+**Additional resources:**
+
+* [MilvusIO source code](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/rag/ingestion/milvus_search.py)
+* [MilvusIO Pydoc](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.ingestion.milvus_search.html)
+* [Milvus Documentation](https://milvus.io/docs)
+
+## Overview
+
+The Milvus I/O connector provides a sink for writing vector embeddings and
+associated metadata to Milvus collections. This connector is specifically
+designed for RAG (Retrieval-Augmented Generation) use cases where you need to
+store document chunks with their vector embeddings for similarity search.
+
+### Key Features
+
+- **Vector Database Integration**: Write embeddings and metadata to Milvus collections
+- **RAG-Optimized**: Built specifically for RAG workflows with document chunks
+- **Batch Processing**: Efficient batched writes to optimize performance
+- **Flexible Schema**: Configurable column mappings for different data schemas
+- **Connection Management**: Proper connection lifecycle management with context managers
+
+## Writing to Milvus
+
+### Basic Usage
+
+```python
+import apache_beam as beam
+from apache_beam.ml.rag.ingestion.milvus_search import (
+    MilvusVectorWriterConfig,
+    MilvusWriteConfig
+)
+from apache_beam.ml.rag.utils import MilvusConnectionConfig
+
+# Configure connection to Milvus.
+connection_config = MilvusConnectionConfig(
+ uri="http://localhost:19530", # Milvus server URI
+ db_name="default" # Database name
+)
+
+# Configure write settings.
+write_config = MilvusWriteConfig(
+ collection_name="document_embeddings",
+ write_batch_size=1000
+)
+
+# Create the writer configuration.
+milvus_config = MilvusVectorWriterConfig(
+ connection_params=connection_config,
+ write_config=write_config
+)
+
+# Use in a pipeline.
+with beam.Pipeline() as pipeline:
+ chunks = (
+ pipeline
+ | "Read Data" >> beam.io.ReadFromText("input.txt")
+ | "Process to Chunks" >> beam.Map(process_to_chunks)
+ )
+
+ # Write to Milvus.
+ chunks | "Write to Milvus" >> milvus_config.create_write_transform()
+```
+
+### Configuration Options
+
+#### Connection Configuration
+
+```python
+from apache_beam.ml.rag.utils import MilvusConnectionConfig
+
+connection_config = MilvusConnectionConfig(
+ uri="http://localhost:19530", # Milvus server URI
+ token="your_token", # Authentication token (optional)
+ db_name="vector_db", # Database name
+ timeout=30.0 # Connection timeout in seconds
+)
+```
+
+#### Write Configuration
+
+```python
+from apache_beam.ml.rag.ingestion.milvus_search import MilvusWriteConfig
+
+write_config = MilvusWriteConfig(
+ collection_name="embeddings", # Target collection name
+ partition_name="", # Partition name (optional)
+ timeout=60.0, # Write operation timeout
+ write_batch_size=1000 # Number of records per batch
+)
+```
+
+### Working with Chunks
+
+The Milvus I/O connector is designed to work with `Chunk` objects that contain
+document content and embeddings:
+
+```python
+from apache_beam.ml.rag.types import Chunk, Content, Embedding
+
+def create_chunk_example():
+    return Chunk(
+        id="doc_1_chunk_1",
+        content=Content(text="This is the document content..."),
+        # Dense embedding vector; Embedding also carries an optional
+        # sparse embedding for hybrid-search scenarios.
+        embedding=Embedding(dense_embedding=[0.1, 0.2, 0.3, 0.4, 0.5]),
+        metadata={"source": "document.pdf", "page": 1}
+    )
+```
+
+### Custom Column Specifications
+
+You can customize how chunk fields are mapped to Milvus collection fields:
+
+```python
+from apache_beam.ml.rag.ingestion.postgres_common import ColumnSpec
+
+# Define custom column mappings.
+custom_column_specs = [
+ ColumnSpec(
+ column_name="doc_id",
+ value_fn=lambda chunk: chunk.id
+ ),
+    ColumnSpec(
+        column_name="vector",
+        value_fn=lambda chunk: chunk.embedding.dense_embedding
+    ),
+    ColumnSpec(
+        column_name="text_content",
+        value_fn=lambda chunk: chunk.content.text
+    ),
+ ColumnSpec(
+ column_name="document_metadata",
+ value_fn=lambda chunk: dict(chunk.metadata)
+ )
+]
+
+# Use custom column specs.
+milvus_config = MilvusVectorWriterConfig(
+ connection_params=connection_config,
+ write_config=write_config,
+ column_specs=custom_column_specs
+)
+```
+
+## Complete Example
+
+Here's a complete example that processes documents and writes them to Milvus:
+
+```python
+import apache_beam as beam
+from apache_beam.ml.rag.ingestion.milvus_search import (
+ MilvusVectorWriterConfig,
+ MilvusWriteConfig
+)
+from apache_beam.ml.rag.utils import MilvusConnectionConfig
+from apache_beam.ml.rag.types import Chunk, Content, Embedding
+import numpy as np
+
+def process_document(document_text):
+ """Process a document into chunks with embeddings."""
+ # This is a simplified example - in practice you would:
+ # 1. Split document into chunks
+ # 2. Generate embeddings using a model
+ # 3. Extract metadata
+
+ chunks = []
+ sentences = document_text.split('.')
+
+ for i, sentence in enumerate(sentences):
+ if sentence.strip():
+ # Generate mock embedding (replace with real embedding model).
+ embedding = np.random.rand(384).tolist() # 384-dimensional vector
+
+            chunk = Chunk(
+                id=f"doc_chunk_{i}",
+                content=Content(text=sentence.strip()),
+                embedding=Embedding(dense_embedding=embedding),
+                metadata={"chunk_index": i, "length": len(sentence)}
+            )
+ chunks.append(chunk)
+
+ return chunks
+
+def run_pipeline():
+ # Configure Milvus connection.
+ connection_config = MilvusConnectionConfig(
+ uri="http://localhost:19530",
+ db_name="rag_database"
+ )
+
+ # Configure write settings.
+ write_config = MilvusWriteConfig(
+ collection_name="document_chunks",
+ write_batch_size=500
+ )
+
+ # Create writer configuration.
+ milvus_config = MilvusVectorWriterConfig(
+ connection_params=connection_config,
+ write_config=write_config
+ )
+
+ # Define pipeline.
+ with beam.Pipeline() as pipeline:
+ documents = (
+ pipeline
+ | "Create Sample Documents" >> beam.Create([
+ "First document content. It has multiple sentences.",
+ "Second document with different content. More sentences here."
+ ])
+ )
+
+ chunks = (
+ documents
+ | "Process Documents" >> beam.FlatMap(process_document)
+ )
+
+ # Write to Milvus.
+ chunks | "Write to Milvus" >> milvus_config.create_write_transform()
+
+if __name__ == "__main__":
+ run_pipeline()
+```
+
+## Performance Considerations
+
+### Batch Size Optimization
+
+The write batch size significantly affects performance. Larger batches reduce
+the number of network round-trips but consume more memory:
+
+```python
+# For high-throughput scenarios.
+write_config = MilvusWriteConfig(
+ collection_name="large_collection",
+ write_batch_size=2000 # Larger batches for better throughput
+)
+
+# For memory-constrained environments.
+write_config = MilvusWriteConfig(
+ collection_name="small_collection",
+ write_batch_size=100 # Smaller batches to reduce memory usage
+)
+```
+
+### Production Configuration
+
+For production deployments, consider using appropriate timeout settings and
+connection parameters:
+
+```python
+connection_config = MilvusConnectionConfig(
+ uri="http://milvus-cluster:19530",
+ timeout=120.0, # Longer timeout for production workloads
+ db_name="production_db",
+ token="your_production_token" # Using authentication in production
+)
+```
+
+## Error Handling
+
+The connector includes built-in error handling and logging. Monitor your
+pipeline logs for any connection or write failures:
+
+```python
+import logging
+
+# Enable debug logging to see detailed operation information.
+logging.basicConfig(level=logging.DEBUG)
+logger = logging.getLogger(__name__)
+
+# In your processing function.
+def safe_process_document(document):
+ try:
+ return process_document(document)
+ except Exception as e:
+ logger.error(f"Failed to process document: {e}")
+ return [] # Return empty list on failure
+```
+
+## Notebook example
+
+A runnable notebook that walks through ingesting documents into Milvus and
+running enrichment-based vector search accompanies this connector (see the
+notebook added alongside this guide).
+
+## Related transforms
+
+- [Milvus Enrichment Handler in Apache Beam](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/rag/enrichment/milvus_search.py)
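+
+The enrichment handler pairs with Beam's `Enrichment` transform to run vector
+searches against collections written by this connector. Below is a minimal
+sketch mirroring the notebook example; the import path for the handler classes
+is assumed from the source link above and may differ across Beam versions.
+
+```python
+from apache_beam.ml.rag.enrichment.milvus_search import (
+    MilvusSearchEnrichmentHandler,
+    MilvusSearchParameters,
+    VectorSearchParameters,
+    MilvusCollectionLoadParameters
+)
+from apache_beam.transforms.enrichment import Enrichment
+
+# Reuses the connection_config defined in the examples above.
+search_parameters = MilvusSearchParameters(
+    collection_name="document_embeddings",
+    search_strategy=VectorSearchParameters(limit=3, anns_field="embedding"),
+    output_fields=["metadata"]
+)
+
+handler = MilvusSearchEnrichmentHandler(
+    connection_parameters=connection_config,
+    search_parameters=search_parameters,
+    collection_load_parameters=MilvusCollectionLoadParameters()
+)
+
+# In a pipeline, enrich query chunks with their nearest neighbors:
+# query_chunks | "Vector Search" >> Enrichment(handler)
+```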
diff --git a/website/www/site/content/en/documentation/io/connectors.md b/website/www/site/content/en/documentation/io/connectors.md
index 9797195518e9..078f83e09739 100644
--- a/website/www/site/content/en/documentation/io/connectors.md
+++ b/website/www/site/content/en/documentation/io/connectors.md
@@ -1049,6 +1049,21 @@ This table provides a consolidated, at-a-glance overview of the available built-
✔ |
✘ |
+
+ | MilvusIO (guide) |
+ ✘ |
+ ✔ |
+ Not available |
+
+ ✔
+ native
+ |
+ Not available |
+ Not available |
+ ✔ |
+ ✘ |
+ ✘ |
+
|
Iceberg (Managed I/O)
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 2386ecb39d9d..8ae9afd97775 100755
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -73,6 +73,7 @@
Hadoop Input/Output Format IO
HCatalog IO
Google BigQuery I/O connector
+ Milvus I/O connector
Snowflake I/O connector
CDAP I/O connector
Spark Receiver I/O connector
|