# MediaVerse — Automation Platform Specification

## 1. Platform Overview

The Automation Platform transforms MediaVerse from a manual tool into an intelligent, self-operating media management system. Users can create sophisticated workflows that automatically handle downloads, conversions, organization, and notifications based on triggers, conditions, and actions.

### Core Philosophy
- **Visual First**: Drag-and-drop workflow builder
- **Powerful Yet Simple**: From basic automation to complex logic
- **Reliable**: Guaranteed execution with retry and recovery
- **Observable**: Full visibility into what happened and why

---

## 2. Feature Specifications

### 2.1 Workflow Builder

#### Visual Editor

```
┌─────────────────────────────────────────────────────────────────┐
│  Workflow Builder: "Auto-Archive YouTube Subscriptions"        │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐     │
│  │  ⏰    │───▶│  🔍    │───▶│  ⚙️    │───▶│  📁    │     │
│  │ Trigger │    │ Filter  │    │ Process │    │ Organize│     │
│  │ Daily   │    │ Channel │    │ Convert │    │  Sort   │     │
│  │ 9:00 AM │    │ Match   │    │ to MP4  │    │ by Date │     │
│  └─────────┘    └─────────┘    └─────────┘    └─────────┘     │
│       │              │              │              │          │
│       ▼              ▼              ▼              ▼          │
│  [Edit]           [Edit]         [Edit]         [Edit]         │
│                                                                  │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │  [+ Add Step]  [Add Branch]  [Add Delay]  [Add Loop]   │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                  │
│  [💾 Save] [▶️ Test] [📋 Duplicate] [🗑️ Delete]              │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘
```

#### Builder Components

| Component | Description | Use Case |
|-----------|-------------|----------|
| **Trigger Node** | Entry point, defines when workflow runs | Schedule, event, webhook |
| **Action Node** | Performs an operation | Download, convert, move |
| **Condition Node** | Branching logic | If/then/else decisions |
| **Delay Node** | Pause execution | Wait for time or event |
| **Loop Node** | Iterate over collection | Process multiple items |
| **Parallel Node** | Execute branches concurrently | Speed up independent tasks |
| **Merge Node** | Combine parallel branches | Synchronize execution |
| **Error Node** | Handle failures | Retry, notify, abort |

#### Node Configuration Panel

```
┌─────────────────────────────────────────┐
│  Configure: Download Action             │
├─────────────────────────────────────────┤
│                                         │
│  Name: [Download from URL             ] │
│                                         │
│  URL Source:                            │
│  (•) Static URL: [                    ] │
│  ( ) From previous step                 │
│  ( ) From variable: [${trigger.url}  ] │
│                                         │
│  Quality: [Best Available ▼]           │
│  Format: [MP4 ▼]                       │
│  Output: [~/Downloads/Workflow/       ] │
│                                         │
│  Advanced:                              │
│  [ ] Extract audio only                 │
│  [ ] Download subtitles                 │
│  [ ] Embed metadata                     │
│                                         │
│  On Error:                              │
│  (•) Retry 3 times, then notify        │
│  ( ) Skip and continue                  │
│  ( ) Abort workflow                     │
│                                         │
│           [Cancel]  [Save Changes]      │
│                                         │
└─────────────────────────────────────────┘
```

### 2.2 Rule Engine

#### Rule Structure

```typescript
interface Rule {
  id: string;
  name: string;
  description?: string;
  enabled: boolean;
  
  // Trigger configuration
  trigger: Trigger;
  
  // Condition (optional)
  condition?: Condition;
  
  // Actions to execute
  actions: Action[];
  
  // Execution settings
  settings: RuleSettings;
  
  metadata: {
    createdAt: Date;
    updatedAt: Date;
    createdBy: string;
    executionCount: number;
    lastExecutedAt?: Date;
    lastError?: string;
  };
}

interface RuleSettings {
  priority: number;           // 1-10, higher = earlier
  maxConcurrent: number;      // Parallel executions
  timeout: number;            // Seconds before abort
  retryPolicy: RetryPolicy;
  logLevel: 'debug' | 'info' | 'warn' | 'error';
}
```

#### Condition Types

```typescript
type Condition = 
  | LogicalCondition
  | ComparisonCondition
  | RegexCondition
  | TimeCondition
  | MediaCondition
  | CustomCondition;

interface LogicalCondition {
  type: 'and' | 'or' | 'not';
  conditions: Condition[];
}

interface ComparisonCondition {
  type: 'comparison';
  left: ValueReference;
  operator: 'eq' | 'ne' | 'gt' | 'gte' | 'lt' | 'lte' | 'contains' | 'startsWith' | 'endsWith';
  right: ValueReference;
}

interface RegexCondition {
  type: 'regex';
  field: ValueReference;
  pattern: string;
  flags?: string;
}

interface TimeCondition {
  type: 'time';
  operator: 'before' | 'after' | 'between' | 'weekday' | 'weekend';
  value: TimeValue;
}

interface MediaCondition {
  type: 'media';
  property: 'duration' | 'size' | 'format' | 'resolution' | 'codec';
  operator: ComparisonOperator;
  value: any;
}
```

