Major changes: - Add Go backend (backend/) with microservices architecture - Enhanced master-agents-svc: reranker, content-classifier, stealth-crawler, proxy-manager, media-search, fastClassifier, language detection - New web-svc widgets: KnowledgeCard, ProductCard, ProfileCard, VideoCard, UnifiedCard, CardGallery, InlineImageGallery, SourcesPanel, RelatedQuestions - Improved discover-svc with discover-db integration - Docker deployment improvements (Caddyfile, vendor.sh, BUILD.md) - Library-svc: project_id schema migration - Remove deprecated finance-svc and travel-svc - Localization improvements across services Made-with: Cursor
544 lines
14 KiB
Go
package agent
|
|
|
|
import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/google/uuid"
	"github.com/gooseek/backend/internal/llm"
	"github.com/gooseek/backend/internal/search"
	"github.com/gooseek/backend/internal/session"
	"github.com/gooseek/backend/internal/types"
	"golang.org/x/sync/errgroup"
)
|
|
|
|
// DeepResearchConfig configures a deep-research run: the LLM used for
// planning/synthesis/report writing, the search backend, the focus mode,
// the output locale, and the budget limits. Zero-valued limits are replaced
// with defaults by NewDeepResearcher.
type DeepResearchConfig struct {
	LLM          llm.Client            // LLM client for planning, synthesis, and the final report
	SearchClient *search.SearXNGClient // SearXNG metasearch client used for all searches
	FocusMode    FocusMode             // selects search engines/categories and the report system prompt
	Locale       string                // output locale; "" is treated as "en" (only "ru" changes report language)
	MaxSearchQueries int           // hard cap on search requests issued (default 30)
	MaxSources       int           // stop iterating once this many unique sources are collected (default 100)
	MaxIterations    int           // maximum research iterations (default 5)
	Timeout          time.Duration // overall deadline applied to Research (default 5 minutes)
}
|
|
|
|
// DeepResearchResult is the outcome of a completed research run.
type DeepResearchResult struct {
	FinalReport     string        // streamed markdown report with [n] citations
	Sources         []types.Chunk // all deduplicated sources gathered across sub-queries
	SubQueries      []SubQuery    // every planned/follow-up sub-query with its final state
	Insights        []string      // key insights synthesized from the sources
	FollowUpQueries []string      // suggested follow-up questions for the user (up to 4)
	TotalSearches   int           // number of search requests actually issued
	TotalSources    int           // len(Sources), kept for convenience
	Duration        time.Duration // wall time since the researcher was constructed
}
|
|
|
|
// SubQuery is one targeted search within a research run.
type SubQuery struct {
	Query   string // search-engine query text
	Purpose string // which aspect of the research this query addresses
	// Status is one of "pending", "searching", "complete", or "failed"
	// (see executeSubQuery for the transitions).
	Status   string
	Results  []types.Chunk // deduplicated results for this query (capped at 10)
	Insights []string      // NOTE(review): never populated in this file — confirm whether it is used elsewhere
}
|
|
|
|
// DeepResearcher orchestrates a multi-iteration deep-research run.
// Sub-queries execute on concurrent goroutines (see executeIteration), so
// the mutable collections below are guarded by mu.
type DeepResearcher struct {
	cfg  DeepResearchConfig
	sess *session.Session // session used to stream progress/report blocks to the client

	mu          sync.Mutex      // guards the mutable fields below
	allSources  []types.Chunk   // URL-deduplicated sources accumulated across all searches
	seenURLs    map[string]bool // URL de-duplication set
	subQueries  []SubQuery      // planned + follow-up sub-queries in all states
	insights    []string        // insights stored by the latest synthesizeInsights call
	searchCount int             // number of search requests issued so far
	startTime   time.Time       // set at construction; used to compute Result.Duration
}
|
|
|
|
func NewDeepResearcher(cfg DeepResearchConfig, sess *session.Session) *DeepResearcher {
|
|
if cfg.MaxSearchQueries == 0 {
|
|
cfg.MaxSearchQueries = 30
|
|
}
|
|
if cfg.MaxSources == 0 {
|
|
cfg.MaxSources = 100
|
|
}
|
|
if cfg.MaxIterations == 0 {
|
|
cfg.MaxIterations = 5
|
|
}
|
|
if cfg.Timeout == 0 {
|
|
cfg.Timeout = 5 * time.Minute
|
|
}
|
|
|
|
return &DeepResearcher{
|
|
cfg: cfg,
|
|
sess: sess,
|
|
seenURLs: make(map[string]bool),
|
|
allSources: make([]types.Chunk, 0),
|
|
subQueries: make([]SubQuery, 0),
|
|
insights: make([]string, 0),
|
|
startTime: time.Now(),
|
|
}
|
|
}
|
|
|
|
func (dr *DeepResearcher) Research(ctx context.Context, query string) (*DeepResearchResult, error) {
|
|
ctx, cancel := context.WithTimeout(ctx, dr.cfg.Timeout)
|
|
defer cancel()
|
|
|
|
researchBlockID := uuid.New().String()
|
|
dr.sess.EmitBlock(&types.Block{
|
|
ID: researchBlockID,
|
|
Type: types.BlockTypeResearch,
|
|
Data: types.ResearchData{
|
|
SubSteps: []types.ResearchSubStep{},
|
|
},
|
|
})
|
|
|
|
subQueries, err := dr.planResearch(ctx, query)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("planning failed: %w", err)
|
|
}
|
|
|
|
dr.updateResearchStatus(researchBlockID, "researching", fmt.Sprintf("Executing %d sub-queries", len(subQueries)))
|
|
|
|
for i := 0; i < dr.cfg.MaxIterations && dr.searchCount < dr.cfg.MaxSearchQueries; i++ {
|
|
if err := dr.executeIteration(ctx, i, researchBlockID); err != nil {
|
|
if ctx.Err() != nil {
|
|
break
|
|
}
|
|
}
|
|
|
|
if dr.hasEnoughData() {
|
|
break
|
|
}
|
|
|
|
newQueries, err := dr.generateFollowUpQueries(ctx, query)
|
|
if err != nil || len(newQueries) == 0 {
|
|
break
|
|
}
|
|
|
|
for _, q := range newQueries {
|
|
dr.mu.Lock()
|
|
dr.subQueries = append(dr.subQueries, SubQuery{
|
|
Query: q.Query,
|
|
Purpose: q.Purpose,
|
|
Status: "pending",
|
|
})
|
|
dr.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
dr.updateResearchStatus(researchBlockID, "synthesizing", "Analyzing findings")
|
|
|
|
insights, err := dr.synthesizeInsights(ctx, query)
|
|
if err != nil {
|
|
insights = dr.insights
|
|
}
|
|
|
|
dr.updateResearchStatus(researchBlockID, "writing", "Generating report")
|
|
|
|
report, err := dr.generateFinalReport(ctx, query, insights)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("report generation failed: %w", err)
|
|
}
|
|
|
|
followUp, _ := dr.generateFollowUpSuggestions(ctx, query, report)
|
|
|
|
dr.updateResearchStatus(researchBlockID, "complete", "Research complete")
|
|
|
|
return &DeepResearchResult{
|
|
FinalReport: report,
|
|
Sources: dr.allSources,
|
|
SubQueries: dr.subQueries,
|
|
Insights: insights,
|
|
FollowUpQueries: followUp,
|
|
TotalSearches: dr.searchCount,
|
|
TotalSources: len(dr.allSources),
|
|
Duration: time.Since(dr.startTime),
|
|
}, nil
|
|
}
|
|
|
|
func (dr *DeepResearcher) planResearch(ctx context.Context, query string) ([]SubQuery, error) {
|
|
prompt := fmt.Sprintf(`Analyze this research query and break it into 3-5 sub-queries for comprehensive research.
|
|
|
|
Query: %s
|
|
|
|
For each sub-query, specify:
|
|
1. The search query (optimized for search engines)
|
|
2. The purpose (what aspect it addresses)
|
|
|
|
Respond in this exact format:
|
|
QUERY: [search query]
|
|
PURPOSE: [what this addresses]
|
|
|
|
QUERY: [search query]
|
|
PURPOSE: [what this addresses]
|
|
|
|
...
|
|
|
|
Be specific and actionable. Focus on different aspects: definitions, current state, history, expert opinions, data/statistics, controversies, future trends.`, query)
|
|
|
|
result, err := dr.cfg.LLM.GenerateText(ctx, llm.StreamRequest{
|
|
Messages: []llm.Message{{Role: "user", Content: prompt}},
|
|
})
|
|
if err != nil {
|
|
return dr.generateDefaultSubQueries(query), nil
|
|
}
|
|
|
|
subQueries := dr.parseSubQueries(result)
|
|
if len(subQueries) == 0 {
|
|
subQueries = dr.generateDefaultSubQueries(query)
|
|
}
|
|
|
|
dr.mu.Lock()
|
|
dr.subQueries = subQueries
|
|
dr.mu.Unlock()
|
|
|
|
return subQueries, nil
|
|
}
|
|
|
|
func (dr *DeepResearcher) parseSubQueries(text string) []SubQuery {
|
|
var queries []SubQuery
|
|
lines := strings.Split(text, "\n")
|
|
|
|
var currentQuery, currentPurpose string
|
|
for _, line := range lines {
|
|
line = strings.TrimSpace(line)
|
|
if strings.HasPrefix(line, "QUERY:") {
|
|
if currentQuery != "" && currentPurpose != "" {
|
|
queries = append(queries, SubQuery{
|
|
Query: currentQuery,
|
|
Purpose: currentPurpose,
|
|
Status: "pending",
|
|
})
|
|
}
|
|
currentQuery = strings.TrimSpace(strings.TrimPrefix(line, "QUERY:"))
|
|
currentPurpose = ""
|
|
} else if strings.HasPrefix(line, "PURPOSE:") {
|
|
currentPurpose = strings.TrimSpace(strings.TrimPrefix(line, "PURPOSE:"))
|
|
}
|
|
}
|
|
|
|
if currentQuery != "" && currentPurpose != "" {
|
|
queries = append(queries, SubQuery{
|
|
Query: currentQuery,
|
|
Purpose: currentPurpose,
|
|
Status: "pending",
|
|
})
|
|
}
|
|
|
|
return queries
|
|
}
|
|
|
|
func (dr *DeepResearcher) generateDefaultSubQueries(query string) []SubQuery {
|
|
return []SubQuery{
|
|
{Query: query, Purpose: "Main query", Status: "pending"},
|
|
{Query: query + " definition explained", Purpose: "Definitions and basics", Status: "pending"},
|
|
{Query: query + " latest news 2026", Purpose: "Current developments", Status: "pending"},
|
|
{Query: query + " expert analysis", Purpose: "Expert opinions", Status: "pending"},
|
|
{Query: query + " statistics data research", Purpose: "Data and evidence", Status: "pending"},
|
|
}
|
|
}
|
|
|
|
func (dr *DeepResearcher) executeIteration(ctx context.Context, iteration int, blockID string) error {
|
|
dr.mu.Lock()
|
|
pendingQueries := make([]int, 0)
|
|
for i, sq := range dr.subQueries {
|
|
if sq.Status == "pending" {
|
|
pendingQueries = append(pendingQueries, i)
|
|
}
|
|
}
|
|
dr.mu.Unlock()
|
|
|
|
if len(pendingQueries) == 0 {
|
|
return nil
|
|
}
|
|
|
|
batchSize := 3
|
|
if len(pendingQueries) < batchSize {
|
|
batchSize = len(pendingQueries)
|
|
}
|
|
|
|
g, gctx := errgroup.WithContext(ctx)
|
|
g.SetLimit(batchSize)
|
|
|
|
for _, idx := range pendingQueries[:batchSize] {
|
|
idx := idx
|
|
g.Go(func() error {
|
|
return dr.executeSubQuery(gctx, idx, blockID)
|
|
})
|
|
}
|
|
|
|
return g.Wait()
|
|
}
|
|
|
|
// executeSubQuery runs the search for dr.subQueries[idx]: it marks the
// sub-query "searching", issues a focus-mode-enhanced search, deduplicates
// results by URL against the global seen set, keeps at most 10 new chunks,
// and records them on both the sub-query and dr.allSources. On search
// failure the sub-query is marked "failed" and the error returned.
func (dr *DeepResearcher) executeSubQuery(ctx context.Context, idx int, blockID string) error {
	dr.mu.Lock()
	// Defensive bound check: idx may refer to a sub-query list that has
	// since been replaced.
	if idx >= len(dr.subQueries) {
		dr.mu.Unlock()
		return nil
	}
	// NOTE(review): sq points into the subQueries backing array and is used
	// again after the lock is released. This is only safe because appends to
	// dr.subQueries happen between iterations (after g.Wait), never while
	// these goroutines run — confirm this invariant holds for new callers.
	sq := &dr.subQueries[idx]
	sq.Status = "searching"
	query := sq.Query
	dr.searchCount++ // counted up front so the global budget includes failed searches
	dr.mu.Unlock()

	dr.updateResearchStatus(blockID, "researching", fmt.Sprintf("Searching: %s", truncate(query, 50)))

	// Rewrite the query per the active focus mode (engines/keywords).
	enhancedQuery := EnhanceQueryForFocusMode(query, dr.cfg.FocusMode)

	results, err := dr.cfg.SearchClient.Search(ctx, enhancedQuery, &search.SearchOptions{
		Engines:    dr.cfg.FocusMode.GetSearchEngines(),
		Categories: FocusModeConfigs[dr.cfg.FocusMode].Categories,
		PageNo:     1,
	})
	if err != nil {
		dr.mu.Lock()
		sq.Status = "failed"
		dr.mu.Unlock()
		return err
	}

	// Collect up to 10 chunks not seen in any earlier sub-query.
	chunks := make([]types.Chunk, 0)
	for _, r := range results.Results {
		// Lock per result: seenURLs is shared with the sibling goroutines.
		dr.mu.Lock()
		if dr.seenURLs[r.URL] {
			dr.mu.Unlock()
			continue
		}
		dr.seenURLs[r.URL] = true
		dr.mu.Unlock()

		chunk := r.ToChunk()
		chunks = append(chunks, chunk)

		if len(chunks) >= 10 {
			break
		}
	}

	// Publish results and mark this sub-query complete.
	dr.mu.Lock()
	sq.Results = chunks
	sq.Status = "complete"
	dr.allSources = append(dr.allSources, chunks...)
	dr.mu.Unlock()

	return nil
}
|
|
|
|
func (dr *DeepResearcher) generateFollowUpQueries(ctx context.Context, originalQuery string) ([]SubQuery, error) {
|
|
if dr.searchCount >= dr.cfg.MaxSearchQueries-5 {
|
|
return nil, nil
|
|
}
|
|
|
|
var sourceSummary strings.Builder
|
|
dr.mu.Lock()
|
|
for i, s := range dr.allSources {
|
|
if i >= 20 {
|
|
break
|
|
}
|
|
sourceSummary.WriteString(fmt.Sprintf("- %s: %s\n", s.Metadata["title"], truncate(s.Content, 100)))
|
|
}
|
|
dr.mu.Unlock()
|
|
|
|
prompt := fmt.Sprintf(`Based on the original query and sources found so far, suggest 2-3 follow-up queries to deepen the research.
|
|
|
|
Original query: %s
|
|
|
|
Sources found so far:
|
|
%s
|
|
|
|
What aspects are missing? What would provide more comprehensive coverage?
|
|
Respond with queries in format:
|
|
QUERY: [query]
|
|
PURPOSE: [what gap it fills]`, originalQuery, sourceSummary.String())
|
|
|
|
result, err := dr.cfg.LLM.GenerateText(ctx, llm.StreamRequest{
|
|
Messages: []llm.Message{{Role: "user", Content: prompt}},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return dr.parseSubQueries(result), nil
|
|
}
|
|
|
|
func (dr *DeepResearcher) synthesizeInsights(ctx context.Context, query string) ([]string, error) {
|
|
var sourcesText strings.Builder
|
|
dr.mu.Lock()
|
|
for i, s := range dr.allSources {
|
|
if i >= 30 {
|
|
break
|
|
}
|
|
sourcesText.WriteString(fmt.Sprintf("[%d] %s\n%s\n\n", i+1, s.Metadata["title"], truncate(s.Content, 300)))
|
|
}
|
|
dr.mu.Unlock()
|
|
|
|
prompt := fmt.Sprintf(`Analyze these sources and extract 5-7 key insights for the query: %s
|
|
|
|
Sources:
|
|
%s
|
|
|
|
Provide insights as bullet points, each starting with a key finding.
|
|
Focus on: main conclusions, patterns, contradictions, expert consensus, data points.`, query, sourcesText.String())
|
|
|
|
result, err := dr.cfg.LLM.GenerateText(ctx, llm.StreamRequest{
|
|
Messages: []llm.Message{{Role: "user", Content: prompt}},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
insights := make([]string, 0)
|
|
for _, line := range strings.Split(result, "\n") {
|
|
line = strings.TrimSpace(line)
|
|
if strings.HasPrefix(line, "-") || strings.HasPrefix(line, "•") || strings.HasPrefix(line, "*") {
|
|
insights = append(insights, strings.TrimPrefix(strings.TrimPrefix(strings.TrimPrefix(line, "-"), "•"), "*"))
|
|
}
|
|
}
|
|
|
|
dr.mu.Lock()
|
|
dr.insights = insights
|
|
dr.mu.Unlock()
|
|
|
|
return insights, nil
|
|
}
|
|
|
|
// generateFinalReport streams the final cited report from the LLM. It builds
// a prompt from (up to 50 of) the gathered sources, the synthesized insights,
// and the focus mode's system prompt, then emits a text block on the session
// and forwards each streamed chunk to it. Returns the accumulated report
// text, or an error if the stream could not be started.
func (dr *DeepResearcher) generateFinalReport(ctx context.Context, query string, insights []string) (string, error) {
	var sourcesText strings.Builder
	// Snapshot the slice header under the lock; the elements are read
	// outside it (safe here: worker goroutines have finished by the time
	// Research reaches report generation).
	dr.mu.Lock()
	sources := dr.allSources
	dr.mu.Unlock()

	// Cap the prompt at the first 50 sources, each truncated to 400 chars.
	for i, s := range sources {
		if i >= 50 {
			break
		}
		sourcesText.WriteString(fmt.Sprintf("[%d] %s (%s)\n%s\n\n", i+1, s.Metadata["title"], s.Metadata["url"], truncate(s.Content, 400)))
	}

	// Joined with "\n- " so each insight lands on its own bullet line under
	// the "- %s" slot in the prompt below.
	insightsText := strings.Join(insights, "\n- ")

	focusCfg := FocusModeConfigs[dr.cfg.FocusMode]
	locale := dr.cfg.Locale
	if locale == "" {
		locale = "en"
	}

	// Only Russian currently changes the output language.
	langInstruction := ""
	if locale == "ru" {
		langInstruction = "Write the report in Russian."
	}

	prompt := fmt.Sprintf(`%s

Write a comprehensive research report answering: %s

Key insights discovered:
- %s

Sources (cite using [1], [2], etc.):
%s

Structure your report with:
1. Executive Summary (2-3 sentences)
2. Key Findings (organized by theme)
3. Analysis and Discussion
4. Conclusions

%s
Use citations [1], [2], etc. throughout.
Be thorough but concise. Focus on actionable information.`, focusCfg.SystemPrompt, query, insightsText, sourcesText.String(), langInstruction)

	stream, err := dr.cfg.LLM.StreamText(ctx, llm.StreamRequest{
		Messages: []llm.Message{{Role: "user", Content: prompt}},
	})
	if err != nil {
		return "", err
	}

	// Emit an empty text block first, then append chunks to it as they
	// arrive so the client sees the report render progressively.
	var report strings.Builder
	textBlockID := uuid.New().String()
	dr.sess.EmitBlock(&types.Block{
		ID:   textBlockID,
		Type: types.BlockTypeText,
		Data: "",
	})

	for chunk := range stream {
		report.WriteString(chunk.ContentChunk)
		dr.sess.EmitTextChunk(textBlockID, chunk.ContentChunk)
	}

	return report.String(), nil
}
|
|
|
|
func (dr *DeepResearcher) generateFollowUpSuggestions(ctx context.Context, query, report string) ([]string, error) {
|
|
prompt := fmt.Sprintf(`Based on this research query and report, suggest 3-4 follow-up questions the user might want to explore:
|
|
|
|
Query: %s
|
|
|
|
Report summary: %s
|
|
|
|
Provide follow-up questions that:
|
|
1. Go deeper into specific aspects
|
|
2. Explore related topics
|
|
3. Address practical applications
|
|
4. Consider alternative perspectives
|
|
|
|
Format as simple questions, one per line.`, query, truncate(report, 1000))
|
|
|
|
result, err := dr.cfg.LLM.GenerateText(ctx, llm.StreamRequest{
|
|
Messages: []llm.Message{{Role: "user", Content: prompt}},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
suggestions := make([]string, 0)
|
|
for _, line := range strings.Split(result, "\n") {
|
|
line = strings.TrimSpace(line)
|
|
if line != "" && (strings.Contains(line, "?") || len(line) > 20) {
|
|
line = strings.TrimPrefix(line, "- ")
|
|
line = strings.TrimPrefix(line, "• ")
|
|
line = strings.TrimLeft(line, "0123456789. ")
|
|
if line != "" {
|
|
suggestions = append(suggestions, line)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(suggestions) > 4 {
|
|
suggestions = suggestions[:4]
|
|
}
|
|
|
|
return suggestions, nil
|
|
}
|
|
|
|
func (dr *DeepResearcher) updateResearchStatus(blockID, status, message string) {
|
|
dr.sess.UpdateBlock(blockID, []session.Patch{
|
|
{Op: "replace", Path: "/data/status", Value: status},
|
|
{Op: "replace", Path: "/data/message", Value: message},
|
|
})
|
|
}
|
|
|
|
func (dr *DeepResearcher) hasEnoughData() bool {
|
|
dr.mu.Lock()
|
|
defer dr.mu.Unlock()
|
|
return len(dr.allSources) >= dr.cfg.MaxSources
|
|
}
|
|
|
|
// truncate shortens s to at most maxLen bytes, appending "..." when a cut
// was made. The cut position is backed off to a UTF-8 rune boundary so
// multi-byte text (e.g. Russian prompts/sources) is never split
// mid-character, which previously produced invalid UTF-8 in prompts.
func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	if maxLen <= 0 {
		return "..."
	}
	cut := maxLen
	// Step back until s[cut] starts a rune (never splits a code point).
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut] + "..."
}
|
|
|
|
func RunDeepResearch(ctx context.Context, sess *session.Session, query string, cfg DeepResearchConfig) (*DeepResearchResult, error) {
|
|
researcher := NewDeepResearcher(cfg, sess)
|
|
return researcher.Research(ctx, query)
|
|
}
|