Skip to main content

🔗 Blueprint Module

The blueprint/ module provides multi-agent orchestration with workflows, patterns, and pipelines.


Module Overview


File Structure

src/openstackai/blueprint/
├── __init__.py
├── workflow.py # Workflow and Step classes
├── patterns.py # Orchestration patterns
├── pipeline.py # Pipeline processing
├── orchestrator.py # High-level orchestration
└── blueprint.py # Blueprint definitions

Workflow

Define multi-step agent processes in which each step shares a common context with the steps that follow it.

Basic Workflow

from openstackai import Agent
from openstackai.blueprint import Workflow, Step

# Create agents
researcher = Agent(name="Researcher", instructions="Find information")
writer = Agent(name="Writer", instructions="Write content")
editor = Agent(name="Editor", instructions="Polish content")

# Build workflow
workflow = (Workflow("ContentPipeline")
.add_step(Step("research", researcher))
.add_step(Step("write", writer))
.add_step(Step("edit", editor))
.build())

# Execute
result = await workflow.run("Write an article about AI")

Workflow Architecture


Step

Individual workflow step definition.

Step Types

Step Configuration

from openstackai.blueprint import Step, StepType

# Agent step
agent_step = Step(
name="research",
executor=researcher,
step_type=StepType.AGENT
)

# Function step
def custom_process(context):
return context.get("data").upper()

function_step = Step(
name="process",
executor=custom_process,
step_type=StepType.FUNCTION
)

# Conditional step
condition_step = Step(
name="check",
executor=lambda ctx: ctx.get("approved"),
step_type=StepType.CONDITION,
on_true="approve_step",
on_false="reject_step"
)

WorkflowContext

The shared state object passed through workflow steps. It supports setting and getting individual values, bulk updates, per-step history tracking, and cloning.

from openstackai.blueprint import WorkflowContext

context = WorkflowContext()

# Set/get values
context.set("key", "value")
value = context.get("key")

# Update multiple
context.update({"a": 1, "b": 2})

# Add to history
context.add_to_history("step1", result)

# Clone context
new_context = context.clone()

Orchestration Patterns

ChainPattern

Executes agents sequentially, passing each agent's output as input to the next.

from openstackai.blueprint import ChainPattern

chain = ChainPattern([
researcher,
writer,
editor
])

result = await chain.run("Write about AI trends")

RouterPattern

Routes each query to the most appropriate specialized agent, based on keyword matching or a custom classifier function.

from openstackai.blueprint import RouterPattern

router = RouterPattern()

# Add routes with keywords
router.add_route("code", coder, keywords=["python", "code", "function"])
router.add_route("math", calculator, keywords=["calculate", "math", "equation"])
router.add_route("write", writer, keywords=["write", "content", "article"])

# Or with custom classifier
router.add_route("code", coder, classifier=lambda q: "code" in q.lower())

# Execute - automatically routes
result = await router.run("Write a Python function") # → coder
result = await router.run("Calculate 2+2") # → calculator

MapReducePattern

Runs multiple mapper agents in parallel, then aggregates their outputs with a single reducer agent.

from openstackai.blueprint import MapReducePattern

# Create mappers (parallel workers)
mappers = [
Agent(name="Researcher1", instructions="Research topic A"),
Agent(name="Researcher2", instructions="Research topic B"),
Agent(name="Researcher3", instructions="Research topic C")
]

# Create reducer (synthesizer)
reducer = Agent(name="Synthesizer", instructions="Combine research")

pattern = MapReducePattern(
mappers=mappers,
reducer=reducer
)

result = await pattern.run("Research AI trends")

SupervisorPattern

A manager-worker hierarchy: a manager agent coordinates the task and delegates subtasks to a pool of worker agents.

from openstackai.blueprint import SupervisorPattern

pattern = SupervisorPattern(
manager=Agent(name="Manager", instructions="Coordinate and delegate"),
workers=[
Agent(name="Coder", instructions="Write code"),
Agent(name="Tester", instructions="Write tests"),
Agent(name="Reviewer", instructions="Review code")
]
)

result = await pattern.run("Build a REST API")

Pipeline

A staged processing pipeline for data transformation, where each stage's processor receives the output of the previous stage.

from openstackai.blueprint import Pipeline, PipelineStage

pipeline = Pipeline("DataPipeline")

# Add stages
pipeline.add_stage(PipelineStage(
name="extract",
processor=extract_data
))
pipeline.add_stage(PipelineStage(
name="transform",
processor=transform_data
))
pipeline.add_stage(PipelineStage(
name="load",
processor=load_data
))

# Execute
result = await pipeline.run(input_data)

Complex Workflow Example

from openstackai import Agent
from openstackai.blueprint import Workflow, Step, StepType

# Define agents
planner = Agent(name="Planner", instructions="Create project plans")
researchers = [
Agent(name="TechResearcher", instructions="Research technology"),
Agent(name="MarketResearcher", instructions="Research market")
]
writer = Agent(name="Writer", instructions="Write reports")
reviewer = Agent(name="Reviewer", instructions="Review and improve")

# Build complex workflow
workflow = (Workflow("ProjectResearch")
# Phase 1: Planning
.add_step(Step("plan", planner))

# Phase 2: Parallel research
.add_step(Step(
"research",
researchers,
step_type=StepType.PARALLEL
))

# Phase 3: Writing
.add_step(Step("write", writer))

# Phase 4: Review with condition
.add_step(Step(
"review",
reviewer,
step_type=StepType.CONDITION,
on_true="finalize",
on_false="write" # Loop back if not approved
))

# Phase 5: Finalize
.add_step(Step("finalize", lambda ctx: ctx.get("report")))

.build())

➡️ [[Skills-Module]] | [[Runner-Module]] | [[Home]]