// Package computer orchestrates long-running, checkpointed task execution:
// a task is planned into waves of subtasks, the waves are executed in order,
// and progress is published on an in-process event bus.
package computer

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/gooseek/backend/internal/computer/connectors"
	"github.com/gooseek/backend/internal/llm"
)

// ComputerConfig holds the tunables used to construct a Computer.
type ComputerConfig struct {
	MaxParallelTasks    int
	MaxSubTasks         int
	TaskTimeout         time.Duration
	SubTaskTimeout      time.Duration
	TotalBudget         float64
	EnableSandbox       bool
	EnableScheduling    bool
	EnableBrowser       bool
	SandboxImage        string
	ArtifactStorageURL  string
	BrowserServerURL    string
	CheckpointStorePath string
	MaxConcurrentTasks  int
	HeartbeatInterval   time.Duration
	CheckpointInterval  time.Duration
}

// DefaultConfig returns the configuration used when the caller has no
// overrides.
func DefaultConfig() ComputerConfig {
	return ComputerConfig{
		MaxParallelTasks:    10,
		MaxSubTasks:         100,
		TaskTimeout:         365 * 24 * time.Hour,
		SubTaskTimeout:      2 * time.Hour,
		TotalBudget:         100.0,
		EnableSandbox:       true,
		EnableScheduling:    true,
		EnableBrowser:       true,
		SandboxImage:        "gooseek/sandbox:latest",
		BrowserServerURL:    "http://browser-svc:3050",
		CheckpointStorePath: "/data/checkpoints",
		MaxConcurrentTasks:  50,
		HeartbeatInterval:   30 * time.Second,
		CheckpointInterval:  15 * time.Minute,
	}
}

// GetDurationConfig looks up the limits registered for mode in
// DurationModeConfigs. An unknown mode falls back to the DurationMedium entry.
func GetDurationConfig(mode DurationMode) (maxDuration, checkpointFreq, heartbeatFreq time.Duration, maxIter int) {
	cfg, ok := DurationModeConfigs[mode]
	if !ok {
		cfg = DurationModeConfigs[DurationMedium]
	}
	return cfg.MaxDuration, cfg.CheckpointFreq, cfg.HeartbeatFreq, cfg.MaxIterations
}

// Dependencies bundles the collaborators injected into NewComputer.
type Dependencies struct {
	Registry     *llm.ModelRegistry
	TaskRepo     TaskRepository
	MemoryRepo   MemoryRepository
	ArtifactRepo ArtifactRepository
}

// TaskRepository is the persistence interface for ComputerTask records.
type TaskRepository interface {
	Create(ctx context.Context, task *ComputerTask) error
	Update(ctx context.Context, task *ComputerTask) error
	GetByID(ctx context.Context, id string) (*ComputerTask, error)
	GetByUserID(ctx context.Context, userID string, limit, offset int) ([]ComputerTask, error)
	GetScheduled(ctx context.Context) ([]ComputerTask, error)
	Delete(ctx context.Context, id string) error
}

// MemoryRepository is the persistence interface for MemoryEntry records.
type MemoryRepository interface {
	Store(ctx context.Context, entry *MemoryEntry) error
	GetByUser(ctx context.Context, userID string, limit int) ([]MemoryEntry, error)
	GetByTask(ctx context.Context, taskID string) ([]MemoryEntry, error)
	Search(ctx context.Context, userID, query string, limit int) ([]MemoryEntry, error)
	Delete(ctx context.Context, id string) error
}

// ArtifactRepository is the persistence interface for Artifact records.
type ArtifactRepository interface {
	Create(ctx context.Context, artifact *Artifact) error
	GetByID(ctx context.Context, id string) (*Artifact, error)
	GetByTaskID(ctx context.Context, taskID string) ([]Artifact, error)
	Delete(ctx context.Context, id string) error
}

