diff --git a/.gitignore b/.gitignore
index 0a19790..9ac7e6f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -172,3 +172,6 @@ cython_debug/
 
 # PyPI configuration file
 .pypirc
+
+storage/
+
diff --git a/codetide/__init__.py b/codetide/__init__.py
index dbb0342..8348455 100644
--- a/codetide/__init__.py
+++ b/codetide/__init__.py
@@ -10,11 +10,12 @@ from codetide import parsers
 
 from pydantic import BaseModel, Field, field_validator
-from typing import Optional, List, Union, Dict
-from pathspec import GitIgnoreSpec
+from typing import Optional, List, Tuple, Union, Dict
+from datetime import datetime, timezone
 from pathlib import Path
 import logging
 import asyncio
+import pygit2
 import time
 import json
 import os
@@ -29,9 +30,8 @@ class CodeTide(BaseModel):
     """Root model representing a complete codebase"""
     rootpath : Union[str, Path]
     codebase :CodeBase = Field(default_factory=CodeBase)
-    file_list :List[Path] = Field(default_factory=list)
+    files :Dict[Path, datetime] = Field(default_factory=dict)
     _instantiated_parsers :Dict[str, BaseParser] = {}
-    _gitignore_cache :Dict[str, GitIgnoreSpec] = {}
 
     @field_validator("rootpath", mode="after")
     @classmethod
@@ -65,46 +65,76 @@ async def from_path(
             Initialized CodeTide instance
         """
         rootpath = Path(rootpath)
-        codebase = cls(rootpath=rootpath)
-        logger.info(f"Initializing CodeBase from path: {str(rootpath)}")
+        codeTide = cls(rootpath=rootpath)
+        logger.info(f"Initializing CodeTide from path: {str(rootpath)}")
 
         st = time.time()
-        codebase._find_code_files(rootpath, languages=languages)
-        if not codebase.file_list:
+        codeTide.files = codeTide._find_code_files(rootpath, languages=languages)
+        if not codeTide.files:
             logger.warning("No code files found matching the criteria")
-            return codebase
+            return codeTide
 
-        language_files = codebase._organize_files_by_language()
-        await codebase._initialize_parsers(language_files.keys())
+        language_files = codeTide._organize_files_by_language(codeTide.files)
+        codeTide._initialize_parsers(language_files.keys())
 
-        results = await codebase._process_files_concurrently(
+        results = await codeTide._process_files_concurrently(
             language_files, max_concurrent_tasks, batch_size
         )
 
-        codebase._add_results_to_codebase(results)
-        codebase._resolve_files_dependencies()
-        logger.info(f"CodeBase initialized with {len(results)} files processed in {time.time() - st:.2f}s")
+        codeTide._add_results_to_codebase(results)
+        codeTide._resolve_files_dependencies()
+        logger.info(f"CodeTide initialized with {len(results)} files processed in {time.time() - st:.2f}s")
 
-        return codebase
+        return codeTide
 
-    def serialize(self, filepath :Optional[Union[str, Path]]=DEFAULT_SERIALIZATION_PATH, include_codebase_cached_elements :bool=False, include_cached_ids :bool=False):
+    async def _reset(self):
+        # Rebinding `self` would be a no-op; copy the rebuilt state onto this instance instead
+        fresh = await self.from_path(self.rootpath)
+        self.codebase = fresh.codebase
+        self.files = fresh.files
+        self._instantiated_parsers = fresh._instantiated_parsers
+
+    def serialize(self,
+        filepath: Optional[Union[str, Path]] = DEFAULT_SERIALIZATION_PATH,
+        include_codebase_cached_elements: bool = False,
+        include_cached_ids: bool = False,
+        store_in_project_root: bool = True):
+
+        if store_in_project_root:
+            filepath = Path(self.rootpath) / filepath
+
         if not os.path.exists(filepath):
             os.makedirs(os.path.split(filepath)[0], exist_ok=True)
+
         writeFile(self.model_dump_json(indent=4), filepath)
 
-        if include_codebase_cached_elements or include_cached_ids:
-            dir_path = Path(os.path.split(filepath)[0])
-            if include_codebase_cached_elements:
-                cached_elements_path = dir_path / DEFAULT_CACHED_ELEMENTS_FILE
-                writeFile(self.codebase.serialize_cache_elements(), cached_elements_path)
-
-            if include_cached_ids:
-                cached_ids_path = dir_path / DEFAULT_CACHED_IDS_FILE
-                writeFile(json.dumps(self.codebase.unique_ids, indent=4), cached_ids_path)
+
+        dir_path = Path(os.path.split(filepath)[0])
+
+        current_path = dir_path
+        gitignore_path = None
+        for parent in current_path.parents:
+            potential_gitignore = parent / ".gitignore"
+            if potential_gitignore.exists():
+                gitignore_path = potential_gitignore
+                break
+
+        if gitignore_path:
+            with open(gitignore_path, 'r+') as f:
+                lines = f.read().splitlines()
+                if f"{dir_path.name}/" not in lines:
+                    f.write(f"\n{dir_path.name}/\n")
+
+        if include_codebase_cached_elements:
+            cached_elements_path = dir_path / DEFAULT_CACHED_ELEMENTS_FILE
+            writeFile(self.codebase.serialize_cache_elements(), cached_elements_path)
+
+        if include_cached_ids:
+            cached_ids_path = dir_path / DEFAULT_CACHED_IDS_FILE
+            writeFile(json.dumps(self.codebase.unique_ids, indent=4), cached_ids_path)
 
     @classmethod
-    def deserialize(cls, filepath :Optional[Union[str, Path]]=DEFAULT_SERIALIZATION_PATH)->"CodeTide":
+    def deserialize(cls, filepath :Optional[Union[str, Path]]=DEFAULT_SERIALIZATION_PATH, rootpath :Optional[Union[str, Path]] = None)->"CodeTide":
+        if rootpath is not None:
+            filepath = Path(rootpath) / filepath
+
         if not os.path.exists(filepath):
             raise FileNotFoundError(f"{filepath} is not a valid path")
@@ -119,19 +149,18 @@ def deserialize(cls, filepath :Optional[Union[str, Path]]=DEFAULT_SERIALIZATION_
 
         return tideInstance
 
-    def _organize_files_by_language(
-        self,
-    ) -> Dict[str, List[Path]]:
+    @classmethod
+    def _organize_files_by_language(cls, files :Union[List[Path], Dict[Path, datetime]]) -> Dict[str, List[Path]]:
         """Organize files by their programming language."""
         language_files = {}
-        for filepath in self.file_list:
-            language = self._get_language_from_extension(filepath)
+        for filepath in files:
+            language = cls._get_language_from_extension(filepath)
             if language not in language_files:
                 language_files[language] = []
             language_files[language].append(filepath)
         return language_files
 
-    async def _initialize_parsers(
+    def _initialize_parsers(
         self,
         languages: List[str]
     ) -> None:
@@ -208,68 +237,6 @@ def _add_results_to_codebase(
             self.codebase.root.append(code_file)
         logger.debug(f"Added {len(results)} files to codebase")
 
-    @staticmethod
-    def _load_gitignore_spec(directory: Path) -> GitIgnoreSpec:
-        """
-        Load and parse .gitignore file from a directory into a GitIgnoreSpec object.
-
-        Args:
-            directory: Directory containing the .gitignore file
-
-        Returns:
-            GitIgnoreSpec object with the patterns from the .gitignore file
-        """
-        gitignore_path = directory / ".gitignore"
-        patterns = [".git/"]
-
-        if gitignore_path.exists() and gitignore_path.is_file():
-            try:
-                _gitignore = readFile(gitignore_path)
-                for line in _gitignore.splitlines():
-                    line = line.strip()
-                    # Skip empty lines and comments
-                    if line and not line.startswith('#'):
-                        patterns.append(line)
-            except Exception as e:
-                logger.warning(f"Error reading .gitignore file {gitignore_path}: {e}")
-
-        return GitIgnoreSpec.from_lines(patterns)
-
-    def _get_gitignore_for_path(self, path: Path) -> GitIgnoreSpec:
-        """
-        Get the combined GitIgnoreSpec for a path by checking all parent directories.
-
-        Args:
-            path: The file path to check
-
-        Returns:
-            Combined GitIgnoreSpec for all relevant .gitignore files
-        """
-        # Check cache first
-        if path in self._gitignore_cache:
-            return self._gitignore_cache[path]
-
-        # Collect all .gitignore specs from parent directories
-        specs = []
-
-        # Check the directory containing the file
-        parent_dir = path.parent if path.is_file() else path
-
-        # Walk up the directory tree
-        for directory in [parent_dir, *parent_dir.parents]:
-            if directory not in self._gitignore_cache:
-                # Load and cache the spec for this directory
-                self._gitignore_cache[directory] = self._load_gitignore_spec(directory)
-
-            specs.append(self._gitignore_cache[directory])
-
-        # Combine all specs into one
-        combined_spec = GitIgnoreSpec([])
-        for spec in reversed(specs):  # Apply from root to leaf
-            combined_spec += spec
-
-        return combined_spec
-
-    def _find_code_files(self, rootpath: Path, languages: Optional[List[str]] = None) -> List[Path]:
+    def _find_code_files(self, rootpath: Path, languages: Optional[List[str]] = None) -> Dict[Path, datetime]:
         """
         Find all code files in a directory tree, respecting .gitignore rules in each directory.
@@ -279,11 +246,11 @@ def _find_code_files(self, rootpath: Path, languages: Optional[List[str]] = None
             languages: List of languages to include (None for all supported)
 
         Returns:
-            List of paths to code files
+            Mapping of each code file path to its last-modified timestamp (UTC)
         """
         if not rootpath.exists() or not rootpath.is_dir():
             logger.error(f"Root path does not exist or is not a directory: {rootpath}")
-            return []
+            return {}
 
         # Determine valid extensions
         extensions = []
@@ -292,29 +259,48 @@ def _find_code_files(self, rootpath: Path, languages: Optional[List[str]] = None
             if lang in LANGUAGE_EXTENSIONS:
                 extensions.extend(LANGUAGE_EXTENSIONS[lang])
 
-        code_files = []
-
-        for file_path in rootpath.rglob('*'):
-            if not file_path.is_file() or (extensions and file_path.suffix.lower() not in extensions):
-                continue
-
-            # Get the combined gitignore spec for this path
-            gitignore_spec = self._get_gitignore_for_path(file_path)
-
-            # Convert path to relative path for gitignore matching
-            try:
-                rel_path = file_path.relative_to(rootpath)
-            except ValueError:
-                # This shouldn't happen since we're scanning from rootpath
+        code_files = {}
+
+        try:
+            # Try to open the repository
+            repo = pygit2.Repository(rootpath)
+
+            # Get the repository's index (staging area)
+            index = repo.index
+
+            # Convert all tracked files to Path objects
+            tracked_files = {Path(rootpath) / Path(entry.path) for entry in index}
+
+            # Get status and filter files
+            status = repo.status()
+
+            # Untracked files are those with status == pygit2.GIT_STATUS_WT_NEW
+            untracked_not_ignored = {
+                Path(rootpath) / Path(filepath)
+                for filepath, file_status in status.items()
+                if file_status == pygit2.GIT_STATUS_WT_NEW and not repo.path_is_ignored(filepath)
+            }
+
+            all_files = tracked_files.union(untracked_not_ignored)
+
+        except (pygit2.GitError, KeyError):
+            # Fall back to a plain directory walk if this is not a git repo
+            all_files = set(rootpath.rglob('*'))
+
+        for file_path in all_files:
+            if not file_path.is_file():
                 continue
-
-            # Check if the file is ignored by any gitignore rules
-            if gitignore_spec.match_file(rel_path):
+
+            # Check extension filter if languages were specified
+            if extensions and file_path.suffix.lower() not in extensions:
                 continue
-
-            code_files.append(file_path)
-
-        self.file_list = code_files
+
+            # Get the last modified time and convert to UTC datetime
+            modified_timestamp = file_path.stat().st_mtime
+            modified_datetime = datetime.fromtimestamp(modified_timestamp, timezone.utc)
+
+            code_files[file_path] = modified_datetime
 
         return code_files
 
     @staticmethod
@@ -329,7 +315,7 @@ def _get_language_from_extension(filepath: Path) -> Optional[str]:
             Language name or None if not recognized
         """
-        extension = filepath.suffix.lower()
+        extension = Path(filepath).suffix.lower()
 
         for language, extensions in LANGUAGE_EXTENSIONS.items():
             if extension in extensions:
@@ -340,5 +326,94 @@ def _resolve_files_dependencies(self):
     def _resolve_files_dependencies(self):
         for _, parser in self._instantiated_parsers.items():
             parser.resolve_inter_files_dependencies(self.codebase)
-            parser.resolve_intra_file_dependencies(self.codebase)
+            parser.resolve_intra_file_dependencies(self.codebase.root)
 
+    def _get_changed_files(self) -> Tuple[List[Path], bool]:
+        """
+        TODO: consider storing a single timestamp for the latest fetch and then using
+        pygit2 to detect changed files from commit history plus current repo status
+        """
+        file_deletion_detected = False
+        files = self._find_code_files(self.rootpath)  # Dict[Path, datetime]
+
+        changed_files = []
+
+        # Check for new files and modified files
+        for file_path, current_modified_time in files.items():
+            if file_path not in self.files:
+                # New file
+                changed_files.append(file_path)
+            elif current_modified_time > self.files[file_path]:
+                # File has been modified since last scan
+                changed_files.append(file_path)
+
+        # Check for deleted files
+        for stored_file_path in self.files:
+            if stored_file_path not in files:
+                file_deletion_detected = True
+                break
+
+        self.files = files
+        return changed_files, file_deletion_detected
+
+    async def check_for_updates(self,
+        max_concurrent_tasks: int = DEFAULT_MAX_CONCURRENT_TASKS,
+        batch_size: int = DEFAULT_BATCH_SIZE):
+
+        changed_files, deletion_detected = self._get_changed_files()
+        if deletion_detected:
+            logger.info("deletion detected, resetting CodeTide [this is a temporary solution]")
+            await self._reset()
+
+        changed_language_files = self._organize_files_by_language(changed_files)
+        self._initialize_parsers(changed_language_files.keys())
+
+        results :List[CodeFileModel] = await self._process_files_concurrently(
+            changed_language_files,
+            max_concurrent_tasks=max_concurrent_tasks,
+            batch_size=batch_size
+        )
+        changedPaths = {
+            codeFile.file_path: None for codeFile in results
+        }
+
+        for i, codeFile in enumerate(self.codebase.root):
+            if codeFile.file_path in changedPaths:
+                changedPaths[codeFile.file_path] = i
+
+        newFiles :List[CodeFileModel] = []
+        for codeFile in results:
+            i = changedPaths.get(codeFile.file_path)
+            if i is not None:  ### existing file: update in place
+                ### TODO: if new imports are found, inter-file and then intra-file
+                ### dependencies must be rebuilt; otherwise intra-file dependencies
+                ### can be rebuilt directly and the file swapped in as-is
+                if codeFile.all_imports() == self.codebase.root[i].all_imports():
+                    language = self._get_language_from_extension(codeFile.file_path)
+                    parser = self._instantiated_parsers.get(language)
+                    parser.resolve_intra_file_dependencies([codeFile])
+                    self.codebase.root[i] = codeFile
+                    logger.info(f"updating {codeFile.file_path}: no new dependencies detected")
+                    continue
+
+                self.codebase.root[i] = codeFile
+                logger.info(f"updating {codeFile.file_path} with new dependencies")
+
+            else:
+                self.codebase.root.append(codeFile)
+                changedPaths[codeFile.file_path] = len(self.codebase.root) - 1
+                logger.info(f"adding new file {codeFile.file_path}")
+
+            newFiles.append(codeFile)
+
+        for language, filepaths in changed_language_files.items():
+            parser = self._instantiated_parsers.get(language)
+            filteredNewFiles = [
+                newFile for newFile in newFiles
+                if self.rootpath / newFile.file_path in filepaths
+            ]
+            parser.resolve_inter_files_dependencies(self.codebase, filteredNewFiles)
+            parser.resolve_intra_file_dependencies(filteredNewFiles)
+
+            for codeFile in filteredNewFiles:
+                i = changedPaths.get(codeFile.file_path)
+                self.codebase.root[i] = codeFile
diff --git a/codetide/parsers/base_parser.py b/codetide/parsers/base_parser.py
index 7c2a9c6..ecd013f 100644
--- a/codetide/parsers/base_parser.py
+++ b/codetide/parsers/base_parser.py
@@ -1,7 +1,8 @@
 from codetide.core.models import CodeBase, CodeFileModel, ImportStatement
+
+from typing import List, Optional, Union
 from abc import ABC, abstractmethod
-from typing import Optional, Union
 from tree_sitter import Parser
 from pydantic import BaseModel
 from pathlib import Path
@@ -47,11 +48,11 @@ async def parse_file(self, file_path: Union[str, Path], root_path: Optional[Unio
         pass
 
     @abstractmethod
-    def resolve_inter_files_dependencies(self, codeBase: CodeBase) -> None:
+    def resolve_inter_files_dependencies(self, codeBase: CodeBase, codeFiles :Optional[List[CodeFileModel]]=None) -> None:
         pass
 
     @abstractmethod
-    def resolve_intra_file_dependencies(self, codeBase: CodeBase) -> None:
+    def resolve_intra_file_dependencies(self, codeFiles: List[CodeFileModel]) -> None:
         pass
 
     # @abstractmethod
diff --git a/codetide/parsers/generic_parser.py b/codetide/parsers/generic_parser.py
index d06e058..3652c5c 100644
--- a/codetide/parsers/generic_parser.py
+++ b/codetide/parsers/generic_parser.py
@@ -2,7 +2,7 @@
 from codetide.parsers.base_parser import BaseParser
 
 from concurrent.futures import ThreadPoolExecutor
-from typing import Optional, Union
+from typing import List, Optional, Union
 from pathlib import Path
 import asyncio
 
@@ -59,8 +59,8 @@ def parse_code(self, file_path :Path):
         )
         return codeFile
 
-    def resolve_inter_files_dependencies(self, codeBase: CodeBase) -> None:
+    def resolve_inter_files_dependencies(self, codeBase: CodeBase, codeFiles :Optional[List[CodeFileModel]]=None) -> None:
         pass
 
-    def resolve_intra_file_dependencies(self, codeBase: CodeBase) -> None:
+    def resolve_intra_file_dependencies(self, codeFiles: List[CodeFileModel]) -> None:
         pass
\ No newline at end of file
diff --git a/codetide/parsers/python_parser.py b/codetide/parsers/python_parser.py
index 39e73a8..cf11f42 100644
--- a/codetide/parsers/python_parser.py
+++ b/codetide/parsers/python_parser.py
@@ -396,14 +396,18 @@ def _generate_unique_import_id(cls, importModel :ImportStatement):
 
         importModel.raw = cls.import_statement_template(importModel)
 
-    def resolve_inter_files_dependencies(self, codeBase: CodeBase) -> None:
+    @classmethod
+    def resolve_inter_files_dependencies(cls, codeBase: CodeBase, codeFiles :Optional[List[CodeFileModel]]=None) -> None:
         ### for codeFile in codeBase search through imports and if defition_id matches an id from a class, a function or a variable let it be
         ### otherwise check if it matches a unique_id from imports, if so map dfeiniton_id to import unique id
         ### othewise map to None and is a package
         ### this should handle all imports across file
+        if codeFiles is None:
+            codeFiles = codeBase.root
+
         all_imports = codeBase.all_imports()
         all_elements = codeBase.all_classes() + codeBase.all_functions() + codeBase.all_variables()
-        for codeFile in codeBase.root:
+        for codeFile in codeFiles:
             global_imports_minus_current = [
                 importId for importId in all_imports
                 if importId not in codeFile.all_imports()
@@ -417,7 +421,7 @@ def resolve_inter_files_dependencies(self, codeBase: CodeBase) -> None:
                     continue
 
                 importStatement.definition_id = None
-                importStatement.unique_id = self._default_unique_import_id(importStatement)
+                importStatement.unique_id = cls._default_unique_import_id(importStatement)
 
     @staticmethod
     def count_occurences_in_code(code: str, substring: str) -> int:
@@ -431,8 +435,8 @@ def count_occurences_in_code(code: str, substring: str) -> int:
         matches = re.findall(pattern, code)
         return len(matches)
 
-    def resolve_intra_file_dependencies(self, codeBase: CodeBase) -> None:
-        for codeFile in codeBase.root:
+    def resolve_intra_file_dependencies(self, codeFiles: List[CodeFileModel]) -> None:
+        for codeFile in codeFiles:
             if not codeFile.file_path.endswith(self.extension):
                 continue
@@ -464,7 +468,8 @@ def resolve_intra_file_dependencies(self, codeBase: CodeBase) -> None:
                 codeFile=codeFile
             )
 
-    def _find_elements_references(self,
+    @classmethod
+    def _find_elements_references(cls,
         element_type :Literal["variables", "functions", "classes"],
         non_import_ids :List[str],
         raw_contents :List[str],
@@ -473,12 +478,12 @@
         ### broken for class defintion as we need to search through methods and attributes
         if element_type == "classes":
             for classAttribute in element.attributes:
-                elementCounts = self._get_element_count(raw_contents, classAttribute)
+                elementCounts = cls._get_element_count(raw_contents, classAttribute)
 
                 if elementCounts <= 0:
                     continue
 
-                self._find_references(
+                cls._find_references(
                     non_import_ids=non_import_ids,
                     raw_contents=raw_contents,
                     matches_count=elementCounts,
@@ -489,12 +494,12 @@
 
             for classMethod in element.methods:
                 # print(f"{classMethod.name=}")
-                elementCounts = self._get_element_count(raw_contents, classMethod)
+                elementCounts = cls._get_element_count(raw_contents, classMethod)
 
                 if elementCounts <= 0:
                     continue
 
-                self._find_references(
+                cls._find_references(
                     non_import_ids=non_import_ids,
                     raw_contents=raw_contents,
                     matches_count=elementCounts,
@@ -504,12 +509,12 @@
 
         else:
-            elementCounts = self._get_element_count(raw_contents, element)
+            elementCounts = cls._get_element_count(raw_contents, element)
 
             if elementCounts <= 0:
                 continue
 
-            self._find_references(
+            cls._find_references(
                 non_import_ids=non_import_ids,
                 raw_contents=raw_contents,
                 matches_count=elementCounts,
@@ -524,7 +529,8 @@ def _get_element_count(cls, raw_contents :List[str], element):
                 elementCounts -= 1
         return elementCounts
 
-    def _find_references(self,
+    @staticmethod
+    def _find_references(
         non_import_ids :List[str],
         raw_contents :List[str],
         matches_count :int,
diff --git a/requirements.txt b/requirements.txt
index bca6fa6..17c960b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
 pathspec==0.12.1
 pydantic==2.10.3
+pygit2==1.18.0
 pyyaml==6.0.2
 tree-sitter==0.24.0
 tree-sitter-python==0.23.6
\ No newline at end of file
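
A minimal usage sketch of the incremental-update flow these changes introduce. The method and parameter names (from_path, serialize, deserialize, check_for_updates, store_in_project_root, rootpath) come from this diff; the repository path and the asyncio entry point are illustrative assumptions:

    import asyncio
    from codetide import CodeTide

    async def main():
        # First run: full parse of the repository, then persist the model
        # (with store_in_project_root=True the default serialization path
        # is resolved inside the project root).
        tide = await CodeTide.from_path("path/to/repo")  # placeholder path
        tide.serialize(store_in_project_root=True)

        # Later runs: reload the cached model and re-parse only the files
        # whose last-modified timestamp changed since the previous scan.
        tide = CodeTide.deserialize(rootpath="path/to/repo")
        await tide.check_for_updates()

    asyncio.run(main())

Note that check_for_updates() falls back to a full rebuild via _reset() whenever a file deletion is detected, so a run after removing files costs roughly as much as the initial from_path() pass.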