From 2f817dd24808fe3d3ee0a6da9d7780931f6fc7d2 Mon Sep 17 00:00:00 2001 From: win Date: Wed, 1 Apr 2026 08:24:49 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=A7=BB=E9=99=A4=20Node.js=20TLS=20?= =?UTF-8?q?=E4=BB=A3=E7=90=86=E4=BE=9D=E8=B5=96=EF=BC=8C=E5=85=A8=E9=83=A8?= =?UTF-8?q?=E8=B5=B0=20Go=20=E5=8E=9F=E7=94=9F=20utls=20=E6=8C=87=E7=BA=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Do() 路由从 doViaNodeTLSProxy(转发到 localhost:3456 Node.js 进程) 改为 doWithTLSFingerprint(直接使用 Go utls dialer),解决 h2 connect timeout 问题(Node.js proxy 的 H2 路径不支持 per-account 代理隧道) - 新增 internal/pkg/telemetry 包,从 proxy.js 移植全部遥测逻辑: Anthropic event_logging/batch + Datadog log intake + 虚拟主机身份 + 会话状态管理 + process metrics 模拟 - 保留 proxy.js 中的 H1 降级修复作为备用 --- antigravity/node-tls-proxy/proxy.js | 219 +++++-- backend/internal/pkg/telemetry/telemetry.go | 568 ++++++++++++++++++ backend/internal/repository/http_upstream.go | 12 +- .../repository/http_upstream_antigravity.go | 83 ++- backend/internal/service/gateway_service.go | 19 + 5 files changed, 808 insertions(+), 93 deletions(-) create mode 100644 backend/internal/pkg/telemetry/telemetry.go diff --git a/antigravity/node-tls-proxy/proxy.js b/antigravity/node-tls-proxy/proxy.js index 868c5b55..1b8f015e 100644 --- a/antigravity/node-tls-proxy/proxy.js +++ b/antigravity/node-tls-proxy/proxy.js @@ -478,38 +478,86 @@ function emitPostRequestTelemetry(reqHeaders, statusCode, body) { } // ─── H2 session 管理 ──────────────────────────────────── -function getOrCreateH2Session(host) { - const existing = h2Sessions.get(host); - if (existing && !existing.closed && !existing.destroyed) return existing; - if (existing) { try { existing.close(); } catch (_) {} } +// h2Sessions key 改为 host+proxy 组合,避免不同代理的 session 混用 +function h2SessionKey(host, proxyUrl) { + return proxyUrl ? `${host}|${proxyUrl}` : host; +} + +async function getOrCreateH2Session(host, proxyUrl) { + const key = h2SessionKey(host, proxyUrl); + const existing = h2Sessions.get(key); + // 检查 session 是否仍然可用:connected 且未关闭 + // GOAWAY 后 session.connected 变为 false,必须重建 + if (existing && !existing.closed && !existing.destroyed && existing.connected) return existing; + if (existing) { + h2Sessions.delete(key); + try { existing.close(); } catch (_) {} + } + + let session; + if (proxyUrl) { + // 通过 CONNECT 隧道建立 h2 session(支持 HTTP CONNECT / SOCKS5) + const socket = await connectViaProxy(proxyUrl, host, 443); + session = http2.connect(`https://${host}`, { + createConnection: () => socket, + }); + log('info', 'h2_session_via_proxy', { host, proxy: redactProxyURL(proxyUrl) }); + } else { + session = http2.connect(`https://${host}`); + } - const session = http2.connect(`https://${host}`); session.on('error', (err) => { log('warn', 'h2_session_error', { host, error: err.message }); - h2Sessions.delete(host); + h2Sessions.delete(key); try { session.close(); } catch (_) {} }); - session.on('close', () => h2Sessions.delete(host)); - session.on('goaway', () => { h2Sessions.delete(host); try { session.close(); } catch (_) {} }); - session.setTimeout(IDLE_TIMEOUT, () => { session.close(); h2Sessions.delete(host); }); - h2Sessions.set(host, session); + session.on('close', () => h2Sessions.delete(key)); + session.on('goaway', (errorCode) => { + log('info', 'h2_goaway', { host, errorCode }); + h2Sessions.delete(key); + try { session.close(); } catch (_) {} + }); + session.setTimeout(IDLE_TIMEOUT, () => { session.close(); h2Sessions.delete(key); }); + h2Sessions.set(key, session); return session; } function waitForConnect(session) { if (session.connected) return Promise.resolve(); + // session 已断开(GOAWAY / 半关闭),不要等不会来的 connect 事件 + if (session.closed || session.destroyed) { + return Promise.reject(new Error('h2 session already closed')); + } return new Promise((resolve, reject) => { - session.once('connect', resolve); - session.once('error', reject); - const t = setTimeout(() => reject(new Error('h2 connect timeout')), CONNECT_TIMEOUT); - session.once('connect', () => clearTimeout(t)); + const onConnect = () => { clearTimeout(t); cleanup(); resolve(); }; + const onError = (err) => { clearTimeout(t); cleanup(); reject(err); }; + const onClose = () => { clearTimeout(t); cleanup(); reject(new Error('h2 session closed before connect')); }; + const cleanup = () => { + session.removeListener('connect', onConnect); + session.removeListener('error', onError); + session.removeListener('close', onClose); + }; + session.once('connect', onConnect); + session.once('error', onError); + session.once('close', onClose); + const t = setTimeout(() => { cleanup(); reject(new Error('h2 connect timeout')); }, CONNECT_TIMEOUT); }); } -// ─── CONNECT 隧道 ──────────────────────────────────────── +// ─── CONNECT 隧道(HTTP CONNECT + SOCKS5)───────────────── function connectViaProxy(proxyUrl, targetHost, targetPort) { + const proxy = new URL(proxyUrl); + const scheme = proxy.protocol.replace(':', '').toLowerCase(); + + if (scheme === 'socks5' || scheme === 'socks5h') { + return connectViaSocks5(proxy, targetHost, parseInt(targetPort, 10)); + } + return connectViaHttpConnect(proxy, targetHost, targetPort); +} + +// HTTP CONNECT 隧道 +function connectViaHttpConnect(proxy, targetHost, targetPort) { return new Promise((resolve, reject) => { - const proxy = new URL(proxyUrl); const conn = net.connect(parseInt(proxy.port || '80', 10), proxy.hostname, () => { const auth = proxy.username ? `Proxy-Authorization: Basic ${Buffer.from(`${decodeURIComponent(proxy.username)}:${decodeURIComponent(proxy.password || '')}`).toString('base64')}\r\n` @@ -531,6 +579,102 @@ function connectViaProxy(proxyUrl, targetHost, targetPort) { }); } +// SOCKS5 隧道 (RFC 1928 + RFC 1929 username/password auth) +function connectViaSocks5(proxy, targetHost, targetPort) { + return new Promise((resolve, reject) => { + const conn = net.connect(parseInt(proxy.port || '1080', 10), proxy.hostname); + conn.once('error', reject); + conn.setTimeout(CONNECT_TIMEOUT, () => conn.destroy(new Error('SOCKS5 timeout'))); + + const username = proxy.username ? decodeURIComponent(proxy.username) : ''; + const password = proxy.password ? decodeURIComponent(proxy.password) : ''; + const useAuth = !!(username || password); + + let step = 'greeting'; + + conn.once('connect', () => { + // Step 1: 发送 greeting — 支持的认证方式 + // 0x00 = 无认证, 0x02 = 用户名/密码 + if (useAuth) { + conn.write(Buffer.from([0x05, 0x02, 0x00, 0x02])); + } else { + conn.write(Buffer.from([0x05, 0x01, 0x00])); + } + }); + + let pending = Buffer.alloc(0); + conn.on('data', function onData(chunk) { + pending = Buffer.concat([pending, chunk]); + + if (step === 'greeting') { + if (pending.length < 2) return; + const ver = pending[0], method = pending[1]; + if (ver !== 0x05) { conn.destroy(); return reject(new Error(`SOCKS5 bad version: ${ver}`)); } + + if (method === 0x02 && useAuth) { + // Step 2: 用户名/密码认证 (RFC 1929) + step = 'auth'; + pending = pending.slice(2); + const uBuf = Buffer.from(username, 'utf8'); + const pBuf = Buffer.from(password, 'utf8'); + const authBuf = Buffer.alloc(3 + uBuf.length + pBuf.length); + authBuf[0] = 0x01; // auth version + authBuf[1] = uBuf.length; + uBuf.copy(authBuf, 2); + authBuf[2 + uBuf.length] = pBuf.length; + pBuf.copy(authBuf, 3 + uBuf.length); + conn.write(authBuf); + } else if (method === 0x00) { + // 无需认证,直接发 CONNECT + step = 'connect'; + pending = pending.slice(2); + sendSocks5Connect(conn, targetHost, targetPort); + } else { + conn.destroy(); + reject(new Error(`SOCKS5 unsupported auth method: ${method}`)); + } + } else if (step === 'auth') { + if (pending.length < 2) return; + const status = pending[1]; + if (status !== 0x00) { conn.destroy(); return reject(new Error(`SOCKS5 auth failed: ${status}`)); } + step = 'connect'; + pending = pending.slice(2); + sendSocks5Connect(conn, targetHost, targetPort); + } else if (step === 'connect') { + // 最小响应: VER(1) + REP(1) + RSV(1) + ATYP(1) + ADDR(variable) + PORT(2) + if (pending.length < 4) return; + const rep = pending[1]; + const atyp = pending[3]; + let minLen = 4 + 2; // base + port + if (atyp === 0x01) minLen += 4; // IPv4 + else if (atyp === 0x04) minLen += 16; // IPv6 + else if (atyp === 0x03 && pending.length > 4) minLen += 1 + pending[4]; // domain + else if (atyp === 0x03) return; // 等更多数据 + if (pending.length < minLen) return; + + conn.removeListener('data', onData); + if (rep !== 0x00) { conn.destroy(); return reject(new Error(`SOCKS5 connect failed: rep=${rep}`)); } + conn.setTimeout(0); + resolve(conn); + } + }); + }); +} + +function sendSocks5Connect(conn, host, port) { + // SOCKS5 CONNECT: VER(05) CMD(01=CONNECT) RSV(00) ATYP ADDR PORT + const hostBuf = Buffer.from(host, 'utf8'); + const buf = Buffer.alloc(4 + 1 + hostBuf.length + 2); + buf[0] = 0x05; // version + buf[1] = 0x01; // CONNECT + buf[2] = 0x00; // reserved + buf[3] = 0x03; // domain name + buf[4] = hostBuf.length; + hostBuf.copy(buf, 5); + buf.writeUInt16BE(port, 5 + hostBuf.length); + conn.write(buf); +} + // ─── 收集请求体 ────────────────────────────────────────── function collectBody(req) { return new Promise((resolve) => { @@ -542,10 +686,11 @@ function collectBody(req) { } // ─── H1 代理 ───────────────────────────────────────────── -function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders) { +function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders, explicitProxy) { return new Promise((resolve) => { const headers = { ...reqHeaders, host: targetHost }; ['x-forwarded-host', 'connection', 'keep-alive', 'proxy-connection', 'transfer-encoding'].forEach(h => delete headers[h]); + delete headers['x-upstream-proxy']; if (body.length > 0) headers['content-length'] = String(body.length); const opts = { hostname: targetHost, port: 443, path, method, headers, servername: targetHost, timeout: CONNECT_TIMEOUT }; @@ -569,7 +714,7 @@ function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders if (err.message === 'socket hang up' && (Date.now() - startTime) < 2000) { log('info', 'h1_rejected_switching_to_h2', { host: targetHost }); h2Hosts.add(targetHost); - sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders).then(() => resolve('h2')); + sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders, false, explicitProxy).then(() => resolve('h2')); return; } log('error', 'h1_error', { error: err.message, host: targetHost, path }); @@ -580,10 +725,8 @@ function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders proxyReq.end(body); }; - // 动态上游代理:优先使用 per-request 的 X-Upstream-Proxy,回退到全局 UPSTREAM_PROXY - const upstreamProxy = reqHeaders['x-upstream-proxy'] || UPSTREAM_PROXY; - // 清除内部 header,不传给上游 - delete headers['x-upstream-proxy']; + // 动态上游代理:使用显式传入的代理地址 + const upstreamProxy = explicitProxy || ''; if (upstreamProxy) { connectViaProxy(upstreamProxy, targetHost, 443) @@ -596,9 +739,9 @@ function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders } // ─── H2 代理 ───────────────────────────────────────────── -async function sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders) { +async function sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders, _retried, proxyUrl) { try { - const session = getOrCreateH2Session(targetHost); + const session = await getOrCreateH2Session(targetHost, proxyUrl); await waitForConnect(session); const headers = {}; @@ -649,9 +792,14 @@ async function sendViaH2(targetHost, method, path, reqHeaders, body, res, savedH stream.setTimeout(CONNECT_TIMEOUT, () => stream.close()); stream.end(body); } catch (err) { - log('error', 'h2_exception', { error: err.message, host: targetHost }); + log('error', 'h2_exception', { error: err.message, host: targetHost, retried: !!_retried }); h2Sessions.delete(targetHost); - if (!res.headersSent) { res.writeHead(502); res.end(JSON.stringify({ error: 'upstream_connection_error' })); } + // 首次失败时重试一次(用全新 session) + if (!_retried && !res.headersSent) { + log('info', 'h2_retry_with_fresh_session', { host: targetHost, path }); + return sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders, true, proxyUrl); + } + if (!res.headersSent) { res.writeHead(502); res.end(JSON.stringify({ error: err.message })); } } } @@ -682,11 +830,11 @@ async function proxyRequest(req, res) { await new Promise(r => setTimeout(r, jitterMs)); // ── H2 / H1 路由策略 ────────────────────────────────────────────── - // 当存在 per-account 上游代理(X-Upstream-Proxy)时,强制走 H1: - // 1. H2 的 getOrCreateH2Session 不支持 CONNECT 隧道代理 - // 2. 真实 CLI 用 undici 默认的 HTTP/1.1(allowH2=false),H1 更贴合指纹 - // 无代理的直连请求仍可走 H2 以获得多路复用性能。 - const hasUpstreamProxy = !!(req.headers['x-upstream-proxy'] || UPSTREAM_PROXY); + // H2 现在支持通过 CONNECT 隧道代理,优先为 H2_PREFER_HOSTS 使用 h2。 + // 有代理时通过 connectViaProxy 建立隧道后再 h2 连接。 + const upstreamProxy = req.headers['x-upstream-proxy'] || UPSTREAM_PROXY; + // 清除内部 header,不传给上游(h2 路径也需要清理) + delete req.headers['x-upstream-proxy']; const H2_PREFER_HOSTS = new Set([ 'api.anthropic.com', 'cloudaicompanion.googleapis.com', @@ -694,13 +842,10 @@ async function proxyRequest(req, res) { 'cloudcode-pa.googleapis.com', 'daily-cloudcode-pa.googleapis.com', ]); - if (!hasUpstreamProxy && (H2_PREFER_HOSTS.has(targetHost) || h2Hosts.has(targetHost))) { - await sendViaH2(targetHost, req.method, req.url, req.headers, body, res, savedHeaders); + if (H2_PREFER_HOSTS.has(targetHost) || h2Hosts.has(targetHost)) { + await sendViaH2(targetHost, req.method, req.url, req.headers, body, res, savedHeaders, false, upstreamProxy || undefined); } else { - if (hasUpstreamProxy && H2_PREFER_HOSTS.has(targetHost)) { - log('info', 'h2_downgrade_to_h1', { host: targetHost, reason: 'upstream_proxy_set' }); - } - await sendViaH1(targetHost, req.method, req.url, req.headers, body, res, savedHeaders); + await sendViaH1(targetHost, req.method, req.url, req.headers, body, res, savedHeaders, upstreamProxy || undefined); } } diff --git a/backend/internal/pkg/telemetry/telemetry.go b/backend/internal/pkg/telemetry/telemetry.go new file mode 100644 index 00000000..271d1104 --- /dev/null +++ b/backend/internal/pkg/telemetry/telemetry.go @@ -0,0 +1,568 @@ +// Package telemetry simulates the real Claude Code CLI's OTEL telemetry events. +// +// Real CLI emits events to two channels: +// 1. Anthropic event_logging/batch (first-party events) +// 2. Datadog log intake (third-party observability) +// +// Ported from antigravity/node-tls-proxy/proxy.js — see that file for JS original. +package telemetry + +import ( + "bytes" + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "encoding/json" + "fmt" + "log/slog" + "math" + "math/rand" + "net/http" + "strings" + "sync" + "time" + + claude "github.com/Wei-Shaw/sub2api/internal/pkg/claude" +) + +// ─── Constants ─────────────────────────────────────────── + +const ( + ddAPIKey = "pubbbf48e6d78dae54bceaa4acf463299bf" + fakeNodeVersion = "v24.3.0" + buildTime = "2026-03-31T01:39:46Z" + sessionMaxAge = time.Hour + sessionCleanup = 5 * time.Minute + telemetryTimeout = 10 * time.Second +) + +// ─── Virtual Host Identity ─────────────────────────────── + +var ( + mbpNames = []string{"alex", "sam", "chris", "max", "lee", "kai", "jamie", "taylor", "morgan", "casey", "drew", "avery", "riley", "blake", "jordan", "ryan", "parker", "quinn", "reese", "cameron"} + mbpSuffix = []string{"-MBP", "-MacBook", "-MacBook-Pro", "-MacBook-Air", "s-MBP", "s-MacBook", "s-MacBook-Pro"} +) + +type hostIdentity struct { + Hostname string + Username string + Terminal string + Shell string + MachineID string + Arch string + OSVersion string + KernelRelease string + ExecPath string + RipgrepVersion string + RipgrepPath string + McpServerCount int + McpFailCount int +} + +func hashField(seed, field string) []byte { + h := sha256.Sum256([]byte(seed + ":" + field)) + return h[:] +} + +func generateHostIdentity(seed string) hostIdentity { + hb := hashField(seed, "hostname") + name := mbpNames[int(hb[0])%len(mbpNames)] + sfx := mbpSuffix[int(hb[1])%len(mbpSuffix)] + + termRoll := int(hashField(seed, "terminal")[0]) % 100 + var terminal string + switch { + case termRoll < 75: + terminal = "xterm-256color" + case termRoll < 88: + terminal = "screen-256color" + case termRoll < 96: + terminal = "alacritty" + default: + terminal = "kitty" + } + + shellRoll := int(hashField(seed, "shell")[0]) % 100 + var shell string + switch { + case shellRoll < 65: + shell = "/bin/zsh" + case shellRoll < 82: + shell = "/usr/local/bin/zsh" + case shellRoll < 93: + shell = "/bin/bash" + default: + shell = "/opt/homebrew/bin/fish" + } + + mid := hashField(seed, "machine-id") + machineID := fmt.Sprintf("%s-%s-%s-%s-%s", + strings.ToUpper(hex.EncodeToString(mid[0:4])), + strings.ToUpper(hex.EncodeToString(mid[4:6])), + strings.ToUpper(hex.EncodeToString(mid[6:8])), + strings.ToUpper(hex.EncodeToString(mid[8:10])), + strings.ToUpper(hex.EncodeToString(mid[10:16])), + ) + + osb := hashField(seed, "os") + major := 13 + int(osb[0])%3 + minor := int(osb[1]) % 8 + patch := int(osb[2]) % 5 + darwinMajor := major + 9 // macOS 13 = Darwin 22 + darwinMinor := int(osb[3]) % 7 + darwinPatch := int(osb[4]) % 3 + + archRoll := int(hashField(seed, "arch")[0]) % 100 + arch := "arm64" + if archRoll >= 70 { + arch = "x64" + } + + execRoll := int(hashField(seed, "exec")[0]) % 100 + var execPath string + switch { + case execRoll < 40: + execPath = "/usr/local/bin/claude" + case execRoll < 70: + execPath = "/opt/homebrew/bin/claude" + case execRoll < 90: + execPath = fmt.Sprintf("/Users/%s/.npm-global/bin/claude", name) + default: + execPath = fmt.Sprintf("/Users/%s/.local/bin/claude", name) + } + + rgVersions := []string{"14.1.1", "14.1.0", "14.0.3", "14.0.2", "13.0.0", "14.1.2", "14.0.1"} + rgPaths := []string{"/opt/homebrew/bin/rg", "/usr/local/bin/rg", "/Users/" + name + "/.cargo/bin/rg", "/usr/bin/rg"} + rb := hashField(seed, "ripgrep") + + return hostIdentity{ + Hostname: name + sfx, + Username: name, + Terminal: terminal, + Shell: shell, + MachineID: machineID, + Arch: arch, + OSVersion: fmt.Sprintf("%d.%d.%d", major, minor, patch), + KernelRelease: fmt.Sprintf("%d.%d.%d", darwinMajor, darwinMinor, darwinPatch), + ExecPath: execPath, + RipgrepVersion: rgVersions[int(rb[0])%len(rgVersions)], + RipgrepPath: rgPaths[int(rb[1])%len(rgPaths)], + McpServerCount: int(rb[2])%5 + 1, + McpFailCount: int(rb[3]) % 3, + } +} + +// ─── Session State ─────────────────────────────────────── + +type sessionState struct { + SessionID string + DeviceID string + HostID hostIdentity + StartTime time.Time + RequestCount int64 + RipgrepReported bool +} + +var ( + sessions = make(map[string]*sessionState) + sessionsMu sync.Mutex +) + +func init() { + go func() { + ticker := time.NewTicker(sessionCleanup) + defer ticker.Stop() + for range ticker.C { + now := time.Now() + sessionsMu.Lock() + for k, s := range sessions { + if now.Sub(s.StartTime) > sessionMaxAge { + delete(sessions, k) + } + } + sessionsMu.Unlock() + } + }() +} + +func generateDeviceID(accountSeed string) string { + h := sha256.Sum256([]byte("device:" + accountSeed)) + return hex.EncodeToString(h[:]) +} + +func getOrCreateSession(deviceID string) *sessionState { + sessionsMu.Lock() + defer sessionsMu.Unlock() + + if s, ok := sessions[deviceID]; ok { + return s + } + s := &sessionState{ + SessionID: generateUUID(), + DeviceID: deviceID, + HostID: generateHostIdentity(deviceID), + StartTime: time.Now(), + } + sessions[deviceID] = s + return s +} + +func generateUUID() string { + b := make([]byte, 16) + rand.Read(b) + b[6] = (b[6] & 0x0f) | 0x40 // version 4 + b[8] = (b[8] & 0x3f) | 0x80 // variant + return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]) +} + +// ─── Process Metrics Simulation ────────────────────────── + +func buildProcessMetrics(uptime float64) string { + baseRss := 180_000_000.0 + math.Min(uptime*50_000, 200_000_000) + rss := int64(baseRss + rand.Float64()*80_000_000) + heapTotal := int64(float64(rss)*0.6 + rand.Float64()*10_000_000) + heapUsed := int64(float64(heapTotal)*0.5 + rand.Float64()*float64(heapTotal)*0.3) + + metrics := map[string]any{ + "uptime": uptime, + "rss": rss, + "heapTotal": heapTotal, + "heapUsed": heapUsed, + "external": 14_000_000 + rand.Intn(2_000_000), + "arrayBuffers": rand.Intn(200_000), + "constrainedMemory": 51539607552, + "cpuUsage": map[string]int64{ + "user": int64(uptime*10_000 + rand.Float64()*300_000), + "system": int64(uptime*2_000 + rand.Float64()*80_000), + }, + "cpuPercent": rand.Float64() * 200, + } + data, _ := json.Marshal(metrics) + return base64.StdEncoding.EncodeToString(data) +} + +// ─── Env Block ─────────────────────────────────────────── + +func buildEnvBlock(hostID hostIdentity) map[string]any { + return map[string]any{ + "platform": "darwin", + "node_version": fakeNodeVersion, + "terminal": hostID.Terminal, + "package_managers": "npm,pnpm", + "runtimes": "deno,node", + "is_running_with_bun": true, + "is_ci": false, + "is_claubbit": false, + "is_github_action": false, + "is_claude_code_action": false, + "is_claude_ai_auth": false, + "version": claude.DefaultCLIVersion, + "arch": hostID.Arch, + "is_claude_code_remote": false, + "deployment_environment": "unknown-darwin", + "is_conductor": false, + "version_base": claude.DefaultCLIVersion, + "build_time": buildTime, + "is_local_agent_mode": false, + "vcs": "git", + "platform_raw": "darwin", + } +} + +// ─── Event Building ────────────────────────────────────── + +type eventWrapper struct { + EventType string `json:"event_type"` + EventData map[string]any `json:"event_data"` +} + +func buildEvent(eventName string, session *sessionState, model, betas string, extraData map[string]any, tsOverride string) eventWrapper { + uptime := time.Since(session.StartTime).Seconds() + pm := buildProcessMetrics(uptime) + + ts := tsOverride + if ts == "" { + ts = time.Now().UTC().Format(time.RFC3339Nano) + } + + if model == "" { + model = "claude-sonnet-4-6" + } + if betas == "" { + betas = "claude-code-20250219,interleaved-thinking-2025-05-14" + } + + data := map[string]any{ + "event_name": eventName, + "client_timestamp": ts, + "model": model, + "session_id": session.SessionID, + "user_type": "external", + "betas": betas, + "env": buildEnvBlock(session.HostID), + "entrypoint": "cli", + "is_interactive": true, + "client_type": "cli", + "process": pm, + "event_id": generateUUID(), + "device_id": session.DeviceID, + } + + for k, v := range extraData { + data[k] = v + } + + return eventWrapper{ + EventType: "ClaudeCodeInternalEvent", + EventData: data, + } +} + +// ─── Send Functions ────────────────────────────────────── + +var httpClient = &http.Client{Timeout: telemetryTimeout} + +func sendTelemetryEvents(events []eventWrapper, session *sessionState) { + if len(events) == 0 { + return + } + + payload := map[string]any{"events": events} + body, err := json.Marshal(payload) + if err != nil { + return + } + + req, err := http.NewRequest("POST", "https://api.anthropic.com/api/event_logging/batch", bytes.NewReader(body)) + if err != nil { + return + } + req.Header.Set("Accept", "application/json, text/plain, */*") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "claude-code/"+claude.DefaultCLIVersion) + req.Header.Set("x-service-name", "claude-code") + + resp, err := httpClient.Do(req) + if err != nil { + slog.Debug("telemetry_error", "error", err.Error()) + return + } + resp.Body.Close() + slog.Debug("telemetry_sent", "status", resp.StatusCode, "events", len(events)) +} + +func sendDatadogLog(eventName string, session *sessionState, model string) { + hostID := session.HostID + uptime := time.Since(session.StartTime).Seconds() + + if model == "" { + model = "claude-sonnet-4-6" + } + + baseRss := 180_000_000.0 + math.Min(uptime*50_000, 200_000_000) + rss := int64(baseRss + rand.Float64()*80_000_000) + heapTotal := int64(float64(rss)*0.6 + rand.Float64()*10_000_000) + heapUsed := int64(float64(heapTotal)*0.5 + rand.Float64()*float64(heapTotal)*0.3) + + pm := map[string]any{ + "uptime": uptime, + "rss": rss, + "heapTotal": heapTotal, + "heapUsed": heapUsed, + "external": 14_000_000 + rand.Intn(2_000_000), + "arrayBuffers": rand.Intn(10_000), + "constrainedMemory": 0, + "cpuUsage": map[string]int64{ + "user": int64(uptime*10_000 + rand.Float64()*300_000), + "system": int64(uptime*2_000 + rand.Float64()*80_000), + }, + } + + entry := map[string]any{ + "ddsource": "nodejs", + "ddtags": fmt.Sprintf("event:%s,arch:%s,client_type:cli,model:%s,platform:darwin,user_type:external,version:%s,version_base:%s", eventName, hostID.Arch, model, claude.DefaultCLIVersion, claude.DefaultCLIVersion), + "message": eventName, + "service": "claude-code", + "hostname": hostID.Hostname, + "env": "external", + "model": model, + "session_id": session.SessionID, + "user_type": "external", + "entrypoint": "cli", + "is_interactive": "true", + "client_type": "cli", + "process_metrics": pm, + "platform": "darwin", + "platform_raw": "darwin", + "arch": hostID.Arch, + "node_version": fakeNodeVersion, + "version": claude.DefaultCLIVersion, + "version_base": claude.DefaultCLIVersion, + "build_time": buildTime, + "deployment_environment": "unknown-darwin", + "vcs": "git", + } + + body, err := json.Marshal([]any{entry}) + if err != nil { + return + } + + req, err := http.NewRequest("POST", "https://http-intake.logs.us5.datadoghq.com/api/v2/logs", bytes.NewReader(body)) + if err != nil { + return + } + req.Header.Set("Accept", "application/json, text/plain, */*") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "axios/1.13.6") + req.Header.Set("dd-api-key", ddAPIKey) + + resp, err := httpClient.Do(req) + if err != nil { + return + } + resp.Body.Close() +} + +// ─── Public API ────────────────────────────────────────── + +// EmitPreRequest fires pre-request telemetry events for a /v1/messages request. +// accountSeed should be a stable identifier for the account (e.g. account ID or OAuth token suffix). +// authHeader is the Authorization header value (used for device ID derivation). +// model is the model name from the request body (e.g. "claude-sonnet-4-6"). +// betaHeader is the anthropic-beta header value. +func EmitPreRequest(accountSeed, authHeader, model, betaHeader string) { + authSuffix := authHeader + if len(authSuffix) > 16 { + authSuffix = authSuffix[len(authSuffix)-16:] + } + deviceID := generateDeviceID(accountSeed + ":" + authSuffix) + session := getOrCreateSession(deviceID) + session.RequestCount++ + + if model == "" { + model = "claude-sonnet-4-6" + } + betas := betaHeader + if betas == "" { + betas = claude.DefaultBetaHeader + } + + // First request: full startup sequence + if session.RequestCount == 1 { + hostID := session.HostID + baseTime := time.Now() + ts := func(offsetMs int) string { + return baseTime.Add(time.Duration(offsetMs) * time.Millisecond).UTC().Format(time.RFC3339Nano) + } + + batch1 := []eventWrapper{ + buildEvent("tengu_started", session, model, betas, nil, ts(0)), + buildEvent("tengu_init", session, model, betas, nil, ts(80+rand.Intn(120))), + buildEvent("tengu_ripgrep_availability", session, model, betas, map[string]any{ + "ripgrep_available": true, + "ripgrep_version": hostID.RipgrepVersion, + "ripgrep_path": hostID.RipgrepPath, + }, ts(200+rand.Intn(150))), + } + + // MCP connection events + mcpOffset := 400 + mcpSuccessCount := hostID.McpServerCount - hostID.McpFailCount + for i := 0; i < hostID.McpFailCount; i++ { + mcpOffset += 100 + rand.Intn(300) + batch1 = append(batch1, buildEvent("tengu_mcp_server_connection_failed", session, model, betas, nil, ts(mcpOffset))) + } + for i := 0; i < mcpSuccessCount; i++ { + mcpOffset += 200 + rand.Intn(500) + batch1 = append(batch1, buildEvent("tengu_mcp_server_connection_succeeded", session, model, betas, nil, ts(mcpOffset))) + } + + session.RipgrepReported = true + go sendTelemetryEvents(batch1, session) + go sendDatadogLog("tengu_started", session, model) + go sendDatadogLog("tengu_init", session, model) + + // Delayed batch (~25-35s later, matches real CLI timing) + go func() { + time.Sleep(time.Duration(25000+rand.Intn(10000)) * time.Millisecond) + batch2 := []eventWrapper{ + buildEvent("tengu_session_init", session, model, betas, nil, ""), + buildEvent("tengu_context_loaded", session, model, betas, nil, ""), + } + sendTelemetryEvents(batch2, session) + }() + } + + // Every request: request_started + go sendTelemetryEvents([]eventWrapper{ + buildEvent("tengu_api_request_started", session, model, betas, nil, ""), + }, session) +} + +// EmitPostRequest fires post-request telemetry events after upstream response. +func EmitPostRequest(accountSeed, authHeader, model, betaHeader string, statusCode int) { + authSuffix := authHeader + if len(authSuffix) > 16 { + authSuffix = authSuffix[len(authSuffix)-16:] + } + deviceID := generateDeviceID(accountSeed + ":" + authSuffix) + session := getOrCreateSession(deviceID) + + if model == "" { + model = "claude-sonnet-4-6" + } + betas := betaHeader + if betas == "" { + betas = claude.DefaultBetaHeader + } + + events := []eventWrapper{ + buildEvent("tengu_api_request_completed", session, model, betas, nil, ""), + buildEvent("tengu_conversation_turn_completed", session, model, betas, nil, ""), + } + go sendTelemetryEvents(events, session) + go sendDatadogLog("tengu_api_request_completed", session, model) + + // Error telemetry + if statusCode >= 400 && rand.Float64() < 0.5 { + var errMsg string + switch { + case statusCode == 429: + errMsg = "rate_limit_exceeded" + case statusCode == 529: + errMsg = "overloaded" + case statusCode >= 500: + errMsg = "server_error" + default: + errMsg = "client_error" + } + errEvent := buildEvent("tengu_api_request_error", session, model, betas, map[string]any{ + "error_type": "TelemetrySafeError", + "error_code": statusCode, + "error_message": errMsg, + }, "") + go sendTelemetryEvents([]eventWrapper{errEvent}, session) + } + + // Random tool_use event (30% probability, 2-7s delay) + if rand.Float64() < 0.3 { + go func() { + time.Sleep(time.Duration(2000+rand.Intn(5000)) * time.Millisecond) + sendTelemetryEvents([]eventWrapper{ + buildEvent("tengu_tool_use_completed", session, model, betas, nil, ""), + }, session) + }() + } +} + +// Jitter returns a random delay to inject before forwarding a request. +// 80% fast (80-300ms exponential), 20% slow (400-1200ms uniform). +func Jitter() time.Duration { + if rand.Float64() < 0.80 { + ms := 80.0 + (-math.Log(rand.Float64()) * 90.0) + return time.Duration(ms) * time.Millisecond + } + ms := 400.0 + rand.Float64()*800.0 + return time.Duration(ms) * time.Millisecond +} diff --git a/backend/internal/repository/http_upstream.go b/backend/internal/repository/http_upstream.go index fe0d7c10..728c76bf 100644 --- a/backend/internal/repository/http_upstream.go +++ b/backend/internal/repository/http_upstream.go @@ -149,13 +149,11 @@ func NewHTTPUpstream(cfg *config.Config) service.HTTPUpstream { // - 调用方必须关闭 resp.Body,否则会导致 inFlight 计数泄漏 // - inFlight > 0 的客户端不会被淘汰,确保活跃请求不被中断 func (s *httpUpstreamService) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) { - // Node.js TLS 代理:仅 Anthropic API - // Antigravity (googleapis) 使用 Go 原生 TLS(更接近真实 BoringCrypto 指纹) - // proxyURL 通过 X-Upstream-Proxy header 传递给 node-tls-proxy 动态选择出口 - if s.isNodeTLSProxyEnabled() && req != nil && req.URL != nil && req.URL.Scheme == "https" { - host := req.URL.Hostname() - if host == "api.anthropic.com" { - return s.doViaNodeTLSProxy(req, proxyURL, accountID, accountConcurrency) + // TLS 指纹路由:对匹配主机使用 Go 原生 utls 指纹 + // 使用 utls 模拟 Claude CLI 的 JA3/JA4 指纹,支持直连和代理 + if s.isTLSFingerprintRoutingEnabled() && req != nil && req.URL != nil && req.URL.Scheme == "https" { + if s.shouldRouteWithTLSFingerprint(req) { + return s.doWithTLSFingerprint(req, proxyURL, accountID, accountConcurrency) } } diff --git a/backend/internal/repository/http_upstream_antigravity.go b/backend/internal/repository/http_upstream_antigravity.go index 55c28cdb..0d0e0550 100644 --- a/backend/internal/repository/http_upstream_antigravity.go +++ b/backend/internal/repository/http_upstream_antigravity.go @@ -1,34 +1,40 @@ package repository // ============================================================== -// antigravity — Node.js TLS 代理扩展 +// antigravity — Go 原生 TLS 指纹扩展 // -// 此文件包含 Antigravity fork 新增的 Node.js TLS 代理功能, +// 此文件包含 Antigravity fork 新增的 TLS 指纹代理功能, // 与 upstream 代码完全隔离,便于 upstream 更新时的合并维护。 // // 上游文件 http_upstream.go 中的钩子调用点: -// Do() — 直接路由到 doViaNodeTLSProxy +// Do() — 匹配主机时路由到 doWithTLSFingerprint // DoWithTLS() — profile==nil 时回退到 Do(),触发同样的路由 +// +// 替代原先的 Node.js TLS 代理(node-tls-proxy), +// 直接使用 Go utls 库模拟 Claude CLI 的 TLS 指纹。 // ============================================================== import ( - "fmt" "log/slog" "net/http" + + "github.com/Wei-Shaw/sub2api/internal/pkg/tlsfingerprint" + "github.com/Wei-Shaw/sub2api/internal/util/logredact" ) -// isNodeTLSProxyEnabled 检查 Node.js TLS 代理是否启用 -func (s *httpUpstreamService) isNodeTLSProxyEnabled() bool { +// isTLSFingerprintRoutingEnabled 检查 TLS 指纹路由是否启用 +// 复用 NodeTLSProxy.Enabled 配置项,保持配置兼容 +func (s *httpUpstreamService) isTLSFingerprintRoutingEnabled() bool { if s.cfg == nil { return false } return s.cfg.Gateway.NodeTLSProxy.Enabled } -// shouldRouteViaNodeProxy 判断请求是否应该走 Node.js TLS 代理 +// shouldRouteWithTLSFingerprint 判断请求是否应该使用 TLS 指纹 // 仅拦截目标主机在 proxy_hosts 白名单中的 HTTPS 请求, // 白名单为空时默认只代理 api.anthropic.com。 -func (s *httpUpstreamService) shouldRouteViaNodeProxy(req *http.Request) bool { +func (s *httpUpstreamService) shouldRouteWithTLSFingerprint(req *http.Request) bool { if req == nil || req.URL == nil || req.URL.Scheme != "https" { return false } @@ -39,7 +45,6 @@ func (s *httpUpstreamService) shouldRouteViaNodeProxy(req *http.Request) bool { hosts := s.cfg.Gateway.NodeTLSProxy.ProxyHosts if len(hosts) == 0 { - // 默认只代理 Anthropic return reqHost == "api.anthropic.com" } for _, h := range hosts { @@ -50,51 +55,31 @@ func (s *httpUpstreamService) shouldRouteViaNodeProxy(req *http.Request) bool { return false } -// doViaNodeTLSProxy 通过 Node.js TLS 代理发送请求 -// 将 HTTPS 请求改为 HTTP 明文发送到本地 Node.js 代理, -// 由 Node.js 进程使用原生 TLS 栈完成到上游的 HTTPS 连接。 -// 原始目标主机通过 X-Forwarded-Host 传递给 Node.js 代理, -// 代理据此动态连接到正确的上游主机。 -func (s *httpUpstreamService) doViaNodeTLSProxy(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) { - proxyCfg := s.cfg.Gateway.NodeTLSProxy - listenHost := proxyCfg.ListenHost - if listenHost == "" { - listenHost = "127.0.0.1" - } - listenPort := proxyCfg.ListenPort - if listenPort == 0 { - listenPort = 3456 +// defaultTLSProfile 返回模拟 Claude CLI (Node.js 24.x) 的默认 TLS 指纹配置 +// 所有 slice 字段留空 → dialer.go 自动使用内置的 Node.js 24.x 默认值 +// ALPN 仅声明 http/1.1,与真实 CLI 行为一致(undici allowH2=false) +func defaultTLSProfile() *tlsfingerprint.Profile { + return &tlsfingerprint.Profile{ + Name: "claude_cli_builtin", + EnableGREASE: true, } +} - // 克隆请求,避免修改原始 req(重试时需要原始 URL) - proxyReq := req.Clone(req.Context()) - // 安全复制 Body:优先用 GetBody 工厂方法 - if req.GetBody != nil { - proxyReq.Body, _ = req.GetBody() - } else { - proxyReq.Body = req.Body - } - - // 保存原始目标主机,通过自定义头传给 Node.js 代理 - originalHost := req.URL.Host - proxyReq.Header.Set("X-Forwarded-Host", originalHost) - - // 如果账号绑定了代理(落地机 GOST),通过 header 传递给 node-tls-proxy - // node-tls-proxy 会用此代理作为上游出口,实现动态路由 +// doWithTLSFingerprint 使用 Go 原生 utls TLS 指纹发送请求 +// 直接通过 DoWithTLS 路径,利用已有的 utls dialer 基础设施: +// - 直连:Dialer (TCP → utls handshake) +// - HTTP 代理:HTTPProxyDialer (CONNECT 隧道 → utls handshake) +// - SOCKS5 代理:SOCKS5ProxyDialer (SOCKS5 隧道 → utls handshake) +func (s *httpUpstreamService) doWithTLSFingerprint(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) { + proxyInfo := "direct" if proxyURL != "" { - proxyReq.Header.Set("X-Upstream-Proxy", proxyURL) + proxyInfo = logredact.RedactProxyURL(proxyURL) } - - // 重写请求 URL:https://api.anthropic.com/v1/... → http://127.0.0.1:3456/v1/... - proxyReq.URL.Scheme = "http" - proxyReq.URL.Host = fmt.Sprintf("%s:%d", listenHost, listenPort) - - slog.Debug("node_tls_proxy_rewrite", + slog.Debug("tls_fingerprint_routing", "account_id", accountID, - "original_host", originalHost, - "rewritten_to", proxyReq.URL.Host, + "target", req.URL.Host, + "proxy", proxyInfo, ) - // 通过标准 HTTP 客户端发送(不需要 TLS,代理是本地 HTTP) - return s.Do(proxyReq, "", accountID, accountConcurrency) + return s.DoWithTLS(req, proxyURL, accountID, accountConcurrency, defaultTLSProfile()) } diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 3ae490b5..d51e2f6d 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -26,6 +26,7 @@ import ( "github.com/Wei-Shaw/sub2api/internal/pkg/claude" "github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey" "github.com/Wei-Shaw/sub2api/internal/pkg/logger" + "github.com/Wei-Shaw/sub2api/internal/pkg/telemetry" "github.com/Wei-Shaw/sub2api/internal/pkg/usagestats" "github.com/Wei-Shaw/sub2api/internal/util/responseheaders" "github.com/Wei-Shaw/sub2api/internal/util/urlvalidator" @@ -4227,6 +4228,13 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // 真实 CLI 在首次 messages 请求前 fire-and-forget 调用此端点。 if tokenType == "oauth" && token != "" { TriggerBootstrapIfNeeded(account.ID, token) + // OTEL telemetry: emit pre-request events (tengu_started, tengu_api_request_started etc.) + go telemetry.EmitPreRequest( + fmt.Sprintf("%d", account.ID), + token, + reqModel, + getHeaderRaw(c.Request.Header, "anthropic-beta"), + ) } // 获取代理URL(自定义 base URL 模式下,proxy 通过 buildCustomRelayURL 作为查询参数传递) @@ -4644,6 +4652,17 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A // 处理正常响应 + // OTEL telemetry: emit post-request events (fire-and-forget) + if tokenType == "oauth" && token != "" { + go telemetry.EmitPostRequest( + fmt.Sprintf("%d", account.ID), + token, + reqModel, + getHeaderRaw(c.Request.Header, "anthropic-beta"), + resp.StatusCode, + ) + } + // 触发上游接受回调(提前释放串行锁,不等流完成) if parsed.OnUpstreamAccepted != nil { parsed.OnUpstreamAccepted()