// Computer plans tasks, executes them wave by wave, and tracks the tasks it
// has seen in an in-memory map guarded by mu.
type Computer struct {
	cfg        ComputerConfig
	planner    *Planner
	router     *Router
	executor   *Executor
	sandbox    *SandboxManager
	memory     *MemoryStore
	scheduler  *Scheduler
	connectors *connectors.ConnectorHub
	registry   *llm.ModelRegistry
	taskRepo   TaskRepository
	eventBus   *EventBus

	mu    sync.RWMutex
	tasks map[string]*ComputerTask
}

// NewComputer wires a Computer from cfg and deps. The sandbox and the
// scheduler are only created when the matching cfg flag is set, so both
// fields may be nil afterwards.
func NewComputer(cfg ComputerConfig, deps Dependencies) *Computer {
	c := &Computer{
		cfg:      cfg,
		registry: deps.Registry,
		taskRepo: deps.TaskRepo,
		eventBus: NewEventBus(),
		tasks:    make(map[string]*ComputerTask),
	}

	c.planner = NewPlanner(deps.Registry)
	c.router = NewRouter(deps.Registry)
	c.executor = NewExecutor(c.router, cfg.MaxParallelTasks)
	c.memory = NewMemoryStore(deps.MemoryRepo)
	c.connectors = connectors.NewConnectorHub()

	if cfg.EnableSandbox {
		c.sandbox = NewSandboxManager(SandboxConfig{
			Image:   cfg.SandboxImage,
			Timeout: cfg.SubTaskTimeout,
		})
		c.executor.SetSandbox(c.sandbox)
	}

	if cfg.EnableScheduling {
		c.scheduler = NewScheduler(deps.TaskRepo, c)
	}

	return c
}

// Execute creates a task for query, persists it, and runs it.
//
// When opts.ResumeFromID is set no new task is created; the referenced task
// is resumed from its checkpoint instead. When opts.Async is set the task
// runs in a background goroutine on a fresh background context and Execute
// returns immediately with the just-created task; otherwise it blocks until
// the run returns.
func (c *Computer) Execute(ctx context.Context, userID, query string, opts ExecuteOptions) (*ComputerTask, error) {
	if opts.ResumeFromID != "" {
		return c.resumeFromCheckpoint(ctx, opts.ResumeFromID, opts)
	}

	durationMode := opts.DurationMode
	if durationMode == "" {
		durationMode = DurationMedium
	}
	maxDuration, _, _, maxIter := GetDurationConfig(durationMode)

	now := time.Now()
	task := &ComputerTask{
		ID:            uuid.New().String(),
		UserID:        userID,
		Query:         query,
		Status:        StatusPending,
		Memory:        make(map[string]interface{}),
		CreatedAt:     now,
		UpdatedAt:     now,
		DurationMode:  durationMode,
		MaxDuration:   maxDuration,
		MaxIterations: maxIter,
		Priority:      opts.Priority,
	}

	if opts.Priority == "" {
		task.Priority = PriorityNormal
	}
	if opts.ResourceLimits != nil {
		task.ResourceLimits = opts.ResourceLimits
	}
	if opts.Schedule != nil {
		task.Schedule = opts.Schedule
		task.Status = StatusScheduled
	}
	if opts.Context != nil {
		task.Memory = opts.Context
	}

	estimatedEnd := now.Add(maxDuration)
	task.EstimatedEnd = &estimatedEnd

	if err := c.taskRepo.Create(ctx, task); err != nil {
		return nil, fmt.Errorf("failed to create task: %w", err)
	}

	c.mu.Lock()
	c.tasks[task.ID] = task
	c.mu.Unlock()

	c.emitEvent(TaskEvent{
		Type:      EventTaskCreated,
		TaskID:    task.ID,
		Status:    task.Status,
		Message:   fmt.Sprintf("Task created (mode: %s, max duration: %v)", durationMode, maxDuration),
		Timestamp: time.Now(),
		Data: map[string]interface{}{
			"durationMode":  durationMode,
			"maxDuration":   maxDuration.String(),
			"maxIterations": maxIter,
		},
	})

	if opts.Async {
		// The request context may end as soon as the caller returns, so the
		// background run gets its own context.
		go c.executeTaskWithCheckpoints(context.Background(), task, opts)
		return task, nil
	}

	return c.executeTaskWithCheckpoints(ctx, task, opts)
}

