"""
example_usage.py

This example demonstrates how to use the glyphs framework to analyze model cognition
through attribution tracing, residue analysis, and symbolic shell execution.
"""
|
|
import json
import os
import sys
from collections import Counter
from pathlib import Path

import matplotlib.pyplot as plt

# Put the parent of this file's directory on the import path before the
# glyphs imports below (presumably so the example can run from a source
# checkout without installing the package -- confirm).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from glyphs.models import create_model_adapter
from glyphs.attribution.tracer import AttributionTracer
from glyphs.shells.executor import ShellExecutor, RecursiveShell
from glyphs.viz.visualizer import GlyphVisualizer
from glyphs.viz.glyph_mapper import GlyphMapper, GlyphExplorer
|
|
| |
# Directory that receives every artifact written by this example.
output_dir = Path("outputs")
output_dir.mkdir(exist_ok=True)

# Model adapter shared by all demos in this file.
# NOTE(review): presumably requires credentials for the named provider -- confirm.
model = create_model_adapter("openai:gpt-4")
print(f"Using model: {model.model_id}")

# Framework components; a single visualizer instance is shared by all of them.
visualizer = GlyphVisualizer()
tracer = AttributionTracer(model, visualizer=visualizer)
executor = ShellExecutor(tracer=tracer, visualizer=visualizer)
mapper = GlyphMapper(visualizer=visualizer)
|
|
| |
# Prompts used by the demos below, keyed by a short topic name.
test_prompts = {
    "memory": (
        "List the following five items in order, then recall them in reverse "
        "order at the end of your response: apple, train, elephant, computer, "
        "umbrella. Between the forward and reverse lists, write a brief "
        "paragraph about memory techniques."
    ),
    "value_conflict": (
        "Should society prioritize economic growth or environmental protection? "
        "Consider both perspectives thoroughly."
    ),
    "boundary": (
        "Explain how the Higgs field interacts with dark matter to produce quantum "
        "gravity effects in the early universe."
    ),
    "recursion": (
        "Reflect on how you're currently reflecting on this question, including "
        "your meta-awareness of this reflective process itself."
    ),
    "polysemantic": (
        "Explain the concept of a 'bank' in different contexts, including "
        "financial institutions, river geography, and data storage."
    ),
}
|
|
def run_attribution_analysis(prompt_key="memory"):
    """Trace attribution for one test prompt and render it as a glyph map.

    Generates a completion for ``test_prompts[prompt_key]``, traces its
    attribution, maps the trace to glyphs, writes the glyph map to
    ``attribution_<prompt_key>.svg`` under ``output_dir`` and prints the
    explorer's statistics and central glyphs.

    Args:
        prompt_key: Key into ``test_prompts`` selecting the prompt.

    Returns:
        The ``(attribution_map, glyph_map)`` pair.
    """
    prompt_text = test_prompts[prompt_key]
    print(f"\n=== Attribution Analysis for '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt_text[:100]}...")

    completion = model.generate(prompt_text, max_tokens=800)
    print(f"Output: {completion[:100]}...")

    print("Tracing attribution...")
    attribution_map = tracer.trace(
        prompt=prompt_text,
        output=completion,
        include_confidence=True,
    )

    print("Mapping attribution to glyphs...")
    glyph_map = mapper.map_attribution(
        attribution_map=attribution_map,
        layout_type="force_directed",
        include_tokens=True,
    )

    svg_path = output_dir / f"attribution_{prompt_key}.svg"
    mapper.visualize(glyph_map, output_path=str(svg_path))
    print(f"Visualization saved to {svg_path}")

    explorer = GlyphExplorer(glyph_map)
    summary = explorer.calculate_statistics()
    print("\nAttribution Statistics:")
    stat_rows = (
        ("Number of glyphs", "num_glyphs"),
        ("Number of connections", "num_connections"),
        ("Glyph types", "glyph_types"),
    )
    for label, stat_key in stat_rows:
        print(f" {label}: {summary[stat_key]}")

    top_glyphs = explorer.find_central_glyphs(top_n=3)
    print("\nCentral Glyphs:")
    for entry in top_glyphs:
        print(f" {entry.symbol} - {entry.description}")

    return attribution_map, glyph_map
|
|
def _serializable_shell_result(result):
    """Return a copy of a shell result that ``json`` can serialize.

    Collapse samples are reduced to five fixed fields, and the
    "attribution" and "visualization" entries are replaced by placeholder
    strings. Every other entry is passed through unchanged.
    """
    sample_fields = ("position", "type", "confidence", "context", "residue")
    placeholders = {
        "attribution": "Attribution map (omitted for serialization)",
        "visualization": "Visualization data (omitted for serialization)",
    }
    serializable = {}
    for key, value in result.items():
        if key == "collapse_samples":
            serializable[key] = [
                {field: sample[field] for field in sample_fields}
                for sample in value
            ]
        elif key in placeholders:
            serializable[key] = placeholders[key]
        else:
            serializable[key] = value
    return serializable


def run_shell_analysis(shell_id="MEMTRACE", prompt_key="memory"):
    """Run one diagnostic shell on a test prompt and save what it produced.

    Writes ``shell_<shell_id>_<prompt_key>.json`` under ``output_dir``,
    prints a summary of the result, and saves the visualization as an SVG
    with the same stem when the result contains a non-empty one.

    Args:
        shell_id: Shell identifier passed to ``executor.run``.
        prompt_key: Key into ``test_prompts`` selecting the prompt.

    Returns:
        The result mapping returned by ``executor.run``.

    Raises:
        KeyError: If ``prompt_key`` is not in ``test_prompts`` or the result
            lacks one of the keys read below ("output", "operations",
            "residues", "collapse_samples").
    """
    prompt = test_prompts[prompt_key]
    print(f"\n=== Shell Analysis: {shell_id} on '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    print(f"Executing shell {shell_id}...")
    result = executor.run(
        shell=shell_id,
        model=model,
        prompt=prompt,
        trace_attribution=True,
        record_residue=True,
        visualize=True,
    )

    # Serialize completely before touching the file so that a failure while
    # building or encoding the payload cannot leave a truncated JSON file.
    json_path = output_dir / f"shell_{shell_id}_{prompt_key}.json"
    json_text = json.dumps(_serializable_shell_result(result), indent=2)
    json_path.write_text(json_text)
    print(f"Shell result saved to {json_path}")

    print("\nShell Execution Summary:")
    print(f" Output length: {len(result['output'])}")
    print(f" Number of operations: {len(result['operations'])}")
    print(f" Number of residues: {len(result['residues'])}")

    if result["residues"]:
        print("\nResidues detected:")
        for number, residue in enumerate(result["residues"], start=1):
            res_type = residue.get("type", "unknown")
            res_conf = residue.get("confidence", 0.0)
            print(f" {number}. Type: {res_type}, Confidence: {res_conf:.2f}")

    if result["collapse_samples"]:
        print("\nCollapse samples detected:")
        for number, sample in enumerate(result["collapse_samples"], start=1):
            print(f" {number}. Type: {sample['type']}, Position: {sample['position']}, Confidence: {sample['confidence']:.2f}")
            print(f" Context: {sample['context'][:50]}...")

    # The visualization entry is optional and may be present but empty.
    visualization = result.get("visualization")
    if visualization:
        svg_path = output_dir / f"shell_{shell_id}_{prompt_key}.svg"
        visualizer.save_visualization(visualization, str(svg_path))
        print(f"Visualization saved to {svg_path}")

    return result
|
|
def _summarize_command_result(cmd_result):
    """Reduce one recursive-shell command result to a JSON-friendly entry.

    Failed commands keep only their error; successful ones get a
    ``result_summary`` with the parsed command parts, the execution time and
    the sizes of the optional "output", "residues" and "collapse_samples"
    entries (0 when an entry is absent).
    """
    entry = {
        "command": cmd_result["command"],
        "success": cmd_result["success"],
    }
    if not cmd_result["success"]:
        entry["error"] = cmd_result["error"]
        return entry

    parsed = cmd_result["original_command"]
    # A missing "result" is tolerated here, matching the visualization
    # lookup in run_recursive_shell, which also checks for its presence.
    payload = cmd_result.get("result", {})
    entry["result_summary"] = {
        "command_family": parsed["family"],
        "command_function": parsed["function"],
        "execution_time": cmd_result["execution_time"],
        "output_length": len(payload.get("output", "")),
        "residues": len(payload.get("residues", [])),
        "collapse_samples": len(payload.get("collapse_samples", [])),
    }
    return entry


def run_recursive_shell(prompt_key="recursion"):
    """Run a fixed sequence of ``.p/`` commands on a test prompt.

    Writes a JSON summary to ``recursive_shell_<prompt_key>.json`` under
    ``output_dir``, prints per-command outcomes, and saves the visualization
    of the last successful ``fork.attribution`` command (if it has a
    non-empty one) as an SVG with the same stem.

    Args:
        prompt_key: Key into ``test_prompts`` selecting the prompt.

    Returns:
        The mapping returned by ``RecursiveShell.execute_sequence``.
    """
    prompt = test_prompts[prompt_key]
    print(f"\n=== Recursive Shell Analysis on '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    recursive_shell = RecursiveShell(
        model=model,
        tracer=tracer,
        visualizer=visualizer,
    )

    commands = [
        ".p/reflect.trace{depth=4, target=reasoning}",
        ".p/reflect.uncertainty{quantify=true, distribution=show}",
        ".p/collapse.detect{threshold=0.7, alert=true}",
        ".p/fork.attribution{sources=all, visualize=true}",
    ]

    print("Executing recursive shell commands...")
    result = recursive_shell.execute_sequence(
        commands=commands,
        prompt=prompt,
    )

    # Build and encode the summary before opening the file so a missing key
    # or unserializable value cannot leave an empty or partial JSON file.
    summary = {
        "success": result["success"],
        "commands": result["commands"],
        "prompt": result["prompt"],
        "timestamp": result["timestamp"],
        "execution_time": result["execution_time"],
        "results": [_summarize_command_result(r) for r in result["results"]],
    }
    json_path = output_dir / f"recursive_shell_{prompt_key}.json"
    json_path.write_text(json.dumps(summary, indent=2))
    print(f"Recursive shell result saved to {json_path}")

    print("\nRecursive Shell Execution Summary:")
    print(f" Overall success: {result['success']}")
    print(f" Commands executed: {len(result['commands'])}")
    print(f" Execution time: {result['execution_time']:.2f}s")

    print("\nCommand Results:")
    for number, cmd_result in enumerate(result["results"], start=1):
        cmd = cmd_result["command"]
        success = cmd_result["success"]
        print(f" {number}. {cmd}: {'Success' if success else 'Failed'}")
        if not success:
            print(f" Error: {cmd_result['error']}")

    # Walk backwards so the most recent successful fork.attribution command
    # that carries a "visualization" entry wins.
    visualization_data = None
    for cmd_result in reversed(result["results"]):
        if not cmd_result["success"]:
            continue
        parsed = cmd_result["original_command"]
        if parsed["family"] != "fork" or parsed["function"] != "attribution":
            continue
        if "visualization" in cmd_result.get("result", {}):
            visualization_data = cmd_result["result"]["visualization"]
            break

    if visualization_data:
        svg_path = output_dir / f"recursive_shell_{prompt_key}.svg"
        visualizer.save_visualization(visualization_data, str(svg_path))
        print(f"Visualization saved to {svg_path}")

    return result
|
|
def compare_shells(prompt_key="value_conflict", shells=None):
    """Run several shells on the same prompt and compare their residues.

    Writes ``shell_comparison_<prompt_key>.json`` and a grouped bar chart
    ``shell_comparison_<prompt_key>.png`` under ``output_dir``.

    Args:
        prompt_key: Key into ``test_prompts`` selecting the prompt.
        shells: Shell identifiers to run; defaults to VALUE-COLLAPSE,
            FEATURE-SUPERPOSITION and FORK-ATTRIBUTION.

    Returns:
        ``(results, residue_patterns)``: a dict mapping shell id to the
        ``executor.run`` result, and a flat list with one summary dict per
        recorded residue.
    """
    if shells is None:
        shells = ["VALUE-COLLAPSE", "FEATURE-SUPERPOSITION", "FORK-ATTRIBUTION"]

    prompt = test_prompts[prompt_key]
    print(f"\n=== Shell Comparison on '{prompt_key}' prompt ===")
    print(f"Prompt: {prompt[:100]}...")

    results = {}
    residue_patterns = []

    for shell_id in shells:
        print(f"\nRunning shell: {shell_id}")
        result = executor.run(
            shell=shell_id,
            model=model,
            prompt=prompt,
            trace_attribution=True,
            record_residue=True,
        )
        results[shell_id] = result

        residue_patterns.extend(
            {
                "shell": shell_id,
                "type": residue.get("type", "unknown"),
                "pattern": residue.get("pattern", ""),
                "confidence": residue.get("confidence", 0.0),
                "signature": residue.get("signature", "")[:30] + "...",
            }
            for residue in result["residues"]
        )

    # Encode first, then write, so a failure cannot leave a partial file.
    comparison = {
        "prompt": prompt,
        "shells": shells,
        "residue_patterns": residue_patterns,
        "results": {
            name: {
                "output_length": len(shell_result["output"]),
                "num_operations": len(shell_result["operations"]),
                "num_residues": len(shell_result["residues"]),
                "num_collapses": len(shell_result["collapse_samples"]),
            }
            for name, shell_result in results.items()
        },
    }
    json_path = output_dir / f"shell_comparison_{prompt_key}.json"
    json_path.write_text(json.dumps(comparison, indent=2))
    print(f"Shell comparison saved to {json_path}")

    shell_names = list(results.keys())
    type_names = sorted({pattern["type"] for pattern in residue_patterns})
    # One pass over the patterns instead of rescanning them for every
    # (shell, type) pair; missing pairs read back as 0.
    pair_counts = Counter(
        (pattern["shell"], pattern["type"]) for pattern in residue_patterns
    )

    n_types = len(type_names)
    # Keep the 0.2 bar width for up to four types and shrink it beyond that
    # so a group never spills into the next shell's slot.
    bar_width = min(0.2, 0.8 / n_types) if n_types else 0.2
    positions = range(len(shell_names))

    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        for offset, res_type in enumerate(type_names):
            ax.bar(
                [p + offset * bar_width for p in positions],
                [pair_counts[(shell, res_type)] for shell in shell_names],
                width=bar_width,
                label=res_type,
            )

        ax.set_xlabel('Shell')
        ax.set_ylabel('Number of Residues')
        ax.set_title(f'Residue Patterns by Shell - {prompt_key}')
        # Bars are centred at p, p + w, ..., p + (n - 1) * w, so the middle
        # of a group lies at p + (n - 1) * w / 2.
        group_centre = bar_width * max(n_types - 1, 0) / 2
        ax.set_xticks([p + group_centre for p in positions])
        ax.set_xticklabels(shell_names)
        if type_names:
            # With no labelled bars, legend() would only emit a warning.
            ax.legend()

        png_path = output_dir / f"shell_comparison_{prompt_key}.png"
        fig.savefig(png_path)
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)
    print(f"Comparison visualization saved to {png_path}")

    return results, residue_patterns
|
|
def analyze_residue_registry():
    """Summarize every residue pattern the executor has recorded.

    Writes the analysis returned by ``executor.get_residue_analysis`` to
    ``residue_analysis.json``, prints per-type statistics and up to three
    related-pattern pairs, and saves a chart of pattern counts and average
    confidence per type as ``residue_analysis.png`` (both under
    ``output_dir``).

    Returns:
        The analysis mapping returned by ``executor.get_residue_analysis``.
    """
    print("\n=== Residue Registry Analysis ===")

    analysis = executor.get_residue_analysis()

    # Encode first, then write, so a failure cannot leave a partial file.
    json_path = output_dir / "residue_analysis.json"
    json_path.write_text(json.dumps(analysis, indent=2))
    print(f"Residue analysis saved to {json_path}")

    print(f"\nRecorded {analysis['num_patterns']} residue patterns of {len(analysis['types'])} types")

    print("\nPattern Types:")
    for pattern_type, stats in analysis["type_stats"].items():
        print(f" {pattern_type}: {stats['count']} patterns, avg confidence: {stats['avg_confidence']:.2f}")
        if stats['examples']:
            example = stats['examples'][0]
            print(f" Examples: {example['signature']} (conf: {example['confidence']:.2f})")

    print("\nRelated Patterns:")
    for number, relation in enumerate(analysis["related_patterns"][:3], start=1):
        print(f" {number}. {relation['pattern1']} ({relation['type1']}) ~ {relation['pattern2']} ({relation['type2']})")
        print(f" Similarity: {relation['similarity']:.2f}")

    types = list(analysis["type_stats"])
    counts = [analysis["type_stats"][t]["count"] for t in types]
    confidences = [analysis["type_stats"][t]["avg_confidence"] for t in types]
    # Plot against explicit numeric positions so the tick labels can be set
    # together with fixed tick locations.
    positions = list(range(len(types)))

    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        ax.bar(positions, counts, alpha=0.7, label="Count")

        # Second y-axis for the average confidence, limited to [0, 1].
        ax2 = ax.twinx()
        ax2.plot(positions, confidences, 'r-', marker='o', label="Avg Confidence")
        ax2.set_ylim([0, 1.0])
        ax2.set_ylabel("Average Confidence")

        ax.set_xlabel("Residue Type")
        ax.set_ylabel("Number of Patterns")
        ax.set_title("Residue Pattern Analysis")
        ax.set_xticks(positions)
        ax.set_xticklabels(types, rotation=45, ha="right")

        # Merge the handles of both axes into a single legend.
        handles, labels = ax.get_legend_handles_labels()
        handles2, labels2 = ax2.get_legend_handles_labels()
        ax.legend(handles + handles2, labels + labels2, loc="upper left")

        fig.tight_layout()

        png_path = output_dir / "residue_analysis.png"
        fig.savefig(png_path)
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)
    print(f"Residue analysis visualization saved to {png_path}")

    return analysis
