Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added =3.1.0
Empty file.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
boto3>=1.28.0 # AWS SDK for S3 support
click>=8.0.0
fastapi[standard]>=0.109.1 # Versions below 0.109.1 are vulnerable to https://osv.dev/vulnerability/PYSEC-2024-38
gitpython>=3.1.0 # Git operations via Python instead of system calls
httpx
loguru>=0.7.0
pathspec>=0.12.1
Expand Down
96 changes: 65 additions & 31 deletions src/gitingest/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@

from __future__ import annotations

import asyncio
from pathlib import Path
from typing import TYPE_CHECKING

from git import Repo
from gitingest.config import DEFAULT_TIMEOUT
from gitingest.utils.git_utils import (
check_repo_exists,
checkout_partial_clone,
create_git_auth_header,
create_git_command,
create_git_command_with_auth,
ensure_git_installed,
is_github_host,
resolve_commit,
run_command,
)
from gitingest.utils.logging_config import get_logger
from gitingest.utils.os_utils import ensure_directory_exists_or_create
Expand Down Expand Up @@ -83,41 +84,74 @@ async def clone_repo(config: CloneConfig, *, token: str | None = None) -> None:
commit = await resolve_commit(config, token=token)
logger.debug("Resolved commit", extra={"commit": commit})

clone_cmd = ["git"]
if token and is_github_host(url):
clone_cmd += ["-c", create_git_auth_header(token, url=url)]

clone_cmd += ["clone", "--single-branch", "--no-checkout", "--depth=1"]
if partial_clone:
clone_cmd += ["--filter=blob:none", "--sparse"]

clone_cmd += [url, local_path]

# Clone the repository
logger.info("Executing git clone command", extra={"command": " ".join([*clone_cmd[:-1], "<url>", local_path])})
await run_command(*clone_cmd)
logger.info("Git clone completed successfully")
def perform_clone():
    """Clone ``url`` into ``local_path`` with GitPython and return the ``Repo``.

    The clone is shallow (``depth=1``), single-branch and performed without a
    checkout; when ``partial_clone`` is set, ``--filter=blob:none --sparse``
    are added. This is a blocking call intended to run in an executor thread.

    Returns
    -------
    Repo
        The freshly cloned repository object.

    Raises
    ------
    RuntimeError
        If building the options or the clone itself fails for any reason.

    """
    import os  # local import: keeps this helper independent of the module's import block

    try:
        # Set up clone options
        clone_kwargs = {
            "single_branch": True,
            "depth": 1,
            "no_checkout": True,
        }

        # Add filter and sparse options for partial clones
        if partial_clone:
            clone_kwargs["multi_options"] = ["--filter=blob:none", "--sparse"]

        # Add authentication for GitHub repositories.
        # The auth header is in the ``key=value`` form accepted by ``git -c``.
        # GIT_CONFIG_PARAMETERS only accepts single-quoted entries, so the raw
        # header is rejected there ("bogus format"). Use the documented
        # GIT_CONFIG_COUNT/KEY/VALUE variables instead (Git >= 2.31).
        env = None
        if token and is_github_host(url):
            key, _, value = create_git_auth_header(token, url=url).partition("=")
            env = os.environ.copy()
            # Append after any config entries already injected by the caller's environment.
            index = int(env.get("GIT_CONFIG_COUNT") or 0)
            env[f"GIT_CONFIG_KEY_{index}"] = key
            env[f"GIT_CONFIG_VALUE_{index}"] = value
            env["GIT_CONFIG_COUNT"] = str(index + 1)

        # Clone the repository
        logger.info("Executing git clone command")
        repo = Repo.clone_from(url, local_path, env=env, **clone_kwargs)
        logger.info("Git clone completed successfully")
        return repo

    except Exception as e:
        # Callers only expect RuntimeError from the clone step.
        raise RuntimeError(f"Failed to clone repository: {str(e)}") from e

# Perform the clone operation
repo = await asyncio.get_event_loop().run_in_executor(None, perform_clone)

# Checkout the subpath if it is a partial clone
if partial_clone:
logger.info("Setting up partial clone for subpath", extra={"subpath": config.subpath})
await checkout_partial_clone(config, token=token)
logger.debug("Partial clone setup completed")

git = create_git_command(["git"], local_path, url, token)

# Ensure the commit is locally available
logger.debug("Fetching specific commit", extra={"commit": commit})
await run_command(*git, "fetch", "--depth=1", "origin", commit)

# Write the work-tree at that commit
logger.info("Checking out commit", extra={"commit": commit})
await run_command(*git, "checkout", commit)

