Graph

Workflow orchestration with dependency management and parallel execution

Overview

The Graph system enables you to create complex workflows by connecting tasks and agents with dependencies, conditions, and parallel execution capabilities. It provides a visual and programmatic way to orchestrate multi-step processes, handle branching logic, and coordinate multiple agents working together.

Creating a Graph

Graphs are composed of nodes (tasks or agents) and edges (connections between them):

import { Graph, Agent } from '@astreus-ai/astreus';

// Create the agent that will run the graph's task nodes by default
const agent = await Agent.create({
  name: 'ContentAgent',
  model: 'gpt-4o'
});

const graph = new Graph({
  name: 'content-creation-pipeline',
  description: 'Research and write technical content',
  defaultAgentId: agent.id  // Use the agent ID
}, agent);

// Add task nodes
const researchNodeId = graph.addTaskNode({
  prompt: 'Research the latest TypeScript features and summarize key findings',
  model: 'gpt-4o',
  priority: 10,
  metadata: { type: 'research' }
});

const writeNodeId = graph.addTaskNode({
  prompt: 'Write a comprehensive blog post based on the research findings',
  model: 'gpt-4o',
  dependencies: [researchNodeId],  // Depends on research completion
  priority: 5,
  metadata: { type: 'writing' }
});

// Execute the graph
const results = await graph.run({ stream: true });

console.log('Success:', results.success);
console.log('Completed nodes:', results.completedNodes);
console.log('Failed nodes:', results.failedNodes);
console.log('Duration:', results.duration, 'ms');
console.log('Results:', results.results);

Graph Execution Flow

Node Resolution

The graph analyzes all nodes and their dependencies to determine the execution order.

Parallel Execution

Nodes with no unmet dependencies run concurrently, up to the configured maxConcurrency limit.

Dependency Waiting

Dependent nodes wait for their prerequisites to complete before starting.

Result Collection

All node outputs are collected and made available in the final result.
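
The first three steps above can be pictured as grouping nodes into "waves". The sketch below is only an illustration of that idea, not the Astreus scheduler: `PlanNode` and `planWaves` are hypothetical names, and treating `maxConcurrency` as a per-wave cap is an assumption made for the example.

```typescript
// Hypothetical planning sketch; these names are illustrative, not Astreus APIs.
interface PlanNode {
  id: string;
  dependencies: string[]; // IDs that must complete first
  priority: number;       // higher runs earlier when slots are limited
}

function planWaves(nodes: PlanNode[], maxConcurrency = 1): string[][] {
  const remaining = new Map<string, PlanNode>();
  nodes.forEach((n) => remaining.set(n.id, n));
  const done = new Set<string>();
  const waves: string[][] = [];

  while (remaining.size > 0) {
    // Node resolution: a node is ready once all of its dependencies are done.
    const ready = Array.from(remaining.values())
      .filter((n) => n.dependencies.every((d) => done.has(d)))
      .sort((a, b) => b.priority - a.priority);

    if (ready.length === 0) {
      // Nothing can start: the rest form a cycle or reference unknown IDs.
      const stuck = Array.from(remaining.keys()).join(', ');
      throw new Error('Cycle or unknown dependency among: ' + stuck);
    }

    // Parallel execution: take up to maxConcurrency ready nodes per wave.
    const wave = ready.slice(0, maxConcurrency).map((n) => n.id);

    // Dependency waiting: everything else stays in `remaining` for a later wave.
    wave.forEach((id) => {
      remaining.delete(id);
      done.add(id);
    });
    waves.push(wave);
  }
  return waves;
}

const waves = planWaves(
  [
    { id: 'analysis', dependencies: [], priority: 10 },
    { id: 'optimization', dependencies: ['analysis'], priority: 8 },
    { id: 'testing', dependencies: ['optimization'], priority: 6 },
    { id: 'documentation', dependencies: ['analysis'], priority: 5 },
  ],
  3,
);
console.log(waves);
// [ [ 'analysis' ], [ 'optimization', 'documentation' ], [ 'testing' ] ]
```

A real scheduler would more likely start each node as soon as its own dependencies finish rather than waiting for a whole wave, but the wave view makes the ordering rules easy to see.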

Advanced Example

Here's a complex workflow with dependencies, parallel execution, and error handling:

import { Graph, Agent } from '@astreus-ai/astreus';

// Create the default agent for the workflow
const agent = await Agent.create({
  name: 'OptimizationAgent',
  model: 'gpt-4o'
});

const graph = new Graph({
  name: 'code-optimization-pipeline',
  description: 'Analyze and optimize codebase',
  defaultAgentId: agent.id,
  maxConcurrency: 3,   // Allow 3 parallel nodes
  timeout: 300000,     // 5 minute timeout
  retryAttempts: 2     // Retry failed nodes twice
}, agent);

// Add task nodes with proper configuration
const analysisNodeId = graph.addTaskNode({
  prompt: 'Analyze the codebase for performance issues and categorize them by severity',
  model: 'gpt-4o',
  priority: 10,  // High priority
  metadata: { step: 'analysis', category: 'review' }
});

const optimizationNodeId = graph.addTaskNode({
  prompt: 'Based on the analysis, implement performance optimizations',
  model: 'gpt-4o',
  dependencies: [analysisNodeId],  // Depends on analysis
  priority: 8,
  metadata: { step: 'optimization', category: 'implementation' }
});

const testNodeId = graph.addTaskNode({
  prompt: 'Run performance tests and validate the optimizations',
  model: 'gpt-4o',
  dependencies: [optimizationNodeId],  // Depends on optimization
  priority: 6,
  stream: true,  // Enable streaming for real-time feedback
  metadata: { step: 'testing', category: 'validation' }
});

const documentationNodeId = graph.addTaskNode({
  prompt: 'Document all changes and performance improvements',
  model: 'gpt-4o',
  dependencies: [analysisNodeId],  // Can run parallel to optimization
  priority: 5,  // Lower priority
  metadata: { step: 'documentation', category: 'docs' }
});

// Add edges (optional, as dependencies already create edges)
graph.addEdge(analysisNodeId, optimizationNodeId);
graph.addEdge(analysisNodeId, documentationNodeId);
graph.addEdge(optimizationNodeId, testNodeId);

// Execute the graph
const results = await graph.run({ stream: true });

console.log('Pipeline results:', results);
console.log('Completed nodes:', results.completedNodes);
console.log('Failed nodes:', results.failedNodes);
console.log('Duration:', results.duration, 'ms');

// Access individual node results
Object.entries(results.results).forEach(([nodeId, result]) => {
  console.log(`Node ${nodeId}:`, result);
});

// Check for errors
if (results.errors && Object.keys(results.errors).length > 0) {
  console.log('Errors:', results.errors);
}

Graph Configuration

Graphs support various configuration options:

interface GraphConfig {
  id?: string;                 // Optional graph ID
  name: string;                // Graph name (required)
  description?: string;        // Graph description
  defaultAgentId?: number;     // Default agent for task nodes
  maxConcurrency?: number;     // Max parallel execution (default: 1)
  timeout?: number;            // Execution timeout in ms
  retryAttempts?: number;      // Retry attempts for failed nodes
  metadata?: MetadataObject;   // Custom metadata
  
  // Sub-agent integration options
  subAgentAware?: boolean;                           // Enable sub-agent awareness and optimization
  optimizeSubAgentUsage?: boolean;                   // Optimize sub-agent delegation patterns
  subAgentCoordination?: 'parallel' | 'sequential' | 'adaptive'; // Default sub-agent coordination
}

// Example with full configuration including sub-agent support
const graph = new Graph({
  name: 'advanced-pipeline',
  description: 'Complex workflow with error handling and sub-agent coordination',
  defaultAgentId: agent.id,
  maxConcurrency: 5,
  timeout: 600000,  // 10 minutes
  retryAttempts: 3,
  subAgentAware: true,
  optimizeSubAgentUsage: true,
  subAgentCoordination: 'adaptive',
  metadata: { project: 'automation', version: '1.0' }
}, agent);
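
To make the retryAttempts option more concrete, here is a minimal sketch of retry semantics under one assumption: a value of 2 means one initial attempt plus up to two retries, matching the "Retry failed nodes twice" comment in the advanced example. `runWithRetry` is a hypothetical helper, not something exported by Astreus, and the library's actual retry behavior (backoff, which errors are retried) may differ.

```typescript
// Hypothetical helper for illustration; not part of the Astreus API.
async function runWithRetry<T>(
  task: (attempt: number) => Promise<T>,
  retryAttempts: number,
): Promise<T> {
  let lastError: unknown;
  // Attempt 0 is the initial run; attempts 1..retryAttempts are retries.
  for (let attempt = 0; attempt <= retryAttempts; attempt++) {
    try {
      return await task(attempt);
    } catch (err) {
      lastError = err; // remember the failure and try again if attempts remain
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}

// Usage: a task that fails on its first two attempts and then succeeds.
runWithRetry(async (attempt) => {
  if (attempt < 2) {
    throw new Error('transient failure on attempt ' + attempt);
  }
  return 'done';
}, 2).then((value) => console.log('Result:', value));
```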

Node Types and Options

Task Nodes

interface AddTaskNodeOptions {
  name?: string;               // Node name for easy referencing
  prompt: string;              // Task prompt (required)
  model?: string;              // Override model for this task
  agentId?: number;            // Override default agent
  stream?: boolean;            // Enable streaming for this task
  schedule?: string;           // Simple schedule string (e.g., 'daily@09:00')
  dependencies?: string[];     // Node IDs this task depends on
  dependsOn?: string[];        // Node names this task depends on (easier than IDs)
  priority?: number;           // Execution priority (higher = earlier)
  metadata?: MetadataObject;   // Custom metadata
  
  // Sub-agent delegation options
  useSubAgents?: boolean;                        // Force enable/disable sub-agent usage for this task
  subAgentDelegation?: 'auto' | 'manual' | 'sequential'; // Sub-agent delegation strategy
  subAgentCoordination?: 'parallel' | 'sequential';      // Sub-agent coordination pattern
}
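
Since a task node can name its prerequisites either by ID (dependencies) or by node name (dependsOn), it may help to see how the two lists could be merged. The following is a sketch under the assumption that node names are unique and map one-to-one to IDs. `resolveDependencies` and `nameToId` are hypothetical and only illustrate the idea.

```typescript
// Illustrative only: `resolveDependencies` and `nameToId` are hypothetical,
// not part of the Astreus API.
function resolveDependencies(
  nameToId: Map<string, string>,
  options: { dependencies?: string[]; dependsOn?: string[] },
): string[] {
  // Start from the explicit IDs; a Set drops duplicates.
  const ids = new Set<string>(options.dependencies ?? []);
  for (const name of options.dependsOn ?? []) {
    const id = nameToId.get(name);
    if (id === undefined) {
      throw new Error(`Unknown node name in dependsOn: ${name}`);
    }
    ids.add(id);
  }
  return Array.from(ids);
}

const nameToId = new Map([
  ['research', 'node-1'],
  ['outline', 'node-2'],
]);

const resolved = resolveDependencies(nameToId, {
  dependencies: ['node-1'],
  dependsOn: ['research', 'outline'],
});
console.log(resolved); // [ 'node-1', 'node-2' ]
```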

Agent Nodes

interface AddAgentNodeOptions {
  agentId: number;             // Agent ID (required)
  dependencies?: string[];     // Node IDs this agent depends on
  priority?: number;           // Execution priority
  metadata?: MetadataObject;   // Custom metadata
}

Sub-Agent Configuration Options

Sub-agent delegation and coordination can be configured at two levels: for the graph as a whole and for individual task nodes.

Graph-Level Sub-Agent Configuration

  • subAgentAware: Enables automatic detection and optimization of sub-agent opportunities across the graph
  • optimizeSubAgentUsage: Enables real-time performance monitoring and automatic strategy adjustment for better efficiency
  • subAgentCoordination: Sets the default coordination pattern:
    • 'parallel': Sub-agents work simultaneously across different nodes
    • 'sequential': Sub-agents work in dependency order, passing context between executions
    • 'adaptive': Dynamically chooses the best coordination pattern based on task complexity and dependencies

Node-Level Sub-Agent Configuration

Each task node can override graph-level settings with specific sub-agent behavior:

  • useSubAgents: Force enable or disable sub-agent delegation for specific nodes
  • subAgentDelegation: Control how the node's task is distributed to sub-agents ('auto', 'manual', or 'sequential')
  • subAgentCoordination: Override the graph's default coordination pattern for specific nodes
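
The override relationship between the two levels can be sketched as a simple fallback chain. This is an assumption-laden illustration, not the library's logic: `resolveSubAgentSettings` is hypothetical, and the final fallbacks (`false` and `'sequential'`) are placeholders chosen for the example, since the defaults are not stated here.

```typescript
// Hypothetical sketch of node-level settings overriding graph-level defaults.
type GraphCoordination = 'parallel' | 'sequential' | 'adaptive';
type NodeCoordination = 'parallel' | 'sequential';

interface GraphSubAgentSettings {
  subAgentAware?: boolean;
  subAgentCoordination?: GraphCoordination;
}

interface NodeSubAgentSettings {
  useSubAgents?: boolean;
  subAgentCoordination?: NodeCoordination;
}

interface EffectiveSubAgentSettings {
  useSubAgents: boolean;
  coordination: GraphCoordination;
}

function resolveSubAgentSettings(
  graph: GraphSubAgentSettings,
  node: NodeSubAgentSettings,
): EffectiveSubAgentSettings {
  return {
    // Node setting wins; otherwise fall back to the graph, then to an assumed default.
    useSubAgents: node.useSubAgents ?? graph.subAgentAware ?? false,
    coordination:
      node.subAgentCoordination ?? graph.subAgentCoordination ?? 'sequential',
  };
}

const graphSettings: GraphSubAgentSettings = {
  subAgentAware: true,
  subAgentCoordination: 'adaptive',
};

// A node with no overrides inherits the graph-level settings.
console.log(resolveSubAgentSettings(graphSettings, {}));
// A node can opt out of sub-agents or pick its own coordination pattern.
console.log(resolveSubAgentSettings(graphSettings, { useSubAgents: false }));
console.log(resolveSubAgentSettings(graphSettings, { subAgentCoordination: 'parallel' }));
```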

Enhanced Graph Workflow with Sub-Agents

import { Graph, Agent } from '@astreus-ai/astreus';

// Create specialized sub-agents
const researcher = await Agent.create({
  name: 'DataResearcher',
  systemPrompt: 'You specialize in gathering and analyzing data from multiple sources.'
});

const analyst = await Agent.create({
  name: 'TechnicalAnalyst',
  systemPrompt: 'You provide technical insights and recommendations.'
});

const writer = await Agent.create({
  name: 'TechnicalWriter',
  systemPrompt: 'You create clear, comprehensive technical documentation.'
});

// Main coordinator with sub-agents
const coordinator = await Agent.create({
  name: 'ProjectCoordinator',
  systemPrompt: 'You orchestrate complex projects using specialized team members.',
  subAgents: [researcher, analyst, writer]
});

// Create sub-agent optimized graph
const projectGraph = new Graph({
  name: 'Technical Documentation Pipeline',
  description: 'Automated technical documentation creation with specialized agents',
  defaultAgentId: coordinator.id,
  maxConcurrency: 3,
  subAgentAware: true,
  optimizeSubAgentUsage: true,
  subAgentCoordination: 'adaptive'
}, coordinator);

// Research phase with automatic sub-agent delegation
const researchNode = projectGraph.addTaskNode({
  name: 'Market Research',
  prompt: 'Research current trends in cloud computing and serverless architecture',
  useSubAgents: true,
  subAgentDelegation: 'auto',
  priority: 10,
  metadata: { phase: 'research', category: 'data-gathering' }
});

// Analysis phase with sequential sub-agent coordination
const analysisNode = projectGraph.addTaskNode({
  name: 'Technical Analysis',
  prompt: 'Analyze research findings and identify key technical patterns',
  dependencies: [researchNode],
  useSubAgents: true,
  subAgentDelegation: 'auto',
  subAgentCoordination: 'sequential',
  priority: 8,
  metadata: { phase: 'analysis', category: 'insights' }
});

// Documentation phase with parallel sub-agent work
const docNode = projectGraph.addTaskNode({
  name: 'Documentation Creation',
  prompt: 'Create comprehensive technical documentation and executive summary',
  dependencies: [analysisNode],
  useSubAgents: true,
  subAgentDelegation: 'manual',
  subAgentCoordination: 'parallel',
  priority: 6,
  metadata: { phase: 'documentation', category: 'deliverables' }
});

// Execute with performance monitoring
const result = await projectGraph.run({ stream: true });

// Access sub-agent performance insights
if (projectGraph.generateSubAgentPerformanceReport) {
  const performanceReport = projectGraph.generateSubAgentPerformanceReport();
  console.log('Sub-agent performance:', performanceReport);
}

console.log('Pipeline completed:', result.success);
console.log('Node results:', result.results);
