fix: revert antigravity Forward to v1internal REST path, remove broken lsrpc upstream call

lsrpc is local IPC (IDE ↔ language_server binary), not an upstream protocol.
cloudcode-pa.googleapis.com does not serve gRPC/lsrpc endpoints.
Restores antigravityRetryLoop + streamGenerateContent path which was working.
Removes antigravity_lsrpc.go (upstream caller) and lsrpc_test cmd.
Keeps lsrpc_handler.go (server side, receives IDE connections).
This commit is contained in:
win 2026-04-19 20:03:34 +08:00
parent 888b7eeb21
commit 3403e8401c
10 changed files with 492 additions and 173 deletions

View File

@ -237,8 +237,9 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
jwtAuthMiddleware := middleware.NewJWTAuthMiddleware(authService, userService) jwtAuthMiddleware := middleware.NewJWTAuthMiddleware(authService, userService)
adminAuthMiddleware := middleware.NewAdminAuthMiddleware(authService, userService, settingService) adminAuthMiddleware := middleware.NewAdminAuthMiddleware(authService, userService, settingService)
apiKeyAuthMiddleware := middleware.NewAPIKeyAuthMiddleware(apiKeyService, subscriptionService, configConfig) apiKeyAuthMiddleware := middleware.NewAPIKeyAuthMiddleware(apiKeyService, subscriptionService, configConfig)
langServerService := service.ProvideLanguageServerService(httpUpstream) langServerService := service.ProvideLanguageServerService(httpUpstream, antigravityGatewayService, accountRepository)
engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, redisClient, langServerService) lsrpcHandler := service.NewLSRPCHandler(antigravityGatewayService, accountRepository, nil)
engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService, opsService, settingService, redisClient, langServerService, lsrpcHandler)
httpServer := server.ProvideHTTPServer(configConfig, engine) httpServer := server.ProvideHTTPServer(configConfig, engine)
opsMetricsCollector := service.ProvideOpsMetricsCollector(opsRepository, settingRepository, accountRepository, concurrencyService, db, redisClient, configConfig) opsMetricsCollector := service.ProvideOpsMetricsCollector(opsRepository, settingRepository, accountRepository, concurrencyService, db, redisClient, configConfig)
opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig) opsAggregationService := service.ProvideOpsAggregationService(opsRepository, settingRepository, db, redisClient, configConfig)

View File

@ -40,6 +40,7 @@ func ProvideRouter(
settingService *service.SettingService, settingService *service.SettingService,
redisClient *redis.Client, redisClient *redis.Client,
langServerService *service.LanguageServerService, langServerService *service.LanguageServerService,
lsrpcHandler *service.LSRPCHandler,
) *gin.Engine { ) *gin.Engine {
if cfg.Server.Mode == "release" { if cfg.Server.Mode == "release" {
gin.SetMode(gin.ReleaseMode) gin.SetMode(gin.ReleaseMode)
@ -96,7 +97,7 @@ func ProvideRouter(
service.SetWebSearchManager(websearch.NewManager(configs, redisClient)) service.SetWebSearchManager(websearch.NewManager(configs, redisClient))
}) })
return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient, langServerService) return SetupRouter(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient, langServerService, lsrpcHandler)
} }
// ProvideHTTPServer 提供 HTTP 服务器 // ProvideHTTPServer 提供 HTTP 服务器

View File

@ -7,6 +7,7 @@ import (
"time" "time"
"github.com/Wei-Shaw/sub2api/internal/config" "github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/gen/language_server_pbconnect"
"github.com/Wei-Shaw/sub2api/internal/handler" "github.com/Wei-Shaw/sub2api/internal/handler"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware" middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/server/routes" "github.com/Wei-Shaw/sub2api/internal/server/routes"
@ -33,6 +34,7 @@ func SetupRouter(
cfg *config.Config, cfg *config.Config,
redisClient *redis.Client, redisClient *redis.Client,
langServerService *service.LanguageServerService, langServerService *service.LanguageServerService,
lsrpcHandler *service.LSRPCHandler,
) *gin.Engine { ) *gin.Engine {
// 缓存 iframe 页面的 origin 列表,用于动态注入 CSP frame-src // 缓存 iframe 页面的 origin 列表,用于动态注入 CSP frame-src
var cachedFrameOrigins atomic.Pointer[[]string] var cachedFrameOrigins atomic.Pointer[[]string]
@ -82,7 +84,7 @@ func SetupRouter(
} }
// 注册路由 // 注册路由
registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient, langServerService) registerRoutes(r, handlers, jwtAuth, adminAuth, apiKeyAuth, apiKeyService, subscriptionService, opsService, settingService, cfg, redisClient, langServerService, lsrpcHandler)
return r return r
} }
@ -101,6 +103,7 @@ func registerRoutes(
cfg *config.Config, cfg *config.Config,
redisClient *redis.Client, redisClient *redis.Client,
langServerService *service.LanguageServerService, langServerService *service.LanguageServerService,
lsrpcHandler *service.LSRPCHandler,
) { ) {
// 通用路由(健康检查、状态等) // 通用路由(健康检查、状态等)
routes.RegisterCommonRoutes(r) routes.RegisterCommonRoutes(r)
@ -117,5 +120,12 @@ func registerRoutes(
// 注册 Antigravity HTTP API 路由 // 注册 Antigravity HTTP API 路由
routes.RegisterAntigravityHTTPRoutes(v1, langServerService) routes.RegisterAntigravityHTTPRoutes(v1, langServerService)
// 挂载 connectrpc LanguageServerService 路由
// Claude Code 客户端通过 /exa.language_server_pb.LanguageServerService/* 路径访问
if lsrpcHandler != nil {
lsrpcPath, lsrpcHTTPHandler := language_server_pbconnect.NewLanguageServerServiceHandler(lsrpcHandler)
r.Any(lsrpcPath+"*action", gin.WrapH(lsrpcHTTPHandler))
}
routes.RegisterPaymentRoutes(v1, h.Payment, h.PaymentWebhook, h.Admin.Payment, jwtAuth, adminAuth, settingService) routes.RegisterPaymentRoutes(v1, h.Payment, h.PaymentWebhook, h.Admin.Payment, jwtAuth, adminAuth, settingService)
} }

