""" Evaluator implementations for code generation metrics. Each evaluator exposes a single method: evaluate(model, tokenizer, dataset) -> float Scores are always in [0, 1]. """ from __future__ import annotations import ast import multiprocessing import textwrap from abc import ABC, abstractmethod from concurrent.futures import ProcessPoolExecutor, TimeoutError as FuturesTimeoutError from typing import Any import numpy as np import torch from datasets import Dataset from sacrebleu.metrics import BLEU from transformers import PreTrainedModel, PreTrainedTokenizerBase # --------------------------------------------------------------------------- # Base # --------------------------------------------------------------------------- class BaseEvaluator(ABC): @abstractmethod def evaluate( self, model: PreTrainedModel, tokenizer: PreTrainedTokenizerBase, dataset: Dataset, ) -> float: ... def _generate_batch( self, model: PreTrainedModel, tokenizer: PreTrainedTokenizerBase, prompts: list[str], max_new_tokens: int = 256, num_return_sequences: int = 1, temperature: float = 0.2, ) -> list[list[str]]: """Generate completions for a list of prompts. Returns list-of-lists.""" results: list[list[str]] = [] device = next(model.parameters()).device for prompt in prompts: inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512) inputs = {k: v.to(device) for k, v in inputs.items()} with torch.no_grad(): outputs = model.generate( **inputs, max_new_tokens=max_new_tokens, num_return_sequences=num_return_sequences, do_sample=temperature > 0, temperature=temperature if temperature > 0 else 1.0, top_p=0.95, pad_token_id=tokenizer.eos_token_id, ) prompt_len = inputs["input_ids"].shape[1] completions = [ tokenizer.decode(out[prompt_len:], skip_special_tokens=True) for out in outputs ] results.append(completions) return results # --------------------------------------------------------------------------- # Pass@k # --------------------------------------------------------------------------- class PassAtKEvaluator(BaseEvaluator): """ Unbiased pass@k estimator from Chen et al. (2021): pass@k = 1 - C(n-c, k) / C(n, k) where n = total samples, c = correct samples. """ def __init__(self, k: int = 1, n: int = 10) -> None: self.k = k self.n = n def evaluate( self, model: PreTrainedModel, tokenizer: PreTrainedTokenizerBase, dataset: Dataset, num_problems: int = 50, ) -> float: problems = dataset.select(range(min(num_problems, len(dataset)))) prompts = [str(ex.get("prompt", ex.get("content", ""))) for ex in problems] references = [str(ex.get("canonical_solution", ex.get("content", ""))) for ex in problems] all_completions = self._generate_batch( model, tokenizer, prompts, num_return_sequences=self.n, temperature=0.8, # diversity for pass@k ) scores: list[float] = [] for completions, reference in zip(all_completions, references): correct = sum( 1 for c in completions if self._is_correct(c, reference) ) scores.append(self._pass_at_k(n=self.n, c=correct, k=self.k)) return float(np.mean(scores)) @staticmethod def _pass_at_k(n: int, c: int, k: int) -> float: if n - c < k: return 1.0 return 1.0 - float(np.prod([(n - c - i) / (n - i) for i in range(k)])) @staticmethod def _is_correct(completion: str, reference: str) -> bool: # Basic syntactic check — override with execution check for HumanEval-style try: ast.parse(completion) return completion.strip() == reference.strip() except SyntaxError: return False # --------------------------------------------------------------------------- # BLEU # --------------------------------------------------------------------------- class BleuEvaluator(BaseEvaluator): def __init__(self, max_new_tokens: int = 256) -> None: self._max_new_tokens = max_new_tokens self._bleu = BLEU(effective_order=True) def evaluate( self, model: PreTrainedModel, tokenizer: PreTrainedTokenizerBase, dataset: Dataset, num_samples: int = 100, ) -> float: subset = dataset.select(range(min(num_samples, len(dataset)))) prompts = [str(ex.get("prompt", ex.get("content", ""))) for ex in subset] references = [str(ex.get("canonical_solution", ex.get("content", ""))) for ex in subset] completions_batch = self._generate_batch( model, tokenizer, prompts, max_new_tokens=self._max_new_tokens ) hypotheses = [batch[0] for batch in completions_batch] result = self._bleu.corpus_score(hypotheses, [references]) # sacrebleu returns score in [0, 100]; normalise to [0, 1] return result.score / 100.0 # --------------------------------------------------------------------------- # Execution accuracy # --------------------------------------------------------------------------- def _run_code_safe(code: str, timeout: int) -> bool: """Run in a subprocess to enforce timeout and isolate crashes.""" try: exec(compile(code, "", "exec"), {}) # noqa: S102 return True except Exception: return False class ExecutionAccuracyEvaluator(BaseEvaluator): """Fraction of generated code snippets that execute without error.""" def __init__(self, timeout: int = 10, max_new_tokens: int = 256) -> None: self._timeout = timeout self._max_new_tokens = max_new_tokens def evaluate( self, model: PreTrainedModel, tokenizer: PreTrainedTokenizerBase, dataset: Dataset, num_samples: int = 50, ) -> float: subset = dataset.select(range(min(num_samples, len(dataset)))) prompts = [str(ex.get("prompt", ex.get("content", ""))) for ex in subset] completions_batch = self._generate_batch( model, tokenizer, prompts, max_new_tokens=self._max_new_tokens ) codes = [batch[0] for batch in completions_batch] passed = 0 with ProcessPoolExecutor(max_workers=4) as executor: futures = {executor.submit(_run_code_safe, code, self._timeout): code for code in codes} for future in futures: try: if future.result(timeout=self._timeout + 1): passed += 1 except (FuturesTimeoutError, Exception): pass return passed / len(codes) if codes else 0.0 # --------------------------------------------------------------------------- # Exact match # --------------------------------------------------------------------------- class ExactMatchEvaluator(BaseEvaluator): def evaluate( self, model: PreTrainedModel, tokenizer: PreTrainedTokenizerBase, dataset: Dataset, num_samples: int = 100, ) -> float: subset = dataset.select(range(min(num_samples, len(dataset)))) prompts = [str(ex.get("prompt", ex.get("content", ""))) for ex in subset] references = [str(ex.get("canonical_solution", ex.get("content", ""))) for ex in subset] completions_batch = self._generate_batch(model, tokenizer, prompts) hypotheses = [batch[0].strip() for batch in completions_batch] matches = sum(h == r.strip() for h, r in zip(hypotheses, references)) return matches / len(references) if references else 0.0