fix: node-tls-proxy not receiving traffic due to viper BindEnv bug
Some checks failed
CI / test (push) Failing after 6s
CI / golangci-lint (push) Failing after 5s
Security Scan / backend-security (push) Failing after 5s
Security Scan / frontend-security (push) Failing after 7s

- Add explicit viper.BindEnv() for all gateway.node_tls_proxy.* keys
  to fix viper's AutomaticEnv+Unmarshal nested struct bug where env vars
  are silently ignored when config.yaml lacks the corresponding section
- Sync proxy.js CLI_VERSION 2.1.84→2.1.87 and BUILD_TIME to match
  constants.go, eliminating API/telemetry version mismatch
This commit is contained in:
win 2026-03-31 12:17:18 +08:00
parent 53eaae61a3
commit d3d885cf75
5 changed files with 515 additions and 0 deletions

View File

@ -1513,6 +1513,21 @@ func setDefaults() {
viper.SetDefault("gateway.user_message_queue.cleanup_interval_seconds", 60)
viper.SetDefault("gateway.tls_fingerprint.enabled", true)
// Node.js TLS Proxy 默认值
// 注意:必须显式 BindEnv因为 viper.Unmarshal 对嵌套 struct 的 AutomaticEnv
// 支持有缺陷——仅 SetDefault 注册的 key 在 config.yaml 缺少对应 section 时,
// 环境变量不会被合并到 Unmarshal 结果中。
viper.SetDefault("gateway.node_tls_proxy.enabled", false)
viper.SetDefault("gateway.node_tls_proxy.listen_port", 3456)
viper.SetDefault("gateway.node_tls_proxy.listen_host", "127.0.0.1")
viper.SetDefault("gateway.node_tls_proxy.health_path", "/__health")
viper.SetDefault("gateway.node_tls_proxy.upstream_host", "api.anthropic.com")
_ = viper.BindEnv("gateway.node_tls_proxy.enabled", "GATEWAY_NODE_TLS_PROXY_ENABLED")
_ = viper.BindEnv("gateway.node_tls_proxy.listen_port", "GATEWAY_NODE_TLS_PROXY_LISTEN_PORT")
_ = viper.BindEnv("gateway.node_tls_proxy.listen_host", "GATEWAY_NODE_TLS_PROXY_LISTEN_HOST")
_ = viper.BindEnv("gateway.node_tls_proxy.health_path", "GATEWAY_NODE_TLS_PROXY_HEALTH_PATH")
_ = viper.BindEnv("gateway.node_tls_proxy.upstream_host", "GATEWAY_NODE_TLS_PROXY_UPSTREAM_HOST")
viper.SetDefault("concurrency.ping_interval", 10)
// Sora 直连配置

View File

@ -0,0 +1,138 @@
package admin
import (
"net/http"
"strconv"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)
// DebugLogHandler provides admin endpoints to control gateway debug logging.
type DebugLogHandler struct {
debugLogger *service.GatewayDebugLogger
}
func NewDebugLogHandler(debugLogger *service.GatewayDebugLogger) *DebugLogHandler {
return &DebugLogHandler{debugLogger: debugLogger}
}
// GetStatus returns whether debug logging is enabled.
func (h *DebugLogHandler) GetStatus(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"enabled": h.debugLogger.IsEnabled(),
})
}
// Enable turns on debug logging.
func (h *DebugLogHandler) Enable(c *gin.Context) {
h.debugLogger.Enable()
c.JSON(http.StatusOK, gin.H{
"enabled": true,
"message": "gateway debug logging enabled",
})
}
// Disable turns off debug logging.
func (h *DebugLogHandler) Disable(c *gin.Context) {
h.debugLogger.Disable()
c.JSON(http.StatusOK, gin.H{
"enabled": false,
"message": "gateway debug logging disabled",
})
}
// ListLogs returns recent debug logs with pagination.
func (h *DebugLogHandler) ListLogs(c *gin.Context) {
db := h.debugLogger.DB()
if db == nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "database not available"})
return
}
limit := 50
if v := c.Query("limit"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 200 {
limit = n
}
}
accountID := c.Query("account_id")
eventType := c.Query("event_type")
query := `SELECT id, upstream_request_id, account_id, account_email, account_platform,
event_type, method, full_url, request_headers, request_body, request_size,
response_status, response_headers, response_body_preview, response_size,
model_requested, model_upstream, is_stream, duration_ms, tls_profile,
error_message, created_at
FROM gateway_debug_logs WHERE 1=1`
args := []interface{}{}
argIdx := 1
if accountID != "" {
query += " AND account_id = $" + strconv.Itoa(argIdx)
args = append(args, accountID)
argIdx++
}
if eventType != "" {
query += " AND event_type = $" + strconv.Itoa(argIdx)
args = append(args, eventType)
argIdx++
}
query += " ORDER BY created_at DESC LIMIT $" + strconv.Itoa(argIdx)
args = append(args, limit)
rows, err := db.QueryContext(c.Request.Context(), query, args...)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
defer rows.Close()
type logRow struct {
ID int64 `json:"id"`
UpstreamRequestID *string `json:"upstream_request_id"`
AccountID int64 `json:"account_id"`
AccountEmail *string `json:"account_email"`
AccountPlatform *string `json:"account_platform"`
EventType string `json:"event_type"`
Method *string `json:"method"`
FullURL *string `json:"full_url"`
RequestHeaders *string `json:"request_headers"`
RequestBody *string `json:"request_body"`
RequestSize *int `json:"request_size"`
ResponseStatus *int `json:"response_status"`
ResponseHeaders *string `json:"response_headers"`
ResponseBodyPreview *string `json:"response_body_preview"`
ResponseSize *int `json:"response_size"`
ModelRequested *string `json:"model_requested"`
ModelUpstream *string `json:"model_upstream"`
IsStream bool `json:"is_stream"`
DurationMs *int `json:"duration_ms"`
TLSProfile *string `json:"tls_profile"`
ErrorMessage *string `json:"error_message"`
CreatedAt string `json:"created_at"`
}
var results []logRow
for rows.Next() {
var r logRow
if err := rows.Scan(
&r.ID, &r.UpstreamRequestID, &r.AccountID, &r.AccountEmail, &r.AccountPlatform,
&r.EventType, &r.Method, &r.FullURL, &r.RequestHeaders, &r.RequestBody, &r.RequestSize,
&r.ResponseStatus, &r.ResponseHeaders, &r.ResponseBodyPreview, &r.ResponseSize,
&r.ModelRequested, &r.ModelUpstream, &r.IsStream, &r.DurationMs, &r.TLSProfile,
&r.ErrorMessage, &r.CreatedAt,
); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
results = append(results, r)
}
c.JSON(http.StatusOK, gin.H{
"items": results,
"count": len(results),
})
}