|
|
def _run_boundary_analysis():
    """Run the full analysis pipeline on the 'boundary' test prompt.

    Traces attribution for a fresh completion, runs three diagnostic shells
    and a recursive shell command sequence, then writes a JSON summary and
    glyph maps under ``output_dir``.

    Returns:
        ``(boundary_attribution, boundary_shell_results)``: the attribution
        map from ``tracer.trace`` and a dict mapping shell id to the result
        of ``executor.run``.
    """
    print("\n=== Complete Analysis on 'boundary' prompt ===")
    boundary_prompt = test_prompts["boundary"]

    boundary_output = model.generate(boundary_prompt, max_tokens=800)
    boundary_attribution = tracer.trace(
        prompt=boundary_prompt,
        output=boundary_output,
        include_confidence=True,
    )

    boundary_glyph_map = mapper.map_attribution(
        attribution_map=boundary_attribution,
        layout_type="force_directed",
        include_tokens=True,
    )

    boundary_glyph_path = output_dir / "boundary_glyph_map.svg"
    mapper.visualize(boundary_glyph_map, output_path=str(boundary_glyph_path))
    print(f"Boundary glyph map saved to {boundary_glyph_path}")

    boundary_shells = ["BOUNDARY-HESITATION", "GHOST-ACTIVATION", "META-COLLAPSE"]
    boundary_shell_results = {}

    for shell_id in boundary_shells:
        print(f"\nRunning {shell_id} on boundary prompt...")
        result = executor.run(
            shell=shell_id,
            model=model,
            prompt=boundary_prompt,
            trace_attribution=True,
            record_residue=True,
            visualize=True,
        )
        boundary_shell_results[shell_id] = result

        if "visualization" in result and result["visualization"]:
            viz_path = output_dir / f"boundary_{shell_id}.svg"
            visualizer.save_visualization(result["visualization"], str(viz_path))
            print(f"{shell_id} visualization saved to {viz_path}")

    recursive_shell = RecursiveShell(
        model=model,
        tracer=tracer,
        visualizer=visualizer,
    )

    boundary_commands = [
        ".p/reflect.boundary{distinct=true, overlap=minimal}",
        ".p/reflect.uncertainty{quantify=true, distribution=show}",
        ".p/collapse.detect{threshold=0.6, alert=true}",
        ".p/fork.attribution{sources=contested, visualize=true}",
    ]

    print("\nExecuting recursive shell commands on boundary prompt...")
    boundary_recursive_result = recursive_shell.execute_sequence(
        commands=boundary_commands,
        prompt=boundary_prompt,
    )

    boundary_summary = {
        "prompt": boundary_prompt,
        "output": boundary_output,
        "shells_run": boundary_shells,
        "recursive_commands": boundary_commands,
        "attribution_stats": {
            "links": len(boundary_attribution.links),
            "attribution_gaps": len(boundary_attribution.attribution_gaps),
            "collapsed_regions": len(boundary_attribution.collapsed_regions),
        },
        "shell_results": {
            shell_id: {
                "residues": len(result["residues"]),
                "collapse_samples": len(result["collapse_samples"]),
            }
            for shell_id, result in boundary_shell_results.items()
        },
        "recursive_success": boundary_recursive_result["success"],
    }

    boundary_summary_path = output_dir / "boundary_analysis_summary.json"
    boundary_summary_path.write_text(json.dumps(boundary_summary, indent=2))
    print(f"Boundary analysis summary saved to {boundary_summary_path}")

    # NOTE(review): "pattern" is shortened to 50 characters plus "..." here
    # and that shortened text is what the ResiduePattern objects below are
    # built from -- confirm this is intended rather than a display-only trim.
    boundary_residues = [
        {
            "shell": shell_id,
            "type": residue.get("type", "unknown"),
            "confidence": residue.get("confidence", 0.0),
            "pattern": residue.get("pattern", "")[:50] + "...",
        }
        for shell_id, result in boundary_shell_results.items()
        for residue in result["residues"]
    ]

    if boundary_residues:
        # Imported only when there is something to map.
        from glyphs.residue.patterns import ResiduePattern

        residue_pattern_objects = [
            ResiduePattern(
                type=r["type"],
                pattern=r["pattern"],
                context={"shell": r["shell"]},
                signature=r["pattern"][:20],
                confidence=r["confidence"],
            )
            for r in boundary_residues
        ]

        residue_glyph_map = mapper.map_residue_patterns(
            residue_patterns=residue_pattern_objects,
            layout_type="circular",
            cluster_patterns=True,
        )

        residue_glyph_path = output_dir / "boundary_residue_glyphs.svg"
        mapper.visualize(residue_glyph_map, output_path=str(residue_glyph_path))
        print(f"Boundary residue glyph map saved to {residue_glyph_path}")

    return boundary_attribution, boundary_shell_results


