Build Trace #2

shipped

Reliable Browser Workflow Replay test

test

Published June 19, 2026
test

Demo

Watch the workflow replay

Demo summary

test

What to watch for

  • test

Problem

The production problem

An h1 header

Paragraphs are separated by a blank line.

2nd paragraph. Italic, bold, and monospace. Itemized lists look like:

  • this one
  • that one
  • the other one

Note that --- not considering the asterisk --- the actual text content starts at 4-columns in.

Block quotes are written like so.

They can span multiple paragraphs, if you like.

Use 3 dashes for an em-dash. Use 2 dashes for ranges (ex., "it's all in chapters 12--14"). Three dots ... will be converted to an ellipsis. Unicode is supported. ☺

An h2 header

Here's a numbered list:

  1. first item
  2. second item
  3. third item

Note again how the actual text starts at 4 columns in (4 characters from the left side). Here's a code sample:

# Let me re-iterate ...
for i in 1 .. 10 { do-something(i) }

As you probably guessed, indented 4 spaces. By the way, instead of indenting the block, you can use delimited blocks, if you like:

define foobar() {
    print "Welcome to flavor country!";
}

(which makes copying & pasting easier). You can optionally mark the delimited block for Pandoc to syntax highlight it:

import time
# Quick, count to ten!
for i in range(10):
    # (but not *too* quick)
    time.sleep(0.5)
    print(i)

An h3 header

Now a nested list:

  1. First, get these ingredients:

    • carrots
    • celery
    • lentils
  2. Boil some water.

  3. Dump everything in the pot and follow this algorithm:

    find wooden spoon
    uncover pot
    stir
    cover pot
    balance wooden spoon precariously on pot handle
    wait 10 minutes
    goto first step (or shut off burner when done)
    

    Do not bump wooden spoon or it will fall.

Notice again how text always lines up on 4-space indents (including that last line which continues item 3 above).

Here's a link to a website, to a local doc, and to a section heading in the current doc. Here's a footnote 1.

Tables can look like this:

size  material      color
----  ------------  ------------
9     leather       brown
10    hemp canvas   natural
11    glass         transparent

Table: Shoes, their sizes, and what they're made of

(The above is the caption for the table.) Pandoc also supports multi-line tables:


--------  -----------------------
keyword   text
--------  -----------------------
red       Sunsets, apples, and
          other red or reddish
          things.

green     Leaves, grass, frogs
          and other things it's
          not easy being.
--------  -----------------------


A horizontal rule follows.


Here's a definition list:

apples : Good for making applesauce.

oranges : Citrus!

tomatoes : There's no "e" in tomatoe.

Again, text is indented 4 spaces. (Put a blank line between each term/definition pair to spread things out more.)

Here's a "line block":

| Line one
| Line too
| Line tree

and images can be specified like so:

example image

Inline math equations go in like so: $\omega = d\phi / dt$. Display math should get its own line and be put in in double-dollarsigns:

$$I = \int \rho R^{2} dV$$

And note that you can backslash-escape any punctuation characters which you wish to be displayed literally, ex.: `foo`, *bar*, etc.

Footnotes

  1. Footnote text goes here.

What I built

The concrete system

AgentFlow-Pro

Process-supervised reinforcement learning for a multi-step reasoning agent.

A from-scratch rebuild of AgentFlow (ICLR 2026) that replaces the paper's outcome-only Flow-GRPO with a step-level Process Reward Model (PRM) and DAPO (Decoupled Clip + Dynamic Sampling Policy Optimization). The agent is a Planner → Executor → Verifier loop; only the Planner is trained, and it is trained to make better individual decisions, not just to land more correct final answers.


The thesis

Outcome-only RL (GRPO / Flow-GRPO) gives an agent one scalar of feedback per trajectory — did the final answer match? On a single-shot task that is fine. On a multi-step agentic loop it is a blunt instrument: a five-step trajectory that succeeded propagates the same gradient to a brilliant opening search and a wasteful think-loop in the middle. There is no notion of which step was the good move. Long trajectories dilute the signal across many tokens, and any prompt where all rollouts agree (all right or all wrong) contributes zero gradient while still costing a full set of rollouts.

AgentFlow-Pro attacks both problems directly:

  1. A learned Process Reward Model scores every Planner decision on a 0–1 quality scale, so the policy gets a dense, per-step signal instead of one trajectory-level bit.
  2. DAPO decouples the PPO clip bounds (lets clearly-good actions take a larger step), uses a token-level loss (a long trajectory no longer gets down-weighted into noise), filters overlong stalls, and — via a hand-built dynamic-sampling pass — drops zero-variance prompt groups so every optimizer step carries real signal.

The result is a credit-assignment story you can point at: this action was good, that one stalled, and the gradient reflects it.
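
To make the zero-gradient case concrete, here is a minimal sketch of GRPO-style group-relative advantages. The function name `group_advantages` and the reward values are illustrative assumptions, not code from the repo.

```python
from statistics import mean, pstdev


def group_advantages(rewards: list[float], eps: float = 1e-6) -> list[float]:
    """Normalize each rollout's reward against its own prompt group."""
    mu = mean(rewards)
    sigma = pstdev(rewards)
    return [(r - mu) / (sigma + eps) for r in rewards]


mixed = group_advantages([1.0, 0.0, 0.0, 1.0])      # rollouts disagree
all_right = group_advantages([1.0, 1.0, 1.0, 1.0])  # rollouts agree

print(mixed)      # non-zero advantages: the group carries signal
print(all_right)  # every advantage is 0.0: no gradient from this group
```

When every rollout in a group earns the same reward, each advantage is exactly zero, so the group costs a full set of rollouts and moves no weights.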


Architecture

              ┌───────────┐  next action   ┌───────────┐  result   ┌───────────┐  sufficient?
   query ───► │  Planner  │ ─────────────► │ Executor  │ ────────► │ Verifier  │ ──► answer ✔
              │(trainable)│                │  (tools)  │           │  (judge)  │ ──► loop ↺
              └─────▲─────┘                └───────────┘           └─────┬─────┘
                    │                                                   │
                    └───────────────  Memory (running state)  ◄──────────┘
                          loop runs up to max_steps, else a fallback answer
  • Planner (core/planner.py) — the only trainable module. Emits a grammar-constrained {thought, action, action_input} JSON each step; action ∈ {think, search, code, answer}.
  • Executor (core/executor.py) — pure dispatch, no LLM call: search → Tavily, code → a sandboxed Python REPL, think/answer → echo.
  • Verifier (core/verifier.py) — a separate judge that decides whether the running state is enough to answer or the loop should continue. Conservative by default (parse failure ⇒ keep going).
  • Memory (core/memory.py) — in-task running state today; a Qdrant cross-episode backend sits behind the same interface as a planned extension.

The loop, module contracts, and tool internals are documented in docs/architecture.md.
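
The control flow in the diagram can be sketched in a few lines. This is a toy under stated assumptions: `solve`, `Memory`, and the three callables are hypothetical stand-ins, and the scripted Planner below replaces the real LLM call.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Memory:
    """In-task running state: the query plus every (action, result) so far."""
    query: str
    steps: list[tuple[str, str]] = field(default_factory=list)


def solve(query: str,
          plan: Callable[[Memory], dict],
          execute: Callable[[dict], str],
          sufficient: Callable[[Memory], bool],
          max_steps: int = 6) -> str:
    memory = Memory(query)
    for _ in range(max_steps):
        action = plan(memory)        # Planner: choose the next action
        result = execute(action)     # Executor: pure dispatch, no LLM call
        memory.steps.append((action["action"], result))
        if sufficient(memory):       # Verifier: enough to answer?
            return result
    return "fallback: no sufficient answer within max_steps"


def toy_plan(memory: Memory) -> dict:
    # Hypothetical scripted Planner: think once, then answer.
    if not memory.steps:
        return {"thought": "compute", "action": "think",
                "action_input": "15% of 240 is 36"}
    return {"thought": "done", "action": "answer", "action_input": "72"}


def toy_execute(action: dict) -> str:
    return action["action_input"]    # think/answer simply echo their input


def toy_sufficient(memory: Memory) -> bool:
    return memory.steps[-1][0] == "answer"


answer = solve("What is 15% of 240, then doubled?",
               toy_plan, toy_execute, toy_sufficient)
print(answer)  # 72
```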


What changes vs. the paper

| | AgentFlow (paper) | AgentFlow-Pro |
| --- | --- | --- |
| Backbone | Qwen2.5-7B | Qwen3-8B (bf16 + LoRA via PEFT) |
| RL algorithm | Flow-GRPO (outcome reward) | DAPO: decoupled clip + dynamic sampling |
| Credit assignment | trajectory-level | + step-level, via a learned PRM |
| Reward model | | Qwen3-0.6B regression head, trained on LLM-judge labels |
| Tool layer | bespoke | FastMCP server + sandboxed Python exec |
| LLM serving | | Ollama native /api/chat (see Engineering below) |
| Memory | in-task | in-task + Qdrant cross-episode retrieval (--memory) |

Full design + the experimental protocol: docs/research.md.


Contribution 1 — DAPO, component by component

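DAPO adds four techniques on top of GRPO; splitting its overlong reward shaping into filtering and soft punishment gives the five pieces listed in the table. TRL 1.4's GRPOTrainer (loss_type="dapo") implements four of the five; the fifth, dynamic sampling, is mine. Here is exactly where each piece comes from: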

| DAPO component | Where it comes from |
| --- | --- |
| Clip-higher (decoupled clip) | TRL: epsilon=0.2, epsilon_high=0.28 |
| Token-level policy-gradient loss | TRL: loss_type="dapo" |
| Overlong filtering | TRL: mask_truncated_completions=True |
| Soft overlong punishment | TRL: get_soft_overlong_punishment(...) |
| Dynamic sampling | train/dynamic_sampling.py (TRL does not implement it) |
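
To illustrate what clip-higher changes, here is a small sketch of the per-token clipped surrogate with asymmetric bounds. It is a plain-Python illustration of the formula, not TRL's implementation; `clipped_term` is a hypothetical name and the ratio and advantage values are made up.

```python
def clipped_term(ratio: float, advantage: float,
                 eps_low: float = 0.2, eps_high: float = 0.28) -> float:
    """Per-token PPO-style surrogate with decoupled (asymmetric) clip bounds."""
    clipped = min(max(ratio, 1.0 - eps_low), 1.0 + eps_high)
    return min(ratio * advantage, clipped * advantage)


# A good action (advantage > 0) whose probability ratio rose to 1.25:
symmetric = clipped_term(1.25, 1.0, eps_high=0.2)  # capped at the 1.2 ceiling
decoupled = clipped_term(1.25, 1.0)                # still inside the 1.28 ceiling
print(symmetric, decoupled)
```

With the symmetric bound the update is cut off at 1.2, while the raised upper bound lets the same token keep its full 1.25 contribution.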

Dynamic sampling (curate_prompts, has_signal) is the piece I built. Before training, each candidate prompt is rolled out G times and scored by the PRM; any prompt whose G rewards have ~zero variance (pstdev < 1e-3) produces zero advantage on every token — a wasted rollout — and is dropped. What survives is a dataset of informative states where the policy can actually learn the difference between a good action and a bad one. The trainer wiring (loss_type="dapo", beta=0.0 for the KL-free DAPO objective, clip-higher, overlong handling) lives in train/dapo.py.
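
The filter described above can be sketched as follows. The names `has_signal` and `curate_prompts` come from the text, but the signatures, the dictionary input, and the sample rewards are assumptions made for illustration; the real module may look different.

```python
from statistics import pstdev


def has_signal(rewards: list[float], min_std: float = 1e-3) -> bool:
    """A prompt group is informative only if its rewards actually vary."""
    return len(rewards) > 1 and pstdev(rewards) >= min_std


def curate_prompts(scored: dict[str, list[float]]) -> list[str]:
    """Keep the prompts whose G rollout rewards are not (near-)constant."""
    return [prompt for prompt, rewards in scored.items() if has_signal(rewards)]


scored = {
    "prompt_a": [0.9, 0.2, 0.6, 0.4],  # rollouts disagree: keep
    "prompt_b": [0.5, 0.5, 0.5, 0.5],  # zero variance: drop
    "prompt_c": [0.0, 0.0, 0.0, 0.0],  # all failed identically: drop
}
print(curate_prompts(scored))  # ['prompt_a']
```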


Contribution 2 — a learned Process Reward Model

The headline research piece. Instead of hand-tuned heuristic step rewards, the PRM is a trained model that learns what a good Planner decision looks like.

Pipeline (train/judge.py → train/prm.py → train/reward.py):

  1. Collect trajectories by running the untrained agent over the AIME training split, dumping full step-by-step traces (eval/run.py --benchmark aime_train).
  2. Label every step with an LLM judge (train/judge.py). The judge is DeepSeek (deepseek-chat), deliberately stronger than the 8B policy it supervises — the standard distillation/RLHF principle that your reward signal should come from a more capable model. It rates each step 0–1 on a calibrated rubric. One-shot, cheap (well under $1).
  3. Train a Qwen3-0.6B sequence-regression head (num_labels=1, MSE) on those labels (train/prm.py). MAE on a held-out split is the convergence check.
  4. Reward at training time (train/reward.py): the PRM scores each generated Planner action. Malformed JSON / unknown action ⇒ 0.0; otherwise the PRM's [0,1] score is the reward. The live RL signal is the PRM, not DeepSeek — the judge is used exactly once, to build the dataset.
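
Step 4 can be sketched as a small gate in front of the PRM. This is an illustration only: `step_reward` and `fake_prm` are hypothetical names, and the constant scorer stands in for the trained regression head.

```python
import json
from typing import Callable

VALID_ACTIONS = ("think", "search", "code", "answer")


def step_reward(completion: str, prm_score: Callable[[str], float]) -> float:
    """Return 0.0 for unusable output, else the PRM score clamped to [0, 1]."""
    try:
        step = json.loads(completion)
    except json.JSONDecodeError:
        return 0.0                                   # malformed JSON
    if not isinstance(step, dict) or step.get("action") not in VALID_ACTIONS:
        return 0.0                                   # unknown or missing action
    return min(1.0, max(0.0, prm_score(completion)))


def fake_prm(text: str) -> float:
    # Hypothetical stand-in for the trained Qwen3-0.6B regression head.
    return 0.8


good = '{"thought": "look it up", "action": "search", "action_input": "AIME 2023"}'
print(step_reward(good, fake_prm))                   # 0.8
print(step_reward("not json", fake_prm))             # 0.0
print(step_reward('{"action": "dance"}', fake_prm))  # 0.0
```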

A detail I care about: build_prm_input (train/data.py) is the single source of truth for the text the PRM scores, shared by the labeler, the trainer, and the reward function so they can never drift. It deliberately excludes the tool result — the PRM rates the decision, which is the thing being optimized, not the environment's response to it.

The deployed agent stays 100% Qwen3-8B. DeepSeek never runs at inference or in the RL loop.


Engineering highlights

The kind of thing that doesn't show up in a results table but is most of the actual work:

  • A 53× serving fix. Ollama's OpenAI-compatible /v1 endpoint silently ignores think: false — Qwen3 kept emitting reasoning tokens until it burned the entire budget and returned empty content, so every structured call failed to parse, retried, and degraded. The native /api/chat endpoint honors it. A trivial two-step query went from 11m27s → ~13s. The whole LLM layer (core/llm.py) is built on the native endpoint as a result.
  • Grammar-constrained structured output. Planner/Verifier pass a Pydantic model_json_schema() straight to Ollama's format field, so every required field is grammar-guaranteed — no more missing-field crashes — backed by a retry-once-then-degrade fallback that never raises.
  • A sandboxed Python REPL (tools/builtin/python_exec.py) with a stdlib whitelist (sympy/numpy/mpmath allowed because AIME needs symbolic math), auto-printing of a bare final expression, and a lenient parser that tolerates the stray indentation small models emit. Not a hardened security boundary — documented as such.
  • Leakage-free evaluation. Training data is di-zhang-fdu/AIME_1983_2024 filtered to Year ≤ 2023 (918 problems), explicitly de-duplicated against the AIME 2024 test set. The model is never trained on what it is scored on.
  • A Python-3.14 dill/datasets incompatibility worked around with a targeted monkey-patch so caching doesn't break on the dev box.
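
The first two bullets come down to what goes in the request body. A minimal sketch, assuming Ollama's default local address and a hand-written schema in place of the Pydantic-generated one; `build_chat_request` and `PLANNER_SCHEMA` are hypothetical names, and nothing is sent over the network here.

```python
import json

PLANNER_SCHEMA = {  # hand-written stand-in for a Pydantic model_json_schema()
    "type": "object",
    "properties": {
        "thought": {"type": "string"},
        "action": {"type": "string",
                   "enum": ["think", "search", "code", "answer"]},
        "action_input": {"type": "string"},
    },
    "required": ["thought", "action", "action_input"],
}


def build_chat_request(prompt: str, model: str = "qwen3:8b",
                       think: bool = False) -> dict:
    """Body for POST http://localhost:11434/api/chat (Ollama's native endpoint)."""
    return {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
        "think": think,            # the native endpoint honors this flag
        "format": PLANNER_SCHEMA,  # constrains the reply to this JSON schema
    }


body = build_chat_request("What is 15% of 240?")
print(json.dumps(body, indent=2))
```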

Results

Before/after, same harness, same Q4_K_M quantization — the only variable is the PRM-guided DAPO training. Full per-problem trajectories and the curated reports live in results/.

| Model | Benchmark | Accuracy | Avg steps | Notes |
| --- | --- | --- | --- | --- |
| qwen3:8b (untrained) | AIME 2024 (30) | 33.3% (10/30) | 4.03 | baseline; think off, temp=0, verified |
| qwen3:8b + DAPO + PRM | AIME 2024 (30) | 30.0% (9/30) | 4.37 | flat within noise (n=30, ±~17pt CI; 11/30 flipped: +5/−6) |
| qwen3:8b (untrained) | GPQA Diamond (100) | 40.0% (40/100) | 3.09 | baseline |
| qwen3:8b + DAPO + PRM | GPQA Diamond (100) | 45.0% (45/100) | 3.19 | +5.0 pts, cross-domain (trained on AIME math) |

What this shows. A learned PRM driving DAPO (with hand-built dynamic sampling) trained end-to-end and produced a +5.0 pt cross-domain gain on GPQA (n=100, the reliable test) from a Planner trained only on AIME math. On AIME24 the result is flat within noise — at n=30 the 95% CI is ≈±17 pts, and the policy in fact changed a lot (11 of 30 problems flipped, net −1), i.e. small-sample variance rather than a regression. The trained Planner also reasons more deliberately (avg steps ↑), the expected signature of a process reward. This is a deliberately minimal demo (300 LoRA steps, 8B); it validates the method, not a leaderboard push — see results/README.md and docs/research.md for the full analysis and the levers for larger gains (more steps, stronger PRM, outcome-reward mixing, vLLM, bigger policy).
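
The ±17-point figure can be reproduced with the usual normal approximation to a binomial proportion. This is a back-of-the-envelope sketch using the baseline counts from the results table; `ci_half_width` is a hypothetical helper, not part of the repo.

```python
from math import sqrt


def ci_half_width(correct: int, n: int, z: float = 1.96) -> float:
    """95% normal-approximation half-width for an accuracy, in percentage points."""
    p = correct / n
    return 100.0 * z * sqrt(p * (1.0 - p) / n)


print(round(ci_half_width(10, 30), 1))   # AIME 2024 baseline, n=30  -> 16.9
print(round(ci_half_width(40, 100), 1))  # GPQA Diamond baseline, n=100 -> 9.6
```

Under the same approximation the n=100 interval is a little over half as wide as the n=30 one, which is why GPQA is the tighter of the two measurements, though a half-width near 10 points is still not negligible.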


The training pipeline (Phase 4)

Everything below is built and committed; it runs as one session on a rented 24–48 GB GPU (A40 recommended, $8–15 total). Step-by-step infra guide: docs/phase4-runpod-guide.md.

# on the GPU box, after `uv sync --extra rl --extra eval`
uv run python -m eval.run --benchmark aime_train --limit 150 --max-steps 6   # 1. collect trajectories
uv run python -m train.judge --runs "runs/eval_aime_train_*.json"            # 2. LLM-judge labels (DeepSeek)
uv run python -m train.prm   train --labels artifacts/prm_labels.jsonl       # 3. train the PRM
uv run python -m train.dapo  --prm artifacts/prm --runs "runs/eval_aime_train_*.json"   # 4. DAPO-train the Planner
# 5. export to GGUF, load into Ollama, re-run the eval for the "after" numbers

Quickstart

Requires Python 3.11, uv, and a running Ollama.

uv sync
ollama pull qwen3:8b

uv run python main.py "What is 15% of 240, then doubled?"
uv run python main.py "Explain how transformers work" --max-steps 3
uv run python main.py "..." --think          # enable Qwen3 reasoning tokens (slower, default off)

Web search uses Tavily — put TAVILY_API_KEY=... in .env (copy .env.example).

Evaluation

uv sync --extra eval
uv run python -m eval.run -b aime24 --limit 5 --max-steps 8    # small subset first
uv run python -m eval.run -b aime24                            # full AIME24 (30)
uv run python -m eval.run -b gpqa  --limit 5 --max-steps 6     # GPQA Diamond (gated; HF_TOKEN)

Repo map

| Path | What |
| --- | --- |
| core/ | the inference engine: llm (native-Ollama client), types (Pydantic models), memory, planner, executor, verifier, solver |
| tools/ | mcp_server.py (FastMCP), builtin/search.py (Tavily), builtin/python_exec.py (sandboxed REPL) |
| eval/ | datasets.py (AIME24 / GPQA / AIME-train loaders), scorer.py (math-verify + MC), runner.py, run.py (CLI) |
| train/ | the RL stack: data.py (shared plumbing), judge.py (LLM-judge labeling), prm.py (learned PRM), reward.py (PRM→reward), dynamic_sampling.py (the DAPO piece TRL lacks), dapo.py (trainer) |
| docs/ | architecture.md, research.md (the DAPO + PRM design), phase4-runpod-guide.md |
| main.py | the solve CLI |
| runs/ | eval reports (gitignored) |

Contributors / agents: start with AGENTS.md; the phased plan and status live in ROADMAP.md.


Status

Scaffold · core loop · real tools · eval harness · the full DAPO + PRM training pipeline are built and committed, and the Phase 4 GPU run is complete (collect → judge → train PRM → DAPO → GGUF → re-eval). Both baselines and trained-model numbers are recorded above: GPQA +5.0 pts (cross-domain), AIME24 flat within noise. See results/ for the full analysis.

License

MIT. Built on the ideas of the AgentFlow paper; not affiliated with its authors.

Architecture

System shape and data flow

Orchflow

Orchflow is a lightweight Python framework for readable multi-agent pipelines. It gives you sequential, parallel, conditional, retryable, and observable orchestration, with lightweight human review and JSON resume, without forcing every workflow into a heavy graph runtime.

Video Demo

Watch the walkthrough to see Orchflow used as a real multi-agent launch-brief pipeline: readable Python steps, parallel research branches, retries, live events, traces, JSON checkpoints, and resume after failure.

pip install orchflow

Build a readable workflow with normal Python functions, then let Orchflow handle parallel execution, routing, retries, checkpointing, and traces.

import asyncio

from orchflow import Flow, JsonCheckpointStore, StepContext, condition, step


@step(name="plan", retry=2)
async def plan(input: str, context: StepContext) -> dict[str, str]:
    context.state["topic"] = input
    return {
        "topic": input,
        "audience": "AI engineering teams",
    }


@step(name="research_docs")
async def research_docs(input: str, context: StepContext) -> str:
    return "Orchflow supports retries, traces, events, and JSON resume."


@step(name="research_market")
async def research_market(input: str, context: StepContext) -> str:
    return "Teams want agent workflows without modeling everything as a graph."


@step(name="synthesize")
async def synthesize(input: str, context: StepContext) -> str:
    research = context.previous
    draft = (
        f"{context.state['topic']}: "
        f"{research['research_docs']} "
        f"{research['research_market']}"
    )
    context.state["draft"] = draft
    return draft


@step(name="publish")
async def publish(input: str, context: StepContext) -> str:
    return f"Published: {context.previous}"


@step(name="revise")
async def revise(input: str, context: StepContext) -> str:
    return f"Needs revision: {context.previous}"


async def main() -> None:
    flow = Flow(
        [
            plan,
            [research_docs, research_market],
            synthesize,
            condition(
                when=lambda ctx: len(str(ctx.previous)) <= 240,
                then=publish,
                otherwise=revise,
            ),
        ],
        name="launch-note",
    )

    result = await flow.run(
        "Orchflow v0.5 release",
        checkpoint=JsonCheckpointStore("orchflow-checkpoint.json"),
    )

    print(result.output)
    print([trace.step_name for trace in result.traces])


asyncio.run(main())

Why Orchflow Exists

Plain Python function chaining is easy to read, but it becomes fragile as soon as a workflow needs retries, parallel work, branching, shared state, or traces. Large graph frameworks are powerful, but they can add more abstraction than a small agent pipeline needs.

Orchflow sits in the middle: the user writes normal Python functions, while the framework handles orchestration mechanics that should be reliable and inspectable.

flowchart LR
    A["flow.run(input)"] --> B["Step: research"]
    B --> C{"Parallel group"}
    C --> D["Step: web_research"]
    C --> E["Step: docs_research"]
    D --> F["Merge outputs"]
    E --> F
    F --> G{"Condition"}
    G --> H["technical_writer"]
    G --> I["general_writer"]
    H --> J["FlowResult + traces"]
    I --> J

What It Demonstrates

Orchflow is intentionally small, but it is built like a real package:

  • Async-first execution with sync-step support through worker threads
  • Sequential pipelines, parallel fan-out, and conditional routing
  • Retry policy at both flow and step level
  • Shared run state with explicit StepContext
  • Flat StepTrace records for every attempt, including failures
  • Live lifecycle events with Flow.events(...)
  • Lightweight human input gates with callback or stdin providers
  • JSON checkpoints and resume for practical long-running workflows
  • Optional LiteLLM-backed Agent without making LiteLLM a core dependency
  • Structured agent outputs with JSON schema or optional Pydantic models
  • Offline test helpers under orchflow.testing
  • Typed package metadata, CI, TestPyPI/PyPI release workflows, and tag releases

Core Concepts

Orchflow keeps the public model deliberately small.

| Concept | Purpose |
| --- | --- |
| Agent | Stateless role-based LLM helper with optional LiteLLM support |
| AgentConfig | Typed provider configuration for Agent calls |
| @step | Decorator for a unit of workflow work |
| StepContext | Carries previous output, original input, metadata, and shared state |
| Flow | Orchestrates sequential, parallel, and conditional execution |
| FlowResult | Final output, traces, state, timing, and failure details |
| FlowEvent | Live lifecycle event emitted while a flow runs |
| human_input | Step helper for pausing a flow and collecting reviewer text |
| JsonCheckpointStore | Local JSON checkpoint store for resume |

Sequential Flow

from orchflow import Flow, StepContext, step


@step(name="research", retry=2)
async def research(input: str, context: StepContext) -> str:
    return f"notes about {input}"


@step(name="draft")
async def draft(input: str, context: StepContext) -> str:
    return f"article based on {context.previous}"


result = await Flow([research, draft], name="content-pipeline").run(
    "AI agent orchestration"
)

print(result.output)
print([trace.step_name for trace in result.traces])

Important data-flow rule: the first input argument is always the original flow.run(...) input. Previous step output is available as context.previous.
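
The rule is easy to misread, so here is a toy runner that mimics it without importing Orchflow. `ToyContext` and `run_sequential` are illustrative stand-ins and say nothing about how the library is implemented internally.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class ToyContext:
    """Illustrative stand-in for StepContext; not Orchflow's class."""
    previous: Any = None


async def research(input: str, context: ToyContext) -> str:
    return f"notes about {input}"


async def draft(input: str, context: ToyContext) -> str:
    # `input` is still the original run input; the notes arrive via context.previous.
    return f"{input} | {context.previous}"


async def run_sequential(steps, run_input: str) -> Any:
    context = ToyContext()
    for step_fn in steps:
        # Every step receives the same original input.
        context.previous = await step_fn(run_input, context)
    return context.previous


output = asyncio.run(run_sequential([research, draft], "AI agent orchestration"))
print(output)  # AI agent orchestration | notes about AI agent orchestration
```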

Parallel Flow

Wrap independent steps in a list to run them concurrently.

flow = Flow([
    plan,
    [web_research, docs_research],
    synthesize,
])

result = await flow.run("workflow frameworks")

The next step receives a dictionary keyed by step name:

{
    "web_research": "...",
    "docs_research": "..."
}

Parallel steps produce separate flat trace entries with the same parallel_group_id.
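
For intuition, a fan-out that merges results into a dictionary keyed by step name can be sketched with plain asyncio. This is an assumption about the general technique, not Orchflow's internals; `run_parallel` and the simplified one-argument steps are hypothetical.

```python
import asyncio


async def web_research(topic: str) -> str:
    await asyncio.sleep(0.01)  # simulate I/O
    return f"web notes on {topic}"


async def docs_research(topic: str) -> str:
    await asyncio.sleep(0.01)
    return f"docs notes on {topic}"


async def run_parallel(steps, topic: str) -> dict[str, str]:
    # gather() runs the coroutines concurrently and preserves argument order.
    outputs = await asyncio.gather(*(fn(topic) for fn in steps))
    return {fn.__name__: out for fn, out in zip(steps, outputs)}


merged = asyncio.run(run_parallel([web_research, docs_research],
                                  "workflow frameworks"))
print(merged)
```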

Conditional Flow

from orchflow import Flow, condition

flow = Flow([
    classify,
    condition(
        when=lambda ctx: ctx.previous == "technical",
        then=technical_writer,
        otherwise=general_writer,
    ),
])

The predicate receives the current StepContext, so routing can use context.previous, shared state, or run metadata.

Human Review

Use human_input(...) when a pipeline needs a lightweight review point without adding checkpointing, queues, or a separate UI.

from orchflow import Flow, StepContext, condition, human_input, step


@step
async def draft(input: str, context: StepContext) -> str:
    text = f"Draft about {input}"
    context.state["draft"] = text
    return text


review = human_input(
    lambda ctx: f"Review this draft:\n{ctx.previous}\n\nDecision: ",
    name="human_review",
)


@step
async def publish(input: str, context: StepContext) -> str:
    return f"Published: {context.state['draft']}"


@step
async def revise(input: str, context: StepContext) -> str:
    return f"Revision requested: {context.previous}"


flow = Flow([
    draft,
    review,
    condition(
        when=lambda ctx: str(ctx.previous).strip().lower() == "approve",
        then=publish,
        otherwise=revise,
    ),
])

By default, human_input(...) reads from stdin. Applications and tests can pass a sync or async provider(prompt, context) callback instead. The human response is normal step output, so it is available as context.previous to the next step.

Checkpoint And Resume

Use JsonCheckpointStore when a flow should survive a transient failure without rerunning completed top-level work.

from orchflow import Flow, JsonCheckpointStore

store = JsonCheckpointStore("orchflow-checkpoint.json")
flow = Flow([collect, draft, publish], name="checkpointed-pipeline")

first = await flow.run(
    "AI agent orchestration",
    checkpoint=store,
    raise_on_error=False,
)

if not first.success:
    resumed = await flow.resume(store)
    print(resumed.output)

Checkpoints are plain JSON and are saved after each completed top-level item: single steps, selected condition branches, or complete parallel groups. A failed parallel group resumes by rerunning the whole group. Successful flows keep the checkpoint file and mark it completed for inspection.
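
The save-after-each-completed-item behavior can be illustrated with a toy loop. This sketch does not use Orchflow; `run_with_checkpoint`, the file layout, and the simplified step signature are assumptions chosen to show why a resumed run skips completed work.

```python
import json
import tempfile
from pathlib import Path


def run_with_checkpoint(steps, run_input, path: Path):
    """Run steps in order, persisting progress after each completed item."""
    if path.exists():
        state = json.loads(path.read_text())
    else:
        state = {"done": 0, "previous": None}
    for index in range(state["done"], len(steps)):
        state["previous"] = steps[index](run_input, state["previous"])
        state["done"] = index + 1
        path.write_text(json.dumps(state))  # saved only after the step succeeds
    return state["previous"]


calls = []
attempts = {"draft": 0}


def collect(run_input, previous):
    calls.append("collect")
    return f"notes on {run_input}"


def draft(run_input, previous):
    calls.append("draft")
    attempts["draft"] += 1
    if attempts["draft"] == 1:
        raise RuntimeError("transient failure")
    return f"draft from {previous}"


path = Path(tempfile.mkdtemp()) / "checkpoint.json"
try:
    run_with_checkpoint([collect, draft], "topic", path)
except RuntimeError:
    pass  # the first run fails inside draft
result = run_with_checkpoint([collect, draft], "topic", path)  # resume
print(result, calls)
```

The second call starts at `draft` because the checkpoint already records `collect` as done, so `collect` runs only once across both attempts.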

Live Events

Flow.events(...) lets applications observe a workflow while it runs.

async for event in flow.events("agent observability"):
    print(event.type, event.step_name, event.attempt)

Event types:

  • flow_started
  • step_started
  • step_completed
  • step_failed
  • retry_scheduled
  • flow_completed
  • flow_failed
  • checkpoint_saved
  • checkpoint_loaded

Events are orchestration lifecycle events, not token streaming. The final flow_completed or flow_failed event carries a FlowResult.

Trace Output

Every step attempt creates a flat StepTrace.

result = await flow.run("topic")

for trace in result.traces:
    print(trace.to_dict())

Example shape:

{
    "step_name": "draft",
    "input": "topic",
    "output": "article...",
    "error": None,
    "attempt": 1,
    "parallel_group_id": None,
    "duration_seconds": 0.42,
    "started_at": "2026-05-10T03:13:48.994932+00:00",
    "ended_at": "2026-05-10T03:13:49.414932+00:00",
    "success": True,
}
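
A flat trace per attempt pairs naturally with retries. The sketch below shows the idea in plain Python; it assumes `retry=2` means up to two extra attempts, which the text does not confirm, and `run_with_retry` is a hypothetical helper rather than Orchflow code.

```python
import time


def run_with_retry(step_fn, step_input, retry: int = 2) -> list[dict]:
    """Call step_fn up to retry + 1 times, recording one flat trace per attempt."""
    traces = []
    for attempt in range(1, retry + 2):
        started = time.perf_counter()
        try:
            output, error = step_fn(step_input), None
        except Exception as exc:
            output, error = None, repr(exc)
        traces.append({
            "step_name": step_fn.__name__,
            "attempt": attempt,
            "output": output,
            "error": error,
            "success": error is None,
            "duration_seconds": time.perf_counter() - started,
        })
        if error is None:
            break  # stop retrying once an attempt succeeds
    return traces


failures = iter([True, False])  # fail once, then succeed


def flaky(topic: str) -> str:
    if next(failures):
        raise ValueError("temporary")
    return f"article about {topic}"


traces = run_with_retry(flaky, "topic")
print([(t["attempt"], t["success"]) for t in traces])  # [(1, False), (2, True)]
```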

Optional LLM Agent

Core Orchflow has no runtime dependencies. The public Agent uses LiteLLM only when you install the optional extra:

pip install "orchflow[litellm]"

from orchflow import Agent

writer = Agent(
    name="writer",
    role="You write concise technical explanations.",
    model="gpt-4o-mini",
)

text = await writer.run("Explain lightweight orchestration")

For structured workflows, use AgentConfig and run_structured(...):

from orchflow import Agent, AgentConfig

extractor = Agent(
    name="extractor",
    role="Extract structured data. Return only JSON.",
    config=AgentConfig(model="gpt-4o-mini", temperature=0),
)

person = await extractor.run_structured(
    "Ada works at OpenAI.",
    schema={
        "title": "person",
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "company": {"type": "string"},
        },
        "required": ["name", "company"],
    },
)

run_structured(...) returns parsed JSON for schema dictionaries and a Pydantic model instance when a Pydantic model class is passed. Tool-calling loops, memory, and durable agent state are intentionally outside the current scope. Orchflow focuses on orchestration first.

Examples

uv run python examples/basic_sequential.py
uv run python examples/parallel_steps.py
uv run python examples/conditional_flow.py
uv run python examples/live_events.py
uv run python examples/human_review.py
uv run python examples/checkpoint_resume.py

Optional LiteLLM-backed examples after installing orchflow[litellm] and configuring a provider API key:

uv run python examples/litellm_agent.py
uv run python examples/structured_agent.py

Development

uv sync --extra dev
uv run pytest
uv run ruff check
uv run ruff format --check
uv run pyright
uv build

Release Process

TestPyPI publishing is manual through .github/workflows/publish-testpypi.yml. Real PyPI publishing is tag-based through .github/workflows/publish-pypi.yml.

git tag -a v0.5.0 -m "Release v0.5.0"
git push origin v0.5.0

The release workflow verifies that the Git tag matches pyproject.toml, uploads to PyPI through trusted publishing, and creates a GitHub Release.

Roadmap

  • 0.5.x: structured agent polish and docs improvements
  • 0.6.0: evaluate one-turn tool execution

Source Of Truth

Project decisions live in AGENTS.md. Implementation follows that document.

Implementation details

Design choices and constraints

Orchflow

PyPI version Python versions CI License

Orchflow is a lightweight Python framework for readable multi-agent pipelines. It gives you sequential, parallel, conditional, retryable, and observable orchestration, with lightweight human review and JSON resume, without forcing every workflow into a heavy graph runtime.

Video Demo

Watch the walkthrough to see Orchflow used as a real multi-agent launch-brief pipeline: readable Python steps, parallel research branches, retries, live events, traces, JSON checkpoints, and resume after failure.

Orchflow video demo: readable, observable, recoverable multi-agent pipelines

Open the video on YouTube

pip install orchflow

Build a readable workflow with normal Python functions, then let Orchflow handle parallel execution, routing, retries, checkpointing, and traces.

import asyncio

from orchflow import Flow, JsonCheckpointStore, StepContext, condition, step


@step(name="plan", retry=2)
async def plan(input: str, context: StepContext) -> dict[str, str]:
    context.state["topic"] = input
    return {
        "topic": input,
        "audience": "AI engineering teams",
    }


@step(name="research_docs")
async def research_docs(input: str, context: StepContext) -> str:
    return "Orchflow supports retries, traces, events, and JSON resume."


@step(name="research_market")
async def research_market(input: str, context: StepContext) -> str:
    return "Teams want agent workflows without modeling everything as a graph."


@step(name="synthesize")
async def synthesize(input: str, context: StepContext) -> str:
    research = context.previous
    draft = (
        f"{context.state['topic']}: "
        f"{research['research_docs']} "
        f"{research['research_market']}"
    )
    context.state["draft"] = draft
    return draft


@step(name="publish")
async def publish(input: str, context: StepContext) -> str:
    return f"Published: {context.previous}"


@step(name="revise")
async def revise(input: str, context: StepContext) -> str:
    return f"Needs revision: {context.previous}"


async def main() -> None:
    flow = Flow(
        [
            plan,
            [research_docs, research_market],
            synthesize,
            condition(
                when=lambda ctx: len(str(ctx.previous)) <= 240,
                then=publish,
                otherwise=revise,
            ),
        ],
        name="launch-note",
    )

    result = await flow.run(
        "Orchflow v0.5 release",
        checkpoint=JsonCheckpointStore("orchflow-checkpoint.json"),
    )

    print(result.output)
    print([trace.step_name for trace in result.traces])


asyncio.run(main())

Why Orchflow Exists

Plain Python function chaining is easy to read, but it becomes fragile as soon as a workflow needs retries, parallel work, branching, shared state, or traces. Large graph frameworks are powerful, but they can add more abstraction than a small agent pipeline needs.

Orchflow sits in the middle: the user writes normal Python functions, while the framework handles orchestration mechanics that should be reliable and inspectable.

flowchart LR
    A["flow.run(input)"] --> B["Step: research"]
    B --> C{"Parallel group"}
    C --> D["Step: web_research"]
    C --> E["Step: docs_research"]
    D --> F["Merge outputs"]
    E --> F
    F --> G{"Condition"}
    G --> H["technical_writer"]
    G --> I["general_writer"]
    H --> J["FlowResult + traces"]
    I --> J

Orchflow architecture

What It Demonstrates

Orchflow is intentionally small, but it is built like a real package:

  • Async-first execution with sync-step support through worker threads
  • Sequential pipelines, parallel fan-out, and conditional routing
  • Retry policy at both flow and step level
  • Shared run state with explicit StepContext
  • Flat StepTrace records for every attempt, including failures
  • Live lifecycle events with Flow.events(...)
  • Lightweight human input gates with callback or stdin providers
  • JSON checkpoints and resume for practical long-running workflows
  • Optional LiteLLM-backed Agent without making LiteLLM a core dependency
  • Structured agent outputs with JSON schema or optional Pydantic models
  • Offline test helpers under orchflow.testing
  • Typed package metadata, CI, TestPyPI/PyPI release workflows, and tag releases

Core Concepts

Orchflow keeps the public model deliberately small.

ConceptPurpose
AgentStateless role-based LLM helper with optional LiteLLM support
AgentConfigTyped provider configuration for Agent calls
@stepDecorator for a unit of workflow work
StepContextCarries previous output, original input, metadata, and shared state
FlowOrchestrates sequential, parallel, and conditional execution
FlowResultFinal output, traces, state, timing, and failure details
FlowEventLive lifecycle event emitted while a flow runs
human_inputStep helper for pausing a flow and collecting reviewer text
JsonCheckpointStoreLocal JSON checkpoint store for resume

Sequential Flow

import asyncio

from orchflow import Flow, StepContext, step


@step(name="research", retry=2)
async def research(input: str, context: StepContext) -> str:
    return f"notes about {input}"


@step(name="draft")
async def draft(input: str, context: StepContext) -> str:
    return f"article based on {context.previous}"


async def main() -> None:
    # `await` needs a running event loop, so the call lives inside a coroutine.
    result = await Flow([research, draft], name="content-pipeline").run(
        "AI agent orchestration"
    )
    print(result.output)
    print([trace.step_name for trace in result.traces])


asyncio.run(main())

Important data-flow rule: the first argument, input, is always the original value passed to flow.run(...). The previous step's output is available as context.previous.
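To make the data-flow rule concrete, here is a toy runner written with the standard library only. It is a sketch of the rule, not Orchflow's code: `ToyContext` and `run_sequential` are hypothetical stand-ins for `StepContext` and `Flow.run`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyContext:
    """Hypothetical stand-in for StepContext with only the fields used here."""
    previous: Any = None
    state: dict = field(default_factory=dict)


async def run_sequential(steps, original_input):
    context = ToyContext()
    for step_fn in steps:
        # Every step receives the original input; only context.previous changes.
        context.previous = await step_fn(original_input, context)
    return context.previous


async def research(input: str, context: ToyContext) -> str:
    return f"notes about {input}"


async def draft(input: str, context: ToyContext) -> str:
    return f"article about {input} based on {context.previous}"


if __name__ == "__main__":
    print(asyncio.run(run_sequential([research, draft], "AI")))
```

Here `draft` sees both values at once: `input` is still `"AI"`, while `context.previous` holds the output of `research`.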

Parallel Flow

Wrap independent steps in a list to run them concurrently.

flow = Flow([
    plan,
    [web_research, docs_research],
    synthesize,
])

result = await flow.run("workflow frameworks")

The next step receives a dictionary keyed by step name:

{
    "web_research": "...",
    "docs_research": "..."
}

Parallel steps produce separate flat trace entries with the same parallel_group_id.
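The fan-out and merge behavior can be approximated with `asyncio.gather`. The sketch below assumes nothing about Orchflow's internals; `run_parallel` is a hypothetical helper, and the two step functions here take a plain string rather than a `StepContext`.

```python
import asyncio


async def web_research(topic: str) -> str:
    return f"web notes on {topic}"


async def docs_research(topic: str) -> str:
    return f"docs notes on {topic}"


async def run_parallel(steps, topic):
    # gather preserves argument order, so outputs line up with the step list.
    outputs = await asyncio.gather(*(fn(topic) for fn in steps))
    # Merge into a dictionary keyed by step (function) name.
    return {fn.__name__: out for fn, out in zip(steps, outputs)}


if __name__ == "__main__":
    merged = asyncio.run(run_parallel([web_research, docs_research], "workflow frameworks"))
    print(merged)
```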

Conditional Flow

from orchflow import Flow, condition

flow = Flow([
    classify,
    condition(
        when=lambda ctx: ctx.previous == "technical",
        then=technical_writer,
        otherwise=general_writer,
    ),
])

The predicate receives the current StepContext, so routing can use context.previous, shared state, or run metadata.

Human Review

Use human_input(...) when a pipeline needs a lightweight review point without adding checkpointing, queues, or a separate UI.

from orchflow import Flow, StepContext, condition, human_input, step


@step
async def draft(input: str, context: StepContext) -> str:
    text = f"Draft about {input}"
    context.state["draft"] = text
    return text


review = human_input(
    lambda ctx: f"Review this draft:\n{ctx.previous}\n\nDecision: ",
    name="human_review",
)


@step
async def publish(input: str, context: StepContext) -> str:
    return f"Published: {context.state['draft']}"


@step
async def revise(input: str, context: StepContext) -> str:
    return f"Revision requested: {context.previous}"


flow = Flow([
    draft,
    review,
    condition(
        when=lambda ctx: str(ctx.previous).strip().lower() == "approve",
        then=publish,
        otherwise=revise,
    ),
])

By default, human_input(...) reads from stdin. Applications and tests can pass a sync or async provider(prompt, context) callback instead. The human response is normal step output, so it is available as context.previous to the next step.
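Accepting either a sync or an async provider is a small pattern worth seeing in isolation. This sketch is an assumption about how such a callback could be handled, not Orchflow's implementation; `ask`, `scripted_provider`, `async_provider`, and `is_approved` are hypothetical names.

```python
import asyncio
import inspect


async def ask(provider, prompt, context):
    """Hypothetical helper: call provider(prompt, context), awaiting if needed."""
    answer = provider(prompt, context)
    if inspect.isawaitable(answer):
        answer = await answer
    return answer


def scripted_provider(prompt, context):
    # A test double that always approves, so no stdin is needed.
    return "approve"


async def async_provider(prompt, context):
    await asyncio.sleep(0)  # stands in for a network or UI round trip
    return "  Reject "


def is_approved(answer) -> bool:
    # Same normalization as the condition predicate in the flow above.
    return str(answer).strip().lower() == "approve"


if __name__ == "__main__":
    print(is_approved(asyncio.run(ask(scripted_provider, "Decision: ", {}))))
    print(is_approved(asyncio.run(ask(async_provider, "Decision: ", {}))))
```

A scripted provider like this is what makes a review gate testable without a human at the keyboard.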

Checkpoint And Resume

Use JsonCheckpointStore when a flow should survive a transient failure without rerunning completed top-level work.

from orchflow import Flow, JsonCheckpointStore

store = JsonCheckpointStore("orchflow-checkpoint.json")
flow = Flow([collect, draft, publish], name="checkpointed-pipeline")

first = await flow.run(
    "AI agent orchestration",
    checkpoint=store,
    raise_on_error=False,
)

if not first.success:
    resumed = await flow.resume(store)
    print(resumed.output)

Checkpoints are plain JSON and are saved after each completed top-level item: single steps, selected condition branches, or complete parallel groups. A failed parallel group resumes by rerunning the whole group. Successful flows keep the checkpoint file and mark it completed for inspection.
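The save-after-each-item and skip-on-resume behavior can be sketched with `json` and a temporary file. The file layout below (`done`, `completed`) is invented for illustration and is not the format `JsonCheckpointStore` writes; `run_with_checkpoint` and `demo` are hypothetical names.

```python
import json
import tempfile
from pathlib import Path


def run_with_checkpoint(items, path: Path, fail_on=None):
    """Toy resume loop: skip items already recorded in the JSON file."""
    if path.exists():
        data = json.loads(path.read_text())
    else:
        data = {"done": {}, "completed": False}
    for name, fn in items:
        if name in data["done"]:
            continue  # finished in an earlier run, so it is not rerun
        if name == fail_on:
            raise RuntimeError(f"{name} failed")  # simulated transient failure
        data["done"][name] = fn()
        path.write_text(json.dumps(data))  # save after each completed item
    data["completed"] = True
    path.write_text(json.dumps(data))  # keep the file, marked completed
    return data


def demo():
    calls = []

    def make(name):
        def fn():
            calls.append(name)
            return f"{name} output"
        return fn

    items = [(n, make(n)) for n in ("collect", "draft", "publish")]
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "checkpoint.json"
        try:
            run_with_checkpoint(items, path, fail_on="draft")
        except RuntimeError:
            pass  # first run stops at "draft"
        final = run_with_checkpoint(items, path)  # resume from the file
    return calls, final


if __name__ == "__main__":
    print(demo())
```

In the demo, `collect` runs once even though the loop is entered twice, because the second run reads its output from the checkpoint instead of calling it again.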

Live Events

Flow.events(...) lets applications observe a workflow while it runs.

async for event in flow.events("agent observability"):
    print(event.type, event.step_name, event.attempt)

Event types:

  • flow_started
  • step_started
  • step_completed
  • step_failed
  • retry_scheduled
  • flow_completed
  • flow_failed
  • checkpoint_saved
  • checkpoint_loaded

Events are orchestration lifecycle events, not token streaming. The final flow_completed or flow_failed event carries a FlowResult.
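An async generator is one natural way to expose lifecycle events like these. The sketch below is a toy model under that assumption, not the real `Flow.events(...)`: `ToyEvent` and `toy_events` are hypothetical, and only four of the event types listed above are emitted.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ToyEvent:
    type: str
    step_name: Optional[str] = None
    result: Any = None


async def toy_events(steps, value):
    """Yield lifecycle events while running steps one after another."""
    yield ToyEvent("flow_started")
    for fn in steps:
        yield ToyEvent("step_started", fn.__name__)
        value = await fn(value)
        yield ToyEvent("step_completed", fn.__name__)
    # The final event carries the overall result.
    yield ToyEvent("flow_completed", result=value)


async def shout(text: str) -> str:
    return text.upper()


async def collect():
    return [event async for event in toy_events([shout], "hi")]


if __name__ == "__main__":
    for event in asyncio.run(collect()):
        print(event.type, event.step_name)
```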

Trace Output

Every step attempt creates a flat StepTrace.

result = await flow.run("topic")

for trace in result.traces:
    print(trace.to_dict())

Example shape:

{
    "step_name": "draft",
    "input": "topic",
    "output": "article...",
    "error": None,
    "attempt": 1,
    "parallel_group_id": None,
    "duration_seconds": 0.42,
    "started_at": "2026-05-10T03:13:48.994932+00:00",
    "ended_at": "2026-05-10T03:13:49.414932+00:00",
    "success": True,
}
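Because the timestamps are ISO 8601 strings, a trace can be sanity-checked with the standard library. This sketch uses only the fields shown in the example shape; `elapsed_seconds` is a hypothetical helper, and it assumes `duration_seconds` is meant to equal `ended_at` minus `started_at`.

```python
from datetime import datetime

trace = {
    "started_at": "2026-05-10T03:13:48.994932+00:00",
    "ended_at": "2026-05-10T03:13:49.414932+00:00",
    "duration_seconds": 0.42,
}


def elapsed_seconds(trace: dict) -> float:
    """Recompute the duration from the two ISO 8601 timestamps."""
    start = datetime.fromisoformat(trace["started_at"])
    end = datetime.fromisoformat(trace["ended_at"])
    return (end - start).total_seconds()


if __name__ == "__main__":
    # Compare with a tolerance rather than exact float equality.
    print(abs(elapsed_seconds(trace) - trace["duration_seconds"]) < 1e-6)
```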

Optional LLM Agent

Core Orchflow has no runtime dependencies. The public Agent uses LiteLLM only when you install the optional extra:

pip install "orchflow[litellm]"

from orchflow import Agent

writer = Agent(
    name="writer",
    role="You write concise technical explanations.",
    model="gpt-4o-mini",
)

text = await writer.run("Explain lightweight orchestration")

For structured workflows, use AgentConfig and run_structured(...):

from orchflow import Agent, AgentConfig

extractor = Agent(
    name="extractor",
    role="Extract structured data. Return only JSON.",
    config=AgentConfig(model="gpt-4o-mini", temperature=0),
)

person = await extractor.run_structured(
    "Ada works at OpenAI.",
    schema={
        "title": "person",
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "company": {"type": "string"},
        },
        "required": ["name", "company"],
    },
)

run_structured(...) returns parsed JSON for schema dictionaries and a Pydantic model instance when a Pydantic model class is passed. Tool-calling loops, memory, and durable agent state are intentionally outside the current scope. Orchflow focuses on orchestration first.
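To show what "parsed JSON for schema dictionaries" can mean in practice, here is a deliberately small check written with `json` only. It is not a JSON Schema validator and not what `run_structured(...)` does internally; `parse_structured` is a hypothetical helper that only enforces the `required` keys.

```python
import json

PERSON_SCHEMA = {
    "title": "person",
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "company": {"type": "string"},
    },
    "required": ["name", "company"],
}


def parse_structured(raw: str, schema: dict) -> dict:
    """Parse model text as JSON and check that required keys are present."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    missing = [key for key in schema.get("required", []) if key not in data]
    if missing:
        raise ValueError(f"missing required keys: {missing}")
    return data


if __name__ == "__main__":
    print(parse_structured('{"name": "Ada", "company": "OpenAI"}', PERSON_SCHEMA))
```

A full validator would also check types and nested structure; this sketch stops at the point where a malformed response is rejected early.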

Examples

uv run python examples/basic_sequential.py
uv run python examples/parallel_steps.py
uv run python examples/conditional_flow.py
uv run python examples/live_events.py
uv run python examples/human_review.py
uv run python examples/checkpoint_resume.py

Optional LiteLLM-backed examples after installing orchflow[litellm] and configuring a provider API key:

uv run python examples/litellm_agent.py
uv run python examples/structured_agent.py

Development

uv sync --extra dev
uv run pytest
uv run ruff check
uv run ruff format --check
uv run pyright
uv build

Release Process

TestPyPI publishing is manual through .github/workflows/publish-testpypi.yml. Real PyPI publishing is tag-based through .github/workflows/publish-pypi.yml.

git tag -a v0.5.0 -m "Release v0.5.0"
git push origin v0.5.0

The release workflow verifies that the Git tag matches pyproject.toml, uploads to PyPI through trusted publishing, and creates a GitHub Release.

Roadmap

  • 0.5.x: structured agent polish and docs improvements
  • 0.6.0: evaluate one-turn tool execution

Source Of Truth

Project decisions live in AGENTS.md. Implementation follows that document.

Related links

More context