// resumeFromCheckpoint loads the task with ID checkpointID from the
// repository and restarts execution from its latest checkpoint. It fails when
// the task cannot be loaded or has no checkpoint.
func (c *Computer) resumeFromCheckpoint(ctx context.Context, checkpointID string, opts ExecuteOptions) (*ComputerTask, error) {
	task, err := c.taskRepo.GetByID(ctx, checkpointID)
	if err != nil {
		return nil, fmt.Errorf("task not found: %w", err)
	}
	if task.Checkpoint == nil {
		return nil, errors.New("no checkpoint found for this task")
	}

	now := time.Now()
	task.Status = StatusExecuting
	task.ResumedAt = &now
	task.UpdatedAt = now

	c.emitEvent(TaskEvent{
		Type:   EventResumed,
		TaskID: task.ID,
		Status: task.Status,
		Message: fmt.Sprintf("Resumed from checkpoint (wave: %d, subtask: %d)",
			task.Checkpoint.WaveIndex, task.Checkpoint.SubTaskIndex),
		Progress:  task.Checkpoint.Progress,
		Timestamp: time.Now(),
	})

	c.mu.Lock()
	c.tasks[task.ID] = task
	c.mu.Unlock()

	if opts.Async {
		go c.executeTaskWithCheckpoints(context.Background(), task, opts)
		return task, nil
	}

	return c.executeTaskWithCheckpoints(ctx, task, opts)
}

// executeTask is a thin alias for executeTaskWithCheckpoints.
func (c *Computer) executeTask(ctx context.Context, task *ComputerTask, opts ExecuteOptions) (*ComputerTask, error) {
	return c.executeTaskWithCheckpoints(ctx, task, opts)
}

// firstPositiveDuration returns the first candidate that is greater than
// zero, or zero when there is none.
func firstPositiveDuration(candidates ...time.Duration) time.Duration {
	for _, d := range candidates {
		if d > 0 {
			return d
		}
	}
	return 0
}

// userStopRequested reports whether Pause or Cancel has flipped the task's
// status. Both write the status while holding c.mu, so it is read under the
// same lock.
func (c *Computer) userStopRequested(task *ComputerTask) (paused, cancelled bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return task.Status == StatusPaused, task.Status == StatusCancelled
}