#### Example Rules

```typescript
// Rule 1: Auto-download from favorite channels
const favoriteChannelsRule: Rule = {
  trigger: {
    type: 'schedule',
    cron: '0 */6 * * *', // Every 6 hours
  },
  condition: {
    type: 'and',
    conditions: [
      {
        type: 'comparison',
        left: { source: 'channel', path: 'isFavorite' },
        operator: 'eq',
        right: { value: true }
      },
      {
        type: 'comparison',
        left: { source: 'video', path: 'publishedAt' },
        operator: 'gt',
        right: { value: 'lastCheckTime' }
      }
    ]
  },
  actions: [
    { type: 'download', quality: 'best', format: 'mp4' },
    { type: 'tag', tags: ['auto-downloaded'] },
    { type: 'notify', message: 'New video downloaded: {{title}}' }
  ]
};

// Rule 2: Convert long videos to audio for podcast
const podcastRule: Rule = {
  trigger: {
    type: 'event',
    event: 'download.completed',
  },
  condition: {
    type: 'and',
    conditions: [
      {
        type: 'comparison',
        left: { source: 'media', path: 'duration' },
        operator: 'gt',
        right: { value: 1800 } // 30 minutes
      },
      {
        type: 'regex',
        field: { source: 'media', path: 'title' },
        pattern: 'podcast|interview|talk',
        flags: 'i'
      }
    ]
  },
  actions: [
    { 
      type: 'convert', 
      outputFormat: 'mp3',
      audioOnly: true,
      bitrate: 192
    },
    { 
      type: 'move', 
      destination: '~/Podcasts/{{date:YYYY-MM}}/'
    }
  ]
};
```

### 2.3 Task Queue

#### Queue Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                      Task Queue System                        │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐    │
│  │   Priority  │    │   Delayed   │    │   Dead      │    │
│  │   Queue     │    │   Queue     │    │   Letter    │    │
│  │  (Active)   │    │  (Scheduled)│    │   Queue     │    │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘    │
│         │                  │                  │            │
│         ▼                  ▼                  ▼            │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Redis / SQLite Queue                    │   │
│  │  • Job serialization                                │   │
│  │  • Priority ordering                              │   │
│  │  • Delayed execution                              │   │
│  │  • Dead letter handling                           │   │
│  └─────────────────────────────────────────────────────┘   │
│                              │                               │
│                              ▼                               │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Worker Pool                             │   │
│  │  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐   │   │
│  │  │  W1 │ │  W2 │ │  W3 │ │  W4 │ │  W5 │ │  W6 │   │   │
│  │  └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘   │   │
│  │  (Scalable based on CPU/memory)                     │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                              │
└─────────────────────────────────────────────────────────────┘
```

#### Job Model

```typescript
interface Job {
  id: string;
  type: JobType;
  name: string;
  
  // Payload
  data: Record<string, any>;
  context: JobContext;
  
  // Execution
  status: JobStatus;
  priority: number;
  attempts: number;
  maxAttempts: number;
  
  // Timing
  createdAt: Date;
  scheduledAt?: Date;
  startedAt?: Date;
  completedAt?: Date;
  failedAt?: Date;
  
  // Progress
  progress: number;           // 0-100
  currentStep?: string;
  logs: JobLog[];
  
  // Error handling
  error?: JobError;
  retryDelay?: number;
  
  // Relations
  workflowId?: string;
  ruleId?: string;
  parentJobId?: string;
  childJobIds: string[];
}

interface JobContext {
  workflowId?: string;
  ruleId?: string;
  triggerData?: any;
  userId: string;
  sessionId: string;
  variables: Map<string, any>;
}

interface JobLog {
  timestamp: Date;
  level: 'debug' | 'info' | 'warn' | 'error';
  message: string;
  step?: string;
  metadata?: Record<string, any>;
}

type JobStatus = 
  | 'pending'      // Waiting in queue
  | 'scheduled'    // Delayed execution
  | 'processing'   // Currently executing
  | 'completed'    // Success
  | 'failed'       // Error, may retry
  | 'dead'         // Max retries exceeded
  | 'cancelled';   // User cancelled
