fix: surface ls quota exhaustion in antigravity streams
Some checks failed
CI / test (push) Failing after 3s
CI / golangci-lint (push) Failing after 3s
Security Scan / backend-security (push) Failing after 4s
Security Scan / frontend-security (push) Failing after 3s

This commit is contained in:
win 2026-03-31 01:26:48 +08:00
parent 860fc736bf
commit 0e9f780815
2 changed files with 96 additions and 11 deletions

View File

@ -73,6 +73,31 @@ var (
errLSModelMapPending = errors.New("model mapping not ready")
)
// IsLSQuotaExhaustedError reports whether err originated from an LS cascade
// quota/capacity exhaustion signal.
func IsLSQuotaExhaustedError(err error) bool {
return errors.Is(err, errLSQuotaExhausted)
}
// LSQuotaExhaustedMessage extracts the original LS error message, if present.
func LSQuotaExhaustedMessage(err error) string {
if err == nil {
return ""
}
msg := strings.TrimSpace(err.Error())
if msg == "" {
return ""
}
prefix := errLSQuotaExhausted.Error()
if msg == prefix {
return ""
}
if strings.HasPrefix(msg, prefix+":") {
return strings.TrimSpace(strings.TrimPrefix(msg, prefix+":"))
}
return msg
}
type cascadeSessionState struct {
CascadeID string
SystemText string
@ -601,7 +626,7 @@ func (u *LSPoolUpstream) streamCascadeResponse(ctx context.Context, inst *Instan
if state.ErrorMessage != "" {
u.logTraceSummary(slog.LevelWarn, "[LS-POOL] Cascade terminated with error", trace, "error", state.ErrorMessage)
if isQuotaExhaustedError(state.ErrorMessage) {
_ = w.CloseWithError(errLSQuotaExhausted)
_ = w.CloseWithError(fmt.Errorf("%w: %s", errLSQuotaExhausted, state.ErrorMessage))
} else {
_ = w.CloseWithError(errors.New(state.ErrorMessage))
}

View File

@ -22,6 +22,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/pkg/lspool"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/tidwall/gjson"
@ -3183,6 +3184,53 @@ func handleStreamReadError(err error, clientDisconnected bool, prefix string) (d
return false, false
}
func googleStatusTextForHTTP(status int) string {
switch status {
case http.StatusBadRequest:
return "INVALID_ARGUMENT"
case http.StatusNotFound:
return "NOT_FOUND"
case http.StatusTooManyRequests:
return "RESOURCE_EXHAUSTED"
case http.StatusServiceUnavailable:
return "UNAVAILABLE"
default:
return "UNKNOWN"
}
}
func buildAnthropicStreamErrorEvent(errType, message string) string {
payload := map[string]any{
"type": "error",
"error": map[string]any{
"type": errType,
"message": message,
},
}
data, _ := json.Marshal(payload)
return "event: error\ndata: " + string(data) + "\n\n"
}
func buildGeminiStreamErrorEvent(status int, message string) string {
payload := map[string]any{
"error": map[string]any{
"code": status,
"message": message,
"status": googleStatusTextForHTTP(status),
},
}
data, _ := json.Marshal(payload)
return "event: error\ndata: " + string(data) + "\n\n"
}
func lsQuotaExhaustedMessage(err error) string {
msg := strings.TrimSpace(lspool.LSQuotaExhaustedMessage(err))
if msg != "" {
return sanitizeUpstreamErrorMessage(msg)
}
return "You have exhausted your capacity on this model."
}
func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context, resp *http.Response, startTime time.Time) (*antigravityStreamResult, error) {
c.Status(resp.StatusCode)
c.Header("Cache-Control", "no-cache")
@ -3278,12 +3326,12 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
// 仅发送一次错误事件,避免多次写入导致协议混乱
errorEventSent := false
sendErrorEvent := func(reason string) {
sendErrorEvent := func(status int, message string) {
if errorEventSent || cw.Disconnected() {
return
}
errorEventSent = true
_, _ = fmt.Fprintf(c.Writer, "event: error\ndata: {\"error\":\"%s\"}\n\n", reason)
_, _ = fmt.Fprint(c.Writer, buildGeminiStreamErrorEvent(status, message))
flusher.Flush()
}
@ -3297,12 +3345,18 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
if disconnect, handled := handleStreamReadError(ev.err, cw.Disconnected(), "antigravity gemini"); handled {
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: disconnect}, nil
}
if lspool.IsLSQuotaExhaustedError(ev.err) {
msg := lsQuotaExhaustedMessage(ev.err)
logger.LegacyPrintf("service.antigravity_gateway", "LS quota exhausted during streaming (antigravity gemini): %s", msg)
sendErrorEvent(http.StatusTooManyRequests, msg)
return nil, ev.err
}
if errors.Is(ev.err, bufio.ErrTooLong) {
logger.LegacyPrintf("service.antigravity_gateway", "SSE line too long (antigravity): max_size=%d error=%v", maxLineSize, ev.err)
sendErrorEvent("response_too_large")
sendErrorEvent(http.StatusBadGateway, "Response too large")
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, ev.err
}
sendErrorEvent("stream_read_error")
sendErrorEvent(http.StatusServiceUnavailable, "Upstream stream read failed")
return nil, ev.err
}
@ -3365,7 +3419,7 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs, clientDisconnect: true}, nil
}
logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity)")
sendErrorEvent("stream_timeout")
sendErrorEvent(http.StatusServiceUnavailable, "Upstream stream timeout")
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout")
case <-keepaliveCh:
@ -4125,12 +4179,12 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
// 仅发送一次错误事件,避免多次写入导致协议混乱
errorEventSent := false
sendErrorEvent := func(reason string) {
sendErrorEvent := func(errType, message string) {
if errorEventSent || cw.Disconnected() {
return
}
errorEventSent = true
_, _ = fmt.Fprintf(c.Writer, "event: error\ndata: {\"error\":\"%s\"}\n\n", reason)
_, _ = fmt.Fprint(c.Writer, buildAnthropicStreamErrorEvent(errType, message))
flusher.Flush()
}
@ -4164,12 +4218,18 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
if disconnect, handled := handleStreamReadError(ev.err, cw.Disconnected(), "antigravity claude"); handled {
return &antigravityStreamResult{usage: finishUsage(), firstTokenMs: firstTokenMs, clientDisconnect: disconnect}, nil
}
if lspool.IsLSQuotaExhaustedError(ev.err) {
msg := lsQuotaExhaustedMessage(ev.err)
logger.LegacyPrintf("service.antigravity_gateway", "LS quota exhausted during streaming (antigravity claude): %s", msg)
sendErrorEvent("rate_limit_error", msg)
return nil, fmt.Errorf("stream read error: %w", ev.err)
}
if errors.Is(ev.err, bufio.ErrTooLong) {
logger.LegacyPrintf("service.antigravity_gateway", "SSE line too long (antigravity): max_size=%d error=%v", maxLineSize, ev.err)
sendErrorEvent("response_too_large")
sendErrorEvent("api_error", "Response too large")
return &antigravityStreamResult{usage: convertUsage(nil), firstTokenMs: firstTokenMs}, ev.err
}
sendErrorEvent("stream_read_error")
sendErrorEvent("api_error", "Upstream stream read failed")
return nil, fmt.Errorf("stream read error: %w", ev.err)
}
@ -4195,7 +4255,7 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
return &antigravityStreamResult{usage: finishUsage(), firstTokenMs: firstTokenMs, clientDisconnect: true}, nil
}
logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity)")
sendErrorEvent("stream_timeout")
sendErrorEvent("api_error", "Upstream stream timeout")
return &antigravityStreamResult{usage: convertUsage(nil), firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout")
case <-keepaliveCh: