Agent Orchestration Framework
Orchflow
A dependency-free Python framework for readable multi-agent pipelines: sequential, parallel, conditional, and resumable flows.
0 deps
zero runtime dependencies in core
- Sequential, parallel, conditional & retryable flows
- Shared StepContext, flat traces, live lifecycle events
- JSON checkpoint / resume + lightweight human-review gates
- Zero core dependencies; optional LiteLLM Agent with structured output
The problem
Between fragile function-chaining and heavy graph runtimes
Chaining a few async functions is fine — until the pipeline needs retries, parallel work, branching, shared state, or traces. Then the glue code grows faster than the logic.
Full graph frameworks solve that, but they ask you to rebuild your workflow as nodes and edges in their runtime. Orchflow sits in the middle: you write normal Python functions, and it handles the orchestration mechanics — with nothing to install but itself.
How it works
Compose functions into a Flow
@step
Write async functions taking (input, context). Add retry= per step. Sync functions run on worker threads.
Flow([...])
List steps to run in order; nest a list to fan out in parallel; use condition() to branch.
run / resume
Run with retries and a checkpoint store; resume() picks up where a failed run stopped.
FlowResult
Get output, shared state, success, and a flat list of StepTrace records — plus live events.
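The mechanics described above can be sketched with nothing but asyncio. This is an illustration of the idea, not Orchflow's code: run_step, run_pipeline, and the retry attribute on a function are hypothetical names invented for the sketch, and a plain dict stands in for StepContext.

```python
import asyncio
import inspect


async def run_step(fn, value, state):
    """Call one step, retrying on any exception.

    Assumption: the retry budget lives in a hypothetical `retry` attribute.
    """
    retry = getattr(fn, "retry", 0)
    for attempt in range(retry + 1):
        try:
            if inspect.iscoroutinefunction(fn):
                return await fn(value, state)
            # Sync steps go to a worker thread so the event loop stays free.
            return await asyncio.to_thread(fn, value, state)
        except Exception:
            if attempt == retry:
                raise  # retry budget exhausted


async def run_pipeline(items, value, state):
    """Run items in order; a nested list fans out concurrently."""
    for item in items:
        if isinstance(item, list):
            # Every branch receives the same input; results keep branch order.
            value = await asyncio.gather(
                *(run_step(fn, value, state) for fn in item)
            )
        else:
            value = await run_step(item, value, state)
    return value


async def plan(value, state):
    state["topic"] = value
    return f"plan for {value}"


def web(value, state):  # sync on purpose: runs on a worker thread
    return f"web: {value}"


async def docs(value, state):
    return f"docs: {value}"


async def merge(values, state):
    return " | ".join(values)


def demo():
    state = {}
    out = asyncio.run(run_pipeline([plan, [web, docs], merge], "rnn", state))
    return out, state
```

Calling demo() runs plan, fans out to web and docs, then merges the two branch results. The real framework adds conditions, traces, and checkpoints on top of this kind of loop.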
The API
The pipeline reads like the work
from orchflow import Flow, StepContext, step, condition

@step(name="research", retry=2)
async def research(input: str, context: StepContext) -> str:
    context.state["topic"] = input
    return f"findings about {input}"

flow = Flow([
    plan,
    [web_research, docs_research],  # run concurrently
    condition(
        when=lambda ctx: ctx.previous == "technical",
        then=technical_writer,
        otherwise=general_writer,
    ),
])
result = await flow.run("transformers vs. RNNs")
Resilience, built in
JsonCheckpointStore saves after each completed top-level item; if a run fails, await flow.resume(store) continues from the last saved item. human_input() drops a review gate into the flow with no separate UI or queue.
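The checkpoint-and-resume idea can be illustrated with a few lines of standard-library Python. This is a minimal sketch under stated assumptions, not how JsonCheckpointStore is implemented: JsonFileCheckpoint, run_from, and the file layout (a next index plus the current value) are hypothetical, and the steps are synchronous to keep the example short.

```python
import json
import os
import tempfile


class JsonFileCheckpoint:
    """Hypothetical store: one JSON file with the next index and current value."""

    def __init__(self, path):
        self.path = path

    def save(self, next_index, value):
        with open(self.path, "w", encoding="utf-8") as f:
            json.dump({"next_index": next_index, "value": value}, f)

    def load(self):
        if not os.path.exists(self.path):
            return {"next_index": 0, "value": None}
        with open(self.path, encoding="utf-8") as f:
            return json.load(f)


def run_from(steps, store, value=None, resume=False):
    """Run steps in order, saving a checkpoint after each completed one."""
    start = 0
    if resume:
        saved = store.load()
        start, value = saved["next_index"], saved["value"]
    for i in range(start, len(steps)):
        value = steps[i](value)
        store.save(i + 1, value)  # written only after the step succeeded
    return value


def demo():
    calls = []
    failing = {"on": True}

    def fetch(v):
        calls.append("fetch")
        return v + 1

    def flaky(v):
        calls.append("flaky")
        if failing["on"]:
            raise RuntimeError("boom")
        return v * 10

    def finish(v):
        calls.append("finish")
        return v + 5

    steps = [fetch, flaky, finish]
    with tempfile.TemporaryDirectory() as tmp:
        store = JsonFileCheckpoint(os.path.join(tmp, "run.json"))
        try:
            run_from(steps, store, value=1)  # fails inside flaky
        except RuntimeError:
            pass
        failing["on"] = False
        result = run_from(steps, store, resume=True)  # skips fetch
    return result, calls
```

Because the checkpoint is written only after a step returns, the resumed run repeats the failed step but never the ones that already completed.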
See it run
A parallel flow's live event stream
flow.events(...) yields lifecycle events as the pipeline executes — plan, a concurrent fan-out, a merge, then synthesis.
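The shape of such a stream can be sketched with a plain async generator. The event names flow_started, step_completed, and flow_completed come from this page; everything else is an assumption made for the sketch, including the events function signature and the dict fields type, step, and output.

```python
import asyncio


async def events(steps, value):
    """Run steps in order and yield a lifecycle event as each one finishes."""
    yield {"type": "flow_started"}
    for fn in steps:
        value = await fn(value)
        yield {"type": "step_completed", "step": fn.__name__, "output": value}
    yield {"type": "flow_completed", "output": value}


async def plan(v):
    return f"plan({v})"


async def synthesize(v):
    return f"synthesis({v})"


async def collect():
    # A consumer would normally act on each event as it arrives;
    # here they are gathered into a list for inspection.
    return [event async for event in events([plan, synthesize], "q")]


seen = asyncio.run(collect())
```

Since the generator yields between steps, a consumer sees each event while the pipeline is still running rather than after it finishes.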
await flow.run("transformers vs. RNNs")
What I built
Lightweight on purpose
Zero runtime dependencies
The core is pure standard-library Python — nothing to pin, audit, or break on upgrade. LiteLLM is pulled in only if you opt into the optional Agent.
Steps are just functions
An @step is a normal async function taking (input, context). No nodes, no graph DSL — you read the pipeline top to bottom like the code it is.
Flat traces, live events
Every attempt emits a StepTrace, and lifecycle events such as flow_started, step_completed, retry_scheduled, and flow_completed stream live via flow.events(...). Observability without a backend.
Checkpoint, resume, and gate
JsonCheckpointStore saves after each top-level item, so flow.resume() picks up a failed run. human_input() adds a review gate with no separate UI or queue.
Get started
Install from PyPI
$ pip install orchflow
$ pip install "orchflow[litellm]"
v0.5.0 · tag-based PyPI releases · core has zero runtime dependencies.