Argus-style Workflow
A multi-step agent workflow with CueAPI as the coordinator, modeled after Argus.
What Argus does, abbreviated
Argus is a multi-agent pipeline that audits the CueAPI sprint across five repos: it kicks off on a schedule, runs a series of specialized agent passes, writes each finding with an evidence URL, and chains a follow-up pass whenever new work appears. It's a good concrete shape for "a real agent workflow that relies on CueAPI not failing silently."
This guide walks through the same pattern with a smaller example: a nightly codebase-audit agent that files issues when it finds problems, and re-runs itself the next morning to check whether those issues were addressed.
Coordination shape
┌──────────────────────────────────────────────────────────────┐
│ nightly-audit cue (cron 0 2 * * *) │
│ ↓ │
│ worker handler audit.py │
│ - runs the audit agent │
│ - files issues for each finding │
│ - writes evidence to $CUEAPI_OUTCOME_FILE │
│ - POSTs /v1/cues to schedule a follow-up in 24h │
│ ↓ │
│ follow-up-<exec_id> cue (at T+24h) │
│ ↓ │
│ worker handler check_followup.py │
│ - verifies each issue was addressed │
│ - reports evidence │
└──────────────────────────────────────────────────────────────┘
Three CueAPI features are doing the coordination work:
- Cron-scheduled kickoff — the top-level cue fires on time even if nothing else is running.
- Outcome file — every pass writes evidence, so "did the audit actually happen?" is answerable from the execution record.
- Chain pattern — each pass schedules the next one directly, with no orchestrator in the loop.
Step 1 — schedule the kickoff cue
cueapi create \
--name "nightly-audit" \
--cron "0 2 * * *" \
--timezone "America/New_York" \
--transport worker \
--verification-mode require_external_id \
  --payload '{"task": "audit"}'

require_external_id means the outcome is only verified_success if the handler reports an external_id — the audit run ID in this case. A run that forgets to report one lands at verification_failed, which surfaces as an alert.
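The verification rule can be sketched as a small predicate. This is illustrative only — it mirrors the behavior described above, not CueAPI's actual implementation, and the "failed" label for the unsuccessful branch is an assumption:

```python
def classify(outcome: dict) -> str:
    """Classify an outcome under require_external_id, per the rule above."""
    if outcome.get("success") and outcome.get("external_id"):
        return "verified_success"
    if outcome.get("success"):
        # Succeeded but forgot to report an external_id: surfaces as an alert.
        return "verification_failed"
    return "failed"  # hypothetical name for the plain-failure state

print(classify({"success": True, "external_id": "run-123"}))  # verified_success
print(classify({"success": True}))                            # verification_failed
```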
Step 2 — worker handler
cueapi-worker.yaml:
handlers:
audit:
cmd: "python3 audit.py"
timeout: 3600
env:
ANTHROPIC_API_KEY: "${ANTHROPIC_API_KEY}"
GITHUB_TOKEN: "${GITHUB_TOKEN}"
check-followup:
cmd: "python3 check_followup.py"
timeout: 600
env:
      GITHUB_TOKEN: "${GITHUB_TOKEN}"

Step 3 — audit.py
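Handlers can be exercised locally before wiring them into the worker. A minimal harness, assuming the worker's contract is just two environment variables — CUEAPI_PAYLOAD in and CUEAPI_OUTCOME_FILE out, as the scripts below expect (the inline handler here is a stand-in, not audit.py itself):

```python
import json
import os
import subprocess
import sys
import tempfile

# Stand-in handler: echoes the task back as its external_id.
HANDLER = """
import json, os
payload = json.loads(os.environ["CUEAPI_PAYLOAD"])
with open(os.environ["CUEAPI_OUTCOME_FILE"], "w") as f:
    json.dump({"success": True, "external_id": payload["task"]}, f)
"""

def run_handler(payload: dict) -> dict:
    """Run a handler script under the worker's env contract, return its outcome."""
    with tempfile.NamedTemporaryFile(mode="r", suffix=".json") as outcome:
        env = dict(
            os.environ,
            CUEAPI_PAYLOAD=json.dumps(payload),
            CUEAPI_OUTCOME_FILE=outcome.name,
        )
        subprocess.run([sys.executable, "-c", HANDLER], env=env, check=True)
        return json.load(open(outcome.name))

print(run_handler({"task": "audit"}))
```

Swapping `[sys.executable, "-c", HANDLER]` for `["python3", "audit.py"]` turns this into a dry run of the real handler.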
import json
import os
import uuid
from datetime import datetime, timedelta, timezone
import urllib.request
RUN_ID = str(uuid.uuid4())
def post_cue(name: str, at: datetime, payload: dict):
body = json.dumps({
"name": name,
"at": at.isoformat(),
"worker": True,
"payload": payload,
}).encode()
req = urllib.request.Request(
f"{os.environ['CUEAPI_BASE_URL']}/v1/cues",
data=body,
headers={
"Authorization": f"Bearer {os.environ['CUEAPI_API_KEY']}",
"Content-Type": "application/json",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as r:
return json.loads(r.read())
def main():
# ... agent runs audit, files issues ...
issue_numbers = run_audit() # your audit code
    # Schedule the follow-up pass for 06:00 UTC (2am New York during DST;
    # unlike the cron cue's America/New_York timezone, a fixed UTC hour
    # drifts by an hour across DST changes).
    tomorrow_2am = (
        datetime.now(timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0)
        + timedelta(days=1)
    )
post_cue(
name=f"check-followup-{RUN_ID}",
at=tomorrow_2am,
payload={
"task": "check-followup",
"parent_run_id": RUN_ID,
"issues": issue_numbers,
},
)
# Report evidence for this execution.
with open(os.environ["CUEAPI_OUTCOME_FILE"], "w") as f:
json.dump({
"success": True,
"external_id": RUN_ID,
"result_url": f"https://github.com/org/repo/issues?q=is:issue+label:audit-{RUN_ID}",
"result_type": "audit_run",
"summary": f"Filed {len(issue_numbers)} issues, follow-up scheduled for tomorrow",
}, f)
if __name__ == "__main__":
    main()

Step 4 — check_followup.py
import json
import os
def main():
payload = json.loads(os.environ["CUEAPI_PAYLOAD"])
parent_run_id = payload["parent_run_id"]
issues = payload["issues"]
# ... check each issue's state on GitHub ...
still_open = check_issues(issues)
with open(os.environ["CUEAPI_OUTCOME_FILE"], "w") as f:
json.dump({
"success": True,
"external_id": parent_run_id,
"result_type": "audit_followup",
"summary": f"{len(issues) - len(still_open)} of {len(issues)} issues addressed",
"metadata": {"still_open": still_open},
}, f)
if __name__ == "__main__":
    main()

Why this shape is the point
A naive version of this workflow runs as a long-lived Python script with time.sleep(86400) in the middle. That works until the machine reboots, the process crashes, or the agent exits early — and when it fails, nothing fires an alert, because no one was watching. The chain + outcome-file pattern is the difference between a process and a record:
- Every step is a durable cue on the schedule, independent of the previous step's process.
- Every step records evidence of what happened, queryable months later.
- Missed executions trigger alerts; silent failures are the thing CueAPI exists to prevent.
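The failure half of that guarantee is worth making explicit in the handler itself. A sketch of a crash guard around the audit entry point — the success: False shape and metadata traceback field are assumptions extrapolated from the outcome files above, not a documented schema:

```python
import json
import os
import traceback

def report(outcome: dict) -> None:
    """Write the outcome file, same mechanism as the happy path in audit.py."""
    with open(os.environ["CUEAPI_OUTCOME_FILE"], "w") as f:
        json.dump(outcome, f)

def guarded(main) -> None:
    """Run main(); on a crash, record a failed outcome before re-raising."""
    try:
        main()
    except Exception as exc:
        report({
            "success": False,  # assumed failure shape, mirroring success: True
            "summary": f"audit crashed: {exc}",
            "metadata": {"traceback": traceback.format_exc()},
        })
        raise  # non-zero exit so the worker also sees the failure
```

Without the guard, a crash still surfaces (no outcome file means verification_failed); the guard just makes the execution record say why.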
parent_run_id + result_type let you reconstruct the whole chain by filtering GET /v1/executions.
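A client-side sketch of that reconstruction, filtering a fetched executions listing in memory (the field names follow the outcome files above; the actual GET /v1/executions response shape is an assumption):

```python
def chain_for(run_id: str, executions: list[dict]) -> list[dict]:
    """Pick out one audit chain from an executions listing.

    Both passes report the audit RUN_ID as their external_id (the follow-up
    via parent_run_id), so a simple filter recovers the pair; result_type
    then orders the audit pass before its follow-up.
    """
    order = {"audit_run": 0, "audit_followup": 1}
    chain = [e for e in executions if e.get("external_id") == run_id]
    return sorted(chain, key=lambda e: order.get(e.get("result_type"), len(order)))

executions = [
    {"external_id": "r1", "result_type": "audit_followup"},
    {"external_id": "r2", "result_type": "audit_run"},
    {"external_id": "r1", "result_type": "audit_run"},
]
print([e["result_type"] for e in chain_for("r1", executions)])
# ['audit_run', 'audit_followup']
```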
Related
- Outcome File — evidence reporting mechanics
- Chain Pattern — the handler-schedules-next-cue primitive
- Agent Turn Scheduling — the simpler one-shot variant
- Outcome States — the verification model in play here
- On-Failure Escalation — alerting when a chain stalls