Source code for package_scan.adapters.python_adapter

"""Python ecosystem adapter for scanning pip, poetry, pipenv, and conda projects"""

import json
import os
import re
from pathlib import Path
from typing import List

import click

from package_scan.core import Finding
from .base import EcosystemAdapter


[docs] class PythonAdapter(EcosystemAdapter): """ Adapter for scanning Python projects Supports: - pip: requirements.txt, requirements-\\*.txt - Poetry: pyproject.toml, poetry.lock - Pipenv: Pipfile, Pipfile.lock - conda: environment.yml - Version matching: PEP 440 specifiers (==, >=, ~=, !=, etc.) Ecosystem identifier: 'pip' (matches PyPI package format) """ def _get_ecosystem_name(self) -> str: """Return ecosystem identifier""" return 'pip'
[docs] def get_manifest_files(self) -> List[str]: """Return list of manifest file names""" return ['requirements.txt', 'pyproject.toml', 'Pipfile', 'environment.yml', 'setup.py']
[docs] def get_lockfile_names(self) -> List[str]: """Return list of lockfile names""" return ['poetry.lock', 'Pipfile.lock', 'conda-lock.yml']
[docs] def detect_projects(self) -> List[Path]: """ Detect Python projects by looking for Python manifest files Returns: List of project directories """ projects = [] for dirpath, dirnames, filenames in os.walk(self.root_dir): # Skip common excluded directories dirnames[:] = [d for d in dirnames if not self._should_skip_directory(Path(dirpath) / d)] # Check for Python manifest files manifest_files = { 'requirements.txt', 'pyproject.toml', 'Pipfile', 'environment.yml', 'setup.py', 'setup.cfg' } # Also check for requirements-*.txt files has_requirements_variant = any( f.startswith('requirements') and f.endswith('.txt') for f in filenames ) if manifest_files & set(filenames) or has_requirements_variant: projects.append(Path(dirpath)) return projects
[docs] def scan_project(self, project_dir: Path) -> List[Finding]: """ Scan a single Python project for compromised packages Args: project_dir: Project directory Returns: List of findings """ if isinstance(project_dir, str): project_dir = Path(project_dir) findings = [] # 1. Check requirements.txt files (including requirements-*.txt) for req_file in project_dir.glob('requirements*.txt'): if req_file.is_file(): findings.extend(self._scan_requirements_txt(req_file)) # 2. Check pyproject.toml (Poetry) pyproject_toml = project_dir / 'pyproject.toml' if pyproject_toml.exists(): findings.extend(self._scan_pyproject_toml(pyproject_toml)) # 3. Check poetry.lock poetry_lock = project_dir / 'poetry.lock' if poetry_lock.exists(): findings.extend(self._scan_poetry_lock(poetry_lock)) # 4. Check Pipfile (pipenv) pipfile = project_dir / 'Pipfile' if pipfile.exists(): findings.extend(self._scan_pipfile(pipfile)) # 5. Check Pipfile.lock pipfile_lock = project_dir / 'Pipfile.lock' if pipfile_lock.exists(): findings.extend(self._scan_pipfile_lock(pipfile_lock)) # 6. Check environment.yml (conda) env_yml = project_dir / 'environment.yml' if env_yml.exists(): findings.extend(self._scan_conda_environment(env_yml)) return findings
def _scan_requirements_txt(self, file_path: Path) -> List[Finding]: """ Scan requirements.txt for compromised packages Format: package==1.2.3 package>=1.0.0 package~=1.2.0 package>=1.0,<2.0 package[extra]==1.2.3 git+https://... Args: file_path: Path to requirements.txt Returns: List of findings """ findings = [] try: with open(file_path, 'r', encoding='utf-8') as f: lines = f.readlines() for line_num, line in enumerate(lines, 1): # Remove comments and whitespace line = line.split('#')[0].strip() if not line or line.startswith('-'): continue # Skip URLs and local paths if line.startswith(('http://', 'https://', 'git+', 'file://', './', '../')): continue # Parse package specification # Pattern: package[extras]==version or package>=version,<version match = re.match(r'^([a-zA-Z0-9_\-\.]+)(\[.*?\])?\s*(.*)$', line) if not match: continue package_name = match.group(1).lower() # extras = match.group(2) # Not used for threat matching version_spec = match.group(3).strip() if package_name not in self.compromised_packages: continue # Parse version specifier if not version_spec: # No version specified - warn but don't report as threat continue # Handle exact version (==) if version_spec.startswith('=='): version = version_spec[2:].strip() if version in self.compromised_packages[package_name]: findings.append(Finding( ecosystem='pip', finding_type='manifest', file_path=str(file_path), package_name=package_name, version=version, match_type='exact', declared_spec=version_spec, dependency_type='requirement' )) # Handle version ranges and operators else: matching_versions = self._get_matching_pep440_versions( version_spec, package_name) if matching_versions: findings.append(Finding( ecosystem='pip', finding_type='manifest', file_path=str(file_path), package_name=package_name, version=", ".join(sorted(matching_versions)), match_type='range', declared_spec=version_spec, dependency_type='requirement', metadata={'included_versions': sorted(matching_versions)} )) except Exception as e: click.echo(click.style( f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True) return findings def _scan_pyproject_toml(self, file_path: Path) -> List[Finding]: """ Scan pyproject.toml for compromised packages (Poetry format) Args: file_path: Path to pyproject.toml Returns: List of findings """ findings = [] try: # Try to import toml parser try: import toml except ImportError: try: import tomli as toml except ImportError: click.echo(click.style( f"⚠️ Warning: toml/tomli not installed, skipping {file_path}", fg='yellow'), err=True) click.echo(click.style( " Install with: pip install toml", fg='yellow', dim=True), err=True) return findings with open(file_path, 'r', encoding='utf-8') as f: data = toml.load(f) # Poetry dependencies are in [tool.poetry.dependencies] and [tool.poetry.dev-dependencies] dep_sections = [] if 'tool' in data and 'poetry' in data['tool']: poetry = data['tool']['poetry'] if 'dependencies' in poetry: dep_sections.append(('dependencies', poetry['dependencies'])) if 'dev-dependencies' in poetry: dep_sections.append(('dev-dependencies', poetry['dev-dependencies'])) for dep_type, dependencies in dep_sections: for package_name, version_spec in dependencies.items(): # Skip python itself if package_name.lower() == 'python': continue package_name = package_name.lower() if package_name not in self.compromised_packages: continue # Poetry version specs can be strings or dicts if isinstance(version_spec, dict): version_spec = version_spec.get('version', '') if not version_spec or version_spec == '*': continue # Convert Poetry caret (^) and tilde (~) to PEP 440 # ^1.2.3 means >=1.2.3,<2.0.0 # ~1.2.3 means >=1.2.3,<1.3.0 pep440_spec = self._convert_poetry_to_pep440(version_spec) # Check for matches matching_versions = self._get_matching_pep440_versions( pep440_spec, package_name) if matching_versions: findings.append(Finding( ecosystem='pip', finding_type='manifest', file_path=str(file_path), package_name=package_name, version=", ".join(sorted(matching_versions)), match_type='range' if len(matching_versions) > 1 else 'exact', declared_spec=version_spec, dependency_type=dep_type, metadata={'included_versions': sorted(matching_versions)} )) except Exception as e: click.echo(click.style( f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True) return findings def _scan_poetry_lock(self, file_path: Path) -> List[Finding]: """ Scan poetry.lock for compromised packages Args: file_path: Path to poetry.lock Returns: List of findings """ findings = [] try: # Try to import toml parser try: import toml except ImportError: try: import tomli as toml except ImportError: return findings with open(file_path, 'r', encoding='utf-8') as f: data = toml.load(f) # Poetry lockfile has [[package]] sections packages = data.get('package', []) for pkg in packages: package_name = pkg.get('name', '').lower() version = pkg.get('version', '') if package_name in self.compromised_packages: if version in self.compromised_packages[package_name]: findings.append(Finding( ecosystem='pip', finding_type='lockfile', file_path=str(file_path), package_name=package_name, version=version, match_type='exact', metadata={'lockfile_type': 'poetry.lock'} )) except Exception as e: click.echo(click.style( f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True) return findings def _scan_pipfile(self, file_path: Path) -> List[Finding]: """ Scan Pipfile for compromised packages Args: file_path: Path to Pipfile Returns: List of findings """ findings = [] try: # Try to import toml parser try: import toml except ImportError: try: import tomli as toml except ImportError: return findings with open(file_path, 'r', encoding='utf-8') as f: data = toml.load(f) # Pipfile has [packages] and [dev-packages] dep_sections = [] if 'packages' in data: dep_sections.append(('packages', data['packages'])) if 'dev-packages' in data: dep_sections.append(('dev-packages', data['dev-packages'])) for dep_type, dependencies in dep_sections: for package_name, version_spec in dependencies.items(): package_name = package_name.lower() if package_name not in self.compromised_packages: continue # Pipfile version specs can be strings or dicts if isinstance(version_spec, dict): version_spec = version_spec.get('version', '*') if version_spec == '*': continue # Check for matches matching_versions = self._get_matching_pep440_versions( version_spec, package_name) if matching_versions: findings.append(Finding( ecosystem='pip', finding_type='manifest', file_path=str(file_path), package_name=package_name, version=", ".join(sorted(matching_versions)), match_type='range' if len(matching_versions) > 1 else 'exact', declared_spec=version_spec, dependency_type=dep_type, metadata={'included_versions': sorted(matching_versions)} )) except Exception as e: click.echo(click.style( f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True) return findings def _scan_pipfile_lock(self, file_path: Path) -> List[Finding]: """ Scan Pipfile.lock for compromised packages Args: file_path: Path to Pipfile.lock Returns: List of findings """ findings = [] try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) # Pipfile.lock has "default" and "develop" sections for section_name in ['default', 'develop']: section = data.get(section_name, {}) for package_name, pkg_info in section.items(): package_name = package_name.lower() version = pkg_info.get('version', '').lstrip('=') # Remove leading == if package_name in self.compromised_packages: if version in self.compromised_packages[package_name]: findings.append(Finding( ecosystem='pip', finding_type='lockfile', file_path=str(file_path), package_name=package_name, version=version, match_type='exact', metadata={'lockfile_type': 'Pipfile.lock', 'section': section_name} )) except json.JSONDecodeError: click.echo(click.style( f"⚠️ Warning: Invalid JSON in {file_path}", fg='yellow'), err=True) except Exception as e: click.echo(click.style( f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True) return findings def _scan_conda_environment(self, file_path: Path) -> List[Finding]: """ Scan conda environment.yml for compromised packages Args: file_path: Path to environment.yml Returns: List of findings """ findings = [] try: # Try to import yaml parser try: import yaml except ImportError: click.echo(click.style( f"⚠️ Warning: PyYAML not installed, skipping {file_path}", fg='yellow'), err=True) return findings with open(file_path, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) if not data or 'dependencies' not in data: return findings dependencies = data['dependencies'] for dep in dependencies: # Skip conda channels and pip sections if isinstance(dep, dict): # pip dependencies section if 'pip' in dep: for pip_dep in dep['pip']: # Parse pip format match = re.match(r'^([a-zA-Z0-9_\-\.]+)\s*(.*)$', pip_dep) if match: package_name = match.group(1).lower() version_spec = match.group(2).strip() if package_name in self.compromised_packages: if version_spec.startswith('=='): version = version_spec[2:].strip() if version in self.compromised_packages[package_name]: findings.append(Finding( ecosystem='pip', finding_type='manifest', file_path=str(file_path), package_name=package_name, version=version, match_type='exact', declared_spec=version_spec, dependency_type='pip-dependency' )) continue # Conda package format: package=version or package if isinstance(dep, str): parts = dep.split('=') package_name = parts[0].lower() if package_name in self.compromised_packages and len(parts) >= 2: version = parts[1] if version in self.compromised_packages[package_name]: findings.append(Finding( ecosystem='pip', finding_type='manifest', file_path=str(file_path), package_name=package_name, version=version, match_type='exact', declared_spec=f"={version}", dependency_type='conda-dependency' )) except Exception as e: click.echo(click.style( f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True) return findings def _convert_poetry_to_pep440(self, poetry_spec: str) -> str: """ Convert Poetry version specifier to PEP 440 ^1.2.3 → >=1.2.3,<2.0.0 ~1.2.3 → >=1.2.3,<1.3.0 Args: poetry_spec: Poetry version specifier Returns: PEP 440 specifier """ if poetry_spec.startswith('^'): version = poetry_spec[1:] parts = version.split('.') if len(parts) >= 1: major = int(parts[0]) return f">={version},<{major+1}.0.0" elif poetry_spec.startswith('~'): version = poetry_spec[1:] parts = version.split('.') if len(parts) >= 2: major, minor = parts[0], int(parts[1]) return f">={version},<{major}.{minor+1}.0" return poetry_spec def _get_matching_pep440_versions( self, version_spec: str, package_name: str ) -> List[str]: """ Get compromised versions matching PEP 440 specifier Simplified implementation for common operators: ==, >=, <=, >, <, !=, ~= Args: version_spec: PEP 440 version specifier package_name: Package name Returns: List of matching versions """ matching = [] # Handle comma-separated specs: >=1.0,<2.0 specs = [s.strip() for s in version_spec.split(',')] for version in self.compromised_packages[package_name]: if all(self._check_pep440_spec(version, spec) for spec in specs): matching.append(version) return matching def _check_pep440_spec(self, version: str, spec: str) -> bool: """ Check if version satisfies PEP 440 specifier Args: version: Version to check spec: PEP 440 specifier Returns: True if version satisfies spec """ spec = spec.strip() # Handle different operators if spec.startswith('=='): return version == spec[2:].strip() elif spec.startswith('>='): return self._version_compare_simple(version, spec[2:].strip()) >= 0 elif spec.startswith('<='): return self._version_compare_simple(version, spec[2:].strip()) <= 0 elif spec.startswith('>'): return self._version_compare_simple(version, spec[1:].strip()) > 0 elif spec.startswith('<'): return self._version_compare_simple(version, spec[1:].strip()) < 0 elif spec.startswith('!='): return version != spec[2:].strip() elif spec.startswith('~='): # Compatible release: ~=1.2.3 means >=1.2.3,<1.3.0 base = spec[2:].strip() parts = base.split('.') if len(parts) >= 2: upper = f"{parts[0]}.{int(parts[1])+1}.0" return (self._version_compare_simple(version, base) >= 0 and self._version_compare_simple(version, upper) < 0) return False def _version_compare_simple(self, v1: str, v2: str) -> int: """ Simple version comparison Returns: -1 if v1 < v2, 0 if equal, 1 if v1 > v2 """ try: parts1 = [int(x) for x in v1.split('.')] parts2 = [int(x) for x in v2.split('.')] # Pad with zeros max_len = max(len(parts1), len(parts2)) parts1 += [0] * (max_len - len(parts1)) parts2 += [0] * (max_len - len(parts2)) if parts1 < parts2: return -1 elif parts1 > parts2: return 1 return 0 except (ValueError, AttributeError): # Fallback to string comparison if v1 < v2: return -1 elif v1 > v2: return 1 return 0