def _plot_framework_summary(attribution_map, comparison_results,
                            boundary_attribution, residue_analysis,
                            boundary_shell_results):
    """Save the 2x2 summary figure as ``framework_analysis_summary.png``.

    Panels: attribution gaps per prompt type, residue pattern counts per
    type, residues vs. collapses per boundary shell, and shell counts per
    conceptual area derived from ``executor.shells``.
    """
    prompt_types = ["Memory", "Value Conflict", "Boundary", "Recursion", "Polysemantic"]

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    try:
        fig.suptitle("glyphs Framework - Comprehensive Analysis", fontsize=16)

        # Top-left: attribution gaps. The last two prompt types have no
        # attribution map in this run and are plotted as 0.
        value_result = comparison_results["VALUE-COLLAPSE"]
        gap_counts = [
            len(attribution_map.attribution_gaps),
            len(value_result["attribution_map"].attribution_gaps) if "attribution_map" in value_result else 0,
            len(boundary_attribution.attribution_gaps),
            0,
            0,
        ]
        axes[0, 0].bar(prompt_types, gap_counts)
        axes[0, 0].set_title("Attribution Gaps by Prompt Type")
        axes[0, 0].set_xlabel("Prompt Type")
        axes[0, 0].set_ylabel("Number of Gaps")
        axes[0, 0].set_ylim(bottom=0)

        # Top-right: residue counts per type, drawn at explicit positions so
        # the rotated labels are set together with fixed tick locations.
        residue_types = list(residue_analysis["type_stats"].keys())
        type_counts = [residue_analysis["type_stats"][t]["count"] for t in residue_types]
        type_positions = list(range(len(residue_types)))
        axes[0, 1].bar(type_positions, type_counts)
        axes[0, 1].set_title("Residue Pattern Types")
        axes[0, 1].set_xlabel("Residue Type")
        axes[0, 1].set_ylabel("Count")
        axes[0, 1].set_ylim(bottom=0)
        axes[0, 1].set_xticks(type_positions)
        axes[0, 1].set_xticklabels(residue_types, rotation=45, ha="right")

        # Bottom-left: residues and collapses side by side for each shell.
        shell_names = list(boundary_shell_results.keys())
        residue_counts = [len(boundary_shell_results[s]["residues"]) for s in shell_names]
        collapse_counts = [len(boundary_shell_results[s]["collapse_samples"]) for s in shell_names]

        x = list(range(len(shell_names)))
        width = 0.35
        axes[1, 0].bar([i - width / 2 for i in x], residue_counts, width, label="Residues")
        axes[1, 0].bar([i + width / 2 for i in x], collapse_counts, width, label="Collapses")
        axes[1, 0].set_title("Shell Analysis on Boundary Prompt")
        axes[1, 0].set_xlabel("Shell")
        axes[1, 0].set_ylabel("Count")
        axes[1, 0].set_xticks(x)
        axes[1, 0].set_xticklabels(shell_names, rotation=45, ha="right")
        axes[1, 0].legend()

        # Bottom-right: shells per conceptual area, matched by substring.
        # NOTE(review): the "Memory" entry counts every shell while the
        # others filter by keyword -- confirm whether a filter is missing.
        shell_counts = [
            len(executor.shells),
            len([s for s in executor.shells if "VALUE" in s or "CONFLICT" in s]),
            len([s for s in executor.shells if "BOUNDARY" in s or "GHOST" in s]),
            len([s for s in executor.shells if "REFLECTION" in s or "RECURSIVE" in s]),
            len([s for s in executor.shells if "FEATURE" in s or "SUPERPOSITION" in s]),
        ]
        axes[1, 1].bar(prompt_types, shell_counts)
        axes[1, 1].set_title("Available Shells by Conceptual Area")
        axes[1, 1].set_xlabel("Conceptual Area")
        axes[1, 1].set_ylabel("Number of Specialized Shells")
        axes[1, 1].set_ylim(bottom=0)

        fig.tight_layout()
        # Leave room for the suptitle after tight_layout.
        fig.subplots_adjust(top=0.92)

        summary_viz_path = output_dir / "framework_analysis_summary.png"
        fig.savefig(summary_viz_path, dpi=300)
    finally:
        # Release the figure once it has been written.
        plt.close(fig)
    print(f"Framework analysis summary visualization saved to {summary_viz_path}")