```

#### Queue Operations

```typescript
class TaskQueue {
  async enqueue(job: Omit<Job, 'id' | 'status'>): Promise<Job> {
    const id = generateUUID();
    const fullJob: Job = {
      ...job,
      id,
      status: job.scheduledAt ? 'scheduled' : 'pending',
      attempts: 0,
      progress: 0,
      logs: [],
      childJobIds: []
    };
    
    await this.persistJob(fullJob);
    
    if (!job.scheduledAt) {
      await this.pushToQueue(fullJob);
    } else {
      await this.scheduleJob(fullJob);
    }
    
    this.emit('job:created', fullJob);
    return fullJob;
  }
  
  async dequeue(): Promise<Job | null> {
    // Atomic pop from priority queue
    const job = await this.redis.lpop('queue:priority');
    if (!job) return null;
    
    const parsed = JSON.parse(job);
    await this.updateJobStatus(parsed.id, 'processing', {
      startedAt: new Date()
    });
    
    return parsed;
  }
  
  async complete(jobId: string, result: any): Promise<void> {
    await this.updateJobStatus(jobId, 'completed', {
      completedAt: new Date(),
      result
    });
    
    // Trigger dependent jobs
    await this.processDependents(jobId);
    
    this.emit('job:completed', { jobId, result });
  }
  
  async fail(jobId: string, error: Error): Promise<void> {
    const job = await this.getJob(jobId);
    const newAttempts = job.attempts + 1;
    
    if (newAttempts >= job.maxAttempts) {
      await this.moveToDeadLetter(job, error);
    } else {
      const retryDelay = this.calculateRetryDelay(job);
      await this.scheduleRetry(job, retryDelay, error);
    }
  }
  
  async cancel(jobId: string): Promise<void> {
    const job = await this.getJob(jobId);
    if (job.status === 'processing') {
      await this.signalCancellation(jobId);
    }
    await this.updateJobStatus(jobId, 'cancelled');
  }
}
```

### 2.4 Event Bus

#### Event Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                      Event Bus                               │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│   Publishers                    Subscribers                  │
│   ──────────                    ───────────                  │
│                                                              │
│   Download Module    ─────┐     ┌─────▶  Automation Engine   │
│   • download.started      │     │        • Trigger workflows  │
│   • download.completed    │     │                              │
│   • download.failed       │     ├─────▶  Notification Service │
│                           │     │        • Push notifications│
│   Media Manager      ─────┤     │        • Email alerts      │
│   • media.added           │     │                              │
│   • media.updated         │     ├─────▶  Analytics           │
│   • media.deleted         │     │        • Usage metrics      │
│   • media.played          │     │        • Event tracking     │
│                           │     │                              │
│   User Actions       ─────┤     ├─────▶  Logging             │
│   • user.login            │     │        • Audit trail        │
│   • user.preference.change│     │                              │
│   • user.created          │     └─────▶  External Webhooks    │
│                           │              • Zapier integration │
│   System Events      ─────┘              • Custom endpoints   │
│   • system.startup                                                     │
│   • system.shutdown                                                      │
│   • storage.low                                                        │
│                                                              │
└─────────────────────────────────────────────────────────────┘
```

#### Event Types

```typescript
// Domain Events
interface DownloadCompletedEvent {
  type: 'download.completed';
  payload: {
    downloadId: string;
    mediaId: string;
    filePath: string;
    metadata: MediaMetadata;
  };
  timestamp: Date;
  correlationId: string;
}

interface MediaAddedEvent {
  type: 'media.added';
  payload: {
    mediaId: string;
    source: 'download' | 'import' | 'manual';
    folderId?: string;
  };
  timestamp: Date;
}

interface UserActionEvent {
  type: 'user.action';
  payload: {
    userId: string;
    action: string;
    target?: string;
    metadata?: Record<string, any>;
  };
  timestamp: Date;
}

// System Events
interface SystemEvent {
  type: 'system.startup' | 'system.shutdown' | 'system.error';
  payload: {
    version: string;
    platform: string;
    uptime?: number;
    error?: ErrorInfo;
  };
  timestamp: Date;
}

// Event Bus Interface
interface EventBus {
  publish<T extends Event>(event: T): Promise<void>;
  subscribe<T extends Event>(
    eventType: string,
    handler: EventHandler<T>,
    options?: SubscribeOptions
  ): Subscription;
  subscribePattern(
    pattern: string,  // Wildcard: 'download.*'
    handler: EventHandler<Event>
  ): Subscription;
}
```

### 2.5 Cron Scheduler

#### Scheduler Configuration

