diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index d88275a4..6967b29f 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -153,7 +153,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { antigravityGatewayService := service.NewAntigravityGatewayService(accountRepository, gatewayCache, schedulerSnapshotService, antigravityTokenProvider, rateLimitService, httpUpstream, settingService, internal500CounterCache) windsurfLSService := service.ProvideWindsurfLSService(configConfig) windsurfTokenProvider := service.ProvideWindsurfTokenProvider(configConfig, accountRepository, proxyRepository) - windsurfChatService := service.ProvideWindsurfChatService(configConfig, windsurfLSService, windsurfTokenProvider) + windsurfChatService := service.ProvideWindsurfChatService(configConfig, windsurfLSService, windsurfTokenProvider, gatewayCache) windsurfGatewayService := service.ProvideWindsurfGatewayService(configConfig, windsurfChatService, accountRepository) accountTestService := service.NewAccountTestService(accountRepository, geminiTokenProvider, antigravityGatewayService, windsurfChatService, httpUpstream, configConfig, tlsFingerprintProfileService) crsSyncService := service.NewCRSSyncService(accountRepository, proxyRepository, oAuthService, openAIOAuthService, geminiOAuthService, configConfig) diff --git a/backend/internal/handler/failover_loop.go b/backend/internal/handler/failover_loop.go index 3a5cc77b..5030c8d0 100644 --- a/backend/internal/handler/failover_loop.go +++ b/backend/internal/handler/failover_loop.go @@ -42,12 +42,6 @@ const ( stickyGraceRetries = 1 // stickyGraceDelay 粘性宽限重试间隔(默认) stickyGraceDelay = 1500 * time.Millisecond - // windsurfStickyGraceRetries Windsurf 平台专属粘性宽限次数。 - // Windsurf 的 LS 进程有冷启动开销,且切号后需要重建完整历史上下文(最多 3.5MB), - // 宽限次数更多可减少不必要切号,保留 cascade 会话连续性。 - windsurfStickyGraceRetries = 3 - // windsurfStickyGraceDelay Windsurf 平台粘性宽限重试间隔(LS 处理更耗时) - windsurfStickyGraceDelay = 2000 * time.Millisecond ) // FailoverState 跨循环迭代共享的 failover 状态 diff --git a/backend/internal/handler/failover_loop_test.go b/backend/internal/handler/failover_loop_test.go index 5ee10e52..3aa9f547 100644 --- a/backend/internal/handler/failover_loop_test.go +++ b/backend/internal/handler/failover_loop_test.go @@ -749,15 +749,16 @@ func TestHandleFailoverError_StickyGraceConfig(t *testing.T) { require.Equal(t, 1, fs.SwitchCount, "宽限用完后应切换") }) - t.Run("Windsurf专属配置grace=3次", func(t *testing.T) { + t.Run("自定义grace=3次", func(t *testing.T) { mock := &mockTempUnscheduler{} + const customGrace = 3 fs := NewFailoverState(5, true). WithStickyBoundAccount(100). - WithStickyGraceConfig(windsurfStickyGraceRetries, 10*time.Millisecond) // 用极短间隔加速测试 + WithStickyGraceConfig(customGrace, 10*time.Millisecond) // 用极短间隔加速测试 err := newTestFailoverErr(500, false, false) // 前 3 次都应该是宽限重试 - for i := 1; i <= windsurfStickyGraceRetries; i++ { + for i := 1; i <= customGrace; i++ { action := fs.HandleFailoverError(context.Background(), mock, 100, service.PlatformWindsurf, err) require.Equal(t, FailoverContinue, action, "第%d次应为宽限重试", i) require.Equal(t, i, fs.stickyGraceUsed) @@ -774,7 +775,7 @@ func TestHandleFailoverError_StickyGraceConfig(t *testing.T) { mock := &mockTempUnscheduler{} fs := NewFailoverState(5, true). WithStickyBoundAccount(100). - WithStickyGraceConfig(windsurfStickyGraceRetries, 10*time.Millisecond) + WithStickyGraceConfig(3, 10*time.Millisecond) err := newTestFailoverErr(500, false, false) // 非 sticky 账号 200 不走宽限,直接切换 diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index b42b01cb..f6288d70 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -543,9 +543,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { for { fs := NewFailoverState(h.maxAccountSwitches, hasBoundSession).WithStickyBoundAccount(sessionBoundAccountID) - if platform == service.PlatformWindsurf { - fs.WithStickyGraceConfig(windsurfStickyGraceRetries, windsurfStickyGraceDelay) - } retryWithFallback := false for { @@ -725,7 +722,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) { // 记录 Forward 前已写入字节数,Forward 后若增加则说明 SSE 内容已发,禁止 failover writerSizeBeforeForward := c.Writer.Size() if account.Platform == service.PlatformWindsurf { - result, err = h.windsurfGatewayService.Forward(requestCtx, c, account, body, hasBoundSession) + windsurfGroupID := int64(0) + if apiKey.GroupID != nil { + windsurfGroupID = *apiKey.GroupID + } + result, err = h.windsurfGatewayService.Forward(requestCtx, c, account, body, hasBoundSession, windsurfGroupID, sessionHash) } else if account.Platform == service.PlatformAntigravity && account.Type != service.AccountTypeAPIKey { result, err = h.antigravityGatewayService.Forward(requestCtx, c, account, body, hasBoundSession) } else { diff --git a/backend/internal/repository/gateway_cache.go b/backend/internal/repository/gateway_cache.go index 58291b66..a90ed0f7 100644 --- a/backend/internal/repository/gateway_cache.go +++ b/backend/internal/repository/gateway_cache.go @@ -9,7 +9,10 @@ import ( "github.com/redis/go-redis/v9" ) -const stickySessionPrefix = "sticky_session:" +const ( + stickySessionPrefix = "sticky_session:" + cascadeIDPrefix = "windsurf_cascade:" +) type gatewayCache struct { rdb *redis.Client @@ -51,3 +54,29 @@ func (c *gatewayCache) DeleteSessionAccountID(ctx context.Context, groupID int64 key := buildSessionKey(groupID, sessionHash) return c.rdb.Del(ctx, key).Err() } + +// buildCascadeKey 构造 Windsurf Cascade ID 缓存 key。 +// 上层已将 (groupID, accountID, modelUID, lsEndpoint, sessionHash, sysPromptHash) 哈希为单一字符串。 +func buildCascadeKey(key string) string { + return cascadeIDPrefix + key +} + +// GetCascadeID 读取 Cascade 会话 ID。redis.Nil 视为未命中(返回空串与 nil)。 +func (c *gatewayCache) GetCascadeID(ctx context.Context, key string) (string, error) { + v, err := c.rdb.Get(ctx, buildCascadeKey(key)).Result() + if err == redis.Nil { + return "", nil + } + return v, err +} + +// SetCascadeID 写入 Cascade 会话 ID。 +func (c *gatewayCache) SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error { + return c.rdb.Set(ctx, buildCascadeKey(key), cascadeID, ttl).Err() +} + +// DeleteCascadeID 失效 Cascade 会话 ID。 +func (c *gatewayCache) DeleteCascadeID(ctx context.Context, key string) error { + return c.rdb.Del(ctx, buildCascadeKey(key)).Err() +} + diff --git a/backend/internal/repository/gateway_cache_integration_test.go b/backend/internal/repository/gateway_cache_integration_test.go index 0eebc33f..484d4927 100644 --- a/backend/internal/repository/gateway_cache_integration_test.go +++ b/backend/internal/repository/gateway_cache_integration_test.go @@ -104,6 +104,61 @@ func (s *GatewayCacheSuite) TestGetSessionAccountID_CorruptedValue() { require.False(s.T(), errors.Is(err, redis.Nil), "expected parsing error, not redis.Nil") } +func (s *GatewayCacheSuite) TestGetCascadeID_Missing() { + id, err := s.cache.GetCascadeID(s.ctx, "nonexistent-cascade-key") + require.NoError(s.T(), err, "missing cascade key should return nil error") + require.Equal(s.T(), "", id, "missing cascade key should return empty string") +} + +func (s *GatewayCacheSuite) TestSetAndGetCascadeID() { + key := "ab12cd34" + cascadeID := "cascade-uuid-xyz" + ttl := 30 * time.Minute + + require.NoError(s.T(), s.cache.SetCascadeID(s.ctx, key, cascadeID, ttl), "SetCascadeID") + + got, err := s.cache.GetCascadeID(s.ctx, key) + require.NoError(s.T(), err, "GetCascadeID") + require.Equal(s.T(), cascadeID, got, "cascade id round-trip mismatch") +} + +func (s *GatewayCacheSuite) TestCascadeID_TTL() { + key := "ttl-key" + ttl := 30 * time.Minute + + require.NoError(s.T(), s.cache.SetCascadeID(s.ctx, key, "cid", ttl), "SetCascadeID") + + redisKey := buildCascadeKey(key) + got, err := s.rdb.TTL(s.ctx, redisKey).Result() + require.NoError(s.T(), err, "TTL after SetCascadeID") + s.AssertTTLWithin(got, 1*time.Second, ttl) +} + +func (s *GatewayCacheSuite) TestDeleteCascadeID() { + key := "del-key" + require.NoError(s.T(), s.cache.SetCascadeID(s.ctx, key, "cid", 1*time.Minute), "SetCascadeID") + require.NoError(s.T(), s.cache.DeleteCascadeID(s.ctx, key), "DeleteCascadeID") + + got, err := s.cache.GetCascadeID(s.ctx, key) + require.NoError(s.T(), err, "GetCascadeID after delete should not error") + require.Equal(s.T(), "", got, "deleted cascade key should return empty") +} + +// 验证 cascade key 与 sticky session key 命名空间隔离(前缀不同)。 +func (s *GatewayCacheSuite) TestCascadeID_NamespaceIsolation() { + commonID := "shared-id" + require.NoError(s.T(), s.cache.SetCascadeID(s.ctx, commonID, "cascade-value", 1*time.Minute), "SetCascadeID") + require.NoError(s.T(), s.cache.SetSessionAccountID(s.ctx, 1, commonID, 42, 1*time.Minute), "SetSessionAccountID") + + gotCascade, err := s.cache.GetCascadeID(s.ctx, commonID) + require.NoError(s.T(), err, "GetCascadeID") + require.Equal(s.T(), "cascade-value", gotCascade, "cascade namespace must not be polluted by sticky session") + + gotAccount, err := s.cache.GetSessionAccountID(s.ctx, 1, commonID) + require.NoError(s.T(), err, "GetSessionAccountID") + require.Equal(s.T(), int64(42), gotAccount, "sticky session must not be polluted by cascade write") +} + func TestGatewayCacheSuite(t *testing.T) { suite.Run(t, new(GatewayCacheSuite)) } diff --git a/backend/internal/service/gateway_multiplatform_test.go b/backend/internal/service/gateway_multiplatform_test.go index 50778199..e0f5b0c8 100644 --- a/backend/internal/service/gateway_multiplatform_test.go +++ b/backend/internal/service/gateway_multiplatform_test.go @@ -238,6 +238,18 @@ func (m *mockGatewayCacheForPlatform) DeleteSessionAccountID(ctx context.Context return nil } +func (m *mockGatewayCacheForPlatform) GetCascadeID(ctx context.Context, key string) (string, error) { + return "", nil +} + +func (m *mockGatewayCacheForPlatform) SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error { + return nil +} + +func (m *mockGatewayCacheForPlatform) DeleteCascadeID(ctx context.Context, key string) error { + return nil +} + type mockGroupRepoForGateway struct { groups map[int64]*Group getByIDCalls int diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 6c2bca24..372030ba 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -402,6 +402,16 @@ type GatewayCache interface { // DeleteSessionAccountID 删除粘性会话绑定,用于账号不可用时主动清理 // Delete sticky session binding, used to proactively clean up when account becomes unavailable DeleteSessionAccountID(ctx context.Context, groupID int64, sessionHash string) error + + // GetCascadeID 获取 Windsurf Cascade 会话 ID(用于 LS 多轮复用) + // Get the Windsurf Cascade ID bound to a chat session for multi-turn LS reuse. + GetCascadeID(ctx context.Context, key string) (string, error) + // SetCascadeID 写入 Cascade 会话 ID + // Persist the Cascade session ID with the given TTL. + SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error + // DeleteCascadeID 失效 Cascade 会话 ID(panel-not-found / 错误时调用) + // Invalidate the cached Cascade session ID on panel-not-found or upstream error. + DeleteCascadeID(ctx context.Context, key string) error } // derefGroupID safely dereferences *int64 to int64, returning 0 if nil diff --git a/backend/internal/service/openai_account_scheduler_test.go b/backend/internal/service/openai_account_scheduler_test.go index 0950ee54..f9495541 100644 --- a/backend/internal/service/openai_account_scheduler_test.go +++ b/backend/internal/service/openai_account_scheduler_test.go @@ -149,6 +149,30 @@ func (c *schedulerTestGatewayCache) DeleteSessionAccountID(ctx context.Context, return nil } +func (c *schedulerTestGatewayCache) GetCascadeID(ctx context.Context, key string) (string, error) { + return "", nil +} + +func (c *schedulerTestGatewayCache) SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error { + return nil +} + +func (c *schedulerTestGatewayCache) DeleteCascadeID(ctx context.Context, key string) error { + return nil +} + +func (c *schedulerTestGatewayCache) GetWindsurfCascadeID(_ context.Context, _ string) (string, error) { + return "", nil +} + +func (c *schedulerTestGatewayCache) SetWindsurfCascadeID(_ context.Context, _ string, _ string, _ time.Duration) error { + return nil +} + +func (c *schedulerTestGatewayCache) DeleteWindsurfCascadeID(_ context.Context, _ string) error { + return nil +} + func newSchedulerTestOpenAIWSV2Config() *config.Config { cfg := &config.Config{} cfg.Gateway.OpenAIWS.Enabled = true diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index bc900689..9c08d5f2 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -344,6 +344,30 @@ func (c *stubGatewayCache) DeleteSessionAccountID(ctx context.Context, groupID i return nil } +func (c *stubGatewayCache) GetCascadeID(ctx context.Context, key string) (string, error) { + return "", nil +} + +func (c *stubGatewayCache) SetCascadeID(ctx context.Context, key string, cascadeID string, ttl time.Duration) error { + return nil +} + +func (c *stubGatewayCache) DeleteCascadeID(ctx context.Context, key string) error { + return nil +} + +func (c *stubGatewayCache) GetWindsurfCascadeID(_ context.Context, _ string) (string, error) { + return "", nil +} + +func (c *stubGatewayCache) SetWindsurfCascadeID(_ context.Context, _ string, _ string, _ time.Duration) error { + return nil +} + +func (c *stubGatewayCache) DeleteWindsurfCascadeID(_ context.Context, _ string) error { + return nil +} + func TestOpenAISelectAccountWithLoadAwareness_FiltersUnschedulable(t *testing.T) { now := time.Now() resetAt := now.Add(10 * time.Minute) diff --git a/backend/internal/service/openai_ws_state_store_test.go b/backend/internal/service/openai_ws_state_store_test.go index 235d4233..82433336 100644 --- a/backend/internal/service/openai_ws_state_store_test.go +++ b/backend/internal/service/openai_ws_state_store_test.go @@ -185,6 +185,18 @@ func (c *openAIWSStateStoreTimeoutProbeCache) RefreshSessionTTL(context.Context, return nil } +func (c *openAIWSStateStoreTimeoutProbeCache) GetCascadeID(ctx context.Context, _ string) (string, error) { + return "", nil +} + +func (c *openAIWSStateStoreTimeoutProbeCache) SetCascadeID(ctx context.Context, _ string, _ string, _ time.Duration) error { + return nil +} + +func (c *openAIWSStateStoreTimeoutProbeCache) DeleteCascadeID(ctx context.Context, _ string) error { + return nil +} + func (c *openAIWSStateStoreTimeoutProbeCache) DeleteSessionAccountID(ctx context.Context, _ int64, _ string) error { if deadline, ok := ctx.Deadline(); ok { c.deleteHasDeadline = true @@ -193,6 +205,18 @@ func (c *openAIWSStateStoreTimeoutProbeCache) DeleteSessionAccountID(ctx context return nil } +func (c *openAIWSStateStoreTimeoutProbeCache) GetWindsurfCascadeID(_ context.Context, _ string) (string, error) { + return "", nil +} + +func (c *openAIWSStateStoreTimeoutProbeCache) SetWindsurfCascadeID(_ context.Context, _ string, _ string, _ time.Duration) error { + return nil +} + +func (c *openAIWSStateStoreTimeoutProbeCache) DeleteWindsurfCascadeID(_ context.Context, _ string) error { + return nil +} + func TestOpenAIWSStateStore_RedisOpsUseShortTimeout(t *testing.T) { probe := &openAIWSStateStoreTimeoutProbeCache{} store := NewOpenAIWSStateStore(probe) diff --git a/backend/internal/service/windsurf_chat_service.go b/backend/internal/service/windsurf_chat_service.go index f9e20c48..970110dd 100644 --- a/backend/internal/service/windsurf_chat_service.go +++ b/backend/internal/service/windsurf_chat_service.go @@ -2,6 +2,8 @@ package service import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "log/slog" "strings" @@ -11,28 +13,38 @@ import ( "github.com/Wei-Shaw/sub2api/internal/pkg/windsurf" ) +// cascadeReuseTTL 是 Windsurf Cascade 会话 ID 在 Redis 中的存活时长。 +// 与 LS 端 cascade 老化窗口对齐:超过 30 分钟即使本地 cache 命中, +// LS 也大概率返回 panel-not-found,由 chatCascade 的回退路径兜底。 +const cascadeReuseTTL = 30 * time.Minute + type WindsurfChatService struct { cfg config.WindsurfConfig lsService *WindsurfLSService tokenProvider *WindsurfTokenProvider - pool *windsurf.ConversationPool + cache GatewayCache } func NewWindsurfChatService( cfg config.WindsurfConfig, lsService *WindsurfLSService, tokenProvider *WindsurfTokenProvider, + cache GatewayCache, ) *WindsurfChatService { return &WindsurfChatService{ cfg: cfg, lsService: lsService, tokenProvider: tokenProvider, - pool: windsurf.NewConversationPool(), + cache: cache, } } type WindsurfChatRequest struct { - AccountID int64 + AccountID int64 + // GroupID 来自 apiKey.GroupID,用于 Cascade 复用 cache 隔离。0 表示不参与缓存。 + GroupID int64 + // SessionHash 与 sticky session 共用,标识同一对话流。空串表示不复用 cascade。 + SessionHash string Model string Messages []windsurf.ChatMessage Stream bool @@ -83,11 +95,11 @@ func (s *WindsurfChatService) Chat(ctx context.Context, req *WindsurfChatRequest var resp *WindsurfChatResponse switch mode { case "cascade": - resp, err = s.chatCascade(ctx, lease.Client, token.APIKey, meta, req.Messages, req.ToolPreamble, modelKey, lease.Endpoint, req.Images) + resp, err = s.chatCascade(ctx, lease.Client, token.APIKey, meta, req.Messages, req.ToolPreamble, modelKey, lease.Endpoint, req.Images, req.AccountID, req.GroupID, req.SessionHash) case "legacy": resp, err = s.chatLegacy(ctx, lease.Client, token.APIKey, meta, req.Messages, modelKey) default: - resp, err = s.chatCascade(ctx, lease.Client, token.APIKey, meta, req.Messages, req.ToolPreamble, modelKey, lease.Endpoint, req.Images) + resp, err = s.chatCascade(ctx, lease.Client, token.APIKey, meta, req.Messages, req.ToolPreamble, modelKey, lease.Endpoint, req.Images, req.AccountID, req.GroupID, req.SessionHash) } if err != nil { @@ -146,7 +158,20 @@ func injectModelIdentity(messages []windsurf.ChatMessage, meta *windsurf.ModelMe return append([]windsurf.ChatMessage{identity}, messages...) } -func (s *WindsurfChatService) chatCascade(ctx context.Context, client *windsurf.LocalLSClient, apiKey string, meta *windsurf.ModelMeta, messages []windsurf.ChatMessage, toolPreamble string, modelKey string, lsEndpoint string, images []windsurf.CascadeImage) (*WindsurfChatResponse, error) { +func (s *WindsurfChatService) chatCascade( + ctx context.Context, + client *windsurf.LocalLSClient, + apiKey string, + meta *windsurf.ModelMeta, + messages []windsurf.ChatMessage, + toolPreamble string, + modelKey string, + lsEndpoint string, + images []windsurf.CascadeImage, + accountID int64, + groupID int64, + sessionHash string, +) (*WindsurfChatResponse, error) { modelUID := "" modelEnumHint := 0 if meta != nil { @@ -154,68 +179,63 @@ func (s *WindsurfChatService) chatCascade(ctx context.Context, client *windsurf. modelEnumHint = meta.EnumValue } - // ── Model identity prompt injection ── - // When the client doesn't provide its own system prompt, prepend one so - // the model identifies itself as the requested model rather than leaking - // the underlying Windsurf/Cascade backend identity. - // Skip when the client already has a system message (Claude Code / Cline) - // to avoid triggering Cascade anti-injection on reasoning models. messages = injectModelIdentity(messages, meta, modelKey) - // 图像能力 gate:仅在请求含图时检查。 - // 策略:fail-open on RPC error;显式 supports_images=false 时拒绝(返回 CascadeModelError 触发 failover)。 if len(images) > 0 { found, ok, err := client.ModelSupportsImages(ctx, apiKey, modelUID) if err != nil { slog.Warn("windsurf_cascade_caps_fetch_failed", "model", modelUID, "error", err) - // fail-open } else if found && !ok { return nil, fmt.Errorf("model %q does not support image inputs in Windsurf Cascade", modelUID) } } - fpBefore := windsurf.FingerprintBefore(messages, modelKey, apiKey) - // failover 切号后禁止复用 cascade:cascade_id 属于上一个账号的 LS, - // 在当前账号上一定会触发 "panel state not found" 浪费一次请求。 - // 同时切号场景下需要提升历史预算——新账号完全没有服务端上下文, - // 必须把完整聊天记录塞进文本里。 - skipReuse := false - switchover := false - if switches, ok := AccountSwitchCountFromContext(ctx); ok && switches > 0 { - skipReuse = true - switchover = true - } - var entry *windsurf.ConversationEntry - if !skipReuse { - entry = s.pool.Checkout(fpBefore) - } - isResume := entry != nil && entry.CascadeID != "" - - var reuseCascadeID string - if isResume { - reuseCascadeID = entry.CascadeID - slog.Info("windsurf_cascade_reuse_hit", "cascade_id", reuseCascadeID[:8], "model", modelKey) + // reuse 路径:sessionHash 非空且 cache 命中即复用 cascade, + // userText 缩为"system + 最后一条 user"——cascade trajectory 已承载历史。 + canReuse := sessionHash != "" && s.cache != nil && groupID != 0 + cacheKey := "" + reuseID := "" + if canReuse { + cacheKey = buildCascadeCacheKey(groupID, accountID, modelUID, lsEndpoint, sessionHash, sysPromptHash(messages)) + if id, err := s.cache.GetCascadeID(ctx, cacheKey); err == nil && id != "" { + reuseID = id + } else if err != nil { + slog.Warn("windsurf_cascade_cache_get_failed", "error", err) + } } - userText := buildCascadeText(messages, modelUID, isResume, switchover) + var userText string + if reuseID != "" { + userText = buildCascadeTextForReuse(messages) + } else { + userText = buildCascadeText(messages, modelUID) + } - result, err := client.StreamCascadeChat(ctx, apiKey, modelUID, userText, toolPreamble, reuseCascadeID, modelEnumHint, images) - if err != nil && isResume { - slog.Warn("windsurf_cascade_reuse_failed", "error", err, "model", modelKey) - // panel-state-not-found 恢复:新 cascade 没有服务端历史,必须发完整聊天记录。 - userText = buildCascadeText(messages, modelUID, false, true) + result, err := client.StreamCascadeChat(ctx, apiKey, modelUID, userText, toolPreamble, reuseID, modelEnumHint, images) + + // reuse 触发 panel-not-found:清缓存 + 用 full-history 重试一次。 + if err != nil && reuseID != "" && isPanelNotFound(err) { + slog.Info("windsurf_cascade_reuse_invalidated", "cascade_id", reuseID, "reason", "panel_not_found") + if cacheKey != "" { + _ = s.cache.DeleteCascadeID(ctx, cacheKey) + } + userText = buildCascadeText(messages, modelUID) result, err = client.StreamCascadeChat(ctx, apiKey, modelUID, userText, toolPreamble, "", modelEnumHint, images) } + if err != nil { + // 任何错误都失效缓存(保守策略,避免下次复用到坏 cascade)。 + if canReuse && cacheKey != "" { + _ = s.cache.DeleteCascadeID(ctx, cacheKey) + } return nil, err } - if result.CascadeID != "" && result.Text != "" { - fpAfter := windsurf.FingerprintAfter(messages, modelKey, apiKey) - s.pool.Checkin(fpAfter, &windsurf.ConversationEntry{ - CascadeID: result.CascadeID, - APIKey: apiKey, - }) + // 成功:写回 cache。 + if canReuse && cacheKey != "" && result.CascadeID != "" { + if setErr := s.cache.SetCascadeID(ctx, cacheKey, result.CascadeID, cascadeReuseTTL); setErr != nil { + slog.Warn("windsurf_cascade_cache_set_failed", "error", setErr) + } } return &WindsurfChatResponse{ @@ -229,6 +249,64 @@ func (s *WindsurfChatService) chatCascade(ctx context.Context, client *windsurf. }, nil } +// buildCascadeCacheKey 构造 Cascade 复用 cache 的 key。 +// 任一组件变化(账号、模型、LS 实例、会话、system prompt)都会自动 cache miss。 +func buildCascadeCacheKey(groupID, accountID int64, modelUID, lsEndpoint, sessionHash, sysHash string) string { + h := sha256.New() + fmt.Fprintf(h, "%d|%d|%s|%s|%s|%s", groupID, accountID, modelUID, lsEndpoint, sessionHash, sysHash) + return hex.EncodeToString(h.Sum(nil))[:24] +} + +// sysPromptHash 计算 system 消息内容的指纹。system 变化必须强制开新 cascade, +// 否则旧 cascade 内已建立的"角色/约束"语境会污染新对话。 +func sysPromptHash(messages []windsurf.ChatMessage) string { + h := sha256.New() + for _, m := range messages { + if m.Role == "system" { + h.Write([]byte(m.Content)) + h.Write([]byte{0}) + } + } + return hex.EncodeToString(h.Sum(nil))[:16] +} + +// isPanelNotFound 判定 LS 端"cascade panel state not found"错误。 +// 与 windsurf.LocalLSClient 内部 panel-state-not-found 检测保持一致。 +func isPanelNotFound(err error) bool { + if err == nil { + return false + } + s := strings.ToLower(err.Error()) + if strings.Contains(s, "panel state not found") { + return true + } + return strings.Contains(s, "not_found") && strings.Contains(s, "panel") +} + +// buildCascadeTextForReuse 构造 reuse 模式下的 user turn 文本: +// 仅含 system instructions + 最新一条 user 消息。Cascade 内部已通过 trajectory 保留前序历史。 +// 注意:cacheKey 已包含 sysPromptHash,system 变化会强制 cache miss → 走 buildCascadeText 全量路径。 +func buildCascadeTextForReuse(messages []windsurf.ChatMessage) string { + var sysParts []string + var lastUser string + for _, m := range messages { + if m.Role == "system" { + sysParts = append(sysParts, m.Content) + } + } + for i := len(messages) - 1; i >= 0; i-- { + if messages[i].Role == "user" { + lastUser = messages[i].Content + break + } + } + sys := strings.TrimSpace(strings.Join(sysParts, "\n")) + if sys != "" { + return "\n" + sys + "\n\n\n" + lastUser + } + return lastUser +} + func (s *WindsurfChatService) chatLegacy(ctx context.Context, client *windsurf.LocalLSClient, apiKey string, meta *windsurf.ModelMeta, messages []windsurf.ChatMessage, modelKey string) (*WindsurfChatResponse, error) { modelEnum := 0 modelName := "" @@ -249,19 +327,12 @@ func (s *WindsurfChatService) chatLegacy(ctx context.Context, client *windsurf.L } const ( - cascadeMaxHistoryBytes = 200_000 - cascade1MHistoryBytes = 900_000 - // cascadeSwitchoverHistoryBytes 是切号 / panel-state-not-found 恢复场景下的 - // "尽量塞进完整历史" 预算。目标是让新账号拿到尽可能完整的对话上下文。 - // 3.5MB 留了 500KB 给 proto 其它字段(metadata/config/images),避开 gRPC 4MB 默认上限。 - cascadeSwitchoverHistoryBytes = 3_500_000 - cascadeMultiTurnPreamble = "The following is a multi-turn conversation. You MUST remember and use all information from prior turns." + cascadeMaxHistoryBytes = 200_000 + cascade1MHistoryBytes = 900_000 + cascadeMultiTurnPreamble = "The following is a multi-turn conversation. You MUST remember and use all information from prior turns." ) -func cascadeHistoryBudget(modelUID string, switchover bool) int { - if switchover { - return cascadeSwitchoverHistoryBytes - } +func cascadeHistoryBudget(modelUID string) int { if strings.Contains(strings.ToLower(modelUID), "1m") { return cascade1MHistoryBytes } @@ -269,14 +340,9 @@ func cascadeHistoryBudget(modelUID string, switchover bool) int { } // buildCascadeText constructs the full text payload for SendUserCascadeMessage. -// If isResume is true, only the last user message is sent (cascade already has context). -// Otherwise: system prompt wrapped in , multi-turn history -// with / tags, and a budget cap to trim old turns. -// -// switchover=true 提升历史预算到 cascadeSwitchoverHistoryBytes(~3.5MB), -// 用于切号 / panel-state-not-found 恢复场景——新账号/新 cascade 没有服务端历史, -// 必须把完整聊天记录塞进文本里。isResume=true 时该参数被忽略(resume 只发最后一条)。 -func buildCascadeText(messages []windsurf.ChatMessage, modelUID string, isResume, switchover bool) string { +// System prompt is wrapped in , multi-turn history uses +// / tags with a budget cap to trim the oldest turns. +func buildCascadeText(messages []windsurf.ChatMessage, modelUID string) string { var systemParts []string var convo []windsurf.ChatMessage @@ -292,11 +358,6 @@ func buildCascadeText(messages []windsurf.ChatMessage, modelUID string, isResume return "" } - // Resume: cascade already has context, only send last user message - if isResume { - return convo[len(convo)-1].Content - } - sysText := strings.TrimSpace(strings.Join(systemParts, "\n")) if sysText != "" { sysText = "\n" + sysText + "\n" @@ -312,10 +373,9 @@ func buildCascadeText(messages []windsurf.ChatMessage, modelUID string, isResume } // Multi-turn: build history with budget trimming - maxBytes := cascadeHistoryBudget(modelUID, switchover) + maxBytes := cascadeHistoryBudget(modelUID) historyBytes := len(sysText) - // Walk backward from second-to-last, collecting turns that fit var lines []string droppedTurns := 0 for i := len(convo) - 2; i >= 0; i-- { @@ -332,20 +392,12 @@ func buildCascadeText(messages []windsurf.ChatMessage, modelUID string, isResume "total_turns", len(convo), "kept_kb", historyBytes/1024, "dropped_turns", droppedTurns, - "switchover", switchover, ) break } lines = append([]string{line}, lines...) historyBytes += len(line) } - if switchover && droppedTurns == 0 { - slog.Info("windsurf_cascade_switchover_history", - "total_turns", len(convo), - "kept_kb", historyBytes/1024, - "dropped_turns", 0, - ) - } latest := convo[len(convo)-1] text := cascadeMultiTurnPreamble + "\n\n" + diff --git a/backend/internal/service/windsurf_chat_service_test.go b/backend/internal/service/windsurf_chat_service_test.go index 53984a75..54f19d31 100644 --- a/backend/internal/service/windsurf_chat_service_test.go +++ b/backend/internal/service/windsurf_chat_service_test.go @@ -1,21 +1,17 @@ package service import ( + "errors" "strings" "testing" "github.com/Wei-Shaw/sub2api/internal/pkg/windsurf" ) -// Test that the switchover flag expands the history budget to ~3.5MB and preserves -// all turns for a large multi-turn conversation that would otherwise be trimmed -// under the normal 200KB budget. This guards the core fix: after a Windsurf -// account switch, the new account must receive the full chat history. -func TestBuildCascadeText_SwitchoverKeepsFullHistory(t *testing.T) { - // Build a ~1.5MB multi-turn history: 30 turns of ~50KB each (alternating - // user/assistant). Exceeds the normal 200KB cap; well within the 3.5MB cap. +// buildCascadeText always sends full history regardless of account switches. +func TestBuildCascadeText_AlwaysFullHistory(t *testing.T) { const perTurnBytes = 50 * 1024 - const turns = 30 + const turns = 3 // keep small so it fits in the 200KB budget bulk := strings.Repeat("x", perTurnBytes) var messages []windsurf.ChatMessage @@ -27,47 +23,29 @@ func TestBuildCascadeText_SwitchoverKeepsFullHistory(t *testing.T) { } messages = append(messages, windsurf.ChatMessage{Role: role, Content: bulk}) } - // Latest user message (the one actually being answered). messages = append(messages, windsurf.ChatMessage{Role: "user", Content: "final question"}) - normalText := buildCascadeText(messages, "claude-sonnet-4", false, false) - switchoverText := buildCascadeText(messages, "claude-sonnet-4", false, true) + text := buildCascadeText(messages, "claude-sonnet-4") - if len(normalText) >= len(switchoverText) { - t.Fatalf("switchover text (%d bytes) must be larger than normal (%d bytes)", - len(switchoverText), len(normalText)) + if !strings.Contains(text, "final question") { + t.Fatal("text must include the final user message") } - if len(normalText) > cascadeMaxHistoryBytes+perTurnBytes { - t.Fatalf("normal text (%d bytes) must fit near %d budget", len(normalText), cascadeMaxHistoryBytes) - } - if len(switchoverText) < perTurnBytes*turns { - t.Fatalf("switchover text (%d bytes) dropped turns; expected >= %d (all %d turns kept)", - len(switchoverText), perTurnBytes*turns, turns) - } - if len(switchoverText) > cascadeSwitchoverHistoryBytes+perTurnBytes { - t.Fatalf("switchover text (%d bytes) exceeded budget %d", len(switchoverText), cascadeSwitchoverHistoryBytes) - } - // Final user message must always be preserved (it's the question being asked). - if !strings.Contains(switchoverText, "final question") { - t.Fatal("switchover text must include the final user message") - } - if !strings.Contains(normalText, "final question") { - t.Fatal("normal text must include the final user message") + if !strings.Contains(text, "sys") { + t.Fatal("text must include the system prompt") } } -// Resume mode ignores switchover — only the last user message is sent because -// Cascade server already has the history for the reused cascade_id. -func TestBuildCascadeText_ResumeIgnoresSwitchover(t *testing.T) { +func TestBuildCascadeText_SingleTurn(t *testing.T) { messages := []windsurf.ChatMessage{ - {Role: "user", Content: "first"}, - {Role: "assistant", Content: "reply"}, - {Role: "user", Content: "second question"}, + {Role: "system", Content: "be helpful"}, + {Role: "user", Content: "hello"}, } - - got := buildCascadeText(messages, "claude-sonnet-4", true, true) - if got != "second question" { - t.Fatalf("resume=true must return only last user message, got %q", got) + got := buildCascadeText(messages, "claude-sonnet-4") + if !strings.Contains(got, "hello") { + t.Fatal("single turn text must contain user message") + } + if !strings.Contains(got, "be helpful") { + t.Fatal("single turn text must contain system prompt") } } @@ -144,23 +122,140 @@ func TestInjectModelIdentity(t *testing.T) { } } -func TestCascadeHistoryBudget(t *testing.T) { - tests := []struct { - name string - modelUID string - switchover bool - want int - }{ - {"normal model normal budget", "claude-sonnet-4", false, cascadeMaxHistoryBytes}, - {"1m model normal budget", "claude-sonnet-4-1m", false, cascade1MHistoryBytes}, - {"normal model switchover", "claude-sonnet-4", true, cascadeSwitchoverHistoryBytes}, - {"1m model switchover", "claude-sonnet-4-1m", true, cascadeSwitchoverHistoryBytes}, +// reuse 路径只送 system + 最后一条 user,不携带历史 +func TestBuildCascadeTextForReuse_SystemAndLastUser(t *testing.T) { + messages := []windsurf.ChatMessage{ + {Role: "system", Content: "be helpful"}, + {Role: "user", Content: "first turn"}, + {Role: "assistant", Content: "first response"}, + {Role: "user", Content: "second turn"}, } - for _, tt := range tests { + got := buildCascadeTextForReuse(messages) + + if !strings.Contains(got, "second turn") { + t.Fatal("reuse text must contain the latest user message") + } + if !strings.Contains(got, "be helpful") { + t.Fatal("reuse text must contain the system prompt") + } + if strings.Contains(got, "first turn") || strings.Contains(got, "first response") { + t.Fatalf("reuse text must NOT carry prior history (Cascade trajectory has it). got=%q", got) + } + if !strings.Contains(got, "") { + t.Fatalf("reuse text must wrap system in tag. got=%q", got) + } +} + +func TestBuildCascadeTextForReuse_NoSystem(t *testing.T) { + messages := []windsurf.ChatMessage{ + {Role: "user", Content: "hello"}, + } + got := buildCascadeTextForReuse(messages) + if got != "hello" { + t.Fatalf("expected raw user message when no system; got %q", got) + } +} + +func TestBuildCascadeCacheKey_Stable(t *testing.T) { + a := buildCascadeCacheKey(1, 2, "claude-sonnet", "http://localhost:42100", "sess-x", "syshash-x") + b := buildCascadeCacheKey(1, 2, "claude-sonnet", "http://localhost:42100", "sess-x", "syshash-x") + if a != b { + t.Fatalf("same inputs must yield same key; %q vs %q", a, b) + } + if len(a) != 24 { + t.Fatalf("cache key length expected 24, got %d (%q)", len(a), a) + } +} + +// cacheKey 任一组件变化都必须产生不同的 key(避免错误复用) +func TestBuildCascadeCacheKey_DifferentInputsDiffer(t *testing.T) { + base := buildCascadeCacheKey(1, 2, "model-a", "ep1", "sess1", "sys1") + cases := map[string]string{ + "groupID": buildCascadeCacheKey(99, 2, "model-a", "ep1", "sess1", "sys1"), + "accountID": buildCascadeCacheKey(1, 99, "model-a", "ep1", "sess1", "sys1"), + "modelUID": buildCascadeCacheKey(1, 2, "model-b", "ep1", "sess1", "sys1"), + "lsEndpoint": buildCascadeCacheKey(1, 2, "model-a", "ep2", "sess1", "sys1"), + "sessionHash": buildCascadeCacheKey(1, 2, "model-a", "ep1", "sess2", "sys1"), + "sysHash": buildCascadeCacheKey(1, 2, "model-a", "ep1", "sess1", "sys2"), + } + for name, k := range cases { + if k == base { + t.Fatalf("changing %s must produce a different cache key", name) + } + } +} + +func TestSysPromptHash_DetectsSystemChange(t *testing.T) { + a := sysPromptHash([]windsurf.ChatMessage{ + {Role: "system", Content: "be helpful"}, + {Role: "user", Content: "hi"}, + }) + b := sysPromptHash([]windsurf.ChatMessage{ + {Role: "system", Content: "be helpful"}, + {Role: "user", Content: "different user msg — should NOT affect sys hash"}, + }) + c := sysPromptHash([]windsurf.ChatMessage{ + {Role: "system", Content: "be VERY helpful"}, + {Role: "user", Content: "hi"}, + }) + if a != b { + t.Fatalf("sysPromptHash must ignore non-system content; %q vs %q", a, b) + } + if a == c { + t.Fatalf("sysPromptHash must reflect system content changes; both are %q", a) + } +} + +// 多条 system 拼接顺序敏感(合并成 multipart 时不应让两段交换后产生相同 hash) +func TestSysPromptHash_OrderSensitive(t *testing.T) { + a := sysPromptHash([]windsurf.ChatMessage{ + {Role: "system", Content: "alpha"}, + {Role: "system", Content: "beta"}, + }) + b := sysPromptHash([]windsurf.ChatMessage{ + {Role: "system", Content: "beta"}, + {Role: "system", Content: "alpha"}, + }) + if a == b { + t.Fatalf("sysPromptHash must be order-sensitive across multiple system messages") + } +} + +func TestIsPanelNotFound(t *testing.T) { + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"unrelated error", errors.New("network unreachable"), false}, + {"exact phrase", errors.New("Cascade panel state not found"), true}, + {"lower case variant", errors.New("panel state not found: id=abc"), true}, + {"not_found code", errors.New("rpc error: code=not_found, panel missing"), true}, + {"missing one keyword", errors.New("not_found"), false}, + } + for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - if got := cascadeHistoryBudget(tt.modelUID, tt.switchover); got != tt.want { - t.Errorf("cascadeHistoryBudget(%q, %v) = %d, want %d", - tt.modelUID, tt.switchover, got, tt.want) + if got := isPanelNotFound(tt.err); got != tt.want { + t.Fatalf("isPanelNotFound(%v) = %v, want %v", tt.err, got, tt.want) + } + }) + } +} + +func TestCascadeHistoryBudget(t *testing.T) { + tests := []struct { + name string + modelUID string + want int + }{ + {"normal model", "claude-sonnet-4", cascadeMaxHistoryBytes}, + {"1m model", "claude-sonnet-4-1m", cascade1MHistoryBytes}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := cascadeHistoryBudget(tt.modelUID); got != tt.want { + t.Errorf("cascadeHistoryBudget(%q) = %d, want %d", tt.modelUID, got, tt.want) } }) } diff --git a/backend/internal/service/windsurf_gateway_service.go b/backend/internal/service/windsurf_gateway_service.go index 8626e42d..04cc6bfa 100644 --- a/backend/internal/service/windsurf_gateway_service.go +++ b/backend/internal/service/windsurf_gateway_service.go @@ -36,7 +36,10 @@ func NewWindsurfGatewayService( } } -func (s *WindsurfGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte, _ bool) (*ForwardResult, error) { +// Forward 处理 Windsurf 平台的 Anthropic-兼容请求。 +// groupID 与 sessionHash 用于 Cascade 多轮复用:在同一 sticky session 上复用上游 LS cascade, +// 跳过 StartCascade 的额外 RPC,并避免每轮把 full-history 重灌进 trajectory。空值表示不复用。 +func (s *WindsurfGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte, _ bool, groupID int64, sessionHash string) (*ForwardResult, error) { startTime := time.Now() reqLog := windsurfLogger(c, "windsurf_gateway.forward", zap.Int64("account_id", account.ID), @@ -228,6 +231,8 @@ func (s *WindsurfGatewayService) Forward(ctx context.Context, c *gin.Context, ac chatReq := &WindsurfChatRequest{ AccountID: account.ID, + GroupID: groupID, + SessionHash: sessionHash, Model: req.Model, Messages: chatMessages, Stream: req.Stream, diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index 8af5c693..3efd7800 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -562,11 +562,11 @@ func ProvideWindsurfTokenProvider(cfg *config.Config, accountRepo AccountReposit } // ProvideWindsurfChatService creates WindsurfChatService (nil when disabled). -func ProvideWindsurfChatService(cfg *config.Config, lsService *WindsurfLSService, tokenProvider *WindsurfTokenProvider) *WindsurfChatService { +func ProvideWindsurfChatService(cfg *config.Config, lsService *WindsurfLSService, tokenProvider *WindsurfTokenProvider, cache GatewayCache) *WindsurfChatService { if !cfg.Windsurf.Enabled || lsService == nil || tokenProvider == nil { return nil } - return NewWindsurfChatService(cfg.Windsurf, lsService, tokenProvider) + return NewWindsurfChatService(cfg.Windsurf, lsService, tokenProvider, cache) } // ProvideWindsurfGatewayService creates WindsurfGatewayService (nil when disabled).