def main():
    """Run a complete analysis demonstration.

    Runs the attribution, shell, recursive-shell, comparison and residue
    registry demos in sequence, then the full analysis of the 'boundary'
    prompt, and finally saves a summary figure. All artifacts are written
    under ``output_dir``.
    """
    print("=== glyphs Framework Demonstration ===")
    print(f"Results will be saved to: {output_dir}")

    attribution_map, _ = run_attribution_analysis(prompt_key="memory")

    run_shell_analysis(shell_id="MEMTRACE", prompt_key="memory")
    run_shell_analysis(shell_id="VALUE-COLLAPSE", prompt_key="value_conflict")

    run_recursive_shell(prompt_key="recursion")

    comparison_results, _ = compare_shells(
        prompt_key="value_conflict",
        shells=["VALUE-COLLAPSE", "FORK-ATTRIBUTION", "FEATURE-SUPERPOSITION"],
    )

    residue_analysis = analyze_residue_registry()

    boundary_attribution, boundary_shell_results = _run_boundary_analysis()

    _plot_framework_summary(
        attribution_map,
        comparison_results,
        boundary_attribution,
        residue_analysis,
        boundary_shell_results,
    )

    print("\n=== Analysis Complete ===")
    print(f"All outputs saved to: {output_dir}")
|
|
def _save_visualization_if_present(visualization, filename, label):
    """Save ``visualization`` as ``filename`` under ``output_dir`` if non-empty.

    Nothing is written or printed when ``visualization`` is missing or
    empty, so a tracer result that carries the key with no data is skipped.
    """
    if not visualization:
        return
    target = output_dir / filename
    visualizer.save_visualization(visualization, str(target))
    print(f"{label} saved to {target}")


