Temporal Workflow Implementation

This document specifies how Farmercode workflows are implemented as Temporal Workflows using the Python SDK.

Workflow Structure

Workflows are Python classes decorated with @workflow.defn. They accept a typed WorkflowInput object that defines the runtime configuration.

Triggering

Workflows are triggered via the Farmercode API (POST /api/workflows). The API authenticates the caller and validates the payload before it starts the Temporal execution.
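
As a rough illustration of the trigger call, the sketch below builds (but does not send) a POST /api/workflows request using only the standard library. The host name, the bearer-token header, and the idea that the JSON body mirrors the WorkflowInput fields are all assumptions made for this example; the actual API contract may differ.

```python
import json
import urllib.request

# Hypothetical host, used only for illustration.
API_BASE = "https://farmercode.example.com"


def build_trigger_request(payload: dict, token: str) -> urllib.request.Request:
    """Build a POST /api/workflows request without sending it."""
    return urllib.request.Request(
        url=f"{API_BASE}/api/workflows",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",  # assumed auth scheme
        },
        method="POST",
    )


request = build_trigger_request(
    {
        "wrd_id": "123",
        "wrd_content": "Add a health-check endpoint",
        "farmerspec_version": "v1.0.0",
        "mode": "live",
        "profile": "local",
    },
    token="example-token",
)
# urllib.request.urlopen(request) would submit it to a running API.
```

Building the request separately from sending it keeps the payload easy to inspect or log before a workflow is started.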

Workflow Input

from dataclasses import dataclass
from typing import Literal


@dataclass
class WorkflowInput:
    # Workload Context
    wrd_id: str
    wrd_content: str
    farmerspec_version: str  # Git tag for reproducibility

    # Operating Mode
    mode: Literal["live", "rl"] = "live"

    # Execution Profile
    profile: Literal["local", "cluster"] = "local"

    # Git Context
    base_branch: str = "main"
    # Template only: the dataclass does not interpolate {wrd_id} by itself.
    feature_branch: str = "feature/wrd-{wrd_id}"

    # Debugging
    debug_mode: bool = False
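
The default for feature_branch contains a {wrd_id} placeholder, and a dataclass default is stored as a literal string, so something has to substitute the ID. One possible approach, sketched below, resolves the template in `__post_init__`. Whether Farmercode does this in the dataclass or inside InitializeWorkflowActivity is not stated here, so treat the placement as an assumption. Only a subset of the fields is shown.

```python
from dataclasses import dataclass
from typing import Literal


@dataclass
class WorkflowInput:
    wrd_id: str
    wrd_content: str
    farmerspec_version: str
    mode: Literal["live", "rl"] = "live"
    base_branch: str = "main"
    feature_branch: str = "feature/wrd-{wrd_id}"

    def __post_init__(self) -> None:
        # Assumption: resolve the placeholder once, at construction time.
        # A caller-supplied branch without the placeholder is left unchanged.
        self.feature_branch = self.feature_branch.replace("{wrd_id}", self.wrd_id)


params = WorkflowInput(
    wrd_id="42",
    wrd_content="Example WRD text",
    farmerspec_version="v1.0.0",
)
print(params.feature_branch)  # feature/wrd-42
```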

State Management: Git-First

Unlike traditional architectures that rely on shared file systems (NFS), Farmercode uses Git as the state transfer mechanism between Activities.

  • Stateless Workers: Activities do not assume the local disk preserves state from previous steps.
  • Commit & Push: Every Activity that modifies code must commit and push to the remote feature_branch before completing.
  • Fetch & Checkout: Every Activity starts by ensuring its local workspace is synced to the expected Git SHA or Branch.
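
The three rules above can be sketched as a pair of helpers that wrap the git CLI. This is a minimal illustration, not Farmercode's implementation: the function names, the injectable `run` parameter, and the exact git flags are assumptions.

```python
import subprocess
from typing import Callable, Sequence

# A runner takes git arguments and a working directory and returns stdout.
Runner = Callable[[Sequence[str], str], str]


def run_git(argv: Sequence[str], cwd: str) -> str:
    """Default runner: execute a git command and return its stdout."""
    done = subprocess.run(
        ["git", *argv], cwd=cwd, check=True, capture_output=True, text=True
    )
    return done.stdout.strip()


def sync_workspace(workspace: str, ref: str, run: Runner = run_git) -> None:
    """Fetch & Checkout: bring the local workspace to the expected SHA or branch."""
    run(["fetch", "origin"], workspace)
    # --force discards leftover local edits; nothing on disk is trusted.
    run(["checkout", "--force", ref], workspace)


def commit_and_push(
    workspace: str, branch: str, message: str, run: Runner = run_git
) -> str:
    """Commit & Push: publish local changes to the remote branch, return the new SHA."""
    run(["add", "--all"], workspace)
    run(["commit", "--message", message], workspace)
    # Pushing HEAD to an explicit ref also works from a detached checkout.
    run(["push", "origin", f"HEAD:refs/heads/{branch}"], workspace)
    return run(["rev-parse", "HEAD"], workspace)
```

With the default runner, `git commit` exits non-zero when there is nothing to commit, so a real Activity would need to decide whether an empty change set is an error. Passing a SHA rather than a branch name to `sync_workspace` avoids checking out a stale local branch.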

Explicit Activity Definitions

To maximize observability in the Temporal UI, we define a distinct Activity Class for each phase.

```python
from datetime import timedelta
from typing import Dict

from temporalio import activity, workflow
from temporalio.exceptions import ApplicationError

# Supporting types (WorkflowContext, PhaseResult, BlueprintArgs, ValidationArgs)
# and GenericPhase are not shown here.

# --- Activity Definitions ---


@activity.defn
class InitializeWorkflowActivity:
    async def run(self, params: WorkflowInput) -> WorkflowContext:
        # 1. Creates Parent Issue (WRD) in Live Mode (if not exists).
        # 2. Creates/Resets the remote Feature Branch (feature/wrd-{id}) from Base.
        # 3. Returns context containing the initial SHA.
        pass


@activity.defn
class EmbedFarmerspecActivity:
    async def run(self, version: str, workspace_path: str) -> None:
        # 1. Checks out the specific tag of 'farmerspec'.
        # 2. Copies/Embeds the prompt definitions into {workspace_path}/.farmerspec/.
        #    This ensures the Agent has its "Brain" (Skills/Contracts) locally available
        #    alongside the code it is working on.
        pass


@activity.defn
class CreateBlueprintActivity:
    async def run(self, args: BlueprintArgs) -> PhaseResult:
        # 1. Setup: Git Fetch & Checkout args.start_sha
        # 2. Embed: Ensure .farmerspec/ is present
        # 3. Execution: Agent runs "Duc" logic
        # 4. Teardown: Agent commits changes -> Pushes to origin
        # 5. Return: New SHA
        return await GenericPhase("agents/architect/skills/create_blueprint").execute(
            args, version=args.context.farmerspec_version
        )


@activity.defn
class ValidateBlueprintActivity:
    async def run(self, args: ValidationArgs) -> PhaseResult:
        # 1. Setup: Git Fetch & Checkout args.input_sha (Result of CreateBlueprint)
        # 2. Embed: Ensure .farmerspec/ is present
        # 3. Execution: Agent runs "Baron" logic (Read-only or Comments)
        # 4. Return: Status (Proceed/Reject)
        return await GenericPhase("agents/architect/skills/validate_blueprint").execute(
            args, version=args.context.farmerspec_version
        )


@activity.defn
class NotifyUserActivity:
    async def run(self, issue_number: int, message: str, agent_name: str) -> None:
        # Calls GitHub Middleware API: POST /api/github/comment
        # This posts the comment to GitHub using the specific Agent's GitHub App persona.
        pass

# --- Workflow Implementation ---


@workflow.defn
class FarmercodeWorkflow:
    @workflow.run
    async def run(self, params: WorkflowInput) -> Dict:
        # 1. Start: Initialize Context & Git Branch
        context = await workflow.execute_activity(
            InitializeWorkflowActivity.run,
            params,
            start_to_close_timeout=timedelta(minutes=5),
        )

        try:
            # 2. Blueprint Creation (Duc)
            blueprint_group = await workflow.execute_activity(
                CreateGroupActivity.run,
                GroupArgs(parent_id=context.parent_issue_id, title="Blueprint", mode=params.mode),
                # Temporal requires an activity timeout; 5 minutes is an assumed value.
                start_to_close_timeout=timedelta(minutes=5),
            )

            # Execution moves state from SHA A -> SHA B
            bp_create = await workflow.execute_activity(
                CreateBlueprintActivity.run,
                BlueprintArgs(
                    group=blueprint_group,
                    context=context,
                    start_sha=context.current_sha,  # Start from clean branch
                ),
                start_to_close_timeout=timedelta(hours=1),
            )

            # 3. Blueprint Validation (Baron)
            # Validators work on top of the Creator's SHA
            bp_val = await workflow.execute_activity(
                ValidateBlueprintActivity.run,
                ValidationArgs(
                    group=blueprint_group,
                    input_sha=bp_create.sha,  # Verify this specific commit
                    context=context,
                ),
                start_to_close_timeout=timedelta(minutes=15),
            )

            # 4. Handle Status
            if bp_val.status != "proceed":
                await self.handle_escalation(
                    bp_val, params.mode, context.parent_issue_id, agent_name="architect"
                )

            # 5. Finalization (Live Mode Only)
            if params.mode == "live":
                await workflow.execute_activity(
                    CreatePullRequestActivity.run,
                    PrArgs(context=context, title="Implementation Complete", head_sha=bp_create.sha),
                    start_to_close_timeout=timedelta(minutes=5),
                )

            return {"status": "success"}

        finally:
            # Cleanup is less critical for state (Git handles it),
            # but good for disk hygiene.
            await workflow.execute_activity(
                CleanupActivity.run,
                CleanupArgs(workspace=context.workspace_path, debug_mode=params.debug_mode),
                start_to_close_timeout=timedelta(minutes=5),
            )

    async def handle_escalation(
        self, result: PhaseResult, mode: str, issue_id: int, agent_name: str
    ) -> None:
        """
        Handles 'needs_clarification' or 'rejected' statuses.
        In Live Mode: Asks human via GitHub Comment and waits for Signal.
        In RL Mode: Raises an exception to terminate the run (negative signal).
        """
        if mode == "rl":
            raise ApplicationError(f"Agent failed validation: {result.status}", non_retryable=True)

        # Live Mode: Human-in-the-Loop
        # Discard any reply left over from an earlier escalation.
        self._human_input = None

        # 1. Post the Blocker/Question to GitHub
        await workflow.execute_activity(
            NotifyUserActivity.run,
            args=(issue_id, result.blockers_summary, agent_name),
            start_to_close_timeout=timedelta(minutes=1),
        )

        # 2. Wait for Human to reply (via GitHub Middleware Signal)
        # The GitHub Middleware sends the "HumanInput" signal when a comment is added.
        # The Python SDK has no blocking wait-for-signal call: the handler below
        # stores the payload, and wait_condition resumes once it is set.
        await workflow.wait_condition(lambda: self._human_input is not None)

        # 3. Log the interaction (Optional, for context)
        workflow.logger.info(f"Human replied: {self._human_input['text']}")

        # 4. Loop or Proceed?
        # Typically we might loop back to the activity, but for now we just unblock.

    @workflow.signal(name="HumanInput")
    async def on_human_input(self, payload: dict) -> None:
        # Assumed payload shape: a dict with a "text" key, matching the log line above.
        self._human_input = payload
```