import os
import uuid
import time
import subprocess
import base64
import hashlib
import json
import random
import asyncio
import aiofiles
import logging
import secrets
from collections import deque
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Dict, List
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import HfApi, hf_hub_download
from pydantic import BaseModel, Field
# =============================================================================
# CONFIG
# =============================================================================
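# HF_TOKEN is required for dataset access (get_hf_api raises without it);
# leaving APP_KEY unset, or at the placeholder value checked below, disables
# API-key enforcement entirely.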
HF_DATASET = os.getenv("HF_DATASET", "YTShortMakerArchx/BG_VIDS_ARCHX_YT")
HF_TOKEN = os.getenv("HF_TOKEN")
APP_KEY = os.getenv("APP_KEY")
UI_ORIGIN = os.getenv("UI_ORIGIN", "http://shortgenx.pages.dev")  # currently unused; CORS origins are hard-coded below
BASE_DIR = Path("/tmp")
AUDIO_DIR = BASE_DIR / "audio"
OUTPUT_DIR = BASE_DIR / "out"
JOBS_DIR = BASE_DIR / "jobs"
CLIENT_DIR = BASE_DIR / "clients"
for d in [AUDIO_DIR, OUTPUT_DIR, JOBS_DIR, CLIENT_DIR]:
d.mkdir(parents=True, exist_ok=True)
# Rate limiting
RATE_LIMITS: Dict[str, list] = {}
GENERATE_LIMIT = 3 # per hour per client
STATUS_LIMIT = 1000 # per hour per job
# Job timeout
JOBS: Dict[str, Dict] = {}
JOB_TIMEOUT = 900 # 15 min hard cap
# Video listing cache
VIDEO_CACHE: Dict[str, tuple] = {}
CACHE_TIMEOUT = 300
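# VIDEO_CACHE maps "videos_<category>" -> (unix timestamp, [video names]);
# entries older than CACHE_TIMEOUT seconds are re-fetched on next access.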
# =============================================================================
# QUEUE SYSTEM
# =============================================================================
MAX_CONCURRENT = 1
AVG_JOB_DURATION_S = 360 # 6 minutes per job
_JOB_QUEUE: deque = deque()
_RUNNING_JOBS: set = set()
_QUEUE_LOCK: Optional[asyncio.Lock] = None  # created at startup
# =============================================================================
# CLIENT ID SYSTEM
# =============================================================================
CLIENT_TOKENS: Dict[str, Dict] = {}
def _client_file(token: str) -> Path:
return CLIENT_DIR / f"{token[:8]}.json"
def _load_client(token: str) -> Optional[Dict]:
if token in CLIENT_TOKENS:
return CLIENT_TOKENS[token]
path = _client_file(token)
if path.exists():
try:
rec = json.loads(path.read_text())
CLIENT_TOKENS[token] = rec
return rec
except Exception:
pass
return None
def _save_client(rec: Dict):
token = rec["token"]
CLIENT_TOKENS[token] = rec
try:
rec_copy = rec.copy()
rec_copy["created_at"] = (
rec_copy["created_at"].isoformat()
if isinstance(rec_copy["created_at"], datetime)
else rec_copy["created_at"]
)
rec_copy["last_seen"] = (
rec_copy["last_seen"].isoformat()
if isinstance(rec_copy["last_seen"], datetime)
else rec_copy["last_seen"]
)
_client_file(token).write_text(json.dumps(rec_copy))
except Exception as e:
logger.warning(f"Failed to persist client {token[:8]}: {e}")
def _create_client(fingerprint: str) -> Dict:
rec = {
"token": secrets.token_hex(24),
"fingerprint": fingerprint,
"created_at": datetime.now(),
"last_seen": datetime.now(),
"job_count": 0,
"jobs": [],
}
_save_client(rec)
logger.info(f"New client {rec['token'][:8]}...")
return rec
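# NOTE: req.client.host is the direct peer address; behind a reverse proxy (as
# on HF Spaces) this may be the proxy itself, so X-Forwarded-For could be a
# better fingerprint signal.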
def _fingerprint(req: Request) -> str:
data = (
f"{req.client.host}:"
f"{req.headers.get('user-agent', '')}:"
f"{req.headers.get('accept', '')}"
)
return hashlib.sha256(data.encode()).hexdigest()
def resolve_client(req: Request) -> Dict:
token = req.headers.get("X-Client-ID", "").strip()
if token:
rec = _load_client(token)
if rec:
rec["last_seen"] = datetime.now()
_save_client(rec)
return rec
fp = _fingerprint(req)
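        # Token supplied but unknown (e.g. /tmp was wiped on restart): trust the
        # client-supplied token and recreate the record so the client keeps its ID.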
rec = {
"token": token,
"fingerprint": fp,
"created_at": datetime.now(),
"last_seen": datetime.now(),
"job_count": 0,
"jobs": [],
}
_save_client(rec)
logger.info(f"Restored client {token[:8]}... from header")
return rec
fp = _fingerprint(req)
for rec in CLIENT_TOKENS.values():
if rec.get("fingerprint") == fp:
rec["last_seen"] = datetime.now()
_save_client(rec)
return rec
return _create_client(fp)
# =============================================================================
# LOGGING
# =============================================================================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger(__name__)
# =============================================================================
# APP
# =============================================================================
app = FastAPI(
title="ArchNemix Shorts Generator API",
version="5.1.0",
docs_url="/docs",
redoc_url="/redoc",
)
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://shortgenx.pages.dev", # Your website
"http://localhost:3000", # Local dev only port 3000
"http://127.0.0.1:3000", # Local dev only port 3000
"https://ytshortmakerarchx-archnemix-controller.hf.space", # Controller space
],
allow_credentials=True,
allow_methods=["GET", "POST", "DELETE", "OPTIONS", "HEAD"],
allow_headers=["*"],
expose_headers=["X-Client-ID"],
max_age=600,
)
# =============================================================================
# MODELS
# =============================================================================
class GenerateRequest(BaseModel):
audio_base64: str = Field(..., min_length=10)
subtitles_ass: str = Field(..., min_length=10)
background: str
duration: Optional[float] = Field(None, ge=1, le=180)
request_id: Optional[str] = None
# =============================================================================
# SECURITY
# =============================================================================
def validate_app_key(req: Request) -> bool:
if not APP_KEY or APP_KEY == "archnemix-secret-key-change-in-production":
return True
key = (
req.headers.get("X-APP-KEY")
or req.headers.get("Authorization", "").replace("Bearer ", "")
)
return key == APP_KEY
def validate_origin(req: Request) -> bool:
origin = req.headers.get("origin") or req.headers.get("referer", "")
if not origin:
return True
# Strict allowed origins - exact match only
allowed = [
"https://shortgenx.pages.dev",
"http://localhost:3000",
"http://127.0.0.1:3000",
"https://ytshortmakerarchx-archnemix-controller.hf.space",
]
# Use exact match, not prefix matching
return origin in allowed
def rate_limit(key: str, limit: int, window: int = 3600) -> bool:
now = time.time()
RATE_LIMITS.setdefault(key, [])
RATE_LIMITS[key] = [t for t in RATE_LIMITS[key] if now - t < window]
if len(RATE_LIMITS[key]) >= limit:
return False
RATE_LIMITS[key].append(now)
return True
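# Sliding-window limiter: each key keeps a list of call timestamps, expired
# entries are pruned, and a call is rejected once `limit` timestamps remain in
# the window. Example: rate_limit(f"generate:{token}", GENERATE_LIMIT) allows
# GENERATE_LIMIT calls per rolling hour; the next call within the window
# returns False and the caller raises HTTP 429.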
# =============================================================================
# DATASET HELPERS
# =============================================================================
def get_hf_api() -> HfApi:
if not HF_TOKEN:
raise ValueError("HF_TOKEN not configured")
return HfApi(token=HF_TOKEN)
async def list_videos_from_dataset(category: str = "minecraft") -> List[str]:
cache_key = f"videos_{category}"
if cache_key in VIDEO_CACHE:
ts, data = VIDEO_CACHE[cache_key]
if time.time() - ts < CACHE_TIMEOUT:
return data
try:
api = get_hf_api()
        # list_repo_files is a blocking network call; run it in a thread so the
        # event loop stays responsive.
        files = list(await asyncio.to_thread(
            api.list_repo_files, repo_id=HF_DATASET, repo_type="dataset"
        ))
result = [
f.split("/")[-1].replace(".mp4", "")
for f in files
if f.startswith(f"{category}/") and f.endswith(".mp4")
]
VIDEO_CACHE[cache_key] = (time.time(), result)
return result
except Exception as e:
logger.error(f"list_videos failed: {e}")
return [f"mc{i}" for i in range(1, 7)]
async def download_video_from_dataset(video_name: str, category: str = "minecraft") -> str:
video_name = video_name.replace(".mp4", "")
for path in [f"{category}/{video_name}.mp4", f"{video_name}.mp4"]:
        try:
            # hf_hub_download blocks for the whole download; run it in a thread
            # so concurrent status polling stays responsive.
            return await asyncio.to_thread(
                hf_hub_download,
                repo_id=HF_DATASET, filename=path,
                repo_type="dataset", token=HF_TOKEN,
                cache_dir="/tmp/hf_cache",
            )
        except Exception:
            continue
available = await list_videos_from_dataset(category)
raise HTTPException(404, f"Video '{video_name}' not found. Available: {available}")
# =============================================================================
# FILE HELPERS
# =============================================================================
async def save_base64_audio(data: str, path: Path) -> int:
if data.startswith("data:audio/"):
data = data.split(",", 1)[1]
audio_bytes = base64.b64decode(data)
if len(audio_bytes) > 15 * 1024 * 1024:
raise ValueError("Audio too large (max 15 MB)")
async with aiofiles.open(path, "wb") as f:
await f.write(audio_bytes)
return len(audio_bytes)
async def save_subtitles(ass: str, path: Path):
if "[Script Info]" not in ass or "[Events]" not in ass:
raise ValueError("Invalid ASS subtitle format")
if len(ass.encode()) > 500 * 1024:
raise ValueError("Subtitles too large (max 500 KB)")
async with aiofiles.open(path, "w", encoding="utf-8") as f:
await f.write(ass)
def get_media_duration(path: Path) -> float:
try:
r = subprocess.run(
["ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
str(path)],
capture_output=True, text=True, check=True, timeout=10,
)
return float(r.stdout.strip())
except Exception:
try:
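            # Rough fallback: assume ~128 kbps audio (~16 000 bytes/s), clamped to 1-180 s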
return max(1.0, min(180.0, path.stat().st_size / 16000))
except Exception:
return 30.0
# =============================================================================
# JOB MANAGEMENT
# =============================================================================
def _make_job(job_id: str, client_token: str) -> Dict:
return {
"id": job_id,
"client_token": client_token,
"status": "queued",
"queue_position": None,
"progress": 0,
"message": "Waiting in queue...",
"created_at": datetime.now(),
"updated_at": datetime.now(),
"error": None,
"output_path": None,
"audio_path": None,
"subs_path": None,
"ffmpeg_frame": 0,
"ffmpeg_fps": 0.0,
"ffmpeg_out_time": 0.0,
"ffmpeg_speed": 0.0,
"total_duration": 0.0,
}
def create_job(job_id: str, client_token: str) -> Dict:
JOBS[job_id] = _make_job(job_id, client_token)
_persist_job(job_id)
return JOBS[job_id]
def update_job(job_id: str, **kwargs):
if job_id not in JOBS:
return
job = JOBS[job_id]
for k, v in kwargs.items():
job[k] = v
job["updated_at"] = datetime.now()
_persist_job(job_id)
def _persist_job(job_id: str):
if job_id not in JOBS:
return
try:
data = {
k: (v.isoformat() if isinstance(v, datetime) else v)
for k, v in JOBS[job_id].items()
}
(JOBS_DIR / f"{job_id}.json").write_text(json.dumps(data))
except Exception as e:
logger.error(f"persist_job failed: {e}")
def cleanup_old_jobs():
now = datetime.now()
for job_id in list(JOBS):
job = JOBS[job_id]
age = (now - job["updated_at"]).total_seconds()
if age > 3600:
JOBS.pop(job_id, None)
_RUNNING_JOBS.discard(job_id)
try:
_JOB_QUEUE.remove(job_id)
except ValueError:
pass
for p in [
JOBS_DIR / f"{job_id}.json",
AUDIO_DIR / f"{job_id}.mp3",
AUDIO_DIR / f"{job_id}.ass",
AUDIO_DIR / f"{job_id}_trimmed.mp4",
OUTPUT_DIR / f"{job_id}.mp4",
]:
try:
p.unlink(missing_ok=True)
except Exception:
pass
# =============================================================================
# QUEUE HELPERS
# =============================================================================
def queue_position(job_id: str) -> Optional[int]:
try:
idx = list(_JOB_QUEUE).index(job_id)
return idx + 1
except ValueError:
return None
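# ETA heuristic: the running job is assumed to need at most AVG_JOB_DURATION_S
# more (tightened by its actual ffmpeg out_time), plus one full average per
# queued job ahead; the low/high bounds scale that wait by 0.75x and 1.2x.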
def queue_eta(position: int, job_duration: Optional[float] = None) -> Dict:
running_remaining = AVG_JOB_DURATION_S
for jid in _RUNNING_JOBS:
j = JOBS.get(jid, {})
total = j.get("total_duration", 0) or AVG_JOB_DURATION_S
out = j.get("ffmpeg_out_time", 0) or 0
remaining = max(0, total - out)
running_remaining = min(running_remaining, remaining)
queued_ahead = max(0, position - 1)
total_wait_s = running_remaining + queued_ahead * AVG_JOB_DURATION_S
own_duration = job_duration or AVG_JOB_DURATION_S
total_s = total_wait_s + own_duration
low_s = int(total_wait_s * 0.75)
high_s = int(total_wait_s * 1.2)
def _fmt(s: int) -> str:
if s < 60:
return f"{s}s"
m, sec = divmod(s, 60)
return f"{m}m {sec:02d}s" if sec else f"{m}m"
return {
"queue_position": position,
"jobs_ahead": position,
"wait_low_s": low_s,
"wait_high_s": high_s,
"wait_low_human": _fmt(low_s),
"wait_high_human": _fmt(high_s),
"total_with_own_s": int(total_s),
"message": (
f"Position {position} in queue — "
f"roughly {_fmt(low_s)}{_fmt(high_s)} wait, "
f"likely less"
),
}
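# The worker polls every 2 s and starts at most MAX_CONCURRENT encodes; with
# MAX_CONCURRENT = 1 all encodes run strictly one at a time.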
async def _queue_worker():
while True:
await asyncio.sleep(2)
if len(_RUNNING_JOBS) >= MAX_CONCURRENT:
continue
if not _JOB_QUEUE:
continue
job_id = _JOB_QUEUE[0]
job = JOBS.get(job_id)
if not job or job["status"] in ("failed", "completed"):
_JOB_QUEUE.popleft()
continue
_JOB_QUEUE.popleft()
_RUNNING_JOBS.add(job_id)
_refresh_queue_positions()
audio_path = Path(job["audio_path"])
subs_path = Path(job["subs_path"])
background = job.get("background", "mc1")
duration = job.get("requested_duration")
asyncio.create_task(
_run_and_release(job_id, audio_path, subs_path, background, duration)
)
async def _run_and_release(job_id, audio_path, subs_path, background, duration):
try:
await process_video_task(job_id, audio_path, subs_path, background, duration)
finally:
_RUNNING_JOBS.discard(job_id)
_refresh_queue_positions()
def _refresh_queue_positions():
for i, jid in enumerate(_JOB_QUEUE):
if jid in JOBS:
pos = i + 1
JOBS[jid]["queue_position"] = pos
JOBS[jid]["message"] = queue_eta(pos)["message"]
_persist_job(jid)
# =============================================================================
# FFMPEG PROCESS MANAGEMENT
# =============================================================================
_FFMPEG_PROCS: Dict[str, asyncio.subprocess.Process] = {}
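# job_id -> live ffmpeg process, so cancellations and timeouts can kill the encode.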
def _kill_ffmpeg_for_job(job_id: str):
proc = _FFMPEG_PROCS.pop(job_id, None)
if proc and proc.returncode is None:
try:
proc.kill()
logger.info(f"Killed FFmpeg for job {job_id}")
except Exception:
pass
def _parse_ffmpeg_kv(line: str) -> Dict:
parts = line.strip().split("=", 1)
return {parts[0].strip(): parts[1].strip()} if len(parts) == 2 else {}
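# ffmpeg's -progress output is a stream of key=value lines, roughly:
#   frame=120
#   fps=30.0
#   out_time_ms=4000000    <- microseconds despite the name, hence / 1_000_000 below
#   speed=1.2x
#   progress=continue      (or "end" on the final block)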
async def _stream_ffmpeg_progress(
proc: asyncio.subprocess.Process,
job_id: str,
total_duration: float,
progress_start: int = 50,
progress_end: int = 95,
):
frame = fps = speed = 0
out_time_s = 0.0
last_update_time = time.time()
async for raw_line in proc.stderr:
line = raw_line.decode(errors="replace").strip()
if not line:
continue
kv = _parse_ffmpeg_kv(line)
if not kv:
continue
key, val = next(iter(kv.items()))
if key == "frame":
frame = int(val) if val.isdigit() else frame
elif key == "fps":
try: fps = float(val)
except ValueError: pass
elif key == "out_time_ms":
try: out_time_s = int(val) / 1_000_000
except ValueError: pass
elif key == "out_time":
try:
parts = val.split(":")
if len(parts) == 3:
h, m, s = parts
out_time_s = int(h) * 3600 + int(m) * 60 + float(s)
except ValueError: pass
elif key == "speed":
try:
speed = float(val.replace("x", ""))
except ValueError: pass
elif key == "progress":
now = time.time()
if val == "end" or now - last_update_time >= 0.5:
last_update_time = now
ratio = min(1.0, out_time_s / total_duration) if total_duration > 0 else 0
mapped = int(progress_start + ratio * (progress_end - progress_start))
update_job(
job_id,
progress=mapped,
ffmpeg_frame=frame,
ffmpeg_fps=fps,
ffmpeg_out_time=round(out_time_s, 2),
ffmpeg_speed=speed,
total_duration=total_duration,
message=(
f"Encoding {out_time_s:.1f}s / {total_duration:.1f}s "
f"({speed:.1f}x speed)"
if total_duration > 0 and speed > 0 else "Encoding..."
),
)
if val == "end":
break
if JOBS.get(job_id, {}).get("status") == "failed":
proc.kill()
break
# =============================================================================
# CORE VIDEO PROCESSING
# =============================================================================
async def process_video_task(
job_id: str,
audio_path: Path,
subs_path: Path,
background: str,
duration: Optional[float],
):
try:
logger.info(f"Job {job_id}: starting encode")
update_job(job_id, status="processing", progress=10,
message="Downloading background video...", queue_position=None)
try:
# Auto-detect category from video name
# ss1, ss2, ss3, ss4, ss5 → subwaysurfers
# mc1, mc2, etc. → minecraft
if background.startswith('ss'):
category = 'subwaysurfers'
elif background.startswith('mc'):
category = 'minecraft'
else:
category = 'minecraft' # default fallback
bg_path = await download_video_from_dataset(background, category)
except HTTPException as e:
update_job(job_id, status="failed", error=str(e.detail))
return
update_job(job_id, progress=20, message="Analysing media...")
audio_dur = duration or get_media_duration(audio_path)
video_dur = get_media_duration(Path(bg_path))
if not (1 <= audio_dur <= 180):
update_job(job_id, status="failed", error="Audio duration out of range (1-180 s)")
return
logger.info(f"Job {job_id}: audio={audio_dur:.2f}s bg={video_dur:.2f}s")
if video_dur > audio_dur:
max_start = video_dur - audio_dur
start = random.uniform(0, max_start)
bg_input_args = [
"-ss", f"{start:.3f}",
"-t", f"{audio_dur:.3f}",
"-i", str(bg_path),
]
logger.info(f"Job {job_id}: trim start={start:.2f}s")
else:
loop_n = int(audio_dur / video_dur) + 2
bg_input_args = [
"-stream_loop", str(loop_n),
"-i", str(bg_path),
]
logger.info(f"Job {job_id}: loop x{loop_n}")
subs_str = str(subs_path).replace("\\", "/").replace(":", "\\:")
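        # Filter chain: burn the ASS subtitles in at source resolution, scale to
        # cover the 1080x1920 portrait frame, center-crop the overflow, reset SAR.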
vf = (
f"[0:v]"
f"ass='{subs_str}',"
f"scale=1080:1920:force_original_aspect_ratio=increase,"
f"crop=1080:1920,"
f"setsar=1"
f"[v]"
)
output_path = OUTPUT_DIR / f"{job_id}.mp4"
final_cmd = [
"ffmpeg", "-y",
"-hide_banner",
"-loglevel", "error",
*bg_input_args,
"-i", str(audio_path),
"-filter_complex", vf,
"-map", "[v]",
"-map", "1:a",
"-c:v", "libx264",
"-preset", "medium",
"-crf", "16",
"-minrate", "8M",
"-maxrate", "16M",
"-bufsize", "16M",
"-profile:v", "high",
"-level:v", "4.2",
"-pix_fmt", "yuv420p",
"-c:a", "aac",
"-b:a", "192k",
"-shortest",
"-movflags", "+faststart",
"-threads", "0",
"-progress", "pipe:2",
"-t", f"{audio_dur:.3f}",
str(output_path),
]
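        # Encoder notes: CRF 16 with maxrate/bufsize gives VBV-constrained quality
        # mode (-minrate has little effect under CRF); -shortest plus -t both cap
        # the output at the audio length.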
update_job(job_id, progress=40, message="Encoding... (single-pass)", total_duration=audio_dur)
timeout = max(480, int(audio_dur * 5) + 120)
try:
encode_proc = await asyncio.create_subprocess_exec(
*final_cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE,
)
_FFMPEG_PROCS[job_id] = encode_proc
await asyncio.wait_for(
_stream_ffmpeg_progress(encode_proc, job_id, audio_dur,
progress_start=40, progress_end=95),
timeout=timeout,
)
await asyncio.wait_for(encode_proc.wait(), timeout=30)
_FFMPEG_PROCS.pop(job_id, None)
if encode_proc.returncode != 0:
raise RuntimeError(f"FFmpeg exited with code {encode_proc.returncode}")
except asyncio.TimeoutError:
_kill_ffmpeg_for_job(job_id)
update_job(job_id, status="failed", error="Encoding timed out — try a shorter script")
return
except Exception as e:
_kill_ffmpeg_for_job(job_id)
if JOBS.get(job_id, {}).get("status") == "failed":
return
update_job(job_id, status="failed", error=f"Encoding failed: {str(e)[:200]}")
return
if not output_path.exists() or output_path.stat().st_size == 0:
update_job(job_id, status="failed", error="Output file missing after encode")
return
size_mb = output_path.stat().st_size / 1024 / 1024
logger.info(f"Job {job_id}: done {size_mb:.2f} MB")
for p in [audio_path, subs_path]:
try:
p.unlink(missing_ok=True)
except Exception:
pass
update_job(
job_id,
status="completed",
progress=100,
message=f"Done! {size_mb:.1f} MB",
output_path=str(output_path),
)
except Exception:
logger.exception(f"process_video_task unhandled for {job_id}")
update_job(job_id, status="failed", error="Unexpected server error")
async def _cleanup_old_videos():
"""Deletes videos older than 1 hour every 10 minutes"""
while True:
await asyncio.sleep(600) # 10 minutes
try:
cutoff_time = time.time() - 3600 # 1 hour ago
cleaned_count = 0
cleaned_size_mb = 0.0
for video_file in OUTPUT_DIR.glob("*.mp4"):
try:
file_stat = video_file.stat()
if file_stat.st_mtime < cutoff_time:
file_size_mb = file_stat.st_size / 1024 / 1024
video_file.unlink()
cleaned_count += 1
cleaned_size_mb += file_size_mb
logger.info(f"Cleaned up {video_file.name} ({file_size_mb:.1f}MB)")
except Exception as e:
logger.warning(f"Cleanup failed {video_file.name}: {e}")
if cleaned_count > 0:
logger.info(f"Cleaned {cleaned_count} videos, freed {cleaned_size_mb:.1f}MB")
except Exception as e:
logger.error(f"Cleanup error: {e}")
# =============================================================================
# FILE CLEANUP HELPER
# =============================================================================
def _delete_output_file(job_id: str, output_path: Path):
"""
Delete the video file after it has been sent to the user.
Called as a background task from /download so /tmp stays clean.
"""
try:
output_path.unlink(missing_ok=True)
if job_id in JOBS:
JOBS[job_id]["output_path"] = None
JOBS[job_id]["message"] = "Downloaded and cleaned up"
_persist_job(job_id)
logger.info(f"Cleaned up {job_id} after download")
except Exception as e:
logger.warning(f"Cleanup failed for {job_id}: {e}")
# =============================================================================
# API ENDPOINTS
# =============================================================================
@app.get("/")
async def root():
return {
"name": "ArchNemix Shorts Generator API",
"version": "5.1.0",
"queue": {
"max_concurrent": MAX_CONCURRENT,
"avg_job_duration_s": AVG_JOB_DURATION_S,
},
"endpoints": {
"register": "GET /register",
"generate": "POST /generate",
"cancel": "DELETE /job/{id}",
"job_status": "GET /job/{id}",
"queue": "GET /queue",
"stream": "GET /stream/{id} — inline video playback",
"download": "GET /download/{id} — file download (deletes after)",
"delete": "DELETE /video/{id}",
"videos": "GET /videos/{category}",
"health": "GET /health",
},
}
@app.get("/register")
async def register_client(req: Request):
client = resolve_client(req)
return JSONResponse(
{
"client_token": client["token"],
"is_new": client["job_count"] == 0,
"job_count": client["job_count"],
"instructions": (
"Store client_token in localStorage as 'archx_client_id'. "
"Send it as the X-Client-ID header on every request."
),
},
headers={"X-Client-ID": client["token"]},
)
@app.get("/health")
async def health():
try:
videos = await list_videos_from_dataset("minecraft")
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"videos": len(videos),
"running": len(_RUNNING_JOBS),
"queued": len(_JOB_QUEUE),
"active_jobs": len([j for j in JOBS.values() if j["status"] == "processing"]),
}
except Exception as e:
return JSONResponse({"status": "degraded", "error": str(e)}, status_code=503)
@app.get("/queue")
async def queue_status():
running = len(_RUNNING_JOBS)
waiting = len(_JOB_QUEUE)
return {
"running": running,
"waiting": waiting,
"total_active": running + waiting,
"avg_job_s": AVG_JOB_DURATION_S,
"estimated_wait_s": waiting * AVG_JOB_DURATION_S if running == 0 else
(waiting + 1) * AVG_JOB_DURATION_S,
"message": (
"No jobs running — submit yours now!" if (running + waiting) == 0 else
f"{running} running, {waiting} in queue"
),
}
@app.get("/videos/{category}")
async def list_videos(category: str = "minecraft"):
try:
# Map category aliases to dataset folder names
category_map = {
"minecraft": "minecraft",
"subwaysurfers": "subwaysurfers",
"ss": "subwaysurfers",
}
dataset_category = category_map.get(category.lower(), category.lower())
videos = await list_videos_from_dataset(dataset_category)
# Fallback to default list if no videos found
if not videos:
if dataset_category == "subwaysurfers":
videos = [f"ss{i}" for i in range(1, 6)] # ss1 through ss5
else:
videos = [f"mc{i}" for i in range(1, 7)]
return {"category": category, "videos": videos, "count": len(videos)}
except Exception as e:
# Fallback defaults
if category.lower() in ["subwaysurfers", "ss"]:
videos = [f"ss{i}" for i in range(1, 6)] # ss1 through ss5
else:
videos = [f"mc{i}" for i in range(1, 7)]
return {"category": category, "videos": videos, "count": len(videos), "error": str(e)}
@app.post("/generate")
async def generate_video(
req: GenerateRequest,
background_tasks: BackgroundTasks,
http_req: Request,
):
if not validate_app_key(http_req):
raise HTTPException(403, "Invalid API key")
if not validate_origin(http_req):
raise HTTPException(403, "Invalid origin")
client = resolve_client(http_req)
token = client["token"]
if not rate_limit(f"generate:{token}", GENERATE_LIMIT):
raise HTTPException(429, f"Rate limit: max {GENERATE_LIMIT} requests/hour")
cleanup_old_jobs()
active_for_client = [
j for j in JOBS.values()
if j.get("client_token") == token
and j["status"] in ("queued", "processing")
]
if active_for_client:
existing_id = active_for_client[0]["id"]
raise HTTPException(
409,
f"You already have an active job ({existing_id}). "
"Wait for it to finish or cancel it first.",
)
job_id = str(uuid.uuid4())
audio_path = AUDIO_DIR / f"{job_id}.mp3"
subs_path = AUDIO_DIR / f"{job_id}.ass"
create_job(job_id, token)
try:
await save_base64_audio(req.audio_base64, audio_path)
await save_subtitles(req.subtitles_ass, subs_path)
except ValueError as e:
update_job(job_id, status="failed", error=str(e))
raise HTTPException(400, str(e))
update_job(
job_id,
audio_path=str(audio_path),
subs_path=str(subs_path),
background=req.background,
requested_duration=req.duration,
)
client["job_count"] += 1
client["jobs"].append(job_id)
if len(client["jobs"]) > 50:
client["jobs"] = client["jobs"][-50:]
_save_client(client)
    # Position 0 means "starts immediately"; anything already running or queued
    # pushes the new job back one slot per job.
    position = len(_JOB_QUEUE) + 1 if (_RUNNING_JOBS or _JOB_QUEUE) else 0
_JOB_QUEUE.append(job_id)
_refresh_queue_positions()
if position == 0:
eta = {"message": "Starting very soon...", "wait_low_s": 0, "wait_high_s": 30}
else:
eta = queue_eta(position, req.duration)
return JSONResponse(
{
"job_id": job_id,
"client_token": token,
"status": "queued",
"queue_position": position if position > 0 else 1,
"queue": eta,
"check_status": f"/job/{job_id}",
},
headers={"X-Client-ID": token},
)
@app.get("/job/{job_id}")
async def job_status(job_id: str, http_req: Request):
if not rate_limit(f"status:{job_id}", STATUS_LIMIT):
return JSONResponse({"job_id": job_id, "status": "rate_limited"})
if job_id not in JOBS:
job_file = JOBS_DIR / f"{job_id}.json"
if job_file.exists():
try:
return JSONResponse(json.loads(job_file.read_text()))
except Exception:
pass
raise HTTPException(404, "Job not found")
job = JOBS[job_id]
if (datetime.now() - job["updated_at"]).total_seconds() > JOB_TIMEOUT:
if job["status"] not in ("completed", "failed"):
update_job(job_id, status="failed", error="Job timed out")
pos = queue_position(job_id)
resp: Dict = {
"job_id": job["id"],
"status": job["status"],
"progress": job["progress"],
"message": job["message"],
"queue_position": pos,
"queue": queue_eta(pos) if pos else None,
"created_at": job["created_at"].isoformat() if isinstance(job["created_at"], datetime) else job["created_at"],
"updated_at": job["updated_at"].isoformat() if isinstance(job["updated_at"], datetime) else job["updated_at"],
"encode_stats": {
"frame": job.get("ffmpeg_frame", 0),
"fps": job.get("ffmpeg_fps", 0.0),
"out_time_s": job.get("ffmpeg_out_time", 0.0),
"total_s": job.get("total_duration", 0.0),
"speed": job.get("ffmpeg_speed", 0.0),
},
}
# ── Return both URLs when completed — controller stores these directly ───
if job["status"] == "completed" and job.get("output_path"):
resp["download_url"] = f"/download/{job_id}"
resp["stream_url"] = f"/stream/{job_id}"
if job.get("error"):
resp["error"] = job["error"]
return JSONResponse(resp)
@app.get("/stream/{job_id}")
async def stream_video(job_id: str):
"""
Serve the video for inline browser playback.
Frontend uses this as <video src="..."> so user can watch immediately.
File is NOT deleted after streaming — user may rewatch or seek.
"""
output_path = OUTPUT_DIR / f"{job_id}.mp4"
if not output_path.exists():
raise HTTPException(404, "Video not found or not ready")
if job_id in JOBS and JOBS[job_id]["status"] != "completed":
raise HTTPException(400, "Video not ready yet")
return FileResponse(
output_path,
media_type="video/mp4",
headers={
"Cache-Control": "no-cache",
"Accept-Ranges": "bytes", # enables seeking in the browser player
},
)
@app.get("/download/{job_id}")
async def download_video(job_id: str, background_tasks: BackgroundTasks):
"""
Serve the video as a file download.
Deletes the file from /tmp after sending so disk stays clean.
"""
output_path = OUTPUT_DIR / f"{job_id}.mp4"
if not output_path.exists():
raise HTTPException(404, "Video not found or not ready")
if job_id in JOBS and JOBS[job_id]["status"] != "completed":
raise HTTPException(400, "Video not ready yet")
# Delete file from /tmp after response is sent
background_tasks.add_task(_delete_output_file, job_id, output_path)
return FileResponse(
output_path,
media_type="video/mp4",
filename=f"archnemix-short-{job_id[:8]}.mp4",
headers={
"Content-Disposition": f'attachment; filename="archnemix-short-{job_id[:8]}.mp4"',
"Cache-Control": "no-cache",
},
)
@app.delete("/video/{job_id}")
async def delete_video(job_id: str, http_req: Request):
if not validate_app_key(http_req):
raise HTTPException(403, "Invalid API key")
output_path = OUTPUT_DIR / f"{job_id}.mp4"
removed = False
if output_path.exists():
try:
output_path.unlink()
removed = True
logger.info(f"Deleted {job_id}")
except Exception as e:
raise HTTPException(500, f"Delete failed: {e}")
if job_id in JOBS:
JOBS[job_id]["output_path"] = None
JOBS[job_id]["message"] = "Video deleted by user"
_persist_job(job_id)
return {"job_id": job_id, "deleted": removed}
@app.delete("/job/{job_id}")
async def cancel_job(job_id: str, http_req: Request):
"""Called by controller when frontend goes silent."""
if not validate_app_key(http_req):
raise HTTPException(403, "Invalid API key")
if job_id not in JOBS:
return {"job_id": job_id, "cancelled": False, "reason": "not found"}
job = JOBS[job_id]
if job["status"] in ("completed", "failed"):
return {"job_id": job_id, "cancelled": False, "reason": job["status"]}
_kill_ffmpeg_for_job(job_id)
_RUNNING_JOBS.discard(job_id)
try:
_JOB_QUEUE.remove(job_id)
except ValueError:
pass
# Clean up output file if it exists
output_path = OUTPUT_DIR / f"{job_id}.mp4"
output_path.unlink(missing_ok=True)
update_job(job_id, status="failed", error="Cancelled by controller — client disconnected")
_refresh_queue_positions()
logger.info(f"Job {job_id} cancelled via controller request")
return {"job_id": job_id, "cancelled": True}
@app.get("/debug/dataset")
async def debug_dataset():
try:
api = get_hf_api()
files = list(api.list_repo_files(repo_id=HF_DATASET, repo_type="dataset"))
mc = [f for f in files if "minecraft" in f.lower() and f.endswith(".mp4")]
return {"dataset": HF_DATASET, "total_files": len(files), "minecraft": mc}
except Exception as e:
return JSONResponse({"status": "error", "error": str(e)}, status_code=500)
# =============================================================================
# STARTUP / SHUTDOWN
# =============================================================================
@app.on_event("startup")
async def startup():
    global _QUEUE_LOCK
    _QUEUE_LOCK = asyncio.Lock()  # reserved; queue state is currently only touched from the event loop
logger.info("=" * 60)
logger.info("ArchNemix Shorts Generator API v5.1.0")
logger.info(f"Dataset : {HF_DATASET}")
logger.info(f"Concurrent : {MAX_CONCURRENT}")
logger.info(f"Avg job : {AVG_JOB_DURATION_S}s ({AVG_JOB_DURATION_S // 60}m)")
logger.info("Quality : CRF 16 · medium · 8-16 Mbps · AAC 192k")
logger.info("Video path : stream from /tmp, delete on download")
logger.info("=" * 60)
asyncio.create_task(_queue_worker())
asyncio.create_task(_cleanup_old_videos())
try:
vids = await list_videos_from_dataset("minecraft")
logger.info(f"{len(vids)} videos cached")
except Exception as e:
logger.warning(f"Video cache warm-up: {e}")
cleanup_old_jobs()
logger.info("Ready")
@app.on_event("shutdown")
async def shutdown():
logger.info("Shutdown — cleaning jobs")
cleanup_old_jobs()
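# Local run sketch (assumes uvicorn is installed; HF Spaces conventionally
# expose port 7860):
#   uvicorn main:app --host 0.0.0.0 --port 7860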