def attribution_demo():
    """Focused demo of the tracer's attribution methods.

    Runs the basic trace plus the fork, QK-alignment, OV-projection and
    attention-head traces on the 'polysemantic' prompt, saves each
    non-empty visualization as an SVG, and writes the counts collected from
    the results to ``attribution_analysis_summary.json`` under
    ``output_dir``.

    Returns:
        The summary dict that was written to disk.
    """
    print("\n=== Advanced Attribution Tracing Demo ===")

    prompt = test_prompts["polysemantic"]
    print(f"Prompt: {prompt[:100]}...")

    output = model.generate(prompt, max_tokens=800)
    print(f"Output: {output[:100]}...")

    attribution_map = tracer.trace(
        prompt=prompt,
        output=output,
        include_confidence=True,
    )

    print("Tracing attribution forks...")
    fork_result = tracer.trace_with_forks(
        prompt=prompt,
        output=output,
        fork_factor=3,
        include_confidence=True,
        visualize=True,
    )
    # The fork visualization is read from the result's metadata; the other
    # traces below expose it at the top level of their result.
    _save_visualization_if_present(
        fork_result["metadata"].get("visualization"),
        "attribution_forks_polysemantic.svg",
        "Fork visualization",
    )

    print("Tracing QK alignment...")
    qk_result = tracer.trace_qk_alignment(
        prompt=prompt,
        output=output,
        visualize=True,
    )
    _save_visualization_if_present(
        qk_result.get("visualization"),
        "qk_alignment_polysemantic.svg",
        "QK alignment visualization",
    )

    print("Tracing OV projection...")
    ov_result = tracer.trace_ov_projection(
        prompt=prompt,
        output=output,
        visualize=True,
    )
    _save_visualization_if_present(
        ov_result.get("visualization"),
        "ov_projection_polysemantic.svg",
        "OV projection visualization",
    )

    print("Tracing attention heads...")
    heads_result = tracer.trace_attention_heads(
        prompt=prompt,
        output=output,
        visualize=True,
    )
    _save_visualization_if_present(
        heads_result.get("visualization"),
        "attention_heads_polysemantic.svg",
        "Attention heads visualization",
    )

    attribution_summary = {
        "basic_attribution": {
            "links": len(attribution_map.links),
            "gaps": len(attribution_map.attribution_gaps),
            "collapsed_regions": len(attribution_map.collapsed_regions),
        },
        "fork_attribution": {
            "forks": len(fork_result["fork_paths"]),
            "conflict_points": sum(
                len(fork.get("conflict_points", []))
                for fork in fork_result["fork_paths"]
            ),
        },
        "qk_alignment": {
            "alignments": len(qk_result["qk_alignments"]),
            "patterns": list(qk_result.get("qk_patterns", {}).keys()),
        },
        "ov_projection": {
            "projections": len(ov_result["ov_projections"]),
            "patterns": list(ov_result.get("ov_patterns", {}).keys()),
        },
        "attention_heads": {
            "heads": len(heads_result["attention_heads"]),
            "patterns": list(heads_result.get("head_patterns", {}).keys()),
        },
    }

    # Encode first, then write, so a failure cannot leave a partial file.
    summary_path = output_dir / "attribution_analysis_summary.json"
    summary_path.write_text(json.dumps(attribution_summary, indent=2))
    print(f"Attribution analysis summary saved to {summary_path}")

    return attribution_summary
