Source code for unidep._dependencies_parsing

"""unidep - Unified Conda and Pip requirements management.

This module provides parsing of `requirements.yaml` and `pyproject.toml` files.
"""

from __future__ import annotations

import functools
import hashlib
import os
import sys
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, NamedTuple

from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq

from unidep.platform_definitions import Platform, Spec, platforms_from_selector
from unidep.utils import (
    LocalDependency,
    PathWithExtras,
    defaultdict_to_dict,
    is_pip_installable,
    parse_folder_or_filename,
    parse_package_str,
    selector_from_comment,
    split_path_and_extras,
    unidep_configured_in_toml,
    warn,
)

if TYPE_CHECKING:
    if sys.version_info >= (3, 8):
        from typing import Literal
    else:  # pragma: no cover
        from typing_extensions import Literal


if sys.version_info >= (3, 11):
    import tomllib
else:  # pragma: no cover
    import tomli as tomllib


def find_requirements_files(
    base_dir: str | Path = ".",
    depth: int = 1,
    *,
    verbose: bool = False,
) -> list[Path]:
    """Scan a directory for `requirements.yaml` and `pyproject.toml` files."""
    base_path = Path(base_dir)
    found_files = []

    # Define a helper function to recursively scan directories
    def _scan_dir(path: Path, current_depth: int) -> None:
        if verbose:
            print(f"🔍 Scanning in `{path}` at depth {current_depth}")
        if current_depth > depth:
            return

        for child in sorted(path.iterdir()):
            if child.is_dir():
                _scan_dir(child, current_depth + 1)
            elif child.name == "requirements.yaml":
                found_files.append(child)
                if verbose:
                    print(f'🔍 Found `"requirements.yaml"` at `{child}`')
            elif child.name == "pyproject.toml" and unidep_configured_in_toml(child):
                if verbose:
                    print(f'🔍 Found `"pyproject.toml"` with dependencies at `{child}`')
                found_files.append(child)

    _scan_dir(base_path, 0)
    return sorted(found_files)
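

# Usage sketch (the directory layout is hypothetical): with `depth=2` the scan
# visits subdirectories up to two levels below `base_dir`, keeping every
# `requirements.yaml` and any `pyproject.toml` that actually contains a
# `[tool.unidep]` section:
#
#     found = find_requirements_files("monorepo", depth=2)
#     # -> [Path('monorepo/pkg_a/requirements.yaml'),
#     #     Path('monorepo/pkg_b/pyproject.toml')]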


def _extract_first_comment(
    commented_map: CommentedMap,
    index_or_key: int | str,
) -> str | None:
    """Extract the first comment from a CommentedMap."""
    comments = commented_map.ca.items.get(index_or_key, None)
    if comments is None:
        return None
    comment_strings = next(
        c.value.split("\n")[0].rstrip().lstrip() for c in comments if c is not None
    )
    if not comment_strings:
        # empty string
        return None
    return "".join(comment_strings)


def _identifier(identifier: int, selector: str | None) -> str:
    """Return a unique identifier based on the comment."""
    platforms = None if selector is None else tuple(platforms_from_selector(selector))
    data_str = f"{identifier}-{platforms}"
    # Hash using SHA256 and take the first 8 characters for a shorter hash
    return hashlib.sha256(data_str.encode()).hexdigest()[:8]


def _parse_dependency(
    dependency: str,
    dependencies: CommentedMap,
    index_or_key: int | str,
    which: Literal["conda", "pip", "both"],
    identifier: int,
    ignore_pins: list[str],
    overwrite_pins: dict[str, str | None],
    skip_dependencies: list[str],
    origin: Path,
) -> list[Spec]:
    name, pin, selector = parse_package_str(dependency)
    if name in ignore_pins:
        pin = None
    if name in skip_dependencies:
        return []
    if name in overwrite_pins:
        pin = overwrite_pins[name]
    comment = (
        _extract_first_comment(dependencies, index_or_key)
        if isinstance(dependencies, (CommentedMap, CommentedSeq))
        else None
    )
    if comment and selector is None:
        selector = selector_from_comment(comment)
    identifier_hash = _identifier(identifier, selector)
    if which == "both":
        return [
            Spec(name, "conda", pin, identifier_hash, selector, origin=(origin,)),
            Spec(name, "pip", pin, identifier_hash, selector, origin=(origin,)),
        ]
    return [Spec(name, which, pin, identifier_hash, selector, origin=(origin,))]


class ParsedRequirements(NamedTuple):
    """Requirements with comments."""

    channels: list[str]
    platforms: list[Platform]
    requirements: dict[str, list[Spec]]
    optional_dependencies: dict[str, dict[str, list[Spec]]]


class Requirements(NamedTuple):
    """Requirements as CommentedSeq."""

    # mypy doesn't support CommentedSeq[str], so we use list[str] instead.
    channels: list[str]  # actually a CommentedSeq[str]
    conda: list[str]  # actually a CommentedSeq[str]
    pip: list[str]  # actually a CommentedSeq[str]


def _parse_overwrite_pins(overwrite_pins: list[str]) -> dict[str, str | None]:
    """Parse overwrite pins."""
    result = {}
    for overwrite_pin in overwrite_pins:
        pkg = parse_package_str(overwrite_pin)
        result[pkg.name] = pkg.pin
    return result


@functools.lru_cache
def _load(p: Path, yaml: YAML) -> dict[str, Any]:
    if p.suffix == ".toml":
        with p.open("rb") as f:
            pyproject = tomllib.load(f)
        project_dependencies = pyproject.get("project", {}).get("dependencies", [])
        unidep_cfg = pyproject["tool"]["unidep"]
        if not project_dependencies:
            return unidep_cfg
        unidep_dependencies = unidep_cfg.setdefault("dependencies", [])
        project_dependency_handling = unidep_cfg.get(
            "project_dependency_handling",
            "ignore",
        )
        _add_project_dependencies(
            project_dependencies,
            unidep_dependencies,
            project_dependency_handling,
        )
        return unidep_cfg
    with p.open() as f:
        return yaml.load(f)


def _add_project_dependencies(
    project_dependencies: list[str],
    unidep_dependencies: list[dict[str, str] | str],
    project_dependency_handling: Literal["same-name", "pip-only", "ignore"],
) -> None:
    """Add project dependencies to unidep dependencies based on the chosen handling."""
    if project_dependency_handling == "same-name":
        unidep_dependencies.extend(project_dependencies)
    elif project_dependency_handling == "pip-only":
        unidep_dependencies.extend([{"pip": dep} for dep in project_dependencies])
    elif project_dependency_handling != "ignore":
        msg = (
            f"Invalid `project_dependency_handling` value: {project_dependency_handling}."  # noqa: E501
            " Must be one of 'same-name', 'pip-only', 'ignore'."
        )
        raise ValueError(msg)


def _parse_local_dependency_item(item: str | dict[str, str]) -> LocalDependency:
    """Parse a single local dependency item into a LocalDependency object."""
    if isinstance(item, str):
        return LocalDependency(local=item, pypi=None)
    if isinstance(item, dict):
        if "local" not in item:
            msg = "Dictionary-style local dependency must have a 'local' key"
            raise ValueError(msg)
        return LocalDependency(local=item["local"], pypi=item.get("pypi"))
    msg = f"Invalid local dependency format: {item}"
    raise TypeError(msg)


def get_local_dependencies(data: dict[str, Any]) -> list[LocalDependency]:
    """Get `local_dependencies` from a `requirements.yaml` or `pyproject.toml` file."""
    raw_deps = []
    if "local_dependencies" in data:
        raw_deps = data["local_dependencies"]
    elif "includes" in data:
        warn(
            "⚠️ You are using `includes` in `requirements.yaml` or `pyproject.toml`"
            " `[tool.unidep]` which is deprecated since 0.42.0 and has been renamed to"
            " `local_dependencies`.",
            category=DeprecationWarning,
            stacklevel=2,
        )
        raw_deps = data["includes"]
    return [_parse_local_dependency_item(item) for item in raw_deps]


def _to_path_with_extras(
    paths: list[Path],
    extras: list[list[str]] | Literal["*"] | None,
) -> list[PathWithExtras]:
    if isinstance(extras, (list, tuple)) and len(extras) != len(paths):
        msg = (
            f"Length of `extras` ({len(extras)}) does not match length"
            f" of `paths` ({len(paths)})."
        )
        raise ValueError(msg)
    paths_with_extras = [parse_folder_or_filename(p) for p in paths]
    if extras is None:
        return paths_with_extras
    assert extras is not None
    if any(p.extras for p in paths_with_extras):
        msg = (
            "Cannot specify `extras` list when paths are"
            " specified like `path/to/project[extra1,extra2]`, `extras` must be `None`"
            " or specify pure paths without extras like `path/to/project` and specify"
            " extras in `extras`."
        )
        raise ValueError(msg)
    if extras == "*":
        extras = [["*"]] * len(paths)  # type: ignore[list-item]
    return [PathWithExtras(p.path, e) for p, e in zip(paths_with_extras, extras)]


def _update_data_structures(
    *,
    path_with_extras: PathWithExtras,
    datas: list[dict[str, Any]],  # modified in place
    all_extras: list[list[str]],  # modified in place
    seen: set[PathWithExtras],  # modified in place
    yaml: YAML,
    is_nested: bool,
    origin: Path,
    verbose: bool = False,
) -> None:
    if verbose:
        print(f"📄 Parsing `{path_with_extras.path_with_extras}`")
    data = _load(path_with_extras.path, yaml)
    data["_origin"] = origin
    datas.append(data)
    _move_local_optional_dependencies_to_local_dependencies(
        data=data,  # modified in place
        path_with_extras=path_with_extras,
        verbose=verbose,
    )
    if not is_nested:
        all_extras.append(path_with_extras.extras)
    else:
        # When nested, the extras that are specified in the
        # local_dependencies section should be moved to the main dependencies
        # because they are not optional if specified in the file. Only
        # the top-level extras are optional.
        all_extras.append([])
        _move_optional_dependencies_to_dependencies(
            data=data,  # modified in place
            path_with_extras=path_with_extras,
            verbose=verbose,
        )
    seen.add(path_with_extras.resolved())

    # Handle "local_dependencies" (or old name "includes", changed in 0.42.0)
    for local_dep_obj in get_local_dependencies(data):
        # NOTE: The current function calls _add_local_dependencies,
        # which calls the current function recursively
        _add_local_dependencies(
            local_dependency=local_dep_obj.local,
            path_with_extras=path_with_extras,
            datas=datas,  # modified in place
            all_extras=all_extras,  # modified in place
            seen=seen,  # modified in place
            yaml=yaml,
            origin=origin,
            verbose=verbose,
        )


def _move_optional_dependencies_to_dependencies(
    data: dict[str, Any],
    path_with_extras: PathWithExtras,
    *,
    verbose: bool = False,
) -> None:
    optional_dependencies = data.pop("optional_dependencies", {})
    for extra in path_with_extras.extras:
        if extra == "*":
            # If "*" is specified, include all optional dependencies
            for opt_deps in optional_dependencies.values():
                data.setdefault("dependencies", []).extend(opt_deps)
            if verbose:
                print(
                    "📄 Moving all optional dependencies to main dependencies"
                    f" for `{path_with_extras.path_with_extras}`",
                )
        elif extra in optional_dependencies:
            data.setdefault("dependencies", []).extend(optional_dependencies[extra])
            if verbose:
                print(
                    f"📄 Moving `{extra}` optional dependencies to main dependencies"
                    f" for `{path_with_extras.path_with_extras}`",
                )


def _move_local_optional_dependencies_to_local_dependencies(
    *,
    data: dict[str, Any],  # modified in place
    path_with_extras: PathWithExtras,
    verbose: bool = False,
) -> None:
    # Move local dependencies from `optional_dependencies` to `local_dependencies`
    extras = path_with_extras.extras
    if "*" in extras:
        extras = list(data.get("optional_dependencies", {}).keys())
    optional_dependencies = data.get("optional_dependencies", {})
    for extra in extras:
        moved = set()
        for dep in optional_dependencies.get(extra, []):
            if isinstance(dep, dict):
                # This is a {"pip": "package"} and/or {"conda": "package"} dependency
                continue
            if _str_is_path_like(dep):
                if verbose:
                    print(
                        f"📄 Moving `{dep}` from the `{extra}` section in"
                        " `optional_dependencies` to `local_dependencies`",
                    )
                data.setdefault("local_dependencies", []).append(dep)
                moved.add(dep)
        for dep in moved:
            extras = optional_dependencies[extra]  # key must exist if moved non-empty
            extras.pop(extras.index(dep))

    # Remove empty optional_dependencies sections
    to_delete = [extra for extra, deps in optional_dependencies.items() if not deps]
    for extra in to_delete:
        if verbose:
            print(f"📄 Removing empty `{extra}` section from `optional_dependencies`")
        optional_dependencies.pop(extra)


def _add_local_dependencies(
    *,
    local_dependency: str,
    path_with_extras: PathWithExtras,
    datas: list[dict[str, Any]],
    all_extras: list[list[str]],
    seen: set[PathWithExtras],
    yaml: YAML,
    origin: Path,
    verbose: bool = False,
) -> None:
    try:
        requirements_dep_file = parse_folder_or_filename(
            path_with_extras.path.parent / local_dependency,
        )
    except FileNotFoundError:
        # Means that this is a local package that is not managed by unidep.
        # We do not need to do anything here, just in `unidep install`.
        return
    if requirements_dep_file.path.suffix in (".whl", ".zip"):
        if verbose:
            print(
                f"⚠️ Local dependency `{local_dependency}` is a wheel or zip file. "
                "Skipping parsing, but it will be installed by pip if "
                "`--skip-local` is not set. Note that unidep will not "
                "detect its dependencies.",
            )
        return
    if requirements_dep_file.resolved() in seen:
        return  # Avoids circular local_dependencies
    if verbose:
        print(f"📄 Parsing `{local_dependency}` from `local_dependencies`")
    _update_data_structures(
        path_with_extras=requirements_dep_file,
        datas=datas,  # modified in place
        all_extras=all_extras,  # modified in place
        seen=seen,  # modified in place
        yaml=yaml,
        verbose=verbose,
        is_nested=True,
        origin=origin,
    )
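

# For reference, a hypothetical `requirements.yaml` using the
# `local_dependencies` formats accepted by `get_local_dependencies` and
# `_parse_local_dependency_item` above (plain string, string with extras,
# and the dict form with a required `local` key and optional `pypi` key):
#
#     local_dependencies:
#       - ../common              # another unidep-managed project
#       - ../tools[dev]          # also pull in its `dev` optional dependencies
#       - local: ../private-pkg
#         pypi: private-pkg      # stored on the resulting `LocalDependency`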


def parse_requirements(
    *paths: Path,
    ignore_pins: list[str] | None = None,
    overwrite_pins: list[str] | None = None,
    skip_dependencies: list[str] | None = None,
    verbose: bool = False,
    extras: list[list[str]] | Literal["*"] | None = None,
) -> ParsedRequirements:
    """Parse a list of `requirements.yaml` or `pyproject.toml` files.

    Parameters
    ----------
    paths
        Paths to `requirements.yaml` or `pyproject.toml` files.
    ignore_pins
        List of package names to ignore pins for.
    overwrite_pins
        List of package names with pins to overwrite.
    skip_dependencies
        List of package names to skip.
    verbose
        Whether to print verbose output.
    extras
        List of lists of extras to include. The outer list corresponds to the
        `requirements.yaml` or `pyproject.toml` files, the inner list to the
        extras to include for that file. If "*", all extras are included; if
        None, no extras are included.

    """
    paths_with_extras = _to_path_with_extras(paths, extras)  # type: ignore[arg-type]
    ignore_pins = ignore_pins or []
    skip_dependencies = skip_dependencies or []
    overwrite_pins_map = _parse_overwrite_pins(overwrite_pins or [])

    # `datas` and `all_extras` are lists of the same length
    datas: list[dict[str, Any]] = []
    all_extras: list[list[str]] = []
    seen: set[PathWithExtras] = set()
    yaml = YAML(typ="rt")  # Might be unused if all are TOML files
    for path_with_extras in paths_with_extras:
        _update_data_structures(
            path_with_extras=path_with_extras,
            datas=datas,  # modified in place
            all_extras=all_extras,  # modified in place
            seen=seen,  # modified in place
            yaml=yaml,
            verbose=verbose,
            is_nested=False,
            origin=path_with_extras.path,
        )
    assert len(datas) == len(all_extras)

    # Parse the requirements from the loaded data
    requirements: dict[str, list[Spec]] = defaultdict(list)
    optional_dependencies: dict[str, dict[str, list[Spec]]] = defaultdict(
        lambda: defaultdict(list),
    )
    channels: set[str] = set()
    platforms: set[Platform] = set()
    identifier = -1
    for data, _extras in zip(datas, all_extras):
        channels.update(data.get("channels", []))
        platforms.update(data.get("platforms", []))
        if "dependencies" in data:
            identifier = _add_dependencies(
                data["dependencies"],
                requirements,  # modified in place
                identifier,
                ignore_pins,
                overwrite_pins_map,
                skip_dependencies,
                origin=data["_origin"],
            )
        for opt_name, opt_deps in data.get("optional_dependencies", {}).items():
            if opt_name in _extras or "*" in _extras:
                identifier = _add_dependencies(
                    opt_deps,
                    optional_dependencies[opt_name],  # modified in place
                    identifier,
                    ignore_pins,
                    overwrite_pins_map,
                    skip_dependencies,
                    is_optional=True,
                    origin=data["_origin"],
                )
    return ParsedRequirements(
        sorted(channels),
        sorted(platforms),
        dict(requirements),
        defaultdict_to_dict(optional_dependencies),
    )
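

# Usage sketch (hypothetical paths): `extras` pairs up with `*paths` by
# position, so this includes the `test` extra of the first file only:
#
#     parsed = parse_requirements(
#         Path("requirements.yaml"),
#         Path("other/pyproject.toml"),
#         extras=[["test"], []],
#     )
#     parsed.channels      # e.g. ['conda-forge']
#     parsed.requirements  # e.g. {'numpy': [Spec(...), Spec(...)], ...}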


def _str_is_path_like(s: str) -> bool:
    """Check if a string is path-like."""
    return os.path.sep in s or "/" in s or s.startswith(".")


def _check_allowed_local_dependency(name: str, is_optional: bool) -> None:  # noqa: FBT001
    if _str_is_path_like(name):
        # There should not be path-like dependencies in the optional_dependencies
        # section after _move_local_optional_dependencies_to_local_dependencies.
        assert not is_optional
        msg = (
            f"Local dependencies (`{name}`) are not allowed in `dependencies`."
            " Use the `local_dependencies` section instead."
        )
        raise ValueError(msg)


def _add_dependencies(
    dependencies: list[str],
    requirements: dict[str, list[Spec]],  # modified in place
    identifier: int,
    ignore_pins: list[str],
    overwrite_pins_map: dict[str, str | None],
    skip_dependencies: list[str],
    *,
    is_optional: bool = False,
    origin: Path,
) -> int:
    for i, dep in enumerate(dependencies):
        identifier += 1
        if isinstance(dep, str):
            specs = _parse_dependency(
                dep,
                dependencies,
                i,
                "both",
                identifier,
                ignore_pins,
                overwrite_pins_map,
                skip_dependencies,
                origin,
            )
            for spec in specs:
                _check_allowed_local_dependency(spec.name, is_optional)
                requirements[spec.name].append(spec)
            continue
        assert isinstance(dep, dict)
        for which in ["conda", "pip"]:
            if which in dep:
                specs = _parse_dependency(
                    dep[which],
                    dep,
                    which,
                    which,  # type: ignore[arg-type]
                    identifier,
                    ignore_pins,
                    overwrite_pins_map,
                    skip_dependencies,
                    origin,
                )
                for spec in specs:
                    _check_allowed_local_dependency(spec.name, is_optional)
                    requirements[spec.name].append(spec)
    return identifier


# Alias for backwards compatibility
parse_yaml_requirements = parse_requirements


def _extract_local_dependencies(  # noqa: PLR0912
    path: Path,
    base_path: Path,
    processed: set[Path],
    dependencies: dict[str, set[str]],
    *,
    check_pip_installable: bool = True,
    verbose: bool = False,
    raise_if_missing: bool = True,
    warn_non_managed: bool = True,
) -> None:
    path, extras = parse_folder_or_filename(path)
    if path in processed:
        return
    processed.add(path)
    yaml = YAML(typ="safe")
    data = _load(path, yaml)
    _move_local_optional_dependencies_to_local_dependencies(
        data=data,  # modified in place
        path_with_extras=PathWithExtras(path, extras),
        verbose=verbose,
    )
    # Handle "local_dependencies" (or old name "includes", changed in 0.42.0)
    for local_dep_obj in get_local_dependencies(data):
        local_dependency = local_dep_obj.local
        assert not os.path.isabs(local_dependency)  # noqa: PTH117
        local_path, extras = split_path_and_extras(local_dependency)
        abs_local = (path.parent / local_path).resolve()
        if abs_local.suffix in (".whl", ".zip"):
            if verbose:
                print(f"🔗 Adding `{local_dependency}` from `local_dependencies`")
            dependencies[str(base_path)].add(str(abs_local))
            continue
        if not abs_local.exists():
            if raise_if_missing:
                msg = f"File `{abs_local}` not found."
                raise FileNotFoundError(msg)
            continue
        try:
            requirements_path = parse_folder_or_filename(abs_local).path
        except FileNotFoundError:
            # Means that this is a local package that is not managed by unidep.
            if is_pip_installable(abs_local):
                dependencies[str(base_path)].add(str(abs_local))
                if warn_non_managed:
                    # We do not need to emit this warning when `pip install` is called
                    warn(
                        f"⚠️ Installing a local dependency (`{abs_local.name}`) which"
                        " is not managed by unidep, this will skip all of its"
                        " dependencies, i.e., it will call `pip install` with"
                        " `--no-deps`. To properly manage this dependency,"
                        " add a `requirements.yaml` or `pyproject.toml` file with"
                        " `[tool.unidep]` in its directory.",
                    )
            elif _is_empty_folder(abs_local):
                msg = (
                    f"`{local_dependency}` in `local_dependencies` is not pip"
                    " installable because it is an empty folder. Is it perhaps"
                    " an uninitialized Git submodule? If so, initialize it with"
                    " `git submodule update --init --recursive`. Otherwise,"
                    " remove it from `local_dependencies`."
                )
                raise RuntimeError(msg) from None
            elif _is_empty_git_submodule(abs_local):
                # Extra check for empty Git submodules (common problem folks run into)
                msg = (
                    f"`{local_dependency}` in `local_dependencies` is not installable"
                    " by pip because it is an empty Git submodule. Either remove it"
                    " from `local_dependencies` or fetch the submodule with"
                    " `git submodule update --init --recursive`."
                )
                raise RuntimeError(msg) from None
            else:
                msg = (
                    f"`{local_dependency}` in `local_dependencies` is not pip"
                    " installable nor is it managed by unidep. Remove it"
                    " from `local_dependencies`."
                )
                raise RuntimeError(msg) from None
            continue
        project_path = str(requirements_path.parent)
        if project_path == str(base_path):
            continue
        if not check_pip_installable or is_pip_installable(requirements_path.parent):
            dependencies[str(base_path)].add(project_path)
        if verbose:
            print(f"🔗 Adding `{requirements_path}` from `local_dependencies`")
        _extract_local_dependencies(
            requirements_path,
            base_path,
            processed,
            dependencies,
            check_pip_installable=check_pip_installable,
            verbose=verbose,
        )
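

# Note on the non-managed branch above: a hypothetical `../vendor/foo` that is
# pip-installable (it has a `setup.py` or `pyproject.toml`) but has no unidep
# configuration is added as-is; per the warning text, it is installed with
# `pip install --no-deps`, and the warning fires unless `warn_non_managed=False`.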


def parse_local_dependencies(
    *paths: Path,
    check_pip_installable: bool = True,
    verbose: bool = False,
    raise_if_missing: bool = True,
    warn_non_managed: bool = True,
) -> dict[Path, list[Path]]:
    """Extract local project dependencies from a list of `requirements.yaml` or `pyproject.toml` files.

    Works by loading the specified `local_dependencies` list.

    Returns a dictionary mapping each project folder to the sorted list of
    `Path`s of its local dependency folders.
    """  # noqa: E501
    dependencies: dict[str, set[str]] = defaultdict(set)
    for p in paths:
        if verbose:
            print(f"🔗 Analyzing dependencies in `{p}`")
        base_path = p.resolve().parent
        _extract_local_dependencies(
            path=p,
            base_path=base_path,
            processed=set(),
            dependencies=dependencies,
            check_pip_installable=check_pip_installable,
            verbose=verbose,
            raise_if_missing=raise_if_missing,
            warn_non_managed=warn_non_managed,
        )
    return {
        Path(k): sorted({Path(v) for v in v_set})
        for k, v_set in sorted(dependencies.items())
    }
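

# Usage sketch (hypothetical layout): if `app/requirements.yaml` lists `../lib`
# in `local_dependencies`, and `lib` in turn lists `../core`, the transitive
# dependency is picked up via the recursion in `_extract_local_dependencies`:
#
#     parse_local_dependencies(Path("app/requirements.yaml"))
#     # -> {Path('/abs/app'): [Path('/abs/core'), Path('/abs/lib')]}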


def yaml_to_toml(yaml_path: Path) -> str:
    """Convert a `requirements.yaml` file to TOML format."""
    try:
        import tomli_w
    except ImportError:  # pragma: no cover
        msg = (
            "❌ `tomli_w` is required to convert YAML to TOML."
            " Install it with `pip install tomli_w`."
        )
        raise ImportError(msg) from None
    yaml = YAML(typ="rt")
    data = _load(yaml_path, yaml)
    data.pop("name", None)
    dependencies = data.get("dependencies", [])
    for i, dep in enumerate(dependencies):
        if isinstance(dep, str):
            comment = _extract_first_comment(dependencies, i)
            if comment is not None:
                selector = selector_from_comment(comment)
                if selector is not None:
                    dependencies[i] = f"{dep}:{selector}"
            continue
        assert isinstance(dep, dict)
        for which in ["conda", "pip"]:
            if which in dep:
                comment = _extract_first_comment(dep, which)
                if comment is not None:
                    selector = selector_from_comment(comment)
                    if selector is not None:
                        dep[which] = f"{dep[which]}:{selector}"
    return tomli_w.dumps({"tool": {"unidep": data}})


def _is_empty_git_submodule(path: Path) -> bool:
    """Check if the given path is an empty Git submodule."""
    if not path.is_dir():
        return False
    git_file = path / ".git"
    if not git_file.exists() or not git_file.is_file():
        return False
    # Check if it's empty (apart from the .git file)
    return len(list(path.iterdir())) == 1  # Only .git should be present


def _is_empty_folder(path: Path) -> bool:
    """Check if the given path is an empty folder."""
    return not any(path.iterdir())
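

# `yaml_to_toml` usage sketch (hypothetical file): a YAML entry such as
# `- numpy  # [unix]` has its selector inlined (`"numpy:unix"`) and is emitted
# under `[tool.unidep]`:
#
#     toml_str = yaml_to_toml(Path("requirements.yaml"))
#     Path("pyproject.toml").write_text(toml_str)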