Spaces:
Sleeping
Sleeping
| """ | |
| osint_core.observer | |
| =================== | |
| Independent observer circuit for the Enterprise Drift-Aware OSINT Control Fabric. | |
| The observer does not execute. It reconstructs expected behavior from intent, | |
| policy, and executor trace, then emits dissent when reality does not match | |
| declared constraints. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from typing import Any, Mapping | |
| class ObserverSeverity(str, Enum): | |
| INFO = "info" | |
| WARNING = "warning" | |
| CRITICAL = "critical" | |
| class ObserverCheck: | |
| name: str | |
| ok: bool | |
| severity: ObserverSeverity | |
| reason: str | |
| evidence: dict[str, Any] = field(default_factory=dict) | |
| class ExecutionTrace: | |
| intent_id: str | |
| modules_requested: tuple[str, ...] | |
| modules_executed: tuple[str, ...] | |
| modules_blocked: tuple[str, ...] | |
| observed_effects: tuple[str, ...] | |
| output_schema_valid: bool | |
| audit_payload: Mapping[str, Any] | |
| errors: tuple[str, ...] = field(default_factory=tuple) | |
| class ObserverAssessment: | |
| intent_id: str | |
| checks: tuple[ObserverCheck, ...] | |
| def dissent(self) -> bool: | |
| return any(not check.ok for check in self.checks) | |
| def has_critical_violation(self) -> bool: | |
| return any((not check.ok) and check.severity == ObserverSeverity.CRITICAL for check in self.checks) | |
| RAW_AUDIT_KEYS = { | |
| "raw_indicator", | |
| "raw_input", | |
| "indicator", | |
| "email", | |
| "domain", | |
| "username", | |
| "url", | |
| "ip", | |
| } | |
| def observe_execution(intent: IntentPacket, trace: ExecutionTrace, policy_result: PolicyEvaluation) -> ObserverAssessment: | |
| checks = ( | |
| check_intent_trace_match(intent, trace), | |
| check_modules_match_policy(trace, policy_result), | |
| check_output_schema(trace), | |
| check_no_raw_indicator_leak(trace), | |
| check_expected_side_effects(intent, trace), | |
| ) | |
| return ObserverAssessment(intent_id=trace.intent_id, checks=checks) | |
| def check_intent_trace_match(intent: Any, trace: ExecutionTrace) -> ObserverCheck: | |
| expected_intent_id = getattr(intent, "intent_id", None) | |
| ok = expected_intent_id == trace.intent_id | |
| return ObserverCheck( | |
| name="intent_trace_match", | |
| ok=ok, | |
| severity=ObserverSeverity.CRITICAL, | |
| reason="Execution trace must correspond to the intent packet.", | |
| evidence={"expected": expected_intent_id, "actual": trace.intent_id}, | |
| ) | |
| def check_modules_match_policy(trace: ExecutionTrace, policy_result: Any) -> ObserverCheck: | |
| if isinstance(policy_result, dict): | |
| allowed = set(policy_result.get("allowed_modules", [])) | |
| else: | |
| allowed = set(getattr(policy_result, "allowed_modules", [])) | |
| executed = set(trace.modules_executed) | |
| unexpected = sorted(executed - allowed) | |
| return ObserverCheck( | |
| name="modules_match_policy", | |
| ok=not unexpected, | |
| severity=ObserverSeverity.CRITICAL, | |
| reason="Executed modules must be allowed by policy.", | |
| evidence={"unexpected_modules": unexpected}, | |
| ) | |
| def check_output_schema(trace: ExecutionTrace) -> ObserverCheck: | |
| return ObserverCheck( | |
| name="output_schema_valid", | |
| ok=trace.output_schema_valid, | |
| severity=ObserverSeverity.WARNING, | |
| reason="Executor output should conform to expected schema.", | |
| evidence={}, | |
| ) | |
| def check_no_raw_indicator_leak(trace: ExecutionTrace) -> ObserverCheck: | |
| present = sorted(set(trace.audit_payload.keys()).intersection(RAW_AUDIT_KEYS)) | |
| return ObserverCheck( | |
| name="no_raw_indicator_leak", | |
| ok=not present, | |
| severity=ObserverSeverity.CRITICAL, | |
| reason="Audit payload must not contain raw indicator fields.", | |
| evidence={"raw_fields": present}, | |
| ) | |
| def check_expected_side_effects(intent: Any, trace: ExecutionTrace) -> ObserverCheck: | |
| expected = set(getattr(intent, "expected_side_effects", ())) | |
| observed = set(trace.observed_effects) | |
| missing = sorted(expected - observed) | |
| return ObserverCheck( | |
| name="expected_side_effects_present", | |
| ok=not missing, | |
| severity=ObserverSeverity.WARNING, | |
| reason="Declared expected side effects should be observed or explained.", | |
| evidence={"missing_effects": missing}, | |
| ) | |