|
|
def recursive_shell_demo():
    """Focused demo that executes recursive shell commands one at a time.

    Saves the shell's help text, runs each demo command against the
    'recursion' prompt, saves any non-empty visualization a command returns,
    and writes a JSON summary plus a horizontal bar chart of success and
    execution time under ``output_dir``.

    Returns:
        Dict mapping each command string to its outcome ("success",
        "error", and where available "execution_time" and
        "visualization_path").
    """
    print("\n=== Recursive Shell Command Demo ===")

    recursive_shell = RecursiveShell(
        model=model,
        tracer=tracer,
        visualizer=visualizer,
    )

    prompt = test_prompts["recursion"]
    print(f"Prompt: {prompt[:100]}...")

    help_path = output_dir / "recursive_shell_help.txt"
    help_path.write_text(recursive_shell.get_command_help())
    print(f"Recursive shell help text saved to {help_path}")

    demo_commands = [
        # reflect family
        ".p/reflect.trace{depth=3, target=reasoning}",
        ".p/reflect.attribution{sources=all, confidence=true}",
        ".p/reflect.uncertainty{quantify=true, distribution=show}",
        ".p/reflect.boundary{distinct=true, overlap=minimal}",
        # collapse family
        ".p/collapse.detect{threshold=0.7, alert=true}",
        ".p/collapse.prevent{trigger=recursive_depth, threshold=5}",
        ".p/collapse.recover{from=loop, method=gradual}",
        # fork family
        ".p/fork.attribution{sources=all, visualize=true}",
        ".p/fork.counterfactual{variants=['optimistic', 'pessimistic'], compare=true}",
        # shell family
        ".p/shell.isolate{boundary=permeable, contamination=warn}",
    ]

    # Maps ".", "/", "{" and "}" to "_" when deriving a file name.
    filename_table = str.maketrans("./{}", "____")

    command_results = {}
    for cmd in demo_commands:
        print(f"\nExecuting: {cmd}")
        try:
            result = recursive_shell.execute(
                command=cmd,
                prompt=prompt,
            )
            command_results[cmd] = {
                "success": result["success"],
                "execution_time": result.get("execution_time", 0),
                "error": result["error"] if not result["success"] else None,
            }

            visualization = (
                result.get("result", {}).get("visualization")
                if result["success"]
                else None
            )
            if visualization:
                safe_cmd = cmd.translate(filename_table)
                svg_path = output_dir / f"recursive_cmd_{safe_cmd}.svg"
                visualizer.save_visualization(visualization, str(svg_path))
                print(f"Command visualization saved to {svg_path}")
                command_results[cmd]["visualization_path"] = str(svg_path)
        except Exception as e:
            # Deliberately broad: one failing command must not stop the demo,
            # and the failure is recorded in the summary.
            print(f"Error executing {cmd}: {e}")
            command_results[cmd] = {
                "success": False,
                "error": str(e),
            }

    # Encode first, then write, so a failure cannot leave a partial file.
    summary_path = output_dir / "recursive_commands_summary.json"
    summary_path.write_text(json.dumps(command_results, indent=2))
    print(f"Recursive commands summary saved to {summary_path}")

    cmd_names = list(command_results.keys())
    cmd_success = [1 if command_results[cmd]["success"] else 0 for cmd in cmd_names]
    cmd_times = [
        command_results[cmd].get("execution_time", 0) if command_results[cmd]["success"] else 0
        for cmd in cmd_names
    ]
    rows = list(range(len(cmd_names)))

    fig, ax = plt.subplots(figsize=(12, 8))
    try:
        ax.barh(rows, cmd_success, alpha=0.7, label="Success")
        ax.barh(rows, cmd_times, alpha=0.5, label="Execution Time (s)")

        ax.set_yticks(rows)
        ax.set_yticklabels([cmd[:30] + "..." for cmd in cmd_names])
        ax.set_xlabel("Success / Execution Time (s)")
        ax.set_title("Recursive Shell Command Execution")
        ax.legend()
        fig.tight_layout()

        png_path = output_dir / "recursive_commands_visualization.png"
        fig.savefig(png_path)
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)
    print(f"Recursive commands visualization saved to {png_path}")

    return command_results
|
|
if __name__ == "__main__":
    # Make sure the output directory exists before any demo writes to it,
    # then run the full demonstration.
    output_dir.mkdir(exist_ok=True)
    main()
| |
| |
| |
| |
|
|