"""unidep - Unified Conda and Pip requirements management.
This module provides parsing of `requirements.yaml` and `pyproject.toml` files.
"""
from __future__ import annotations
import functools
import hashlib
import os
import sys
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, NamedTuple
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq
from unidep.platform_definitions import Platform, Spec, platforms_from_selector
from unidep.utils import (
LocalDependency,
PathWithExtras,
defaultdict_to_dict,
is_pip_installable,
parse_folder_or_filename,
parse_package_str,
selector_from_comment,
split_path_and_extras,
unidep_configured_in_toml,
warn,
)
if TYPE_CHECKING:
if sys.version_info >= (3, 8):
from typing import Literal
else: # pragma: no cover
from typing_extensions import Literal
if sys.version_info >= (3, 11):
import tomllib
else: # pragma: no cover
import tomli as tomllib
def find_requirements_files(
base_dir: str | Path = ".",
depth: int = 1,
*,
verbose: bool = False,
) -> list[Path]:
"""Scan a directory for `requirements.yaml` and `pyproject.toml` files."""
base_path = Path(base_dir)
found_files = []
# Define a helper function to recursively scan directories
def _scan_dir(path: Path, current_depth: int) -> None:
if verbose:
print(f"🔍 Scanning in `{path}` at depth {current_depth}")
if current_depth > depth:
return
for child in sorted(path.iterdir()):
if child.is_dir():
_scan_dir(child, current_depth + 1)
elif child.name == "requirements.yaml":
found_files.append(child)
if verbose:
print(f'🔍 Found `"requirements.yaml"` at `{child}`')
elif child.name == "pyproject.toml" and unidep_configured_in_toml(child):
if verbose:
print(f'🔍 Found `"pyproject.toml"` with dependencies at `{child}`')
found_files.append(child)
_scan_dir(base_path, 0)
return sorted(found_files)
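# Usage sketch (hypothetical layout, not part of unidep): with the default
# depth of 1, files directly in `base_dir` and one directory level below
# are discovered.
#
#     # monorepo/
#     # ├── pkg-a/requirements.yaml
#     # └── pkg-b/pyproject.toml   (containing a [tool.unidep] table)
#     files = find_requirements_files("monorepo", verbose=True)
#     # -> [Path("monorepo/pkg-a/requirements.yaml"),
#     #     Path("monorepo/pkg-b/pyproject.toml")]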
def _extract_first_comment(
commented_map: CommentedMap,
index_or_key: int | str,
) -> str | None:
"""Extract the first comment from a CommentedMap."""
comments = commented_map.ca.items.get(index_or_key, None)
if comments is None:
return None
    comment_string = next(
        (c.value.split("\n")[0].strip() for c in comments if c is not None),
        None,
    )
    if not comment_string:
        # no comment token found, or the comment is an empty string
        return None
    return comment_string
def _identifier(identifier: int, selector: str | None) -> str:
"""Return a unique identifier based on the comment."""
platforms = None if selector is None else tuple(platforms_from_selector(selector))
data_str = f"{identifier}-{platforms}"
# Hash using SHA256 and take the first 8 characters for a shorter hash
return hashlib.sha256(data_str.encode()).hexdigest()[:8]
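# Sketch of the identifier scheme: the hash depends only on the dependency's
# position in the file and its platform selector, so e.g. the conda and pip
# Specs created from one entry share the same 8-character identifier
# (assuming `linux64` is a recognized selector):
#
#     _identifier(0, "linux64") == _identifier(0, "linux64")  # True
#     _identifier(1, "linux64") == _identifier(0, "linux64")  # False (index differs)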
def _parse_dependency(
dependency: str,
dependencies: CommentedMap,
index_or_key: int | str,
which: Literal["conda", "pip", "both"],
identifier: int,
ignore_pins: list[str],
overwrite_pins: dict[str, str | None],
skip_dependencies: list[str],
origin: Path,
) -> list[Spec]:
name, pin, selector = parse_package_str(dependency)
if name in ignore_pins:
pin = None
if name in skip_dependencies:
return []
if name in overwrite_pins:
pin = overwrite_pins[name]
comment = (
_extract_first_comment(dependencies, index_or_key)
if isinstance(dependencies, (CommentedMap, CommentedSeq))
else None
)
if comment and selector is None:
selector = selector_from_comment(comment)
identifier_hash = _identifier(identifier, selector)
if which == "both":
return [
Spec(name, "conda", pin, identifier_hash, selector, origin=(origin,)),
Spec(name, "pip", pin, identifier_hash, selector, origin=(origin,)),
]
return [Spec(name, which, pin, identifier_hash, selector, origin=(origin,))]
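# Illustration (hypothetical values, assuming `parse_package_str` splits the
# pin as shown): a plain string entry without an attached comment, parsed
# with `which="both"`, yields a conda Spec and a pip Spec sharing one
# identifier hash:
#
#     specs = _parse_dependency(
#         "numpy >=1.21",
#         CommentedMap(),  # no attached comment -> no selector
#         0,
#         "both",
#         identifier=0,
#         ignore_pins=[],
#         overwrite_pins={},
#         skip_dependencies=[],
#         origin=Path("requirements.yaml"),
#     )
#     # -> [Spec("numpy", "conda", ">=1.21", ...),
#     #     Spec("numpy", "pip", ">=1.21", ...)]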
class ParsedRequirements(NamedTuple):
"""Requirements with comments."""
channels: list[str]
platforms: list[Platform]
requirements: dict[str, list[Spec]]
optional_dependencies: dict[str, dict[str, list[Spec]]]
class Requirements(NamedTuple):
"""Requirements as CommentedSeq."""
# mypy doesn't support CommentedSeq[str], so we use list[str] instead.
channels: list[str] # actually a CommentedSeq[str]
conda: list[str] # actually a CommentedSeq[str]
pip: list[str] # actually a CommentedSeq[str]
def _parse_overwrite_pins(overwrite_pins: list[str]) -> dict[str, str | None]:
"""Parse overwrite pins."""
result = {}
for overwrite_pin in overwrite_pins:
pkg = parse_package_str(overwrite_pin)
result[pkg.name] = pkg.pin
return result
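# For example (a sketch; the exact split is whatever `parse_package_str`
# produces):
#
#     _parse_overwrite_pins(["numpy >=1.26", "pandas"])
#     # -> {"numpy": ">=1.26", "pandas": None}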
@functools.lru_cache
def _load(p: Path, yaml: YAML) -> dict[str, Any]:
if p.suffix == ".toml":
with p.open("rb") as f:
pyproject = tomllib.load(f)
project_dependencies = pyproject.get("project", {}).get("dependencies", [])
unidep_cfg = pyproject["tool"]["unidep"]
if not project_dependencies:
return unidep_cfg
unidep_dependencies = unidep_cfg.setdefault("dependencies", [])
project_dependency_handling = unidep_cfg.get(
"project_dependency_handling",
"ignore",
)
_add_project_dependencies(
project_dependencies,
unidep_dependencies,
project_dependency_handling,
)
return unidep_cfg
with p.open() as f:
return yaml.load(f)
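# A minimal illustration of the `pyproject.toml` shape the TOML branch reads
# (field values are examples, not defaults):
#
#     [project]
#     dependencies = ["adaptive"]
#
#     [tool.unidep]
#     channels = ["conda-forge"]
#     dependencies = ["numpy"]
#     project_dependency_handling = "same-name"  # or "pip-only" / "ignore"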
def _add_project_dependencies(
project_dependencies: list[str],
unidep_dependencies: list[dict[str, str] | str],
project_dependency_handling: Literal["same-name", "pip-only", "ignore"],
) -> None:
"""Add project dependencies to unidep dependencies based on the chosen handling."""
if project_dependency_handling == "same-name":
unidep_dependencies.extend(project_dependencies)
elif project_dependency_handling == "pip-only":
unidep_dependencies.extend([{"pip": dep} for dep in project_dependencies])
elif project_dependency_handling != "ignore":
msg = (
f"Invalid `project_dependency_handling` value: {project_dependency_handling}." # noqa: E501
" Must be one of 'same-name', 'pip-only', 'ignore'."
)
raise ValueError(msg)
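# The three modes, sketched on a single `[project]` dependency:
#
#     deps: list = []
#     _add_project_dependencies(["adaptive"], deps, "same-name")
#     # deps == ["adaptive"]           (same name for conda and pip)
#     deps = []
#     _add_project_dependencies(["adaptive"], deps, "pip-only")
#     # deps == [{"pip": "adaptive"}]  (wrapped as a pip-only entry)
#     deps = []
#     _add_project_dependencies(["adaptive"], deps, "ignore")
#     # deps == []                     (project dependencies are ignored)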
def _parse_local_dependency_item(item: str | dict[str, str]) -> LocalDependency:
"""Parse a single local dependency item into a LocalDependency object."""
if isinstance(item, str):
return LocalDependency(local=item, pypi=None)
if isinstance(item, dict):
if "local" not in item:
msg = "Dictionary-style local dependency must have a 'local' key"
raise ValueError(msg)
return LocalDependency(local=item["local"], pypi=item.get("pypi"))
msg = f"Invalid local dependency format: {item}"
raise TypeError(msg)
def get_local_dependencies(data: dict[str, Any]) -> list[LocalDependency]:
"""Get `local_dependencies` from a `requirements.yaml` or `pyproject.toml` file."""
raw_deps = []
if "local_dependencies" in data:
raw_deps = data["local_dependencies"]
elif "includes" in data:
warn(
"⚠️ You are using `includes` in `requirements.yaml` or `pyproject.toml`"
" `[unidep.tool]` which is deprecated since 0.42.0 and has been renamed to"
" `local_dependencies`.",
category=DeprecationWarning,
stacklevel=2,
)
raw_deps = data["includes"]
return [_parse_local_dependency_item(item) for item in raw_deps]
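# Both accepted item shapes, as they would appear in `requirements.yaml`
# (illustrative; `my-core` is a hypothetical name, and `pypi` is presumably
# the distribution to fall back to when not installing the local path):
#
#     local_dependencies:
#       - ../common          # plain string form
#       - local: ../core     # dict form; the "local" key is required
#         pypi: my-core      # optional "pypi" key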
def _to_path_with_extras(
paths: list[Path],
extras: list[list[str]] | Literal["*"] | None,
) -> list[PathWithExtras]:
if isinstance(extras, (list, tuple)) and len(extras) != len(paths):
msg = (
f"Length of `extras` ({len(extras)}) does not match length"
f" of `paths` ({len(paths)})."
)
raise ValueError(msg)
paths_with_extras = [parse_folder_or_filename(p) for p in paths]
if extras is None:
return paths_with_extras
assert extras is not None
if any(p.extras for p in paths_with_extras):
msg = (
"Cannot specify `extras` list when paths are"
" specified like `path/to/project[extra1,extra2]`, `extras` must be `None`"
" or specify pure paths without extras like `path/to/project` and specify"
" extras in `extras`."
)
raise ValueError(msg)
if extras == "*":
extras = [["*"]] * len(paths) # type: ignore[list-item]
return [PathWithExtras(p.path, e) for p, e in zip(paths_with_extras, extras)]
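# Sketch of the two ways extras can be attached: either via the `extras`
# argument (one inner list per path), or embedded in the path itself, in
# which case `extras` must be None; mixing both raises ValueError:
#
#     _to_path_with_extras([Path("proj1"), Path("proj2")], [["test"], []])
#     _to_path_with_extras([Path("proj1[test]"), Path("proj2")], None)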
def _update_data_structures(
*,
path_with_extras: PathWithExtras,
datas: list[dict[str, Any]], # modified in place
all_extras: list[list[str]], # modified in place
seen: set[PathWithExtras], # modified in place
yaml: YAML,
is_nested: bool,
origin: Path,
verbose: bool = False,
) -> None:
if verbose:
print(f"📄 Parsing `{path_with_extras.path_with_extras}`")
data = _load(path_with_extras.path, yaml)
data["_origin"] = origin
datas.append(data)
_move_local_optional_dependencies_to_local_dependencies(
data=data, # modified in place
path_with_extras=path_with_extras,
verbose=verbose,
)
if not is_nested:
all_extras.append(path_with_extras.extras)
else:
# When nested, the extras that are specified in the
# local_dependencies section should be moved to the main dependencies
# because they are not optional if specified in the file. Only
# the top-level extras are optional.
all_extras.append([])
_move_optional_dependencies_to_dependencies(
data=data, # modified in place
path_with_extras=path_with_extras,
verbose=verbose,
)
seen.add(path_with_extras.resolved())
# Handle "local_dependencies" (or old name "includes", changed in 0.42.0)
for local_dep_obj in get_local_dependencies(data):
# NOTE: The current function calls _add_local_dependencies,
# which calls the current function recursively
_add_local_dependencies(
local_dependency=local_dep_obj.local,
path_with_extras=path_with_extras,
datas=datas, # modified in place
all_extras=all_extras, # modified in place
seen=seen, # modified in place
yaml=yaml,
origin=origin,
verbose=verbose,
)
def _move_optional_dependencies_to_dependencies(
data: dict[str, Any],
path_with_extras: PathWithExtras,
*,
verbose: bool = False,
) -> None:
optional_dependencies = data.pop("optional_dependencies", {})
for extra in path_with_extras.extras:
if extra == "*":
# If "*" is specified, include all optional dependencies
for opt_deps in optional_dependencies.values():
data.setdefault("dependencies", []).extend(opt_deps)
if verbose:
print(
"📄 Moving all optional dependencies to main dependencies"
f" for `{path_with_extras.path_with_extras}`",
)
elif extra in optional_dependencies:
data.setdefault("dependencies", []).extend(optional_dependencies[extra])
if verbose:
print(
f"📄 Moving `{extra}` optional dependencies to main dependencies"
f" for `{path_with_extras.path_with_extras}`",
)
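# Effect on the loaded data (sketch): with extras == ["plot"], the "plot"
# group is folded into the main dependencies and `optional_dependencies`
# is popped off the data:
#
#     data = {
#         "dependencies": ["numpy"],
#         "optional_dependencies": {"plot": ["matplotlib"], "test": ["pytest"]},
#     }
#     # after the call:
#     # data == {"dependencies": ["numpy", "matplotlib"]}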
def _move_local_optional_dependencies_to_local_dependencies(
*,
data: dict[str, Any], # modified in place
path_with_extras: PathWithExtras,
verbose: bool = False,
) -> None:
# Move local dependencies from `optional_dependencies` to `local_dependencies`
extras = path_with_extras.extras
if "*" in extras:
extras = list(data.get("optional_dependencies", {}).keys())
optional_dependencies = data.get("optional_dependencies", {})
for extra in extras:
moved = set()
for dep in optional_dependencies.get(extra, []):
if isinstance(dep, dict):
# This is a {"pip": "package"} and/or {"conda": "package"} dependency
continue
if _str_is_path_like(dep):
if verbose:
print(
f"📄 Moving `{dep}` from the `{extra}` section in"
" `optional_dependencies` to `local_dependencies`",
)
data.setdefault("local_dependencies", []).append(dep)
moved.add(dep)
        if moved:
            deps = optional_dependencies[extra]  # must exist if `moved` is non-empty
            for dep in moved:
                deps.remove(dep)
# Remove empty optional_dependencies sections
to_delete = [extra for extra, deps in optional_dependencies.items() if not deps]
for extra in to_delete:
if verbose:
print(f"📄 Removing empty `{extra}` section from `optional_dependencies`")
optional_dependencies.pop(extra)
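# Sketch of the rewrite performed above: path-like entries inside a selected
# optional group are re-homed under `local_dependencies`, and groups that end
# up empty are removed:
#
#     data = {"optional_dependencies": {"dev": ["../tools", "pytest"]}}
#     # after the call with extras == ["dev"]:
#     # data == {"local_dependencies": ["../tools"],
#     #          "optional_dependencies": {"dev": ["pytest"]}}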
def _add_local_dependencies(
*,
local_dependency: str,
path_with_extras: PathWithExtras,
datas: list[dict[str, Any]],
all_extras: list[list[str]],
seen: set[PathWithExtras],
yaml: YAML,
origin: Path,
verbose: bool = False,
) -> None:
try:
requirements_dep_file = parse_folder_or_filename(
path_with_extras.path.parent / local_dependency,
)
except FileNotFoundError:
        # This is a local package that is not managed by unidep;
        # nothing to parse here, it is only handled by `unidep install`.
return
if requirements_dep_file.path.suffix in (".whl", ".zip"):
if verbose:
print(
f"⚠️ Local dependency `{local_dependency}` is a wheel or zip file. "
"Skipping parsing, but it will be installed by pip if "
"`--skip-local` is not set. Note that unidep will not "
"detect its dependencies.",
)
return
if requirements_dep_file.resolved() in seen:
return # Avoids circular local_dependencies
if verbose:
print(f"📄 Parsing `{local_dependency}` from `local_dependencies`")
_update_data_structures(
path_with_extras=requirements_dep_file,
datas=datas, # modified in place
all_extras=all_extras, # modified in place
seen=seen, # modified in place
yaml=yaml,
verbose=verbose,
is_nested=True,
origin=origin,
)
def parse_requirements(
*paths: Path,
ignore_pins: list[str] | None = None,
overwrite_pins: list[str] | None = None,
skip_dependencies: list[str] | None = None,
verbose: bool = False,
extras: list[list[str]] | Literal["*"] | None = None,
) -> ParsedRequirements:
"""Parse a list of `requirements.yaml` or `pyproject.toml` files.
Parameters
----------
paths
Paths to `requirements.yaml` or `pyproject.toml` files.
ignore_pins
List of package names to ignore pins for.
overwrite_pins
List of package names with pins to overwrite.
skip_dependencies
List of package names to skip.
verbose
Whether to print verbose output.
extras
List of lists of extras to include. The outer list corresponds to the
`requirements.yaml` or `pyproject.toml` files, the inner list to the
extras to include for that file. If "*", all extras are included,
if None, no extras are included.
"""
paths_with_extras = _to_path_with_extras(paths, extras) # type: ignore[arg-type]
ignore_pins = ignore_pins or []
skip_dependencies = skip_dependencies or []
overwrite_pins_map = _parse_overwrite_pins(overwrite_pins or [])
    # `datas` and `all_extras` are lists of the same length
datas: list[dict[str, Any]] = []
all_extras: list[list[str]] = []
seen: set[PathWithExtras] = set()
yaml = YAML(typ="rt") # Might be unused if all are TOML files
for path_with_extras in paths_with_extras:
_update_data_structures(
path_with_extras=path_with_extras,
datas=datas, # modified in place
all_extras=all_extras, # modified in place
seen=seen, # modified in place
yaml=yaml,
verbose=verbose,
is_nested=False,
origin=path_with_extras.path,
)
assert len(datas) == len(all_extras)
# Parse the requirements from loaded data
requirements: dict[str, list[Spec]] = defaultdict(list)
optional_dependencies: dict[str, dict[str, list[Spec]]] = defaultdict(
lambda: defaultdict(list),
)
channels: set[str] = set()
platforms: set[Platform] = set()
identifier = -1
for data, _extras in zip(datas, all_extras):
channels.update(data.get("channels", []))
platforms.update(data.get("platforms", []))
if "dependencies" in data:
identifier = _add_dependencies(
data["dependencies"],
requirements, # modified in place
identifier,
ignore_pins,
overwrite_pins_map,
skip_dependencies,
origin=data["_origin"],
)
for opt_name, opt_deps in data.get("optional_dependencies", {}).items():
if opt_name in _extras or "*" in _extras:
identifier = _add_dependencies(
opt_deps,
optional_dependencies[opt_name], # modified in place
identifier,
ignore_pins,
overwrite_pins_map,
skip_dependencies,
is_optional=True,
origin=data["_origin"],
)
return ParsedRequirements(
sorted(channels),
sorted(platforms),
dict(requirements),
defaultdict_to_dict(optional_dependencies),
)
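# Typical usage (a sketch; the paths are illustrative):
#
#     parsed = parse_requirements(
#         Path("pkg-a/requirements.yaml"),
#         Path("pkg-b/pyproject.toml"),
#         extras="*",  # include every optional-dependency group
#     )
#     parsed.channels      # e.g. ["conda-forge"]
#     parsed.requirements  # e.g. {"numpy": [Spec(...), Spec(...)]}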
def _str_is_path_like(s: str) -> bool:
"""Check if a string is path-like."""
return os.path.sep in s or "/" in s or s.startswith(".")
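# Quick illustration of the heuristic:
#
#     _str_is_path_like("../common")  # True (starts with ".")
#     _str_is_path_like("pkg/sub")    # True (contains "/")
#     _str_is_path_like("numpy")      # False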
def _check_allowed_local_dependency(name: str, is_optional: bool) -> None: # noqa: FBT001
if _str_is_path_like(name):
# There should not be path-like dependencies in the optional_dependencies
# section after _move_local_optional_dependencies_to_local_dependencies.
assert not is_optional
msg = (
f"Local dependencies (`{name}`) are not allowed in `dependencies`."
" Use the `local_dependencies` section instead."
)
raise ValueError(msg)
def _add_dependencies(
dependencies: list[str],
requirements: dict[str, list[Spec]], # modified in place
identifier: int,
ignore_pins: list[str],
overwrite_pins_map: dict[str, str | None],
skip_dependencies: list[str],
*,
is_optional: bool = False,
origin: Path,
) -> int:
for i, dep in enumerate(dependencies):
identifier += 1
if isinstance(dep, str):
specs = _parse_dependency(
dep,
dependencies,
i,
"both",
identifier,
ignore_pins,
overwrite_pins_map,
skip_dependencies,
origin,
)
for spec in specs:
_check_allowed_local_dependency(spec.name, is_optional)
requirements[spec.name].append(spec)
continue
assert isinstance(dep, dict)
for which in ["conda", "pip"]:
if which in dep:
specs = _parse_dependency(
dep[which],
dep,
which,
which, # type: ignore[arg-type]
identifier,
ignore_pins,
overwrite_pins_map,
skip_dependencies,
origin,
)
for spec in specs:
_check_allowed_local_dependency(spec.name, is_optional)
requirements[spec.name].append(spec)
return identifier
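# The two entry shapes handled above, as they appear in the YAML (a sketch;
# the package names are hypothetical):
#
#     dependencies:
#       - numpy              # plain string -> one conda Spec and one pip Spec
#       - conda: cudatoolkit # dict -> a Spec per present "conda"/"pip" key
#         pip: nvidia-cublas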
# Alias for backwards compatibility
parse_yaml_requirements = parse_requirements
def _extract_local_dependencies( # noqa: PLR0912
path: Path,
base_path: Path,
processed: set[Path],
dependencies: dict[str, set[str]],
*,
check_pip_installable: bool = True,
verbose: bool = False,
raise_if_missing: bool = True,
warn_non_managed: bool = True,
) -> None:
path, extras = parse_folder_or_filename(path)
if path in processed:
return
processed.add(path)
yaml = YAML(typ="safe")
data = _load(path, yaml)
_move_local_optional_dependencies_to_local_dependencies(
data=data, # modified in place
path_with_extras=PathWithExtras(path, extras),
verbose=verbose,
)
# Handle "local_dependencies" (or old name "includes", changed in 0.42.0)
for local_dep_obj in get_local_dependencies(data):
local_dependency = local_dep_obj.local
assert not os.path.isabs(local_dependency) # noqa: PTH117
        # NOTE: extras on local dependencies are not used in this function
        local_path, _extras = split_path_and_extras(local_dependency)
abs_local = (path.parent / local_path).resolve()
if abs_local.suffix in (".whl", ".zip"):
if verbose:
print(f"🔗 Adding `{local_dependency}` from `local_dependencies`")
dependencies[str(base_path)].add(str(abs_local))
continue
if not abs_local.exists():
if raise_if_missing:
msg = f"File `{abs_local}` not found."
raise FileNotFoundError(msg)
continue
try:
requirements_path = parse_folder_or_filename(abs_local).path
except FileNotFoundError:
# Means that this is a local package that is not managed by unidep.
if is_pip_installable(abs_local):
dependencies[str(base_path)].add(str(abs_local))
if warn_non_managed:
# We do not need to emit this warning when `pip install` is called
warn(
f"⚠️ Installing a local dependency (`{abs_local.name}`) which"
" is not managed by unidep, this will skip all of its"
" dependencies, i.e., it will call `pip install` with"
" `--no-deps`. To properly manage this dependency,"
" add a `requirements.yaml` or `pyproject.toml` file with"
" `[tool.unidep]` in its directory.",
)
elif _is_empty_folder(abs_local):
msg = (
f"`{local_dependency}` in `local_dependencies` is not pip"
" installable because it is an empty folder. Is it perhaps"
" an uninitialized Git submodule? If so, initialize it with"
" `git submodule update --init --recursive`. Otherwise,"
" remove it from `local_dependencies`."
)
raise RuntimeError(msg) from None
elif _is_empty_git_submodule(abs_local):
# Extra check for empty Git submodules (common problem folks run into)
msg = (
f"`{local_dependency}` in `local_dependencies` is not installable"
" by pip because it is an empty Git submodule. Either remove it"
" from `local_dependencies` or fetch the submodule with"
" `git submodule update --init --recursive`."
)
raise RuntimeError(msg) from None
else:
msg = (
f"`{local_dependency}` in `local_dependencies` is not pip"
" installable nor is it managed by unidep. Remove it"
" from `local_dependencies`."
)
raise RuntimeError(msg) from None
continue
project_path = str(requirements_path.parent)
if project_path == str(base_path):
continue
if not check_pip_installable or is_pip_installable(requirements_path.parent):
dependencies[str(base_path)].add(project_path)
if verbose:
print(f"🔗 Adding `{requirements_path}` from `local_dependencies`")
_extract_local_dependencies(
requirements_path,
base_path,
processed,
dependencies,
check_pip_installable=check_pip_installable,
verbose=verbose,
)
def parse_local_dependencies(
*paths: Path,
check_pip_installable: bool = True,
verbose: bool = False,
raise_if_missing: bool = True,
warn_non_managed: bool = True,
) -> dict[Path, list[Path]]:
"""Extract local project dependencies from a list of `requirements.yaml` or `pyproject.toml` files.
Works by loading the specified `local_dependencies` list.
    Returns a dictionary mapping each project folder `Path` to the sorted
    list of `Path`s of its local dependency folders.
""" # noqa: E501
dependencies: dict[str, set[str]] = defaultdict(set)
for p in paths:
if verbose:
print(f"🔗 Analyzing dependencies in `{p}`")
base_path = p.resolve().parent
_extract_local_dependencies(
path=p,
base_path=base_path,
processed=set(),
dependencies=dependencies,
check_pip_installable=check_pip_installable,
verbose=verbose,
raise_if_missing=raise_if_missing,
warn_non_managed=warn_non_managed,
)
return {
Path(k): sorted({Path(v) for v in v_set})
for k, v_set in sorted(dependencies.items())
}
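# Sketch: for a project whose requirements file lists `../common` under
# `local_dependencies`, the result maps the project folder to the resolved
# dependency folders (the absolute paths are illustrative):
#
#     deps = parse_local_dependencies(Path("pkg-a/requirements.yaml"))
#     # -> {Path("/abs/pkg-a"): [Path("/abs/common")]}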
def yaml_to_toml(yaml_path: Path) -> str:
"""Converts a `requirements.yaml` file TOML format."""
try:
import tomli_w
except ImportError: # pragma: no cover
msg = (
"❌ `tomli_w` is required to convert YAML to TOML."
" Install it with `pip install tomli_w`."
)
raise ImportError(msg) from None
yaml = YAML(typ="rt")
data = _load(yaml_path, yaml)
data.pop("name", None)
dependencies = data.get("dependencies", [])
for i, dep in enumerate(dependencies):
if isinstance(dep, str):
comment = _extract_first_comment(dependencies, i)
if comment is not None:
selector = selector_from_comment(comment)
if selector is not None:
dependencies[i] = f"{dep}:{selector}"
continue
assert isinstance(dep, dict)
for which in ["conda", "pip"]:
if which in dep:
comment = _extract_first_comment(dep, which)
if comment is not None:
selector = selector_from_comment(comment)
if selector is not None:
dep[which] = f"{dep[which]}:{selector}"
return tomli_w.dumps({"tool": {"unidep": data}})
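# Sketch of the conversion (requires `tomli_w`, and assumes that
# `selector_from_comment` extracts `linux64` from a `# [linux64]` comment):
# selectors stored as YAML comments become inline `:selector` suffixes in
# the TOML output.
#
#     # requirements.yaml        ->  TOML fragment
#     # dependencies:                [tool.unidep]
#     #   - numpy  # [linux64]       dependencies = ["numpy:linux64"]
#     toml_text = yaml_to_toml(Path("requirements.yaml"))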
def _is_empty_git_submodule(path: Path) -> bool:
"""Checks if the given path is an empty Git submodule."""
if not path.is_dir():
return False
git_file = path / ".git"
if not git_file.exists() or not git_file.is_file():
return False
# Check if it's empty (apart from the .git file)
return len(list(path.iterdir())) == 1 # Only .git should be present
def _is_empty_folder(path: Path) -> bool:
"""Checks if the given path is an empty folder."""
return not any(path.iterdir())