RFC-0005: Integration Pseudocode and Migration¶
Status: Draft Date: 2026-05-26 Depends on: RFC-0001 through RFC-0004
1. Purpose¶
Provide an intermediate pseudocode layer before production implementation.
Goals:
- Validate integration semantics with current runtime.
- Make hidden collisions explicit before code churn.
- Keep migration incremental with explicit hard cutover and retirement.
2. Interfaces¶
class RunStoreV2:
def create_run(self, run: dict) -> dict: ...
def get_run(self, run_id: str) -> dict | None: ...
def update_status(self, run_id: str, status: str, **fields) -> None: ...
def patch_run(self, run_id: str, fields: dict) -> None: ...
class CheckpointStore:
def save_checkpoint(self, checkpoint: dict, state_snapshot: dict) -> str: ...
def load_checkpoint(self, run_id: str, checkpoint_id: str | None = None) -> dict | None: ...
def list_checkpoints(self, run_id: str) -> list[dict]: ...
class EventStore:
def append(self, event: dict) -> None: ...
def list_events(self, run_id: str, cursor: str | None = None, limit: int = 100) -> list[dict]: ...
class SideEffectLedger:
def find_by_idempotency_key(self, key: str) -> dict | None: ...
def record_attempt(self, record: dict) -> str: ...
def record_result(self, effect_id: str, result: dict) -> None: ...
3. Durable Execution Loop (Pseudocode)¶
def execute_run(run_id: str):
run = run_store_v2.get_run(run_id)
state = checkpoint_store.load_checkpoint(run_id, run.get("checkpoint_head"))
if state is None:
state = create_initial_state(run)
emit_event("run.started", run, state)
run_store_v2.update_status(run_id, "running")
while not is_terminal_state(state):
step = scheduler_next_step(state)
if step is None:
break
emit_event("step.started", run, state, step=step)
policy_decision = policy_manager.evaluate(state, step)
if policy_decision.requires_human:
checkpoint_id = checkpoint_boundary(
run,
state,
step,
kind="waiting_for_human",
)
run_store_v2.update_status(
run_id,
"waiting_for_human",
current_step_id=step.id,
checkpoint_head=checkpoint_id,
)
emit_event("run.waiting_for_human", run, state, step=step)
return
step_result = execute_step_with_effect_safety(run, state, step)
apply_step_result(state, step_result)
checkpoint_id = checkpoint_boundary(run, state, step, kind="step_completed")
run_store_v2.patch_run(
run_id,
{
"current_step_id": step.id,
"checkpoint_head": checkpoint_id,
},
)
emit_event("step.completed", run, state, step=step)
finalize_status = derive_final_status(state)
run_store_v2.update_status(run_id, finalize_status)
emit_event(f"run.{finalize_status}", run, state)
4. Side-Effect Safety (Pseudocode)¶
def execute_step_with_effect_safety(run: dict, state: dict, step: dict):
if not is_side_effecting(step):
return binding_executor.execute(step)
idem_key = build_idempotency_key(run, step, state)
previous = side_effect_ledger.find_by_idempotency_key(idem_key)
replay_policy = resolve_replay_policy(step)
if previous and replay_policy == "use_recorded_result":
emit_event("side_effect.reused", run, state, step=step, effect_id=previous["effect_id"])
return load_recorded_result(previous)
if previous and replay_policy == "require_human":
raise RuntimeNeedsHumanApproval(step_id=step["id"], reason="side_effect_replay")
effect_id = side_effect_ledger.record_attempt(
{
"run_id": run["run_id"],
"step_id": step["id"],
"idempotency_key": idem_key,
"request_hash": hash_request(step),
"replay_policy": replay_policy,
}
)
result = binding_executor.execute(step)
side_effect_ledger.record_result(effect_id, result)
emit_event("side_effect.recorded", run, state, step=step, effect_id=effect_id)
return result
5. Migration Order¶
- Introduce RunStoreV2 as canonical run persistence and wrap existing run_store calls as compatibility shims.
- Introduce CheckpointManager around existing checkpoint serializer and enforce boundary policy.
- Add state-machine enforcement and waiting_for_human status as canonical lifecycle behavior.
- Add SideEffectLedger and idempotency policy evaluation in canonical execution path.
- Add runtime APIs for resume/replay/fork/approve/deny and route them to canonical core.
- Add EventStore canonical events and treat log events as projections.
- Perform hard cutover to canonical async core.
- Retire legacy execution path, legacy run model, and legacy-only event semantics.
6. Adopted Architectural Decisions¶
- Async core is canonical target architecture.
- Legacy /v1/skills/{id}/execute/async compatibility exists only during migration window.
- use_recorded_result is default replay policy for external_action side effects.
- EventStore is canonical truth; observability logs are projections.
- Checkpoint serialization remains backward compatible via versioned upgrades.
- Legacy components are retired after cutover gates in RFC-0006.
7. Exit Criteria for Design Phase¶
Design phase is complete when:
- RFC-0001 to RFC-0005 are accepted for implementation.
- Implementation slicing and cutover gates are agreed.
- Legacy retirement matrix (RFC-0006) is accepted.