Graph
Workflow orchestration with dependency management and parallel execution
Overview
The Graph system enables you to create complex workflows by connecting tasks and agents with dependencies, conditions, and parallel execution capabilities. It provides a visual and programmatic way to orchestrate multi-step processes, handle branching logic, and coordinate multiple agents working together.
Creating a Graph
Graphs are composed of nodes (tasks or agents) and edges (connections between them):
import { Agent, Graph } from '@astreus-ai/astreus';

// Create the agent that the graph uses by default
const agent = await Agent.create({
  name: 'ContentAgent',
  model: 'gpt-4o'
});

// Create a workflow graph that references the agent
const graph = new Graph({
  name: 'content-creation-pipeline',
  description: 'Research and write technical content',
  defaultAgentId: agent.id // Use the agent ID
}, agent);

// Add task nodes
const researchNodeId = graph.addTaskNode({
  prompt: 'Research the latest TypeScript features and summarize key findings',
  model: 'gpt-4o',
  priority: 10,
  metadata: { type: 'research' }
});

const writeNodeId = graph.addTaskNode({
  prompt: 'Write a comprehensive blog post based on the research findings',
  model: 'gpt-4o',
  dependencies: [researchNodeId], // Depends on research completion
  priority: 5,
  metadata: { type: 'writing' }
});

// Execute the graph
const results = await graph.run({ stream: true });

console.log('Success:', results.success);
console.log('Completed nodes:', results.completedNodes);
console.log('Failed nodes:', results.failedNodes);
console.log('Duration:', results.duration, 'ms');
console.log('Results:', results.results);
Graph Execution Flow
Node Resolution
The graph analyzes all nodes and their dependencies to determine the execution order.
Parallel Execution
Independent nodes run simultaneously for optimal performance.
Dependency Waiting
Dependent nodes wait for their prerequisites to complete before starting.
Result Collection
All node outputs are collected and made available in the final result.
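The four steps above can be sketched independently of the library. The following is a minimal illustration, not Astreus's actual scheduler: the `SketchNode` type and `planWaves` function are hypothetical names, and grouping nodes into fixed "waves" is a simplification (a real scheduler may start a node as soon as its own prerequisites finish).

```typescript
// Hypothetical node shape for illustration; not the library's internal type.
interface SketchNode {
  id: string;
  dependencies: string[];
  priority: number;
}

// Group nodes into waves: every node in a wave has all of its dependencies
// satisfied by earlier waves, so the nodes of one wave can run in parallel.
function planWaves(nodes: SketchNode[], maxConcurrency: number): string[][] {
  if (maxConcurrency < 1) {
    throw new Error('maxConcurrency must be at least 1');
  }
  const done = new Set<string>();
  let pending = nodes.slice();
  const waves: string[][] = [];

  while (pending.length > 0) {
    // Node resolution: find nodes whose prerequisites have all completed,
    // ordered so that higher priority values come first.
    const ready = pending
      .filter((n) => n.dependencies.every((d) => done.has(d)))
      .sort((a, b) => b.priority - a.priority);

    if (ready.length === 0) {
      throw new Error('Cycle or missing dependency detected');
    }

    // Parallel execution: take at most maxConcurrency nodes per wave.
    const wave = ready.slice(0, maxConcurrency).map((n) => n.id);
    waves.push(wave);

    // Dependency waiting: remaining nodes become ready only in later waves.
    wave.forEach((id) => done.add(id));
    pending = pending.filter((n) => !done.has(n.id));
  }
  return waves;
}

const pipeline: SketchNode[] = [
  { id: 'analysis', dependencies: [], priority: 10 },
  { id: 'optimization', dependencies: ['analysis'], priority: 8 },
  { id: 'testing', dependencies: ['optimization'], priority: 6 },
  { id: 'documentation', dependencies: ['analysis'], priority: 5 },
];

const waves = planWaves(pipeline, 3);
console.log(waves);
// [ [ 'analysis' ], [ 'optimization', 'documentation' ], [ 'testing' ] ]
```

With `maxConcurrency` set to 1, the same input degrades to a fully sequential order in which priority decides between nodes that are ready at the same time.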
Advanced Example
Here's a complex workflow with dependencies, parallel execution, and error handling:
import { Agent, Graph } from '@astreus-ai/astreus';

// Create the default agent for the workflow graph
const agent = await Agent.create({
  name: 'OptimizationAgent',
  model: 'gpt-4o'
});

const graph = new Graph({
  name: 'code-optimization-pipeline',
  description: 'Analyze and optimize codebase',
  defaultAgentId: agent.id,
  maxConcurrency: 3, // Allow 3 parallel nodes
  timeout: 300000, // 5 minute timeout
  retryAttempts: 2 // Retry failed nodes twice
}, agent);

// Add task nodes with proper configuration
const analysisNodeId = graph.addTaskNode({
  prompt: 'Analyze the codebase for performance issues and categorize them by severity',
  model: 'gpt-4o',
  priority: 10, // High priority
  metadata: { step: 'analysis', category: 'review' }
});

const optimizationNodeId = graph.addTaskNode({
  prompt: 'Based on the analysis, implement performance optimizations',
  model: 'gpt-4o',
  dependencies: [analysisNodeId], // Depends on analysis
  priority: 8,
  metadata: { step: 'optimization', category: 'implementation' }
});

const testNodeId = graph.addTaskNode({
  prompt: 'Run performance tests and validate the optimizations',
  model: 'gpt-4o',
  dependencies: [optimizationNodeId], // Depends on optimization
  priority: 6,
  stream: true, // Enable streaming for real-time feedback
  metadata: { step: 'testing', category: 'validation' }
});

const documentationNodeId = graph.addTaskNode({
  prompt: 'Document all changes and performance improvements',
  model: 'gpt-4o',
  dependencies: [analysisNodeId], // Can run parallel to optimization
  priority: 5, // Lower priority
  metadata: { step: 'documentation', category: 'docs' }
});

// Add edges (optional, as dependencies already create edges)
graph.addEdge(analysisNodeId, optimizationNodeId);
graph.addEdge(analysisNodeId, documentationNodeId);
graph.addEdge(optimizationNodeId, testNodeId);

// Execute the graph
const results = await graph.run({ stream: true });

console.log('Pipeline results:', results);
console.log('Completed nodes:', results.completedNodes);
console.log('Failed nodes:', results.failedNodes);
console.log('Duration:', results.duration, 'ms');

// Access individual node results
Object.entries(results.results).forEach(([nodeId, result]) => {
  console.log(`Node ${nodeId}:`, result);
});

// Check for errors
if (results.errors && Object.keys(results.errors).length > 0) {
  console.log('Errors:', results.errors);
}
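The result handling at the end of the example can be folded into a small reporting helper. The sketch below assumes only what the example itself shows, namely that `results.results` and `results.errors` are plain objects, and additionally assumes that both are keyed by node ID. `NodeOutcome` and `summarizeOutcomes` are hypothetical names and not part of the library.

```typescript
// Hypothetical summary row; not a type exported by the library.
interface NodeOutcome {
  nodeId: string;
  status: 'completed' | 'failed';
  detail: unknown;
}

// Merge the per-node results and errors into one list, failed nodes first.
// Assumption: both objects are keyed by node ID.
function summarizeOutcomes(
  results: Record<string, unknown>,
  errors: Record<string, unknown> = {},
): NodeOutcome[] {
  const failed = Object.keys(errors).map((nodeId): NodeOutcome => ({
    nodeId,
    status: 'failed',
    detail: errors[nodeId],
  }));
  const completed = Object.keys(results)
    .filter((nodeId) => !(nodeId in errors)) // an error entry wins over a result
    .map((nodeId): NodeOutcome => ({
      nodeId,
      status: 'completed',
      detail: results[nodeId],
    }));
  return failed.concat(completed);
}

// Stand-in data shaped like the objects logged in the example above.
const outcomes = summarizeOutcomes(
  { node_a: 'Analysis text', node_b: 'Partial output' },
  { node_b: 'Timed out' },
);
outcomes.forEach((o) => console.log(`${o.nodeId} [${o.status}]`, o.detail));
```

Listing failures first keeps the most actionable information at the top of a log when a pipeline has many nodes.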
Graph Configuration
Graphs support various configuration options:
interface GraphConfig {
  id?: string; // Optional graph ID
  name: string; // Graph name (required)
  description?: string; // Graph description
  defaultAgentId?: number; // Default agent for task nodes
  maxConcurrency?: number; // Max parallel execution (default: 1)
  timeout?: number; // Execution timeout in ms
  retryAttempts?: number; // Retry attempts for failed nodes
  metadata?: MetadataObject; // Custom metadata

  // Sub-agent integration options
  subAgentAware?: boolean; // Enable sub-agent awareness and optimization
  optimizeSubAgentUsage?: boolean; // Optimize sub-agent delegation patterns
  subAgentCoordination?: 'parallel' | 'sequential' | 'adaptive'; // Default sub-agent coordination
}

// Example with full configuration including sub-agent support
// (`agent` is an Agent instance created as in the earlier examples)
const graph = new Graph({
  name: 'advanced-pipeline',
  description: 'Complex workflow with error handling and sub-agent coordination',
  defaultAgentId: agent.id,
  maxConcurrency: 5,
  timeout: 600000, // 10 minutes
  retryAttempts: 3,
  subAgentAware: true,
  optimizeSubAgentUsage: true,
  subAgentCoordination: 'adaptive',
  metadata: { project: 'automation', version: '1.0' }
}, agent);
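To make the `timeout` and `retryAttempts` options more concrete, here is a library-independent sketch of retrying an asynchronous step under a time limit. It is one plausible reading only: it assumes that `retryAttempts` counts retries after the first try and applies the time limit to each attempt, whereas the library may apply `timeout` to the whole graph run. `withTimeout` and `runWithRetry` are hypothetical helper names.

```typescript
// Reject if the promise does not settle within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
    promise.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}

// Try the operation once, then retry up to `retryAttempts` more times.
// Assumption: the timeout applies to each attempt separately.
async function runWithRetry<T>(
  operation: () => Promise<T>,
  retryAttempts: number,
  timeoutMs: number,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retryAttempts; attempt++) {
    try {
      return await withTimeout(operation(), timeoutMs);
    } catch (error) {
      lastError = error; // remember the failure and try again
    }
  }
  throw lastError;
}

// A stand-in node that fails twice before succeeding.
let attempts = 0;
const flakyNode = async (): Promise<string> => {
  attempts += 1;
  if (attempts < 3) {
    throw new Error(`attempt ${attempts} failed`);
  }
  return 'done';
};

runWithRetry(flakyNode, 2, 1000).then((value) => {
  console.log(value, 'after', attempts, 'attempts'); // done after 3 attempts
});
```

Under this reading, `retryAttempts: 2` allows three attempts in total, so a node that fails on every attempt surfaces the error from its third attempt.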
Node Types and Options
Task Nodes
interface AddTaskNodeOptions {
  name?: string; // Node name for easy referencing
  prompt: string; // Task prompt (required)
  model?: string; // Override model for this task
  agentId?: number; // Override default agent
  stream?: boolean; // Enable streaming for this task
  schedule?: string; // Simple schedule string (e.g., 'daily@09:00')
  dependencies?: string[]; // Node IDs this task depends on
  dependsOn?: string[]; // Node names this task depends on (easier than IDs)
  priority?: number; // Execution priority (higher = earlier)
  metadata?: MetadataObject; // Custom metadata

  // Sub-agent delegation options
  useSubAgents?: boolean; // Force enable/disable sub-agent usage for this task
  subAgentDelegation?: 'auto' | 'manual' | 'sequential'; // Sub-agent delegation strategy
  subAgentCoordination?: 'parallel' | 'sequential'; // Sub-agent coordination pattern
}
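The difference between `dependencies` (node IDs) and `dependsOn` (node names) can be illustrated with a short sketch of how name-based references might be turned into IDs. How the library performs this lookup internally is not documented here, so `resolveDependencies`, the name registry, and the `node_1`-style IDs are all hypothetical.

```typescript
// Combine ID-based and name-based dependencies into one de-duplicated ID list.
// `nameToId` is a hypothetical registry filled in as named nodes are added.
function resolveDependencies(
  nameToId: Map<string, string>,
  options: { dependencies?: string[]; dependsOn?: string[] },
): string[] {
  const ids = new Set<string>(options.dependencies ?? []);
  for (const name of options.dependsOn ?? []) {
    const id = nameToId.get(name);
    if (id === undefined) {
      // Failing early makes a misspelled node name easy to spot.
      throw new Error(`Unknown node name in dependsOn: ${name}`);
    }
    ids.add(id);
  }
  return Array.from(ids);
}

const nameToId = new Map<string, string>([
  ['research', 'node_1'],
  ['outline', 'node_2'],
]);

const resolved = resolveDependencies(nameToId, {
  dependencies: ['node_1'],
  dependsOn: ['research', 'outline'],
});
console.log(resolved); // [ 'node_1', 'node_2' ]
```

Referring to a node by both its ID and its name yields a single dependency, which is why the sketch collects IDs in a `Set` before returning them.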
Agent Nodes
interface AddAgentNodeOptions {
  agentId: number; // Agent ID (required)
  dependencies?: string[]; // Node IDs this agent depends on
  priority?: number; // Execution priority
  metadata?: MetadataObject; // Custom metadata
}
Sub-Agent Configuration Options
Graphs with sub-agent support let you control delegation and coordination at two levels: for the graph as a whole and for individual task nodes.
Graph-Level Sub-Agent Configuration
- subAgentAware: Enables automatic detection and optimization of sub-agent opportunities across the graph
- optimizeSubAgentUsage: Enables real-time performance monitoring and automatic strategy adjustment for better efficiency
- subAgentCoordination: Sets the default coordination pattern:
  - 'parallel': Sub-agents work simultaneously across different nodes
  - 'sequential': Sub-agents work in dependency order, passing context between executions
  - 'adaptive': Dynamically chooses the best coordination pattern based on task complexity and dependencies
Node-Level Sub-Agent Configuration
Each task node can override graph-level settings with specific sub-agent behavior:
- useSubAgents: Force enable or disable sub-agent delegation for specific nodes
- subAgentDelegation: Control how tasks are distributed to sub-agents at the node level
- subAgentCoordination: Override the graph's default coordination pattern for specific nodes
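The override relationship between the two levels can be sketched as a simple fallback chain. This is an illustration rather than the library's implementation: `resolveSubAgentSettings` and the two settings interfaces are hypothetical, and the final fallbacks (`false` and `'sequential'`) are assumptions, since the actual defaults are not stated here.

```typescript
type GraphCoordination = 'parallel' | 'sequential' | 'adaptive';
type NodeCoordination = 'parallel' | 'sequential';

// Trimmed-down, hypothetical views of the graph-level and node-level options.
interface GraphSubAgentSettings {
  subAgentAware?: boolean;
  subAgentCoordination?: GraphCoordination;
}

interface NodeSubAgentSettings {
  useSubAgents?: boolean;
  subAgentCoordination?: NodeCoordination;
}

// A node-level value wins; otherwise the graph-level value applies;
// otherwise an assumed fallback is used.
function resolveSubAgentSettings(
  graph: GraphSubAgentSettings,
  node: NodeSubAgentSettings,
): { enabled: boolean; coordination: GraphCoordination } {
  return {
    enabled: node.useSubAgents ?? graph.subAgentAware ?? false, // assumed fallback
    coordination:
      node.subAgentCoordination ?? graph.subAgentCoordination ?? 'sequential', // assumed fallback
  };
}

const graphSettings: GraphSubAgentSettings = {
  subAgentAware: true,
  subAgentCoordination: 'adaptive',
};

// Inherits everything from the graph.
console.log(resolveSubAgentSettings(graphSettings, {}));
// { enabled: true, coordination: 'adaptive' }

// Opts out of sub-agents for this node only.
console.log(resolveSubAgentSettings(graphSettings, { useSubAgents: false }));
// { enabled: false, coordination: 'adaptive' }

// Keeps sub-agents but forces parallel coordination.
console.log(resolveSubAgentSettings(graphSettings, { subAgentCoordination: 'parallel' }));
// { enabled: true, coordination: 'parallel' }
```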
Enhanced Graph Workflow with Sub-Agents
import { Graph, Agent } from '@astreus-ai/astreus';

// Create specialized sub-agents
const researcher = await Agent.create({
  name: 'DataResearcher',
  systemPrompt: 'You specialize in gathering and analyzing data from multiple sources.'
});

const analyst = await Agent.create({
  name: 'TechnicalAnalyst',
  systemPrompt: 'You provide technical insights and recommendations.'
});

const writer = await Agent.create({
  name: 'TechnicalWriter',
  systemPrompt: 'You create clear, comprehensive technical documentation.'
});

// Main coordinator with sub-agents
const coordinator = await Agent.create({
  name: 'ProjectCoordinator',
  systemPrompt: 'You orchestrate complex projects using specialized team members.',
  subAgents: [researcher, analyst, writer]
});

// Create sub-agent optimized graph
const projectGraph = new Graph({
  name: 'Technical Documentation Pipeline',
  description: 'Automated technical documentation creation with specialized agents',
  defaultAgentId: coordinator.id,
  maxConcurrency: 3,
  subAgentAware: true,
  optimizeSubAgentUsage: true,
  subAgentCoordination: 'adaptive'
}, coordinator);

// Research phase with automatic sub-agent delegation
const researchNode = projectGraph.addTaskNode({
  name: 'Market Research',
  prompt: 'Research current trends in cloud computing and serverless architecture',
  useSubAgents: true,
  subAgentDelegation: 'auto',
  priority: 10,
  metadata: { phase: 'research', category: 'data-gathering' }
});

// Analysis phase with sequential sub-agent coordination
const analysisNode = projectGraph.addTaskNode({
  name: 'Technical Analysis',
  prompt: 'Analyze research findings and identify key technical patterns',
  dependencies: [researchNode],
  useSubAgents: true,
  subAgentDelegation: 'auto',
  subAgentCoordination: 'sequential',
  priority: 8,
  metadata: { phase: 'analysis', category: 'insights' }
});

// Documentation phase with parallel sub-agent work
const docNode = projectGraph.addTaskNode({
  name: 'Documentation Creation',
  prompt: 'Create comprehensive technical documentation and executive summary',
  dependencies: [analysisNode],
  useSubAgents: true,
  subAgentDelegation: 'manual',
  subAgentCoordination: 'parallel',
  priority: 6,
  metadata: { phase: 'documentation', category: 'deliverables' }
});

// Execute with performance monitoring
const result = await projectGraph.run({ stream: true });

// Access sub-agent performance insights
if (projectGraph.generateSubAgentPerformanceReport) {
  const performanceReport = projectGraph.generateSubAgentPerformanceReport();
  console.log('Sub-agent performance:', performanceReport);
}

console.log('Pipeline completed:', result.success);
console.log('Node results:', result.results);