| import os |
| import uuid |
| import time |
| import subprocess |
| import base64 |
| import hashlib |
| import json |
| import random |
| import asyncio |
| import aiofiles |
| import logging |
| import secrets |
|
|
| from collections import deque |
| from datetime import datetime, timedelta |
| from pathlib import Path |
| from typing import Optional, Dict, List |
|
|
| from fastapi import FastAPI, Request, HTTPException, BackgroundTasks |
| from fastapi.responses import FileResponse, JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from huggingface_hub import HfApi, hf_hub_download |
| from pydantic import BaseModel, Field |
|
|
| |
| |
| |
| HF_DATASET = os.getenv("HF_DATASET", "YTShortMakerArchx/BG_VIDS_ARCHX_YT") |
| HF_TOKEN = os.getenv("HF_TOKEN") |
| APP_KEY = os.getenv("APP_KEY") |
| UI_ORIGIN = os.getenv("UI_ORIGIN", "https://shortgenx.pages.dev") |
|
|
| BASE_DIR = Path("/tmp") |
| AUDIO_DIR = BASE_DIR / "audio" |
| OUTPUT_DIR = BASE_DIR / "out" |
| JOBS_DIR = BASE_DIR / "jobs" |
| CLIENT_DIR = BASE_DIR / "clients" |
|
|
| for d in [AUDIO_DIR, OUTPUT_DIR, JOBS_DIR, CLIENT_DIR]: |
| d.mkdir(parents=True, exist_ok=True) |
|
|
| |
| # Sliding-window rate limiter: key -> list of request timestamps. |
| RATE_LIMITS: Dict[str, list] = {} |
| GENERATE_LIMIT = 3  # /generate calls per client per hour |
| STATUS_LIMIT = 1000  # /job/{id} polls per job per hour |
|
|
| |
| # In-memory job store; each record is also mirrored to JOBS_DIR as JSON. |
| JOBS: Dict[str, Dict] = {} |
| JOB_TIMEOUT = 900  # seconds without an update before a polled job is marked failed |
|
|
| |
| # Dataset listing cache: category key -> (fetched_at, [video names]). |
| VIDEO_CACHE: Dict[str, tuple] = {} |
| CACHE_TIMEOUT = 300  # seconds |
|
|
| |
| |
| |
| MAX_CONCURRENT = 1  # encodes that may run simultaneously |
| AVG_JOB_DURATION_S = 360  # assumed per-job encode time, used for queue ETAs |
|
|
| _JOB_QUEUE: deque = deque()  # FIFO of queued job IDs |
| _RUNNING_JOBS: set = set()  # job IDs currently encoding |
| _ENCODE_TASKS: Dict[str, asyncio.Task] = {}  # strong refs so encode tasks aren't GC'd |
| _QUEUE_LOCK: Optional[asyncio.Lock] = None  # created at startup, once an event loop exists |
|
|
| |
| |
| |
| # token -> client record; mirrors the per-client JSON files in CLIENT_DIR. |
| CLIENT_TOKENS: Dict[str, Dict] = {} |
|
|
| def _client_file(token: str) -> Path: |
| return CLIENT_DIR / f"{token[:8]}.json" |
|
|
| def _load_client(token: str) -> Optional[Dict]: |
| if token in CLIENT_TOKENS: |
| return CLIENT_TOKENS[token] |
| path = _client_file(token) |
| if path.exists(): |
| try: |
| rec = json.loads(path.read_text()) |
| CLIENT_TOKENS[token] = rec |
| return rec |
| except Exception: |
| pass |
| return None |
|
|
| def _save_client(rec: Dict): |
| token = rec["token"] |
| CLIENT_TOKENS[token] = rec |
| try: |
| rec_copy = rec.copy() |
| rec_copy["created_at"] = ( |
| rec_copy["created_at"].isoformat() |
| if isinstance(rec_copy["created_at"], datetime) |
| else rec_copy["created_at"] |
| ) |
| rec_copy["last_seen"] = ( |
| rec_copy["last_seen"].isoformat() |
| if isinstance(rec_copy["last_seen"], datetime) |
| else rec_copy["last_seen"] |
| ) |
| _client_file(token).write_text(json.dumps(rec_copy)) |
| except Exception as e: |
| logger.warning(f"Failed to persist client {token[:8]}: {e}") |
|
|
| def _create_client(fingerprint: str) -> Dict: |
| rec = { |
| "token": secrets.token_hex(24), |
| "fingerprint": fingerprint, |
| "created_at": datetime.now(), |
| "last_seen": datetime.now(), |
| "job_count": 0, |
| "jobs": [], |
| } |
| _save_client(rec) |
| logger.info(f"New client {rec['token'][:8]}...") |
| return rec |
|
|
| def _fingerprint(req: Request) -> str: |
| data = ( |
| f"{req.client.host}:" |
| f"{req.headers.get('user-agent', '')}:" |
| f"{req.headers.get('accept', '')}" |
| ) |
| return hashlib.sha256(data.encode()).hexdigest() |
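| |
| # Note: this fingerprint is a soft identity (shared proxies and identical |
| # user agents collide); it is only used below to re-associate a browser |
| # that lost its stored token, never as a security boundary. |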
|
|
| def resolve_client(req: Request) -> Dict: |
| token = req.headers.get("X-Client-ID", "").strip() |
|
|
| if token: |
| rec = _load_client(token) |
| if rec: |
| rec["last_seen"] = datetime.now() |
| _save_client(rec) |
| return rec |
| # Unknown token (e.g. the server restarted and /tmp was wiped): trust it |
| # and rebuild a record so the client keeps its stored identity. |
| fp = _fingerprint(req) |
| rec = { |
| "token": token, |
| "fingerprint": fp, |
| "created_at": datetime.now(), |
| "last_seen": datetime.now(), |
| "job_count": 0, |
| "jobs": [], |
| } |
| _save_client(rec) |
| logger.info(f"Restored client {token[:8]}... from header") |
| return rec |
|
|
| fp = _fingerprint(req) |
| for rec in CLIENT_TOKENS.values(): |
| if rec.get("fingerprint") == fp: |
| rec["last_seen"] = datetime.now() |
| _save_client(rec) |
| return rec |
|
|
| return _create_client(fp) |
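| |
| # Minimal client-side flow, sketched in Python for illustration (the |
| # `requests` dependency and the BASE_URL/payload placeholders are |
| # assumptions; any HTTP client works the same way): |
| # |
| #   import requests |
| #   token = requests.get(f"{BASE_URL}/register").json()["client_token"] |
| #   requests.post(f"{BASE_URL}/generate", json=payload, |
| #                 headers={"X-Client-ID": token})  # payload: see GenerateRequest |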
|
|
| |
| |
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format="%(asctime)s | %(levelname)s | %(message)s", |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| |
| app = FastAPI( |
| title="ArchNemix Shorts Generator API", |
| version="5.1.0", |
| docs_url="/docs", |
| redoc_url="/redoc", |
| ) |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=[ |
| "https://shortgenx.pages.dev", |
| "http://localhost:3000", |
| "http://127.0.0.1:3000", |
| "https://ytshortmakerarchx-archnemix-controller.hf.space", |
| ], |
| allow_credentials=True, |
| allow_methods=["GET", "POST", "DELETE", "OPTIONS", "HEAD"], |
| allow_headers=["*"], |
| expose_headers=["X-Client-ID"], |
| max_age=600, |
| ) |
|
|
| |
| |
| |
| class GenerateRequest(BaseModel): |
| audio_base64: str = Field(..., min_length=10) |
| subtitles_ass: str = Field(..., min_length=10) |
| background: str |
| duration: Optional[float] = Field(None, ge=1, le=180) |
| request_id: Optional[str] = None |
|
|
| |
| |
| |
| def validate_app_key(req: Request) -> bool: |
| if not APP_KEY or APP_KEY == "archnemix-secret-key-change-in-production": |
| return True |
| key = ( |
| req.headers.get("X-APP-KEY") |
| or req.headers.get("Authorization", "").removeprefix("Bearer ") |
| ) |
| return key == APP_KEY |
|
|
| def validate_origin(req: Request) -> bool: |
| origin = req.headers.get("origin") or req.headers.get("referer", "") |
| if not origin: |
| return True |
| |
| allowed = [ |
| "https://shortgenx.pages.dev", |
| "http://localhost:3000", |
| "http://127.0.0.1:3000", |
| "https://ytshortmakerarchx-archnemix-controller.hf.space", |
| ] |
| |
| # A Referer header carries a full URL (origin plus path), so exact |
| # equality would reject every referer-only request; prefix-match instead. |
| return any(origin.startswith(a) for a in allowed) |
|
|
| def rate_limit(key: str, limit: int, window: int = 3600) -> bool: |
| now = time.time() |
| RATE_LIMITS.setdefault(key, []) |
| RATE_LIMITS[key] = [t for t in RATE_LIMITS[key] if now - t < window] |
| if len(RATE_LIMITS[key]) >= limit: |
| return False |
| RATE_LIMITS[key].append(now) |
| return True |
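| |
| # Example: rate_limit(f"generate:{token}", GENERATE_LIMIT) admits three |
| # calls in any rolling hour for that client; a fourth call inside the |
| # window returns False and /generate responds with HTTP 429. |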
|
|
| |
| |
| |
| def get_hf_api() -> HfApi: |
| if not HF_TOKEN: |
| raise ValueError("HF_TOKEN not configured") |
| return HfApi(token=HF_TOKEN) |
|
|
| async def list_videos_from_dataset(category: str = "minecraft") -> List[str]: |
| cache_key = f"videos_{category}" |
| if cache_key in VIDEO_CACHE: |
| ts, data = VIDEO_CACHE[cache_key] |
| if time.time() - ts < CACHE_TIMEOUT: |
| return data |
| try: |
| api = get_hf_api() |
| # The Hub listing call is blocking HTTP; keep it off the event loop. |
| files = await asyncio.to_thread( |
| lambda: list(api.list_repo_files(repo_id=HF_DATASET, repo_type="dataset")) |
| ) |
| result = [ |
| f.split("/")[-1].replace(".mp4", "") |
| for f in files |
| if f.startswith(f"{category}/") and f.endswith(".mp4") |
| ] |
| VIDEO_CACHE[cache_key] = (time.time(), result) |
| return result |
| except Exception as e: |
| logger.error(f"list_videos failed: {e}") |
| return [f"mc{i}" for i in range(1, 7)] |
|
|
| async def download_video_from_dataset(video_name: str, category: str = "minecraft") -> str: |
| video_name = video_name.replace(".mp4", "") |
| for path in [f"{category}/{video_name}.mp4", f"{video_name}.mp4"]: |
| try: |
| # hf_hub_download is blocking; run it in a thread so the event loop |
| # (and /job polling) stays responsive while large clips download. |
| return await asyncio.to_thread( |
| hf_hub_download, |
| repo_id=HF_DATASET, filename=path, |
| repo_type="dataset", token=HF_TOKEN, |
| cache_dir="/tmp/hf_cache", |
| ) |
| except Exception: |
| continue |
| available = await list_videos_from_dataset(category) |
| raise HTTPException(404, f"Video '{video_name}' not found. Available: {available}") |
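| |
| # hf_hub_download caches under /tmp/hf_cache, so a background clip fetched |
| # for an earlier job is reused rather than downloaded again. |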
|
|
| |
| |
| |
| async def save_base64_audio(data: str, path: Path) -> int: |
| if data.startswith("data:audio/"): |
| data = data.split(",", 1)[1] |
| # Reject oversized payloads before decoding: 15 MB of audio inflates to |
| # roughly 20 MB of base64 text. |
| if len(data) > 21 * 1024 * 1024: |
| raise ValueError("Audio too large (max 15 MB)") |
| audio_bytes = base64.b64decode(data) |
| if len(audio_bytes) > 15 * 1024 * 1024: |
| raise ValueError("Audio too large (max 15 MB)") |
| async with aiofiles.open(path, "wb") as f: |
| await f.write(audio_bytes) |
| return len(audio_bytes) |
|
|
| async def save_subtitles(ass: str, path: Path): |
| if "[Script Info]" not in ass or "[Events]" not in ass: |
| raise ValueError("Invalid ASS subtitle format") |
| if len(ass.encode()) > 500 * 1024: |
| raise ValueError("Subtitles too large (max 500 KB)") |
| async with aiofiles.open(path, "w", encoding="utf-8") as f: |
| await f.write(ass) |
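| |
| # The check above only requires the two mandatory section headers. A |
| # minimal script that passes it looks roughly like this (illustrative |
| # skeleton, not a complete ASS spec — real scripts also carry a styles |
| # section): |
| # |
| #   [Script Info] |
| #   PlayResX: 1080 |
| #   PlayResY: 1920 |
| # |
| #   [Events] |
| #   Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text |
| #   Dialogue: 0,0:00:00.00,0:00:02.50,Default,,0,0,0,,Hello world |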
|
|
| def get_media_duration(path: Path) -> float: |
| try: |
| r = subprocess.run( |
| ["ffprobe", "-v", "error", |
| "-show_entries", "format=duration", |
| "-of", "default=noprint_wrappers=1:nokey=1", |
| str(path)], |
| capture_output=True, text=True, check=True, timeout=10, |
| ) |
| return float(r.stdout.strip()) |
| except Exception: |
| try: |
| # Fallback heuristic: assume ~128 kbps audio (16 kB/s), clamped to |
| # the 1-180 s range the API accepts. |
| return max(1.0, min(180.0, path.stat().st_size / 16000)) |
| except Exception: |
| return 30.0 |
|
|
| |
| |
| |
| def _make_job(job_id: str, client_token: str) -> Dict: |
| return { |
| "id": job_id, |
| "client_token": client_token, |
| "status": "queued", |
| "queue_position": None, |
| "progress": 0, |
| "message": "Waiting in queue...", |
| "created_at": datetime.now(), |
| "updated_at": datetime.now(), |
| "error": None, |
| "output_path": None, |
| "audio_path": None, |
| "subs_path": None, |
| "ffmpeg_frame": 0, |
| "ffmpeg_fps": 0.0, |
| "ffmpeg_out_time": 0.0, |
| "ffmpeg_speed": 0.0, |
| "total_duration": 0.0, |
| } |
|
|
| def create_job(job_id: str, client_token: str) -> Dict: |
| JOBS[job_id] = _make_job(job_id, client_token) |
| _persist_job(job_id) |
| return JOBS[job_id] |
|
|
| def update_job(job_id: str, **kwargs): |
| if job_id not in JOBS: |
| return |
| job = JOBS[job_id] |
| job.update(kwargs) |
| job["updated_at"] = datetime.now() |
| _persist_job(job_id) |
|
|
| def _persist_job(job_id: str): |
| if job_id not in JOBS: |
| return |
| try: |
| data = { |
| k: (v.isoformat() if isinstance(v, datetime) else v) |
| for k, v in JOBS[job_id].items() |
| } |
| (JOBS_DIR / f"{job_id}.json").write_text(json.dumps(data)) |
| except Exception as e: |
| logger.error(f"persist_job failed: {e}") |
|
|
| def cleanup_old_jobs(): |
| now = datetime.now() |
| for job_id in list(JOBS): |
| job = JOBS[job_id] |
| age = (now - job["updated_at"]).total_seconds() |
| if age > 3600: |
| JOBS.pop(job_id, None) |
| _RUNNING_JOBS.discard(job_id) |
| try: |
| _JOB_QUEUE.remove(job_id) |
| except ValueError: |
| pass |
| for p in [ |
| JOBS_DIR / f"{job_id}.json", |
| AUDIO_DIR / f"{job_id}.mp3", |
| AUDIO_DIR / f"{job_id}.ass", |
| AUDIO_DIR / f"{job_id}_trimmed.mp4", |
| OUTPUT_DIR / f"{job_id}.mp4", |
| ]: |
| try: |
| p.unlink(missing_ok=True) |
| except Exception: |
| pass |
|
|
| |
| |
| |
| def queue_position(job_id: str) -> Optional[int]: |
| try: |
| idx = list(_JOB_QUEUE).index(job_id) |
| return idx + 1 |
| except ValueError: |
| return None |
|
|
| def queue_eta(position: int, job_duration: Optional[float] = None) -> Dict: |
| # Time left on the currently running encode; zero when nothing is running. |
| running_remaining = AVG_JOB_DURATION_S if _RUNNING_JOBS else 0 |
| for jid in _RUNNING_JOBS: |
| j = JOBS.get(jid, {}) |
| total = j.get("total_duration", 0) or AVG_JOB_DURATION_S |
| out = j.get("ffmpeg_out_time", 0) or 0 |
| remaining = max(0, total - out) |
| running_remaining = min(running_remaining, remaining) |
|
|
| queued_ahead = max(0, position - 1) |
| total_wait_s = running_remaining + queued_ahead * AVG_JOB_DURATION_S |
| own_duration = job_duration or AVG_JOB_DURATION_S |
| total_s = total_wait_s + own_duration |
|
|
| low_s = int(total_wait_s * 0.75) |
| high_s = int(total_wait_s * 1.2) |
|
|
| def _fmt(s: int) -> str: |
| if s < 60: |
| return f"{s}s" |
| m, sec = divmod(s, 60) |
| return f"{m}m {sec:02d}s" if sec else f"{m}m" |
|
|
| return { |
| "queue_position": position, |
| "jobs_ahead": position, |
| "wait_low_s": low_s, |
| "wait_high_s": high_s, |
| "wait_low_human": _fmt(low_s), |
| "wait_high_human": _fmt(high_s), |
| "total_with_own_s": int(total_s), |
| "message": ( |
| f"Position {position} in queue — " |
| f"roughly {_fmt(low_s)}–{_fmt(high_s)} wait, " |
| f"likely less" |
| ), |
| } |
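| |
| # Worked example: position 2 with one job mid-encode. If the running job |
| # has 120 s left and one queued job sits ahead (360 s average), the wait is |
| # 120 + 360 = 480 s, reported as a 360 s to 576 s window (0.75x / 1.2x bounds). |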
|
|
| async def _queue_worker(): |
| """Single consumer: every 2 s, promote the next queued job if a slot is free.""" |
| while True: |
| await asyncio.sleep(2) |
|
|
| if len(_RUNNING_JOBS) >= MAX_CONCURRENT: |
| continue |
| if not _JOB_QUEUE: |
| continue |
|
|
| job_id = _JOB_QUEUE[0] |
| job = JOBS.get(job_id) |
| if not job or job["status"] in ("failed", "completed"): |
| _JOB_QUEUE.popleft() |
| continue |
|
|
| _JOB_QUEUE.popleft() |
| _RUNNING_JOBS.add(job_id) |
| _refresh_queue_positions() |
|
|
| audio_path = Path(job["audio_path"]) |
| subs_path = Path(job["subs_path"]) |
| background = job.get("background", "mc1") |
| duration = job.get("requested_duration") |
|
|
| task = asyncio.create_task( |
| _run_and_release(job_id, audio_path, subs_path, background, duration) |
| ) |
| # Hold a strong reference so the encode task cannot be garbage-collected |
| # mid-run; it removes itself once finished. |
| _ENCODE_TASKS[job_id] = task |
| task.add_done_callback(lambda _t, j=job_id: _ENCODE_TASKS.pop(j, None)) |
|
|
| async def _run_and_release(job_id, audio_path, subs_path, background, duration): |
| try: |
| await process_video_task(job_id, audio_path, subs_path, background, duration) |
| finally: |
| _RUNNING_JOBS.discard(job_id) |
| _refresh_queue_positions() |
|
|
| def _refresh_queue_positions(): |
| for i, jid in enumerate(_JOB_QUEUE): |
| if jid in JOBS: |
| pos = i + 1 |
| JOBS[jid]["queue_position"] = pos |
| JOBS[jid]["message"] = queue_eta(pos)["message"] |
| _persist_job(jid) |
|
|
| |
| |
| |
| _FFMPEG_PROCS: Dict[str, asyncio.subprocess.Process] = {}  # job_id -> live encode process |
|
|
| def _kill_ffmpeg_for_job(job_id: str): |
| proc = _FFMPEG_PROCS.pop(job_id, None) |
| if proc and proc.returncode is None: |
| try: |
| proc.kill() |
| logger.info(f"Killed FFmpeg for job {job_id}") |
| except Exception: |
| pass |
|
|
| def _parse_ffmpeg_kv(line: str) -> Dict: |
| parts = line.strip().split("=", 1) |
| return {parts[0].strip(): parts[1].strip()} if len(parts) == 2 else {} |
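| |
| # FFmpeg's -progress output is a stream of key=value lines, e.g.: |
| #   frame=240 |
| #   fps=48.02 |
| #   out_time_ms=8000000 |
| #   speed=1.6x |
| #   progress=continue   (becomes "end" on the final block) |
| # Each line parses into a single-entry dict for the dispatcher below. |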
|
|
| async def _stream_ffmpeg_progress( |
| proc: asyncio.subprocess.Process, |
| job_id: str, |
| total_duration: float, |
| progress_start: int = 50, |
| progress_end: int = 95, |
| ): |
| frame = fps = speed = 0 |
| out_time_s = 0.0 |
| last_update_time = time.time() |
|
|
| async for raw_line in proc.stderr: |
| line = raw_line.decode(errors="replace").strip() |
| if not line: |
| continue |
|
|
| kv = _parse_ffmpeg_kv(line) |
| if not kv: |
| continue |
| key, val = next(iter(kv.items())) |
|
|
| if key == "frame": |
| frame = int(val) if val.isdigit() else frame |
| elif key == "fps": |
| try: fps = float(val) |
| except ValueError: pass |
| elif key == "out_time_ms": |
| try: out_time_s = int(val) / 1_000_000 |
| except ValueError: pass |
| elif key == "out_time": |
| try: |
| parts = val.split(":") |
| if len(parts) == 3: |
| h, m, s = parts |
| out_time_s = int(h) * 3600 + int(m) * 60 + float(s) |
| except ValueError: pass |
| elif key == "speed": |
| try: |
| speed = float(val.replace("x", "")) |
| except ValueError: pass |
| elif key == "progress": |
| now = time.time() |
| if val == "end" or now - last_update_time >= 0.5: |
| last_update_time = now |
| ratio = min(1.0, out_time_s / total_duration) if total_duration > 0 else 0 |
| mapped = int(progress_start + ratio * (progress_end - progress_start)) |
| update_job( |
| job_id, |
| progress=mapped, |
| ffmpeg_frame=frame, |
| ffmpeg_fps=fps, |
| ffmpeg_out_time=round(out_time_s, 2), |
| ffmpeg_speed=speed, |
| total_duration=total_duration, |
| message=( |
| f"Encoding {out_time_s:.1f}s / {total_duration:.1f}s " |
| f"({speed:.1f}x speed)" |
| if total_duration > 0 and speed > 0 else "Encoding..." |
| ), |
| ) |
| if val == "end": |
| break |
| if JOBS.get(job_id, {}).get("status") == "failed": |
| proc.kill() |
| break |
|
|
| |
| |
| |
| async def process_video_task( |
| job_id: str, |
| audio_path: Path, |
| subs_path: Path, |
| background: str, |
| duration: Optional[float], |
| ): |
| try: |
| logger.info(f"Job {job_id}: starting encode") |
| update_job(job_id, status="processing", progress=10, |
| message="Downloading background video...", queue_position=None) |
|
|
| try: |
| |
| |
| |
| # Background names encode their category: "ss*" clips live under |
| # subwaysurfers/; anything else falls back to minecraft/. |
| category = "subwaysurfers" if background.startswith("ss") else "minecraft" |
| |
| bg_path = await download_video_from_dataset(background, category) |
| except HTTPException as e: |
| update_job(job_id, status="failed", error=str(e.detail)) |
| return |
|
|
| update_job(job_id, progress=20, message="Analysing media...") |
|
|
| audio_dur = duration or get_media_duration(audio_path) |
| video_dur = get_media_duration(Path(bg_path)) |
|
|
| if not (1 <= audio_dur <= 180): |
| update_job(job_id, status="failed", error="Audio duration out of range (1-180 s)") |
| return |
|
|
| logger.info(f"Job {job_id}: audio={audio_dur:.2f}s bg={video_dur:.2f}s") |
|
|
| if video_dur > audio_dur: |
| max_start = video_dur - audio_dur |
| start = random.uniform(0, max_start) |
| bg_input_args = [ |
| "-ss", f"{start:.3f}", |
| "-t", f"{audio_dur:.3f}", |
| "-i", str(bg_path), |
| ] |
| logger.info(f"Job {job_id}: trim start={start:.2f}s") |
| else: |
| loop_n = int(audio_dur / video_dur) + 2 |
| bg_input_args = [ |
| "-stream_loop", str(loop_n), |
| "-i", str(bg_path), |
| ] |
| logger.info(f"Job {job_id}: loop x{loop_n}") |
|
|
| subs_str = str(subs_path).replace("\\", "/").replace(":", "\\:") |
| # Scale/crop to the 1080x1920 canvas first, then burn subtitles, so ASS |
| # scripts authored for a 1080x1920 PlayRes land where they expect even |
| # when a source clip is not already vertical. |
| vf = ( |
| f"[0:v]" |
| f"scale=1080:1920:force_original_aspect_ratio=increase," |
| f"crop=1080:1920," |
| f"setsar=1," |
| f"ass='{subs_str}'" |
| f"[v]" |
| ) |
|
|
| output_path = OUTPUT_DIR / f"{job_id}.mp4" |
|
|
| final_cmd = [ |
| "ffmpeg", "-y", |
| "-hide_banner", |
| "-loglevel", "error", |
| *bg_input_args, |
| "-i", str(audio_path), |
| "-filter_complex", vf, |
| "-map", "[v]", |
| "-map", "1:a", |
| "-c:v", "libx264", |
| "-preset", "medium", |
| "-crf", "16", |
| "-minrate", "8M", |
| "-maxrate", "16M", |
| "-bufsize", "16M", |
| "-profile:v", "high", |
| "-level:v", "4.2", |
| "-pix_fmt", "yuv420p", |
| "-c:a", "aac", |
| "-b:a", "192k", |
| "-shortest", |
| "-movflags", "+faststart", |
| "-threads", "0", |
| "-progress", "pipe:2", |
| "-t", f"{audio_dur:.3f}", |
| str(output_path), |
| ] |
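| |
| # Encoder notes: CRF 16 is near-transparent quality at this resolution; |
| # maxrate and bufsize cap the VBV so bitrate spikes stay within the |
| # advertised 8-16 Mbps band; -movflags +faststart moves the moov atom to |
| # the front so browsers can start playback before the whole file arrives. |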
|
|
| update_job(job_id, progress=40, message="Encoding... (single-pass)", total_duration=audio_dur) |
|
|
| timeout = max(480, int(audio_dur * 5) + 120) |
|
|
| try: |
| encode_proc = await asyncio.create_subprocess_exec( |
| *final_cmd, |
| stdout=asyncio.subprocess.DEVNULL, |
| stderr=asyncio.subprocess.PIPE, |
| ) |
| _FFMPEG_PROCS[job_id] = encode_proc |
|
|
| await asyncio.wait_for( |
| _stream_ffmpeg_progress(encode_proc, job_id, audio_dur, |
| progress_start=40, progress_end=95), |
| timeout=timeout, |
| ) |
| await asyncio.wait_for(encode_proc.wait(), timeout=30) |
| _FFMPEG_PROCS.pop(job_id, None) |
|
|
| if encode_proc.returncode != 0: |
| raise RuntimeError(f"FFmpeg exited with code {encode_proc.returncode}") |
|
|
| except asyncio.TimeoutError: |
| _kill_ffmpeg_for_job(job_id) |
| update_job(job_id, status="failed", error="Encoding timed out — try a shorter script") |
| return |
| except Exception as e: |
| _kill_ffmpeg_for_job(job_id) |
| if JOBS.get(job_id, {}).get("status") == "failed": |
| return |
| update_job(job_id, status="failed", error=f"Encoding failed: {str(e)[:200]}") |
| return |
|
|
| if not output_path.exists() or output_path.stat().st_size == 0: |
| update_job(job_id, status="failed", error="Output file missing after encode") |
| return |
|
|
| size_mb = output_path.stat().st_size / 1024 / 1024 |
| logger.info(f"Job {job_id}: done {size_mb:.2f} MB") |
|
|
| for p in [audio_path, subs_path]: |
| try: |
| p.unlink(missing_ok=True) |
| except Exception: |
| pass |
|
|
| update_job( |
| job_id, |
| status="completed", |
| progress=100, |
| message=f"Done! {size_mb:.1f} MB", |
| output_path=str(output_path), |
| ) |
|
|
| except Exception: |
| logger.exception(f"process_video_task unhandled for {job_id}") |
| update_job(job_id, status="failed", error="Unexpected server error") |
|
|
| async def _cleanup_old_videos(): |
| """Deletes videos older than 1 hour every 10 minutes""" |
| while True: |
| await asyncio.sleep(600) |
| |
| try: |
| cutoff_time = time.time() - 3600 |
| cleaned_count = 0 |
| cleaned_size_mb = 0.0 |
| |
| for video_file in OUTPUT_DIR.glob("*.mp4"): |
| try: |
| file_stat = video_file.stat() |
| if file_stat.st_mtime < cutoff_time: |
| file_size_mb = file_stat.st_size / 1024 / 1024 |
| video_file.unlink() |
| cleaned_count += 1 |
| cleaned_size_mb += file_size_mb |
| logger.info(f"Cleaned up {video_file.name} ({file_size_mb:.1f}MB)") |
| except Exception as e: |
| logger.warning(f"Cleanup failed {video_file.name}: {e}") |
| |
| if cleaned_count > 0: |
| logger.info(f"Cleaned {cleaned_count} videos, freed {cleaned_size_mb:.1f}MB") |
| except Exception as e: |
| logger.error(f"Cleanup error: {e}") |
|
|
| |
| |
| |
| def _delete_output_file(job_id: str, output_path: Path): |
| """ |
| Delete the video file after it has been sent to the user. |
| Called as a background task from /download so /tmp stays clean. |
| """ |
| try: |
| output_path.unlink(missing_ok=True) |
| if job_id in JOBS: |
| JOBS[job_id]["output_path"] = None |
| JOBS[job_id]["message"] = "Downloaded and cleaned up" |
| _persist_job(job_id) |
| logger.info(f"Cleaned up {job_id} after download") |
| except Exception as e: |
| logger.warning(f"Cleanup failed for {job_id}: {e}") |
|
|
| |
| |
| |
|
|
| @app.get("/") |
| async def root(): |
| return { |
| "name": "ArchNemix Shorts Generator API", |
| "version": "5.1.0", |
| "queue": { |
| "max_concurrent": MAX_CONCURRENT, |
| "avg_job_duration_s": AVG_JOB_DURATION_S, |
| }, |
| "endpoints": { |
| "register": "GET /register", |
| "generate": "POST /generate", |
| "cancel": "DELETE /job/{id}", |
| "job_status": "GET /job/{id}", |
| "queue": "GET /queue", |
| "stream": "GET /stream/{id} — inline video playback", |
| "download": "GET /download/{id} — file download (deletes after)", |
| "delete": "DELETE /video/{id}", |
| "videos": "GET /videos/{category}", |
| "health": "GET /health", |
| }, |
| } |
|
|
| @app.get("/register") |
| async def register_client(req: Request): |
| client = resolve_client(req) |
| return JSONResponse( |
| { |
| "client_token": client["token"], |
| "is_new": client["job_count"] == 0, |
| "job_count": client["job_count"], |
| "instructions": ( |
| "Store client_token in localStorage as 'archx_client_id'. " |
| "Send it as the X-Client-ID header on every request." |
| ), |
| }, |
| headers={"X-Client-ID": client["token"]}, |
| ) |
|
|
| @app.get("/health") |
| async def health(): |
| try: |
| videos = await list_videos_from_dataset("minecraft") |
| return { |
| "status": "healthy", |
| "timestamp": datetime.now().isoformat(), |
| "videos": len(videos), |
| "running": len(_RUNNING_JOBS), |
| "queued": len(_JOB_QUEUE), |
| "active_jobs": len([j for j in JOBS.values() if j["status"] == "processing"]), |
| } |
| except Exception as e: |
| return JSONResponse({"status": "degraded", "error": str(e)}, status_code=503) |
|
|
| @app.get("/queue") |
| async def queue_status(): |
| running = len(_RUNNING_JOBS) |
| waiting = len(_JOB_QUEUE) |
| return { |
| "running": running, |
| "waiting": waiting, |
| "total_active": running + waiting, |
| "avg_job_s": AVG_JOB_DURATION_S, |
| "estimated_wait_s": waiting * AVG_JOB_DURATION_S if running == 0 else |
| (waiting + 1) * AVG_JOB_DURATION_S, |
| "message": ( |
| "No jobs running — submit yours now!" if (running + waiting) == 0 else |
| f"{running} running, {waiting} in queue" |
| ), |
| } |
|
|
| @app.get("/videos/{category}") |
| async def list_videos(category: str = "minecraft"): |
| try: |
| |
| category_map = { |
| "minecraft": "minecraft", |
| "subwaysurfers": "subwaysurfers", |
| "ss": "subwaysurfers", |
| } |
| dataset_category = category_map.get(category.lower(), category.lower()) |
| videos = await list_videos_from_dataset(dataset_category) |
| |
| |
| if not videos: |
| if dataset_category == "subwaysurfers": |
| videos = [f"ss{i}" for i in range(1, 6)] |
| else: |
| videos = [f"mc{i}" for i in range(1, 7)] |
| |
| return {"category": category, "videos": videos, "count": len(videos)} |
| except Exception as e: |
| |
| if category.lower() in ["subwaysurfers", "ss"]: |
| videos = [f"ss{i}" for i in range(1, 6)] |
| else: |
| videos = [f"mc{i}" for i in range(1, 7)] |
| return {"category": category, "videos": videos, "count": len(videos), "error": str(e)} |
|
|
| @app.post("/generate") |
| async def generate_video( |
| req: GenerateRequest, |
| background_tasks: BackgroundTasks, |
| http_req: Request, |
| ): |
| if not validate_app_key(http_req): |
| raise HTTPException(403, "Invalid API key") |
| if not validate_origin(http_req): |
| raise HTTPException(403, "Invalid origin") |
|
|
| client = resolve_client(http_req) |
| token = client["token"] |
|
|
| if not rate_limit(f"generate:{token}", GENERATE_LIMIT): |
| raise HTTPException(429, f"Rate limit: max {GENERATE_LIMIT} requests/hour") |
|
|
| cleanup_old_jobs() |
|
|
| active_for_client = [ |
| j for j in JOBS.values() |
| if j.get("client_token") == token |
| and j["status"] in ("queued", "processing") |
| ] |
| if active_for_client: |
| existing_id = active_for_client[0]["id"] |
| raise HTTPException( |
| 409, |
| f"You already have an active job ({existing_id}). " |
| "Wait for it to finish or cancel it first.", |
| ) |
|
|
| job_id = str(uuid.uuid4()) |
| audio_path = AUDIO_DIR / f"{job_id}.mp3" |
| subs_path = AUDIO_DIR / f"{job_id}.ass" |
|
|
| create_job(job_id, token) |
|
|
| try: |
| await save_base64_audio(req.audio_base64, audio_path) |
| await save_subtitles(req.subtitles_ass, subs_path) |
| except ValueError as e: |
| update_job(job_id, status="failed", error=str(e)) |
| raise HTTPException(400, str(e)) |
|
|
| update_job( |
| job_id, |
| audio_path=str(audio_path), |
| subs_path=str(subs_path), |
| background=req.background, |
| requested_duration=req.duration, |
| ) |
|
|
| client["job_count"] += 1 |
| client["jobs"].append(job_id) |
| if len(client["jobs"]) > 50: |
| client["jobs"] = client["jobs"][-50:] |
| _save_client(client) |
|
|
| # 0 means the job should start almost immediately (nothing is encoding). |
| position = (len(_JOB_QUEUE) + 1) if _RUNNING_JOBS else 0 |
| _JOB_QUEUE.append(job_id) |
| _refresh_queue_positions() |
|
|
| if position == 0: |
| eta = {"message": "Starting very soon...", "wait_low_s": 0, "wait_high_s": 30} |
| else: |
| eta = queue_eta(position, req.duration) |
|
|
| return JSONResponse( |
| { |
| "job_id": job_id, |
| "client_token": token, |
| "status": "queued", |
| "queue_position": position if position > 0 else 1, |
| "queue": eta, |
| "check_status": f"/job/{job_id}", |
| }, |
| headers={"X-Client-ID": token}, |
| ) |
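| |
| # Example request body (illustrative values; the base64 audio is truncated): |
| #   { |
| #     "audio_base64": "data:audio/mp3;base64,SUQz...", |
| #     "subtitles_ass": "[Script Info]\n...\n[Events]\n...", |
| #     "background": "mc3", |
| #     "duration": 42.5 |
| #   } |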
|
|
| @app.get("/job/{job_id}") |
| async def job_status(job_id: str, http_req: Request): |
| if not rate_limit(f"status:{job_id}", STATUS_LIMIT): |
| return JSONResponse({"job_id": job_id, "status": "rate_limited"}) |
|
|
| if job_id not in JOBS: |
| job_file = JOBS_DIR / f"{job_id}.json" |
| if job_file.exists(): |
| try: |
| return JSONResponse(json.loads(job_file.read_text())) |
| except Exception: |
| pass |
| raise HTTPException(404, "Job not found") |
|
|
| job = JOBS[job_id] |
|
|
| if (datetime.now() - job["updated_at"]).total_seconds() > JOB_TIMEOUT: |
| if job["status"] not in ("completed", "failed"): |
| update_job(job_id, status="failed", error="Job timed out") |
|
|
| pos = queue_position(job_id) |
|
|
| resp: Dict = { |
| "job_id": job["id"], |
| "status": job["status"], |
| "progress": job["progress"], |
| "message": job["message"], |
| "queue_position": pos, |
| "queue": queue_eta(pos) if pos else None, |
| "created_at": job["created_at"].isoformat() if isinstance(job["created_at"], datetime) else job["created_at"], |
| "updated_at": job["updated_at"].isoformat() if isinstance(job["updated_at"], datetime) else job["updated_at"], |
| "encode_stats": { |
| "frame": job.get("ffmpeg_frame", 0), |
| "fps": job.get("ffmpeg_fps", 0.0), |
| "out_time_s": job.get("ffmpeg_out_time", 0.0), |
| "total_s": job.get("total_duration", 0.0), |
| "speed": job.get("ffmpeg_speed", 0.0), |
| }, |
| } |
|
|
| |
| if job["status"] == "completed" and job.get("output_path"): |
| resp["download_url"] = f"/download/{job_id}" |
| resp["stream_url"] = f"/stream/{job_id}" |
|
|
| if job.get("error"): |
| resp["error"] = job["error"] |
|
|
| return JSONResponse(resp) |
|
|
|
|
| @app.get("/stream/{job_id}") |
| async def stream_video(job_id: str): |
| """ |
| Serve the video for inline browser playback. |
| Frontend uses this as <video src="..."> so user can watch immediately. |
| File is NOT deleted after streaming — user may rewatch or seek. |
| """ |
| output_path = OUTPUT_DIR / f"{job_id}.mp4" |
| if not output_path.exists(): |
| raise HTTPException(404, "Video not found or not ready") |
| if job_id in JOBS and JOBS[job_id]["status"] != "completed": |
| raise HTTPException(400, "Video not ready yet") |
|
|
| return FileResponse( |
| output_path, |
| media_type="video/mp4", |
| headers={ |
| "Cache-Control": "no-cache", |
| "Accept-Ranges": "bytes", |
| }, |
| ) |
|
|
|
|
| @app.get("/download/{job_id}") |
| async def download_video(job_id: str, background_tasks: BackgroundTasks): |
| """ |
| Serve the video as a file download. |
| Deletes the file from /tmp after sending so disk stays clean. |
| """ |
| output_path = OUTPUT_DIR / f"{job_id}.mp4" |
| if not output_path.exists(): |
| raise HTTPException(404, "Video not found or not ready") |
| if job_id in JOBS and JOBS[job_id]["status"] != "completed": |
| raise HTTPException(400, "Video not ready yet") |
|
|
| |
| background_tasks.add_task(_delete_output_file, job_id, output_path) |
|
|
| return FileResponse( |
| output_path, |
| media_type="video/mp4", |
| filename=f"archnemix-short-{job_id[:8]}.mp4", |
| headers={ |
| "Content-Disposition": f'attachment; filename="archnemix-short-{job_id[:8]}.mp4"', |
| "Cache-Control": "no-cache", |
| }, |
| ) |
|
|
|
|
| @app.delete("/video/{job_id}") |
| async def delete_video(job_id: str, http_req: Request): |
| if not validate_app_key(http_req): |
| raise HTTPException(403, "Invalid API key") |
|
|
| output_path = OUTPUT_DIR / f"{job_id}.mp4" |
| removed = False |
| if output_path.exists(): |
| try: |
| output_path.unlink() |
| removed = True |
| logger.info(f"Deleted {job_id}") |
| except Exception as e: |
| raise HTTPException(500, f"Delete failed: {e}") |
|
|
| if job_id in JOBS: |
| JOBS[job_id]["output_path"] = None |
| JOBS[job_id]["message"] = "Video deleted by user" |
| _persist_job(job_id) |
|
|
| return {"job_id": job_id, "deleted": removed} |
|
|
|
|
| @app.delete("/job/{job_id}") |
| async def cancel_job(job_id: str, http_req: Request): |
| """Called by controller when frontend goes silent.""" |
| if not validate_app_key(http_req): |
| raise HTTPException(403, "Invalid API key") |
|
|
| if job_id not in JOBS: |
| return {"job_id": job_id, "cancelled": False, "reason": "not found"} |
|
|
| job = JOBS[job_id] |
| if job["status"] in ("completed", "failed"): |
| return {"job_id": job_id, "cancelled": False, "reason": job["status"]} |
|
|
| _kill_ffmpeg_for_job(job_id) |
| _RUNNING_JOBS.discard(job_id) |
| try: |
| _JOB_QUEUE.remove(job_id) |
| except ValueError: |
| pass |
|
|
| |
| output_path = OUTPUT_DIR / f"{job_id}.mp4" |
| output_path.unlink(missing_ok=True) |
|
|
| update_job(job_id, status="failed", error="Cancelled by controller — client disconnected") |
| _refresh_queue_positions() |
|
|
| logger.info(f"Job {job_id} cancelled via controller request") |
| return {"job_id": job_id, "cancelled": True} |
|
|
|
|
| @app.get("/debug/dataset") |
| async def debug_dataset(): |
| try: |
| api = get_hf_api() |
| files = list(api.list_repo_files(repo_id=HF_DATASET, repo_type="dataset")) |
| mc = [f for f in files if "minecraft" in f.lower() and f.endswith(".mp4")] |
| return {"dataset": HF_DATASET, "total_files": len(files), "minecraft": mc} |
| except Exception as e: |
| return JSONResponse({"status": "error", "error": str(e)}, status_code=500) |
|
|
| |
| |
| |
| @app.on_event("startup") |
| async def startup(): |
| global _QUEUE_LOCK |
| _QUEUE_LOCK = asyncio.Lock() |
|
|
| logger.info("=" * 60) |
| logger.info("ArchNemix Shorts Generator API v5.1.0") |
| logger.info(f"Dataset : {HF_DATASET}") |
| logger.info(f"Concurrent : {MAX_CONCURRENT}") |
| logger.info(f"Avg job : {AVG_JOB_DURATION_S}s ({AVG_JOB_DURATION_S // 60}m)") |
| logger.info("Quality : CRF 16 · medium · 8-16 Mbps · AAC 192k") |
| logger.info("Video path : stream from /tmp, delete on download") |
| logger.info("=" * 60) |
|
|
| # Keep references so these background tasks aren't garbage-collected. |
| app.state.queue_worker = asyncio.create_task(_queue_worker()) |
| app.state.video_cleaner = asyncio.create_task(_cleanup_old_videos()) |
|
|
| try: |
| vids = await list_videos_from_dataset("minecraft") |
| logger.info(f"{len(vids)} videos cached") |
| except Exception as e: |
| logger.warning(f"Video cache warm-up: {e}") |
|
|
| cleanup_old_jobs() |
| logger.info("Ready") |
|
|
|
|
| @app.on_event("shutdown") |
| async def shutdown(): |
| logger.info("Shutdown — cleaning jobs") |
| cleanup_old_jobs() |