Temporal Workflow Implementation
This document specifies how Farmercode workflows are implemented as Temporal Workflows using the Python SDK.
Workflow Structure
Workflows are Python classes decorated with @workflow.defn. They accept a typed WorkflowInput object that defines the runtime configuration.
Triggering
Workflows are triggered via the Farmercode API (POST /api/workflows). This API call handles the initial authentication and payload validation before starting the Temporal execution.
Workflow Input
@dataclass
class WorkflowInput:
    """Typed runtime configuration passed to a Farmercode workflow run.

    ``feature_branch`` may contain the placeholder ``{wrd_id}``; it is
    replaced with the actual ``wrd_id`` when the instance is created, so the
    default resolves to ``feature/wrd-<wrd_id>``.
    """

    # Workload Context
    wrd_id: str
    wrd_content: str
    farmerspec_version: str  # Git tag for reproducibility
    # Operating Mode
    mode: Literal["live", "rl"] = "live"
    # Execution Profile
    profile: Literal["local", "cluster"] = "local"
    # Git Context
    base_branch: str = "main"
    feature_branch: str = "feature/wrd-{wrd_id}"
    # Debugging
    debug_mode: bool = False

    def __post_init__(self) -> None:
        # A dataclass default cannot reference another field, so the
        # "{wrd_id}" placeholder is substituted here. str.replace (rather than
        # str.format) leaves any other braces in a custom branch name alone.
        self.feature_branch = self.feature_branch.replace("{wrd_id}", self.wrd_id)
