🔗 Blueprint Module
The `blueprint/` module provides multi-agent orchestration through workflows, orchestration patterns, and pipelines.
Module Overview
File Structure
src/openstackai/blueprint/
├── __init__.py
├── workflow.py # Workflow and Step classes
├── patterns.py # Orchestration patterns
├── pipeline.py # Pipeline processing
├── orchestrator.py # High-level orchestration
└── blueprint.py # Blueprint definitions
Workflow
Multi-step agent processes with context sharing.
Basic Workflow
from openstackai import Agent
from openstackai.blueprint import Workflow, Step
# Create agents
researcher = Agent(name="Researcher", instructions="Find information")
writer = Agent(name="Writer", instructions="Write content")
editor = Agent(name="Editor", instructions="Polish content")
# Build workflow
workflow = (Workflow("ContentPipeline")
.add_step(Step("research", researcher))
.add_step(Step("write", writer))
.add_step(Step("edit", editor))
.build())
# Execute
result = await workflow.run("Write an article about AI")
Workflow Architecture
Step
Individual workflow step definition.
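Step Types
The examples on this page use the following step types: `StepType.AGENT`, `StepType.FUNCTION`, `StepType.CONDITION`, and `StepType.PARALLEL`.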
Step Configuration
from openstackai.blueprint import Step, StepType
# Agent step: the `researcher` agent is the executor.
agent_step = Step(name="research", executor=researcher, step_type=StepType.AGENT)
# Function step
def custom_process(context):
    """Return the context's ``"data"`` value converted to upper case.

    Args:
        context: Object exposing ``get(key)``. The value it returns for
            ``"data"`` must provide ``upper()`` (for example a ``str``).

    Returns:
        The result of calling ``upper()`` on the ``"data"`` value.

    Raises:
        AttributeError: If the ``"data"`` value has no ``upper()`` method,
            e.g. if ``get`` returns ``None``.
    """
    return context.get("data").upper()
# Wrap the plain function above in a step of type FUNCTION.
function_step = Step(
    name="process",
    executor=custom_process,
    step_type=StepType.FUNCTION,
)
# Conditional step
def _is_approved(ctx):
    """Return the context's "approved" value."""
    return ctx.get("approved")


# NOTE(review): presumably a truthy result continues at `on_true` and a
# falsy one at `on_false` — confirm against the Step implementation.
condition_step = Step(
    name="check",
    executor=_is_approved,
    step_type=StepType.CONDITION,
    on_true="approve_step",
    on_false="reject_step",
)
WorkflowContext
Context passed through workflow steps.
from openstackai.blueprint import WorkflowContext
# Create a context (no constructor arguments) to pass through workflow steps.
context = WorkflowContext()
# Set/get values: store a value under "key", then look the same key up.
# (presumably `value` is "value" afterwards — confirm against WorkflowContext)
context.set("key", "value")
value = context.get("key")
# Update multiple: pass several key/value pairs in one call.
context.update({"a": 1, "b": 2})
# Add to history: record an entry for "step1".
# NOTE: `result` is not defined in this snippet; supply a step's output.
context.add_to_history("step1", result)
# Clone context
# NOTE(review): whether clone() copies deeply or shallowly is not shown here.
new_context = context.clone()
Orchestration Patterns
ChainPattern
Sequential execution through multiple agents.
from openstackai.blueprint import ChainPattern
chain = ChainPattern([
researcher,
writer,
editor
])
result = await chain.run("Write about AI trends")
RouterPattern
Intelligent routing to specialized agents.
from openstackai.blueprint import RouterPattern
router = RouterPattern()
# Add routes with keywords
router.add_route("code", coder, keywords=["python", "code", "function"])
router.add_route("math", calculator, keywords=["calculate", "math", "equation"])
router.add_route("write", writer, keywords=["write", "content", "article"])
# Or with custom classifier
router.add_route("code", coder, classifier=lambda q: "code" in q.lower())
# Execute - automatically routes
result = await router.run("Write a Python function") # → coder
result = await router.run("Calculate 2+2") # → calculator
MapReducePattern
Parallel processing with result aggregation.
from openstackai.blueprint import MapReducePattern
# Create mappers (parallel workers)
mappers = [
Agent(name="Researcher1", instructions="Research topic A"),
Agent(name="Researcher2", instructions="Research topic B"),
Agent(name="Researcher3", instructions="Research topic C")
]
# Create reducer (synthesizer)
reducer = Agent(name="Synthesizer", instructions="Combine research")
pattern = MapReducePattern(
mappers=mappers,
reducer=reducer
)
result = await pattern.run("Research AI trends")
SupervisorPattern
Manager-worker hierarchy.
from openstackai.blueprint import SupervisorPattern
pattern = SupervisorPattern(
manager=Agent(name="Manager", instructions="Coordinate and delegate"),
workers=[
Agent(name="Coder", instructions="Write code"),
Agent(name="Tester", instructions="Write tests"),
Agent(name="Reviewer", instructions="Review code")
]
)
result = await pattern.run("Build a REST API")
Pipeline
Processing pipeline for data transformation.
from openstackai.blueprint import Pipeline, PipelineStage
pipeline = Pipeline("DataPipeline")
# Add stages
pipeline.add_stage(PipelineStage(
name="extract",
processor=extract_data
))
pipeline.add_stage(PipelineStage(
name="transform",
processor=transform_data
))
pipeline.add_stage(PipelineStage(
name="load",
processor=load_data
))
# Execute
result = await pipeline.run(input_data)
Complex Workflow Example
from openstackai import Agent
from openstackai.blueprint import Workflow, Step, StepType
# Define agents
planner = Agent(name="Planner", instructions="Create project plans")
researchers = [
    Agent(name="TechResearcher", instructions="Research technology"),
    Agent(name="MarketResearcher", instructions="Research market"),
]
writer = Agent(name="Writer", instructions="Write reports")
reviewer = Agent(name="Reviewer", instructions="Review and improve")

# Build complex workflow. Rebinding `builder` to each add_step() return
# value keeps this equivalent to one fluent chain.
builder = Workflow("ProjectResearch")

# Phase 1: Planning
builder = builder.add_step(Step("plan", planner))

# Phase 2: Parallel research (the executor is the list of researchers)
parallel_research = Step("research", researchers, step_type=StepType.PARALLEL)
builder = builder.add_step(parallel_research)

# Phase 3: Writing
builder = builder.add_step(Step("write", writer))

# Phase 4: Review with condition
review_gate = Step(
    "review",
    reviewer,
    step_type=StepType.CONDITION,
    on_true="finalize",
    on_false="write",  # Loop back if not approved
)
builder = builder.add_step(review_gate)

# Phase 5: Finalize
builder = builder.add_step(Step("finalize", lambda ctx: ctx.get("report")))

workflow = builder.build()
➡️ [[Skills-Module]] | [[Runner-Module]] | [[Home]]