Skip to main content

Workflows

Multi-step agent processes in the blueprint/ module.


Basic Workflow

from openstackai import Agent
from openstackai.blueprint import Workflow, Step

# Create specialized agents
researcher = Agent(name="Researcher", instructions="Find information on topics.")
writer = Agent(name="Writer", instructions="Write clear, engaging content.")
editor = Agent(name="Editor", instructions="Review and improve writing.")

# Build workflow
workflow = (Workflow("ContentPipeline")
.add_step(Step("research", researcher))
.add_step(Step("write", writer))
.add_step(Step("edit", editor))
.build())

# Run
result = await workflow.run("Create an article about AI trends")
print(result.final_output)

Step Types

TypeDescriptionUse Case
AGENTExecute an agentMain processing
SKILLCall a specific skillTool invocation
FUNCTIONRun Python functionCustom logic
CONDITIONConditional branchingDecision points
PARALLELConcurrent executionSpeed optimization
LOOPRepeated executionIteration

Conditional Steps

from openstackai.blueprint import Workflow, Step, Condition

def needs_review(context):
return context.get("complexity") == "high"

workflow = (Workflow("ReviewPipeline")
.add_step(Step("analyze", analyzer))
.add_step(Step("review", reviewer, condition=Condition(needs_review)))
.add_step(Step("finalize", finalizer))
.build())

Parallel Execution

from openstackai.blueprint import Workflow, ParallelStep

# Run multiple agents concurrently
workflow = (Workflow("ParallelResearch")
.add_step(ParallelStep("research", [
("tech", tech_researcher),
("market", market_researcher),
("competitor", competitor_researcher),
]))
.add_step(Step("synthesize", synthesizer))
.build())

Workflow Context

Data flows through the workflow via context:

from openstackai.blueprint import WorkflowContext

# Initial context
context = WorkflowContext(
input="Research topic",
variables={"audience": "technical"}
)

result = await workflow.run(context)

# Access step outputs
print(result.steps["research"].output)
print(result.steps["write"].output)

Error Handling

workflow = (Workflow("SafePipeline")
.add_step(Step("process", processor,
on_error="skip", # skip, retry, fail
retry_count=3,
retry_delay=1.0
))
.build())

Real-World Example: Content Creation

from openstackai import Agent
from openstackai.blueprint import Workflow, Step, ParallelStep

# Specialized team
topic_researcher = Agent(
name="TopicResearcher",
instructions="Research topics deeply and provide key facts."
)

seo_specialist = Agent(
name="SEOSpecialist",
instructions="Identify keywords and SEO opportunities."
)

content_writer = Agent(
name="ContentWriter",
instructions="Write engaging blog posts with SEO optimization."
)

editor = Agent(
name="Editor",
instructions="Polish content for grammar, clarity, and flow."
)

# Content factory workflow
content_factory = (Workflow("ContentFactory")
# Parallel research
.add_step(ParallelStep("research", [
("topic", topic_researcher),
("seo", seo_specialist),
]))
# Sequential writing and editing
.add_step(Step("write", content_writer))
.add_step(Step("edit", editor))
.build())

# Generate content
result = await content_factory.run("AI in Healthcare: 2024 Trends")
print(result.final_output)

Workflow Persistence

Save and resume workflows:

# Save checkpoint
checkpoint = workflow.save_checkpoint()

# Resume later
workflow.restore_checkpoint(checkpoint)
result = await workflow.resume()

Combining with Patterns

Workflows can use orchestration patterns:

from openstackai.blueprint import ChainPattern, RouterPattern

# Chain pattern within workflow
chain = ChainPattern()
chain.add("analyze", analyzer)
chain.add("recommend", recommender)

workflow = (Workflow("Analysis")
.add_step(Step("chain", chain))
.build())

Next Steps

  • [[Orchestration Patterns]] - Chain, Router, MapReduce, Supervisor
  • [[Handoffs]] - Agent-to-agent transfers
  • [[Agent]] - Create agents