State Management: Git-First
Unlike traditional architectures that rely on shared file systems (NFS), Farmercode uses Git as the state transfer mechanism between Activities.
- Stateless Workers: Activities do not assume the local disk preserves state from previous steps.
- Commit & Push: Every Activity that modifies code must commit and push to the remote `feature_branch` before completing.
- Fetch & Checkout: Every Activity starts by ensuring its local workspace is synced to the expected Git SHA or Branch.
Explicit Activity Definitions
To maximize observability in the Temporal UI, we define a distinct Activity Class for each phase.
```python
from temporalio import workflow, activity
# --- Activity Definitions ---
class InitializeWorkflowActivity:
    """Initialization phase: prepares the parent issue and the feature branch."""

    # The decorator sits on the method the workflow actually schedules
    # (`InitializeWorkflowActivity.run`). Decorating the class instead would
    # leave `run` without an activity definition. The explicit name keeps the
    # class name as the activity type, so each phase stays distinct in the UI.
    @activity.defn(name="InitializeWorkflowActivity")
    async def run(self, params: WorkflowInput) -> WorkflowContext:
        """Set up the run described by ``params`` and return its context."""
        # 1. Creates Parent Issue (WRD) in Live Mode (if not exists).
        # 2. Creates/Resets the remote Feature Branch (feature/wrd-{id}) from Base.
        # 3. Returns context containing the initial SHA.
        pass
class EmbedFarmerspecActivity:
    """Embeds a pinned version of the 'farmerspec' prompt definitions into a workspace."""

    # Decorate the scheduled method rather than the class, and name it after
    # the class so the activity type is not the generic "run".
    @activity.defn(name="EmbedFarmerspecActivity")
    async def run(self, version: str, workspace_path: str) -> None:
        """Embed farmerspec ``version`` under ``{workspace_path}/.farmerspec/``."""
        # 1. Checks out the specific tag of 'farmerspec'.
        # 2. Copies/Embeds the prompt definitions into {workspace_path}/.farmerspec/.
        #    This ensures the Agent has its "Brain" (Skills/Contracts) locally available
        #    alongside the code it is working on.
        pass
class CreateBlueprintActivity:
    """Blueprint creation phase ("Duc"): runs the architect's create_blueprint skill."""

    # Decorate the scheduled method rather than the class, and name it after
    # the class so the activity type is not the generic "run".
    @activity.defn(name="CreateBlueprintActivity")
    async def run(self, args: BlueprintArgs) -> PhaseResult:
        """Execute the create_blueprint skill for ``args`` and return its result."""
        # 1. Setup: Git Fetch & Checkout args.start_sha
        # 2. Embed: Ensure .farmerspec/ is present
        # 3. Execution: Agent runs "Duc" logic
        # 4. Teardown: Agent commits changes -> Pushes to origin
        # 5. Return: New SHA
        phase = GenericPhase("agents/architect/skills/create_blueprint")
        return await phase.execute(args, version=args.context.farmerspec_version)
class ValidateBlueprintActivity:
    """Blueprint validation phase ("Baron"): runs the architect's validate_blueprint skill."""

    # Decorate the scheduled method rather than the class, and name it after
    # the class so the activity type is not the generic "run".
    @activity.defn(name="ValidateBlueprintActivity")
    async def run(self, args: ValidationArgs) -> PhaseResult:
        """Execute the validate_blueprint skill for ``args`` and return its result."""
        # 1. Setup: Git Fetch & Checkout args.input_sha (Result of CreateBlueprint)
        # 2. Embed: Ensure .farmerspec/ is present
        # 3. Execution: Agent runs "Baron" logic (Read-only or Comments)
        # 4. Return: Status (Proceed/Reject)
        phase = GenericPhase("agents/architect/skills/validate_blueprint")
        return await phase.execute(args, version=args.context.farmerspec_version)
class NotifyUserActivity:
    """Posts a message to a GitHub issue on behalf of a named agent."""

    # Decorate the scheduled method rather than the class, and name it after
    # the class so the activity type is not the generic "run".
    @activity.defn(name="NotifyUserActivity")
    async def run(self, issue_number: int, message: str, agent_name: str) -> None:
        """Comment ``message`` on issue ``issue_number`` as ``agent_name``."""
        # Calls GitHub Middleware API: POST /api/github/comment
        # This posts the comment to GitHub using the specific Agent's GitHub App persona.
        pass
# --- Workflow Implementation ---
@workflow.defn
class FarmercodeWorkflow:
    """Runs the blueprint phases for one WRD: initialize, create, validate, finalize.

    Human replies arrive through the "HumanInput" signal and are buffered on
    the workflow instance until ``handle_escalation`` consumes them.
    """

    def __init__(self) -> None:
        # Payloads received via the "HumanInput" signal, consumed in FIFO order.
        self._human_inputs: list = []

    @workflow.signal(name="HumanInput")
    def on_human_input(self, payload: dict) -> None:
        """Buffer a human reply so a waiting escalation can pick it up."""
        self._human_inputs.append(payload)

    @workflow.run
    async def run(self, params: WorkflowInput) -> Dict:
        """Execute the workflow for ``params``.

        Returns:
            ``{"status": "success"}`` once all phases have completed.

        Raises:
            ApplicationError: in RL mode, when blueprint validation does not
                return the "proceed" status (raised by ``handle_escalation``).
        """
        # 1. Start: Initialize Context & Git Branch
        context = await workflow.execute_activity(
            InitializeWorkflowActivity.run,
            params,
            start_to_close_timeout=timedelta(minutes=5),
        )
        try:
            # 2. Blueprint Creation (Duc)
            blueprint_group = await workflow.execute_activity(
                CreateGroupActivity.run,
                GroupArgs(parent_id=context.parent_issue_id, title="Blueprint", mode=params.mode),
                # execute_activity requires start_to_close_timeout or
                # schedule_to_close_timeout; use the same budget as the other
                # short bookkeeping activities in this workflow.
                start_to_close_timeout=timedelta(minutes=5),
            )
            # Execution moves state from SHA A -> SHA B
            bp_create = await workflow.execute_activity(
                CreateBlueprintActivity.run,
                BlueprintArgs(
                    group=blueprint_group,
                    context=context,
                    start_sha=context.current_sha,  # Start from clean branch
                ),
                start_to_close_timeout=timedelta(hours=1),
            )
            # 3. Blueprint Validation (Baron)
            # Validators work on top of the Creator's SHA
            bp_val = await workflow.execute_activity(
                ValidateBlueprintActivity.run,
                ValidationArgs(
                    group=blueprint_group,
                    input_sha=bp_create.sha,  # Verify this specific commit
                    context=context,
                ),
                start_to_close_timeout=timedelta(minutes=15),
            )
            # 4. Handle Status
            if bp_val.status != "proceed":
                await self.handle_escalation(
                    bp_val, params.mode, context.parent_issue_id, agent_name="architect"
                )
            # 5. Finalization (Live Mode Only)
            if params.mode == "live":
                await workflow.execute_activity(
                    CreatePullRequestActivity.run,
                    PrArgs(context=context, title="Implementation Complete", head_sha=bp_create.sha),
                    start_to_close_timeout=timedelta(minutes=5),
                )
            return {"status": "success"}
        finally:
            # Cleanup is less critical for state (Git handles it),
            # but good for disk hygiene.
            await workflow.execute_activity(
                CleanupActivity.run,
                CleanupArgs(workspace=context.workspace_path, debug_mode=params.debug_mode),
                start_to_close_timeout=timedelta(minutes=5),
            )

    async def handle_escalation(self, result: PhaseResult, mode: str, issue_id: int, agent_name: str):
        """
        Handles 'needs_clarification' or 'rejected' statuses.
        In Live Mode: Asks human via GitHub Comment and waits for Signal.
        In RL Mode: Raises ApplicationError to terminate run (negative signal).
        """
        if mode == "rl":
            raise ApplicationError(f"Agent failed validation: {result.status}", non_retryable=True)
        # Live Mode: Human-in-the-Loop
        # 1. Post the Blocker/Question to GitHub
        await workflow.execute_activity(
            NotifyUserActivity.run,
            args=[issue_id, result.blockers_summary, agent_name],
            start_to_close_timeout=timedelta(minutes=1),
        )
        # 2. Wait for Human to reply (via GitHub Middleware Signal)
        # The GitHub Middleware will send "HumanInput" signal when a comment is added.
        # The Python SDK delivers signals to the @workflow.signal handler above;
        # block here until that handler has buffered at least one reply.
        await workflow.wait_condition(lambda: bool(self._human_inputs))
        human_signal = self._human_inputs.pop(0)
        # 3. Log the interaction (Optional, for context)
        workflow.logger.info("Human replied: %s", human_signal["text"])
        # 4. Loop or Proceed?
        # Typically we might loop back to the activity, but for now we just unblock.