```typescript
interface ScheduledTask {
  id: string;
  name: string;
  description?: string;
  
  // Schedule
  schedule: CronSchedule | IntervalSchedule | OneTimeSchedule;
  timezone: string;
  
  // What to execute
  action: ScheduledAction;
  
  // Execution settings
  settings: {
    enabled: boolean;
    catchUp: boolean;        // Run missed executions
    concurrent: boolean;     // Allow overlapping
    timeout: number;        // Max execution time
  };
  
  // State
  state: {
    lastRunAt?: Date;
    nextRunAt?: Date;
    lastResult?: 'success' | 'failure';
    lastError?: string;
    runCount: number;
    failureCount: number;
  };
}

interface CronSchedule {
  type: 'cron';
  expression: string;        // Standard cron: "0 9 * * 1-5"
}

interface IntervalSchedule {
  type: 'interval';
  interval: number;          // Seconds
  startAt?: Date;
}

interface OneTimeSchedule {
  type: 'once';
  at: Date;
}

type ScheduledAction = 
  | { type: 'workflow'; workflowId: string; inputs?: Record<string, any> }
  | { type: 'rule'; ruleId: string }
  | { type: 'script'; script: string; language: 'javascript' | 'python' }
  | { type: 'webhook'; url: string; method: string; payload?: any };
```

#### Cron Expression Builder

```
┌─────────────────────────────────────────────────────────┐
│  Schedule Builder                                       │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  Presets:                                               │
│  [Every hour] [Daily] [Weekly] [Monthly] [Custom]      │
│                                                         │
│  Custom Schedule:                                       │
│  ┌─────────────────────────────────────────────────┐   │
│  │  Minute    Hour    Day    Month    Weekday      │   │
│  │  [0    ]  [9   ]  [*   ]  [*    ]  [1-5   ]    │   │
│  │   0-59   0-23    1-31   1-12     0-6          │   │
│  └─────────────────────────────────────────────────┘   │
│                                                         │
│  Human readable: "At 09:00 AM, Monday through Friday"   │
│                                                         │
│  Next 5 runs:                                           │
│  • 2024-01-15 09:00:00 (Monday)                        │
│  • 2024-01-16 09:00:00 (Tuesday)                       │
│  • 2024-01-17 09:00:00 (Wednesday)                     │
│  • 2024-01-18 09:00:00 (Thursday)                      │
│  • 2024-01-19 09:00:00 (Friday)                        │
│                                                         │
│  Timezone: [America/New_York ▼]                        │
│                                                         │
│           [Cancel]  [Save Schedule]                     │
│                                                         │
└─────────────────────────────────────────────────────────┘
```

---

## 3. Execution Architecture

### 3.1 Worker Architecture

```
┌─────────────────────────────────────────────────────────────┐
│                    Worker Pool                               │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────────────────────────────────────────────────┐ │
│  │  Worker Manager                                         │ │
│  │  • Monitors queue depth                                 │ │
│  │  • Scales workers up/down                             │ │
│  │  • Handles worker health                              │ │
│  │  • Distributes jobs                                   │ │
│  └─────────────────────────────────────────────────────────┘ │
│                              │                               │
│          ┌───────────────────┼───────────────────┐          │
│          │                   │                   │          │
│          ▼                   ▼                   ▼          │
│  ┌─────────────┐     ┌─────────────┐     ┌─────────────┐  │
│  │  Worker #1  │     │  Worker #2  │     │  Worker #N  │  │
│  │             │     │             │     │             │  │
│  │ ┌─────────┐ │     │ ┌─────────┐ │     │ ┌─────────┐ │  │
│  │ │ Sandbox │ │     │ │ Sandbox │ │     │ │ Sandbox │ │  │
│  │ │ Process│ │     │ │ Process│ │     │ │ Process│ │  │
│  │ │         │ │     │ │         │ │     │ │         │ │  │
│  │ │ • Job   │ │     │ │ • Job   │ │     │ │ • Job   │ │  │
│  │ │   exec  │ │     │ │   exec  │ │     │ │   exec  │ │  │
│  │ │ • Steps │ │     │ │ • Steps │ │     │ │ • Steps │ │  │
│  │ │ • State │ │     │ │ • State │ │     │ │ • State │ │  │
│  │ └─────────┘ │     │ └─────────┘ │     │ └─────────┘ │  │
│  │             │     │             │     │             │  │
│  │ Resources:  │     │ Resources:  │     │ Resources:  │  │
│  │ CPU: 25%    │     │ CPU: 40%    │     │ CPU: 15%    │  │
│  │ Mem: 512MB  │     │ Mem: 1.2GB  │     │ Mem: 380MB  │  │
│  └─────────────┘     └─────────────┘     └─────────────┘  │
│                                                              │
│  [Scale: Auto] [Max Workers: 8] [Current: 3 active]         │
│                                                              │
└─────────────────────────────────────────────────────────────┘
```

### 3.2 Worker Implementation

