Spaces:
Sleeping
Sleeping
| """ | |
| osint_core.validators | |
| ===================== | |
| Input validation and normalization for the Passive OSINT Control Panel. | |
| Design goals: | |
| - Treat all input as hostile. | |
| - Normalize before hashing, enrichment, audit, or reporting. | |
| - Return structured results so downstream modules do not guess intent. | |
| - Reject ambiguous or dangerous inputs early. | |
| - Avoid network calls. This module is pure validation/normalization. | |
| Supported indicator types: | |
| - domain | |
| - username | |
| - ip | |
| - url | |
| """ | |
| from __future__ import annotations | |
| import html | |
| import ipaddress | |
| import re | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from typing import Literal | |
| from urllib.parse import urlparse, urlunparse | |
| IndicatorType = Literal["domain", "username", "email", "ip", "url", "unknown"] | |
| class ValidationErrorCode(str, Enum): | |
| EMPTY_INPUT = "empty_input" | |
| TOO_LONG = "too_long" | |
| CONTROL_CHARACTERS = "control_characters" | |
| INVALID_TYPE = "invalid_type" | |
| INVALID_DOMAIN = "invalid_domain" | |
| INVALID_USERNAME = "invalid_username" | |
| INVALID_EMAIL = "invalid_email" | |
| INVALID_IP = "invalid_ip" | |
| INVALID_URL = "invalid_url" | |
| UNSUPPORTED_INDICATOR = "unsupported_indicator" | |
| BLOCKED_LOCAL_TARGET = "blocked_local_target" | |
| BLOCKED_DANGEROUS_PATTERN = "blocked_dangerous_pattern" | |
| class ValidationResult: | |
| ok: bool | |
| indicator_type: IndicatorType | |
| normalized: str | |
| original_length: int | |
| warnings: list[str] | |
| error: str | None = None | |
| error_code: ValidationErrorCode | None = None | |
| MAX_INPUT_LENGTH = 256 | |
| MAX_USERNAME_LENGTH = 64 | |
| MAX_EMAIL_LOCAL_LENGTH = 64 | |
| MAX_EMAIL_LENGTH = 320 | |
| MAX_DOMAIN_LENGTH = 253 | |
| MAX_URL_LENGTH = 2048 | |
| CONTROL_CHARS_RE = re.compile(r"[\x00-\x1f\x7f]") | |
| DOMAIN_RE = re.compile( | |
| r"^(?=.{1,253}$)(?!-)(?:[a-zA-Z0-9-]{1,63}\.)+[a-zA-Z]{2,63}$" | |
| ) | |
| USERNAME_RE = re.compile(r"^[a-zA-Z0-9_.-]{2,64}$") | |
| EMAIL_RE = re.compile(r"^[A-Za-z0-9.!#$%&'*+/=?^_`{|}~-]{1,64}@[A-Za-z0-9.-]{1,255}\.[A-Za-z]{2,63}$") | |
| DANGEROUS_PATTERNS = [ | |
| re.compile(pattern, re.IGNORECASE) | |
| for pattern in [ | |
| r"\.\./", | |
| r"%2e%2e", | |
| r"<\s*script", | |
| r"javascript:", | |
| r"data:", | |
| r"file:", | |
| r";", | |
| r"\|", | |
| r"&&", | |
| r"\$\(", | |
| r"`", | |
| r"\{.*\}", | |
| ] | |
| ] | |
| LOCAL_HOSTNAMES = {"localhost", "ip6-localhost", "ip6-loopback"} | |
| PRIVATE_NETS = [ | |
| ipaddress.ip_network("10.0.0.0/8"), | |
| ipaddress.ip_network("172.16.0.0/12"), | |
| ipaddress.ip_network("192.168.0.0/16"), | |
| ipaddress.ip_network("127.0.0.0/8"), | |
| ipaddress.ip_network("169.254.0.0/16"), | |
| ipaddress.ip_network("::1/128"), | |
| ipaddress.ip_network("fc00::/7"), | |
| ipaddress.ip_network("fe80::/10"), | |
| ] | |
| def validate_indicator(raw_value: str, forced_type: str = "Auto", allow_private_targets: bool = False) -> ValidationResult: | |
| """ | |
| Validate and normalize a user-supplied OSINT indicator. | |
| Parameters | |
| ---------- | |
| raw_value: | |
| User input. | |
| forced_type: | |
| One of: Auto, Domain, Username, Email, IP, URL. | |
| allow_private_targets: | |
| Whether private/local network targets should be accepted. | |
| This should remain False for public Spaces. | |
| Returns | |
| ------- | |
| ValidationResult | |
| Structured validation result. | |
| """ | |
| original_length = len(raw_value) if raw_value is not None else 0 | |
| warnings: list[str] = [] | |
| try: | |
| cleaned = sanitize_raw_input(raw_value) | |
| check_dangerous_patterns(cleaned) | |
| forced = normalize_forced_type(forced_type) | |
| if forced != "auto": | |
| indicator_type, normalized = validate_as_type(cleaned, forced, allow_private_targets) | |
| else: | |
| indicator_type, normalized = classify_auto(cleaned, allow_private_targets) | |
| if normalized != cleaned: | |
| warnings.append("Input was normalized before processing.") | |
| return ValidationResult( | |
| ok=True, | |
| indicator_type=indicator_type, | |
| normalized=normalized, | |
| original_length=original_length, | |
| warnings=warnings, | |
| ) | |
| except ValidationException as exc: | |
| return ValidationResult( | |
| ok=False, | |
| indicator_type="unknown", | |
| normalized="", | |
| original_length=original_length, | |
| warnings=warnings, | |
| error=str(exc), | |
| error_code=exc.code, | |
| ) | |
| class ValidationException(ValueError): | |
| def __init__(self, message: str, code: ValidationErrorCode): | |
| super().__init__(message) | |
| self.code = code | |
| def sanitize_raw_input(raw_value: str) -> str: | |
| if raw_value is None: | |
| raise ValidationException("Input is required.", ValidationErrorCode.EMPTY_INPUT) | |
| value = str(raw_value).strip() | |
| if not value: | |
| raise ValidationException("Input is empty.", ValidationErrorCode.EMPTY_INPUT) | |
| if CONTROL_CHARS_RE.search(value): | |
| raise ValidationException( | |
| "Input contains control characters.", | |
| ValidationErrorCode.CONTROL_CHARACTERS, | |
| ) | |
| if len(value) > MAX_INPUT_LENGTH: | |
| raise ValidationException( | |
| f"Input exceeds {MAX_INPUT_LENGTH} characters.", | |
| ValidationErrorCode.TOO_LONG, | |
| ) | |
| # Escape then unescape to normalize obvious HTML entity tricks without | |
| # returning an escaped value to downstream validators. | |
| escaped = html.escape(value, quote=True) | |
| return html.unescape(escaped).strip() | |
| def check_dangerous_patterns(value: str) -> None: | |
| for pattern in DANGEROUS_PATTERNS: | |
| if pattern.search(value): | |
| raise ValidationException( | |
| "Input contains a blocked pattern.", | |
| ValidationErrorCode.BLOCKED_DANGEROUS_PATTERN, | |
| ) | |
| def normalize_forced_type(forced_type: str) -> str: | |
| value = (forced_type or "Auto").strip().lower() | |
| aliases = { | |
| "auto": "auto", | |
| "domain": "domain", | |
| "username": "username", | |
| "user": "username", | |
| "email": "email", | |
| "mail": "email", | |
| "ip": "ip", | |
| "ip address": "ip", | |
| "url": "url", | |
| "uri": "url", | |
| } | |
| if value not in aliases: | |
| raise ValidationException( | |
| f"Unsupported forced type: {forced_type}", | |
| ValidationErrorCode.INVALID_TYPE, | |
| ) | |
| return aliases[value] | |
| def classify_auto(value: str, allow_private_targets: bool) -> tuple[IndicatorType, str]: | |
| # URL first, because URLs can contain domains/IPs. | |
| if looks_like_url(value): | |
| return validate_url(value, allow_private_targets) | |
| # IP before domain. | |
| try: | |
| return validate_ip(value, allow_private_targets) | |
| except ValidationException: | |
| pass | |
| if "@" in value: | |
| return validate_email(value, allow_private_targets) | |
| if "." in value: | |
| return validate_domain(value, allow_private_targets) | |
| if USERNAME_RE.fullmatch(value): | |
| return validate_username(value, allow_private_targets) | |
| raise ValidationException( | |
| "Unsupported or malformed indicator.", | |
| ValidationErrorCode.UNSUPPORTED_INDICATOR, | |
| ) | |
| def validate_as_type(value: str, forced: str, allow_private_targets: bool) -> tuple[IndicatorType, str]: | |
| if forced == "domain": | |
| return validate_domain(value, allow_private_targets) | |
| if forced == "username": | |
| return validate_username(value, allow_private_targets) | |
| if forced == "email": | |
| return validate_email(value, allow_private_targets) | |
| if forced == "ip": | |
| return validate_ip(value, allow_private_targets) | |
| if forced == "url": | |
| return validate_url(value, allow_private_targets) | |
| raise ValidationException("Unsupported indicator type.", ValidationErrorCode.INVALID_TYPE) | |
| def validate_domain(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]: | |
| domain = value.strip().lower().rstrip(".") | |
| if len(domain) > MAX_DOMAIN_LENGTH or not DOMAIN_RE.fullmatch(domain): | |
| raise ValidationException("Invalid domain.", ValidationErrorCode.INVALID_DOMAIN) | |
| labels = domain.split(".") | |
| for label in labels: | |
| if label.startswith("-") or label.endswith("-"): | |
| raise ValidationException("Invalid domain label.", ValidationErrorCode.INVALID_DOMAIN) | |
| if domain in LOCAL_HOSTNAMES and not allow_private_targets: | |
| raise ValidationException( | |
| "Local/private targets are blocked by policy.", | |
| ValidationErrorCode.BLOCKED_LOCAL_TARGET, | |
| ) | |
| return "domain", domain | |
| def validate_username(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]: | |
| del allow_private_targets | |
| username = value.strip() | |
| if len(username) > MAX_USERNAME_LENGTH or not USERNAME_RE.fullmatch(username): | |
| raise ValidationException("Invalid username.", ValidationErrorCode.INVALID_USERNAME) | |
| if username in {".", ".."}: | |
| raise ValidationException("Invalid username.", ValidationErrorCode.INVALID_USERNAME) | |
| return "username", username | |
| def validate_email(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]: | |
| email = value.strip().lower() | |
| if len(email) > MAX_EMAIL_LENGTH or not EMAIL_RE.fullmatch(email): | |
| raise ValidationException("Invalid email address.", ValidationErrorCode.INVALID_EMAIL) | |
| local, domain = email.rsplit("@", 1) | |
| if len(local) > MAX_EMAIL_LOCAL_LENGTH: | |
| raise ValidationException("Invalid email local part.", ValidationErrorCode.INVALID_EMAIL) | |
| _, normalized_domain = validate_domain(domain, allow_private_targets) | |
| return "email", f"{local}@{normalized_domain}" | |
| def validate_ip(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]: | |
| try: | |
| ip = ipaddress.ip_address(value.strip()) | |
| except ValueError as exc: | |
| raise ValidationException("Invalid IP address.", ValidationErrorCode.INVALID_IP) from exc | |
| if not allow_private_targets and is_private_or_local_ip(ip): | |
| raise ValidationException( | |
| "Local/private targets are blocked by policy.", | |
| ValidationErrorCode.BLOCKED_LOCAL_TARGET, | |
| ) | |
| return "ip", str(ip) | |
| def validate_url(value: str, allow_private_targets: bool = False) -> tuple[IndicatorType, str]: | |
| if len(value) > MAX_URL_LENGTH: | |
| raise ValidationException("URL is too long.", ValidationErrorCode.TOO_LONG) | |
| parsed = urlparse(value.strip()) | |
| if parsed.scheme.lower() not in {"http", "https"} or not parsed.netloc: | |
| raise ValidationException( | |
| "Invalid URL. Only http:// and https:// URLs are supported.", | |
| ValidationErrorCode.INVALID_URL, | |
| ) | |
| hostname = parsed.hostname | |
| if not hostname: | |
| raise ValidationException("Invalid URL hostname.", ValidationErrorCode.INVALID_URL) | |
| hostname = hostname.lower().rstrip(".") | |
| if hostname in LOCAL_HOSTNAMES and not allow_private_targets: | |
| raise ValidationException( | |
| "Local/private targets are blocked by policy.", | |
| ValidationErrorCode.BLOCKED_LOCAL_TARGET, | |
| ) | |
| # Validate hostname as IP or domain. | |
| try: | |
| _, normalized_host = validate_ip(hostname, allow_private_targets) | |
| except ValidationException: | |
| _, normalized_host = validate_domain(hostname, allow_private_targets) | |
| # Strip fragments. Fragments are client-side and not useful for passive OSINT hashing. | |
| normalized = urlunparse( | |
| ( | |
| parsed.scheme.lower(), | |
| normalized_host if parsed.port is None else f"{normalized_host}:{parsed.port}", | |
| parsed.path or "", | |
| "", | |
| parsed.query or "", | |
| "", | |
| ) | |
| ) | |
| return "url", normalized | |
| def looks_like_url(value: str) -> bool: | |
| lowered = value.lower() | |
| return lowered.startswith("http://") or lowered.startswith("https://") | |
| def is_private_or_local_ip(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool: | |
| return ( | |
| ip.is_private | |
| or ip.is_loopback | |
| or ip.is_link_local | |
| or ip.is_multicast | |
| or ip.is_reserved | |
| or any(ip in net for net in PRIVATE_NETS) | |
| ) | |
| def assert_valid_or_raise(raw_value: str, forced_type: str = "Auto", allow_private_targets: bool = False) -> tuple[IndicatorType, str]: | |
| """ | |
| Convenience helper for callers that prefer exceptions. | |
| """ | |
| result = validate_indicator(raw_value, forced_type, allow_private_targets) | |
| if not result.ok: | |
| raise ValidationException(result.error or "Validation failed.", result.error_code or ValidationErrorCode.UNSUPPORTED_INDICATOR) | |
| return result.indicator_type, result.normalized | |