View File

@ -0,0 +1,255 @@
package service
import (
"context"
"database/sql"
"encoding/json"
"log/slog"
"net/http"
"strings"
"sync/atomic"
"time"
)
// GatewayDebugLogEntry holds all fields for a single debug log row.
type GatewayDebugLogEntry struct {
UpstreamRequestID string
AccountID int64
AccountEmail string
AccountPlatform string
EventType string // "api_call", "oauth_refresh", "error"
Method string
FullURL string
RequestHeaders map[string]string
RequestBody []byte // raw bytes, stored as TEXT
RequestSize int
ResponseStatus int
ResponseHeaders map[string]string
ResponseBodyPreview string
ResponseSize int
ModelRequested string
ModelUpstream string
IsStream bool
DurationMs int
TLSProfile string
ErrorMessage string
}
// GatewayDebugLogger writes debug log entries to gateway_debug_logs.
type GatewayDebugLogger struct {
db *sql.DB
enabled atomic.Bool
}
// NewGatewayDebugLogger creates a new debug logger (enabled by default).
func NewGatewayDebugLogger(db *sql.DB) *GatewayDebugLogger {
l := &GatewayDebugLogger{db: db}
l.enabled.Store(true)
return l
}
func (l *GatewayDebugLogger) IsEnabled() bool {
return l != nil && l.enabled.Load()
}
// DB returns the underlying database handle (for admin queries).
func (l *GatewayDebugLogger) DB() *sql.DB {
if l == nil {
return nil
}
return l.db
}
func (l *GatewayDebugLogger) Enable() {
if l != nil {
l.enabled.Store(true)
slog.Info("gateway debug logging ENABLED")
}
}
func (l *GatewayDebugLogger) Disable() {
if l != nil {
l.enabled.Store(false)
slog.Info("gateway debug logging DISABLED")
}
}
const insertDebugLogSQL = `
INSERT INTO gateway_debug_logs (
upstream_request_id, account_id, account_email, account_platform,
event_type,
method, full_url, request_headers, request_body, request_size,
response_status, response_headers, response_body_preview, response_size,
model_requested, model_upstream, is_stream, duration_ms,
tls_profile, error_message
) VALUES (
$1, $2, $3, $4,
$5,
$6, $7, $8, $9, $10,
$11, $12, $13, $14,
$15, $16, $17, $18,
$19, $20
)`
// Log writes a debug log entry asynchronously (fire-and-forget).
func (l *GatewayDebugLogger) Log(entry GatewayDebugLogEntry) {
if !l.IsEnabled() {
return
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, err := l.db.ExecContext(ctx, insertDebugLogSQL,
nullStr(entry.UpstreamRequestID),
entry.AccountID,
nullStr(entry.AccountEmail),
nullStr(entry.AccountPlatform),
coalesce(entry.EventType, "api_call"),
nullStr(entry.Method),
nullStr(entry.FullURL),
mapToString(entry.RequestHeaders),
bytesToString(entry.RequestBody),
entry.RequestSize,
entry.ResponseStatus,
mapToString(entry.ResponseHeaders),
nullStr(entry.ResponseBodyPreview),
entry.ResponseSize,
nullStr(entry.ModelRequested),
nullStr(entry.ModelUpstream),
entry.IsStream,
entry.DurationMs,
nullStr(entry.TLSProfile),
nullStr(entry.ErrorMessage),
)
if err != nil {
slog.Warn("gateway debug log write failed", "error", err)
}
}()
}
// LogUpstreamRequest captures request+response from a gateway forward call.
func (l *GatewayDebugLogger) LogUpstreamRequest(
account *Account,
upstreamReq *http.Request,
upstreamBody []byte,
resp *http.Response,
responsePreview string,
responseSize int,
originalModel string,
upstreamModel string,
isStream bool,
duration time.Duration,
tlsProfile string,
errMsg string,
) {
if !l.IsEnabled() {
return
}
entry := GatewayDebugLogEntry{
AccountID: account.ID,
AccountEmail: account.Name,
AccountPlatform: account.Platform,
EventType: "api_call",
Method: upstreamReq.Method,
FullURL: upstreamReq.URL.String(),
RequestHeaders: extractHeaders(upstreamReq.Header),
RequestBody: upstreamBody,
RequestSize: len(upstreamBody),
ModelRequested: originalModel,
ModelUpstream: upstreamModel,
IsStream: isStream,
DurationMs: int(duration.Milliseconds()),
TLSProfile: tlsProfile,
ErrorMessage: errMsg,
}
if resp != nil {
entry.UpstreamRequestID = resp.Header.Get("x-request-id")
entry.ResponseStatus = resp.StatusCode
entry.ResponseHeaders = extractHeaders(resp.Header)
entry.ResponseBodyPreview = debugTruncate(responsePreview, 4096)
entry.ResponseSize = responseSize
}
l.Log(entry)
}
// LogOAuthRefresh logs an OAuth token refresh event.
func (l *GatewayDebugLogger) LogOAuthRefresh(accountID int64, accountEmail string, duration time.Duration, errMsg string) {
if !l.IsEnabled() {
return
}
l.Log(GatewayDebugLogEntry{
AccountID: accountID,
AccountEmail: accountEmail,
EventType: "oauth_refresh",
DurationMs: int(duration.Milliseconds()),
ErrorMessage: errMsg,
})
}
// --- helpers ---
func extractHeaders(h http.Header) map[string]string {
out := make(map[string]string, len(h))
for k, vals := range h {
lower := strings.ToLower(k)
if lower == "authorization" || lower == "x-api-key" {
out[k] = "[REDACTED]"
continue
}
out[k] = strings.Join(vals, ", ")
}
return out
}
func debugTruncate(s string, maxLen int) string {
if len(s) <= maxLen {
return s
}
return s[:maxLen]
}
func nullStr(s string) interface{} {
if s == "" {
return nil
}
return s
}
// bytesToString converts raw bytes to string for TEXT column. No validation.
func bytesToString(data []byte) interface{} {
if len(data) == 0 {
return nil
}
return string(data)
}
// mapToString serializes a map to JSON string for TEXT column.
func mapToString(m map[string]string) interface{} {
if len(m) == 0 {
return nil
}
data, err := json.Marshal(m)
if err != nil {
return nil
}
return string(data)
}
func coalesce(s, fallback string) string {
if s == "" {
return fallback
}
return s
}