```typescript
class WorkflowWorker {
  private currentJob: Job | null = null;
  private executionContext: ExecutionContext;
  private stepResults: Map<string, any> = new Map();
  
  async start(): Promise<void> {
    while (this.shouldContinue()) {
      const job = await this.queue.dequeue();
      if (!job) {
        await this.sleep(1000);
        continue;
      }
      
      await this.executeJob(job);
    }
  }
  
  async executeJob(job: Job): Promise<void> {
    this.currentJob = job;
    this.executionContext = this.createContext(job);
    
    try {
      await this.updateStatus('processing');
      
      const workflow = await this.loadWorkflow(job.workflowId);
      const result = await this.executeWorkflow(workflow, job.data);
      
      await this.queue.complete(job.id, result);
      
    } catch (error) {
      await this.handleError(job, error);
    } finally {
      this.currentJob = null;
      this.stepResults.clear();
    }
  }
  
  async executeWorkflow(workflow: Workflow, inputs: any): Promise<any> {
    // Build execution graph
    const graph = this.buildExecutionGraph(workflow);
    
    // Execute in topological order
    for (const node of graph.nodesInOrder()) {
      if (this.isCancelled()) throw new CancellationError();
      
      const result = await this.executeNode(node, inputs);
      this.stepResults.set(node.id, result);
      
      await this.reportProgress(node, result);
    }
    
    return this.stepResults.get(workflow.outputNode);
  }
  
  async executeNode(node: WorkflowNode, inputs: any): Promise<any> {
    const startTime = performance.now();
    
    // Resolve inputs from previous steps
    const resolvedInputs = this.resolveInputs(node.inputs);
    
    // Execute based on type
    switch (node.type) {
      case 'trigger':
        return inputs;
        
      case 'action':
        return await this.executeAction(node.config, resolvedInputs);
        
      case 'condition':
        return await this.evaluateCondition(node.config, resolvedInputs);
        
      case 'delay':
        await this.sleep(node.config.duration);
        return resolvedInputs;
        
      case 'loop':
        return await this.executeLoop(node, resolvedInputs);
        
      case 'parallel':
        return await this.executeParallel(node, resolvedInputs);
    }
  }
  
  async executeAction(config: ActionConfig, inputs: any): Promise<any> {
    const action = this.actionRegistry.get(config.actionType);
    if (!action) {
      throw new Error(`Unknown action: ${config.actionType}`);
    }
    
    // Pre-process inputs
    const processedInputs = await this.interpolateVariables(config.inputs, inputs);
    
    // Execute with timeout
    const result = await this.withTimeout(
      () => action.execute(processedInputs, this.executionContext),
      config.timeout || 300000 // 5 min default
    );
    
    // Post-process outputs
    return config.outputMapping 
      ? this.mapOutputs(result, config.outputMapping)
      : result;
  }
  
  private async withTimeout<T>(
    fn: () => Promise<T>,
    timeoutMs: number
  ): Promise<T> {
    return Promise.race([
      fn(),
      new Promise<never>((_, reject) => 
        setTimeout(() => reject(new TimeoutError()), timeoutMs)
      )
    ]);
  }
}
```

### 3.3 Scaling Model

#### Auto-Scaling Configuration

```typescript
interface ScalingConfig {
  // Scale up triggers
  scaleUp: {
    queueDepthThreshold: number;    // Jobs waiting
    cpuThreshold: number;           // Average CPU %
    memoryThreshold: number;        // Average memory %
    latencyThreshold: number;         // P95 job start time
  };
  
  // Scale down triggers
  scaleDown: {
    idleTimeout: number;              // Seconds before scale down
    minWorkers: number;
    maxWorkers: number;
  };
  
  // Resource limits per worker
  resources: {
    maxMemory: number;              // MB
    maxCpu: number;                 // Percentage
    maxDisk: number;                // MB
  };
}

class AutoScaler {
  private workers: Map<string, Worker> = new Map();
  private config: ScalingConfig;
  
  async evaluate(): Promise<void> {
    const metrics = await this.collectMetrics();
    
    // Scale up conditions
    if (metrics.queueDepth > this.config.scaleUp.queueDepthThreshold ||
        metrics.avgCpu > this.config.scaleUp.cpuThreshold) {
      await this.scaleUp();
    }
    
    // Scale down conditions
    const idleWorkers = this.getIdleWorkers();
    if (idleWorkers.length > 0 && 
        metrics.queueDepth === 0 &&
        this.workers.size > this.config.scaleDown.minWorkers) {
      await this.scaleDown(idleWorkers);
    }
  }
  
  async scaleUp(): Promise<void> {
    const newWorker = await this.spawnWorker();
    this.workers.set(newWorker.id, newWorker);
    
    this.emit('worker:scaled_up', {
      workerId: newWorker.id,
      totalWorkers: this.workers.size
    });
  }
  
  async scaleDown(workers: Worker[]): Promise<void> {
    for (const worker of workers) {
      // Graceful shutdown
      await worker.drain();
      await worker.terminate();
      this.workers.delete(worker.id);
    }
    
    this.emit('worker:scaled_down', {
      removedCount: workers.length,
      totalWorkers: this.workers.size
    });
  }
}
```

