32 changes: 21 additions & 11 deletions api/data_pipeline.py
@@ -8,6 +8,7 @@
 import logging
 import base64
 import glob
+import fnmatch
 from adalflow.utils import get_adalflow_default_root_path
 from adalflow.core.db import LocalDB
 from api.config import configs, DEFAULT_EXCLUDED_DIRS, DEFAULT_EXCLUDED_FILES
@@ -248,8 +249,24 @@ def should_process_file(file_path: str, use_inclusion: bool, included_dirs: List
     Returns:
         bool: True if the file should be processed, False otherwise
     """
-    file_path_parts = os.path.normpath(file_path).split(os.sep)
-    file_name = os.path.basename(file_path)
+    norm_file_path = os.path.normpath(file_path)
+    file_path_parts = norm_file_path.split(os.sep)
+    file_name = os.path.basename(norm_file_path)
+    rel_path = os.path.relpath(norm_file_path, path)
+    rel_path_norm = os.path.normpath(rel_path)
+
+    def matches_any_glob(patterns: List[str]) -> bool:
+        for pattern in patterns:
+            p = pattern.strip() if pattern else ""
+            if not p:
+                continue
+
+            p_norm = os.path.normpath(p)
+
+            if (fnmatch.fnmatchcase(file_name, p_norm) or
+                    fnmatch.fnmatchcase(rel_path_norm, p_norm)):
+                return True
+        return False

     if use_inclusion:
         # Inclusion mode: file must be in included directories or match included files
@@ -265,10 +282,7 @@ def should_process_file(file_path: str, use_inclusion: bool, included_dirs: List

         # Check if file matches included file patterns
         if not is_included and included_files:
-            for included_file in included_files:
-                if file_name == included_file or file_name.endswith(included_file):
-                    is_included = True
-                    break
+            is_included = matches_any_glob(included_files)

         # If no inclusion rules are specified for a category, allow all files from that category
         if not included_dirs and not included_files:
@@ -294,10 +308,7 @@ def should_process_file(file_path: str, use_inclusion: bool, included_dirs: List

         # Check if file matches excluded file patterns
         if not is_excluded:
-            for excluded_file in excluded_files:
-                if file_name == excluded_file:
-                    is_excluded = True
-                    break
+            is_excluded = matches_any_glob(excluded_files)

         return not is_excluded

Expand Down Expand Up @@ -402,7 +413,6 @@ def prepare_data_pipeline(embedder_type: str = None, is_ollama_embedder: bool =
     if embedder_type is None:
         embedder_type = get_embedder_type()

-    splitter = TextSplitter(**configs["text_splitter"])
     embedder_config = get_embedder_config()

     embedder = get_embedder(embedder_type=embedder_type)
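Note on the matching semantics above: the new matches_any_glob helper tests each pattern against both the basename and the repo-relative path with fnmatch.fnmatchcase, and fnmatch's "*" is not separator-aware, so "*.md" matches Markdown files at any depth while a path-shaped pattern like "pkg/dist/*" only matches files under that relative path. A minimal standalone sketch of the behavior this relies on (illustrative paths, assuming POSIX-style separators):

import fnmatch
import os

# A bare extension pattern matches by basename, and because '*' in
# fnmatch also matches '/', it matches nested relative paths too:
assert fnmatch.fnmatchcase("README.md", "*.md")
assert fnmatch.fnmatchcase(os.path.normpath("docs/guide.md"), "*.md")

# A path-shaped pattern only matches when the relative path lines up:
assert fnmatch.fnmatchcase(os.path.normpath("pkg/dist/bundle.js"),
                           os.path.normpath("pkg/dist/*"))
assert not fnmatch.fnmatchcase(os.path.normpath("pkg/src/index.js"),
                               os.path.normpath("pkg/dist/*"))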
254 changes: 254 additions & 0 deletions tests/unit/test_file_filters_glob.py
@@ -0,0 +1,254 @@
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory


def _write_text(path: Path, content: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


class TestFileFiltersGlob(unittest.TestCase):
    def setUp(self) -> None:
        project_root = Path(__file__).parent.parent.parent
        sys.path.insert(0, str(project_root))

    def test_excluded_files_glob_md_excludes_markdown(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "README.md", "# hello\n")
            _write_text(tmp_path / "notes.txt", "hello\n")
            _write_text(tmp_path / "main.py", "print('hi')\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                excluded_files=["*.md"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertNotIn("README.md", paths)
            self.assertIn("notes.txt", paths)
            self.assertIn("main.py", paths)

    def test_included_files_glob_md_includes_only_markdown(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "README.md", "# hello\n")
            _write_text(tmp_path / "notes.txt", "hello\n")
            _write_text(tmp_path / "main.py", "print('hi')\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                included_files=["*.md"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertEqual(paths, {"README.md"})

    def test_excluded_files_glob_path(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "src/main.py", "print('main')\n")
            _write_text(tmp_path / "pkg/dist/bundle.js", "// bundle\n")
            _write_text(tmp_path / "pkg/src/index.js", "// index\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                excluded_files=["pkg/dist/*"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertNotIn(str(Path("pkg/dist/bundle.js")), paths)
            self.assertIn(str(Path("src/main.py")), paths)
            self.assertIn(str(Path("pkg/src/index.js")), paths)

    def test_excluded_files_multiple_patterns(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "src/main.py", "print('main')\n")
            _write_text(tmp_path / "test/test_main.py", "# test\n")
            _write_text(tmp_path / "build/output.js", "// build\n")
            _write_text(tmp_path / "lib/helper.py", "# helper\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                excluded_files=["test/*", "build/*"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertIn(str(Path("src/main.py")), paths)
            self.assertIn(str(Path("lib/helper.py")), paths)
            self.assertNotIn(str(Path("test/test_main.py")), paths)
            self.assertNotIn(str(Path("build/output.js")), paths)

    def test_excluded_files_nested_directories(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "src/utils/helper.py", "# helper\n")
            _write_text(tmp_path / "src/core/main.py", "# main\n")
            _write_text(tmp_path / "node_modules/pkg/index.js", "// pkg\n")
            _write_text(tmp_path / "vendor/lib/code.py", "# vendor\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                excluded_files=["node_modules/*", "vendor/*"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertIn(str(Path("src/utils/helper.py")), paths)
            self.assertIn(str(Path("src/core/main.py")), paths)
            self.assertNotIn(str(Path("node_modules/pkg/index.js")), paths)
            self.assertNotIn(str(Path("vendor/lib/code.py")), paths)

    def test_excluded_files_wildcard_extension(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "src/main.py", "# python\n")
            _write_text(tmp_path / "src/app.js", "// js\n")
            _write_text(tmp_path / "config.txt", "key=value\n")
            _write_text(tmp_path / "data.rst", "Documentation\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                excluded_files=["*.txt", "*.rst"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertIn(str(Path("src/main.py")), paths)
            self.assertIn(str(Path("src/app.js")), paths)
            self.assertNotIn("config.txt", paths)
            self.assertNotIn("data.rst", paths)

    def test_excluded_dirs_simple(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "src/main.py", "# main\n")
            _write_text(tmp_path / "tests/test_main.py", "# test\n")
            _write_text(tmp_path / "lib/helper.py", "# helper\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                excluded_dirs=["tests"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertIn(str(Path("src/main.py")), paths)
            self.assertIn(str(Path("lib/helper.py")), paths)
            self.assertNotIn(str(Path("tests/test_main.py")), paths)

    def test_excluded_dirs_multiple(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "src/main.py", "# main\n")
            _write_text(tmp_path / "mybuild/output.js", "// build\n")
            _write_text(tmp_path / "mydist/bundle.js", "// dist\n")
            _write_text(tmp_path / "lib/helper.py", "# helper\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                excluded_dirs=["mybuild", "mydist"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertIn(str(Path("src/main.py")), paths)
            self.assertIn(str(Path("lib/helper.py")), paths)
            self.assertNotIn(str(Path("mybuild/output.js")), paths)
            self.assertNotIn(str(Path("mydist/bundle.js")), paths)

    def test_combined_excluded_dirs_and_files(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "src/main.py", "# main\n")
            _write_text(tmp_path / "src/config.txt", "key=value\n")
            _write_text(tmp_path / "tests/test_main.py", "# test\n")
            _write_text(tmp_path / "lib/helper.py", "# helper\n")
            _write_text(tmp_path / "lib/data.txt", "data\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                excluded_dirs=["tests"],
                excluded_files=["*.txt"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertIn(str(Path("src/main.py")), paths)
            self.assertIn(str(Path("lib/helper.py")), paths)
            self.assertNotIn(str(Path("tests/test_main.py")), paths)
            self.assertNotIn(str(Path("src/config.txt")), paths)
            self.assertNotIn(str(Path("lib/data.txt")), paths)

    def test_included_files_with_path_pattern(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "src/main.py", "# main\n")
            _write_text(tmp_path / "src/utils/helper.py", "# helper\n")
            _write_text(tmp_path / "tests/test_main.py", "# test\n")
            _write_text(tmp_path / "lib/util.py", "# util\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                included_files=["*.py"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertIn(str(Path("src/main.py")), paths)
            self.assertIn(str(Path("src/utils/helper.py")), paths)
            self.assertIn(str(Path("tests/test_main.py")), paths)
            self.assertIn(str(Path("lib/util.py")), paths)

    def test_deep_nested_exclusion(self):
        from api.data_pipeline import read_all_documents

        with TemporaryDirectory() as tmp_dir:
            tmp_path = Path(tmp_dir)
            _write_text(tmp_path / "src/app/core/main.py", "# main\n")
            _write_text(tmp_path / "src/app/tests/test.py", "# test\n")
            _write_text(tmp_path / "lib/vendor/pkg/code.py", "# vendor\n")
            _write_text(tmp_path / "lib/internal/util.py", "# internal\n")

            docs = read_all_documents(
                str(tmp_path),
                embedder_type="openai",
                excluded_files=["*/tests/*", "*/vendor/*"],
            )

            paths = {d.meta_data.get("file_path") for d in docs}
            self.assertIn(str(Path("src/app/core/main.py")), paths)
            self.assertIn(str(Path("lib/internal/util.py")), paths)
            self.assertNotIn(str(Path("src/app/tests/test.py")), paths)
            self.assertNotIn(str(Path("lib/vendor/pkg/code.py")), paths)


if __name__ == "__main__":
    unittest.main()
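The suite can be run on its own from the repository root (assuming the test modules are importable as a package and the api dependencies resolve):

python -m unittest tests.unit.test_file_filters_glob -v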