// executeTaskWithCheckpoints plans the task (unless it already has a plan)
// and then runs the plan's execution order one wave at a time, starting at
// the wave recorded in task.Checkpoint if there is one.
//
// The run is bounded by the duration-mode limit, or by opts.Timeout (seconds)
// when positive. The cost budget is cfg.TotalBudget, overridden by
// opts.MaxCost and then by task.ResourceLimits.MaxTotalCost when positive.
//
// The run stops early, with a nil error, when the budget is used up (the task
// is paused and checkpointed) or when Pause/Cancel changed the status; in the
// latter case the stop takes effect at the next wave boundary. It stops with
// an error when planning fails, a wave fails, or the context ends.
func (c *Computer) executeTaskWithCheckpoints(ctx context.Context, task *ComputerTask, opts ExecuteOptions) (*ComputerTask, error) {
	maxDuration, checkpointFreq, heartbeatFreq, _ := GetDurationConfig(task.DurationMode)
	if opts.Timeout > 0 {
		maxDuration = time.Duration(opts.Timeout) * time.Second
	}

	// time.NewTicker panics on a non-positive interval, so fall back to the
	// configured intervals and finally to fixed defaults.
	heartbeatFreq = firstPositiveDuration(heartbeatFreq, c.cfg.HeartbeatInterval, 30*time.Second)
	checkpointFreq = firstPositiveDuration(checkpointFreq, c.cfg.CheckpointInterval, 15*time.Minute)

	ctx, cancel := context.WithTimeout(ctx, maxDuration)
	defer cancel()

	budget := c.cfg.TotalBudget
	if opts.MaxCost > 0 {
		budget = opts.MaxCost
	}
	if task.ResourceLimits != nil && task.ResourceLimits.MaxTotalCost > 0 {
		budget = task.ResourceLimits.MaxTotalCost
	}

	startWave := 0
	if task.Checkpoint != nil {
		startWave = task.Checkpoint.WaveIndex
		// A task loaded from the repository may carry a nil memory map;
		// writing into it would panic.
		if task.Memory == nil {
			task.Memory = make(map[string]interface{})
		}
		for k, v := range task.Checkpoint.Memory {
			task.Memory[k] = v
		}
	}

	if task.Plan == nil {
		task.Status = StatusPlanning
		task.UpdatedAt = time.Now()
		c.updateTask(ctx, task)

		c.emitEvent(TaskEvent{
			Type:      EventTaskStarted,
			TaskID:    task.ID,
			Status:    StatusPlanning,
			Message:   "Planning task execution",
			Timestamp: time.Now(),
		})

		// User context is optional input for the planner; a lookup failure
		// just means planning proceeds without it.
		userMemory, _ := c.memory.GetUserContext(ctx, task.UserID)
		memoryContext := make(map[string]interface{})
		for k, v := range userMemory {
			memoryContext[k] = v
		}
		// Task memory is copied second so it wins over user memory on
		// duplicate keys.
		for k, v := range task.Memory {
			memoryContext[k] = v
		}

		plan, err := c.planner.Plan(ctx, task.Query, memoryContext)
		if err != nil {
			task.Status = StatusFailed
			task.Error = fmt.Sprintf("Planning failed: %v", err)
			task.UpdatedAt = time.Now()
			c.updateTask(ctx, task)
			c.emitEvent(TaskEvent{
				Type:      EventTaskFailed,
				TaskID:    task.ID,
				Status:    StatusFailed,
				Message:   task.Error,
				Timestamp: time.Now(),
			})
			return task, err
		}

		task.Plan = plan
		task.SubTasks = plan.SubTasks
	}

	// A Cancel that arrived while planning must not be overwritten by the
	// status change below.
	if _, cancelled := c.userStopRequested(task); cancelled {
		return task, nil
	}

	task.Status = StatusLongRunning
	task.UpdatedAt = time.Now()
	c.updateTask(ctx, task)

	c.emitEvent(TaskEvent{
		Type:     EventTaskProgress,
		TaskID:   task.ID,
		Status:   StatusLongRunning,
		Progress: 10,
		Message:  fmt.Sprintf("Executing %d subtasks (long-running mode)", len(task.Plan.SubTasks)),
		Data: map[string]interface{}{
			"plan":           task.Plan,
			"durationMode":   task.DurationMode,
			"checkpointFreq": checkpointFreq.String(),
		},
		Timestamp: time.Now(),
	})

	heartbeatTicker := time.NewTicker(heartbeatFreq)
	defer heartbeatTicker.Stop()

	checkpointTicker := time.NewTicker(checkpointFreq)
	defer checkpointTicker.Stop()

	// The heartbeat goroutine ends when ctx is done, which the deferred
	// cancel guarantees on every return path.
	// NOTE(review): it touches task fields without synchronisation while the
	// wave loop below mutates them — confirm whether this needs a lock.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-heartbeatTicker.C:
				now := time.Now()
				task.HeartbeatAt = &now
				c.emitEvent(TaskEvent{
					Type:     EventHeartbeat,
					TaskID:   task.ID,
					Progress: task.Progress,
					Message:  fmt.Sprintf("Heartbeat: %d%% complete, cost: $%.4f", task.Progress, task.TotalCost),
					Data: map[string]interface{}{
						"runtime": time.Since(task.CreatedAt).String(),
						"cost":    task.TotalCost,
					},
					Timestamp: now,
				})
			}
		}
	}()

	totalWaves := len(task.Plan.ExecutionOrder)

	for waveIdx := startWave; waveIdx < totalWaves; waveIdx++ {
		// Non-blocking poll: stop on timeout/cancellation, or take a
		// periodic checkpoint when one is due.
		select {
		case <-ctx.Done():
			c.saveCheckpoint(task, waveIdx, 0, "context_timeout")
			return task, ctx.Err()
		case <-checkpointTicker.C:
			c.saveCheckpoint(task, waveIdx, 0, "periodic")
		default:
		}

		// Pause and Cancel only change the status; honour them here so the
		// run does not continue and later overwrite that status. Pause has
		// already recorded its own checkpoint; if it was taken while a wave
		// was in flight, that wave runs again on resume.
		if paused, cancelled := c.userStopRequested(task); paused || cancelled {
			return task, nil
		}

		if budget > 0 && task.TotalCost >= budget {
			c.saveCheckpoint(task, waveIdx, 0, "budget_exceeded")
			task.Status = StatusPaused
			task.Message = fmt.Sprintf("Paused: budget exceeded ($%.2f / $%.2f)", task.TotalCost, budget)
			c.updateTask(ctx, task)
			return task, nil
		}

		// Resolve the wave's subtask IDs to subtasks; unknown IDs are
		// silently skipped.
		wave := task.Plan.ExecutionOrder[waveIdx]
		waveTasks := make([]SubTask, 0, len(wave))
		for _, subTaskID := range wave {
			for i := range task.SubTasks {
				if task.SubTasks[i].ID == subTaskID {
					waveTasks = append(waveTasks, task.SubTasks[i])
					break
				}
			}
		}

		results, err := c.executor.ExecuteGroup(ctx, waveTasks, budget-task.TotalCost)
		if err != nil {
			// Record the failure before checkpointing: saveCheckpoint
			// persists the task on a fresh context, so the failed status is
			// stored even when ctx itself has already expired.
			task.Status = StatusFailed
			task.Error = fmt.Sprintf("Execution failed at wave %d: %v", waveIdx, err)
			task.UpdatedAt = time.Now()
			c.saveCheckpoint(task, waveIdx, 0, "execution_error")
			c.updateTask(ctx, task)
			return task, err
		}

		for _, result := range results {
			for i := range task.SubTasks {
				if task.SubTasks[i].ID != result.SubTaskID {
					continue
				}
				completedAt := time.Now()
				task.SubTasks[i].Output = result.Output
				task.SubTasks[i].Cost = result.Cost
				task.SubTasks[i].Status = StatusCompleted
				task.SubTasks[i].CompletedAt = &completedAt
				if result.Error != nil {
					task.SubTasks[i].Status = StatusFailed
					task.SubTasks[i].Error = result.Error.Error()
				}
				break
			}

			task.TotalCost += result.Cost
			task.TotalRuntime = time.Since(task.CreatedAt)

			for _, artifact := range result.Artifacts {
				task.Artifacts = append(task.Artifacts, artifact)
				c.emitEvent(TaskEvent{
					Type:      EventArtifact,
					TaskID:    task.ID,
					SubTaskID: result.SubTaskID,
					Data: map[string]interface{}{
						"artifact": artifact,
					},
					Timestamp: time.Now(),
				})
			}
		}

		// Planning accounts for the first 10%, the waves for the next 80%.
		progress := 10 + int(float64(waveIdx+1)/float64(totalWaves)*80)
		task.Progress = progress
		task.Iterations = waveIdx + 1
		task.UpdatedAt = time.Now()
		c.updateTask(ctx, task)

		c.emitEvent(TaskEvent{
			Type:     EventIteration,
			TaskID:   task.ID,
			Progress: progress,
			Message: fmt.Sprintf("Completed wave %d/%d (runtime: %v)",
				waveIdx+1, totalWaves, time.Since(task.CreatedAt).Round(time.Second)),
			Data: map[string]interface{}{
				"wave":      waveIdx + 1,
				"total":     totalWaves,
				"cost":      task.TotalCost,
				"runtime":   time.Since(task.CreatedAt).String(),
				"artifacts": len(task.Artifacts),
			},
			Timestamp: time.Now(),
		})
	}

	now := time.Now()
	task.Status = StatusCompleted
	task.Progress = 100
	task.CompletedAt = &now
	task.UpdatedAt = now
	task.TotalRuntime = time.Since(task.CreatedAt)
	c.updateTask(ctx, task)

	c.emitEvent(TaskEvent{
		Type:     EventTaskCompleted,
		TaskID:   task.ID,
		Status:   StatusCompleted,
		Progress: 100,
		Message: fmt.Sprintf("Task completed (runtime: %v, cost: $%.4f)",
			task.TotalRuntime.Round(time.Second), task.TotalCost),
		Data: map[string]interface{}{
			"artifacts":    task.Artifacts,
			"totalCost":    task.TotalCost,
			"totalRuntime": task.TotalRuntime.String(),
			"iterations":   task.Iterations,
		},
		Timestamp: time.Now(),
	})

	c.storeTaskResults(ctx, task)

	return task, nil
}

