package session import ( "encoding/json" "sync" "github.com/gooseek/backend/internal/types" "github.com/google/uuid" ) type EventType string const ( EventData EventType = "data" EventEnd EventType = "end" EventError EventType = "error" ) type Event struct { Type EventType `json:"type"` Data interface{} `json:"data"` } type Subscriber func(event EventType, data interface{}) type Session struct { id string blocks map[string]*types.Block subscribers []Subscriber mu sync.RWMutex closed bool } func NewSession() *Session { return &Session{ id: uuid.New().String(), blocks: make(map[string]*types.Block), subscribers: make([]Subscriber, 0), } } func (s *Session) ID() string { return s.id } func (s *Session) Subscribe(fn Subscriber) func() { s.mu.Lock() s.subscribers = append(s.subscribers, fn) idx := len(s.subscribers) - 1 s.mu.Unlock() return func() { s.mu.Lock() defer s.mu.Unlock() if idx < len(s.subscribers) { s.subscribers = append(s.subscribers[:idx], s.subscribers[idx+1:]...) } } } func (s *Session) Emit(eventType EventType, data interface{}) { s.mu.RLock() if s.closed { s.mu.RUnlock() return } subs := make([]Subscriber, len(s.subscribers)) copy(subs, s.subscribers) s.mu.RUnlock() for _, sub := range subs { sub(eventType, data) } } func (s *Session) EmitBlock(block *types.Block) { s.mu.Lock() s.blocks[block.ID] = block s.mu.Unlock() s.Emit(EventData, map[string]interface{}{ "type": "block", "block": block, }) } func (s *Session) UpdateBlock(blockID string, patches []Patch) { s.mu.Lock() block, ok := s.blocks[blockID] if !ok { s.mu.Unlock() return } for _, patch := range patches { applyPatch(block, patch) } s.mu.Unlock() s.Emit(EventData, map[string]interface{}{ "type": "updateBlock", "blockId": blockID, "patch": patches, }) } func (s *Session) EmitTextChunk(blockID, chunk string) { s.Emit(EventData, map[string]interface{}{ "type": "textChunk", "blockId": blockID, "chunk": chunk, }) } func (s *Session) EmitResearchComplete() { s.Emit(EventData, map[string]interface{}{ "type": "researchComplete", }) } func (s *Session) EmitEnd() { s.Emit(EventData, map[string]interface{}{ "type": "messageEnd", }) s.Emit(EventEnd, nil) } func (s *Session) EmitError(err error) { s.Emit(EventData, map[string]interface{}{ "type": "error", "data": err.Error(), }) s.Emit(EventError, map[string]interface{}{ "data": err.Error(), }) } func (s *Session) GetBlock(id string) *types.Block { s.mu.RLock() defer s.mu.RUnlock() return s.blocks[id] } func (s *Session) Close() { s.mu.Lock() s.closed = true s.subscribers = nil s.mu.Unlock() } func (s *Session) RemoveAllListeners() { s.mu.Lock() s.subscribers = nil s.mu.Unlock() } type Patch struct { Op string `json:"op"` Path string `json:"path"` Value interface{} `json:"value"` } func applyPatch(block *types.Block, patch Patch) { if patch.Op != "replace" { return } switch patch.Path { case "/data": block.Data = patch.Value case "/data/subSteps": if rd, ok := block.Data.(types.ResearchData); ok { if steps, ok := patch.Value.([]types.ResearchSubStep); ok { rd.SubSteps = steps block.Data = rd } } } } func MarshalEvent(data interface{}) ([]byte, error) { return json.Marshal(data) }