# Update submodules
if config.include_submodules:
logger.info("Updating submodules")
await run_command(*git, "submodule", "update", "--init", "--recursive", "--depth=1")
logger.debug("Submodules updated successfully")
def perform_checkout():
    """Fetch ``commit``, check it out, and optionally update submodules.

    All three Git invocations run with the same authentication environment:
    on a blobless partial clone ``checkout`` downloads missing blobs from the
    remote, and ``submodule update`` clones the submodules, so both can need
    credentials just like ``fetch``. This is a blocking call intended to run
    in an executor thread.

    Raises
    ------
    RuntimeError
        If any of the fetch / checkout / submodule steps fails.

    """
    import os  # local import: keeps this helper independent of the module's import block

    try:
        # Set up authentication environment for every remote-touching operation.
        # The auth header is in the ``key=value`` form accepted by ``git -c``.
        # GIT_CONFIG_PARAMETERS only accepts single-quoted entries, so the raw
        # header is rejected there; use the documented
        # GIT_CONFIG_COUNT/KEY/VALUE variables instead (Git >= 2.31).
        env = None
        if token and is_github_host(url):
            key, _, value = create_git_auth_header(token, url=url).partition("=")
            env = os.environ.copy()
            # Append after any config entries already injected by the caller's environment.
            index = int(env.get("GIT_CONFIG_COUNT") or 0)
            env[f"GIT_CONFIG_KEY_{index}"] = key
            env[f"GIT_CONFIG_VALUE_{index}"] = value
            env["GIT_CONFIG_COUNT"] = str(index + 1)

        # Fetch the specific commit
        logger.debug("Fetching specific commit", extra={"commit": commit})
        repo.git.fetch("--depth=1", "origin", commit, env=env)

        # Checkout the specific commit
        logger.info("Checking out commit", extra={"commit": commit})
        repo.git.checkout(commit, env=env)

        # Update submodules if requested
        if config.include_submodules:
            logger.info("Updating submodules")
            repo.git.submodule("update", "--init", "--recursive", "--depth=1", env=env)
            logger.debug("Submodules updated successfully")

    except Exception as e:
        # Callers only expect RuntimeError from the checkout step.
        raise RuntimeError(f"Failed during checkout operations: {str(e)}") from e

# Perform checkout operations
await asyncio.get_event_loop().run_in_executor(None, perform_checkout)

logger.info("Git clone operation completed successfully", extra={"local_path": local_path})
171 changes: 112 additions & 59 deletions src/gitingest/utils/git_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from urllib.parse import urlparse

import httpx
from git import Repo, Remote, GitCommandError, InvalidGitRepositoryError
from git.cmd import Git
from starlette.status import HTTP_200_OK, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND

from gitingest.utils.compat_func import removesuffix
Expand Down Expand Up @@ -47,17 +49,19 @@ def is_github_host(url: str) -> bool:
return hostname.startswith("github.")


async def run_command(*args: str) -> tuple[bytes, bytes]:
"""Execute a shell command asynchronously and return (stdout, stderr) bytes.
async def run_git_command(*args: str, cwd: str | None = None) -> tuple[str, str]:
"""Execute a git command using GitPython and return (stdout, stderr) strings.

Parameters
----------
*args : str
The command and its arguments to execute.
The git command arguments to execute (without the 'git' prefix).
cwd : str | None
The working directory to execute the command in.

Returns
-------
tuple[bytes, bytes]
tuple[str, str]
A tuple containing the stdout and stderr of the command.

Raises
Expand All @@ -66,18 +70,32 @@ async def run_command(*args: str) -> tuple[bytes, bytes]:
If command exits with a non-zero status.