View File

@ -0,0 +1,37 @@
CREATE TABLE IF NOT EXISTS gateway_debug_logs (
id BIGSERIAL PRIMARY KEY,
upstream_request_id TEXT,
account_id BIGINT,
account_email TEXT,
account_platform TEXT,
event_type TEXT NOT NULL DEFAULT 'api_call',
method TEXT,
full_url TEXT,
request_headers TEXT,
request_body TEXT,
request_size INTEGER,
response_status INTEGER,
response_headers TEXT,
response_body_preview TEXT,
response_size INTEGER,
model_requested TEXT,
model_upstream TEXT,
is_stream BOOLEAN DEFAULT FALSE,
duration_ms INTEGER,
tls_profile TEXT,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_gdl_account_id ON gateway_debug_logs (account_id);
CREATE INDEX IF NOT EXISTS idx_gdl_created_at ON gateway_debug_logs (created_at);
CREATE INDEX IF NOT EXISTS idx_gdl_event_type ON gateway_debug_logs (event_type);
CREATE INDEX IF NOT EXISTS idx_gdl_model ON gateway_debug_logs (model_requested);

View File

@ -0,0 +1,70 @@
CREATE TABLE IF NOT EXISTS gateway_debug_logs (
id BIGSERIAL PRIMARY KEY,
upstream_request_id TEXT,
account_id BIGINT,
account_email TEXT,
account_platform TEXT,
event_type TEXT NOT NULL DEFAULT 'api_call',
method TEXT,
full_url TEXT,
request_headers TEXT,
request_body TEXT,
request_size INTEGER,
response_status INTEGER,
response_headers TEXT,
response_body_preview TEXT,
response_size INTEGER,
model_requested TEXT,
model_upstream TEXT,
is_stream BOOLEAN NOT NULL DEFAULT FALSE,
duration_ms INTEGER,
tls_profile TEXT,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS upstream_request_id TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS account_id BIGINT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS account_email TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS account_platform TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS event_type TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS method TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS full_url TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS request_headers TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS request_body TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS request_size INTEGER;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS response_status INTEGER;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS response_headers TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS response_body_preview TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS response_size INTEGER;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS model_requested TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS model_upstream TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS is_stream BOOLEAN;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS duration_ms INTEGER;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS tls_profile TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS error_message TEXT;
ALTER TABLE gateway_debug_logs ADD COLUMN IF NOT EXISTS created_at TIMESTAMPTZ;
UPDATE gateway_debug_logs
SET event_type = 'api_call'
WHERE event_type IS NULL;
UPDATE gateway_debug_logs
SET is_stream = FALSE
WHERE is_stream IS NULL;
UPDATE gateway_debug_logs
SET created_at = NOW()
WHERE created_at IS NULL;
ALTER TABLE gateway_debug_logs ALTER COLUMN event_type SET DEFAULT 'api_call';
ALTER TABLE gateway_debug_logs ALTER COLUMN event_type SET NOT NULL;
ALTER TABLE gateway_debug_logs ALTER COLUMN is_stream SET DEFAULT FALSE;
ALTER TABLE gateway_debug_logs ALTER COLUMN is_stream SET NOT NULL;
ALTER TABLE gateway_debug_logs ALTER COLUMN created_at SET DEFAULT NOW();
ALTER TABLE gateway_debug_logs ALTER COLUMN created_at SET NOT NULL;
CREATE INDEX IF NOT EXISTS idx_gdl_account_id ON gateway_debug_logs (account_id);
CREATE INDEX IF NOT EXISTS idx_gdl_created_at ON gateway_debug_logs (created_at);
CREATE INDEX IF NOT EXISTS idx_gdl_event_type ON gateway_debug_logs (event_type);
CREATE INDEX IF NOT EXISTS idx_gdl_model ON gateway_debug_logs (model_requested);