Source code for package_scan.adapters.npm_adapter

"""NPM ecosystem adapter for scanning JavaScript/Node.js projects"""

import json
import os
import re
from pathlib import Path
from typing import List, Optional

import click
from semantic_version import Version, NpmSpec

from package_scan.core import Finding
from .base import EcosystemAdapter


class NpmAdapter(EcosystemAdapter):
    """
    Adapter for scanning npm/JavaScript/Node.js projects

    Supports:
    - Manifest files: package.json
    - Lock files: package-lock.json, yarn.lock, pnpm-lock.yaml
    - Installed packages: node_modules/
    - Version matching: npm semver ranges (^, ~, >=, etc.)

    The adapter reads ``self.compromised_packages`` (a mapping of package
    name to a collection of compromised version strings), ``self.root_dir``
    and ``self.spinner``; none of them are set up in this class.
    """

    # yarn.lock entry header: optional quote, package name (optionally
    # scoped), then the '@' that introduces the requested range.
    _YARN_ENTRY_NAME = re.compile(r'["\']?(@?[^@"\s]+)@')
    # Resolved version line inside a yarn.lock entry. Matches both the
    # classic form (version "1.2.3") and the YAML form (version: 1.2.3).
    _YARN_VERSION = re.compile(r'^\s+version:?\s+"?([^"\s]+)"?')
    # pnpm key in "name@version" form, with an optional leading slash and an
    # optional peer-dependency suffix starting with '(' or '_'.
    _PNPM_KEY_AT = re.compile(r'^/?((?:@[^/@]+/)?[^/@]+)@([^_(]+)')
    # pnpm key in "/name/version" form, with an optional '_peer' suffix.
    _PNPM_KEY_SLASH = re.compile(r'^/(@?[^/]+(?:/[^/]+)?)/(.+?)(?:_|$)')

    def _get_ecosystem_name(self) -> str:
        """Return ecosystem identifier"""
        return 'npm'

    def get_manifest_files(self) -> List[str]:
        """Return list of manifest file names"""
        return ['package.json']

    def get_lockfile_names(self) -> List[str]:
        """Return list of lockfile names"""
        return ['package-lock.json', 'yarn.lock', 'pnpm-lock.yaml']

    def detect_projects(self) -> List[Path]:
        """
        Detect npm projects by looking for package.json files

        Returns:
            List of project directories containing package.json
        """
        projects = []

        for dirpath, dirnames, filenames in os.walk(self.root_dir):
            # Prune in place so os.walk does not descend into skipped
            # directories.
            dirnames[:] = [
                d for d in dirnames
                if not self._should_skip_directory(Path(dirpath) / d)
            ]

            if 'package.json' in filenames:
                projects.append(Path(dirpath))

        return projects

    def scan_project(self, project_dir: Path) -> List[Finding]:
        """
        Scan a single npm project for compromised packages

        Args:
            project_dir: Project directory containing package.json
                (a plain string path is accepted as well)

        Returns:
            List of findings
        """
        # Accept str paths too; Path() is a no-op for Path instances.
        project_dir = Path(project_dir)

        findings = []

        # 1. Check package.json manifest
        package_json = project_dir / 'package.json'
        if package_json.exists():
            findings.extend(self._scan_package_json(package_json))

        # 2. Check lock files. Names without a dedicated scanner are ignored.
        lockfile_scanners = {
            'package-lock.json': self._scan_package_lock_json,
            'yarn.lock': self._scan_yarn_lock,
            'pnpm-lock.yaml': self._scan_pnpm_lock_yaml,
        }
        for lockfile_name in self.get_lockfile_names():
            scanner = lockfile_scanners.get(lockfile_name)
            lockfile_path = project_dir / lockfile_name
            if scanner is not None and lockfile_path.exists():
                findings.extend(scanner(lockfile_path))

        # 3. Check installed packages in node_modules
        node_modules = project_dir / 'node_modules'
        if node_modules.exists() and node_modules.is_dir():
            findings.extend(self._scan_node_modules(node_modules))

        return findings

    def _scan_package_json(self, file_path: Path) -> List[Finding]:
        """
        Scan package.json for compromised dependencies

        Each declared spec is parsed as an npm semver range and reported when
        it admits at least one compromised version. Specs that cannot be
        parsed as a range fall back to an exact string comparison.

        Args:
            file_path: Path to package.json

        Returns:
            List of findings
        """
        findings = []

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                package_data = json.load(f)

            # Check all dependency types
            dep_types = ['dependencies', 'devDependencies',
                         'peerDependencies', 'optionalDependencies']

            for dep_type in dep_types:
                if dep_type not in package_data:
                    continue

                dependencies = package_data[dep_type]
                if not isinstance(dependencies, dict):
                    # A malformed section (e.g. null) must not prevent the
                    # remaining sections from being scanned.
                    continue

                for package_name, version_spec in dependencies.items():
                    if package_name not in self.compromised_packages:
                        continue

                    # Try to parse as npm semver range
                    try:
                        spec = NpmSpec(str(version_spec))
                    except Exception:
                        # Not a standard semver spec; try exact match
                        clean_version = str(version_spec).lstrip('^~>=<')
                        if clean_version in self.compromised_packages[package_name]:
                            findings.append(Finding(
                                ecosystem='npm',
                                finding_type='manifest',
                                file_path=str(file_path),
                                package_name=package_name,
                                version=clean_version,
                                match_type='exact',
                                declared_spec=version_spec,
                                dependency_type=dep_type
                            ))
                        continue

                    included_versions = sorted(
                        self._get_matching_versions(spec, package_name))
                    if included_versions:
                        findings.append(Finding(
                            ecosystem='npm',
                            finding_type='manifest',
                            file_path=str(file_path),
                            package_name=package_name,
                            version=", ".join(included_versions),
                            match_type='range',
                            declared_spec=version_spec,
                            dependency_type=dep_type,
                            metadata={'included_versions': included_versions}
                        ))

        except json.JSONDecodeError:
            click.echo(click.style(
                f"⚠️ Warning: Invalid JSON in {file_path}", fg='yellow'), err=True)
        except Exception as e:
            click.echo(click.style(
                f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True)

        return findings

    def _get_matching_versions(self, spec: NpmSpec, package_name: str) -> List[str]:
        """
        Get compromised versions that match the npm semver spec

        Args:
            spec: NpmSpec range
            package_name: Package name

        Returns:
            List of matching compromised versions
        """
        included_versions = []

        for compromised_version in self.compromised_packages[package_name]:
            try:
                if Version.coerce(compromised_version) in spec:
                    included_versions.append(compromised_version)
            except Exception:
                # Versions that cannot be coerced to semver are skipped.
                continue

        return included_versions

    @staticmethod
    def _package_name_from_lock_path(package_path: str) -> str:
        """
        Derive the package name from a package-lock.json install path

        Install paths nest as ``node_modules/a/node_modules/@scope/b``; the
        package name is whatever follows the last ``node_modules/`` segment.
        Paths without that segment are returned unchanged.
        """
        return package_path.rsplit('node_modules/', 1)[-1]

    def _scan_package_lock_json(self, file_path: Path) -> List[Finding]:
        """
        Scan package-lock.json for compromised packages

        Handles both the flat ``packages`` object and the nested
        ``dependencies`` tree; ``packages`` takes precedence when both are
        present. Nested (transitive) installs are matched by their real
        package name, and each (name, version) pair is reported once per file.

        Args:
            file_path: Path to package-lock.json

        Returns:
            List of findings
        """
        findings = []

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                lock_data = json.load(f)

            # Keyed by install path (unique), so two copies of one package at
            # different versions are both retained.
            locked_versions = {}

            if 'packages' in lock_data:
                for package_path, package_info in lock_data['packages'].items():
                    if not package_path:  # Root package
                        continue
                    version = package_info.get('version')
                    if version:
                        locked_versions[package_path] = version
            elif 'dependencies' in lock_data:
                self._extract_lock_v1_dependencies(
                    lock_data['dependencies'], locked_versions)

            # Check for compromised packages
            reported = set()
            for package_path, version in locked_versions.items():
                package_name = self._package_name_from_lock_path(package_path)
                if package_name not in self.compromised_packages:
                    continue
                if version not in self.compromised_packages[package_name]:
                    continue
                if (package_name, version) in reported:
                    continue
                reported.add((package_name, version))
                findings.append(Finding(
                    ecosystem='npm',
                    finding_type='lockfile',
                    file_path=str(file_path),
                    package_name=package_name,
                    version=version,
                    match_type='exact',
                    metadata={'lockfile_type': 'package-lock.json'}
                ))

        except json.JSONDecodeError:
            click.echo(click.style(
                f"⚠️ Warning: Invalid JSON in {file_path}", fg='yellow'), err=True)
        except Exception as e:
            click.echo(click.style(
                f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True)

        return findings

    def _extract_lock_v1_dependencies(self, deps: dict, output: dict,
                                      prefix: str = "") -> None:
        """
        Recursively extract dependencies from the nested lock file format

        Keys written to ``output`` are install paths: a nested dependency
        ``b`` of ``a`` is stored as ``a/node_modules/b``.

        Args:
            deps: Dependencies object from lock file
            output: Output dictionary to populate (path -> version)
            prefix: Path prefix for nested dependencies
        """
        for name, info in deps.items():
            full_name = f"{prefix}{name}" if prefix else name
            version = info.get('version')
            if version:
                output[full_name] = version

            # Recurse into nested dependencies
            if 'dependencies' in info:
                self._extract_lock_v1_dependencies(
                    info['dependencies'], output, f"{full_name}/node_modules/")

    def _scan_yarn_lock(self, file_path: Path) -> List[Finding]:
        """
        Scan yarn.lock for compromised packages

        Entry headers are the non-indented lines that contain ``@`` and end
        with ``:``; the first indented ``version`` line below a header gives
        the resolved version. Both ``version "1.2.3"`` and ``version: 1.2.3``
        spellings are recognised.

        Args:
            file_path: Path to yarn.lock

        Returns:
            List of findings
        """
        findings = []

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Yarn lock format:
            # package-name@^1.0.0, package-name@^1.2.0:
            #   version "1.2.3"
            #   resolved "..."
            lines = content.split('\n')

            for index, line in enumerate(lines):
                # Only non-indented lines can open an entry. Indented lines
                # such as resolved URLs of scoped packages also contain
                # '@' and ':' but are entry fields, not headers.
                if not line or line[0].isspace() or line.startswith('#'):
                    continue
                if '@' not in line or not line.rstrip().endswith(':'):
                    continue

                # Extract package name (handle scoped packages)
                name_match = self._YARN_ENTRY_NAME.search(line)
                if not name_match:
                    continue
                package_name = name_match.group(1)
                if package_name not in self.compromised_packages:
                    continue

                # Walk the entry body until it ends (blank or non-indented
                # line) and stop at the first version line.
                for body_index in range(index + 1, len(lines)):
                    body_line = lines[body_index]
                    if not body_line.strip() or not body_line[0].isspace():
                        break
                    version_match = self._YARN_VERSION.match(body_line)
                    if not version_match:
                        continue
                    version = version_match.group(1)
                    if version in self.compromised_packages[package_name]:
                        findings.append(Finding(
                            ecosystem='npm',
                            finding_type='lockfile',
                            file_path=str(file_path),
                            package_name=package_name,
                            version=version,
                            match_type='exact',
                            metadata={'lockfile_type': 'yarn.lock'}
                        ))
                    break

        except Exception as e:
            click.echo(click.style(
                f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True)

        return findings

    @classmethod
    def _parse_pnpm_package_key(cls, package_key) -> Optional[tuple]:
        """
        Split a pnpm-lock.yaml ``packages`` key into (name, version)

        Recognised shapes:
        - ``/name@1.2.3``, ``name@1.2.3``, ``/@scope/name@1.2.3(peer@1.0.0)``
        - ``/name/1.2.3``, ``/@scope/name/1.2.3_peer@1.0.0``

        Returns:
            ``(package_name, version)`` or None when the key is not a string
            or matches neither shape
        """
        if not isinstance(package_key, str):
            return None
        # The "@" form is tried first: the slash pattern would otherwise
        # mis-split "/@scope/name@1.2.3" into "@scope" and "name@1.2.3".
        match = (cls._PNPM_KEY_AT.match(package_key)
                 or cls._PNPM_KEY_SLASH.match(package_key))
        if not match:
            return None
        return match.group(1), match.group(2)

    def _scan_pnpm_lock_yaml(self, file_path: Path) -> List[Finding]:
        """
        Scan pnpm-lock.yaml for compromised packages

        Requires PyYAML; when it is not installed the file is skipped with a
        warning. Each (name, version) pair is reported once per file, even if
        it appears under several peer-dependency variants.

        Args:
            file_path: Path to pnpm-lock.yaml

        Returns:
            List of findings
        """
        findings = []

        try:
            # Try to import yaml
            try:
                import yaml
            except ImportError:
                click.echo(click.style(
                    f"⚠️ Warning: PyYAML not installed, skipping {file_path}",
                    fg='yellow'), err=True)
                click.echo(click.style(
                    " Install with: pip install pyyaml",
                    fg='yellow', dim=True), err=True)
                return findings

            with open(file_path, 'r', encoding='utf-8') as f:
                lock_data = yaml.safe_load(f)

            if not lock_data:
                return findings

            # An explicit "packages: null" loads as None; treat it as empty.
            packages = lock_data.get('packages') or {}

            reported = set()
            for package_key in packages:
                parsed = self._parse_pnpm_package_key(package_key)
                if parsed is None:
                    continue
                package_name, version = parsed

                if package_name not in self.compromised_packages:
                    continue
                if version not in self.compromised_packages[package_name]:
                    continue
                if parsed in reported:
                    continue
                reported.add(parsed)
                findings.append(Finding(
                    ecosystem='npm',
                    finding_type='lockfile',
                    file_path=str(file_path),
                    package_name=package_name,
                    version=version,
                    match_type='exact',
                    metadata={'lockfile_type': 'pnpm-lock.yaml'}
                ))

        except Exception as e:
            click.echo(click.style(
                f"⚠️ Warning: Error reading {file_path}: {e}", fg='yellow'), err=True)

        return findings

    def _scan_node_modules(self, node_modules_path: Path) -> List[Finding]:
        """
        Scan installed packages in node_modules directory

        Only the direct children of ``node_modules_path`` are inspected
        (plus one level below ``@scope`` directories).

        Args:
            node_modules_path: Path to node_modules directory

        Returns:
            List of findings
        """
        findings = []

        try:
            # Materialised so the total is known for the progress display.
            items = list(node_modules_path.iterdir())

            for idx, item_path in enumerate(items):
                # Update spinner with progress
                progress = f"[{idx+1}/{len(items)}]"
                self.spinner.update(
                    f"{progress} Scanning {node_modules_path}/{item_path.name}")

                # Handle scoped packages (@org/package)
                if item_path.name.startswith('@') and item_path.is_dir():
                    for scoped_package in item_path.iterdir():
                        if scoped_package.is_dir():
                            package_name = f"{item_path.name}/{scoped_package.name}"
                            finding = self._check_installed_package(
                                scoped_package, package_name, node_modules_path)
                            if finding:
                                findings.append(finding)
                elif item_path.is_dir():
                    finding = self._check_installed_package(
                        item_path, item_path.name, node_modules_path)
                    if finding:
                        findings.append(finding)

        except PermissionError:
            click.echo(click.style(
                f"⚠️ Warning: Permission denied accessing {node_modules_path}",
                fg='yellow'), err=True)
        except Exception as e:
            click.echo(click.style(
                f"⚠️ Warning: Error scanning {node_modules_path}: {e}",
                fg='yellow'), err=True)

        return findings

    def _check_installed_package(
        self,
        package_path: Path,
        package_name: str,
        node_modules_path: Path
    ) -> Optional[Finding]:
        """
        Check if an installed package is compromised

        Args:
            package_path: Path to the package directory
            package_name: Package name
            node_modules_path: Path to parent node_modules

        Returns:
            Finding if compromised, None otherwise (including when the
            package has no package.json or it cannot be read)
        """
        if package_name not in self.compromised_packages:
            return None

        package_json_path = package_path / 'package.json'
        if not package_json_path.exists():
            return None

        try:
            with open(package_json_path, 'r', encoding='utf-8') as f:
                package_data = json.load(f)

            installed_version = package_data.get('version', 'unknown')

            if installed_version in self.compromised_packages[package_name]:
                return Finding(
                    ecosystem='npm',
                    finding_type='installed',
                    file_path=str(node_modules_path),
                    package_name=package_name,
                    version=installed_version,
                    match_type='exact',
                    metadata={
                        'location': str(node_modules_path),
                        'package_path': str(package_path)
                    }
                )

        except Exception as e:
            click.echo(click.style(
                f"⚠️ Warning: Error checking {package_json_path}: {e}",
                fg='yellow'), err=True)

        return None