"""
# Execute the requested command
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
msg = f"Command failed: {' '.join(args)}\nError: {stderr.decode().strip()}"
raise RuntimeError(msg)

return stdout, stderr
try:
def run_sync():
git_cmd = Git(cwd or ".")
# Handle different git operations
if args[0] == "--version":
return git_cmd.version(), ""
elif args[0] == "config" and len(args) >= 2:
try:
result = git_cmd.config(args[1])
return result, ""
except GitCommandError as e:
return "", str(e)
else:
# For other commands, use the raw execute method
result = git_cmd.execute(list(args))
return result, ""

# Run the synchronous git operation in a thread pool
stdout, stderr = await asyncio.get_event_loop().run_in_executor(None, run_sync)
return stdout, stderr
except GitCommandError as exc:
msg = f"Git command failed: git {' '.join(args)}\nError: {exc.stderr or str(exc)}"
raise RuntimeError(msg) from exc
except Exception as exc:
msg = f"Git command failed: git {' '.join(args)}\nError: {str(exc)}"
raise RuntimeError(msg) from exc


async def ensure_git_installed() -> None:
Expand All @@ -92,14 +110,14 @@ async def ensure_git_installed() -> None:

"""
try:
await run_command("git", "--version")
await run_git_command("--version")
except RuntimeError as exc:
msg = "Git is not installed or not accessible. Please install Git first."
raise RuntimeError(msg) from exc
if sys.platform == "win32":
try:
stdout, _ = await run_command("git", "config", "core.longpaths")
if stdout.decode().strip().lower() != "true":
stdout, _ = await run_git_command("config", "core.longpaths")
if stdout.strip().lower() != "true":
logger.warning(
"Git clone may fail on Windows due to long file paths. "
"Consider enabling long path support with: 'git config --global core.longpaths true'. "
Expand Down Expand Up @@ -222,61 +240,79 @@ async def fetch_remote_branches_or_tags(url: str, *, ref_type: str, token: str |
msg = f"Invalid fetch type: {ref_type}"
raise ValueError(msg)

cmd = ["git"]

# Add authentication if needed
if token and is_github_host(url):
cmd += ["-c", create_git_auth_header(token, url=url)]

cmd += ["ls-remote"]
await ensure_git_installed()

fetch_tags = ref_type == "tags"
to_fetch = "tags" if fetch_tags else "heads"
def fetch_refs():
git_cmd = create_git_command_with_auth(token, url)

cmd += [f"--{to_fetch}"]
fetch_tags = ref_type == "tags"
to_fetch = "tags" if fetch_tags else "heads"

# `--refs` filters out the peeled tag objects (those ending with "^{}") (for tags)
if fetch_tags:
cmd += ["--refs"]
cmd = ["ls-remote", f"--{to_fetch}"]

# `--refs` filters out the peeled tag objects (those ending with "^{}") (for tags)
if fetch_tags:
cmd.append("--refs")

cmd += [url]
cmd.append(url)

try:
result = git_cmd.execute(cmd)
return result
except GitCommandError as e:
raise RuntimeError(f"Failed to fetch {ref_type}: {e.stderr or str(e)}") from e

await ensure_git_installed()
stdout, _ = await run_command(*cmd)
stdout = await asyncio.get_event_loop().run_in_executor(None, fetch_refs)

# For each line in the output:
# - Skip empty lines and lines that don't contain "refs/{to_fetch}/"
# - Extract the branch or tag name after "refs/{to_fetch}/"
return [
line.split(f"refs/{to_fetch}/", 1)[1]
for line in stdout.decode().splitlines()
for line in stdout.splitlines()
if line.strip() and f"refs/{to_fetch}/" in line
]


def create_git_command(base_cmd: list[str], local_path: str, url: str, token: str | None = None) -> list[str]:
"""Create a git command with authentication if needed.
class GitCommandWithAuth:
    """A wrapper around GitPython's ``Git`` that carries an authentication environment.

    When a token is given and the URL is a GitHub host, the auth header is
    injected into the child process environment so it never appears on the
    command line. Otherwise commands run with the default environment.
    """

    def __init__(self, token: str | None, url: str):
        """Prepare the ``Git`` command object and, if needed, the auth environment.

        Parameters
        ----------
        token : str | None
            GitHub personal access token (PAT); ``None`` or empty disables auth.
        url : str
            The repository URL, used to decide whether the token applies.

        """
        self.git = Git()
        self.env = None

        if token and is_github_host(url):
            import os

            # The auth header is in the ``key=value`` form accepted by ``git -c``.
            # GIT_CONFIG_PARAMETERS only accepts single-quoted entries, so the
            # raw header is rejected there; use the documented
            # GIT_CONFIG_COUNT/KEY/VALUE variables instead (Git >= 2.31).
            key, _, value = create_git_auth_header(token, url=url).partition("=")
            self.env = os.environ.copy()
            # Append after any config entries already injected by the caller's environment.
            index = int(self.env.get("GIT_CONFIG_COUNT") or 0)
            self.env[f"GIT_CONFIG_KEY_{index}"] = key
            self.env[f"GIT_CONFIG_VALUE_{index}"] = value
            self.env["GIT_CONFIG_COUNT"] = str(index + 1)

    def execute(self, args: list[str]) -> str:
        """Execute ``git <args...>`` with authentication if needed and return its stdout.

        ``args`` holds the sub-command and its arguments without the leading
        ``git``. ``Git.execute`` runs the list verbatim (the first item is the
        program), so the Git executable is prepended here.
        """
        return self.git.execute([Git.GIT_PYTHON_GIT_EXECUTABLE, *args], env=self.env)

    @property
    def custom_environment(self) -> dict[str, str] | None:
        """Get the custom environment for testing (``None`` when no auth is configured)."""
        return self.env


def create_git_command_with_auth(token: str | None, url: str) -> GitCommandWithAuth:
"""Create a Git command object with authentication if needed.

Parameters
----------
base_cmd : list[str]
The base git command to start with.
local_path : str
The local path where the git command should be executed.
url : str
The repository URL to check if it's a GitHub repository.
token : str | None
GitHub personal access token (PAT) for accessing private repositories.
url : str
The repository URL to check if it's a GitHub repository.

Returns
-------
list[str]
The git command with authentication if needed.
GitCommandWithAuth
A Git command wrapper with authentication configured if needed.

"""
cmd = [*base_cmd, "-C", local_path]
if token and is_github_host(url):
cmd += ["-c", create_git_auth_header(token, url=url)]
return cmd
return GitCommandWithAuth(token, url)


def create_git_auth_header(token: str, url: str = "https://github.com") -> str:
Expand Down Expand Up @@ -343,8 +379,23 @@ async def checkout_partial_clone(config: CloneConfig, token: str | None) -> None
if config.blob:
# Remove the file name from the subpath when ingesting from a file url (e.g. blob/branch/path/file.txt)
subpath = str(Path(subpath).parent.as_posix())
checkout_cmd = create_git_command(["git"], config.local_path, config.url, token)
await run_command(*checkout_cmd, "sparse-checkout", "set", subpath)

def setup_sparse_checkout():
    """Run ``git sparse-checkout set <subpath>`` in the cloned repository.

    This is a blocking call intended to run in an executor thread.

    Raises
    ------
    RuntimeError
        If the repository cannot be opened or the sparse-checkout command fails.

    """
    import os  # local import: keeps this helper independent of the module's import block

    try:
        repo = Repo(config.local_path)

        # Set up authentication environment if needed.
        # The auth header is in the ``key=value`` form accepted by ``git -c``.
        # GIT_CONFIG_PARAMETERS only accepts single-quoted entries, so the raw
        # header is rejected there; use the documented
        # GIT_CONFIG_COUNT/KEY/VALUE variables instead (Git >= 2.31).
        env = None
        if token and is_github_host(config.url):
            key, _, value = create_git_auth_header(token, url=config.url).partition("=")
            env = os.environ.copy()
            # Append after any config entries already injected by the caller's environment.
            index = int(env.get("GIT_CONFIG_COUNT") or 0)
            env[f"GIT_CONFIG_KEY_{index}"] = key
            env[f"GIT_CONFIG_VALUE_{index}"] = value
            env["GIT_CONFIG_COUNT"] = str(index + 1)

        # Use GitPython's dynamic sub-command call (underscores become dashes).
        # ``repo.git.execute([...])`` runs its list verbatim, so without a leading
        # ``git`` it would try to launch a program called ``sparse-checkout``.
        repo.git.sparse_checkout("set", subpath, env=env)
    except Exception as e:
        raise RuntimeError(f"Failed to setup sparse checkout: {str(e)}") from e

await asyncio.get_event_loop().run_in_executor(None, setup_sparse_checkout)


async def resolve_commit(config: CloneConfig, token: str | None) -> str:
Expand Down Expand Up @@ -400,14 +451,16 @@ async def _resolve_ref_to_sha(url: str, pattern: str, token: str | None = None)
If the ref does not exist in the remote repository.

"""
# Build: git [-c http.<host>/.extraheader=Auth...] ls-remote <url> <pattern>
cmd: list[str] = ["git"]
if token and is_github_host(url):
cmd += ["-c", create_git_auth_header(token, url=url)]
def resolve_ref():
    """Query the remote for ``pattern`` via ``ls-remote`` and return the raw output.

    Raises
    ------
    RuntimeError
        If the underlying Git command fails.

    """
    authed_git = create_git_command_with_auth(token, url)
    try:
        return authed_git.execute(["ls-remote", url, pattern])
    except GitCommandError as err:
        # Prefer Git's stderr; fall back to the exception text when it is empty.
        detail = err.stderr or str(err)
        raise RuntimeError(f"Failed to resolve ref {pattern}: {detail}") from err

cmd += ["ls-remote", url, pattern]
stdout, _ = await run_command(*cmd)
lines = stdout.decode().splitlines()
stdout = await asyncio.get_event_loop().run_in_executor(None, resolve_ref)
lines = stdout.splitlines()
sha = _pick_commit_sha(lines)
if not sha:
msg = f"{pattern!r} not found in {url}"
Expand Down
Loading
Loading