From c5da020440bb7049888c8ac749a5f2c5eb7b8362 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Fri, 22 Aug 2025 20:41:09 +0000 Subject: [PATCH 1/4] examples: add milvus vector ingestion/search notebook --- .../milvus_vector_ingestion_and_search.ipynb | 2043 +++++++++++++++++ 1 file changed, 2043 insertions(+) create mode 100644 examples/notebooks/beam-ml/milvus_vector_ingestion_and_search.ipynb diff --git a/examples/notebooks/beam-ml/milvus_vector_ingestion_and_search.ipynb b/examples/notebooks/beam-ml/milvus_vector_ingestion_and_search.ipynb new file mode 100644 index 000000000000..70e5291ccd68 --- /dev/null +++ b/examples/notebooks/beam-ml/milvus_vector_ingestion_and_search.ipynb @@ -0,0 +1,2043 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "47053bac", + "metadata": { + "cellView": "form" + }, + "outputs": [], + "source": [ + "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n", + "\n", + "# Licensed to the Apache Software Foundation (ASF) under one\n", + "# or more contributor license agreements. See the NOTICE file\n", + "# distributed with this work for additional information\n", + "# regarding copyright ownership. The ASF licenses this file\n", + "# to you under the Apache License, Version 2.0 (the\n", + "# \"License\"); you may not use this file except in compliance\n", + "# with the License. You may obtain a copy of the License at\n", + "#\n", + "# http://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing,\n", + "# software distributed under the License is distributed on an\n", + "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n", + "# KIND, either express or implied. See the License for the\n", + "# specific language governing permissions and limitations\n", + "# under the License" + ] + }, + { + "cell_type": "markdown", + "id": "57ec3e65-b8e5-4f88-adcf-a56e32f91dc6", + "metadata": {}, + "source": [ + "# Embedding Ingestion and Vector Search with Apache Beam and Milvus\n", + "\n", + "\n", + " \n", + " \n", + "
\n", + " Run in Google Colab\n", + " \n", + " View source on GitHub\n", + "
" + ] + }, + { + "cell_type": "markdown", + "id": "0611da21-d031-4b16-8301-9b76bda731e7", + "metadata": {}, + "source": [ + "This notebook demonstrates a complete RAG (Retrieval-Augmented Generation) workflow using Apache Beam and [Milvus](https://milvus.io/). \n", + "\n", + "The example shows both ingestion and search phases:\n", + "\n", + "**Part 1: Vector Ingestion** - Using the Apache Beam [Milvus I/O connector](https://beam.apache.org/documentation/io/built-in/milvus/) to:\n", + "- Process documents into chunks\n", + "- Generate embeddings\n", + "- Store vectors and metadata in Milvus\n", + "\n", + "**Part 2: Vector Search** - Using the Apache Beam [Milvus enrichment transform](https://beam.apache.org/documentation/transforms/python/elementwise/enrichment-milvus) to:\n", + "- Perform vector similarity search\n", + "- Execute hybrid search (vector + keyword)\n", + "- Apply metadata filters\n", + "\n", + "## Use Case: Technical Documentation Search\n", + "\n", + "This example implements a technical documentation search system that can:\n", + "- Ingest technical articles and documentation\n", + "- Perform semantic search to find relevant content\n", + "- Use hybrid search for improved accuracy\n", + "- Filter results by metadata (tags, categories, etc.)\n", + "\n", + "## Before you begin\n", + "Set up your environment and download dependencies.\n", + "\n", + "### Install Apache Beam\n", + "To use the Milvus I/O connector and enrichment transform, install Apache Beam version 2.67.0 or later." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "e550cd55-e91e-4d43-b1bd-b0e89bb8cbd9", + "metadata": {}, + "outputs": [], + "source": [ + "# Disable tokenizers parallelism to prevent deadlocks when forking processes\n", + "# This avoids the \"huggingface/tokenizers: The current process just got forked\" warning.\n", + "import os\n", + "os.environ[\"TOKENIZERS_PARALLELISM\"] = \"false\"" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "31747c45-107a-49be-8885-5a6cc9dc1236", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m23.0.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m25.2\u001b[0m\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n", + "\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m23.0.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m25.2\u001b[0m\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n", + "\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip is available: \u001b[0m\u001b[31;49m23.0.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m25.2\u001b[0m\n", + "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n" + ] + } + ], + "source": [ + "# Install required packages\n", + "!pip install rich sentence_transformers llama_index --quiet\n", + "!pip install apache_beam[milvus,gcp,test,interactive]>=2.67.0 --quiet\n", + "!pip install pymilvus --quiet" + ] + 
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "id": "666e0c2b-0341-4b0e-8d73-561abc39bb10",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Standard library imports\n",
+    "from collections import defaultdict\n",
+    "from math import ceil\n",
+    "from typing import List, Dict, Any\n",
+    "import tempfile\n",
+    "\n",
+    "# Third-party imports\n",
+    "import apache_beam as beam\n",
+    "from apache_beam.ml.rag.types import Chunk, Content, Embedding\n",
+    "from apache_beam.transforms.enrichment import Enrichment\n",
+    "from apache_beam.ml.transforms.base import MLTransform\n",
+    "import numpy as np\n",
+    "import pandas as pd\n",
+    "from pymilvus import DataType, CollectionSchema, FieldSchema, Function, FunctionType, MilvusClient, RRFRanker\n",
+    "from pymilvus.milvus_client import IndexParams\n",
+    "from rich import print_json\n",
+    "from sentence_transformers import SentenceTransformer\n",
+    "from torch import cuda\n",
+    "from llama_index.core.text_splitter import SentenceSplitter\n",
+    "\n",
+    "# Apache Beam ML/RAG imports\n",
+    "from apache_beam.ml.rag.ingestion.milvus_search import MilvusVectorWriterConfig, MilvusWriteConfig\n",
+    "from apache_beam.ml.rag.utils import MilvusConnectionConfig\n",
+    "from apache_beam.ml.rag.embeddings.huggingface import HuggingfaceTextEmbeddings\n",
+    "from apache_beam.ml.rag.enrichment.milvus_search import (\n",
+    "    HybridSearchParameters, \n",
+    "    KeywordSearchMetrics, \n",
+    "    KeywordSearchParameters,\n",
+    "    MilvusCollectionLoadParameters, \n",
+    "    MilvusSearchEnrichmentHandler,\n",
+    "    MilvusSearchParameters, \n",
+    "    SearchStrategy, \n",
+    "    VectorSearchMetrics, \n",
+    "    VectorSearchParameters\n",
+    ")\n",
+    "from apache_beam.ml.rag.ingestion.jdbc_common import WriteConfig\n",
+    "from apache_beam.ml.rag.test_utils import MilvusTestHelpers"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "338808ff-3f80-48e5-9c76-b8d19f8769b7",
+   "metadata": {},
+   "source": [
+    "# Part 1: Vector Ingestion\n",
+    "\n",
+    "In this section, we'll demonstrate how to use Apache Beam's Milvus I/O connector to ingest documents, generate embeddings, and store them in Milvus.\n",
+    "\n",
+    "## Collect Sample Data"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d83ad549-5ee1-4a4c-ae5a-e638c3d0279f",
+   "metadata": {},
+   "source": [
+    "The following content represents technical documentation that would typically be processed in a RAG system."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "id": "38781cf5-e18f-40f5-827e-2d441ae7d2fa",
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Created corpus with 3 documents\n"
+     ]
+    }
+   ],
+   "source": [
+    "corpus = [\n",
+    "    {\n",
+    "        \"id\": \"1\",\n",
+    "        \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+    "        \"keywords\": [\"Apache Beam\", \"stream processing\", \"batch processing\", \"data pipelines\", \"SDK\"],\n",
+    "        \"tags\": [\"Data Engineering\", \"Open Source\", \"Streaming\", \"Batch\", \"Big Data\"],\n",
+    "        \"category\": \"framework\",\n",
+    "        \"content\": (\n",
+    "            \"Apache Beam is an open-source framework that provides a consistent programming model for both batch and streaming data processing. \"\n",
+    "            \"Developed originally by Google, it allows developers to write pipelines that can run on multiple engines, such as Apache Flink, Spark, and Google Cloud Dataflow. 
\"\n", + " \"Beam uses abstractions like PCollections (data containers) and PTransforms (operations) to define the flow of data. \"\n", + " \"The framework promotes portability through its runner architecture, letting the same pipeline execute on different backends. \"\n", + " \"Support for multiple SDKs, including Java and Python, makes it accessible for a broad audience. \"\n", + " \"Key features include support for event time, windowing, triggers, and stateful processing, which are essential for handling real-time data effectively. \"\n", + " \"Beam is ideal for building ETL jobs, real-time analytics, and machine learning data pipelines. \"\n", + " \"It helps teams focus on logic rather than infrastructure, offering flexibility and scalability in handling unbounded and bounded data sources. \"\n", + " \"Apache Beam also supports a wide range of connectors for both input and output, including Kafka, BigQuery, and JDBC-based systems. \"\n", + " \"This makes it easy to integrate Beam into existing data ecosystems. Developers can build reusable transforms and modularize pipeline logic, improving maintainability and testing. \"\n", + " \"The concept of runners enables developers to write once and run anywhere, which is particularly appealing for organizations that want to avoid vendor lock-in. \"\n", + " \"The Beam model is based on a unified programming model that decouples pipeline logic from execution. \"\n", + " \"This makes it easier to reason about time and state in both batch and streaming pipelines. \"\n", + " \"Advanced features like late data handling, watermarks, and session windowing allow for more accurate and meaningful processing of real-world data. \"\n", + " \"Beam also integrates with orchestration tools and monitoring systems, allowing for production-grade deployments. \"\n", + " \"Community support and contributions have grown significantly, making Beam a stable and evolving ecosystem. \"\n", + " \"Many cloud providers offer native support for Beam pipelines, and it's increasingly a core component in modern data platform architectures.\"\n", + " )\n", + " },\n", + " {\n", + " \"id\": \"2\",\n", + " \"title\": \"Google Cloud Dataflow: Run Apache Beam in the Cloud\",\n", + " \"keywords\": [\"Google Cloud\", \"Dataflow\", \"Apache Beam\", \"serverless\", \"stream and batch\"],\n", + " \"tags\": [\"Cloud Computing\", \"Data Pipelines\", \"Google Cloud\", \"Serverless\", \"Enterprise\"],\n", + " \"category\": \"cloud-service\",\n", + " \"content\": (\n", + " \"Google Cloud Dataflow is a fully managed service that runs Apache Beam pipelines in the cloud. \"\n", + " \"It abstracts away infrastructure management and handles dynamic scaling, load balancing, and fault tolerance. \"\n", + " \"Developers can focus on writing data logic using the Beam SDK and deploy it easily to Google Cloud. \"\n", + " \"Dataflow supports both batch and stream processing and integrates seamlessly with other Google services like BigQuery, Pub/Sub, and Cloud Storage. \"\n", + " \"Its autoscaling capabilities allow it to adapt to changing data volumes, optimizing for cost and performance. \"\n", + " \"Features like monitoring dashboards, job templates, and built-in logging make it suitable for both development and production use. \"\n", + " \"With support for event time processing, stateful functions, and windowing, Dataflow is well-suited for real-time analytics and data transformation tasks. \"\n", + " \"It's a key component for architects building scalable, cloud-native data platforms. 
\"\n", + " \"Dataflow also offers templates for common ETL tasks, helping teams get started quickly with minimal setup. \"\n", + " \"Its integration with Cloud Functions and Cloud Composer enables event-driven and orchestrated workflows. \"\n", + " \"Security and compliance are built-in with IAM roles, encryption at rest and in transit, and audit logging, making it suitable for enterprise environments. \"\n", + " \"For developers, Dataflow provides local testing capabilities and a unified logging system through Cloud Logging. \"\n", + " \"It also supports SQL-based pipeline definitions using BigQuery, which lowers the barrier to entry for analysts and data engineers. \"\n", + " \"Dataflow's streaming engine significantly improves performance and reduces costs by decoupling compute and state management. \"\n", + " \"In summary, Google Cloud Dataflow not only simplifies the deployment of Apache Beam pipelines but also enhances them with cloud-native features. \"\n", + " \"Its managed runtime, high availability, and integration with the broader Google Cloud ecosystem make it a powerful tool for modern data processing.\"\n", + " )\n", + " },\n", + " {\n", + " \"id\": \"3\",\n", + " \"title\": \"Apache Kafka: Distributed Streaming Platform\",\n", + " \"keywords\": [\"Apache Kafka\", \"distributed systems\", \"event streaming\", \"message queue\", \"real-time\"],\n", + " \"tags\": [\"Streaming\", \"Messaging\", \"Distributed Systems\", \"Open Source\", \"Real-time\"],\n", + " \"category\": \"streaming-platform\",\n", + " \"content\": (\n", + " \"Apache Kafka is a distributed streaming platform that enables you to build real-time data pipelines and streaming applications. \"\n", + " \"Originally developed by LinkedIn and later open-sourced as an Apache project, Kafka handles high-throughput, fault-tolerant streaming of data between systems. \"\n", + " \"Kafka is built around the concept of distributed commit logs and provides a unified platform for handling all real-time data feeds. \"\n", + " \"The platform excels at three key capabilities: publishing and subscribing to streams of records, storing streams of records durably and reliably, and processing streams of records in real-time. \"\n", + " \"Kafka's distributed architecture allows it to scale horizontally across multiple servers, providing both high availability and fault tolerance. \"\n", + " \"Key components include producers (which send data to Kafka), consumers (which read data from Kafka), brokers (Kafka servers), and Kafka Connect for integration with external systems. \"\n", + " \"The platform supports various messaging patterns including pub-sub, point-to-point, and request-reply, making it versatile for different use cases. \"\n", + " \"Kafka is widely used for building data lakes, stream processing applications, event sourcing architectures, and real-time analytics systems. \"\n", + " \"It integrates seamlessly with popular stream processing frameworks like Apache Beam, Apache Flink, and Apache Storm. \"\n", + " \"Common use cases include activity tracking, metrics collection, log aggregation, stream processing, event sourcing, and commit log services. \"\n", + " \"Kafka's ecosystem includes Kafka Streams for building streaming applications directly, Schema Registry for managing data schemas, and various connectors for integration. 
\"\n", + " \"The platform is designed to handle millions of messages per second with low latency, making it suitable for mission-critical applications.\"\n", + " )\n", + " }\n", + "]\n", + "\n", + "print(f\"Created corpus with {len(corpus)} documents\")" + ] + }, + { + "cell_type": "markdown", + "id": "758c2af7-12c7-477b-9257-3c88712960e7", + "metadata": {}, + "source": [ + "## Exploratory Data Analysis (EDA)" + ] + }, + { + "cell_type": "markdown", + "id": "5e751905-7217-4571-bc07-991ef850a6b2", + "metadata": {}, + "source": [ + "### Average Words/Tokens per Document" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "489e93b6-de41-4ec3-be33-a15c3cba12e8", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
# Words
count3.0
mean277.0
std35.0
min242.0
25%259.5
50%277.0
75%294.5
max312.0
\n", + "
" + ], + "text/plain": [ + " # Words\n", + "count 3.0\n", + "mean 277.0\n", + "std 35.0\n", + "min 242.0\n", + "25% 259.5\n", + "50% 277.0\n", + "75% 294.5\n", + "max 312.0" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "contents = [c['content'] for c in corpus]\n", + "content_lengths = [len(content.split(\" \")) for content in contents]\n", + "df = pd.DataFrame(content_lengths, columns=['# Words'])\n", + "df.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "eb32aad0-febd-45af-b4bd-e2176b07e2dc", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "The mean word count for each document is about 277 words, which corresponds to a rough token count of 361 tokens.\n" + ] + } + ], + "source": [ + "mean_word_count = ceil(np.mean(content_lengths))\n", + "token_to_word_ratio = 1.3\n", + "approx_token_count = ceil(mean_word_count * token_to_word_ratio)\n", + "print(f'The mean word count for each document is about {mean_word_count} words, which corresponds to a rough token count of {approx_token_count} tokens.')" + ] + }, + { + "cell_type": "markdown", + "id": "42c1c159-875d-411b-a009-4361301b39f6", + "metadata": {}, + "source": [ + "## Document Processing and Chunking" + ] + }, + { + "cell_type": "markdown", + "id": "d545355e-41da-4c53-ba9a-4d33b1fe376c", + "metadata": {}, + "source": [ + "### Text Splitting Strategy\n", + "\n", + "We'll use sentence splitting as the chunking strategy for optimal semantic coherence. The chunk size is constrained by our embedding model's token limit." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "e7e45d70-0c23-409d-b435-b9479245c1ff", + "metadata": {}, + "outputs": [], + "source": [ + "# The `chunk_size` parameter is constrained by the embedding model we're using.\n", + "# Since we're using `sentence-transformers/all-MiniLM-L6-v2`, which has a maximum token limit of ~384 tokens,\n", + "# we need to ensure chunk sizes stay well within that limit.\n", + "chunk_size = 256\n", + "llama_txt_splitter = SentenceSplitter(chunk_size=chunk_size, chunk_overlap=20)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "5a013b08-d7e7-4367-ad49-43ad1320158f", + "metadata": {}, + "outputs": [], + "source": [ + "def split_contents(corpus: list[dict], text_splitter: SentenceSplitter, content_field: str='content') -> list[list[str]]:\n", + " result = []\n", + " for doc in corpus:\n", + " split = text_splitter.split_text(doc[content_field])\n", + " result.append(split)\n", + " return result" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "2d5ea747-40b3-474e-ac36-ccb81256a36c", + "metadata": {}, + "outputs": [], + "source": [ + "content_splits = split_contents(corpus, llama_txt_splitter, \"content\")" + ] + }, + { + "cell_type": "markdown", + "id": "c860e558-2da3-45a6-9e54-acb8b4ffab22", + "metadata": {}, + "source": [ + "## Create Chunks for Ingestion\n", + "\n", + "We'll convert our processed documents into `Chunk` objects that can be used with Apache Beam's ML transforms." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "create_chunks_function", + "metadata": {}, + "outputs": [], + "source": [ + "def create_chunks_from_corpus(corpus: list[dict], content_splits: list[list[str]]) -> list[Chunk]:\n", + " \"\"\"Convert corpus and content splits into Chunk objects.\"\"\"\n", + " chunks = []\n", + " \n", + " for doc_idx, splits in enumerate(content_splits):\n", + " doc = corpus[doc_idx]\n", + " \n", + " for chunk_idx, chunk_text in enumerate(splits):\n", + " chunk_id = f\"{doc['id']}_{chunk_idx + 1}\"\n", + " \n", + " # Create chunk with metadata from original document\n", + " chunk = Chunk(\n", + " id=chunk_id,\n", + " content=Content(text=chunk_text),\n", + " metadata={\n", + " \"title\": doc[\"title\"],\n", + " \"keywords\": doc[\"keywords\"],\n", + " \"tags\": doc[\"tags\"],\n", + " \"category\": doc[\"category\"],\n", + " \"doc_id\": doc[\"id\"],\n", + " \"chunk_index\": chunk_idx\n", + " }\n", + " )\n", + " chunks.append(chunk)\n", + " \n", + " return chunks" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "create_chunks_call", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Created 6 chunks from 3 documents\n" + ] + } + ], + "source": [ + "chunks = create_chunks_from_corpus(corpus, content_splits)\n", + "print(f\"Created {len(chunks)} chunks from {len(corpus)} documents\")" + ] + }, + { + "cell_type": "markdown", + "id": "765115e1-4327-44f6-9dff-5d79121eeb02", + "metadata": {}, + "source": [ + "## Milvus Vector Ingestion Pipeline\n", + "\n", + "Now we'll create an Apache Beam pipeline that:\n", + "1. Takes our `Chunk` objects\n", + "2. Generates embeddings using HuggingFace\n", + "3. Writes the vectors and metadata to Milvus" + ] + }, + { + "cell_type": "markdown", + "id": "3889aaa4-3c0c-4d71-bad3-b196b5eac8dc", + "metadata": {}, + "source": [ + "### Setup Milvus Database\n", + "\n", + "We'll use a test container for this demonstration. In production, you would connect to your Milvus cluster." 
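+    "\n",
+    "As a rough sketch of what that production configuration might look like (the endpoint, credentials, and database name below are placeholders, not values used elsewhere in this notebook):\n",
+    "\n",
+    "```python\n",
+    "# Hypothetical production settings -- replace with your own cluster's endpoint and credentials.\n",
+    "production_connection_config = MilvusConnectionConfig(\n",
+    "    uri=\"http://my-milvus-host:19530\",  # placeholder Milvus endpoint\n",
+    "    user=\"my-user\",                      # placeholder username\n",
+    "    password=\"my-password\",              # placeholder password\n",
+    "    db_name=\"default\"                    # Milvus's default database\n",
+    ")\n",
+    "```"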
+ ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "5ae9bc82-9ad7-46dd-b254-19cbdcdd0e07", + "metadata": {}, + "outputs": [], + "source": [ + "db = None" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "aff7b261-3330-4fa9-9a54-3fd87b42521f", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Pulling image testcontainers/ryuk:0.8.1\n", + "Container started: be33f4bf998f\n", + "Waiting for container with image testcontainers/ryuk:0.8.1 to be ready ...\n", + "Pulling image milvusdb/milvus:latest\n", + "Container started: b72a3b645acc\n", + "Waiting for container with image milvusdb/milvus:latest to be ready ...\n", + "Waiting for container with image milvusdb/milvus:latest to be ready ...\n", + "Waiting for container with image milvusdb/milvus:latest to be ready ...\n", + "Waiting for container with image milvusdb/milvus:latest to be ready ...\n", + "Waiting for container with image milvusdb/milvus:latest to be ready ...\n", + "Waiting for container with image milvusdb/milvus:latest to be ready ...\n", + "Waiting for container with image milvusdb/milvus:latest to be ready ...\n", + "Waiting for container with image milvusdb/milvus:latest to be ready ...\n", + "Waiting for container with image milvusdb/milvus:latest to be ready ...\n" + ] + } + ], + "source": [ + "if not db:\n", + " db = MilvusTestHelpers.start_db_container()" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "31496ee0-75a2-48ad-954e-9c4ae5abbf5e", + "metadata": {}, + "outputs": [], + "source": [ + "# Configure connection parameters for both ingestion and search.\n", + "milvus_connection_config = MilvusConnectionConfig(\n", + " uri=db.uri, \n", + " user=db.user, \n", + " password=db.password, \n", + " db_name=db.db_id\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "configure_ingestion", + "metadata": {}, + "source": [ + "### Configure Milvus Ingestion\n", + "\n", + "We configure the Milvus I/O connector with:\n", + "- Connection settings\n", + "- Collection name and write batch size\n", + "- Embedding model for generating vectors" + ] + }, + { + "cell_type": "markdown", + "id": "770t460e9yy", + "metadata": {}, + "source": [ + "### Create Collection Manually\n", + "\n", + "Before using the Apache Beam I/O connector, let's create the Milvus collection with proper schema and indexes using the pymilvus client directly." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "rg52ap2nwu9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Created collection 'beam_technical_docs' with schema:\n", + " - id: 21 (primary)\n", + " - embedding: 101 \n", + " - content: 21 \n", + " - metadata: 23 \n", + "Created COSINE similarity index for 'embedding' field\n", + "Collection 'beam_technical_docs' is ready for ingestion!\n" + ] + } + ], + "source": [ + "# Collection name for storing our vectors\n", + "collection_name = \"beam_technical_docs\"\n", + "\n", + "# Create Milvus client\n", + "client = MilvusClient(\n", + " uri=db.uri,\n", + " user=db.user,\n", + " password=db.password,\n", + " db_name=db.db_id\n", + ")\n", + "\n", + "# Define collection schema with fields that match our RAG use case\n", + "fields = [\n", + " FieldSchema(name=\"id\", dtype=DataType.VARCHAR, max_length=100, is_primary=True),\n", + " FieldSchema(name=\"embedding\", dtype=DataType.FLOAT_VECTOR, dim=384), # sentence-transformers/all-MiniLM-L6-v2 dimension\n", + " FieldSchema(name=\"content\", dtype=DataType.VARCHAR, max_length=2000),\n", + " # Metadata fields from our chunks\n", + " FieldSchema(name=\"metadata\", dtype=DataType.JSON) # Complete metadata as JSON\n", + "]\n", + "\n", + "schema = CollectionSchema(fields=fields, description=\"Technical documentation collection for RAG\")\n", + "\n", + "# Create collection if it doesn't exist\n", + "if client.has_collection(collection_name):\n", + " print(f\"Collection '{collection_name}' already exists\")\n", + " client.drop_collection(collection_name)\n", + " print(f\"Dropped existing collection '{collection_name}'\")\n", + "\n", + "client.create_collection(\n", + " collection_name=collection_name,\n", + " schema=schema\n", + ")\n", + "\n", + "print(f\"Created collection '{collection_name}' with schema:\")\n", + "for field in fields:\n", + " print(f\" - {field.name}: {field.dtype} {'(primary)' if field.is_primary else ''}\")\n", + "\n", + "# Create index for the vector field for efficient similarity search\n", + "index_params = IndexParams()\n", + "index_params.add_index(\n", + " field_name=\"embedding\",\n", + " index_type=\"IVF_FLAT\",\n", + " metric_type=\"COSINE\",\n", + " params={\"nlist\": 128}\n", + ")\n", + "\n", + "client.create_index(\n", + " collection_name=collection_name,\n", + " index_params=index_params\n", + ")\n", + "\n", + "print(f\"Created COSINE similarity index for 'embedding' field\")\n", + "print(f\"Collection '{collection_name}' is ready for ingestion!\")" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "configure_milvus_writer", + "metadata": {}, + "outputs": [], + "source": [ + "# Configure write settings (collection already created above).\n", + "write_config = MilvusWriteConfig(\n", + " collection_name=collection_name,\n", + " write_config=WriteConfig(write_batch_size=100)\n", + ")\n", + "\n", + "# Configure Milvus writer\n", + "milvus_config = MilvusVectorWriterConfig(\n", + " connection_params=milvus_connection_config,\n", + " write_config=write_config\n", + ")\n", + "\n", + "# Configure embedding model\n", + "model_name = 'sentence-transformers/all-MiniLM-L6-v2'\n", + "huggingface_embedder = HuggingfaceTextEmbeddings(model_name=model_name)" + ] + }, + { + "cell_type": "markdown", + "id": "run_ingestion_pipeline", + "metadata": {}, + "source": [ + "### Run Ingestion Pipeline\n", + "\n", + "This pipeline will:\n", + "1. Create chunks from our documents\n", + "2. 
Generate embeddings for each chunk\n", + "3. Write everything to Milvus" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "run_ingestion", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting Milvus ingestion pipeline...\n" + ] + }, + { + "data": { + "application/javascript": [ + "\n", + " if (typeof window.interactive_beam_jquery == 'undefined') {\n", + " var jqueryScript = document.createElement('script');\n", + " jqueryScript.src = 'https://code.jquery.com/jquery-3.4.1.slim.min.js';\n", + " jqueryScript.type = 'text/javascript';\n", + " jqueryScript.onload = function() {\n", + " var datatableScript = document.createElement('script');\n", + " datatableScript.src = 'https://cdn.datatables.net/1.10.20/js/jquery.dataTables.min.js';\n", + " datatableScript.type = 'text/javascript';\n", + " datatableScript.onload = function() {\n", + " window.interactive_beam_jquery = jQuery.noConflict(true);\n", + " window.interactive_beam_jquery(document).ready(function($){\n", + " \n", + " });\n", + " }\n", + " document.head.appendChild(datatableScript);\n", + " };\n", + " document.head.appendChild(jqueryScript);\n", + " } else {\n", + " window.interactive_beam_jquery(document).ready(function($){\n", + " \n", + " });\n", + " }" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n", + "WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Union[Tuple[apache_beam.pvalue.PCollection[~MLTransformOutputT], apache_beam.pvalue.PCollection[apache_beam.pvalue.Row]], apache_beam.pvalue.PCollection[~MLTransformOutputT]] instead.\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Ingestion pipeline completed!\n" + ] + } + ], + "source": [ + "print(\"Starting Milvus ingestion pipeline...\")\n", + "\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | \"Create Chunks\" >> beam.Create(chunks)\n", + " | \"Generate Embeddings\" >> MLTransform(write_artifact_location=tempfile.mkdtemp())\n", + " .with_transform(huggingface_embedder)\n", + " | \"Write to Milvus\" >> milvus_config.create_write_transform()\n", + " )\n", + "\n", + "print(\"Ingestion pipeline completed!\")" + ] + }, + { + "cell_type": "markdown", + "id": "verify_ingestion", + "metadata": {}, + "source": [ + "### Verify Ingestion\n", + "\n", + "Let's verify that our data was successfully ingested into Milvus." 
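+    "\n",
+    "Beyond the entity count below, you can also spot-check a single stored chunk directly with the `pymilvus` client (a sketch; it reuses the `client` and `collection_name` from the collection-creation step, and the `1_1` chunk id produced by our chunking convention):\n",
+    "\n",
+    "```python\n",
+    "# Load the collection, then fetch one chunk by id to confirm content and metadata were stored.\n",
+    "client.load_collection(collection_name)\n",
+    "sample = client.get(\n",
+    "    collection_name=collection_name,\n",
+    "    ids=[\"1_1\"],  # chunk ids follow the <doc_id>_<chunk_number> pattern from create_chunks_from_corpus\n",
+    "    output_fields=[\"content\", \"metadata\"]\n",
+    ")\n",
+    "print(sample)\n",
+    "```"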
+ ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "verify_data", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Collection 'beam_technical_docs' exists with 6 entities\n" + ] + } + ], + "source": [ + "# Create a Milvus client to verify data\n", + "client = MilvusClient(**milvus_connection_config.__dict__)\n", + "\n", + "# Check if collection exists and get entity count\n", + "if client.has_collection(collection_name):\n", + " stats = client.get_collection_stats(collection_name)\n", + " print(f\"Collection '{collection_name}' exists with {stats['row_count']} entities\")\n", + "else:\n", + " print(f\"Collection '{collection_name}' not found\")" + ] + }, + { + "cell_type": "markdown", + "id": "ea478136-2ca8-4fee-bb1e-6bfcc2e97c93", + "metadata": {}, + "source": [ + "# Part 2: Vector Search and Enrichment\n", + "\n", + "Now that we have ingested our documents into Milvus, we'll demonstrate different search capabilities using Apache Beam's enrichment transform." + ] + }, + { + "cell_type": "markdown", + "id": "e9ad2509-3e5d-42e8-b565-ecccde38b8f4", + "metadata": {}, + "source": [ + "## Prepare Search Infrastructure\n", + "\n", + "We'll create helper classes and configure the enrichment transform for different types of searches." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "4911e8cc-10f1-4d21-9251-1b756b61f2c1", + "metadata": {}, + "outputs": [], + "source": [ + "class FormatAndPrintResults(beam.PTransform):\n", + " \"\"\"Transform to format and display search results.\"\"\"\n", + " \n", + " def expand(self, pcoll):\n", + " return pcoll | beam.Map(self.format_and_print)\n", + " \n", + " @staticmethod\n", + " def format_and_print(chunk):\n", + " # Create a clean structure to display\n", + " formatted_result = {\n", + " \"query\": chunk.content.text,\n", + " \"query_embedding_dimensions\": FormatAndPrintResults.get_embedding_count(chunk),\n", + " \"results\": []\n", + " }\n", + " \n", + " # Extract the enrichment data\n", + " enrichment_data = chunk.metadata.get('enrichment_data', defaultdict(list))\n", + " \n", + " # Format each result with its distance score\n", + " for i in range(len(enrichment_data.get('id', []))):\n", + " result = {\n", + " \"id\": enrichment_data['id'][i],\n", + " \"distance\": round(enrichment_data['distance'][i], 4),\n", + " \"fields\": enrichment_data['fields'][i] if i < len(enrichment_data.get('fields', [])) else {}\n", + " }\n", + " formatted_result[\"results\"].append(result)\n", + " \n", + " # Sort by distance in descending order (highest/best first)\n", + " formatted_result[\"results\"] = sorted(formatted_result[\"results\"], key=lambda x: x[\"distance\"], reverse=True)\n", + "\n", + " # Print the formatted JSON\n", + " print_json(data=formatted_result)\n", + " \n", + " # Return the original chunk for further processing if needed\n", + " return chunk\n", + "\n", + " @staticmethod\n", + " def get_embedding_count(chunk):\n", + " if chunk.embedding and chunk.embedding.dense_embedding:\n", + " return len(chunk.embedding.dense_embedding)\n", + " return 0" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "setup_embedding_model", + "metadata": {}, + "outputs": [], + "source": [ + "# Set up embedding model for query processing\n", + "model = SentenceTransformer(model_name)\n", + "\n", + "def get_default_device():\n", + " return \"cuda:0\" if cuda.is_available() else \"cpu\"\n", + "\n", + "def encode_embedding(text, device=get_default_device()):\n", + " return 
list(map(float, model.encode(text, device=device)))" + ] + }, + { + "cell_type": "markdown", + "id": "656110c9-1360-49fd-ba17-f55f2257f127", + "metadata": {}, + "source": [ + "## Vector Similarity Search\n", + "\n", + "First, let's perform a basic vector similarity search to find documents most similar to our query." + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "vector_search_example", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Running vector similarity search...\n" + ] + }, + { + "data": { + "text/html": [ + "
{\n",
+       "  \"query\": \"What is a distributed data processing framework?\",\n",
+       "  \"query_embedding_dimensions\": 384,\n",
+       "  \"results\": [\n",
+       "    {\n",
+       "      \"id\": \"1_1\",\n",
+       "      \"distance\": 0.5024,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Beam\",\n",
+       "            \"stream processing\",\n",
+       "            \"batch processing\",\n",
+       "            \"data pipelines\",\n",
+       "            \"SDK\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Data Engineering\",\n",
+       "            \"Open Source\",\n",
+       "            \"Streaming\",\n",
+       "            \"Batch\",\n",
+       "            \"Big Data\"\n",
+       "          ],\n",
+       "          \"category\": \"framework\",\n",
+       "          \"doc_id\": \"1\",\n",
+       "          \"chunk_index\": 0\n",
+       "        }\n",
+       "      }\n",
+       "    },\n",
+       "    {\n",
+       "      \"id\": \"3_1\",\n",
+       "      \"distance\": 0.4727,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Kafka: Distributed Streaming Platform\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Kafka\",\n",
+       "            \"distributed systems\",\n",
+       "            \"event streaming\",\n",
+       "            \"message queue\",\n",
+       "            \"real-time\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Streaming\",\n",
+       "            \"Messaging\",\n",
+       "            \"Distributed Systems\",\n",
+       "            \"Open Source\",\n",
+       "            \"Real-time\"\n",
+       "          ],\n",
+       "          \"category\": \"streaming-platform\",\n",
+       "          \"doc_id\": \"3\",\n",
+       "          \"chunk_index\": 0\n",
+       "        }\n",
+       "      }\n",
+       "    },\n",
+       "    {\n",
+       "      \"id\": \"1_2\",\n",
+       "      \"distance\": 0.4306,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Beam\",\n",
+       "            \"stream processing\",\n",
+       "            \"batch processing\",\n",
+       "            \"data pipelines\",\n",
+       "            \"SDK\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Data Engineering\",\n",
+       "            \"Open Source\",\n",
+       "            \"Streaming\",\n",
+       "            \"Batch\",\n",
+       "            \"Big Data\"\n",
+       "          ],\n",
+       "          \"category\": \"framework\",\n",
+       "          \"doc_id\": \"1\",\n",
+       "          \"chunk_index\": 1\n",
+       "        }\n",
+       "      }\n",
+       "    }\n",
+       "  ]\n",
+       "}\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"What is a distributed data processing framework?\"\u001b[0m,\n", + " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n", + " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.5024\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"stream processing\"\u001b[0m,\n", + " \u001b[32m\"batch processing\"\u001b[0m,\n", + " \u001b[32m\"data pipelines\"\u001b[0m,\n", + " \u001b[32m\"SDK\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Data Engineering\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Batch\"\u001b[0m,\n", + " \u001b[32m\"Big Data\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m,\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_1\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4727\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Kafka: Distributed Streaming Platform\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Kafka\"\u001b[0m,\n", + " \u001b[32m\"distributed systems\"\u001b[0m,\n", + " \u001b[32m\"event streaming\"\u001b[0m,\n", + " \u001b[32m\"message queue\"\u001b[0m,\n", + " \u001b[32m\"real-time\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Messaging\"\u001b[0m,\n", + " \u001b[32m\"Distributed Systems\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Real-time\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"streaming-platform\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"3\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m,\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4306\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"stream processing\"\u001b[0m,\n", + " 
\u001b[32m\"batch processing\"\u001b[0m,\n", + " \u001b[32m\"data pipelines\"\u001b[0m,\n", + " \u001b[32m\"SDK\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Data Engineering\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Batch\"\u001b[0m,\n", + " \u001b[32m\"Big Data\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Example query for vector search\n", + "query_text = \"What is a distributed data processing framework?\"\n", + "query_embedding = encode_embedding(query_text)\n", + "\n", + "# Configure vector search parameters\n", + "search_parameters = MilvusSearchParameters(\n", + " collection_name=collection_name,\n", + " search_strategy=VectorSearchParameters(limit=3, anns_field=\"embedding\"),\n", + " output_fields=[\"metadata\"]\n", + ")\n", + "\n", + "collection_load_parameters = MilvusCollectionLoadParameters()\n", + "\n", + "milvus_handler = MilvusSearchEnrichmentHandler(\n", + " connection_parameters=milvus_connection_config,\n", + " search_parameters=search_parameters,\n", + " collection_load_parameters=collection_load_parameters\n", + ")\n", + "\n", + "print(\"Running vector similarity search...\")\n", + "\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | \"Create Query\" >> beam.Create([\n", + " Chunk(\n", + " content=Content(text=query_text),\n", + " embedding=Embedding(dense_embedding=query_embedding)\n", + " )\n", + " ])\n", + " | \"Vector Search\" >> Enrichment(milvus_handler)\n", + " | \"Format Results\" >> FormatAndPrintResults()\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "metadata_filtering", + "metadata": {}, + "source": [ + "## Filtered Search (Metadata Filtering)\n", + "\n", + "Now let's perform a search with metadata filtering to find documents in a specific category." + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "filtered_search_example", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Running filtered search (category='framework')...\n" + ] + }, + { + "data": { + "text/html": [ + "
{\n",
+       "  \"query\": \"How to build data pipelines?\",\n",
+       "  \"query_embedding_dimensions\": 384,\n",
+       "  \"results\": [\n",
+       "    {\n",
+       "      \"id\": \"1_2\",\n",
+       "      \"distance\": 0.555,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Beam\",\n",
+       "            \"stream processing\",\n",
+       "            \"batch processing\",\n",
+       "            \"data pipelines\",\n",
+       "            \"SDK\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Data Engineering\",\n",
+       "            \"Open Source\",\n",
+       "            \"Streaming\",\n",
+       "            \"Batch\",\n",
+       "            \"Big Data\"\n",
+       "          ],\n",
+       "          \"category\": \"framework\",\n",
+       "          \"doc_id\": \"1\",\n",
+       "          \"chunk_index\": 1\n",
+       "        }\n",
+       "      }\n",
+       "    },\n",
+       "    {\n",
+       "      \"id\": \"1_1\",\n",
+       "      \"distance\": 0.5223,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Beam\",\n",
+       "            \"stream processing\",\n",
+       "            \"batch processing\",\n",
+       "            \"data pipelines\",\n",
+       "            \"SDK\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Data Engineering\",\n",
+       "            \"Open Source\",\n",
+       "            \"Streaming\",\n",
+       "            \"Batch\",\n",
+       "            \"Big Data\"\n",
+       "          ],\n",
+       "          \"category\": \"framework\",\n",
+       "          \"doc_id\": \"1\",\n",
+       "          \"chunk_index\": 0\n",
+       "        }\n",
+       "      }\n",
+       "    }\n",
+       "  ]\n",
+       "}\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"How to build data pipelines?\"\u001b[0m,\n", + " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n", + " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.555\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"stream processing\"\u001b[0m,\n", + " \u001b[32m\"batch processing\"\u001b[0m,\n", + " \u001b[32m\"data pipelines\"\u001b[0m,\n", + " \u001b[32m\"SDK\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Data Engineering\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Batch\"\u001b[0m,\n", + " \u001b[32m\"Big Data\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m,\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.5223\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"stream processing\"\u001b[0m,\n", + " \u001b[32m\"batch processing\"\u001b[0m,\n", + " \u001b[32m\"data pipelines\"\u001b[0m,\n", + " \u001b[32m\"SDK\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Data Engineering\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Batch\"\u001b[0m,\n", + " \u001b[32m\"Big Data\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Filtered search - only looking for framework-related documents\n", + "filtered_query = \"How to build data pipelines?\"\n", + "filtered_embedding = encode_embedding(filtered_query)\n", + "\n", + "# Configure search with category filter\n", + "filtered_search_parameters = MilvusSearchParameters(\n", + " collection_name=collection_name,\n", + " search_strategy=VectorSearchParameters(\n", + " filter=\"metadata['category'] == 'framework'\", # Filter for framework 
documents\n", + " limit=2,\n", + " anns_field=\"embedding\"\n", + " ),\n", + " output_fields=[\"metadata\"]\n", + ")\n", + "\n", + "filtered_handler = MilvusSearchEnrichmentHandler(\n", + " connection_parameters=milvus_connection_config,\n", + " search_parameters=filtered_search_parameters,\n", + " collection_load_parameters=collection_load_parameters\n", + ")\n", + "\n", + "print(\"Running filtered search (category='framework')...\")\n", + "\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | \"Create Filtered Query\" >> beam.Create([\n", + " Chunk(\n", + " content=Content(text=filtered_query),\n", + " embedding=Embedding(dense_embedding=filtered_embedding)\n", + " )\n", + " ])\n", + " | \"Filtered Search\" >> Enrichment(filtered_handler)\n", + " | \"Format Filtered Results\" >> FormatAndPrintResults()\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "batch_search_example", + "metadata": {}, + "source": [ + "## Batch Search Processing\n", + "\n", + "Finally, let's demonstrate processing multiple queries in a single pipeline, which is useful for batch enrichment scenarios." + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "batch_processing", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Running batch search processing...\n" + ] + }, + { + "data": { + "text/html": [ + "
{\n",
+       "  \"query\": \"What is stream processing?\",\n",
+       "  \"query_embedding_dimensions\": 384,\n",
+       "  \"results\": [\n",
+       "    {\n",
+       "      \"id\": \"1_1\",\n",
+       "      \"distance\": 0.52,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Beam\",\n",
+       "            \"stream processing\",\n",
+       "            \"batch processing\",\n",
+       "            \"data pipelines\",\n",
+       "            \"SDK\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Data Engineering\",\n",
+       "            \"Open Source\",\n",
+       "            \"Streaming\",\n",
+       "            \"Batch\",\n",
+       "            \"Big Data\"\n",
+       "          ],\n",
+       "          \"category\": \"framework\",\n",
+       "          \"doc_id\": \"1\",\n",
+       "          \"chunk_index\": 0\n",
+       "        }\n",
+       "      }\n",
+       "    },\n",
+       "    {\n",
+       "      \"id\": \"1_2\",\n",
+       "      \"distance\": 0.5118,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Beam\",\n",
+       "            \"stream processing\",\n",
+       "            \"batch processing\",\n",
+       "            \"data pipelines\",\n",
+       "            \"SDK\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Data Engineering\",\n",
+       "            \"Open Source\",\n",
+       "            \"Streaming\",\n",
+       "            \"Batch\",\n",
+       "            \"Big Data\"\n",
+       "          ],\n",
+       "          \"category\": \"framework\",\n",
+       "          \"doc_id\": \"1\",\n",
+       "          \"chunk_index\": 1\n",
+       "        }\n",
+       "      }\n",
+       "    }\n",
+       "  ]\n",
+       "}\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"What is stream processing?\"\u001b[0m,\n", + " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n", + " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.52\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"stream processing\"\u001b[0m,\n", + " \u001b[32m\"batch processing\"\u001b[0m,\n", + " \u001b[32m\"data pipelines\"\u001b[0m,\n", + " \u001b[32m\"SDK\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Data Engineering\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Batch\"\u001b[0m,\n", + " \u001b[32m\"Big Data\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m,\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.5118\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"stream processing\"\u001b[0m,\n", + " \u001b[32m\"batch processing\"\u001b[0m,\n", + " \u001b[32m\"data pipelines\"\u001b[0m,\n", + " \u001b[32m\"SDK\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Data Engineering\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Batch\"\u001b[0m,\n", + " \u001b[32m\"Big Data\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
{\n",
+       "  \"query\": \"How to handle real-time data?\",\n",
+       "  \"query_embedding_dimensions\": 384,\n",
+       "  \"results\": [\n",
+       "    {\n",
+       "      \"id\": \"3_1\",\n",
+       "      \"distance\": 0.4392,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Kafka: Distributed Streaming Platform\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Kafka\",\n",
+       "            \"distributed systems\",\n",
+       "            \"event streaming\",\n",
+       "            \"message queue\",\n",
+       "            \"real-time\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Streaming\",\n",
+       "            \"Messaging\",\n",
+       "            \"Distributed Systems\",\n",
+       "            \"Open Source\",\n",
+       "            \"Real-time\"\n",
+       "          ],\n",
+       "          \"category\": \"streaming-platform\",\n",
+       "          \"doc_id\": \"3\",\n",
+       "          \"chunk_index\": 0\n",
+       "        }\n",
+       "      }\n",
+       "    },\n",
+       "    {\n",
+       "      \"id\": \"1_1\",\n",
+       "      \"distance\": 0.4366,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Beam\",\n",
+       "            \"stream processing\",\n",
+       "            \"batch processing\",\n",
+       "            \"data pipelines\",\n",
+       "            \"SDK\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Data Engineering\",\n",
+       "            \"Open Source\",\n",
+       "            \"Streaming\",\n",
+       "            \"Batch\",\n",
+       "            \"Big Data\"\n",
+       "          ],\n",
+       "          \"category\": \"framework\",\n",
+       "          \"doc_id\": \"1\",\n",
+       "          \"chunk_index\": 0\n",
+       "        }\n",
+       "      }\n",
+       "    }\n",
+       "  ]\n",
+       "}\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"How to handle real-time data?\"\u001b[0m,\n", + " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n", + " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_1\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4392\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Kafka: Distributed Streaming Platform\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Kafka\"\u001b[0m,\n", + " \u001b[32m\"distributed systems\"\u001b[0m,\n", + " \u001b[32m\"event streaming\"\u001b[0m,\n", + " \u001b[32m\"message queue\"\u001b[0m,\n", + " \u001b[32m\"real-time\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Messaging\"\u001b[0m,\n", + " \u001b[32m\"Distributed Systems\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Real-time\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"streaming-platform\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"3\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m,\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_1\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4366\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"stream processing\"\u001b[0m,\n", + " \u001b[32m\"batch processing\"\u001b[0m,\n", + " \u001b[32m\"data pipelines\"\u001b[0m,\n", + " \u001b[32m\"SDK\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Data Engineering\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Batch\"\u001b[0m,\n", + " \u001b[32m\"Big Data\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
{\n",
+       "  \"query\": \"What are the benefits of cloud services?\",\n",
+       "  \"query_embedding_dimensions\": 384,\n",
+       "  \"results\": [\n",
+       "    {\n",
+       "      \"id\": \"2_1\",\n",
+       "      \"distance\": 0.4162,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Google Cloud Dataflow: Run Apache Beam in the Cloud\",\n",
+       "          \"keywords\": [\n",
+       "            \"Google Cloud\",\n",
+       "            \"Dataflow\",\n",
+       "            \"Apache Beam\",\n",
+       "            \"serverless\",\n",
+       "            \"stream and batch\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Cloud Computing\",\n",
+       "            \"Data Pipelines\",\n",
+       "            \"Google Cloud\",\n",
+       "            \"Serverless\",\n",
+       "            \"Enterprise\"\n",
+       "          ],\n",
+       "          \"category\": \"cloud-service\",\n",
+       "          \"doc_id\": \"2\",\n",
+       "          \"chunk_index\": 0\n",
+       "        }\n",
+       "      }\n",
+       "    },\n",
+       "    {\n",
+       "      \"id\": \"1_2\",\n",
+       "      \"distance\": 0.3556,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Beam\",\n",
+       "            \"stream processing\",\n",
+       "            \"batch processing\",\n",
+       "            \"data pipelines\",\n",
+       "            \"SDK\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Data Engineering\",\n",
+       "            \"Open Source\",\n",
+       "            \"Streaming\",\n",
+       "            \"Batch\",\n",
+       "            \"Big Data\"\n",
+       "          ],\n",
+       "          \"category\": \"framework\",\n",
+       "          \"doc_id\": \"1\",\n",
+       "          \"chunk_index\": 1\n",
+       "        }\n",
+       "      }\n",
+       "    }\n",
+       "  ]\n",
+       "}\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"What are the benefits of cloud services?\"\u001b[0m,\n", + " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n", + " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"2_1\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4162\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Google Cloud Dataflow: Run Apache Beam in the Cloud\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Google Cloud\"\u001b[0m,\n", + " \u001b[32m\"Dataflow\"\u001b[0m,\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"serverless\"\u001b[0m,\n", + " \u001b[32m\"stream and batch\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Cloud Computing\"\u001b[0m,\n", + " \u001b[32m\"Data Pipelines\"\u001b[0m,\n", + " \u001b[32m\"Google Cloud\"\u001b[0m,\n", + " \u001b[32m\"Serverless\"\u001b[0m,\n", + " \u001b[32m\"Enterprise\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"cloud-service\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"2\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m,\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.3556\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"stream processing\"\u001b[0m,\n", + " \u001b[32m\"batch processing\"\u001b[0m,\n", + " \u001b[32m\"data pipelines\"\u001b[0m,\n", + " \u001b[32m\"SDK\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Data Engineering\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Batch\"\u001b[0m,\n", + " \u001b[32m\"Big Data\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
{\n",
+       "  \"query\": \"How to build distributed systems?\",\n",
+       "  \"query_embedding_dimensions\": 384,\n",
+       "  \"results\": [\n",
+       "    {\n",
+       "      \"id\": \"3_1\",\n",
+       "      \"distance\": 0.4383,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Kafka: Distributed Streaming Platform\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Kafka\",\n",
+       "            \"distributed systems\",\n",
+       "            \"event streaming\",\n",
+       "            \"message queue\",\n",
+       "            \"real-time\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Streaming\",\n",
+       "            \"Messaging\",\n",
+       "            \"Distributed Systems\",\n",
+       "            \"Open Source\",\n",
+       "            \"Real-time\"\n",
+       "          ],\n",
+       "          \"category\": \"streaming-platform\",\n",
+       "          \"doc_id\": \"3\",\n",
+       "          \"chunk_index\": 0\n",
+       "        }\n",
+       "      }\n",
+       "    },\n",
+       "    {\n",
+       "      \"id\": \"1_2\",\n",
+       "      \"distance\": 0.3502,\n",
+       "      \"fields\": {\n",
+       "        \"metadata\": {\n",
+       "          \"title\": \"Apache Beam: Unified Model for Batch and Streaming Data\",\n",
+       "          \"keywords\": [\n",
+       "            \"Apache Beam\",\n",
+       "            \"stream processing\",\n",
+       "            \"batch processing\",\n",
+       "            \"data pipelines\",\n",
+       "            \"SDK\"\n",
+       "          ],\n",
+       "          \"tags\": [\n",
+       "            \"Data Engineering\",\n",
+       "            \"Open Source\",\n",
+       "            \"Streaming\",\n",
+       "            \"Batch\",\n",
+       "            \"Big Data\"\n",
+       "          ],\n",
+       "          \"category\": \"framework\",\n",
+       "          \"doc_id\": \"1\",\n",
+       "          \"chunk_index\": 1\n",
+       "        }\n",
+       "      }\n",
+       "    }\n",
+       "  ]\n",
+       "}\n",
+       "
\n" + ], + "text/plain": [ + "\u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"query\"\u001b[0m: \u001b[32m\"How to build distributed systems?\"\u001b[0m,\n", + " \u001b[1;34m\"query_embedding_dimensions\"\u001b[0m: \u001b[1;36m384\u001b[0m,\n", + " \u001b[1;34m\"results\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"3_1\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.4383\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Kafka: Distributed Streaming Platform\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Kafka\"\u001b[0m,\n", + " \u001b[32m\"distributed systems\"\u001b[0m,\n", + " \u001b[32m\"event streaming\"\u001b[0m,\n", + " \u001b[32m\"message queue\"\u001b[0m,\n", + " \u001b[32m\"real-time\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Messaging\"\u001b[0m,\n", + " \u001b[32m\"Distributed Systems\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Real-time\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"streaming-platform\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"3\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m0\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m,\n", + " \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"id\"\u001b[0m: \u001b[32m\"1_2\"\u001b[0m,\n", + " \u001b[1;34m\"distance\"\u001b[0m: \u001b[1;36m0.3502\u001b[0m,\n", + " \u001b[1;34m\"fields\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"metadata\"\u001b[0m: \u001b[1m{\u001b[0m\n", + " \u001b[1;34m\"title\"\u001b[0m: \u001b[32m\"Apache Beam: Unified Model for Batch and Streaming Data\"\u001b[0m,\n", + " \u001b[1;34m\"keywords\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Apache Beam\"\u001b[0m,\n", + " \u001b[32m\"stream processing\"\u001b[0m,\n", + " \u001b[32m\"batch processing\"\u001b[0m,\n", + " \u001b[32m\"data pipelines\"\u001b[0m,\n", + " \u001b[32m\"SDK\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"tags\"\u001b[0m: \u001b[1m[\u001b[0m\n", + " \u001b[32m\"Data Engineering\"\u001b[0m,\n", + " \u001b[32m\"Open Source\"\u001b[0m,\n", + " \u001b[32m\"Streaming\"\u001b[0m,\n", + " \u001b[32m\"Batch\"\u001b[0m,\n", + " \u001b[32m\"Big Data\"\u001b[0m\n", + " \u001b[1m]\u001b[0m,\n", + " \u001b[1;34m\"category\"\u001b[0m: \u001b[32m\"framework\"\u001b[0m,\n", + " \u001b[1;34m\"doc_id\"\u001b[0m: \u001b[32m\"1\"\u001b[0m,\n", + " \u001b[1;34m\"chunk_index\"\u001b[0m: \u001b[1;36m1\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m}\u001b[0m\n", + " \u001b[1m]\u001b[0m\n", + "\u001b[1m}\u001b[0m\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Multiple queries to process\n", + "batch_queries = [\n", + " \"What is stream processing?\",\n", + " \"How to handle real-time data?\",\n", + " \"What are the benefits of cloud services?\",\n", + " \"How to build distributed systems?\"\n", + "]\n", + "\n", + "def create_query_chunk(query_text: str) -> Chunk:\n", + " \"\"\"Create a chunk with embedded query for search.\"\"\"\n", + " embedding = encode_embedding(query_text)\n", + " return 
Chunk(\n", + " content=Content(text=query_text),\n", + " embedding=Embedding(dense_embedding=embedding),\n", + " metadata={\"query_type\": \"batch_search\"}\n", + " )\n", + "\n", + "# Configure search for batch processing\n", + "batch_search_parameters = MilvusSearchParameters(\n", + " collection_name=collection_name,\n", + " search_strategy=VectorSearchParameters(limit=2, anns_field=\"embedding\"),\n", + " output_fields=[\"metadata\"]\n", + ")\n", + "\n", + "batch_handler = MilvusSearchEnrichmentHandler(\n", + " connection_parameters=milvus_connection_config,\n", + " search_parameters=batch_search_parameters,\n", + " collection_load_parameters=collection_load_parameters\n", + ")\n", + "\n", + "print(\"Running batch search processing...\")\n", + "\n", + "with beam.Pipeline() as p:\n", + " _ = (\n", + " p\n", + " | \"Create Batch Queries\" >> beam.Create(batch_queries)\n", + " | \"Convert to Chunks\" >> beam.Map(create_query_chunk)\n", + " | \"Batch Search\" >> Enrichment(batch_handler)\n", + " | \"Format Batch Results\" >> FormatAndPrintResults()\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "performance_considerations", + "metadata": {}, + "source": [ + "## Performance Considerations\n", + "\n", + "When using Apache Beam with Milvus in production, consider:\n", + "\n", + "### Ingestion Performance\n", + "- **Batch Size**: Adjust `write_batch_size` in `MilvusWriteConfig` based on your data size and Milvus cluster capacity\n", + "- **Parallel Processing**: Use Apache Beam's natural parallelization for large document collections\n", + "- **Embedding Efficiency**: Consider using GPU-enabled workers for embedding generation\n", + "\n", + "### Search Performance\n", + "- **Collection Loading**: Ensure collections are loaded into memory for faster searches\n", + "- **Index Types**: Choose appropriate index types (IVF_FLAT, HNSW) based on your accuracy/speed requirements\n", + "- **Batch Searches**: Process multiple queries together to amortize connection overhead\n", + "\n", + "### Resource Management\n", + "- **Connection Pooling**: Configure connection parameters for production workloads\n", + "- **Memory Usage**: Monitor memory usage during embedding generation and vector storage\n", + "- **Network Optimization**: Place compute close to your Milvus cluster when possible" + ] + }, + { + "cell_type": "markdown", + "id": "cleanup_section", + "metadata": {}, + "source": [ + "## Cleanup\n", + "\n", + "Don't forget to clean up resources when you're done." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "cleanup_resources", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Milvus test container stopped\n" + ] + } + ], + "source": [ + "# Clean up the test database container\n", + "if db:\n", + " MilvusTestHelpers.stop_db_container(db)\n", + " db = None\n", + " print(\"Milvus test container stopped\")" + ] + }, + { + "cell_type": "markdown", + "id": "next_steps", + "metadata": {}, + "source": [ + "## Resources\n", + "- [Apache Beam Milvus I/O Documentation](https://beam.apache.org/documentation/io/built-in/milvus/)\n", + "- [Apache Beam ML Transforms](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.transforms.html)\n", + "- [Milvus Documentation](https://milvus.io/docs)\n", + "- [Apache Beam RAG Package](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.html)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.22" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 837aa02f8eaa7734e1ad162c3b3992d51d4e1689 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Fri, 22 Aug 2025 20:42:40 +0000 Subject: [PATCH 2/4] website: update documentation --- .../en/documentation/io/built-in/milvus.md | 319 ++++++++++++++++++ .../content/en/documentation/io/connectors.md | 15 + .../section-menu/en/documentation.html | 1 + 3 files changed, 335 insertions(+) create mode 100644 website/www/site/content/en/documentation/io/built-in/milvus.md diff --git a/website/www/site/content/en/documentation/io/built-in/milvus.md b/website/www/site/content/en/documentation/io/built-in/milvus.md new file mode 100644 index 000000000000..f7e5c59a2896 --- /dev/null +++ b/website/www/site/content/en/documentation/io/built-in/milvus.md @@ -0,0 +1,319 @@ +--- +title: "Milvus I/O connector" +--- + + +[Built-in I/O Transforms](/documentation/io/built-in/) + +# Milvus I/O connector + +The Beam SDKs include built-in transforms that can write data to [Milvus](https://milvus.io/) vector databases. Milvus is a high-performance, cloud-native vector database designed for machine learning and AI applications. + +## Before you start + +To use MilvusIO, you need to install the required dependencies. The Milvus I/O connector is part of the ML/RAG functionality in Apache Beam. + +```python +pip install apache-beam[milvus,gcp] +``` + +**Additional resources:** + +* [MilvusIO source code](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/rag/ingestion/milvus_search.py) +* [MilvusIO Pydoc](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.rag.ingestion.milvus_search.html) +* [Milvus Documentation](https://milvus.io/docs) + +## Overview + +The Milvus I/O connector provides a sink for writing vector embeddings and +associated metadata to Milvus collections. This connector is specifically +designed for RAG (Retrieval-Augmented Generation) use cases where you need to +store document chunks with their vector embeddings for similarity search. 
+ +### Key Features + +- **Vector Database Integration**: Write embeddings and metadata to Milvus collections +- **RAG-Optimized**: Built specifically for RAG workflows with document chunks +- **Batch Processing**: Efficient batched writes to optimize performance +- **Flexible Schema**: Configurable column mappings for different data schemas +- **Connection Management**: Proper connection lifecycle management with context managers + +## Writing to Milvus + +### Basic Usage + +```python +import apache_beam as beam +from apache_beam.ml.rag.ingestion.milvus_search import MilvusVectorWriterConfig +from apache_beam.ml.rag.utils import MilvusConnectionConfig + +# Configure connection to Milvus. +connection_config = MilvusConnectionConfig( + uri="http://localhost:19530", # Milvus server URI + db_name="default" # Database name +) + +# Configure write settings. +write_config = MilvusWriteConfig( + collection_name="document_embeddings", + write_batch_size=1000 +) + +# Create the writer configuration. +milvus_config = MilvusVectorWriterConfig( + connection_params=connection_config, + write_config=write_config +) + +# Use in a pipeline. +with beam.Pipeline() as pipeline: + chunks = ( + pipeline + | "Read Data" >> beam.io.ReadFromText("input.txt") + | "Process to Chunks" >> beam.Map(process_to_chunks) + ) + + # Write to Milvus. + chunks | "Write to Milvus" >> milvus_config.create_write_transform() +``` + +### Configuration Options + +#### Connection Configuration + +```python +from apache_beam.ml.rag.utils import MilvusConnectionConfig + +connection_config = MilvusConnectionConfig( + uri="http://localhost:19530", # Milvus server URI + token="your_token", # Authentication token (optional) + db_name="vector_db", # Database name + timeout=30.0 # Connection timeout in seconds +) +``` + +#### Write Configuration + +```python +from apache_beam.ml.rag.ingestion.milvus_search import MilvusWriteConfig + +write_config = MilvusWriteConfig( + collection_name="embeddings", # Target collection name + partition_name="", # Partition name (optional) + timeout=60.0, # Write operation timeout + write_batch_size=1000 # Number of records per batch +) +``` + +### Working with Chunks + +The Milvus I/O connector is designed to work with `Chunk` objects that contain +document content and embeddings: + +```python +from apache_beam.ml.rag.types import Chunk +import numpy as np + +def create_chunk_example(): + return Chunk( + id="doc_1_chunk_1", + content="This is the document content...", + embedding=[0.1, 0.2, 0.3, 0.4, 0.5], # Dense embedding vector + sparse_embedding={"token_1": 0.5, "token_2": 0.3}, # Sparse embedding (optional) + metadata={"source": "document.pdf", "page": 1} + ) +``` + +### Custom Column Specifications + +You can customize how chunk fields are mapped to Milvus collection fields: + +```python +from apache_beam.ml.rag.ingestion.postgres_common import ColumnSpec + +# Define custom column mappings. +custom_column_specs = [ + ColumnSpec( + column_name="doc_id", + value_fn=lambda chunk: chunk.id + ), + ColumnSpec( + column_name="vector", + value_fn=lambda chunk: list(chunk.embedding) + ), + ColumnSpec( + column_name="text_content", + value_fn=lambda chunk: chunk.content + ), + ColumnSpec( + column_name="document_metadata", + value_fn=lambda chunk: dict(chunk.metadata) + ) +] + +# Use custom column specs. 
+milvus_config = MilvusVectorWriterConfig( + connection_params=connection_config, + write_config=write_config, + column_specs=custom_column_specs +) +``` + +## Complete Example + +Here's a complete example that processes documents and writes them to Milvus: + +```python +import apache_beam as beam +from apache_beam.ml.rag.ingestion.milvus_search import ( + MilvusVectorWriterConfig, + MilvusWriteConfig +) +from apache_beam.ml.rag.utils import MilvusConnectionConfig +from apache_beam.ml.rag.types import Chunk +import numpy as np + +def process_document(document_text): + """Process a document into chunks with embeddings.""" + # This is a simplified example - in practice you would: + # 1. Split document into chunks + # 2. Generate embeddings using a model + # 3. Extract metadata + + chunks = [] + sentences = document_text.split('.') + + for i, sentence in enumerate(sentences): + if sentence.strip(): + # Generate mock embedding (replace with real embedding model). + embedding = np.random.rand(384).tolist() # 384-dimensional vector + + chunk = Chunk( + id=f"doc_chunk_{i}", + content=sentence.strip(), + embedding=embedding, + metadata={"chunk_index": i, "length": len(sentence)} + ) + chunks.append(chunk) + + return chunks + +def run_pipeline(): + # Configure Milvus connection. + connection_config = MilvusConnectionConfig( + uri="http://localhost:19530", + db_name="rag_database" + ) + + # Configure write settings. + write_config = MilvusWriteConfig( + collection_name="document_chunks", + write_batch_size=500 + ) + + # Create writer configuration. + milvus_config = MilvusVectorWriterConfig( + connection_params=connection_config, + write_config=write_config + ) + + # Define pipeline. + with beam.Pipeline() as pipeline: + documents = ( + pipeline + | "Create Sample Documents" >> beam.Create([ + "First document content. It has multiple sentences.", + "Second document with different content. More sentences here." + ]) + ) + + chunks = ( + documents + | "Process Documents" >> beam.FlatMap(process_document) + ) + + # Write to Milvus. + chunks | "Write to Milvus" >> milvus_config.create_write_transform() + +if __name__ == "__main__": + run_pipeline() +``` + +## Performance Considerations + +### Batch Size Optimization + +The write batch size significantly affects performance. Larger batches reduce +the number of network round-trips but consume more memory: + +```python +# For high-throughput scenarios. +write_config = MilvusWriteConfig( + collection_name="large_collection", + write_batch_size=2000 # Larger batches for better throughput +) + +# For memory-constrained environments. +write_config = MilvusWriteConfig( + collection_name="small_collection", + write_batch_size=100 # Smaller batches to reduce memory usage +) +``` + +### Production Configuration + +For production deployments, consider using appropriate timeout settings and +connection parameters: + +```python +connection_config = MilvusConnectionConfig( + uri="http://milvus-cluster:19530", + timeout=120.0, # Longer timeout for production workloads + db_name="production_db", + token="your_production_token" # Using authentication in production +) +``` + +## Error Handling + +The connector includes built-in error handling and logging. Monitor your +pipeline logs for any connection or write failures: + +```python +import logging + +# Enable debug logging to see detailed operation information. +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +# In your processing function. 
+def safe_process_document(document):
+    try:
+        return process_document(document)
+    except Exception as e:
+        logger.error(f"Failed to process document: {e}")
+        return []  # Return empty list on failure
+```
+
+## Notebook example
+
+
+  Open In Colab
+
+
+
+## Related transforms
+
+- [Milvus Enrichment Handler in Apache Beam](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/rag/enrichment/milvus_search.py)
diff --git a/website/www/site/content/en/documentation/io/connectors.md b/website/www/site/content/en/documentation/io/connectors.md
index 9797195518e9..078f83e09739 100644
--- a/website/www/site/content/en/documentation/io/connectors.md
+++ b/website/www/site/content/en/documentation/io/connectors.md
@@ -1049,6 +1049,21 @@ This table provides a consolidated, at-a-glance overview of the available built-
    ✔
    ✘
+
+   MilvusIO (guide)
+   ✘
+   ✔
+   Not available
+
+    ✔
+    native
+
+   Not available
+   Not available
+   ✔
+   ✘
+   ✘
+
    Iceberg (Managed I/O)
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 2386ecb39d9d..8ae9afd97775 100755
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -73,6 +73,7 @@
  • Hadoop Input/Output Format IO
  • HCatalog IO
  • Google BigQuery I/O connector
  • +
  • Milvus I/O connector
  • Snowflake I/O connector
  • CDAP I/O connector
  • Spark Receiver I/O connector
  • From 711e2785bd8e74bf9c129542bc6194ffef9a511b Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Fri, 22 Aug 2025 21:10:57 +0000 Subject: [PATCH 3/4] CHANGES.md: update release notes --- CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index dd7fb7b734a3..aa7b2589225c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,6 +80,8 @@ Beam now supports data enrichment capabilities using SQL databases, with built-in support for: - Managed PostgreSQL, MySQL, and Microsoft SQL Server instances on CloudSQL - Unmanaged SQL database instances not hosted on CloudSQL (e.g., self-hosted or on-premises databases) +* Added documentation and example notebook for Milvus sink I/O connector + (Python) ([#35944](https://github.com/apache/beam/pull/35944)). ## Breaking Changes From 23a9d0ea06aacab99717299f00166a25118f0848 Mon Sep 17 00:00:00 2001 From: Mohamed Awnallah Date: Fri, 22 Aug 2025 22:18:48 +0000 Subject: [PATCH 4/4] website: fix linting issues --- .../en/documentation/io/built-in/milvus.md | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/website/www/site/content/en/documentation/io/built-in/milvus.md b/website/www/site/content/en/documentation/io/built-in/milvus.md index f7e5c59a2896..bb95cda1a6d8 100644 --- a/website/www/site/content/en/documentation/io/built-in/milvus.md +++ b/website/www/site/content/en/documentation/io/built-in/milvus.md @@ -84,7 +84,7 @@ with beam.Pipeline() as pipeline: | "Read Data" >> beam.io.ReadFromText("input.txt") | "Process to Chunks" >> beam.Map(process_to_chunks) ) - + # Write to Milvus. chunks | "Write to Milvus" >> milvus_config.create_write_transform() ``` @@ -178,7 +178,7 @@ Here's a complete example that processes documents and writes them to Milvus: ```python import apache_beam as beam from apache_beam.ml.rag.ingestion.milvus_search import ( - MilvusVectorWriterConfig, + MilvusVectorWriterConfig, MilvusWriteConfig ) from apache_beam.ml.rag.utils import MilvusConnectionConfig @@ -191,15 +191,15 @@ def process_document(document_text): # 1. Split document into chunks # 2. Generate embeddings using a model # 3. Extract metadata - + chunks = [] sentences = document_text.split('.') - + for i, sentence in enumerate(sentences): if sentence.strip(): # Generate mock embedding (replace with real embedding model). embedding = np.random.rand(384).tolist() # 384-dimensional vector - + chunk = Chunk( id=f"doc_chunk_{i}", content=sentence.strip(), @@ -207,7 +207,7 @@ def process_document(document_text): metadata={"chunk_index": i, "length": len(sentence)} ) chunks.append(chunk) - + return chunks def run_pipeline(): @@ -216,19 +216,19 @@ def run_pipeline(): uri="http://localhost:19530", db_name="rag_database" ) - + # Configure write settings. write_config = MilvusWriteConfig( collection_name="document_chunks", write_batch_size=500 ) - + # Create writer configuration. milvus_config = MilvusVectorWriterConfig( connection_params=connection_config, write_config=write_config ) - + # Define pipeline. with beam.Pipeline() as pipeline: documents = ( @@ -238,12 +238,12 @@ def run_pipeline(): "Second document with different content. More sentences here." ]) ) - + chunks = ( documents | "Process Documents" >> beam.FlatMap(process_document) ) - + # Write to Milvus. chunks | "Write to Milvus" >> milvus_config.create_write_transform() @@ -267,7 +267,7 @@ write_config = MilvusWriteConfig( # For memory-constrained environments. 
write_config = MilvusWriteConfig( - collection_name="small_collection", + collection_name="small_collection", write_batch_size=100 # Smaller batches to reduce memory usage ) ```