Skip to main content

⚡ Runner Module

The runner/ module provides execution control for agents with structured run management.


Module Overview


File Structure

src/openstackai/runner/
├── __init__.py
├── executor.py # Main Runner class
└── streaming.py # Streaming support

Runner

The primary execution engine for agents.

Basic Usage

from openstackai import Agent, Runner

agent = Agent(
    name="Assistant",
    instructions="You are helpful."
)

# Synchronous execution
result = Runner.run_sync(agent, "Hello!")
print(result.final_output)

# Asynchronous execution
result = await Runner.run_async(agent, "Hello!")

Execution Flow


RunConfig

Configuration options for execution.

from openstackai.runner import RunConfig

config = RunConfig(
    max_turns=10,             # Maximum iterations
    max_time=300.0,           # Timeout in seconds
    timeout_per_turn=60.0,    # Per-turn timeout
    stop_on_tool_error=False,
    verbose=True,             # Debug output
    trace_enabled=True,       # Enable tracing
    run_id="custom-id",       # Custom run ID
    metadata={                # Custom metadata
        "user_id": "123",
        "session": "abc"
    }
)

result = Runner.run_sync(agent, "Hello", config=config)

Configuration Options

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `max_turns` | `int` | `10` | Max reasoning iterations |
| `max_time` | `float` | `300.0` | Total timeout (seconds) |
| `timeout_per_turn` | `float` | `60.0` | Per-turn timeout |
| `stop_on_tool_error` | `bool` | `False` | Stop on tool failure |
| `verbose` | `bool` | `False` | Debug output |
| `trace_enabled` | `bool` | `True` | Enable tracing |
| `run_id` | `str` | auto | Custom run identifier |
| `metadata` | `dict` | — | Custom metadata |

RunContext

Runtime context passed to agents during execution.

from openstackai.runner import RunContext

context = RunContext(
    run_id="run-123",
    turn_count=0,
    variables={"user": "Alice"},
    history=[]
)

# Access during execution
elapsed = context.elapsed_time()
context.set_variable("key", "value")
value = context.get_variable("key")

RunResult

Result of an agent execution.

result = Runner.run_sync(agent, "Hello")

# Access result data
print(result.final_output) # Final response text
print(result.run_id) # Run identifier
print(result.status) # RunStatus enum
print(result.turn_count) # Number of turns
print(result.elapsed_time) # Execution time
print(result.tool_calls) # Tool invocations
print(result.metadata) # Custom metadata

RunResult Structure


RunStatus

Execution status enumeration.

from openstackai.runner import RunStatus

# Possible statuses
RunStatus.QUEUED # Not yet started
RunStatus.RUNNING # In progress
RunStatus.COMPLETED # Successfully finished
RunStatus.FAILED # Error occurred
RunStatus.CANCELLED # User cancelled

Streaming

Stream responses in real-time.

from openstackai import Agent, Runner

agent = Agent(name="Assistant", instructions="Be helpful")

# Stream response
async for chunk in Runner.run_stream(agent, "Tell me a story"):
    print(chunk, end="", flush=True)

StreamingRunner

from openstackai.runner import StreamingRunner

runner = StreamingRunner(agent)

async for event in runner.run("Write a poem"):
    if event.type == "text":
        print(event.content, end="")
    elif event.type == "tool_call":
        print(f"\n[Calling: {event.tool_name}]")
    elif event.type == "tool_result":
        print(f"[Result: {event.result}]")

Advanced Patterns

Multi-Turn Conversation

from openstackai import Agent, Runner
from openstackai.runner import RunConfig

agent = Agent(name="Assistant", instructions="Be helpful")

# Multi-turn with context
context = []
while True:
    user_input = input("You: ")
    context.append({"role": "user", "content": user_input})

    result = Runner.run_sync(
        agent,
        user_input,
        history=context
    )

    print(f"Agent: {result.final_output}")
    context.append({"role": "assistant", "content": result.final_output})

Parallel Execution

import asyncio
from openstackai import Agent, Runner

agents = [
    Agent(name="Agent1", instructions="..."),
    Agent(name="Agent2", instructions="..."),
    Agent(name="Agent3", instructions="...")
]

async def run_parallel():
    tasks = [
        Runner.run_async(agent, "Process this")
        for agent in agents
    ]
    results = await asyncio.gather(*tasks)
    return results

results = asyncio.run(run_parallel())

Error Handling

from openstackai import Agent, Runner
from openstackai.runner import RunStatus
from openstackai.errors import AgentError, TimeoutError

try:
    result = Runner.run_sync(agent, "Hello")

    if result.status == RunStatus.FAILED:
        print(f"Failed: {result.error}")
    elif result.status == RunStatus.COMPLETED:
        print(result.final_output)

except TimeoutError:
    print("Execution timed out")
except AgentError as e:
    print(f"Agent error: {e}")

➡️ [[Blueprint-Module]] | [[Core-Module]] | [[Home]]