open-design/skills/hatch-pet/scripts/queue_pet_repairs.py
Zakaria a46764fb1b
Some checks failed
ci / Validate workspace (push) Has been cancelled
landing-page-ci / Validate landing page (push) Has been cancelled
landing-page-deploy / Deploy landing page (push) Has been cancelled
github-metrics / Generate repository metrics SVG (push) Has been cancelled
first-commit
2026-05-04 14:58:14 -04:00

173 lines
6.6 KiB
Python

#!/usr/bin/env python3
"""Reopen failed Codex pet row jobs after frame QA."""
from __future__ import annotations
import argparse
import json
import shutil
from datetime import datetime, timezone
from pathlib import Path
def load_json(path: Path) -> dict[str, object]:
if not path.exists():
raise SystemExit(f"file not found: {path}")
return json.loads(path.read_text(encoding="utf-8"))
def rows_to_repair(
review: dict[str, object], *, repair_on_warnings: bool
) -> list[dict[str, object]]:
rows = review.get("rows")
if not isinstance(rows, list):
raise SystemExit("review does not contain row-level results")
repairs: list[dict[str, object]] = []
for row in rows:
if not isinstance(row, dict) or not isinstance(row.get("state"), str):
continue
errors = row.get("errors") if isinstance(row.get("errors"), list) else []
warnings = row.get("warnings") if isinstance(row.get("warnings"), list) else []
if errors or (repair_on_warnings and warnings):
repairs.append(
{
"state": row["state"],
"reason": "; ".join(str(item) for item in [*errors, *warnings])
or "the row did not pass visual QA",
}
)
return repairs
def append_repair_note(run_dir: Path, state: str, attempt: int, reason: str) -> None:
prompt_path = run_dir / "prompts" / "rows" / f"{state}.md"
if not prompt_path.exists():
raise SystemExit(f"row prompt not found: {prompt_path}")
existing = prompt_path.read_text(encoding="utf-8")
note = f"""
Repair attempt {attempt}:
- The previous `{state}` strip failed QA: {reason}
- Regenerate the entire row, not just one pose.
- Fill every requested frame slot with one complete centered full-body pet pose.
- Keep large gaps of pure chroma key only between slots; do not leave a requested slot empty.
- Avoid pose overlap, clipping, edge slivers, extra partial sprites, and detached fragments from neighboring poses.
- Use the canonical base image and any original references listed in `imagegen-jobs.json` as grounding inputs.
- Do not redesign the pet. Keep the exact same head shape, face design, markings, body proportions, palette, outline weight, materials, and props as the approved base pet.
- If the contact sheet shows identity drift, repair only this row while preserving the canonical base identity.
"""
prompt_path.write_text(existing.rstrip() + note.rstrip() + "\n", encoding="utf-8")
def job_list(manifest: dict[str, object]) -> list[dict[str, object]]:
jobs = manifest.get("jobs")
if not isinstance(jobs, list):
raise SystemExit("invalid imagegen-jobs.json: jobs must be a list")
return [job for job in jobs if isinstance(job, dict)]
def next_archive_path(archive_dir: Path, state: str, attempt: int, suffix: str) -> Path:
candidate = archive_dir / f"{state}-attempt-{attempt}-previous{suffix}"
if not candidate.exists():
return candidate
counter = 2
while True:
candidate = archive_dir / f"{state}-attempt-{attempt}-previous-{counter}{suffix}"
if not candidate.exists():
return candidate
counter += 1
def archive_decoded_output(run_dir: Path, job: dict[str, object], state: str, attempt: int) -> str | None:
output_raw = job.get("output_path")
output = (
run_dir / output_raw
if isinstance(output_raw, str) and output_raw
else run_dir / "decoded" / f"{state}.png"
)
if not output.exists():
return None
archive_dir = run_dir / "decoded" / "repair-archive"
archive_dir.mkdir(parents=True, exist_ok=True)
archived = next_archive_path(archive_dir, state, attempt, output.suffix or ".png")
shutil.move(str(output), archived)
return str(archived.relative_to(run_dir))
def queue_repair(manifest: dict[str, object], run_dir: Path, state: str, reason: str) -> dict[str, object]:
for job in job_list(manifest):
if job.get("id") != state:
continue
attempt = int(job.get("repair_attempt", 0)) + 1
archived_output = archive_decoded_output(run_dir, job, state, attempt)
job["status"] = "pending"
job["repair_attempt"] = attempt
job["repair_reason"] = reason
job["queued_at"] = datetime.now(timezone.utc).isoformat()
if archived_output is not None:
previous_outputs = job.setdefault("previous_outputs", [])
if not isinstance(previous_outputs, list):
previous_outputs = []
job["previous_outputs"] = previous_outputs
previous_outputs.append(
{
"attempt": attempt,
"path": archived_output,
"archived_at": job["queued_at"],
}
)
for key in [
"source_path",
"source_provenance",
"source_sha256",
"output_sha256",
"completed_at",
"metadata",
"synthetic_test_source",
"secondary_fallback",
"derived_from",
"mirror_decision",
]:
job.pop(key, None)
result: dict[str, object] = {"attempt": attempt}
if archived_output is not None:
result["archived_output"] = archived_output
return result
raise SystemExit(f"unknown row job id: {state}")
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--run-dir", required=True)
parser.add_argument("--review", default="")
parser.add_argument("--repair-on-warnings", action="store_true")
args = parser.parse_args()
run_dir = Path(args.run_dir).expanduser().resolve()
review_path = (
Path(args.review).expanduser().resolve()
if args.review
else run_dir / "qa" / "review.json"
)
manifest_path = run_dir / "imagegen-jobs.json"
review = load_json(review_path)
manifest = load_json(manifest_path)
repairs = rows_to_repair(review, repair_on_warnings=args.repair_on_warnings)
queued: list[dict[str, object]] = []
for repair in repairs:
state = str(repair["state"])
reason = str(repair["reason"])
queued_repair = queue_repair(manifest, run_dir, state, reason)
attempt = int(queued_repair["attempt"])
append_repair_note(run_dir, state, attempt, reason)
queued.append({"state": state, "reason": reason, **queued_repair})
manifest_path.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
print(json.dumps({"ok": True, "queued": queued}, indent=2))
if __name__ == "__main__":
main()