From 0e9f7808154da89d75889c9738acb933c9ff0ca3 Mon Sep 17 00:00:00 2001 From: win Date: Tue, 31 Mar 2026 01:26:48 +0800 Subject: [PATCH] fix: surface ls quota exhaustion in antigravity streams --- .../internal/pkg/lspool/upstream_adapter.go | 27 ++++++- .../service/antigravity_gateway_service.go | 80 ++++++++++++++++--- 2 files changed, 96 insertions(+), 11 deletions(-) diff --git a/backend/internal/pkg/lspool/upstream_adapter.go b/backend/internal/pkg/lspool/upstream_adapter.go index f212b724..dac0df89 100644 --- a/backend/internal/pkg/lspool/upstream_adapter.go +++ b/backend/internal/pkg/lspool/upstream_adapter.go @@ -73,6 +73,31 @@ var ( errLSModelMapPending = errors.New("model mapping not ready") ) +// IsLSQuotaExhaustedError reports whether err originated from an LS cascade +// quota/capacity exhaustion signal. +func IsLSQuotaExhaustedError(err error) bool { + return errors.Is(err, errLSQuotaExhausted) +} + +// LSQuotaExhaustedMessage extracts the original LS error message, if present. +func LSQuotaExhaustedMessage(err error) string { + if err == nil { + return "" + } + msg := strings.TrimSpace(err.Error()) + if msg == "" { + return "" + } + prefix := errLSQuotaExhausted.Error() + if msg == prefix { + return "" + } + if strings.HasPrefix(msg, prefix+":") { + return strings.TrimSpace(strings.TrimPrefix(msg, prefix+":")) + } + return msg +} + type cascadeSessionState struct { CascadeID string SystemText string @@ -601,7 +626,7 @@ func (u *LSPoolUpstream) streamCascadeResponse(ctx context.Context, inst *Instan if state.ErrorMessage != "" { u.logTraceSummary(slog.LevelWarn, "[LS-POOL] Cascade terminated with error", trace, "error", state.ErrorMessage) if isQuotaExhaustedError(state.ErrorMessage) { - _ = w.CloseWithError(errLSQuotaExhausted) + _ = w.CloseWithError(fmt.Errorf("%w: %s", errLSQuotaExhausted, state.ErrorMessage)) } else { _ = w.CloseWithError(errors.New(state.ErrorMessage)) } diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 10a84dca..06cd6c6b 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -22,6 +22,7 @@ import ( "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" + "github.com/Wei-Shaw/sub2api/internal/pkg/lspool" "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/tidwall/gjson" @@ -3183,6 +3184,53 @@ func handleStreamReadError(err error, clientDisconnected bool, prefix string) (d return false, false } +func googleStatusTextForHTTP(status int) string { + switch status { + case http.StatusBadRequest: + return "INVALID_ARGUMENT" + case http.StatusNotFound: + return "NOT_FOUND" + case http.StatusTooManyRequests: + return "RESOURCE_EXHAUSTED" + case http.StatusServiceUnavailable: + return "UNAVAILABLE" + default: + return "UNKNOWN" + } +} + +func buildAnthropicStreamErrorEvent(errType, message string) string { + payload := map[string]any{ + "type": "error", + "error": map[string]any{ + "type": errType, + "message": message, + }, + } + data, _ := json.Marshal(payload) + return "event: error\ndata: " + string(data) + "\n\n" +} + +func buildGeminiStreamErrorEvent(status int, message string) string { + payload := map[string]any{ + "error": map[string]any{ + "code": status, + "message": message, + "status": googleStatusTextForHTTP(status), + }, + } + data, _ := json.Marshal(payload) + return "event: error\ndata: " + string(data) + "\n\n" +} + +func lsQuotaExhaustedMessage(err error) string { + msg := strings.TrimSpace(lspool.LSQuotaExhaustedMessage(err)) + if msg != "" { + return sanitizeUpstreamErrorMessage(msg) + } + return "You have exhausted your capacity on this model." +} + func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context, resp *http.Response, startTime time.Time) (*antigravityStreamResult, error) { c.Status(resp.StatusCode) c.Header("Cache-Control", "no-cache") @@ -3278,12 +3326,12 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context // 仅发送一次错误事件,避免多次写入导致协议混乱 errorEventSent := false - sendErrorEvent := func(reason string) { + sendErrorEvent := func(status int, message string) { if errorEventSent || cw.Disconnected() { return } errorEventSent = true - _, _ = fmt.Fprintf(c.Writer, "event: error\ndata: {\"error\":\"%s\"}\n\n", reason) + _, _ = fmt.Fprint(c.Writer, buildGeminiStreamErrorEvent(status, message)) flusher.Flush() } @@ -3297,12 +3345,18 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context if disconnect, handled := handleStreamReadError(ev.err, cw.Disconnected(), "antigravity gemini"); handled { return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: disconnect}, nil } + if lspool.IsLSQuotaExhaustedError(ev.err) { + msg := lsQuotaExhaustedMessage(ev.err) + logger.LegacyPrintf("service.antigravity_gateway", "LS quota exhausted during streaming (antigravity gemini): %s", msg) + sendErrorEvent(http.StatusTooManyRequests, msg) + return nil, ev.err + } if errors.Is(ev.err, bufio.ErrTooLong) { logger.LegacyPrintf("service.antigravity_gateway", "SSE line too long (antigravity): max_size=%d error=%v", maxLineSize, ev.err) - sendErrorEvent("response_too_large") + sendErrorEvent(http.StatusBadGateway, "Response too large") return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, ev.err } - sendErrorEvent("stream_read_error") + sendErrorEvent(http.StatusServiceUnavailable, "Upstream stream read failed") return nil, ev.err } @@ -3365,7 +3419,7 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: true}, nil } logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity)") - sendErrorEvent("stream_timeout") + sendErrorEvent(http.StatusServiceUnavailable, "Upstream stream timeout") return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") case <-keepaliveCh: @@ -4125,12 +4179,12 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context // 仅发送一次错误事件,避免多次写入导致协议混乱 errorEventSent := false - sendErrorEvent := func(reason string) { + sendErrorEvent := func(errType, message string) { if errorEventSent || cw.Disconnected() { return } errorEventSent = true - _, _ = fmt.Fprintf(c.Writer, "event: error\ndata: {\"error\":\"%s\"}\n\n", reason) + _, _ = fmt.Fprint(c.Writer, buildAnthropicStreamErrorEvent(errType, message)) flusher.Flush() } @@ -4164,12 +4218,18 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context if disconnect, handled := handleStreamReadError(ev.err, cw.Disconnected(), "antigravity claude"); handled { return &antigravityStreamResult{usage: finishUsage(), firstTokenMs: firstTokenMs, clientDisconnect: disconnect}, nil } + if lspool.IsLSQuotaExhaustedError(ev.err) { + msg := lsQuotaExhaustedMessage(ev.err) + logger.LegacyPrintf("service.antigravity_gateway", "LS quota exhausted during streaming (antigravity claude): %s", msg) + sendErrorEvent("rate_limit_error", msg) + return nil, fmt.Errorf("stream read error: %w", ev.err) + } if errors.Is(ev.err, bufio.ErrTooLong) { logger.LegacyPrintf("service.antigravity_gateway", "SSE line too long (antigravity): max_size=%d error=%v", maxLineSize, ev.err) - sendErrorEvent("response_too_large") + sendErrorEvent("api_error", "Response too large") return &antigravityStreamResult{usage: convertUsage(nil), firstTokenMs: firstTokenMs}, ev.err } - sendErrorEvent("stream_read_error") + sendErrorEvent("api_error", "Upstream stream read failed") return nil, fmt.Errorf("stream read error: %w", ev.err) } @@ -4195,7 +4255,7 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context return &antigravityStreamResult{usage: finishUsage(), firstTokenMs: firstTokenMs, clientDisconnect: true}, nil } logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity)") - sendErrorEvent("stream_timeout") + sendErrorEvent("api_error", "Upstream stream timeout") return &antigravityStreamResult{usage: convertUsage(nil), firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") case <-keepaliveCh: