
Agent Orchestration Framework

Orchflow

A dependency-free Python framework for readable multi-agent pipelines: sequential, parallel, conditional, and resumable flows.

0 deps

zero runtime dependencies in core

AsyncIO · LiteLLM · Pydantic
  • Sequential, parallel, conditional & retryable flows
  • Shared StepContext, flat traces, live lifecycle events
  • JSON checkpoint / resume + lightweight human-review gates
  • Zero core dependencies; optional LiteLLM Agent with structured output

The problem

Between fragile function-chaining and heavy graph runtimes

Chaining a few async functions is fine — until the pipeline needs retries, parallel work, branching, shared state, or traces. Then the glue code grows faster than the logic.

Full graph frameworks solve that, but they ask you to rebuild your workflow as nodes and edges in their runtime. Orchflow sits in the middle: you write normal Python functions, and it handles the orchestration mechanics — with nothing to install but itself.

How it works

Compose functions into a Flow

decorate

@step

Write async functions taking (input, context). Add retry= per step. Sync functions run on worker threads.
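As an illustration of the "sync functions run on worker threads" idea, here is a minimal standard-library sketch. It is not Orchflow's implementation: `call_step`, the plain-dict context, and both example steps are hypothetical names used only to show one way such a dispatch could work.

```python
import asyncio
import inspect
import threading


async def call_step(fn, value, context):
    """Await coroutine functions directly; push plain functions to a worker thread."""
    if inspect.iscoroutinefunction(fn):
        return await fn(value, context)
    # asyncio.to_thread (Python 3.9+) keeps blocking work off the event loop.
    return await asyncio.to_thread(fn, value, context)


async def async_step(value, context):
    return value.upper()


def sync_step(value, context):
    # Record which thread ran this step so the dispatch is observable.
    context["thread"] = threading.current_thread().name
    return value + "!"


async def demo():
    context = {}
    out = await call_step(async_step, "hi", context)
    out = await call_step(sync_step, out, context)
    return out, context


result, ctx = asyncio.run(demo())
```

The point of the sketch is that the caller never needs to know which kind of function it was handed; the dispatcher decides whether to await it or offload it.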

compose

Flow([...])

List steps to run in order; nest a list to fan out in parallel; use condition() to branch.
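The nested-list convention can be sketched with `asyncio.gather`. This is a simplified stand-in rather than Orchflow's runner: `run_items` is a hypothetical helper, the steps take a single argument instead of (input, context), and passing the branch results to the next step as a list is an assumption.

```python
import asyncio


async def run_items(items, value):
    """Run items in order; a nested list fans out concurrently on the same input."""
    for item in items:
        if isinstance(item, list):
            # Fan out: every branch gets the same input; results keep list order.
            value = list(await asyncio.gather(*(fn(value) for fn in item)))
        else:
            value = await item(value)
    return value


async def plan(topic):
    return f"plan:{topic}"


async def web(p):
    await asyncio.sleep(0.02)  # slower branch, still listed first in the result
    return f"web({p})"


async def docs(p):
    await asyncio.sleep(0.01)
    return f"docs({p})"


async def merge(parts):
    return " + ".join(parts)


output = asyncio.run(run_items([plan, [web, docs], merge], "rnn"))
```

Because `asyncio.gather` returns results in the order the awaitables were passed, the merge step sees a stable ordering even when the branches finish at different times.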

execute

run / resume

Run with retries and a checkpoint store; resume() picks up where a failed run stopped.

observe

FlowResult

Get output, shared state, success, and a flat list of StepTrace records — plus live events.

The API

The pipeline reads like the work

# a step is a normal async function
from orchflow import Flow, StepContext, step, condition

@step(name="research", retry=2)
async def research(input: str, context: StepContext) -> str:
    context.state["topic"] = input
    return f"findings about {input}"

# sequential, parallel (nested list), conditional
flow = Flow([
    plan,
    [web_research, docs_research],  # run concurrently
    condition(
        when=lambda ctx: ctx.previous == "technical",
        then=technical_writer,
        otherwise=general_writer,
    ),
])
result = await flow.run("transformers vs. RNNs")

resilience, built in

JsonCheckpointStore saves after each completed top-level item; if a run fails, await flow.resume(store) continues from the last checkpoint. human_input() drops a review gate into the flow with no separate UI or queue.
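The checkpoint-and-resume pattern itself needs little more than `json` and a file. The sketch below is a hedged illustration, not the JsonCheckpointStore format or API: `run_with_checkpoint`, the `next_index`/`value` file layout, and the example steps are all assumptions made for the example.

```python
import asyncio
import json
import tempfile
from pathlib import Path


async def run_with_checkpoint(steps, value, path):
    """Run steps in order, persisting progress after each completed step."""
    start = 0
    if path.exists():
        saved = json.loads(path.read_text())
        start, value = saved["next_index"], saved["value"]
    for index in range(start, len(steps)):
        value = await steps[index](value)
        # Save only after the step succeeds, so a failed step reruns on resume.
        path.write_text(json.dumps({"next_index": index + 1, "value": value}))
    return value


calls = []
fail_once = {"armed": True}


async def fetch(x):
    calls.append("fetch")
    return x + 1


async def flaky(x):
    calls.append("flaky")
    if fail_once["armed"]:
        fail_once["armed"] = False
        raise RuntimeError("transient failure")
    return x * 10


async def demo():
    path = Path(tempfile.mkdtemp()) / "checkpoint.json"
    steps = [fetch, flaky]
    try:
        await run_with_checkpoint(steps, 1, path)
    except RuntimeError:
        pass  # first run fails inside `flaky`, after `fetch` was checkpointed
    # Second call finds the checkpoint and skips `fetch`.
    return await run_with_checkpoint(steps, 1, path)


final = asyncio.run(demo())
```

Writing the checkpoint only after a step completes means the resumed run repeats the failed step and nothing before it, which is why `fetch` runs once and `flaky` twice.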

See it run

A parallel flow's live event stream

flow.events(...) yields lifecycle events as the pipeline executes — plan, a concurrent fan-out, a merge, then synthesis.


What I built

Lightweight on purpose

Zero runtime dependencies

The core is pure standard-library Python — nothing to pin, audit, or break on upgrade. LiteLLM is pulled in only if you opt into the optional Agent.

Steps are just functions

An @step is a normal async function taking (input, context). No nodes, no graph DSL — you read the pipeline top to bottom like the code it is.

Flat traces, live events

Every attempt emits a StepTrace, and lifecycle events such as flow_started, step_completed, retry_scheduled, and flow_completed stream live via flow.events(...). Observability without a backend.
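To make "one trace per attempt, plus live events" concrete, here is a small standard-library sketch. It does not reproduce Orchflow's StepTrace fields or its event objects: the `Trace` dataclass, `run_with_retry`, and the use of an `asyncio.Queue` as the event channel are hypothetical, and treating `retries=2` as "two retries after the first attempt" is an assumption.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Trace:  # hypothetical stand-in, not Orchflow's StepTrace
    step: str
    attempt: int
    ok: bool


async def run_with_retry(fn, value, retries, traces, events):
    """Try fn up to retries + 1 times, recording one trace per attempt."""
    for attempt in range(1, retries + 2):
        try:
            result = await fn(value)
        except Exception:
            traces.append(Trace(fn.__name__, attempt, ok=False))
            if attempt > retries:
                raise  # out of retries: surface the last error
            await events.put("retry_scheduled")
        else:
            traces.append(Trace(fn.__name__, attempt, ok=True))
            await events.put("step_completed")
            return result


attempts = {"n": 0}


async def flaky(x):
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ValueError("not yet")
    return x * 2


async def demo():
    traces, events = [], asyncio.Queue()
    await events.put("flow_started")
    result = await run_with_retry(flaky, 21, retries=2, traces=traces, events=events)
    await events.put("flow_completed")
    seen = []
    while not events.empty():
        seen.append(events.get_nowait())
    return result, traces, seen


result, traces, seen = asyncio.run(demo())
```

Keeping traces as a flat list of small records means a failed attempt and its eventual success sit side by side, with no nested structure to walk.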

Checkpoint, resume, and gate

JsonCheckpointStore saves after each top-level item, so flow.resume() picks up a failed run. human_input() adds a review gate with no separate UI or queue.

Get started

Install from PyPI

pip
$ pip install orchflow
$ pip install "orchflow[litellm]"

v0.5.0 · tag-based PyPI releases · core has zero runtime dependencies.