---

## 4. Action Library

### 4.1 Built-in Actions

| Category | Action | Description |
|----------|--------|-------------|
| **Download** | `download.url` | Download from URL |
| | `download.playlist` | Download entire playlist |
| | `download.channel` | Download channel content |
| **Media** | `media.convert` | Convert format |
| | `media.extract_audio` | Extract audio track |
| | `media.thumbnail` | Generate thumbnails |
| | `media.metadata` | Update metadata |
| **Organize** | `media.move` | Move to folder |
| | `media.copy` | Copy to location |
| | `media.rename` | Rename file |
| | `media.delete` | Delete file |
| | `media.tag` | Add/remove tags |
| | `media.add_to_collection` | Add to collection |
| **Notify** | `notify.desktop` | Desktop notification |
| | `notify.email` | Send email |
| | `notify.webhook` | HTTP webhook |
| | `notify.slack` | Slack message |
| **System** | `system.exec` | Execute command |
| | `system.script` | Run custom script |
| | `system.wait` | Delay execution |
| | `system.condition` | Conditional branch |
| **External** | `external.api` | Call external API |
| | `external.zapier` | Zapier trigger |
| | `external.ifttt` | IFTTT trigger |

### 4.2 Action Implementation

```typescript
interface Action {
  readonly type: string;
  readonly name: string;
  readonly description: string;
  readonly category: string;
  readonly icon: string;
  
  // Input/output schema
  inputSchema: JSONSchema;
  outputSchema: JSONSchema;
  
  // Execution
  execute(inputs: any, context: ExecutionContext): Promise<any>;
  
  // Validation
  validateInputs(inputs: any): ValidationResult;
  
  // UI
  renderConfig?(config: any, onChange: (config: any) => void): ReactNode;
}

// Example: Download Action
class DownloadAction implements Action {
  readonly type = 'download.url';
  readonly name = 'Download from URL';
  readonly description = 'Download media from a URL';
  readonly category = 'Download';
  readonly icon = 'download';
  
  inputSchema = {
    type: 'object',
    properties: {
      url: { type: 'string', format: 'uri' },
      quality: { 
        type: 'string', 
        enum: ['best', 'worst', '1080p', '720p', '480p'],
        default: 'best'
      },
      format: {
        type: 'string',
        enum: ['mp4', 'mkv', 'webm', 'mp3', 'aac'],
        default: 'mp4'
      },
      outputPath: { type: 'string' }
    },
    required: ['url']
  };
  
  async execute(inputs: any, context: ExecutionContext): Promise<any> {
    const { url, quality, format, outputPath } = inputs;
    
    // Validate URL
    if (!this.isValidUrl(url)) {
      throw new ValidationError('Invalid URL provided');
    }
    
    // Start download
    const download = await context.services.downloader.createDownload({
      url,
      quality,
      format,
      outputPath: outputPath || context.variables.defaultDownloadPath
    });
    
    // Wait for completion
    const result = await this.waitForDownload(download.id, context);
    
    return {
      downloadId: download.id,
      filePath: result.filePath,
      mediaId: result.mediaId,
      metadata: result.metadata
    };
  }
  
  private async waitForDownload(
    downloadId: string, 
    context: ExecutionContext
  ): Promise<DownloadResult> {
    return new Promise((resolve, reject) => {
      const unsubscribe = context.events.subscribe(
        'download.completed',
        (event) => {
          if (event.payload.downloadId === downloadId) {
            unsubscribe();
            resolve(event.payload);
          }
        }
      );
      
      // Timeout after 30 minutes
      setTimeout(() => {
        unsubscribe();
        reject(new TimeoutError('Download timed out'));
      }, 30 * 60 * 1000);
    });
  }
}
```

---

## 5. Retry & Error Handling

### 5.1 Retry Policies