// saveCheckpoint records a checkpoint at (waveIdx, subTaskIdx) on the task,
// appends it to the task's checkpoint history, persists the task, and emits
// EventCheckpointSaved. reason is a free-form label such as "periodic".
//
// Persistence uses its own 10-second background context so that a checkpoint
// can still be written after the run's context has expired. A persistence
// failure does not abort the caller; it is reported in the event data under
// "persistError".
func (c *Computer) saveCheckpoint(task *ComputerTask, waveIdx, subTaskIdx int, reason string) {
	checkpoint := Checkpoint{
		ID:           uuid.New().String(),
		TaskID:       task.ID,
		WaveIndex:    waveIdx,
		SubTaskIndex: subTaskIdx,
		State:        make(map[string]interface{}),
		Progress:     task.Progress,
		CreatedAt:    time.Now(),
		RuntimeSoFar: time.Since(task.CreatedAt),
		CostSoFar:    task.TotalCost,
		Reason:       reason,
	}

	// Shallow-copy the memory map: sharing the live map would make every
	// stored checkpoint reflect later changes to task.Memory.
	if task.Memory != nil {
		snapshot := make(map[string]interface{}, len(task.Memory))
		for k, v := range task.Memory {
			snapshot[k] = v
		}
		checkpoint.Memory = snapshot
	}

	for _, artifact := range task.Artifacts {
		checkpoint.Artifacts = append(checkpoint.Artifacts, artifact.ID)
	}

	task.Checkpoint = &checkpoint
	task.Checkpoints = append(task.Checkpoints, checkpoint)
	task.UpdatedAt = time.Now()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	persistErr := c.taskRepo.Update(ctx, task)

	data := map[string]interface{}{
		"checkpointId": checkpoint.ID,
		"waveIndex":    waveIdx,
		"subTaskIndex": subTaskIdx,
		"reason":       reason,
		"runtime":      checkpoint.RuntimeSoFar.String(),
		"cost":         checkpoint.CostSoFar,
	}
	if persistErr != nil {
		data["persistError"] = persistErr.Error()
	}

	c.emitEvent(TaskEvent{
		Type:      EventCheckpointSaved,
		TaskID:    task.ID,
		Progress:  task.Progress,
		Message:   fmt.Sprintf("Checkpoint saved: %s (wave %d)", reason, waveIdx),
		Data:      data,
		Timestamp: time.Now(),
	})
}

