Some checks failed
CI / test (push) Failing after 6s
CI / frontend (push) Failing after 4s
CI / golangci-lint (push) Failing after 4s
CI / windsurf-platform (macos-latest) (push) Has been cancelled
CI / windsurf-platform (windows-latest) (push) Has been cancelled
Security Scan / backend-security (push) Failing after 1m31s
Security Scan / frontend-security (push) Failing after 7s
237 lines
8.5 KiB
Go
237 lines
8.5 KiB
Go
package handler
|
||
|
||
import (
|
||
"context"
|
||
"net/http"
|
||
"time"
|
||
|
||
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
|
||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
// TempUnscheduler 用于 HandleFailoverError 中同账号重试耗尽后的临时封禁。
|
||
// GatewayService 隐式实现此接口。
|
||
type TempUnscheduler interface {
|
||
TempUnscheduleRetryableError(ctx context.Context, accountID int64, failoverErr *service.UpstreamFailoverError)
|
||
}
|
||
|
||
// FailoverAction is the next step a caller must take after a failover error
// has been processed.
type FailoverAction int

const (
	// FailoverContinue means the request loop should go around again, either
	// retrying the same account or moving to another one; the caller simply
	// continues in both cases.
	FailoverContinue FailoverAction = iota

	// FailoverExhausted means the switch budget is used up; the caller should
	// send an error response.
	FailoverExhausted

	// FailoverCanceled means the context was cancelled; the caller should
	// return immediately.
	FailoverCanceled
)
|
||
|
||
const (
	// maxSameAccountRetries caps how many times one account is retried in
	// place for errors flagged RetryableOnSameAccount.
	maxSameAccountRetries = 3

	// sameAccountRetryDelay is the pause between same-account retries.
	sameAccountRetryDelay = 500 * time.Millisecond

	// singleAccountBackoffDelay is the fixed backoff applied before re-running
	// account selection for a single-account group that returned 503.
	// The service layer already retries in place in SingleAccountRetry mode
	// (up to 3 attempts, 30s total wait), so the handler only needs a short
	// pause before re-entering it.
	singleAccountBackoffDelay = 2 * time.Second

	// stickyGraceRetries is the default number of grace retries granted to the
	// account a sticky session is bound to. The bound account is retried in
	// place on its first failure so the session does not jump to another
	// account and lose its context.
	stickyGraceRetries = 1

	// stickyGraceDelay is the default pause before a sticky grace retry.
	stickyGraceDelay = 1500 * time.Millisecond
)
|
||
|
||
// FailoverState 跨循环迭代共享的 failover 状态
|
||
type FailoverState struct {
|
||
SwitchCount int
|
||
MaxSwitches int
|
||
FailedAccountIDs map[int64]struct{}
|
||
SameAccountRetryCount map[int64]int
|
||
LastFailoverErr *service.UpstreamFailoverError
|
||
ForceCacheBilling bool
|
||
hasBoundSession bool
|
||
// stickyBoundAccountID 命中粘性会话预绑定的账号 ID(>0 时生效)
|
||
stickyBoundAccountID int64
|
||
// stickyGraceUsed 已消耗的粘性宽限次数
|
||
stickyGraceUsed int
|
||
// stickyGraceMax 最大粘性宽限次数(平台相关,0 表示使用默认值)
|
||
stickyGraceMax int
|
||
// stickyGraceInterval 粘性宽限重试间隔(平台相关,0 表示使用默认值)
|
||
stickyGraceInterval time.Duration
|
||
}
|
||
|
||
// NewFailoverState 创建 failover 状态
|
||
func NewFailoverState(maxSwitches int, hasBoundSession bool) *FailoverState {
|
||
return &FailoverState{
|
||
MaxSwitches: maxSwitches,
|
||
FailedAccountIDs: make(map[int64]struct{}),
|
||
SameAccountRetryCount: make(map[int64]int),
|
||
hasBoundSession: hasBoundSession,
|
||
}
|
||
}
|
||
|
||
// WithStickyBoundAccount 绑定粘性会话预取到的账号 ID,用于 HandleFailoverError 里的宽限判定
|
||
func (s *FailoverState) WithStickyBoundAccount(accountID int64) *FailoverState {
|
||
s.stickyBoundAccountID = accountID
|
||
return s
|
||
}
|
||
|
||
// WithStickyGraceConfig 配置平台相关的粘性宽限参数。
|
||
// 仅在 stickyBoundAccountID > 0 时生效。
|
||
func (s *FailoverState) WithStickyGraceConfig(maxRetries int, interval time.Duration) *FailoverState {
|
||
s.stickyGraceMax = maxRetries
|
||
s.stickyGraceInterval = interval
|
||
return s
|
||
}
|
||
|
||
// effectiveStickyGraceMax 返回实际生效的宽限次数(未配置时用平台默认值)
|
||
func (s *FailoverState) effectiveStickyGraceMax() int {
|
||
if s.stickyGraceMax > 0 {
|
||
return s.stickyGraceMax
|
||
}
|
||
return stickyGraceRetries
|
||
}
|
||
|
||
// effectiveStickyGraceInterval 返回实际生效的宽限间隔(未配置时用平台默认值)
|
||
func (s *FailoverState) effectiveStickyGraceInterval() time.Duration {
|
||
if s.stickyGraceInterval > 0 {
|
||
return s.stickyGraceInterval
|
||
}
|
||
return stickyGraceDelay
|
||
}
|
||
|
||
// HandleFailoverError 处理 UpstreamFailoverError,返回下一步动作。
|
||
// 包含:缓存计费判断、同账号重试、临时封禁、切换计数、Antigravity 延时。
|
||
func (s *FailoverState) HandleFailoverError(
|
||
ctx context.Context,
|
||
gatewayService TempUnscheduler,
|
||
accountID int64,
|
||
platform string,
|
||
failoverErr *service.UpstreamFailoverError,
|
||
) FailoverAction {
|
||
s.LastFailoverErr = failoverErr
|
||
|
||
// 缓存计费判断
|
||
if needForceCacheBilling(s.hasBoundSession, failoverErr) {
|
||
s.ForceCacheBilling = true
|
||
}
|
||
|
||
// 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试
|
||
if failoverErr.RetryableOnSameAccount && s.SameAccountRetryCount[accountID] < maxSameAccountRetries {
|
||
s.SameAccountRetryCount[accountID]++
|
||
logger.FromContext(ctx).Warn("gateway.failover_same_account_retry",
|
||
zap.Int64("account_id", accountID),
|
||
zap.Int("upstream_status", failoverErr.StatusCode),
|
||
zap.Int("same_account_retry_count", s.SameAccountRetryCount[accountID]),
|
||
zap.Int("same_account_retry_max", maxSameAccountRetries),
|
||
)
|
||
if !sleepWithContext(ctx, sameAccountRetryDelay) {
|
||
return FailoverCanceled
|
||
}
|
||
return FailoverContinue
|
||
}
|
||
|
||
// 粘性会话宽限:如果当前账号是粘性预绑定账号,首次失败给一次原地等待重试机会,
|
||
// 避免因为瞬时 rate_limit 就把会话从 A 瞬移到 B 导致上下文/cascade 断裂。
|
||
// 仅对非 RetryableOnSameAccount 的硬失败生效(RetryableOnSameAccount 上面已处理)。
|
||
if s.stickyBoundAccountID > 0 &&
|
||
accountID == s.stickyBoundAccountID &&
|
||
s.stickyGraceUsed < s.effectiveStickyGraceMax() {
|
||
s.stickyGraceUsed++
|
||
logger.FromContext(ctx).Warn("gateway.failover_sticky_grace_retry",
|
||
zap.Int64("account_id", accountID),
|
||
zap.Int("upstream_status", failoverErr.StatusCode),
|
||
zap.Int("sticky_grace_used", s.stickyGraceUsed),
|
||
zap.Int("sticky_grace_max", s.effectiveStickyGraceMax()),
|
||
)
|
||
if !sleepWithContext(ctx, s.effectiveStickyGraceInterval()) {
|
||
return FailoverCanceled
|
||
}
|
||
return FailoverContinue
|
||
}
|
||
|
||
// 同账号重试用尽,执行临时封禁
|
||
if failoverErr.RetryableOnSameAccount {
|
||
gatewayService.TempUnscheduleRetryableError(ctx, accountID, failoverErr)
|
||
}
|
||
|
||
// 加入失败列表
|
||
s.FailedAccountIDs[accountID] = struct{}{}
|
||
|
||
// 检查是否耗尽
|
||
if s.SwitchCount >= s.MaxSwitches {
|
||
return FailoverExhausted
|
||
}
|
||
|
||
// 递增切换计数
|
||
s.SwitchCount++
|
||
logger.FromContext(ctx).Warn("gateway.failover_switch_account",
|
||
zap.Int64("account_id", accountID),
|
||
zap.Int("upstream_status", failoverErr.StatusCode),
|
||
zap.Int("switch_count", s.SwitchCount),
|
||
zap.Int("max_switches", s.MaxSwitches),
|
||
)
|
||
|
||
// Antigravity 平台换号线性递增延时
|
||
if platform == service.PlatformAntigravity {
|
||
delay := time.Duration(s.SwitchCount-1) * time.Second
|
||
if !sleepWithContext(ctx, delay) {
|
||
return FailoverCanceled
|
||
}
|
||
}
|
||
|
||
return FailoverContinue
|
||
}
|
||
|
||
// HandleSelectionExhausted 处理选号失败(所有候选账号都在排除列表中)时的退避重试决策。
|
||
// 针对 Antigravity 单账号分组的 503 (MODEL_CAPACITY_EXHAUSTED) 场景:
|
||
// 清除排除列表、等待退避后重新选号。
|
||
//
|
||
// 返回 FailoverContinue 时,调用方应设置 SingleAccountRetry context 并 continue。
|
||
// 返回 FailoverExhausted 时,调用方应返回错误响应。
|
||
// 返回 FailoverCanceled 时,调用方应直接 return。
|
||
func (s *FailoverState) HandleSelectionExhausted(ctx context.Context) FailoverAction {
|
||
if s.LastFailoverErr != nil &&
|
||
s.LastFailoverErr.StatusCode == http.StatusServiceUnavailable &&
|
||
s.SwitchCount <= s.MaxSwitches {
|
||
|
||
logger.FromContext(ctx).Warn("gateway.failover_single_account_backoff",
|
||
zap.Duration("backoff_delay", singleAccountBackoffDelay),
|
||
zap.Int("switch_count", s.SwitchCount),
|
||
zap.Int("max_switches", s.MaxSwitches),
|
||
)
|
||
if !sleepWithContext(ctx, singleAccountBackoffDelay) {
|
||
return FailoverCanceled
|
||
}
|
||
logger.FromContext(ctx).Warn("gateway.failover_single_account_retry",
|
||
zap.Int("switch_count", s.SwitchCount),
|
||
zap.Int("max_switches", s.MaxSwitches),
|
||
)
|
||
s.FailedAccountIDs = make(map[int64]struct{})
|
||
return FailoverContinue
|
||
}
|
||
return FailoverExhausted
|
||
}
|
||
|
||
// needForceCacheBilling 判断 failover 时是否需要强制缓存计费。
|
||
// 粘性会话切换账号、或上游明确标记时,将 input_tokens 转为 cache_read 计费。
|
||
func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFailoverError) bool {
|
||
return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling)
|
||
}
|
||
|
||
// sleepWithContext blocks for d or until ctx is done, whichever comes first.
//
// It returns true when the full duration elapsed and false when ctx was
// cancelled (or its deadline passed) before that. A non-positive d returns
// true immediately without consulting ctx.
func sleepWithContext(ctx context.Context, d time.Duration) bool {
	if d <= 0 {
		return true
	}

	// Use an explicit timer rather than time.After so it can be stopped when
	// ctx ends first; otherwise the timer stays pending for the rest of d.
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}
|