```typescript
interface RetryPolicy {
  maxAttempts: number;
  backoffStrategy: 'fixed' | 'linear' | 'exponential';
  initialDelay: number;           // ms
  maxDelay: number;               // ms
  retryableErrors: string[];      // Error types to retry
  onRetry?: (attempt: number, error: Error) => void;
}

const defaultRetryPolicy: RetryPolicy = {
  maxAttempts: 3,
  backoffStrategy: 'exponential',
  initialDelay: 1000,
  maxDelay: 30000,
  retryableErrors: [
    'NetworkError',
    'TimeoutError',
    'RateLimitError',
    'ServiceUnavailableError'
  ]
};

class RetryManager {
  async executeWithRetry<T>(
    operation: () => Promise<T>,
    policy: RetryPolicy,
    context: ExecutionContext
  ): Promise<T> {
    let attempt = 0;
    let delay = policy.initialDelay;
    
    while (attempt < policy.maxAttempts) {
      try {
        return await operation();
      } catch (error) {
        attempt++;
        
        if (attempt >= policy.maxAttempts) {
          throw new MaxRetriesExceededError(error, attempt);
        }
        
        if (!this.isRetryable(error, policy)) {
          throw error;
        }
        
        context.log('warn', `Retry ${attempt}/${policy.maxAttempts} after error: ${error.message}`);
        
        if (policy.onRetry) {
          policy.onRetry(attempt, error);
        }
        
        await this.sleep(delay);
        delay = this.calculateNextDelay(delay, policy);
      }
    }
    
    throw new Error('Unreachable');
  }
  
  private calculateNextDelay(currentDelay: number, policy: RetryPolicy): number {
    switch (policy.backoffStrategy) {
      case 'fixed':
        return policy.initialDelay;
      case 'linear':
        return Math.min(currentDelay + policy.initialDelay, policy.maxDelay);
      case 'exponential':
        return Math.min(currentDelay * 2, policy.maxDelay);
    }
  }
}
```

### 5.2 Error Recovery

```typescript
interface ErrorHandler {
  canHandle(error: Error): boolean;
  handle(error: Error, context: ExecutionContext): Promise<RecoveryAction>;
}

class ErrorRecoverySystem {
  private handlers: ErrorHandler[] = [];
  
  registerHandler(handler: ErrorHandler): void {
    this.handlers.push(handler);
  }
  
  async handleError(error: Error, context: ExecutionContext): Promise<RecoveryAction> {
    // Find appropriate handler
    for (const handler of this.handlers) {
      if (handler.canHandle(error)) {
        return await handler.handle(error, context);
      }
    }
    
    // Default: fail the job
    return { type: 'fail', propagate: true };
  }
}

// Example handlers
class NetworkErrorHandler implements ErrorHandler {
  canHandle(error: Error): boolean {
    return error instanceof NetworkError || 
           error.code === 'ECONNREFUSED' ||
           error.code === 'ETIMEDOUT';
  }
  
  async handle(error: Error, context: ExecutionContext): Promise<RecoveryAction> {
    // Check if we should retry
    if (context.attempts < 3) {
      return { type: 'retry', delay: 5000 };
    }
    
    // Try alternative approach
    return { 
      type: 'fallback',
      fallbackAction: {
        type: 'notify',
        message: `Network error after retries: ${error.message}`
      }
    };
  }
}

class DiskSpaceErrorHandler implements ErrorHandler {
  canHandle(error: Error): boolean {
    return error.message.includes('ENOSPC') || 
           error.message.includes('disk full');
  }
  
  async handle(error: Error, context: ExecutionContext): Promise<RecoveryAction> {
    // Pause and notify user
    return {
      type: 'pause',
      notify: true,
      message: 'Disk space full. Please free up space to continue.'
    };
  }
}
```

---

## 6. Monitoring & Logging

### 6.1 Execution Logging

```typescript
interface ExecutionLogger {
  log(level: LogLevel, message: string, metadata?: any): void;
  logStep(stepId: string, action: string, inputs: any, outputs: any): void;
  logError(error: Error, context: any): void;
  logMetric(name: string, value: number, tags?: Record<string, string>): void;
}

class StructuredLogger implements ExecutionLogger {
  private jobId: string;
  private workflowId?: string;
  private logs: ExecutionLog[] = [];
  
  log(level: LogLevel, message: string, metadata?: any): void {
    const log: ExecutionLog = {
      timestamp: new Date(),
      level,
      message,
      jobId: this.jobId,
      workflowId: this.workflowId,
      stepId: this.currentStep,
      metadata,
      threadId: this.getThreadId()
    };
    
    this.logs.push(log);
    
    // Send to central logging
    this.transport.send(log);
    
    // Update job record
    this.updateJobLog(log);
  }
  
  logStep(stepId: string, action: string, inputs: any, outputs: any): void {
    this.log('info', `Executed ${action}`, {
      stepId,
      action,
      inputs: this.sanitize(inputs),
      outputs: this.sanitize(outputs),
      duration: this.getStepDuration(stepId)
    });
  }
  
  private sanitize(data: any): any {
    // Remove sensitive fields
    const sensitive = ['password', 'token', 'apiKey', 'secret'];
    return JSON.parse(JSON.stringify(data, (key, value) => 
      sensitive.includes(key) ? '***REDACTED***' : value
    ));
  }
}
```

