import asyncio
import base64
import hashlib
import json
import logging
import os
import random
import secrets
import subprocess
import time
import uuid
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

import aiofiles
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from huggingface_hub import HfApi, hf_hub_download
from pydantic import BaseModel, Field

# =============================================================================
# CONFIG
# =============================================================================

# Hugging Face dataset holding the background clips, and the token to read it.
HF_DATASET = os.getenv("HF_DATASET", "YTShortMakerArchx/BG_VIDS_ARCHX_YT")
HF_TOKEN = os.getenv("HF_TOKEN")
# Shared secret expected from callers (checked by validate_app_key).
APP_KEY = os.getenv("APP_KEY")
UI_ORIGIN = os.getenv("UI_ORIGIN", "http://shortgenx.pages.dev")

# Scratch space: everything lives under /tmp and is created at import time.
BASE_DIR = Path("/tmp")
AUDIO_DIR = BASE_DIR / "audio"  # uploaded audio + subtitle inputs
OUTPUT_DIR = BASE_DIR / "out"  # finished MP4 files
JOBS_DIR = BASE_DIR / "jobs"  # one JSON snapshot per job
CLIENT_DIR = BASE_DIR / "clients"  # persisted client records

for d in (AUDIO_DIR, OUTPUT_DIR, JOBS_DIR, CLIENT_DIR):
    d.mkdir(parents=True, exist_ok=True)

# Rate limiting: key -> timestamps (time.time()) of recent hits.
RATE_LIMITS: Dict[str, List[float]] = {}
GENERATE_LIMIT = 3  # per hour per client
STATUS_LIMIT = 1000  # per hour per job

# In-memory job table, keyed by job ID.
JOBS: Dict[str, Dict] = {}
JOB_TIMEOUT = 15 * 60  # seconds (15 min)

# Video listing cache: cache key -> (fetched_at, list of clip names).
VIDEO_CACHE: Dict[str, tuple] = {}
CACHE_TIMEOUT = 300  # seconds

# =============================================================================
# QUEUE SYSTEM
# =============================================================================
MAX_CONCURRENT = 1  # jobs processed at the same time
AVG_JOB_DURATION_S = 6 * 60  # rough per-job runtime used for queue ETAs
_JOB_QUEUE: deque = deque()  # job IDs waiting to run, oldest first
_RUNNING_JOBS: set = set()  # job IDs currently being processed
_QUEUE_LOCK: Optional[asyncio.Lock] = None  # created at startup

