Some checks failed
Build and Deploy GooSeek / build-and-deploy (push) Has been cancelled
- Ingress: route /api/* on gooseek.ru to api-gateway (was going to webui)
- api-gateway: move /health and /ready before JWT/rate-limit middleware to prevent liveness probe 429 failures causing CrashLoopBackOff
- Readiness probes: fix agent-svc, search-svc, scraper-svc to use /health (they don't implement /ready endpoint, causing permanent 0/1 status)
- ConfigMap: add missing CHAT_SVC_URL and API_GATEWAY_URL
- deploy.sh: also clean up misplaced NetworkPolicy from gooseek namespace
- webui: add Next.js rewrites to proxy /api/* to api-gateway

Made-with: Cursor
318 lines
8.3 KiB
Go
318 lines
8.3 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v2"
|
|
"github.com/gofiber/fiber/v2/middleware/cors"
|
|
"github.com/gofiber/fiber/v2/middleware/logger"
|
|
"github.com/gooseek/backend/pkg/config"
|
|
"github.com/gooseek/backend/pkg/metrics"
|
|
"github.com/gooseek/backend/pkg/middleware"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// svcURLs maps a short service key (for example "auth" or "chat") to the base
// URL of that backend service. It is assigned once in main from the loaded
// configuration and read by getTarget and handleChat.
var svcURLs map[string]string
|
|
|
|
func main() {
|
|
cfg, err := config.Load()
|
|
if err != nil {
|
|
log.Fatal("Failed to load config:", err)
|
|
}
|
|
|
|
opt, err := redis.ParseURL(cfg.RedisURL)
|
|
if err != nil {
|
|
log.Printf("Warning: failed to parse Redis URL, rate limiting will be disabled: %v", err)
|
|
}
|
|
var redisClient *redis.Client
|
|
if opt != nil {
|
|
redisClient = redis.NewClient(opt)
|
|
if _, err := redisClient.Ping(context.Background()).Result(); err != nil {
|
|
log.Printf("Warning: Redis not available, rate limiting will be disabled: %v", err)
|
|
redisClient = nil
|
|
}
|
|
}
|
|
|
|
svcURLs = map[string]string{
|
|
"auth": cfg.AuthSvcURL,
|
|
"chat": cfg.ChatSvcURL,
|
|
"agents": cfg.AgentSvcURL,
|
|
"search": cfg.SearchSvcURL,
|
|
"llm": cfg.LLMSvcURL,
|
|
"scraper": cfg.ScraperSvcURL,
|
|
"memory": cfg.MemorySvcURL,
|
|
"library": cfg.LibrarySvcURL,
|
|
"thread": cfg.ThreadSvcURL,
|
|
"discover": cfg.DiscoverSvcURL,
|
|
"finance": cfg.FinanceHeatmapURL,
|
|
"learning": cfg.LearningSvcURL,
|
|
"sandbox": cfg.SandboxSvcURL,
|
|
"travel": cfg.TravelSvcURL,
|
|
"medicine": cfg.MedicineSvcURL,
|
|
"admin": cfg.AdminSvcURL,
|
|
}
|
|
|
|
app := fiber.New(fiber.Config{
|
|
StreamRequestBody: true,
|
|
BodyLimit: 50 * 1024 * 1024,
|
|
ReadTimeout: time.Duration(cfg.HTTPTimeout),
|
|
WriteTimeout: 5 * time.Minute,
|
|
IdleTimeout: 2 * time.Minute,
|
|
})
|
|
|
|
app.Use(logger.New())
|
|
app.Use(cors.New(cors.Config{
|
|
AllowOrigins: strings.Join(cfg.AllowedOrigins, ","),
|
|
AllowHeaders: "Origin, Content-Type, Accept, Authorization",
|
|
AllowMethods: "GET, POST, PUT, PATCH, DELETE, OPTIONS",
|
|
}))
|
|
app.Use(metrics.PrometheusMiddleware(metrics.MetricsConfig{
|
|
ServiceName: "api-gateway",
|
|
}))
|
|
|
|
app.Get("/metrics", metrics.MetricsHandler())
|
|
|
|
app.Get("/health", func(c *fiber.Ctx) error {
|
|
return c.JSON(fiber.Map{"status": "ok"})
|
|
})
|
|
|
|
app.Get("/ready", func(c *fiber.Ctx) error {
|
|
return c.JSON(fiber.Map{"status": "ready"})
|
|
})
|
|
|
|
app.Use(middleware.JWT(middleware.JWTConfig{
|
|
Secret: cfg.JWTSecret,
|
|
AuthSvcURL: cfg.AuthSvcURL,
|
|
AllowGuest: true,
|
|
}))
|
|
|
|
if redisClient != nil {
|
|
app.Use(middleware.TieredRateLimit(middleware.TieredRateLimitConfig{
|
|
RedisClient: redisClient,
|
|
Tiers: map[string]middleware.TierConfig{
|
|
"free": {Max: 60, Window: time.Minute},
|
|
"pro": {Max: 300, Window: time.Minute},
|
|
"business": {Max: 1000, Window: time.Minute},
|
|
},
|
|
DefaultTier: "free",
|
|
}))
|
|
}
|
|
|
|
app.Post("/api/chat", handleChat)
|
|
app.All("/api/*", handleProxy)
|
|
|
|
port := cfg.APIGatewayPort
|
|
log.Printf("api-gateway listening on :%d", port)
|
|
log.Fatal(app.Listen(fmt.Sprintf(":%d", port)))
|
|
}
|
|
|
|
func getTarget(path string) (base, rewrite string) {
|
|
switch {
|
|
case strings.HasPrefix(path, "/api/v1/auth"):
|
|
return svcURLs["auth"], path
|
|
case path == "/api/chat" || strings.HasPrefix(path, "/api/chat?"):
|
|
return svcURLs["chat"], "/api/v1/chat"
|
|
case strings.HasPrefix(path, "/api/v1/agents"):
|
|
return svcURLs["agents"], path
|
|
case strings.HasPrefix(path, "/api/v1/search"):
|
|
return svcURLs["search"], path
|
|
case strings.HasPrefix(path, "/api/v1/llm"), strings.HasPrefix(path, "/api/v1/providers"):
|
|
return svcURLs["llm"], path
|
|
case strings.HasPrefix(path, "/api/v1/memory"):
|
|
return svcURLs["memory"], path
|
|
case strings.HasPrefix(path, "/api/v1/library"):
|
|
return svcURLs["library"], path
|
|
case strings.HasPrefix(path, "/api/v1/threads"):
|
|
return svcURLs["thread"], path
|
|
case strings.HasPrefix(path, "/api/v1/spaces"):
|
|
return svcURLs["thread"], path
|
|
case strings.HasPrefix(path, "/api/v1/pages"):
|
|
return svcURLs["thread"], path
|
|
case strings.HasPrefix(path, "/api/v1/share"):
|
|
return svcURLs["thread"], path
|
|
case strings.HasPrefix(path, "/api/v1/discover"):
|
|
return svcURLs["discover"], path
|
|
case strings.HasPrefix(path, "/api/v1/heatmap"):
|
|
return svcURLs["finance"], path
|
|
case strings.HasPrefix(path, "/api/v1/movers"):
|
|
return svcURLs["finance"], path
|
|
case strings.HasPrefix(path, "/api/v1/markets"):
|
|
return svcURLs["finance"], path
|
|
case strings.HasPrefix(path, "/api/v1/learning"):
|
|
return svcURLs["learning"], path
|
|
case strings.HasPrefix(path, "/api/v1/sandbox"):
|
|
return svcURLs["sandbox"], path
|
|
case strings.HasPrefix(path, "/api/v1/travel"):
|
|
return svcURLs["travel"], path
|
|
case strings.HasPrefix(path, "/api/v1/medicine"):
|
|
return svcURLs["medicine"], path
|
|
case strings.HasPrefix(path, "/api/v1/admin"):
|
|
return svcURLs["admin"], path
|
|
default:
|
|
return "", ""
|
|
}
|
|
}
|
|
|
|
func handleChat(c *fiber.Ctx) error {
|
|
base := svcURLs["chat"]
|
|
if base == "" {
|
|
return c.Status(503).JSON(fiber.Map{"error": "Chat service not configured"})
|
|
}
|
|
|
|
targetURL := strings.TrimSuffix(base, "/") + "/api/v1/chat"
|
|
|
|
req, err := http.NewRequest("POST", targetURL, strings.NewReader(string(c.Body())))
|
|
if err != nil {
|
|
return c.Status(500).JSON(fiber.Map{"error": err.Error()})
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
if auth := c.Get("Authorization"); auth != "" {
|
|
req.Header.Set("Authorization", auth)
|
|
}
|
|
|
|
client := &http.Client{Timeout: 5 * time.Minute}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return c.Status(503).JSON(fiber.Map{"error": "Service unavailable"})
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
defer resp.Body.Close()
|
|
body, _ := io.ReadAll(resp.Body)
|
|
return c.Status(resp.StatusCode).Send(body)
|
|
}
|
|
|
|
c.Set("Content-Type", "application/x-ndjson")
|
|
c.Set("Cache-Control", "no-cache")
|
|
c.Set("Transfer-Encoding", "chunked")
|
|
|
|
c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
|
defer resp.Body.Close()
|
|
buf := make([]byte, 4096)
|
|
for {
|
|
n, err := resp.Body.Read(buf)
|
|
if n > 0 {
|
|
w.Write(buf[:n])
|
|
w.Flush()
|
|
}
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
break
|
|
}
|
|
}
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func handleProxy(c *fiber.Ctx) error {
|
|
path := c.Path()
|
|
base, rewrite := getTarget(path)
|
|
|
|
if base == "" {
|
|
return c.Status(404).JSON(fiber.Map{"error": "Not found"})
|
|
}
|
|
|
|
targetURL := strings.TrimSuffix(base, "/") + rewrite
|
|
if c.Context().QueryArgs().Len() > 0 {
|
|
targetURL += "?" + string(c.Context().QueryArgs().QueryString())
|
|
}
|
|
|
|
method := c.Method()
|
|
var body io.Reader
|
|
if method != "GET" && method != "HEAD" {
|
|
body = strings.NewReader(string(c.Body()))
|
|
}
|
|
|
|
req, err := http.NewRequest(method, targetURL, body)
|
|
if err != nil {
|
|
return c.Status(500).JSON(fiber.Map{"error": err.Error()})
|
|
}
|
|
|
|
passHeaders := []string{"Authorization", "Content-Type", "Accept", "User-Agent", "Accept-Language"}
|
|
for _, h := range passHeaders {
|
|
if v := c.Get(h); v != "" {
|
|
req.Header.Set(h, v)
|
|
}
|
|
}
|
|
|
|
isSSE := strings.Contains(path, "/stream") ||
|
|
c.Get("Accept") == "text/event-stream"
|
|
|
|
timeout := time.Minute
|
|
if isSSE {
|
|
timeout = 30 * time.Minute
|
|
}
|
|
|
|
client := &http.Client{Timeout: timeout}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return handleFallback(c, path)
|
|
}
|
|
|
|
if isSSE && resp.Header.Get("Content-Type") == "text/event-stream" {
|
|
c.Set("Content-Type", "text/event-stream")
|
|
c.Set("Cache-Control", "no-cache")
|
|
c.Set("Connection", "keep-alive")
|
|
c.Set("Transfer-Encoding", "chunked")
|
|
c.Set("X-Accel-Buffering", "no")
|
|
|
|
c.Context().SetBodyStreamWriter(func(w *bufio.Writer) {
|
|
defer resp.Body.Close()
|
|
buf := make([]byte, 4096)
|
|
for {
|
|
n, readErr := resp.Body.Read(buf)
|
|
if n > 0 {
|
|
w.Write(buf[:n])
|
|
w.Flush()
|
|
}
|
|
if readErr != nil {
|
|
return
|
|
}
|
|
}
|
|
})
|
|
return nil
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
for _, h := range []string{"Content-Type", "Cache-Control", "Set-Cookie"} {
|
|
if v := resp.Header.Get(h); v != "" {
|
|
c.Set(h, v)
|
|
}
|
|
}
|
|
|
|
data, _ := io.ReadAll(resp.Body)
|
|
return c.Status(resp.StatusCode).Send(data)
|
|
}
|
|
|
|
func handleFallback(c *fiber.Ctx, path string) error {
|
|
switch {
|
|
case strings.HasPrefix(path, "/api/v1/discover"):
|
|
return c.JSON(fiber.Map{"items": []interface{}{}})
|
|
case strings.HasPrefix(path, "/api/geo-context"):
|
|
return c.JSON(fiber.Map{"country": nil, "city": nil})
|
|
case strings.HasPrefix(path, "/api/translations"):
|
|
return c.JSON(fiber.Map{})
|
|
default:
|
|
return c.Status(503).JSON(fiber.Map{"error": "Service unavailable"})
|
|
}
|
|
}
|
|
|
|
// init gives the PORT environment variable the value "3015" when it is unset
// or empty. It runs during package initialisation, before main.
func init() {
	if os.Getenv("PORT") != "" {
		return
	}
	os.Setenv("PORT", "3015")
}
|