View File

@ -18,7 +18,7 @@ func TestAntigravityHTTPRoutes(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
// 创建模拟的 LanguageServerService // 创建模拟的 LanguageServerService
mockService := service.NewLanguageServerService(slog.Default(), nil) mockService := service.NewLanguageServerService(slog.Default(), nil, nil, nil)
defer mockService.Stop() defer mockService.Stop()
// 创建路由 // 创建路由
@ -143,7 +143,7 @@ func TestAntigravityHTTPRoutes(t *testing.T) {
func TestStartCascadeValidation(t *testing.T) { func TestStartCascadeValidation(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
mockService := service.NewLanguageServerService(slog.Default(), nil) mockService := service.NewLanguageServerService(slog.Default(), nil, nil, nil)
defer mockService.Stop() defer mockService.Stop()
r := gin.New() r := gin.New()
@ -185,7 +185,7 @@ func TestStartCascadeValidation(t *testing.T) {
func TestRateLimiting(t *testing.T) { func TestRateLimiting(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
mockService := service.NewLanguageServerService(slog.Default(), nil) mockService := service.NewLanguageServerService(slog.Default(), nil, nil, nil)
defer mockService.Stop() defer mockService.Stop()
r := gin.New() r := gin.New()
@ -257,7 +257,7 @@ func TestRateLimiting(t *testing.T) {
func TestSessionCleanup(t *testing.T) { func TestSessionCleanup(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
mockService := service.NewLanguageServerService(slog.Default(), nil) mockService := service.NewLanguageServerService(slog.Default(), nil, nil, nil)
mockService.SetSessionTTL(2) // 设置 2 秒过期,便于测试 mockService.SetSessionTTL(2) // 设置 2 秒过期,便于测试
defer mockService.Stop() defer mockService.Stop()
@ -305,7 +305,7 @@ func TestSessionCleanup(t *testing.T) {
func TestConcurrentMessageAppend(t *testing.T) { func TestConcurrentMessageAppend(t *testing.T) {
gin.SetMode(gin.TestMode) gin.SetMode(gin.TestMode)
mockService := service.NewLanguageServerService(slog.Default(), nil) mockService := service.NewLanguageServerService(slog.Default(), nil, nil, nil)
defer mockService.Stop() defer mockService.Stop()
r := gin.New() r := gin.New()

View File

@ -1467,7 +1467,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
if mappedModel == "" { if mappedModel == "" {
return nil, s.writeClaudeError(c, http.StatusForbidden, "permission_error", fmt.Sprintf("model %s not in whitelist", claudeReq.Model)) return nil, s.writeClaudeError(c, http.StatusForbidden, "permission_error", fmt.Sprintf("model %s not in whitelist", claudeReq.Model))
} }
// 应用 thinking 模式自动后缀:如果 thinking 开启且目标是 claude-sonnet-4-5自动改为 thinking 版本
thinkingEnabled := claudeReq.Thinking != nil && (claudeReq.Thinking.Type == "enabled" || claudeReq.Thinking.Type == "adaptive") thinkingEnabled := claudeReq.Thinking != nil && (claudeReq.Thinking.Type == "enabled" || claudeReq.Thinking.Type == "adaptive")
mappedModel = applyThinkingModelSuffix(mappedModel, thinkingEnabled) mappedModel = applyThinkingModelSuffix(mappedModel, thinkingEnabled)
billingModel := mappedModel billingModel := mappedModel
@ -1494,9 +1493,8 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
} }
// 获取转换选项 // 获取转换选项
// Antigravity 上游要求必须包含身份提示词,否则会返回 429
transformOpts := s.getClaudeTransformOptions(ctx) transformOpts := s.getClaudeTransformOptions(ctx)
transformOpts.EnableIdentityPatch = true // 强制启用Antigravity 上游必需 transformOpts.EnableIdentityPatch = true
transformOpts.PreferredSessionID = sessionID transformOpts.PreferredSessionID = sessionID
// 转换 Claude 请求为 Gemini 格式 // 转换 Claude 请求为 Gemini 格式
@ -1505,11 +1503,8 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
return nil, s.writeClaudeError(c, http.StatusBadRequest, "invalid_request_error", "Invalid request") return nil, s.writeClaudeError(c, http.StatusBadRequest, "invalid_request_error", "Invalid request")
} }
// Antigravity 上游只支持流式请求,统一使用 streamGenerateContent
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回
action := "streamGenerateContent" action := "streamGenerateContent"
// 执行带重试的请求
result, err := s.antigravityRetryLoop(antigravityRetryLoopParams{ result, err := s.antigravityRetryLoop(antigravityRetryLoopParams{
ctx: ctx, ctx: ctx,
prefix: prefix, prefix: prefix,
@ -1524,19 +1519,17 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
accountRepo: s.accountRepo, accountRepo: s.accountRepo,
handleError: s.handleUpstreamError, handleError: s.handleUpstreamError,
requestedModel: originalModel, requestedModel: originalModel,
isStickySession: isStickySession, // Forward 由上层判断粘性会话 isStickySession: isStickySession,
groupID: 0, // Forward 方法没有 groupID由上层处理粘性会话清除 groupID: 0,
sessionHash: "", // Forward 方法没有 sessionHash由上层处理粘性会话清除 sessionHash: "",
}) })
if err != nil { if err != nil {
// 检查是否是账号切换信号,转换为 UpstreamFailoverError 让 Handler 切换账号
if switchErr, ok := IsAntigravityAccountSwitchError(err); ok { if switchErr, ok := IsAntigravityAccountSwitchError(err); ok {
return nil, &UpstreamFailoverError{ return nil, &UpstreamFailoverError{
StatusCode: http.StatusServiceUnavailable, StatusCode: http.StatusServiceUnavailable,
ForceCacheBilling: switchErr.IsStickySession, ForceCacheBilling: switchErr.IsStickySession,
} }
} }
// 区分客户端取消和真正的上游失败,返回更准确的错误消息
if c.Request.Context().Err() != nil { if c.Request.Context().Err() != nil {
return nil, s.writeClaudeError(c, http.StatusBadGateway, "client_disconnected", "Client disconnected before upstream response") return nil, s.writeClaudeError(c, http.StatusBadGateway, "client_disconnected", "Client disconnected before upstream response")
} }
@ -1548,9 +1541,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
// 优先检测 thinking block 的 signature 相关错误400并重试一次
// Antigravity /v1internal 链路在部分场景会对 thought/thinking signature 做严格校验,
// 当历史消息携带的 signature 不合法时会直接 400去除 thinking 后可继续完成请求。
if resp.StatusCode == http.StatusBadRequest && isSignatureRelatedError(respBody) && s.settingService.IsSignatureRectifierEnabled(ctx) { if resp.StatusCode == http.StatusBadRequest && isSignatureRelatedError(respBody) && s.settingService.IsSignatureRectifierEnabled(ctx) {
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
@ -1567,10 +1557,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
Detail: upstreamDetail, Detail: upstreamDetail,
}) })
// Conservative two-stage fallback:
// 1) Disable top-level thinking + thinking->text
// 2) Only if still signature-related 400: also downgrade tool_use/tool_result to text.
retryStages := []struct { retryStages := []struct {
name string name string
strip func(*antigravity.ClaudeRequest) (bool, error) strip func(*antigravity.ClaudeRequest) (bool, error)
@ -1609,8 +1595,8 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
handleError: s.handleUpstreamError, handleError: s.handleUpstreamError,
requestedModel: originalModel, requestedModel: originalModel,
isStickySession: isStickySession, isStickySession: isStickySession,
groupID: 0, // Forward 方法没有 groupID由上层处理粘性会话清除 groupID: 0,
sessionHash: "", // Forward 方法没有 sessionHash由上层处理粘性会话清除 sessionHash: "",
}) })
if retryErr != nil { if retryErr != nil {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
@ -1663,7 +1649,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
Detail: retryUpstreamDetail, Detail: retryUpstreamDetail,
}) })
// If this stage fixed the signature issue, we stop; otherwise we may try the next stage.
if retryResp.StatusCode != http.StatusBadRequest || !isSignatureRelatedError(retryBody) { if retryResp.StatusCode != http.StatusBadRequest || !isSignatureRelatedError(retryBody) {
respBody = retryBody respBody = retryBody
resp = &http.Response{ resp = &http.Response{
@ -1674,7 +1659,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
break break
} }
// Still signature-related; capture context and allow next stage.
respBody = retryBody respBody = retryBody
resp = &http.Response{ resp = &http.Response{
StatusCode: retryResp.StatusCode, StatusCode: retryResp.StatusCode,
@ -1684,7 +1668,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
} }
} }
// Budget 整流:检测 budget_tokens 约束错误并自动修正重试 // Budget 整流
if resp.StatusCode == http.StatusBadRequest && respBody != nil && !isSignatureRelatedError(respBody) { if resp.StatusCode == http.StatusBadRequest && respBody != nil && !isSignatureRelatedError(respBody) {
errMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) errMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
if isThinkingBudgetConstraintError(errMsg) && s.settingService.IsBudgetRectifierEnabled(ctx) { if isThinkingBudgetConstraintError(errMsg) && s.settingService.IsBudgetRectifierEnabled(ctx) {
@ -1699,11 +1683,9 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
Detail: s.getUpstreamErrorDetail(respBody), Detail: s.getUpstreamErrorDetail(respBody),
}) })
// 修正 claudeReq 的 thinking 参数adaptive 模式不修正)
if claudeReq.Thinking == nil || claudeReq.Thinking.Type != "adaptive" { if claudeReq.Thinking == nil || claudeReq.Thinking.Type != "adaptive" {
retryClaudeReq := claudeReq retryClaudeReq := claudeReq
retryClaudeReq.Messages = append([]antigravity.ClaudeMessage(nil), claudeReq.Messages...) retryClaudeReq.Messages = append([]antigravity.ClaudeMessage(nil), claudeReq.Messages...)
// 创建新的 ThinkingConfig 避免修改原始 claudeReq.Thinking 指针
retryClaudeReq.Thinking = &antigravity.ThinkingConfig{ retryClaudeReq.Thinking = &antigravity.ThinkingConfig{
Type: "enabled", Type: "enabled",
BudgetTokens: BudgetRectifyBudgetTokens, BudgetTokens: BudgetRectifyBudgetTokens,
@ -1758,9 +1740,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
} }
} }
// 处理错误响应(重试后仍失败或不触发重试)
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {
// 检测 prompt too long 错误,返回特殊错误类型供上层 fallback
if resp.StatusCode == http.StatusBadRequest && isPromptTooLongError(respBody) { if resp.StatusCode == http.StatusBadRequest && isPromptTooLongError(respBody) {
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
@ -1788,7 +1768,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", isStickySession) s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", isStickySession)
// 精确匹配服务端配置类 400 错误,触发同账号重试 + failover
if resp.StatusCode == http.StatusBadRequest { if resp.StatusCode == http.StatusBadRequest {
msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody))) msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody)))
if isGoogleProjectConfigError(msg) { if isGoogleProjectConfigError(msg) {
@ -1839,7 +1818,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
var firstTokenMs *int var firstTokenMs *int
var clientDisconnect bool var clientDisconnect bool
if claudeReq.Stream { if claudeReq.Stream {
// 客户端要求流式,直接透传转换
streamRes, err := s.handleClaudeStreamingResponse(c, resp, startTime, originalModel) streamRes, err := s.handleClaudeStreamingResponse(c, resp, startTime, originalModel)
if err != nil { if err != nil {
logger.LegacyPrintf("service.antigravity_gateway", "%s status=stream_error error=%v", prefix, err) logger.LegacyPrintf("service.antigravity_gateway", "%s status=stream_error error=%v", prefix, err)
@ -1849,7 +1827,6 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
firstTokenMs = streamRes.firstTokenMs firstTokenMs = streamRes.firstTokenMs
clientDisconnect = streamRes.clientDisconnect clientDisconnect = streamRes.clientDisconnect
} else { } else {
// 客户端要求非流式,收集流式响应后转换返回
streamRes, err := s.handleClaudeStreamToNonStreaming(c, resp, startTime, originalModel) streamRes, err := s.handleClaudeStreamToNonStreaming(c, resp, startTime, originalModel)
if err != nil { if err != nil {
logger.LegacyPrintf("service.antigravity_gateway", "%s status=stream_collect_error error=%v", prefix, err) logger.LegacyPrintf("service.antigravity_gateway", "%s status=stream_collect_error error=%v", prefix, err)
@ -1871,6 +1848,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
}, nil }, nil
} }
func isSignatureRelatedError(respBody []byte) bool { func isSignatureRelatedError(respBody []byte) bool {
msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody))) msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody)))
if msg == "" { if msg == "" {
@ -4674,3 +4652,61 @@ func (s *AntigravityGatewayService) extractClaudeUsage(body []byte) *ClaudeUsage
} }
return usage return usage
} }
// ForwardRaw 转发 Claude 格式请求并返回原始上游响应体(调用者负责关闭)。
// 不依赖 gin.Context供内部服务如 LanguageServerService调用。
// 复用完整的 token 刷新、模型映射、TLS 指纹和重试逻辑。
func (s *AntigravityGatewayService) ForwardRaw(ctx context.Context, account *Account, body []byte) (io.ReadCloser, int, error) {
var claudeReq antigravity.ClaudeRequest
if err := json.Unmarshal(body, &claudeReq); err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("invalid request body: %w", err)
}
if strings.TrimSpace(claudeReq.Model) == "" {
return nil, http.StatusBadRequest, fmt.Errorf("missing model")
}
mappedModel := s.getMappedModel(account, claudeReq.Model)
if mappedModel == "" {
return nil, http.StatusForbidden, fmt.Errorf("model %s not in whitelist", claudeReq.Model)
}
thinkingEnabled := claudeReq.Thinking != nil && (claudeReq.Thinking.Type == "enabled" || claudeReq.Thinking.Type == "adaptive")
mappedModel = applyThinkingModelSuffix(mappedModel, thinkingEnabled)
if s.tokenProvider == nil {
return nil, http.StatusBadGateway, fmt.Errorf("antigravity token provider not configured")
}
accessToken, err := s.tokenProvider.GetAccessToken(ctx, account)
if err != nil {
return nil, http.StatusBadGateway, fmt.Errorf("failed to get access token: %w", err)
}
projectID := strings.TrimSpace(account.GetCredential("project_id"))
proxyURL := ""
if account.ProxyID != nil && account.Proxy != nil {
proxyURL = account.Proxy.URL()
}
transformOpts := s.getClaudeTransformOptions(ctx)
transformOpts.EnableIdentityPatch = true
geminiBody, err := antigravity.TransformClaudeToGeminiWithOptions(&claudeReq, projectID, mappedModel, transformOpts)
if err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("failed to transform request: %w", err)
}
wrappedBody, err := s.wrapV1InternalRequest(projectID, mappedModel, geminiBody)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("failed to wrap request: %w", err)
}
upstreamReq, err := antigravity.NewAPIRequest(ctx, "streamGenerateContent", accessToken, wrappedBody)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("failed to build upstream request: %w", err)
}
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil {
return nil, http.StatusBadGateway, fmt.Errorf("upstream request failed: %w", err)
}
return resp.Body, resp.StatusCode, nil
}