// Pause marks a running task as paused, records a "user_paused" checkpoint
// at the task's current iteration count, and persists it. The task is looked
// up in memory first and in the repository otherwise. It returns an error
// when the task cannot be loaded or is neither executing nor long-running.
func (c *Computer) Pause(ctx context.Context, taskID string) error {
	c.mu.RLock()
	task, ok := c.tasks[taskID]
	c.mu.RUnlock()

	if !ok {
		var err error
		task, err = c.taskRepo.GetByID(ctx, taskID)
		if err != nil {
			return err
		}
	}

	now := time.Now()

	c.mu.Lock()
	if task.Status != StatusExecuting && task.Status != StatusLongRunning {
		c.mu.Unlock()
		return errors.New("task is not running")
	}
	task.Status = StatusPaused
	task.PausedAt = &now
	task.UpdatedAt = now
	c.mu.Unlock()

	c.saveCheckpoint(task, task.Iterations, 0, "user_paused")

	c.emitEvent(TaskEvent{
		Type:      EventPaused,
		TaskID:    taskID,
		Status:    StatusPaused,
		Progress:  task.Progress,
		Message:   "Task paused by user",
		Timestamp: now,
	})

	return c.taskRepo.Update(ctx, task)
}

// Resume continues a task that is waiting for user input. userInput is
// stored in the task memory under "user_input" and execution restarts in a
// background goroutine. It returns an error when the task cannot be loaded or
// is not in the waiting state.
func (c *Computer) Resume(ctx context.Context, taskID string, userInput string) error {
	c.mu.RLock()
	task, ok := c.tasks[taskID]
	c.mu.RUnlock()

	if !ok {
		var err error
		task, err = c.taskRepo.GetByID(ctx, taskID)
		if err != nil {
			return fmt.Errorf("task not found: %w", err)
		}
	}

	if task.Status != StatusWaiting {
		return errors.New("task is not waiting for user input")
	}

	// A task loaded from the repository may carry a nil memory map; writing
	// into it would panic.
	if task.Memory == nil {
		task.Memory = make(map[string]interface{})
	}
	task.Memory["user_input"] = userInput
	task.Status = StatusExecuting
	task.UpdatedAt = time.Now()

	go c.executeTask(context.Background(), task, ExecuteOptions{Async: true})

	return nil
}

