251 lines
7.7 KiB
Python
251 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Record a selected $imagegen output for a Codex pet generation job."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import shutil
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from PIL import Image
|
|
|
|
# Location, relative to the run directory, of the canonical base image. The
# "base" job's recorded output is copied here and referenced from the manifest.
CANONICAL_BASE_PATH = "references/canonical-base.png"
|
|
|
|
|
|
def load_jobs(path: Path) -> dict[str, object]:
    """Load the job manifest at *path* and return its top-level JSON object.

    Args:
        path: Location of the manifest file.

    Returns:
        The decoded manifest.

    Raises:
        SystemExit: if *path* is not an existing file, if the file is not
            valid UTF-8 JSON, or if the top-level JSON value is not an object.
    """
    # is_file() also rejects a directory at this path, which would otherwise
    # surface as an unhandled IsADirectoryError from read_text().
    if not path.is_file():
        raise SystemExit(f"job manifest not found: {path}")
    try:
        manifest = json.loads(path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as error:
        raise SystemExit(f"invalid job manifest {path}: {error}") from error
    # Callers use dict methods on the result; fail here with a clear message
    # instead of an AttributeError further down.
    if not isinstance(manifest, dict):
        raise SystemExit(
            f"invalid job manifest {path}: top-level value must be a JSON object"
        )
    return manifest
|
|
|
|
|
|
def job_list(manifest: dict[str, object]) -> list[dict[str, object]]:
    """Return the dict-shaped entries of the manifest's ``jobs`` list.

    Entries that are not dicts are dropped. Exits with an error message when
    ``jobs`` is missing or is not a list.
    """
    entries = manifest.get("jobs")
    if isinstance(entries, list):
        return [entry for entry in entries if isinstance(entry, dict)]
    raise SystemExit("invalid imagegen-jobs.json: jobs must be a list")
|
|
|
|
|
|
def find_job(manifest: dict[str, object], job_id: str) -> dict[str, object]:
    """Return the first manifest job whose ``id`` equals *job_id*.

    Exits with an ``unknown job id`` message when no job matches.
    """
    candidates = (entry for entry in job_list(manifest) if entry.get("id") == job_id)
    found = next(candidates, None)
    if found is None:
        raise SystemExit(f"unknown job id: {job_id}")
    return found
|
|
|
|
|
|
def image_metadata(path: Path) -> dict[str, object]:
    """Verify the image at *path* and return its basic properties.

    The file is opened twice: once to run ``verify()`` and once more to read
    ``width``, ``height``, ``mode`` and ``format``.
    """
    with Image.open(path) as probe:
        probe.verify()

    # A fresh handle is used for the attribute reads instead of the verified one.
    with Image.open(path) as opened:
        details = {
            "width": opened.width,
            "height": opened.height,
            "mode": opened.mode,
            "format": opened.format,
        }
        return details
|
|
|
|
|
|
def file_sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in 1 MiB pieces so it is never held in memory whole.
    """
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(chunk_size)
            if not block:
                break
            hasher.update(block)
    return hasher.hexdigest()
|
|
|
|
|
|
def manifest_relative(path: Path, run_dir: Path) -> str:
    """Return *path* as a string relative to *run_dir*, resolving both first.

    ``Path.relative_to`` raises ``ValueError`` when the resolved path is not
    located under the resolved run directory.
    """
    resolved_path = path.resolve()
    resolved_root = run_dir.resolve()
    relative = resolved_path.relative_to(resolved_root)
    return str(relative)
|
|
|
|
|
|
def completed_job_ids(manifest: dict[str, object]) -> set[str]:
    """Collect the ids of every manifest job whose status is ``complete``.

    Jobs whose ``id`` is not a string are left out of the result.
    """
    finished: set[str] = set()
    for entry in job_list(manifest):
        if entry.get("status") != "complete":
            continue
        identifier = entry.get("id")
        if isinstance(identifier, str):
            finished.add(str(identifier))
    return finished
|
|
|
|
|
|
def is_relative_to(path: Path, root: Path) -> bool:
    """Report whether *path* can be expressed relative to *root*.

    Neither argument is resolved here; callers pass already-resolved paths
    when symlinks or ``..`` segments matter.
    """
    inside = True
    try:
        path.relative_to(root)
    except ValueError:
        inside = False
    return inside
|
|
|
|
|
|
def default_generated_images_root() -> Path:
    """Return the ``generated_images`` directory under the Codex home.

    The Codex home is ``$CODEX_HOME`` when that variable is set and non-empty,
    otherwise ``~/.codex``; it is user-expanded and resolved before joining.
    """
    configured = os.environ.get("CODEX_HOME")
    home_text = configured if configured else "~/.codex"
    codex_home = Path(home_text).expanduser().resolve()
    return codex_home / "generated_images"
|
|
|
|
|
|
def validate_source_path(
    *,
    source: Path,
    run_dir: Path,
    allow_synthetic_test_source: bool,
) -> str:
    """Classify where *source* came from, rejecting unexpected locations.

    Returns ``"synthetic-test"`` straight away when synthetic sources are
    allowed (no path checks run in that case), and ``"built-in-imagegen"``
    when the source passes the checks. Exits with an error message when the
    source lies inside *run_dir*, lies outside the generated-images root, or
    has a file name that does not start with ``ig_``.
    """
    if allow_synthetic_test_source:
        return "synthetic-test"

    if is_relative_to(source, run_dir):
        inside_run_dir_message = (
            "source image is inside the pet run directory; record the original "
            "$imagegen output from $CODEX_HOME/generated_images/.../ig_*.png instead"
        )
        raise SystemExit(inside_run_dir_message)

    generated_root = default_generated_images_root()
    looks_generated = is_relative_to(source, generated_root) and source.name.startswith("ig_")
    if looks_generated:
        return "built-in-imagegen"

    raise SystemExit(
        "source image does not look like a built-in $imagegen output; expected "
        f"{generated_root}/.../ig_*.png. Do not ingest locally drawn or "
        "post-processed row strips as visual job outputs."
    )
|
|
|
|
|
|
def validate_required_grounding(job: dict[str, object], run_dir: Path) -> None:
    """Check that a grounded job's reference images exist under *run_dir*.

    Only jobs whose ``allow_prompt_only_generation`` is exactly ``False`` are
    checked; any other value (including a missing key) returns immediately.
    Exits with an error message when ``input_images`` is not a non-empty list,
    when an entry is not a dict with a string ``path``, or when any listed
    path is not an existing file.
    """
    if job.get("allow_prompt_only_generation") is not False:
        return

    job_id = job.get("id")
    references = job.get("input_images")
    if not (isinstance(references, list) and references):
        raise SystemExit(
            f"job {job_id} does not list input_images; grounded row jobs must attach references"
        )

    absent: list[str] = []
    for entry in references:
        relative = entry.get("path") if isinstance(entry, dict) else None
        if not isinstance(relative, str):
            raise SystemExit(f"job {job_id} has an invalid input image entry")
        candidate = run_dir / relative
        if not candidate.is_file():
            absent.append(str(candidate))

    # All missing files are reported together rather than one per run.
    if absent:
        raise SystemExit(
            f"job {job_id} is missing required grounding image(s): "
            + ", ".join(absent)
        )
|
|
|
|
|
|
def update_base_canonical_reference(
    *,
    run_dir: Path,
    output: Path,
    manifest: dict[str, object],
    job: dict[str, object],
    metadata: dict[str, object],
) -> None:
    """Publish the ``base`` job's output as the canonical identity reference.

    Does nothing for any job whose ``id`` is not ``"base"``. Otherwise it
    copies *output* to the canonical base path under *run_dir*, records a
    reference (relative path, source job, SHA-256, metadata) on both *job* and
    *manifest* in place, and, when ``pet_request.json`` exists in *run_dir*,
    rewrites that file with the same reference.
    """
    if job.get("id") != "base":
        return

    canonical = run_dir / CANONICAL_BASE_PATH
    canonical.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(output, canonical)

    # Hash the copied file, not the original output.
    canonical_sha = file_sha256(canonical)
    relative_path = manifest_relative(canonical, run_dir)
    reference = {
        "path": relative_path,
        "source_job": "base",
        "sha256": canonical_sha,
        "metadata": metadata,
    }
    job["canonical_reference_path"] = relative_path
    manifest["canonical_identity_reference"] = reference

    request_path = run_dir / "pet_request.json"
    if not request_path.exists():
        return

    request = json.loads(request_path.read_text(encoding="utf-8"))
    request["canonical_identity_reference"] = reference
    serialized = json.dumps(request, indent=2) + "\n"
    request_path.write_text(serialized, encoding="utf-8")
|
|
|
|
|
|
def main() -> None:
    """Record one selected image as the completed output of a pet job.

    Command-line arguments:
        --run-dir: pet run directory containing ``imagegen-jobs.json``.
        --job-id: id of the job to mark complete.
        --source: image file to record as that job's output.
        --force: replace the job's output file if it already exists.
        --allow-synthetic-test-source: hidden flag that skips the source
            location checks and tags the job as a synthetic test.

    On success the source is copied to the job's ``output_path``, the job
    entry is updated in place (status, provenance, hashes, timestamp,
    metadata, stale bookkeeping keys removed), the manifest is rewritten, and
    a JSON summary is printed to stdout.

    Raises:
        SystemExit: if the source is missing or fails the location checks, the
            job is unknown, a dependency is not complete, ``depends_on`` is
            not a list, required grounding images are missing, the job has no
            ``output_path``, or the output exists and ``--force`` is not set.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--run-dir", required=True)
    parser.add_argument("--job-id", required=True)
    parser.add_argument("--source", required=True)
    parser.add_argument("--force", action="store_true")
    parser.add_argument(
        "--allow-synthetic-test-source", action="store_true", help=argparse.SUPPRESS
    )
    args = parser.parse_args()

    run_dir = Path(args.run_dir).expanduser().resolve()
    source = Path(args.source).expanduser().resolve()
    if not source.is_file():
        raise SystemExit(f"source image not found: {source}")
    source_provenance = validate_source_path(
        source=source,
        run_dir=run_dir,
        allow_synthetic_test_source=args.allow_synthetic_test_source,
    )

    manifest_path = run_dir / "imagegen-jobs.json"
    manifest = load_jobs(manifest_path)
    job = find_job(manifest, args.job_id)

    # A JSON null for depends_on means "no dependencies"; any other non-list
    # value is a malformed manifest and is reported as such.
    depends_on = job.get("depends_on")
    if depends_on is None:
        depends_on = []
    elif not isinstance(depends_on, list):
        raise SystemExit(
            f"invalid imagegen-jobs.json: depends_on for job {args.job_id} must be a list"
        )
    # Compute the completed set once instead of once per dependency.
    completed = completed_job_ids(manifest)
    missing_deps = [
        dep for dep in depends_on if isinstance(dep, str) and dep not in completed
    ]
    if missing_deps:
        raise SystemExit(
            f"job {args.job_id} is not ready; missing dependency result(s): {', '.join(missing_deps)}"
        )
    validate_required_grounding(job, run_dir)

    output_raw = job.get("output_path")
    if not isinstance(output_raw, str):
        raise SystemExit(f"job {args.job_id} has no output_path")
    output = run_dir / output_raw
    if output.exists() and not args.force:
        raise SystemExit(f"{output} already exists; pass --force to replace it")

    # Verify the source image BEFORE copying, so an unreadable or corrupt
    # source can never overwrite an existing good output when --force is set.
    # The copy is byte-for-byte, so these values also describe the output.
    metadata = image_metadata(source)

    output.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, output)

    # Key assignment order is kept stable because it determines the order of
    # newly added keys in the rewritten manifest.
    job["status"] = "complete"
    job["source_path"] = str(source)
    job["source_provenance"] = source_provenance
    job["source_sha256"] = file_sha256(source)
    job["output_sha256"] = file_sha256(output)
    if source_provenance == "synthetic-test":
        job["synthetic_test_source"] = True
    else:
        job.pop("synthetic_test_source", None)
    job["completed_at"] = datetime.now(timezone.utc).isoformat()
    job["metadata"] = metadata
    # Drop bookkeeping left over from earlier attempts at this job.
    for key in [
        "last_error",
        "secondary_fallback",
        "derived_from",
        "mirror_decision",
        "repair_reason",
        "queued_at",
    ]:
        job.pop(key, None)
    update_base_canonical_reference(
        run_dir=run_dir,
        output=output,
        manifest=manifest,
        job=job,
        metadata=metadata,
    )

    manifest_path.write_text(json.dumps(manifest, indent=2) + "\n", encoding="utf-8")
    print(
        json.dumps(
            {
                "ok": True,
                "job_id": args.job_id,
                "output": str(output),
                "metadata": metadata,
            },
            indent=2,
        )
    )
|
|
|
|
|
|
# Script entry point: run the CLI only when this file is executed directly.
if __name__ == "__main__":
    main()
|