View File

@ -2,14 +2,11 @@ package service
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
"net/http"
"os"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -28,7 +25,7 @@ type CascadeSession struct {
} }
// LanguageServerService 业务逻辑层 // LanguageServerService 业务逻辑层
// 处理 Cascade Agent 流程,转发到上游 API // 处理 Cascade Agent 流程,通过 AntigravityGatewayService 转发到上游 API
type LanguageServerService struct { type LanguageServerService struct {
// 会话管理 // 会话管理
cascadeSessions map[string]*CascadeSession cascadeSessions map[string]*CascadeSession
@ -37,9 +34,9 @@ type LanguageServerService struct {
// 上游 HTTP 服务(用于发送请求) // 上游 HTTP 服务(用于发送请求)
httpUpstream HTTPUpstream httpUpstream HTTPUpstream
// 上游配置 // Antigravity 网关(账号池调度 + TLS 指纹 + token 刷新)
upstreamBaseURL string antigravitySvc *AntigravityGatewayService
upstreamAPIKey string accountRepo AccountRepository
// 日志 // 日志
logger *slog.Logger logger *slog.Logger
@ -59,15 +56,17 @@ type LanguageServerService struct {
func NewLanguageServerService( func NewLanguageServerService(
logger *slog.Logger, logger *slog.Logger,
httpUpstream HTTPUpstream, httpUpstream HTTPUpstream,
antigravitySvc *AntigravityGatewayService,
accountRepo AccountRepository,
) *LanguageServerService { ) *LanguageServerService {
svc := &LanguageServerService{ svc := &LanguageServerService{
cascadeSessions: make(map[string]*CascadeSession), cascadeSessions: make(map[string]*CascadeSession),
logger: logger, logger: logger,
httpUpstream: httpUpstream, httpUpstream: httpUpstream,
upstreamBaseURL: strings.TrimSuffix(os.Getenv("ANTHROPIC_BASE_URL"), "/"), antigravitySvc: antigravitySvc,
upstreamAPIKey: os.Getenv("ANTHROPIC_API_KEY"), accountRepo: accountRepo,
rateLimiter: make(chan struct{}, 100), // 改进 1: 限制 100 个并发消息 rateLimiter: make(chan struct{}, 100), // 改进 1: 限制 100 个并发消息
sessionTTLSeconds: 3600, // 改进 3: 会话默认 1 小时过期 sessionTTLSeconds: 3600, // 改进 3: 会话默认 1 小时过期
stopCleanup: make(chan struct{}), stopCleanup: make(chan struct{}),
} }
@ -380,46 +379,43 @@ func (svc *LanguageServerService) GetStatus(ctx context.Context) (string, error)
// 内部方法 // 内部方法
// ============================================================================ // ============================================================================
// callUpstreamAPI 调用上游 Anthropic API // callUpstreamAPI 通过 AntigravityGatewayService 调用上游 API。
// 这是关键方法:需要注入所有伪装信息 // 复用账号池调度、模型映射、TLS 指纹伪装、token 刷新和重试逻辑。
//
// 伪装层包括:
// 1. User-Agent来自 metadata 或动态生成)
// 2. 设备指纹machine_id, mac_machine_id, dev_device_id, sqm_id
// 3. TLS 指纹(通过 http.Transport 处理)
// 4. OAuth token 自动刷新
// 5. 请求头完整性
func (svc *LanguageServerService) callUpstreamAPI( func (svc *LanguageServerService) callUpstreamAPI(
ctx context.Context, ctx context.Context,
session *CascadeSession, session *CascadeSession,
updateChan chan<- interface{}, updateChan chan<- interface{},
) { ) {
// 检查上游配置 if svc.antigravitySvc == nil || svc.accountRepo == nil {
if svc.upstreamBaseURL == "" || svc.upstreamAPIKey == "" {
svc.logger.Error("upstream api configuration missing",
"has_base_url", svc.upstreamBaseURL != "",
"has_api_key", svc.upstreamAPIKey != "",
)
updateChan <- map[string]interface{}{ updateChan <- map[string]interface{}{
"type": "error", "type": "error",
"error": "upstream api not configured", "error": "antigravity gateway not configured",
} }
return return
} }
// 1. 准备请求体 // 1. 选取第一个可用的 Antigravity 账号
requestBody := map[string]interface{}{ accounts, err := svc.accountRepo.ListByPlatform(ctx, PlatformAntigravity)
"model": session.ModelName, if err != nil || len(accounts) == 0 {
"messages": session.Messages, svc.logger.Error("no antigravity accounts available", "session_id", session.ID, "error", err)
"stream": true, updateChan <- map[string]interface{}{
"type": "error",
"error": "no antigravity accounts available",
}
return
} }
account := &accounts[0]
// 2. 准备 Claude 格式请求体
requestBody := map[string]interface{}{
"model": session.ModelName,
"messages": session.Messages,
"stream": true,
"max_tokens": 8192,
}
bodyJSON, err := json.Marshal(requestBody) bodyJSON, err := json.Marshal(requestBody)
if err != nil { if err != nil {
svc.logger.Error("failed to marshal request", svc.logger.Error("failed to marshal request", "session_id", session.ID, "error", err)
"session_id", session.ID,
"error", err,
)
updateChan <- map[string]interface{}{ updateChan <- map[string]interface{}{
"type": "error", "type": "error",
"error": "failed to prepare request", "error": "failed to prepare request",
@ -427,87 +423,44 @@ func (svc *LanguageServerService) callUpstreamAPI(
return return
} }
// 2. 构建上游请求 URL svc.logger.Debug("forwarding via antigravity", "session_id", session.ID, "model", session.ModelName, "account_id", account.ID)
upstreamURL := svc.upstreamBaseURL + "/v1/messages"
// 3. 创建 HTTP 请求 // 3. 通过 AntigravityGatewayService 转发(完整 TLS 指纹 + token 刷新 + 重试)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, upstreamURL, bytes.NewReader(bodyJSON)) respBody, statusCode, err := svc.antigravitySvc.ForwardRaw(ctx, account, bodyJSON)
if err != nil { if err != nil {
svc.logger.Error("failed to create request", svc.logger.Error("upstream request failed", "session_id", session.ID, "error", err)
"session_id", session.ID,
"error", err,
)
updateChan <- map[string]interface{}{
"type": "error",
"error": "failed to create request",
}
return
}
// 4. 设置基础请求头
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+session.Token)
req.Header.Set("x-api-key", session.Token) // Claude API 兼容
// 5. 应用伪装信息
if userAgent := session.Metadata["user-agent"]; userAgent != "" {
req.Header.Set("User-Agent", userAgent)
}
// 提取其他伪装 headers如果在 metadata 中)
if customHeaders := session.Metadata["custom-headers"]; customHeaders != "" {
// 可以在这里解析并应用自定义 headers
}
svc.logger.Debug("sending upstream request",
"session_id", session.ID,
"url", upstreamURL,
"model", session.ModelName,
)
// 6. 发送请求
resp, err := svc.httpUpstream.Do(req, "", 0, 10)
if err != nil {
svc.logger.Error("upstream request failed",
"session_id", session.ID,
"error", err,
)
updateChan <- map[string]interface{}{ updateChan <- map[string]interface{}{
"type": "error", "type": "error",
"error": fmt.Sprintf("upstream request failed: %v", err), "error": fmt.Sprintf("upstream request failed: %v", err),
} }
return return
} }
defer func() { _ = resp.Body.Close() }() defer func() { _ = respBody.Close() }()
// 7. 处理错误响应 // 4. 处理错误响应
if resp.StatusCode >= 400 { if statusCode >= 400 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) body, _ := io.ReadAll(io.LimitReader(respBody, 2<<20))
svc.logger.Error("upstream error response", svc.logger.Error("upstream error response", "session_id", session.ID, "status_code", statusCode, "body", string(body))
"session_id", session.ID,
"status_code", resp.StatusCode,
"body", string(respBody),
)
updateChan <- map[string]interface{}{ updateChan <- map[string]interface{}{
"type": "error", "type": "error",
"status_code": resp.StatusCode, "status_code": statusCode,
"error": string(respBody), "error": string(body),
} }
return return
} }
// 8. 处理流式响应 // 5. 流式转发响应
svc.streamUpstreamResponse(ctx, session.ID, resp, updateChan) svc.streamUpstreamResponse(ctx, session.ID, respBody, updateChan)
} }
// streamUpstreamResponse 处理上游 SSE 流式响应 // streamUpstreamResponse 处理上游 SSE 流式响应
func (svc *LanguageServerService) streamUpstreamResponse( func (svc *LanguageServerService) streamUpstreamResponse(
ctx context.Context, ctx context.Context,
sessionID string, sessionID string,
resp *http.Response, body io.ReadCloser,
updateChan chan<- interface{}, updateChan chan<- interface{},
) { ) {
scanner := bufio.NewScanner(resp.Body) scanner := bufio.NewScanner(body)
// 设置合理的缓冲区大小 // 设置合理的缓冲区大小
scanner.Buffer(make([]byte, 64*1024), 512*1024) scanner.Buffer(make([]byte, 64*1024), 512*1024)

View File

@ -0,0 +1,353 @@
package service
import (
"context"
"fmt"
"io/fs"
"log/slog"
"net/http"
"os"
"path/filepath"
"time"
connect "connectrpc.com/connect"
"github.com/Wei-Shaw/sub2api/internal/gen/language_server_pb"
"github.com/Wei-Shaw/sub2api/internal/gen/language_server_pbconnect"
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"google.golang.org/protobuf/types/known/timestamppb"
)
const upstreamLSRPCBaseURL = "https://cloudcode-pa.googleapis.com"
// LSRPCHandler implements LanguageServerServiceHandler by proxying to the real upstream
// lsrpc service using OAuth tokens obtained from AntigravityGatewayService.
// File RPCs (ReadFile/WriteFile/ReadDir/etc.) operate on the local filesystem.
type LSRPCHandler struct {
language_server_pbconnect.UnimplementedLanguageServerServiceHandler
antigravitySvc *AntigravityGatewayService
accountRepo AccountRepository
logger *slog.Logger
}
// NewLSRPCHandler creates a new LSRPCHandler.
func NewLSRPCHandler(
antigravitySvc *AntigravityGatewayService,
accountRepo AccountRepository,
logger *slog.Logger,
) *LSRPCHandler {
if logger == nil {
logger = slog.Default()
}
return &LSRPCHandler{
antigravitySvc: antigravitySvc,
accountRepo: accountRepo,
logger: logger,
}
}
// upstreamClient creates a connectrpc client to the real lsrpc upstream,
// authenticated with the OAuth token from the given account.
func (h *LSRPCHandler) upstreamClient(ctx context.Context) (language_server_pbconnect.LanguageServerServiceClient, error) {
accounts, err := h.accountRepo.ListByPlatform(ctx, PlatformAntigravity)
if err != nil || len(accounts) == 0 {
return nil, fmt.Errorf("no antigravity accounts available: %w", err)
}
account := &accounts[0]
tokenProvider := h.antigravitySvc.GetTokenProvider()
if tokenProvider == nil {
return nil, fmt.Errorf("antigravity token provider not configured")
}
accessToken, err := tokenProvider.GetAccessToken(ctx, account)
if err != nil {
return nil, fmt.Errorf("failed to get access token: %w", err)
}
httpClient := &http.Client{
Timeout: 5 * time.Minute,
Transport: &bearerTransport{
base: http.DefaultTransport,
token: accessToken,
},
}
client := language_server_pbconnect.NewLanguageServerServiceClient(
httpClient,
upstreamLSRPCBaseURL,
connect.WithGRPC(),
)
return client, nil
}
// bearerTransport injects Authorization: Bearer <token> into every request.
type bearerTransport struct {
base http.RoundTripper
token string
}
func (t *bearerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
clone := req.Clone(req.Context())
clone.Header.Set("Authorization", "Bearer "+t.token)
return t.base.RoundTrip(clone)
}
// ============================================================================
// Cascade RPCs — proxied to real upstream
// ============================================================================
func (h *LSRPCHandler) StartCascade(
ctx context.Context,
req *connect.Request[language_server_pb.StartCascadeRequest],
) (*connect.Response[language_server_pb.StartCascadeResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeUnavailable, err)
}
return client.StartCascade(ctx, req)
}
func (h *LSRPCHandler) SendUserCascadeMessage(
ctx context.Context,
req *connect.Request[language_server_pb.SendUserCascadeMessageRequest],
stream *connect.ServerStream[language_server_pb.CascadeReactiveUpdate],
) error {
client, err := h.upstreamClient(ctx)
if err != nil {
return connect.NewError(connect.CodeUnavailable, err)
}
upstreamStream, err := client.SendUserCascadeMessage(ctx, req)
if err != nil {
return err
}
defer upstreamStream.Close()
for upstreamStream.Receive() {
if err := stream.Send(upstreamStream.Msg()); err != nil {
return err
}
}
return upstreamStream.Err()
}
func (h *LSRPCHandler) CancelCascadeInvocation(
ctx context.Context,
req *connect.Request[language_server_pb.CancelCascadeInvocationRequest],
) (*connect.Response[language_server_pb.CancelCascadeInvocationResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeUnavailable, err)
}
return client.CancelCascadeInvocation(ctx, req)
}
func (h *LSRPCHandler) AcknowledgeCascadeCodeEdit(
ctx context.Context,
req *connect.Request[language_server_pb.AcknowledgeCascadeCodeEditRequest],
) (*connect.Response[language_server_pb.AcknowledgeCascadeCodeEditResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeUnavailable, err)
}
return client.AcknowledgeCascadeCodeEdit(ctx, req)
}
// ============================================================================
// Model config RPCs — proxied to real upstream
// ============================================================================
func (h *LSRPCHandler) GetCascadeModelConfigs(
ctx context.Context,
req *connect.Request[language_server_pb.GetCascadeModelConfigsRequest],
) (*connect.Response[language_server_pb.GetCascadeModelConfigsResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
// Fall back to static list when upstream unavailable.
return connect.NewResponse(&language_server_pb.GetCascadeModelConfigsResponse{
Models: staticCascadeModels(),
}), nil
}
resp, err := client.GetCascadeModelConfigs(ctx, req)
if err != nil {
return connect.NewResponse(&language_server_pb.GetCascadeModelConfigsResponse{
Models: staticCascadeModels(),
}), nil
}
return resp, nil
}
func (h *LSRPCHandler) GetCommandModelConfigs(
ctx context.Context,
req *connect.Request[language_server_pb.GetCommandModelConfigsRequest],
) (*connect.Response[language_server_pb.GetCommandModelConfigsResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
return connect.NewResponse(&language_server_pb.GetCommandModelConfigsResponse{
Models: staticCascadeModels(),
}), nil
}
resp, err := client.GetCommandModelConfigs(ctx, req)
if err != nil {
return connect.NewResponse(&language_server_pb.GetCommandModelConfigsResponse{
Models: staticCascadeModels(),
}), nil
}
return resp, nil
}
// staticCascadeModels returns a hard-coded model list as fallback.
func staticCascadeModels() []*language_server_pb.ModelConfig {
return []*language_server_pb.ModelConfig{
{Name: "claude-opus-4-7", DisplayName: "Claude Opus 4.7", MaxTokens: 200000, SupportsThinking: true, ThinkingBudget: 32000, SupportsImages: true, Provider: "anthropic"},
{Name: "claude-opus-4-6", DisplayName: "Claude Opus 4.6", MaxTokens: 200000, SupportsThinking: true, ThinkingBudget: 32000, SupportsImages: true, Provider: "anthropic"},
{Name: "claude-sonnet-4-6", DisplayName: "Claude Sonnet 4.6", MaxTokens: 200000, SupportsImages: true, Provider: "anthropic"},
{Name: "claude-haiku-4-5", DisplayName: "Claude Haiku 4.5", MaxTokens: 200000, SupportsImages: true, Provider: "anthropic"},
}
}
// ============================================================================
// File RPCs — local filesystem implementation
// ============================================================================
func (h *LSRPCHandler) ReadFile(
ctx context.Context,
req *connect.Request[language_server_pb.ReadFileRequest],
) (*connect.Response[language_server_pb.ReadFileResponse], error) {
path := req.Msg.GetPath()
data, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("file not found: %s", path))
}
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&language_server_pb.ReadFileResponse{
Content: string(data),
}), nil
}
func (h *LSRPCHandler) WriteFile(
ctx context.Context,
req *connect.Request[language_server_pb.WriteFileRequest],
) (*connect.Response[language_server_pb.WriteFileResponse], error) {
path := req.Msg.GetPath()
if req.Msg.GetCreateParent() {
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
}
if err := os.WriteFile(path, []byte(req.Msg.GetContent()), 0o644); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&language_server_pb.WriteFileResponse{}), nil
}
func (h *LSRPCHandler) ReadDir(
ctx context.Context,
req *connect.Request[language_server_pb.ReadDirRequest],
) (*connect.Response[language_server_pb.ReadDirResponse], error) {
path := req.Msg.GetPath()
entries, err := os.ReadDir(path)
if err != nil {
if os.IsNotExist(err) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("directory not found: %s", path))
}
return nil, connect.NewError(connect.CodeInternal, err)
}
files := make([]*language_server_pb.FileInfo, 0, len(entries))
for _, entry := range entries {
info, err := entry.Info()
if err != nil {
continue
}
files = append(files, fileInfoFromOS(entry.Name(), info))
}
return connect.NewResponse(&language_server_pb.ReadDirResponse{
Files: files,
}), nil
}
func (h *LSRPCHandler) DeleteFileOrDirectory(
ctx context.Context,
req *connect.Request[language_server_pb.DeleteFileOrDirectoryRequest],
) (*connect.Response[language_server_pb.DeleteFileOrDirectoryResponse], error) {
path := req.Msg.GetPath()
if err := os.RemoveAll(path); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&language_server_pb.DeleteFileOrDirectoryResponse{}), nil
}
func (h *LSRPCHandler) StatUri(
ctx context.Context,
req *connect.Request[language_server_pb.StatUriRequest],
) (*connect.Response[language_server_pb.StatUriResponse], error) {
path := req.Msg.GetPath()
info, err := os.Stat(path)
if err != nil {
if os.IsNotExist(err) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("path not found: %s", path))
}
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&language_server_pb.StatUriResponse{
FileInfo: fileInfoFromOS(info.Name(), info),
}), nil
}
func (h *LSRPCHandler) WatchDirectory(
ctx context.Context,
req *connect.Request[language_server_pb.WatchDirectoryRequest],
stream *connect.ServerStream[language_server_pb.WatchDirectoryResponse],
) error {
// Block until context is cancelled — real FS watching requires fsnotify which
// is not in the dependency graph yet. This satisfies the interface contract
// without crashing; the client will get an EOF when the connection closes.
<-ctx.Done()
return nil
}
// ============================================================================
// Health RPCs
// ============================================================================
func (h *LSRPCHandler) Heartbeat(
ctx context.Context,
req *connect.Request[language_server_pb.HeartbeatRequest],
) (*connect.Response[language_server_pb.HeartbeatResponse], error) {
return connect.NewResponse(&language_server_pb.HeartbeatResponse{
Healthy: true,
Version: "sub2api",
}), nil
}
func (h *LSRPCHandler) GetStatus(
ctx context.Context,
req *connect.Request[language_server_pb.GetStatusRequest],
) (*connect.Response[language_server_pb.GetStatusResponse], error) {
return connect.NewResponse(&language_server_pb.GetStatusResponse{
Status: "running",
Version: antigravity.BaseURL,
}), nil
}
// ============================================================================
// Helpers
// ============================================================================
func fileInfoFromOS(name string, info fs.FileInfo) *language_server_pb.FileInfo {
t := language_server_pb.FileInfo_FILE
if info.IsDir() {
t = language_server_pb.FileInfo_DIRECTORY
} else if info.Mode()&os.ModeSymlink != 0 {
t = language_server_pb.FileInfo_SYMLINK
}
return &language_server_pb.FileInfo{
Path: name,
Type: t,
Size: info.Size(),
ModifiedTime: timestamppb.New(info.ModTime()),
}
}