// Cancel marks the task as cancelled and persists that status. For a task
// tracked in memory it also emits an EventTaskFailed event carrying
// StatusCancelled; a task found only in the repository is updated without an
// event.
func (c *Computer) Cancel(ctx context.Context, taskID string) error {
	c.mu.Lock()
	task, ok := c.tasks[taskID]
	if ok {
		task.Status = StatusCancelled
		task.UpdatedAt = time.Now()
	}
	c.mu.Unlock()

	if !ok {
		stored, err := c.taskRepo.GetByID(ctx, taskID)
		if err != nil {
			return fmt.Errorf("task not found: %w", err)
		}
		stored.Status = StatusCancelled
		stored.UpdatedAt = time.Now()
		return c.taskRepo.Update(ctx, stored)
	}

	c.emitEvent(TaskEvent{
		Type:      EventTaskFailed,
		TaskID:    taskID,
		Status:    StatusCancelled,
		Message:   "Task cancelled by user",
		Timestamp: time.Now(),
	})

	return c.taskRepo.Update(ctx, task)
}

// GetStatus returns the task with the given ID, preferring the in-memory
// copy and falling back to the repository.
func (c *Computer) GetStatus(ctx context.Context, taskID string) (*ComputerTask, error) {
	c.mu.RLock()
	task, ok := c.tasks[taskID]
	c.mu.RUnlock()

	if ok {
		return task, nil
	}
	return c.taskRepo.GetByID(ctx, taskID)
}

// GetUserTasks returns one page of the user's tasks from the repository.
func (c *Computer) GetUserTasks(ctx context.Context, userID string, limit, offset int) ([]ComputerTask, error) {
	return c.taskRepo.GetByUserID(ctx, userID, limit, offset)
}

// Stream subscribes to the events published for taskID. The returned error
// is always nil.
//
// When ctx can be cancelled, the subscription is removed and the returned
// channel is closed once ctx is done, so abandoned streams do not accumulate
// on the event bus. With a context that is never done the subscription stays
// registered.
func (c *Computer) Stream(ctx context.Context, taskID string) (<-chan TaskEvent, error) {
	events := c.eventBus.Subscribe(taskID)

	if done := ctx.Done(); done != nil {
		go func() {
			<-done
			c.eventBus.Unsubscribe(taskID, events)
		}()
	}

	return events, nil
}