### 6.2 Execution Metrics

| Metric | Type | Description |
|--------|------|-------------|
| `workflow.executions` | Counter | Total workflow runs |
| `workflow.duration` | Histogram | Execution time |
| `workflow.success_rate` | Gauge | Success percentage |
| `step.duration` | Histogram | Per-step timing |
| `queue.depth` | Gauge | Pending jobs |
| `worker.utilization` | Gauge | Worker busy % |
| `retry.count` | Counter | Total retries |
| `error.rate` | Gauge | Errors per minute |

### 6.3 Health Monitoring

```typescript
interface HealthCheck {
  name: string;
  check(): Promise<HealthStatus>;
}

class HealthMonitor {
  private checks: HealthCheck[] = [];
  
  async runChecks(): Promise<HealthReport> {
    const results = await Promise.all(
      this.checks.map(async check => ({
        name: check.name,
        status: await check.check()
      }))
    );
    
    const healthy = results.every(r => r.status === 'healthy');
    
    return {
      status: healthy ? 'healthy' : 'degraded',
      checks: results,
      timestamp: new Date()
    };
  }
}

// Built-in checks
const queueHealthCheck: HealthCheck = {
  name: 'task_queue',
  async check() {
    const depth = await queue.getDepth();
    const oldestJob = await queue.getOldestJob();
    
    if (depth > 1000) return { status: 'unhealthy', reason: 'Queue backlog' };
    if (oldestJob && Date.now() - oldestJob.createdAt > 3600000) {
      return { status: 'degraded', reason: 'Stale jobs' };
    }
    
    return { status: 'healthy' };
  }
};

const workerHealthCheck: HealthCheck = {
  name: 'workers',
  async check() {
    const workers = await workerPool.getStatus();
    const healthyWorkers = workers.filter(w => w.healthy);
    
    if (healthyWorkers.length === 0) return { status: 'unhealthy' };
    if (healthyWorkers.length < workers.length / 2) {
      return { status: 'degraded', reason: 'Worker failures' };
    }
    
    return { status: 'healthy' };
  }
};
```

---

## 7. Workflow Templates

### 7.1 Template Marketplace

```typescript
interface WorkflowTemplate {
  id: string;
  name: string;
  description: string;
  category: string;
  author: string;
  version: string;
  
  // Template content
  workflow: WorkflowDefinition;
  
  // Metadata
  downloads: number;
  rating: number;
  tags: string[];
  createdAt: Date;
  updatedAt: Date;
  
  // Usage
  installCount: number;
  exampleInputs?: Record<string, any>;
}

// Example templates
const templates: WorkflowTemplate[] = [
  {
    id: 'auto-download-playlist',
    name: 'Auto-Download Playlist Updates',
    description: 'Automatically check and download new videos from a playlist',
    category: 'Download',
    workflow: {
      trigger: { type: 'schedule', cron: '0 0 * * *' },
      steps: [
        { type: 'action', action: 'download.playlist', inputs: { /* ... */ } },
        { type: 'action', action: 'notify.desktop', inputs: { message: 'Downloaded {{count}} new videos' } }
      ]
    }
  },
  {
    id: 'convert-to-podcast',
    name: 'Convert Long Videos to Podcast',
    description: 'Extract audio from videos over 30 minutes and organize',
    category: 'Conversion',
    workflow: {
      trigger: { type: 'event', event: 'download.completed' },
      condition: { field: 'duration', operator: 'gt', value: 1800 },
      steps: [
        { type: 'action', action: 'media.extract_audio', inputs: { format: 'mp3', bitrate: 192 } },
        { type: 'action', action: 'media.move', inputs: { destination: '~/Podcasts/' } },
        { type: 'action', action: 'media.tag', inputs: { tags: ['podcast', 'auto-converted'] } }
      ]
    }
  },
  {
    id: 'archive-old-content',
    name: 'Archive Old Downloads',
    description: 'Move content older than 90 days to archive folder',
    category: 'Organization',
    workflow: {
      trigger: { type: 'schedule', cron: '0 2 * * 0' }, // Weekly
      steps: [
        { 
          type: 'loop', 
          over: 'media',
          where: { field: 'downloadedAt', operator: 'lt', value: '90 days ago' },
          steps: [
            { type: 'action', action: 'media.move', inputs: { destination: '~/Archive/{{date:YYYY}}/{{date:MM}}/' } }
          ]
        }
      ]
    }
  }
];
```

---

*Document Version: 1.0*
*Last Updated: 2026-05-27*
*Owner: Engineering Team*