feat: 移除 Node.js TLS 代理依赖,全部走 Go 原生 utls 指纹

- Do() 路由从 doViaNodeTLSProxy(转发到 localhost:3456 Node.js 进程)
  改为 doWithTLSFingerprint(直接使用 Go utls dialer),解决 h2 connect
  timeout 问题(Node.js proxy 的 H2 路径不支持 per-account 代理隧道)
- 新增 internal/pkg/telemetry 包,从 proxy.js 移植全部遥测逻辑:
  Anthropic event_logging/batch + Datadog log intake + 虚拟主机身份 +
  会话状态管理 + process metrics 模拟
- 保留 proxy.js 中的 H1 降级修复作为备用
This commit is contained in:
win 2026-04-01 08:24:49 +08:00
parent 0df29af0ab
commit 2f817dd248
5 changed files with 808 additions and 93 deletions

View File

@ -478,38 +478,86 @@ function emitPostRequestTelemetry(reqHeaders, statusCode, body) {
}
// ─── H2 session 管理 ────────────────────────────────────
function getOrCreateH2Session(host) {
const existing = h2Sessions.get(host);
if (existing && !existing.closed && !existing.destroyed) return existing;
if (existing) { try { existing.close(); } catch (_) {} }
// h2Sessions key 改为 host+proxy 组合,避免不同代理的 session 混用
function h2SessionKey(host, proxyUrl) {
return proxyUrl ? `${host}|${proxyUrl}` : host;
}
async function getOrCreateH2Session(host, proxyUrl) {
const key = h2SessionKey(host, proxyUrl);
const existing = h2Sessions.get(key);
// 检查 session 是否仍然可用connected 且未关闭
// GOAWAY 后 session.connected 变为 false必须重建
if (existing && !existing.closed && !existing.destroyed && existing.connected) return existing;
if (existing) {
h2Sessions.delete(key);
try { existing.close(); } catch (_) {}
}
let session;
if (proxyUrl) {
// 通过 CONNECT 隧道建立 h2 session支持 HTTP CONNECT / SOCKS5
const socket = await connectViaProxy(proxyUrl, host, 443);
session = http2.connect(`https://${host}`, {
createConnection: () => socket,
});
log('info', 'h2_session_via_proxy', { host, proxy: redactProxyURL(proxyUrl) });
} else {
session = http2.connect(`https://${host}`);
}
const session = http2.connect(`https://${host}`);
session.on('error', (err) => {
log('warn', 'h2_session_error', { host, error: err.message });
h2Sessions.delete(host);
h2Sessions.delete(key);
try { session.close(); } catch (_) {}
});
session.on('close', () => h2Sessions.delete(host));
session.on('goaway', () => { h2Sessions.delete(host); try { session.close(); } catch (_) {} });
session.setTimeout(IDLE_TIMEOUT, () => { session.close(); h2Sessions.delete(host); });
h2Sessions.set(host, session);
session.on('close', () => h2Sessions.delete(key));
session.on('goaway', (errorCode) => {
log('info', 'h2_goaway', { host, errorCode });
h2Sessions.delete(key);
try { session.close(); } catch (_) {}
});
session.setTimeout(IDLE_TIMEOUT, () => { session.close(); h2Sessions.delete(key); });
h2Sessions.set(key, session);
return session;
}
function waitForConnect(session) {
if (session.connected) return Promise.resolve();
// session 已断开GOAWAY / 半关闭),不要等不会来的 connect 事件
if (session.closed || session.destroyed) {
return Promise.reject(new Error('h2 session already closed'));
}
return new Promise((resolve, reject) => {
session.once('connect', resolve);
session.once('error', reject);
const t = setTimeout(() => reject(new Error('h2 connect timeout')), CONNECT_TIMEOUT);
session.once('connect', () => clearTimeout(t));
const onConnect = () => { clearTimeout(t); cleanup(); resolve(); };
const onError = (err) => { clearTimeout(t); cleanup(); reject(err); };
const onClose = () => { clearTimeout(t); cleanup(); reject(new Error('h2 session closed before connect')); };
const cleanup = () => {
session.removeListener('connect', onConnect);
session.removeListener('error', onError);
session.removeListener('close', onClose);
};
session.once('connect', onConnect);
session.once('error', onError);
session.once('close', onClose);
const t = setTimeout(() => { cleanup(); reject(new Error('h2 connect timeout')); }, CONNECT_TIMEOUT);
});
}
// ─── CONNECT 隧道 ────────────────────────────────────────
// ─── CONNECT 隧道HTTP CONNECT + SOCKS5─────────────────
function connectViaProxy(proxyUrl, targetHost, targetPort) {
const proxy = new URL(proxyUrl);
const scheme = proxy.protocol.replace(':', '').toLowerCase();
if (scheme === 'socks5' || scheme === 'socks5h') {
return connectViaSocks5(proxy, targetHost, parseInt(targetPort, 10));
}
return connectViaHttpConnect(proxy, targetHost, targetPort);
}
// HTTP CONNECT 隧道
function connectViaHttpConnect(proxy, targetHost, targetPort) {
return new Promise((resolve, reject) => {
const proxy = new URL(proxyUrl);
const conn = net.connect(parseInt(proxy.port || '80', 10), proxy.hostname, () => {
const auth = proxy.username
? `Proxy-Authorization: Basic ${Buffer.from(`${decodeURIComponent(proxy.username)}:${decodeURIComponent(proxy.password || '')}`).toString('base64')}\r\n`
@ -531,6 +579,102 @@ function connectViaProxy(proxyUrl, targetHost, targetPort) {
});
}
// SOCKS5 隧道 (RFC 1928 + RFC 1929 username/password auth)
function connectViaSocks5(proxy, targetHost, targetPort) {
return new Promise((resolve, reject) => {
const conn = net.connect(parseInt(proxy.port || '1080', 10), proxy.hostname);
conn.once('error', reject);
conn.setTimeout(CONNECT_TIMEOUT, () => conn.destroy(new Error('SOCKS5 timeout')));
const username = proxy.username ? decodeURIComponent(proxy.username) : '';
const password = proxy.password ? decodeURIComponent(proxy.password) : '';
const useAuth = !!(username || password);
let step = 'greeting';
conn.once('connect', () => {
// Step 1: 发送 greeting — 支持的认证方式
// 0x00 = 无认证, 0x02 = 用户名/密码
if (useAuth) {
conn.write(Buffer.from([0x05, 0x02, 0x00, 0x02]));
} else {
conn.write(Buffer.from([0x05, 0x01, 0x00]));
}
});
let pending = Buffer.alloc(0);
conn.on('data', function onData(chunk) {
pending = Buffer.concat([pending, chunk]);
if (step === 'greeting') {
if (pending.length < 2) return;
const ver = pending[0], method = pending[1];
if (ver !== 0x05) { conn.destroy(); return reject(new Error(`SOCKS5 bad version: ${ver}`)); }
if (method === 0x02 && useAuth) {
// Step 2: 用户名/密码认证 (RFC 1929)
step = 'auth';
pending = pending.slice(2);
const uBuf = Buffer.from(username, 'utf8');
const pBuf = Buffer.from(password, 'utf8');
const authBuf = Buffer.alloc(3 + uBuf.length + pBuf.length);
authBuf[0] = 0x01; // auth version
authBuf[1] = uBuf.length;
uBuf.copy(authBuf, 2);
authBuf[2 + uBuf.length] = pBuf.length;
pBuf.copy(authBuf, 3 + uBuf.length);
conn.write(authBuf);
} else if (method === 0x00) {
// 无需认证,直接发 CONNECT
step = 'connect';
pending = pending.slice(2);
sendSocks5Connect(conn, targetHost, targetPort);
} else {
conn.destroy();
reject(new Error(`SOCKS5 unsupported auth method: ${method}`));
}
} else if (step === 'auth') {
if (pending.length < 2) return;
const status = pending[1];
if (status !== 0x00) { conn.destroy(); return reject(new Error(`SOCKS5 auth failed: ${status}`)); }
step = 'connect';
pending = pending.slice(2);
sendSocks5Connect(conn, targetHost, targetPort);
} else if (step === 'connect') {
// 最小响应: VER(1) + REP(1) + RSV(1) + ATYP(1) + ADDR(variable) + PORT(2)
if (pending.length < 4) return;
const rep = pending[1];
const atyp = pending[3];
let minLen = 4 + 2; // base + port
if (atyp === 0x01) minLen += 4; // IPv4
else if (atyp === 0x04) minLen += 16; // IPv6
else if (atyp === 0x03 && pending.length > 4) minLen += 1 + pending[4]; // domain
else if (atyp === 0x03) return; // 等更多数据
if (pending.length < minLen) return;
conn.removeListener('data', onData);
if (rep !== 0x00) { conn.destroy(); return reject(new Error(`SOCKS5 connect failed: rep=${rep}`)); }
conn.setTimeout(0);
resolve(conn);
}
});
});
}
function sendSocks5Connect(conn, host, port) {
// SOCKS5 CONNECT: VER(05) CMD(01=CONNECT) RSV(00) ATYP ADDR PORT
const hostBuf = Buffer.from(host, 'utf8');
const buf = Buffer.alloc(4 + 1 + hostBuf.length + 2);
buf[0] = 0x05; // version
buf[1] = 0x01; // CONNECT
buf[2] = 0x00; // reserved
buf[3] = 0x03; // domain name
buf[4] = hostBuf.length;
hostBuf.copy(buf, 5);
buf.writeUInt16BE(port, 5 + hostBuf.length);
conn.write(buf);
}
// ─── 收集请求体 ──────────────────────────────────────────
function collectBody(req) {
return new Promise((resolve) => {
@ -542,10 +686,11 @@ function collectBody(req) {
}
// ─── H1 代理 ─────────────────────────────────────────────
function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders) {
function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders, explicitProxy) {
return new Promise((resolve) => {
const headers = { ...reqHeaders, host: targetHost };
['x-forwarded-host', 'connection', 'keep-alive', 'proxy-connection', 'transfer-encoding'].forEach(h => delete headers[h]);
delete headers['x-upstream-proxy'];
if (body.length > 0) headers['content-length'] = String(body.length);
const opts = { hostname: targetHost, port: 443, path, method, headers, servername: targetHost, timeout: CONNECT_TIMEOUT };
@ -569,7 +714,7 @@ function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders
if (err.message === 'socket hang up' && (Date.now() - startTime) < 2000) {
log('info', 'h1_rejected_switching_to_h2', { host: targetHost });
h2Hosts.add(targetHost);
sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders).then(() => resolve('h2'));
sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders, false, explicitProxy).then(() => resolve('h2'));
return;
}
log('error', 'h1_error', { error: err.message, host: targetHost, path });
@ -580,10 +725,8 @@ function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders
proxyReq.end(body);
};
// 动态上游代理:优先使用 per-request 的 X-Upstream-Proxy回退到全局 UPSTREAM_PROXY
const upstreamProxy = reqHeaders['x-upstream-proxy'] || UPSTREAM_PROXY;
// 清除内部 header不传给上游
delete headers['x-upstream-proxy'];
// 动态上游代理:使用显式传入的代理地址
const upstreamProxy = explicitProxy || '';
if (upstreamProxy) {
connectViaProxy(upstreamProxy, targetHost, 443)
@ -596,9 +739,9 @@ function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders
}
// ─── H2 代理 ─────────────────────────────────────────────
async function sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders) {
async function sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders, _retried, proxyUrl) {
try {
const session = getOrCreateH2Session(targetHost);
const session = await getOrCreateH2Session(targetHost, proxyUrl);
await waitForConnect(session);
const headers = {};
@ -649,9 +792,14 @@ async function sendViaH2(targetHost, method, path, reqHeaders, body, res, savedH
stream.setTimeout(CONNECT_TIMEOUT, () => stream.close());
stream.end(body);
} catch (err) {
log('error', 'h2_exception', { error: err.message, host: targetHost });
log('error', 'h2_exception', { error: err.message, host: targetHost, retried: !!_retried });
h2Sessions.delete(targetHost);
if (!res.headersSent) { res.writeHead(502); res.end(JSON.stringify({ error: 'upstream_connection_error' })); }
// 首次失败时重试一次(用全新 session
if (!_retried && !res.headersSent) {
log('info', 'h2_retry_with_fresh_session', { host: targetHost, path });
return sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders, true, proxyUrl);
}
if (!res.headersSent) { res.writeHead(502); res.end(JSON.stringify({ error: err.message })); }
}
}
@ -682,11 +830,11 @@ async function proxyRequest(req, res) {
await new Promise(r => setTimeout(r, jitterMs));
// ── H2 / H1 路由策略 ──────────────────────────────────────────────
// 当存在 per-account 上游代理X-Upstream-Proxy强制走 H1
// 1. H2 的 getOrCreateH2Session 不支持 CONNECT 隧道代理
// 2. 真实 CLI 用 undici 默认的 HTTP/1.1allowH2=falseH1 更贴合指纹
// 无代理的直连请求仍可走 H2 以获得多路复用性能。
const hasUpstreamProxy = !!(req.headers['x-upstream-proxy'] || UPSTREAM_PROXY);
// H2 现在支持通过 CONNECT 隧道代理,优先为 H2_PREFER_HOSTS 使用 h2。
// 有代理时通过 connectViaProxy 建立隧道后再 h2 连接。
const upstreamProxy = req.headers['x-upstream-proxy'] || UPSTREAM_PROXY;
// 清除内部 header不传给上游h2 路径也需要清理)
delete req.headers['x-upstream-proxy'];
const H2_PREFER_HOSTS = new Set([
'api.anthropic.com',
'cloudaicompanion.googleapis.com',
@ -694,13 +842,10 @@ async function proxyRequest(req, res) {
'cloudcode-pa.googleapis.com',
'daily-cloudcode-pa.googleapis.com',
]);
if (!hasUpstreamProxy && (H2_PREFER_HOSTS.has(targetHost) || h2Hosts.has(targetHost))) {
await sendViaH2(targetHost, req.method, req.url, req.headers, body, res, savedHeaders);
if (H2_PREFER_HOSTS.has(targetHost) || h2Hosts.has(targetHost)) {
await sendViaH2(targetHost, req.method, req.url, req.headers, body, res, savedHeaders, false, upstreamProxy || undefined);
} else {
if (hasUpstreamProxy && H2_PREFER_HOSTS.has(targetHost)) {
log('info', 'h2_downgrade_to_h1', { host: targetHost, reason: 'upstream_proxy_set' });
}
await sendViaH1(targetHost, req.method, req.url, req.headers, body, res, savedHeaders);
await sendViaH1(targetHost, req.method, req.url, req.headers, body, res, savedHeaders, upstreamProxy || undefined);
}
}

View File

@ -0,0 +1,568 @@
// Package telemetry simulates the real Claude Code CLI's OTEL telemetry events.
//
// Real CLI emits events to two channels:
// 1. Anthropic event_logging/batch (first-party events)
// 2. Datadog log intake (third-party observability)
//
// Ported from antigravity/node-tls-proxy/proxy.js — see that file for JS original.
package telemetry
import (
"bytes"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"log/slog"
"math"
"math/rand"
"net/http"
"strings"
"sync"
"time"
claude "github.com/Wei-Shaw/sub2api/internal/pkg/claude"
)
// ─── Constants ───────────────────────────────────────────
const (
ddAPIKey = "pubbbf48e6d78dae54bceaa4acf463299bf"
fakeNodeVersion = "v24.3.0"
buildTime = "2026-03-31T01:39:46Z"
sessionMaxAge = time.Hour
sessionCleanup = 5 * time.Minute
telemetryTimeout = 10 * time.Second
)
// ─── Virtual Host Identity ───────────────────────────────
var (
mbpNames = []string{"alex", "sam", "chris", "max", "lee", "kai", "jamie", "taylor", "morgan", "casey", "drew", "avery", "riley", "blake", "jordan", "ryan", "parker", "quinn", "reese", "cameron"}
mbpSuffix = []string{"-MBP", "-MacBook", "-MacBook-Pro", "-MacBook-Air", "s-MBP", "s-MacBook", "s-MacBook-Pro"}
)
type hostIdentity struct {
Hostname string
Username string
Terminal string
Shell string
MachineID string
Arch string
OSVersion string
KernelRelease string
ExecPath string
RipgrepVersion string
RipgrepPath string
McpServerCount int
McpFailCount int
}
func hashField(seed, field string) []byte {
h := sha256.Sum256([]byte(seed + ":" + field))
return h[:]
}
func generateHostIdentity(seed string) hostIdentity {
hb := hashField(seed, "hostname")
name := mbpNames[int(hb[0])%len(mbpNames)]
sfx := mbpSuffix[int(hb[1])%len(mbpSuffix)]
termRoll := int(hashField(seed, "terminal")[0]) % 100
var terminal string
switch {
case termRoll < 75:
terminal = "xterm-256color"
case termRoll < 88:
terminal = "screen-256color"
case termRoll < 96:
terminal = "alacritty"
default:
terminal = "kitty"
}
shellRoll := int(hashField(seed, "shell")[0]) % 100
var shell string
switch {
case shellRoll < 65:
shell = "/bin/zsh"
case shellRoll < 82:
shell = "/usr/local/bin/zsh"
case shellRoll < 93:
shell = "/bin/bash"
default:
shell = "/opt/homebrew/bin/fish"
}
mid := hashField(seed, "machine-id")
machineID := fmt.Sprintf("%s-%s-%s-%s-%s",
strings.ToUpper(hex.EncodeToString(mid[0:4])),
strings.ToUpper(hex.EncodeToString(mid[4:6])),
strings.ToUpper(hex.EncodeToString(mid[6:8])),
strings.ToUpper(hex.EncodeToString(mid[8:10])),
strings.ToUpper(hex.EncodeToString(mid[10:16])),
)
osb := hashField(seed, "os")
major := 13 + int(osb[0])%3
minor := int(osb[1]) % 8
patch := int(osb[2]) % 5
darwinMajor := major + 9 // macOS 13 = Darwin 22
darwinMinor := int(osb[3]) % 7
darwinPatch := int(osb[4]) % 3
archRoll := int(hashField(seed, "arch")[0]) % 100
arch := "arm64"
if archRoll >= 70 {
arch = "x64"
}
execRoll := int(hashField(seed, "exec")[0]) % 100
var execPath string
switch {
case execRoll < 40:
execPath = "/usr/local/bin/claude"
case execRoll < 70:
execPath = "/opt/homebrew/bin/claude"
case execRoll < 90:
execPath = fmt.Sprintf("/Users/%s/.npm-global/bin/claude", name)
default:
execPath = fmt.Sprintf("/Users/%s/.local/bin/claude", name)
}
rgVersions := []string{"14.1.1", "14.1.0", "14.0.3", "14.0.2", "13.0.0", "14.1.2", "14.0.1"}
rgPaths := []string{"/opt/homebrew/bin/rg", "/usr/local/bin/rg", "/Users/" + name + "/.cargo/bin/rg", "/usr/bin/rg"}
rb := hashField(seed, "ripgrep")
return hostIdentity{
Hostname: name + sfx,
Username: name,
Terminal: terminal,
Shell: shell,
MachineID: machineID,
Arch: arch,
OSVersion: fmt.Sprintf("%d.%d.%d", major, minor, patch),
KernelRelease: fmt.Sprintf("%d.%d.%d", darwinMajor, darwinMinor, darwinPatch),
ExecPath: execPath,
RipgrepVersion: rgVersions[int(rb[0])%len(rgVersions)],
RipgrepPath: rgPaths[int(rb[1])%len(rgPaths)],
McpServerCount: int(rb[2])%5 + 1,
McpFailCount: int(rb[3]) % 3,
}
}
// ─── Session State ───────────────────────────────────────
type sessionState struct {
SessionID string
DeviceID string
HostID hostIdentity
StartTime time.Time
RequestCount int64
RipgrepReported bool
}
var (
sessions = make(map[string]*sessionState)
sessionsMu sync.Mutex
)
func init() {
go func() {
ticker := time.NewTicker(sessionCleanup)
defer ticker.Stop()
for range ticker.C {
now := time.Now()
sessionsMu.Lock()
for k, s := range sessions {
if now.Sub(s.StartTime) > sessionMaxAge {
delete(sessions, k)
}
}
sessionsMu.Unlock()
}
}()
}
func generateDeviceID(accountSeed string) string {
h := sha256.Sum256([]byte("device:" + accountSeed))
return hex.EncodeToString(h[:])
}
func getOrCreateSession(deviceID string) *sessionState {
sessionsMu.Lock()
defer sessionsMu.Unlock()
if s, ok := sessions[deviceID]; ok {
return s
}
s := &sessionState{
SessionID: generateUUID(),
DeviceID: deviceID,
HostID: generateHostIdentity(deviceID),
StartTime: time.Now(),
}
sessions[deviceID] = s
return s
}
func generateUUID() string {
b := make([]byte, 16)
rand.Read(b)
b[6] = (b[6] & 0x0f) | 0x40 // version 4
b[8] = (b[8] & 0x3f) | 0x80 // variant
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}
// ─── Process Metrics Simulation ──────────────────────────
func buildProcessMetrics(uptime float64) string {
baseRss := 180_000_000.0 + math.Min(uptime*50_000, 200_000_000)
rss := int64(baseRss + rand.Float64()*80_000_000)
heapTotal := int64(float64(rss)*0.6 + rand.Float64()*10_000_000)
heapUsed := int64(float64(heapTotal)*0.5 + rand.Float64()*float64(heapTotal)*0.3)
metrics := map[string]any{
"uptime": uptime,
"rss": rss,
"heapTotal": heapTotal,
"heapUsed": heapUsed,
"external": 14_000_000 + rand.Intn(2_000_000),
"arrayBuffers": rand.Intn(200_000),
"constrainedMemory": 51539607552,
"cpuUsage": map[string]int64{
"user": int64(uptime*10_000 + rand.Float64()*300_000),
"system": int64(uptime*2_000 + rand.Float64()*80_000),
},
"cpuPercent": rand.Float64() * 200,
}
data, _ := json.Marshal(metrics)
return base64.StdEncoding.EncodeToString(data)
}
// ─── Env Block ───────────────────────────────────────────
func buildEnvBlock(hostID hostIdentity) map[string]any {
return map[string]any{
"platform": "darwin",
"node_version": fakeNodeVersion,
"terminal": hostID.Terminal,
"package_managers": "npm,pnpm",
"runtimes": "deno,node",
"is_running_with_bun": true,
"is_ci": false,
"is_claubbit": false,
"is_github_action": false,
"is_claude_code_action": false,
"is_claude_ai_auth": false,
"version": claude.DefaultCLIVersion,
"arch": hostID.Arch,
"is_claude_code_remote": false,
"deployment_environment": "unknown-darwin",
"is_conductor": false,
"version_base": claude.DefaultCLIVersion,
"build_time": buildTime,
"is_local_agent_mode": false,
"vcs": "git",
"platform_raw": "darwin",
}
}
// ─── Event Building ──────────────────────────────────────
type eventWrapper struct {
EventType string `json:"event_type"`
EventData map[string]any `json:"event_data"`
}
func buildEvent(eventName string, session *sessionState, model, betas string, extraData map[string]any, tsOverride string) eventWrapper {
uptime := time.Since(session.StartTime).Seconds()
pm := buildProcessMetrics(uptime)
ts := tsOverride
if ts == "" {
ts = time.Now().UTC().Format(time.RFC3339Nano)
}
if model == "" {
model = "claude-sonnet-4-6"
}
if betas == "" {
betas = "claude-code-20250219,interleaved-thinking-2025-05-14"
}
data := map[string]any{
"event_name": eventName,
"client_timestamp": ts,
"model": model,
"session_id": session.SessionID,
"user_type": "external",
"betas": betas,
"env": buildEnvBlock(session.HostID),
"entrypoint": "cli",
"is_interactive": true,
"client_type": "cli",
"process": pm,
"event_id": generateUUID(),
"device_id": session.DeviceID,
}
for k, v := range extraData {
data[k] = v
}
return eventWrapper{
EventType: "ClaudeCodeInternalEvent",
EventData: data,
}
}
// ─── Send Functions ──────────────────────────────────────
var httpClient = &http.Client{Timeout: telemetryTimeout}
func sendTelemetryEvents(events []eventWrapper, session *sessionState) {
if len(events) == 0 {
return
}
payload := map[string]any{"events": events}
body, err := json.Marshal(payload)
if err != nil {
return
}
req, err := http.NewRequest("POST", "https://api.anthropic.com/api/event_logging/batch", bytes.NewReader(body))
if err != nil {
return
}
req.Header.Set("Accept", "application/json, text/plain, */*")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "claude-code/"+claude.DefaultCLIVersion)
req.Header.Set("x-service-name", "claude-code")
resp, err := httpClient.Do(req)
if err != nil {
slog.Debug("telemetry_error", "error", err.Error())
return
}
resp.Body.Close()
slog.Debug("telemetry_sent", "status", resp.StatusCode, "events", len(events))
}
func sendDatadogLog(eventName string, session *sessionState, model string) {
hostID := session.HostID
uptime := time.Since(session.StartTime).Seconds()
if model == "" {
model = "claude-sonnet-4-6"
}
baseRss := 180_000_000.0 + math.Min(uptime*50_000, 200_000_000)
rss := int64(baseRss + rand.Float64()*80_000_000)
heapTotal := int64(float64(rss)*0.6 + rand.Float64()*10_000_000)
heapUsed := int64(float64(heapTotal)*0.5 + rand.Float64()*float64(heapTotal)*0.3)
pm := map[string]any{
"uptime": uptime,
"rss": rss,
"heapTotal": heapTotal,
"heapUsed": heapUsed,
"external": 14_000_000 + rand.Intn(2_000_000),
"arrayBuffers": rand.Intn(10_000),
"constrainedMemory": 0,
"cpuUsage": map[string]int64{
"user": int64(uptime*10_000 + rand.Float64()*300_000),
"system": int64(uptime*2_000 + rand.Float64()*80_000),
},
}
entry := map[string]any{
"ddsource": "nodejs",
"ddtags": fmt.Sprintf("event:%s,arch:%s,client_type:cli,model:%s,platform:darwin,user_type:external,version:%s,version_base:%s", eventName, hostID.Arch, model, claude.DefaultCLIVersion, claude.DefaultCLIVersion),
"message": eventName,
"service": "claude-code",
"hostname": hostID.Hostname,
"env": "external",
"model": model,
"session_id": session.SessionID,
"user_type": "external",
"entrypoint": "cli",
"is_interactive": "true",
"client_type": "cli",
"process_metrics": pm,
"platform": "darwin",
"platform_raw": "darwin",
"arch": hostID.Arch,
"node_version": fakeNodeVersion,
"version": claude.DefaultCLIVersion,
"version_base": claude.DefaultCLIVersion,
"build_time": buildTime,
"deployment_environment": "unknown-darwin",
"vcs": "git",
}
body, err := json.Marshal([]any{entry})
if err != nil {
return
}
req, err := http.NewRequest("POST", "https://http-intake.logs.us5.datadoghq.com/api/v2/logs", bytes.NewReader(body))
if err != nil {
return
}
req.Header.Set("Accept", "application/json, text/plain, */*")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "axios/1.13.6")
req.Header.Set("dd-api-key", ddAPIKey)
resp, err := httpClient.Do(req)
if err != nil {
return
}
resp.Body.Close()
}
// ─── Public API ──────────────────────────────────────────
// EmitPreRequest fires pre-request telemetry events for a /v1/messages request.
// accountSeed should be a stable identifier for the account (e.g. account ID or OAuth token suffix).
// authHeader is the Authorization header value (used for device ID derivation).
// model is the model name from the request body (e.g. "claude-sonnet-4-6").
// betaHeader is the anthropic-beta header value.
func EmitPreRequest(accountSeed, authHeader, model, betaHeader string) {
authSuffix := authHeader
if len(authSuffix) > 16 {
authSuffix = authSuffix[len(authSuffix)-16:]
}
deviceID := generateDeviceID(accountSeed + ":" + authSuffix)
session := getOrCreateSession(deviceID)
session.RequestCount++
if model == "" {
model = "claude-sonnet-4-6"
}
betas := betaHeader
if betas == "" {
betas = claude.DefaultBetaHeader
}
// First request: full startup sequence
if session.RequestCount == 1 {
hostID := session.HostID
baseTime := time.Now()
ts := func(offsetMs int) string {
return baseTime.Add(time.Duration(offsetMs) * time.Millisecond).UTC().Format(time.RFC3339Nano)
}
batch1 := []eventWrapper{
buildEvent("tengu_started", session, model, betas, nil, ts(0)),
buildEvent("tengu_init", session, model, betas, nil, ts(80+rand.Intn(120))),
buildEvent("tengu_ripgrep_availability", session, model, betas, map[string]any{
"ripgrep_available": true,
"ripgrep_version": hostID.RipgrepVersion,
"ripgrep_path": hostID.RipgrepPath,
}, ts(200+rand.Intn(150))),
}
// MCP connection events
mcpOffset := 400
mcpSuccessCount := hostID.McpServerCount - hostID.McpFailCount
for i := 0; i < hostID.McpFailCount; i++ {
mcpOffset += 100 + rand.Intn(300)
batch1 = append(batch1, buildEvent("tengu_mcp_server_connection_failed", session, model, betas, nil, ts(mcpOffset)))
}
for i := 0; i < mcpSuccessCount; i++ {
mcpOffset += 200 + rand.Intn(500)
batch1 = append(batch1, buildEvent("tengu_mcp_server_connection_succeeded", session, model, betas, nil, ts(mcpOffset)))
}
session.RipgrepReported = true
go sendTelemetryEvents(batch1, session)
go sendDatadogLog("tengu_started", session, model)
go sendDatadogLog("tengu_init", session, model)
// Delayed batch (~25-35s later, matches real CLI timing)
go func() {
time.Sleep(time.Duration(25000+rand.Intn(10000)) * time.Millisecond)
batch2 := []eventWrapper{
buildEvent("tengu_session_init", session, model, betas, nil, ""),
buildEvent("tengu_context_loaded", session, model, betas, nil, ""),
}
sendTelemetryEvents(batch2, session)
}()
}
// Every request: request_started
go sendTelemetryEvents([]eventWrapper{
buildEvent("tengu_api_request_started", session, model, betas, nil, ""),
}, session)
}
// EmitPostRequest fires post-request telemetry events after upstream response.
func EmitPostRequest(accountSeed, authHeader, model, betaHeader string, statusCode int) {
authSuffix := authHeader
if len(authSuffix) > 16 {
authSuffix = authSuffix[len(authSuffix)-16:]
}
deviceID := generateDeviceID(accountSeed + ":" + authSuffix)
session := getOrCreateSession(deviceID)
if model == "" {
model = "claude-sonnet-4-6"
}
betas := betaHeader
if betas == "" {
betas = claude.DefaultBetaHeader
}
events := []eventWrapper{
buildEvent("tengu_api_request_completed", session, model, betas, nil, ""),
buildEvent("tengu_conversation_turn_completed", session, model, betas, nil, ""),
}
go sendTelemetryEvents(events, session)
go sendDatadogLog("tengu_api_request_completed", session, model)
// Error telemetry
if statusCode >= 400 && rand.Float64() < 0.5 {
var errMsg string
switch {
case statusCode == 429:
errMsg = "rate_limit_exceeded"
case statusCode == 529:
errMsg = "overloaded"
case statusCode >= 500:
errMsg = "server_error"
default:
errMsg = "client_error"
}
errEvent := buildEvent("tengu_api_request_error", session, model, betas, map[string]any{
"error_type": "TelemetrySafeError",
"error_code": statusCode,
"error_message": errMsg,
}, "")
go sendTelemetryEvents([]eventWrapper{errEvent}, session)
}
// Random tool_use event (30% probability, 2-7s delay)
if rand.Float64() < 0.3 {
go func() {
time.Sleep(time.Duration(2000+rand.Intn(5000)) * time.Millisecond)
sendTelemetryEvents([]eventWrapper{
buildEvent("tengu_tool_use_completed", session, model, betas, nil, ""),
}, session)
}()
}
}
// Jitter returns a random delay to inject before forwarding a request.
// 80% fast (80-300ms exponential), 20% slow (400-1200ms uniform).
func Jitter() time.Duration {
if rand.Float64() < 0.80 {
ms := 80.0 + (-math.Log(rand.Float64()) * 90.0)
return time.Duration(ms) * time.Millisecond
}
ms := 400.0 + rand.Float64()*800.0
return time.Duration(ms) * time.Millisecond
}

View File

@ -149,13 +149,11 @@ func NewHTTPUpstream(cfg *config.Config) service.HTTPUpstream {
// - 调用方必须关闭 resp.Body否则会导致 inFlight 计数泄漏
// - inFlight > 0 的客户端不会被淘汰,确保活跃请求不被中断
func (s *httpUpstreamService) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) {
// Node.js TLS 代理:仅 Anthropic API
// Antigravity (googleapis) 使用 Go 原生 TLS更接近真实 BoringCrypto 指纹)
// proxyURL 通过 X-Upstream-Proxy header 传递给 node-tls-proxy 动态选择出口
if s.isNodeTLSProxyEnabled() && req != nil && req.URL != nil && req.URL.Scheme == "https" {
host := req.URL.Hostname()
if host == "api.anthropic.com" {
return s.doViaNodeTLSProxy(req, proxyURL, accountID, accountConcurrency)
// TLS 指纹路由:对匹配主机使用 Go 原生 utls 指纹
// 使用 utls 模拟 Claude CLI 的 JA3/JA4 指纹,支持直连和代理
if s.isTLSFingerprintRoutingEnabled() && req != nil && req.URL != nil && req.URL.Scheme == "https" {
if s.shouldRouteWithTLSFingerprint(req) {
return s.doWithTLSFingerprint(req, proxyURL, accountID, accountConcurrency)
}
}

View File

@ -1,34 +1,40 @@
package repository
// ==============================================================
// antigravity — Node.js TLS 代理扩展
// antigravity — Go 原生 TLS 指纹扩展
//
// 此文件包含 Antigravity fork 新增的 Node.js TLS 代理功能,
// 此文件包含 Antigravity fork 新增的 TLS 指纹代理功能,
// 与 upstream 代码完全隔离,便于 upstream 更新时的合并维护。
//
// 上游文件 http_upstream.go 中的钩子调用点:
// Do() — 直接路由到 doViaNodeTLSProxy
// Do() — 匹配主机时路由到 doWithTLSFingerprint
// DoWithTLS() — profile==nil 时回退到 Do(),触发同样的路由
//
// 替代原先的 Node.js TLS 代理node-tls-proxy
// 直接使用 Go utls 库模拟 Claude CLI 的 TLS 指纹。
// ==============================================================
import (
"fmt"
"log/slog"
"net/http"
"github.com/Wei-Shaw/sub2api/internal/pkg/tlsfingerprint"
"github.com/Wei-Shaw/sub2api/internal/util/logredact"
)
// isNodeTLSProxyEnabled 检查 Node.js TLS 代理是否启用
func (s *httpUpstreamService) isNodeTLSProxyEnabled() bool {
// isTLSFingerprintRoutingEnabled 检查 TLS 指纹路由是否启用
// 复用 NodeTLSProxy.Enabled 配置项,保持配置兼容
func (s *httpUpstreamService) isTLSFingerprintRoutingEnabled() bool {
if s.cfg == nil {
return false
}
return s.cfg.Gateway.NodeTLSProxy.Enabled
}
// shouldRouteViaNodeProxy 判断请求是否应该走 Node.js TLS 代理
// shouldRouteWithTLSFingerprint 判断请求是否应该使用 TLS 指纹
// 仅拦截目标主机在 proxy_hosts 白名单中的 HTTPS 请求,
// 白名单为空时默认只代理 api.anthropic.com。
func (s *httpUpstreamService) shouldRouteViaNodeProxy(req *http.Request) bool {
func (s *httpUpstreamService) shouldRouteWithTLSFingerprint(req *http.Request) bool {
if req == nil || req.URL == nil || req.URL.Scheme != "https" {
return false
}
@ -39,7 +45,6 @@ func (s *httpUpstreamService) shouldRouteViaNodeProxy(req *http.Request) bool {
hosts := s.cfg.Gateway.NodeTLSProxy.ProxyHosts
if len(hosts) == 0 {
// 默认只代理 Anthropic
return reqHost == "api.anthropic.com"
}
for _, h := range hosts {
@ -50,51 +55,31 @@ func (s *httpUpstreamService) shouldRouteViaNodeProxy(req *http.Request) bool {
return false
}
// doViaNodeTLSProxy 通过 Node.js TLS 代理发送请求
// 将 HTTPS 请求改为 HTTP 明文发送到本地 Node.js 代理,
// 由 Node.js 进程使用原生 TLS 栈完成到上游的 HTTPS 连接。
// 原始目标主机通过 X-Forwarded-Host 传递给 Node.js 代理,
// 代理据此动态连接到正确的上游主机。
func (s *httpUpstreamService) doViaNodeTLSProxy(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) {
proxyCfg := s.cfg.Gateway.NodeTLSProxy
listenHost := proxyCfg.ListenHost
if listenHost == "" {
listenHost = "127.0.0.1"
}
listenPort := proxyCfg.ListenPort
if listenPort == 0 {
listenPort = 3456
// defaultTLSProfile 返回模拟 Claude CLI (Node.js 24.x) 的默认 TLS 指纹配置
// 所有 slice 字段留空 → dialer.go 自动使用内置的 Node.js 24.x 默认值
// ALPN 仅声明 http/1.1,与真实 CLI 行为一致undici allowH2=false
func defaultTLSProfile() *tlsfingerprint.Profile {
return &tlsfingerprint.Profile{
Name: "claude_cli_builtin",
EnableGREASE: true,
}
}
// 克隆请求,避免修改原始 req重试时需要原始 URL
proxyReq := req.Clone(req.Context())
// 安全复制 Body优先用 GetBody 工厂方法
if req.GetBody != nil {
proxyReq.Body, _ = req.GetBody()
} else {
proxyReq.Body = req.Body
}
// 保存原始目标主机,通过自定义头传给 Node.js 代理
originalHost := req.URL.Host
proxyReq.Header.Set("X-Forwarded-Host", originalHost)
// 如果账号绑定了代理(落地机 GOST通过 header 传递给 node-tls-proxy
// node-tls-proxy 会用此代理作为上游出口,实现动态路由
// doWithTLSFingerprint 使用 Go 原生 utls TLS 指纹发送请求
// 直接通过 DoWithTLS 路径,利用已有的 utls dialer 基础设施:
// - 直连Dialer (TCP → utls handshake)
// - HTTP 代理HTTPProxyDialer (CONNECT 隧道 → utls handshake)
// - SOCKS5 代理SOCKS5ProxyDialer (SOCKS5 隧道 → utls handshake)
func (s *httpUpstreamService) doWithTLSFingerprint(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) {
proxyInfo := "direct"
if proxyURL != "" {
proxyReq.Header.Set("X-Upstream-Proxy", proxyURL)
proxyInfo = logredact.RedactProxyURL(proxyURL)
}
// 重写请求 URLhttps://api.anthropic.com/v1/... → http://127.0.0.1:3456/v1/...
proxyReq.URL.Scheme = "http"
proxyReq.URL.Host = fmt.Sprintf("%s:%d", listenHost, listenPort)
slog.Debug("node_tls_proxy_rewrite",
slog.Debug("tls_fingerprint_routing",
"account_id", accountID,
"original_host", originalHost,
"rewritten_to", proxyReq.URL.Host,
"target", req.URL.Host,
"proxy", proxyInfo,
)
// 通过标准 HTTP 客户端发送(不需要 TLS代理是本地 HTTP
return s.Do(proxyReq, "", accountID, accountConcurrency)
return s.DoWithTLS(req, proxyURL, accountID, accountConcurrency, defaultTLSProfile())
}

View File

@ -26,6 +26,7 @@ import (
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/pkg/telemetry"
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
"github.com/Wei-Shaw/sub2api/internal/util/urlvalidator"
@ -4227,6 +4228,13 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
// 真实 CLI 在首次 messages 请求前 fire-and-forget 调用此端点。
if tokenType == "oauth" && token != "" {
TriggerBootstrapIfNeeded(account.ID, token)
// OTEL telemetry: emit pre-request events (tengu_started, tengu_api_request_started etc.)
go telemetry.EmitPreRequest(
fmt.Sprintf("%d", account.ID),
token,
reqModel,
getHeaderRaw(c.Request.Header, "anthropic-beta"),
)
}
// 获取代理URL自定义 base URL 模式下proxy 通过 buildCustomRelayURL 作为查询参数传递)
@ -4644,6 +4652,17 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
// 处理正常响应
// OTEL telemetry: emit post-request events (fire-and-forget)
if tokenType == "oauth" && token != "" {
go telemetry.EmitPostRequest(
fmt.Sprintf("%d", account.ID),
token,
reqModel,
getHeaderRaw(c.Request.Header, "anthropic-beta"),
resp.StatusCode,
)
}
// 触发上游接受回调(提前释放串行锁,不等流完成)
if parsed.OnUpstreamAccepted != nil {
parsed.OnUpstreamAccepted()