// updateTask refreshes the in-memory entry for the task and persists it.
func (c *Computer) updateTask(ctx context.Context, task *ComputerTask) {
	c.mu.Lock()
	c.tasks[task.ID] = task
	c.mu.Unlock()

	// Best effort: the in-memory copy stays authoritative for this process
	// and a later update or checkpoint persists the task again.
	_ = c.taskRepo.Update(ctx, task)
}

// emitEvent publishes event to the subscribers of event.TaskID.
func (c *Computer) emitEvent(event TaskEvent) {
	c.eventBus.Publish(event.TaskID, event)
}

// storeTaskResults writes the JSON-encoded output of every subtask that
// produced one to the memory store, keyed "subtask_<id>_result". Outputs that
// cannot be encoded are skipped.
func (c *Computer) storeTaskResults(ctx context.Context, task *ComputerTask) {
	for i := range task.SubTasks {
		st := &task.SubTasks[i]
		if st.Output == nil {
			continue
		}

		outputJSON, err := json.Marshal(st.Output)
		if err != nil {
			// Storing an empty value would look like a real result.
			continue
		}

		entry := &MemoryEntry{
			ID:        uuid.New().String(),
			UserID:    task.UserID,
			TaskID:    task.ID,
			Key:       fmt.Sprintf("subtask_%s_result", st.ID),
			Value:     string(outputJSON),
			Type:      MemoryTypeResult,
			CreatedAt: time.Now(),
		}
		// Best effort: the task has already completed, and a failed memory
		// write must not change its outcome.
		_ = c.memory.Store(ctx, task.UserID, entry)
	}
}

// StartScheduler starts the scheduler if scheduling was enabled at
// construction time; otherwise it does nothing.
func (c *Computer) StartScheduler(ctx context.Context) {
	if c.scheduler != nil {
		c.scheduler.Start(ctx)
	}
}

// StopScheduler stops the scheduler if there is one.
func (c *Computer) StopScheduler() {
	if c.scheduler != nil {
		c.scheduler.Stop()
	}
}

// EventBus fans TaskEvents out to per-task subscriber channels.
type EventBus struct {
	subscribers map[string][]chan TaskEvent
	mu          sync.RWMutex
}

// NewEventBus returns an EventBus with no subscribers.
func NewEventBus() *EventBus {
	return &EventBus{
		subscribers: make(map[string][]chan TaskEvent),
	}
}

// Subscribe registers and returns a new channel, buffered for 100 events,
// that receives the events published for taskID.
func (eb *EventBus) Subscribe(taskID string) <-chan TaskEvent {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	ch := make(chan TaskEvent, 100)
	eb.subscribers[taskID] = append(eb.subscribers[taskID], ch)
	return ch
}

// Unsubscribe removes ch from the subscribers of taskID and closes it.
// A channel that is not registered is left untouched, so calling Unsubscribe
// twice for the same channel is safe.
func (eb *EventBus) Unsubscribe(taskID string, ch <-chan TaskEvent) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	subs := eb.subscribers[taskID]
	for i, sub := range subs {
		if sub != ch {
			continue
		}

		// Build a fresh slice rather than compacting in place so no other
		// holder of the old slice sees its elements shift.
		remaining := make([]chan TaskEvent, 0, len(subs)-1)
		remaining = append(remaining, subs[:i]...)
		remaining = append(remaining, subs[i+1:]...)

		if len(remaining) == 0 {
			delete(eb.subscribers, taskID)
		} else {
			eb.subscribers[taskID] = remaining
		}

		close(sub)
		return
	}
}

// Publish delivers event to every subscriber of taskID without blocking; a
// subscriber whose buffer is full misses the event.
func (eb *EventBus) Publish(taskID string, event TaskEvent) {
	// The read lock is held across the sends so Unsubscribe cannot close a
	// channel in the middle of a send, which would panic. The sends never
	// block, so the lock is held only briefly.
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	for _, ch := range eb.subscribers[taskID] {
		select {
		case ch <- event:
		default:
		}
	}
}