Source code for governance.enforcer

"""
Script
------
enforcer.py

Path
----
python/hillstar/governance/enforcer.py

Purpose
-------
Core governance enforcement: validate that a Hillstar workflow was executed
before allowing a git commit to proceed.

Reads .hillstar/commit_ready.json written by runner.py on successful execution.
Checks age, workflow ID, and policy compliance.

Inputs
------
- hillstar_dir: path to .hillstar directory (default: .hillstar in cwd)
- policy: GovernancePolicy instance

Outputs
-------
- (compliant: bool, reason: str)

Assumptions
-----------
- runner.py writes commit_ready.json on successful workflow completion
- .hillstar/ directory exists in the project root

Parameters
----------
See GovernancePolicy

Failure Modes
-------------
- commit_ready.json missing: non-compliant
- commit_ready.json stale (age > max_age_seconds): non-compliant
- HILLSTAR_FORCE_COMMIT=1 env var: override allowed if policy permits

Author: Julen Gamboa <julen.gamboa.ds@gmail.com>

Created
-------
2026-02-08

Last Edited
-----------
2026-02-08
"""

from __future__ import annotations

import json
import os
from datetime import datetime, timezone

from .policy import GovernancePolicy


COMMIT_READY_FILE = "commit_ready.json"


[docs] class GovernanceEnforcer: """Enforce workflow-driven development before git commits."""
[docs] def __init__(self, hillstar_dir: str = ".hillstar", policy: GovernancePolicy | None = None): self.hillstar_dir = hillstar_dir self.policy = policy or GovernancePolicy.load(hillstar_dir) self._marker_path = os.path.join(hillstar_dir, COMMIT_READY_FILE)
[docs] def check(self, dev_mode: bool = False) -> tuple[bool, str]: """ Check whether the current state is compliant for a git commit. Args: dev_mode: If True (or HILLSTAR_DEV_MODE=1 in env), skip governance check. Returns: (compliant, reason): compliant=True means commit is allowed. """ # Check development mode first if dev_mode or os.environ.get("HILLSTAR_DEV_MODE") == "1": return True, "Development mode active (--dev flag)" # Check force override if self.policy.allow_force_override and os.environ.get("HILLSTAR_FORCE_COMMIT") == "1": return True, "Force override active (HILLSTAR_FORCE_COMMIT=1)" # Check marker exists if not os.path.exists(self._marker_path): return False, ( "No workflow execution found. Run a Hillstar workflow first:\n" " hillstar execute <workflow.json>\n" "Or bypass with: HILLSTAR_FORCE_COMMIT=1 git commit ..." ) # Load marker try: with open(self._marker_path, encoding="utf-8") as f: marker = json.load(f) except Exception as e: return False, f"Could not read commit_ready marker: {e}" # Check age executed_at = marker.get("executed_at") if not executed_at: return False, "Commit ready marker missing 'executed_at' timestamp" try: ts = datetime.fromisoformat(executed_at) if ts.tzinfo is None: ts = ts.replace(tzinfo=timezone.utc) age_seconds = (datetime.now(timezone.utc) - ts).total_seconds() except Exception as e: return False, f"Could not parse executed_at timestamp: {e}" if age_seconds > self.policy.max_age_seconds: return False, ( f"Workflow execution is stale ({age_seconds:.0f}s ago, " f"max {self.policy.max_age_seconds}s). Re-run the workflow." ) # Check workflow ID if required if self.policy.require_workflow_id and not marker.get("workflow_id"): return False, "Commit ready marker missing workflow_id" workflow_id = marker.get("workflow_id", "unknown") workflow_file = marker.get("workflow_file", "unknown") return True, ( f"Compliant: workflow '{workflow_id}' ({workflow_file}) " f"executed {age_seconds:.0f}s ago" )
[docs] def write_marker(self, workflow_id: str, workflow_file: str, summary: str = "") -> None: """Write commit_ready marker after successful workflow execution.""" os.makedirs(self.hillstar_dir, exist_ok=True) marker = { "workflow_id": workflow_id, "workflow_file": workflow_file, "executed_at": datetime.now(timezone.utc).isoformat(), "summary": summary, } with open(self._marker_path, "w", encoding="utf-8") as f: json.dump(marker, f, indent=2)
[docs] def clear_marker(self) -> None: """Clear the commit_ready marker (e.g. after commit completes).""" if os.path.exists(self._marker_path): os.remove(self._marker_path)
[docs] def status(self) -> dict: """Return full status dictionary for display.""" compliant, reason = self.check() marker = {} if os.path.exists(self._marker_path): try: with open(self._marker_path, encoding="utf-8") as f: marker = json.load(f) except Exception: pass return { "compliant": compliant, "reason": reason, "marker": marker, "policy": { "max_age_seconds": self.policy.max_age_seconds, "allow_force_override": self.policy.allow_force_override, }, }