# =============================================================================
# CLIENT ID SYSTEM
# =============================================================================
CLIENT_TOKENS: Dict[str, Dict] = {} def _client_file(token: str) -> Path: return CLIENT_DIR / f"{token[:8]}.json" def _load_client(token: str) -> Optional[Dict]: if token in CLIENT_TOKENS: return CLIENT_TOKENS[token] path = _client_file(token) if path.exists(): try: rec = json.loads(path.read_text()) CLIENT_TOKENS[token] = rec return rec except Exception: pass return None def _save_client(rec: Dict): token = rec["token"] CLIENT_TOKENS[token] = rec try: rec_copy = rec.copy() rec_copy["created_at"] = ( rec_copy["created_at"].isoformat() if isinstance(rec_copy["created_at"], datetime) else rec_copy["created_at"] ) rec_copy["last_seen"] = ( rec_copy["last_seen"].isoformat() if isinstance(rec_copy["last_seen"], datetime) else rec_copy["last_seen"] ) _client_file(token).write_text(json.dumps(rec_copy)) except Exception as e: logger.warning(f"Failed to persist client {token[:8]}: {e}") def _create_client(fingerprint: str) -> Dict: rec = { "token": secrets.token_hex(24), "fingerprint": fingerprint, "created_at": datetime.now(), "last_seen": datetime.now(), "job_count": 0, "jobs": [], } _save_client(rec) logger.info(f"New client {rec['token'][:8]}...") return rec def _fingerprint(req: Request) -> str: data = ( f"{req.client.host}:" f"{req.headers.get('user-agent', '')}:" f"{req.headers.get('accept', '')}" ) return hashlib.sha256(data.encode()).hexdigest() def resolve_client(req: Request) -> Dict: token = req.headers.get("X-Client-ID", "").strip() if token: rec = _load_client(token) if rec: rec["last_seen"] = datetime.now() _save_client(rec) return rec fp = _fingerprint(req) rec = { "token": token, "fingerprint": fp, "created_at": datetime.now(), "last_seen": datetime.now(), "job_count": 0, "jobs": [], } _save_client(rec) logger.info(f"Restored client {token[:8]}... 
from header") return rec fp = _fingerprint(req) for rec in CLIENT_TOKENS.values(): if rec.get("fingerprint") == fp: rec["last_seen"] = datetime.now() _save_client(rec) return rec return _create_client(fp) # ============================================================================= # LOGGING # ============================================================================= logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s", ) logger = logging.getLogger(__name__) # ============================================================================= # APP # ============================================================================= app = FastAPI( title="ArchNemix Shorts Generator API", version="5.1.0", docs_url="/docs", redoc_url="/redoc", ) app.add_middleware( CORSMiddleware, allow_origins=[ "https://shortgenx.pages.dev", # Your website "http://localhost:3000", # Local dev only port 3000 "http://127.0.0.1:3000", # Local dev only port 3000 "https://ytshortmakerarchx-archnemix-controller.hf.space", # Controller space ], allow_credentials=True, allow_methods=["GET", "POST", "DELETE", "OPTIONS", "HEAD"], allow_headers=["*"], expose_headers=["X-Client-ID"], max_age=600, ) # ============================================================================= # MODELS # ============================================================================= class GenerateRequest(BaseModel): audio_base64: str = Field(..., min_length=10) subtitles_ass: str = Field(..., min_length=10) background: str duration: Optional[float] = Field(None, ge=1, le=180) request_id: Optional[str] = None # ============================================================================= # SECURITY # ============================================================================= def validate_app_key(req: Request) -> bool: if not APP_KEY or APP_KEY == "archnemix-secret-key-change-in-production": return True key = ( req.headers.get("X-APP-KEY") or req.headers.get("Authorization", 
"").replace("Bearer ", "") ) return key == APP_KEY def validate_origin(req: Request) -> bool: origin = req.headers.get("origin") or req.headers.get("referer", "") if not origin: return True # Strict allowed origins - exact match only allowed = [ "https://shortgenx.pages.dev", "http://localhost:3000", "http://127.0.0.1:3000", "https://ytshortmakerarchx-archnemix-controller.hf.space", ] # Use exact match, not prefix matching return origin in allowed def rate_limit(key: str, limit: int, window: int = 3600) -> bool: now = time.time() RATE_LIMITS.setdefault(key, []) RATE_LIMITS[key] = [t for t in RATE_LIMITS[key] if now - t < window] if len(RATE_LIMITS[key]) >= limit: return False RATE_LIMITS[key].append(now) return True # ============================================================================= # DATASET HELPERS # ============================================================================= def get_hf_api() -> HfApi: if not HF_TOKEN: raise ValueError("HF_TOKEN not configured") return HfApi(token=HF_TOKEN) async def list_videos_from_dataset(category: str = "minecraft") -> List[str]: cache_key = f"videos_{category}" if cache_key in VIDEO_CACHE: ts, data = VIDEO_CACHE[cache_key] if time.time() - ts < CACHE_TIMEOUT: return data try: api = get_hf_api() files = list(api.list_repo_files(repo_id=HF_DATASET, repo_type="dataset")) result = [ f.split("/")[-1].replace(".mp4", "") for f in files if f.startswith(f"{category}/") and f.endswith(".mp4") ] VIDEO_CACHE[cache_key] = (time.time(), result) return result except Exception as e: logger.error(f"list_videos failed: {e}") return [f"mc{i}" for i in range(1, 7)] async def download_video_from_dataset(video_name: str, category: str = "minecraft") -> str: video_name = video_name.replace(".mp4", "") for path in [f"{category}/{video_name}.mp4", f"{video_name}.mp4"]: try: return hf_hub_download( repo_id=HF_DATASET, filename=path, repo_type="dataset", token=HF_TOKEN, cache_dir="/tmp/hf_cache", ) except Exception: continue available = 
await list_videos_from_dataset(category) raise HTTPException(404, f"Video '{video_name}' not found. Available: {available}") # ============================================================================= # FILE HELPERS # ============================================================================= async def save_base64_audio(data: str, path: Path) -> int: if data.startswith("data:audio/"): data = data.split(",", 1)[1] audio_bytes = base64.b64decode(data) if len(audio_bytes) > 15 * 1024 * 1024: raise ValueError("Audio too large (max 15 MB)") async with aiofiles.open(path, "wb") as f: await f.write(audio_bytes) return len(audio_bytes) async def save_subtitles(ass: str, path: Path): if "[Script Info]" not in ass or "[Events]" not in ass: raise ValueError("Invalid ASS subtitle format") if len(ass.encode()) > 500 * 1024: raise ValueError("Subtitles too large (max 500 KB)") async with aiofiles.open(path, "w", encoding="utf-8") as f: await f.write(ass) def get_media_duration(path: Path) -> float: try: r = subprocess.run( ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(path)], capture_output=True, text=True, check=True, timeout=10, ) return float(r.stdout.strip()) except Exception: try: return max(1.0, min(180.0, path.stat().st_size / 16000)) except Exception: return 30.0 # ============================================================================= # JOB MANAGEMENT # ============================================================================= def _make_job(job_id: str, client_token: str) -> Dict: return { "id": job_id, "client_token": client_token, "status": "queued", "queue_position": None, "progress": 0, "message": "Waiting in queue...", "created_at": datetime.now(), "updated_at": datetime.now(), "error": None, "output_path": None, "audio_path": None, "subs_path": None, "ffmpeg_frame": 0, "ffmpeg_fps": 0.0, "ffmpeg_out_time": 0.0, "ffmpeg_speed": 0.0, "total_duration": 0.0, } def create_job(job_id: str, 
client_token: str) -> Dict: JOBS[job_id] = _make_job(job_id, client_token) _persist_job(job_id) return JOBS[job_id] def update_job(job_id: str, **kwargs): if job_id not in JOBS: return job = JOBS[job_id] for k, v in kwargs.items(): job[k] = v job["updated_at"] = datetime.now() _persist_job(job_id) def _persist_job(job_id: str): if job_id not in JOBS: return try: data = { k: (v.isoformat() if isinstance(v, datetime) else v) for k, v in JOBS[job_id].items() } (JOBS_DIR / f"{job_id}.json").write_text(json.dumps(data)) except Exception as e: logger.error(f"persist_job failed: {e}") def cleanup_old_jobs(): now = datetime.now() for job_id in list(JOBS): job = JOBS[job_id] age = (now - job["updated_at"]).total_seconds() if age > 3600: JOBS.pop(job_id, None) _RUNNING_JOBS.discard(job_id) try: _JOB_QUEUE.remove(job_id) except ValueError: pass for p in [ JOBS_DIR / f"{job_id}.json", AUDIO_DIR / f"{job_id}.mp3", AUDIO_DIR / f"{job_id}.ass", AUDIO_DIR / f"{job_id}_trimmed.mp4", OUTPUT_DIR / f"{job_id}.mp4", ]: try: p.unlink(missing_ok=True) except Exception: pass # ============================================================================= # QUEUE HELPERS # ============================================================================= def queue_position(job_id: str) -> Optional[int]: try: idx = list(_JOB_QUEUE).index(job_id) return idx + 1 except ValueError: return None def queue_eta(position: int, job_duration: Optional[float] = None) -> Dict: running_remaining = AVG_JOB_DURATION_S for jid in _RUNNING_JOBS: j = JOBS.get(jid, {}) total = j.get("total_duration", 0) or AVG_JOB_DURATION_S out = j.get("ffmpeg_out_time", 0) or 0 remaining = max(0, total - out) running_remaining = min(running_remaining, remaining) queued_ahead = max(0, position - 1) total_wait_s = running_remaining + queued_ahead * AVG_JOB_DURATION_S own_duration = job_duration or AVG_JOB_DURATION_S total_s = total_wait_s + own_duration low_s = int(total_wait_s * 0.75) high_s = int(total_wait_s * 1.2) def _fmt(s: 
int) -> str: if s < 60: return f"{s}s" m, sec = divmod(s, 60) return f"{m}m {sec:02d}s" if sec else f"{m}m" return { "queue_position": position, "jobs_ahead": position, "wait_low_s": low_s, "wait_high_s": high_s, "wait_low_human": _fmt(low_s), "wait_high_human": _fmt(high_s), "total_with_own_s": int(total_s), "message": ( f"Position {position} in queue — " f"roughly {_fmt(low_s)}–{_fmt(high_s)} wait, " f"likely less" ), } async def _queue_worker(): while True: await asyncio.sleep(2) if len(_RUNNING_JOBS) >= MAX_CONCURRENT: continue if not _JOB_QUEUE: continue job_id = _JOB_QUEUE[0] job = JOBS.get(job_id) if not job or job["status"] in ("failed", "completed"): _JOB_QUEUE.popleft() continue _JOB_QUEUE.popleft() _RUNNING_JOBS.add(job_id) _refresh_queue_positions() audio_path = Path(job["audio_path"]) subs_path = Path(job["subs_path"]) background = job.get("background", "mc1") duration = job.get("requested_duration") asyncio.create_task( _run_and_release(job_id, audio_path, subs_path, background, duration) ) async def _run_and_release(job_id, audio_path, subs_path, background, duration): try: await process_video_task(job_id, audio_path, subs_path, background, duration) finally: _RUNNING_JOBS.discard(job_id) _refresh_queue_positions() def _refresh_queue_positions(): for i, jid in enumerate(_JOB_QUEUE): if jid in JOBS: pos = i + 1 JOBS[jid]["queue_position"] = pos JOBS[jid]["message"] = queue_eta(pos)["message"] _persist_job(jid) # ============================================================================= # FFMPEG PROCESS MANAGEMENT # ============================================================================= _FFMPEG_PROCS: Dict[str, asyncio.subprocess.Process] = {} def _kill_ffmpeg_for_job(job_id: str): proc = _FFMPEG_PROCS.pop(job_id, None) if proc and proc.returncode is None: try: proc.kill() logger.info(f"Killed FFmpeg for job {job_id}") except Exception: pass def _parse_ffmpeg_kv(line: str) -> Dict: parts = line.strip().split("=", 1) return {parts[0].strip(): 
parts[1].strip()} if len(parts) == 2 else {} async def _stream_ffmpeg_progress( proc: asyncio.subprocess.Process, job_id: str, total_duration: float, progress_start: int = 50, progress_end: int = 95, ): frame = fps = speed = 0 out_time_s = 0.0 last_update_time = time.time() async for raw_line in proc.stderr: line = raw_line.decode(errors="replace").strip() if not line: continue kv = _parse_ffmpeg_kv(line) if not kv: continue key, val = next(iter(kv.items())) if key == "frame": frame = int(val) if val.isdigit() else frame elif key == "fps": try: fps = float(val) except ValueError: pass elif key == "out_time_ms": try: out_time_s = int(val) / 1_000_000 except ValueError: pass elif key == "out_time": try: parts = val.split(":") if len(parts) == 3: h, m, s = parts out_time_s = int(h) * 3600 + int(m) * 60 + float(s) except ValueError: pass elif key == "speed": try: speed = float(val.replace("x", "")) except ValueError: pass elif key == "progress": now = time.time() if val == "end" or now - last_update_time >= 0.5: last_update_time = now ratio = min(1.0, out_time_s / total_duration) if total_duration > 0 else 0 mapped = int(progress_start + ratio * (progress_end - progress_start)) update_job( job_id, progress=mapped, ffmpeg_frame=frame, ffmpeg_fps=fps, ffmpeg_out_time=round(out_time_s, 2), ffmpeg_speed=speed, total_duration=total_duration, message=( f"Encoding {out_time_s:.1f}s / {total_duration:.1f}s " f"({speed:.1f}x speed)" if total_duration > 0 and speed > 0 else "Encoding..." 
), ) if val == "end": break if JOBS.get(job_id, {}).get("status") == "failed": proc.kill() break # ============================================================================= # CORE VIDEO PROCESSING # ============================================================================= async def process_video_task( job_id: str, audio_path: Path, subs_path: Path, background: str, duration: Optional[float], ): try: logger.info(f"Job {job_id}: starting encode") update_job(job_id, status="processing", progress=10, message="Downloading background video...", queue_position=None) try: # Auto-detect category from video name # ss1, ss2, ss3, ss4, ss5 → subwaysurfers # mc1, mc2, etc. → minecraft if background.startswith('ss'): category = 'subwaysurfers' elif background.startswith('mc'): category = 'minecraft' else: category = 'minecraft' # default fallback bg_path = await download_video_from_dataset(background, category) except HTTPException as e: update_job(job_id, status="failed", error=str(e.detail)) return update_job(job_id, progress=20, message="Analysing media...") audio_dur = duration or get_media_duration(audio_path) video_dur = get_media_duration(Path(bg_path)) if not (1 <= audio_dur <= 180): update_job(job_id, status="failed", error="Audio duration out of range (1-180 s)") return logger.info(f"Job {job_id}: audio={audio_dur:.2f}s bg={video_dur:.2f}s") if video_dur > audio_dur: max_start = video_dur - audio_dur start = random.uniform(0, max_start) bg_input_args = [ "-ss", f"{start:.3f}", "-t", f"{audio_dur:.3f}", "-i", str(bg_path), ] logger.info(f"Job {job_id}: trim start={start:.2f}s") else: loop_n = int(audio_dur / video_dur) + 2 bg_input_args = [ "-stream_loop", str(loop_n), "-i", str(bg_path), ] logger.info(f"Job {job_id}: loop x{loop_n}") subs_str = str(subs_path).replace("\\", "/").replace(":", "\\:") vf = ( f"[0:v]" f"ass='{subs_str}'," f"scale=1080:1920:force_original_aspect_ratio=increase," f"crop=1080:1920," f"setsar=1" f"[v]" ) output_path = OUTPUT_DIR / 
f"{job_id}.mp4" final_cmd = [ "ffmpeg", "-y", "-hide_banner", "-loglevel", "error", *bg_input_args, "-i", str(audio_path), "-filter_complex", vf, "-map", "[v]", "-map", "1:a", "-c:v", "libx264", "-preset", "medium", "-crf", "16", "-minrate", "8M", "-maxrate", "16M", "-bufsize", "16M", "-profile:v", "high", "-level:v", "4.2", "-pix_fmt", "yuv420p", "-c:a", "aac", "-b:a", "192k", "-shortest", "-movflags", "+faststart", "-threads", "0", "-progress", "pipe:2", "-t", f"{audio_dur:.3f}", str(output_path), ] update_job(job_id, progress=40, message="Encoding... (single-pass)", total_duration=audio_dur) timeout = max(480, int(audio_dur * 5) + 120) try: encode_proc = await asyncio.create_subprocess_exec( *final_cmd, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE, ) _FFMPEG_PROCS[job_id] = encode_proc await asyncio.wait_for( _stream_ffmpeg_progress(encode_proc, job_id, audio_dur, progress_start=40, progress_end=95), timeout=timeout, ) await asyncio.wait_for(encode_proc.wait(), timeout=30) _FFMPEG_PROCS.pop(job_id, None) if encode_proc.returncode != 0: raise RuntimeError(f"FFmpeg exited with code {encode_proc.returncode}") except asyncio.TimeoutError: _kill_ffmpeg_for_job(job_id) update_job(job_id, status="failed", error="Encoding timed out — try a shorter script") return except Exception as e: _kill_ffmpeg_for_job(job_id) if JOBS.get(job_id, {}).get("status") == "failed": return update_job(job_id, status="failed", error=f"Encoding failed: {str(e)[:200]}") return if not output_path.exists() or output_path.stat().st_size == 0: update_job(job_id, status="failed", error="Output file missing after encode") return size_mb = output_path.stat().st_size / 1024 / 1024 logger.info(f"Job {job_id}: done {size_mb:.2f} MB") for p in [audio_path, subs_path]: try: p.unlink(missing_ok=True) except Exception: pass update_job( job_id, status="completed", progress=100, message=f"Done! 
{size_mb:.1f} MB", output_path=str(output_path), ) except Exception: logger.exception(f"process_video_task unhandled for {job_id}") update_job(job_id, status="failed", error="Unexpected server error") async def _cleanup_old_videos(): """Deletes videos older than 1 hour every 10 minutes""" while True: await asyncio.sleep(600) # 10 minutes try: cutoff_time = time.time() - 3600 # 1 hour ago cleaned_count = 0 cleaned_size_mb = 0.0 for video_file in OUTPUT_DIR.glob("*.mp4"): try: file_stat = video_file.stat() if file_stat.st_mtime < cutoff_time: file_size_mb = file_stat.st_size / 1024 / 1024 video_file.unlink() cleaned_count += 1 cleaned_size_mb += file_size_mb logger.info(f"Cleaned up {video_file.name} ({file_size_mb:.1f}MB)") except Exception as e: logger.warning(f"Cleanup failed {video_file.name}: {e}") if cleaned_count > 0: logger.info(f"Cleaned {cleaned_count} videos, freed {cleaned_size_mb:.1f}MB") except Exception as e: logger.error(f"Cleanup error: {e}") # ============================================================================= # FILE CLEANUP HELPER # ============================================================================= def _delete_output_file(job_id: str, output_path: Path): """ Delete the video file after it has been sent to the user. Called as a background task from /download so /tmp stays clean. 
""" try: output_path.unlink(missing_ok=True) if job_id in JOBS: JOBS[job_id]["output_path"] = None JOBS[job_id]["message"] = "Downloaded and cleaned up" _persist_job(job_id) logger.info(f"Cleaned up {job_id} after download") except Exception as e: logger.warning(f"Cleanup failed for {job_id}: {e}") # ============================================================================= # API ENDPOINTS # ============================================================================= @app.get("/") async def root(): return { "name": "ArchNemix Shorts Generator API", "version": "5.1.0", "queue": { "max_concurrent": MAX_CONCURRENT, "avg_job_duration_s": AVG_JOB_DURATION_S, }, "endpoints": { "register": "GET /register", "generate": "POST /generate", "cancel": "DELETE /job/{id}", "job_status": "GET /job/{id}", "queue": "GET /queue", "stream": "GET /stream/{id} — inline video playback", "download": "GET /download/{id} — file download (deletes after)", "delete": "DELETE /video/{id}", "videos": "GET /videos/{category}", "health": "GET /health", }, } @app.get("/register") async def register_client(req: Request): client = resolve_client(req) return JSONResponse( { "client_token": client["token"], "is_new": client["job_count"] == 0, "job_count": client["job_count"], "instructions": ( "Store client_token in localStorage as 'archx_client_id'. " "Send it as the X-Client-ID header on every request." 
), }, headers={"X-Client-ID": client["token"]}, ) @app.get("/health") async def health(): try: videos = await list_videos_from_dataset("minecraft") return { "status": "healthy", "timestamp": datetime.now().isoformat(), "videos": len(videos), "running": len(_RUNNING_JOBS), "queued": len(_JOB_QUEUE), "active_jobs": len([j for j in JOBS.values() if j["status"] == "processing"]), } except Exception as e: return JSONResponse({"status": "degraded", "error": str(e)}, status_code=503) @app.get("/queue") async def queue_status(): running = len(_RUNNING_JOBS) waiting = len(_JOB_QUEUE) return { "running": running, "waiting": waiting, "total_active": running + waiting, "avg_job_s": AVG_JOB_DURATION_S, "estimated_wait_s": waiting * AVG_JOB_DURATION_S if running == 0 else (waiting + 1) * AVG_JOB_DURATION_S, "message": ( "No jobs running — submit yours now!" if (running + waiting) == 0 else f"{running} running, {waiting} in queue" ), } @app.get("/videos/{category}") async def list_videos(category: str = "minecraft"): try: # Map category aliases to dataset folder names category_map = { "minecraft": "minecraft", "subwaysurfers": "subwaysurfers", "ss": "subwaysurfers", } dataset_category = category_map.get(category.lower(), category.lower()) videos = await list_videos_from_dataset(dataset_category) # Fallback to default list if no videos found if not videos: if dataset_category == "subwaysurfers": videos = [f"ss{i}" for i in range(1, 6)] # ss1 through ss5 else: videos = [f"mc{i}" for i in range(1, 7)] return {"category": category, "videos": videos, "count": len(videos)} except Exception as e: # Fallback defaults if category.lower() in ["subwaysurfers", "ss"]: videos = [f"ss{i}" for i in range(1, 6)] # ss1 through ss5 else: videos = [f"mc{i}" for i in range(1, 7)] return {"category": category, "videos": videos, "count": len(videos), "error": str(e)} @app.post("/generate") async def generate_video( req: GenerateRequest, background_tasks: BackgroundTasks, http_req: Request, ): if not 
validate_app_key(http_req): raise HTTPException(403, "Invalid API key") if not validate_origin(http_req): raise HTTPException(403, "Invalid origin") client = resolve_client(http_req) token = client["token"] if not rate_limit(f"generate:{token}", GENERATE_LIMIT): raise HTTPException(429, f"Rate limit: max {GENERATE_LIMIT} requests/hour") cleanup_old_jobs() active_for_client = [ j for j in JOBS.values() if j.get("client_token") == token and j["status"] in ("queued", "processing") ] if active_for_client: existing_id = active_for_client[0]["id"] raise HTTPException( 409, f"You already have an active job ({existing_id}). " "Wait for it to finish or cancel it first.", ) job_id = str(uuid.uuid4()) audio_path = AUDIO_DIR / f"{job_id}.mp3" subs_path = AUDIO_DIR / f"{job_id}.ass" create_job(job_id, token) try: await save_base64_audio(req.audio_base64, audio_path) await save_subtitles(req.subtitles_ass, subs_path) except ValueError as e: update_job(job_id, status="failed", error=str(e)) raise HTTPException(400, str(e)) update_job( job_id, audio_path=str(audio_path), subs_path=str(subs_path), background=req.background, requested_duration=req.duration, ) client["job_count"] += 1 client["jobs"].append(job_id) if len(client["jobs"]) > 50: client["jobs"] = client["jobs"][-50:] _save_client(client) position = len(_JOB_QUEUE) + 1 if _RUNNING_JOBS else 0 _JOB_QUEUE.append(job_id) _refresh_queue_positions() if position == 0: eta = {"message": "Starting very soon...", "wait_low_s": 0, "wait_high_s": 30} else: eta = queue_eta(position, req.duration) return JSONResponse( { "job_id": job_id, "client_token": token, "status": "queued", "queue_position": position if position > 0 else 1, "queue": eta, "check_status": f"/job/{job_id}", }, headers={"X-Client-ID": token}, ) @app.get("/job/{job_id}") async def job_status(job_id: str, http_req: Request): if not rate_limit(f"status:{job_id}", STATUS_LIMIT): return JSONResponse({"job_id": job_id, "status": "rate_limited"}) if job_id not in JOBS: 
job_file = JOBS_DIR / f"{job_id}.json" if job_file.exists(): try: return JSONResponse(json.loads(job_file.read_text())) except Exception: pass raise HTTPException(404, "Job not found") job = JOBS[job_id] if (datetime.now() - job["updated_at"]).total_seconds() > JOB_TIMEOUT: if job["status"] not in ("completed", "failed"): update_job(job_id, status="failed", error="Job timed out") pos = queue_position(job_id) resp: Dict = { "job_id": job["id"], "status": job["status"], "progress": job["progress"], "message": job["message"], "queue_position": pos, "queue": queue_eta(pos) if pos else None, "created_at": job["created_at"].isoformat() if isinstance(job["created_at"], datetime) else job["created_at"], "updated_at": job["updated_at"].isoformat() if isinstance(job["updated_at"], datetime) else job["updated_at"], "encode_stats": { "frame": job.get("ffmpeg_frame", 0), "fps": job.get("ffmpeg_fps", 0.0), "out_time_s": job.get("ffmpeg_out_time", 0.0), "total_s": job.get("total_duration", 0.0), "speed": job.get("ffmpeg_speed", 0.0), }, } # ── Return both URLs when completed — controller stores these directly ─── if job["status"] == "completed" and job.get("output_path"): resp["download_url"] = f"/download/{job_id}" resp["stream_url"] = f"/stream/{job_id}" if job.get("error"): resp["error"] = job["error"] return JSONResponse(resp) @app.get("/stream/{job_id}") async def stream_video(job_id: str): """ Serve the video for inline browser playback. Frontend uses this as