View File

@ -472,8 +472,8 @@ var ProviderSet = wire.NewSet(
) )
// ProvideLanguageServerService creates LanguageServerService with injected dependencies // ProvideLanguageServerService creates LanguageServerService with injected dependencies
func ProvideLanguageServerService(httpUpstream HTTPUpstream) *LanguageServerService { func ProvideLanguageServerService(httpUpstream HTTPUpstream, antigravitySvc *AntigravityGatewayService, accountRepo AccountRepository) *LanguageServerService {
return NewLanguageServerService(slog.Default(), httpUpstream) return NewLanguageServerService(slog.Default(), httpUpstream, antigravitySvc, accountRepo)
} }
// ProvidePaymentConfigService wraps NewPaymentConfigService to accept the named // ProvidePaymentConfigService wraps NewPaymentConfigService to accept the named

BIN
backend/lsrpc_test Executable file

Binary file not shown.

View File

@ -1,15 +1,10 @@
# ============================================================================= # =============================================================================
# Sub2API Docker Compose Configuration (负载均衡版) # Sub2API Docker Compose Configuration
# ============================================================================= # =============================================================================
# Quick Start: # Quick Start:
# 1. Copy .env.example to .env and configure # 1. Copy .env.example to .env and configure
# 2. docker compose up -d # 2. docker compose up -d
# 3. Check logs: docker compose logs -f # 3. Check logs: docker compose logs -f
# 4. Access: http://localhost (via nginx)
#
# 扩缩容:
# docker compose up -d --scale sub2api=5 # 扩到 5 个实例
# docker compose up -d --scale sub2api=2 # 缩回 2 个实例
# #
# 注意事项: # 注意事项:
# - JWT_SECRET / TOTP_ENCRYPTION_KEY 必须固定,多实例共享同一个值 # - JWT_SECRET / TOTP_ENCRYPTION_KEY 必须固定,多实例共享同一个值
@ -20,36 +15,7 @@
services: services:
# =========================================================================== # ===========================================================================
# Nginx 负载均衡(入口) # Sub2API Application
# ===========================================================================
nginx:
image: nginx:alpine
container_name: sub2api-nginx
restart: unless-stopped
ulimits:
nofile:
soft: 65535
hard: 65535
ports:
- "0.0.0.0:80:80"
- "0.0.0.0:443:443"
volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
- ./nginx/certs:/etc/nginx/certs:ro
depends_on:
sub2api:
condition: service_healthy
networks:
- sub2api-network
healthcheck:
test: [ "CMD", "wget", "-q", "-T", "3", "-O", "/dev/null", "http://localhost/health" ]
interval: 30s
timeout: 10s
retries: 3
start_period: 10s
# ===========================================================================
# Sub2API Application多实例通过 --scale 控制数量)
# =========================================================================== # ===========================================================================
sub2api: sub2api:
image: docker.io/zfc931912343/sub2api:latest image: docker.io/zfc931912343/sub2api:latest
@ -58,9 +24,8 @@ services:
nofile: nofile:
soft: 100000 soft: 100000
hard: 100000 hard: 100000
# 不直接暴露端口,由 nginx 代理 ports:
expose: - "0.0.0.0:80:8080"
- "8080"
volumes: volumes:
- sub2api_data:/app/data - sub2api_data:/app/data
# Optional: 挂载自定义 config.yaml先从 config.example.yaml 复制并修改) # Optional: 挂载自定义 config.yaml先从 config.example.yaml 复制并修改)