Argus-style Workflow

A multi-step agent workflow with CueAPI as the coordinator, modeled after Argus.

What Argus does, abbreviated

Argus is a multi-agent pipeline that audits the CueAPI sprint across five repos: it kicks off on a schedule, runs a series of specialized agent passes, writes each finding with an evidence URL, and chains a follow-up pass whenever new work appears. It's a good concrete shape for "a real agent workflow that relies on CueAPI not failing silently."

This guide walks through the same pattern with a smaller example: a nightly codebase-audit agent that files issues when it finds problems, and re-runs itself the next morning to check whether those issues were addressed.

Coordination shape

┌──────────────────────────────────────────────────────────────┐
│ nightly-audit cue (cron 0 2 * * *)                           │
│   ↓                                                           │
│ worker handler audit.py                                       │
│   - runs the audit agent                                      │
│   - files issues for each finding                             │
│   - writes evidence to $CUEAPI_OUTCOME_FILE                   │
│   - POSTs /v1/cues to schedule a follow-up in 24h             │
│   ↓                                                           │
│ check-followup-<run_id> cue (at T+24h)                        │
│   ↓                                                           │
│ worker handler check_followup.py                              │
│   - verifies each issue was addressed                         │
│   - reports evidence                                          │
└──────────────────────────────────────────────────────────────┘

Three CueAPI features are doing the coordination work:

  1. Cron-scheduled kickoff — the top-level cue fires on time even if nothing else is running.
  2. Outcome file — every pass writes evidence, so "did the audit actually happen?" is answerable from the execution record.
  3. Chain pattern — each pass schedules the next one directly, with no orchestrator in the loop.

Step 1 — schedule the kickoff cue

bash
cueapi create \
  --name "nightly-audit" \
  --cron "0 2 * * *" \
  --timezone "America/New_York" \
  --transport worker \
  --verification-mode require_external_id \
  --payload '{"task": "audit"}'

require_external_id means the outcome is only verified_success if the handler reports an external_id — the audit run ID in this case. A run that forgets to report one lands at verification_failed, which surfaces as an alert.
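
To make the rule concrete, here is a sketch of how an outcome report is classified under require_external_id. The function name and return labels are illustrative, not part of CueAPI's API; they just mirror the semantics described above.

```python
# Illustrative only: mirrors the require_external_id rule described above.
def classify_outcome(outcome: dict) -> str:
    if not outcome.get("success"):
        return "failed"
    if not outcome.get("external_id"):
        # The handler claimed success but reported no evidence ID.
        return "verification_failed"
    return "verified_success"

print(classify_outcome({"success": True, "external_id": "run-123"}))  # verified_success
print(classify_outcome({"success": True}))                            # verification_failed
```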

Step 2 — worker handler

cueapi-worker.yaml:

yaml
handlers:
  audit:
    cmd: "python3 audit.py"
    timeout: 3600
    env:
      ANTHROPIC_API_KEY: "${ANTHROPIC_API_KEY}"
      GITHUB_TOKEN: "${GITHUB_TOKEN}"
 
  check-followup:
    cmd: "python3 check_followup.py"
    timeout: 600
    env:
      GITHUB_TOKEN: "${GITHUB_TOKEN}"
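
Handlers can be smoke-tested without a running worker by reproducing the environment the scripts below rely on: CUEAPI_PAYLOAD and CUEAPI_OUTCOME_FILE. A minimal sketch, assuming your handler reads exactly those two variables (run_handler_locally is a hypothetical test helper, not part of CueAPI):

```python
import json
import os
import subprocess
import tempfile

def run_handler_locally(cmd: list, payload: dict, timeout: int = 600) -> dict:
    # Simulate the worker: provide the payload and a fresh outcome file,
    # run the handler command, then return whatever outcome it wrote.
    fd, outcome_path = tempfile.mkstemp(suffix=".json")
    os.close(fd)
    try:
        env = {
            **os.environ,
            "CUEAPI_PAYLOAD": json.dumps(payload),
            "CUEAPI_OUTCOME_FILE": outcome_path,
        }
        subprocess.run(cmd, env=env, check=True, timeout=timeout)
        with open(outcome_path) as f:
            return json.load(f)
    finally:
        os.unlink(outcome_path)
```

Calling run_handler_locally(["python3", "audit.py"], {"task": "audit"}) then asserting on the returned dict gives you a fast feedback loop before wiring the handler into the worker.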

Step 3 — audit.py

python
import json
import os
import urllib.request
import uuid
from datetime import datetime, timedelta, timezone
 
RUN_ID = str(uuid.uuid4())
 
 
def post_cue(name: str, at: datetime, payload: dict):
    body = json.dumps({
        "name": name,
        "at": at.isoformat(),
        "worker": True,
        "payload": payload,
    }).encode()
    req = urllib.request.Request(
        f"{os.environ['CUEAPI_BASE_URL']}/v1/cues",
        data=body,
        headers={
            "Authorization": f"Bearer {os.environ['CUEAPI_API_KEY']}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as r:
        return json.loads(r.read())
 
 
def main():
    # ... agent runs audit, files issues ...
    issue_numbers = run_audit()  # your audit code
 
    # Schedule the follow-up pass for 24 hours from now (the T+24h in the
    # diagram). Hardcoding a UTC hour would silently drift when DST shifts
    # the 2am America/New_York kickoff between 06:00 and 07:00 UTC.
    followup_at = datetime.now(timezone.utc) + timedelta(hours=24)
    post_cue(
        name=f"check-followup-{RUN_ID}",
        at=followup_at,
        payload={
            "task": "check-followup",
            "parent_run_id": RUN_ID,
            "issues": issue_numbers,
        },
    )
 
    # Report evidence for this execution.
    with open(os.environ["CUEAPI_OUTCOME_FILE"], "w") as f:
        json.dump({
            "success": True,
            "external_id": RUN_ID,
            "result_url": f"https://github.com/org/repo/issues?q=is:issue+label:audit-{RUN_ID}",
            "result_type": "audit_run",
            "summary": f"Filed {len(issue_numbers)} issues, follow-up scheduled for tomorrow",
        }, f)
 
 
if __name__ == "__main__":
    main()
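
post_cue can also be exercised without a live CueAPI instance by pointing CUEAPI_BASE_URL at a throwaway local server. A sketch using only the standard library; the capture server and its canned response body are invented for the test, and the request below replays the same shape post_cue builds:

```python
import json
import os
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []

class CaptureHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Record the path and parsed body, then answer like a cue-creation API.
        length = int(self.headers["Content-Length"])
        received.append((self.path, json.loads(self.rfile.read(length))))
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(b'{"id": "cue-local-test"}')

    def log_message(self, *args):
        pass  # keep test output quiet

server = HTTPServer(("127.0.0.1", 0), CaptureHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

os.environ["CUEAPI_BASE_URL"] = f"http://127.0.0.1:{server.server_port}"
os.environ["CUEAPI_API_KEY"] = "local-test-key"

# The same request post_cue builds, replayed against the capture server.
req = urllib.request.Request(
    f"{os.environ['CUEAPI_BASE_URL']}/v1/cues",
    data=json.dumps({"name": "check-followup-test", "worker": True}).encode(),
    headers={
        "Authorization": f"Bearer {os.environ['CUEAPI_API_KEY']}",
        "Content-Type": "application/json",
    },
    method="POST",
)
with urllib.request.urlopen(req, timeout=10) as r:
    response = json.loads(r.read())
server.shutdown()
```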

Step 4 — check_followup.py

python
import json
import os
 
def main():
    payload = json.loads(os.environ["CUEAPI_PAYLOAD"])
    parent_run_id = payload["parent_run_id"]
    issues = payload["issues"]
 
    # ... check each issue's state on GitHub ...
    still_open = check_issues(issues)
 
    with open(os.environ["CUEAPI_OUTCOME_FILE"], "w") as f:
        json.dump({
            "success": True,
            "external_id": parent_run_id,
            "result_type": "audit_followup",
            "summary": f"{len(issues) - len(still_open)} of {len(issues)} issues addressed",
            "metadata": {"still_open": still_open},
        }, f)
 
 
if __name__ == "__main__":
    main()
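
One design variation worth considering (a choice, not something CueAPI requires): report success: False whenever issues remain open, so an ignored audit surfaces as a failed execution rather than a cheerful summary. A sketch of the alternative outcome builder:

```python
def build_followup_outcome(parent_run_id: str, issues: list, still_open: list) -> dict:
    # Variation on the outcome above: unaddressed issues fail the execution,
    # turning "the audit was ignored" into an alert instead of a footnote.
    addressed = len(issues) - len(still_open)
    return {
        "success": len(still_open) == 0,
        "external_id": parent_run_id,
        "result_type": "audit_followup",
        "summary": f"{addressed} of {len(issues)} issues addressed",
        "metadata": {"still_open": still_open},
    }
```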

Why this shape is the point

A naive version of this workflow runs as a long-lived Python script with time.sleep(86400) in the middle. That works until the machine reboots, the process crashes, or the agent exits early — and when it fails, nothing fires an alert, because no one was watching. The chain + outcome-file pattern is the difference between a process and a record:

  • Every step is a durable cue on the schedule, independent of the previous step's process.
  • Every step records evidence of what happened, queryable months later.
  • Missed executions trigger alerts; silent failures are the thing CueAPI exists to prevent.
  • parent_run_id + result_type let you reconstruct the whole chain by filtering GET /v1/executions.
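
A sketch of that reconstruction. The external_id query parameter and the executions response key are assumptions about the endpoint's shape, so check them against your deployment:

```python
import json
import os
import urllib.parse
import urllib.request

def fetch_chain(run_id: str) -> list:
    # Both passes reported the same ID (RUN_ID in audit.py, parent_run_id in
    # check_followup.py), so one filter pulls back the whole chain.
    query = urllib.parse.urlencode({"external_id": run_id})
    req = urllib.request.Request(
        f"{os.environ['CUEAPI_BASE_URL']}/v1/executions?{query}",
        headers={"Authorization": f"Bearer {os.environ['CUEAPI_API_KEY']}"},
    )
    with urllib.request.urlopen(req, timeout=10) as r:
        return json.loads(r.read())["executions"]
```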