win ab1d263cf4
Some checks failed
CI / test (push) Failing after 6m33s
CI / golangci-lint (push) Failing after 5s
Security Scan / backend-security (push) Failing after 7s
Security Scan / frontend-security (push) Failing after 6s
feat: 遥测优化 — 丰富事件 + 双批次 + model 解析 + beta 常量
2026-03-22 13:16:40 +08:00

531 lines
20 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'use strict';
const http = require('http');
const https = require('https');
const http2 = require('http2');
const net = require('net');
const crypto = require('crypto');
// ─── Configuration ───────────────────────────────────────
// Every setting is read from an environment variable; the literal after `||` is the fallback.
// Host that requests are forwarded to when no x-forwarded-host header is present.
const UPSTREAM_HOST = process.env.UPSTREAM_HOST || 'api.anthropic.com';
// Local port and address the HTTP server binds to.
const LISTEN_PORT = parseInt(process.env.PROXY_PORT || '3456', 10);
const LISTEN_HOST = process.env.PROXY_HOST || '127.0.0.1';
// Optional HTTP proxy URL; when set, the H1 path tunnels through it with CONNECT.
const UPSTREAM_PROXY = process.env.UPSTREAM_PROXY || '';
// Timeouts in milliseconds.
const CONNECT_TIMEOUT = parseInt(process.env.CONNECT_TIMEOUT || '30000', 10);
const IDLE_TIMEOUT = parseInt(process.env.IDLE_TIMEOUT || '600000', 10);
const TELEMETRY_ENABLED = process.env.TELEMETRY_ENABLED !== 'false'; // enabled by default
// NOTE(review): a credential-like default is hard-coded here; prefer supplying it only via the environment.
const DD_API_KEY = process.env.DD_API_KEY || 'pubbbf48e6d78dae54bceaa4acf463299bf';
// Version and build-time strings that are embedded in the generated telemetry payloads.
const CLI_VERSION = process.env.CLI_VERSION || '2.1.81';
const BUILD_TIME = process.env.BUILD_TIME || '2026-03-20T21:26:18Z';
/**
 * Write one structured log record to stderr as a single JSON line.
 *
 * @param {string} level - Severity label, e.g. 'info' or 'error'.
 * @param {string} msg - Short event name or message.
 * @param {object} [extra] - Additional fields merged into the record; keys in
 *   `extra` override the built-in `time`, `level` and `msg` fields.
 */
const log = (level, msg, extra = {}) => {
  const record = Object.assign({ time: new Date().toISOString(), level, msg }, extra);
  const line = `${JSON.stringify(record)}\n`;
  process.stderr.write(line);
};
// URL that the HTTP server answers locally with a JSON status document instead of proxying.
const HEALTH_PATH = '/__health';
// Hosts that sendViaH1 has switched to HTTP/2 after an early 'socket hang up'.
const h2Hosts = new Set();
// Cached HTTP/2 client sessions, keyed by upstream host.
const h2Sessions = new Map();
// ─── Telemetry simulation ────────────────────────────────
// Session state for each device_id.
const sessionStates = new Map();
// Return the cached state object for `deviceId`, creating it on first use.
// A new state gets a random UUID as sessionId, the current time as startTime
// and a requestCount of 0; it is stored in sessionStates before being returned.
function getOrCreateSession(deviceId) {
if (sessionStates.has(deviceId)) return sessionStates.get(deviceId);
const state = {
sessionId: crypto.randomUUID(),
deviceId,
startTime: Date.now(),
requestCount: 0,
};
sessionStates.set(deviceId, state);
return state;
}
// Derive a device id from `accountSeed`: the hex SHA-256 digest of the string `device:<accountSeed>`.
// The same seed always yields the same id.
function generateDeviceId(accountSeed) {
return crypto.createHash('sha256').update(`device:${accountSeed}`).digest('hex');
}
// Build the `env` object that buildEvent embeds in each event.
// Every field is a fixed literal except node_version (this process's Node version)
// and the CLI_VERSION / BUILD_TIME configuration values; nothing is probed from the host.
function buildEnvBlock() {
return {
platform: 'linux',
node_version: process.version,
terminal: 'xterm-256color',
package_managers: 'npm',
runtimes: 'node',
is_running_with_bun: false,
is_ci: false,
is_claubbit: false,
is_github_action: false,
is_claude_code_action: false,
is_claude_ai_auth: false,
version: CLI_VERSION,
arch: 'x64',
is_claude_code_remote: false,
deployment_environment: 'unknown-linux',
is_conductor: false,
version_base: CLI_VERSION,
build_time: BUILD_TIME,
is_local_agent_mode: false,
vcs: 'git',
platform_raw: 'linux',
};
}
// Return a base64-encoded JSON string of process metrics for an event.
// Only `uptime` comes from the caller; every other value is a constant or a
// Math.random()-based number, not a measurement of this process.
function buildProcessMetrics(uptime) {
const rss = 200_000_000 + Math.floor(Math.random() * 100_000_000);
return Buffer.from(JSON.stringify({
uptime,
rss,
heapTotal: 30_000_000 + Math.floor(Math.random() * 5_000_000),
heapUsed: 40_000_000 + Math.floor(Math.random() * 20_000_000),
external: 14_000_000 + Math.floor(Math.random() * 2_000_000),
arrayBuffers: Math.floor(Math.random() * 10_000),
constrainedMemory: 0,
cpuUsage: { user: 100_000 + Math.floor(Math.random() * 300_000), system: 20_000 + Math.floor(Math.random() * 80_000) },
cpuPercent: Math.random() * 200,
})).toString('base64');
}
// Build one event object named `eventName` for the given session.
// `model` and `betas` fall back to fixed default strings when falsy.
// Each call produces a fresh random event_id and the current time as client_timestamp;
// uptime is the number of seconds since session.startTime.
function buildEvent(eventName, session, model, betas) {
const uptime = (Date.now() - session.startTime) / 1000;
return {
event_type: 'ClaudeCodeInternalEvent',
event_data: {
event_name: eventName,
client_timestamp: new Date().toISOString(),
model: model || 'claude-sonnet-4-6',
session_id: session.sessionId,
user_type: 'external',
betas: betas || 'claude-code-20250219,interleaved-thinking-2025-05-14',
env: buildEnvBlock(),
entrypoint: 'cli',
is_interactive: true,
client_type: 'cli',
process: buildProcessMetrics(uptime),
event_id: crypto.randomUUID(),
device_id: session.deviceId,
},
};
}
// Send telemetry to api.anthropic.com/api/event_logging/batch
// POSTs `{ events }` as JSON. Does nothing when telemetry is disabled or `events` is empty.
// Fire-and-forget: the response body is discarded, and the outcome or any error is only
// logged at debug level. The request is destroyed if the 10 s socket timeout fires.
// Note that the destination host is fixed; it does not follow UPSTREAM_HOST or UPSTREAM_PROXY.
function sendTelemetryEvents(events) {
if (!TELEMETRY_ENABLED || events.length === 0) return;
const body = JSON.stringify({ events });
const opts = {
hostname: 'api.anthropic.com',
port: 443,
path: '/api/event_logging/batch',
method: 'POST',
headers: {
'Accept': 'application/json, text/plain, */*',
'Content-Type': 'application/json',
'User-Agent': `claude-code/${CLI_VERSION}`,
'x-service-name': 'claude-code',
'Content-Length': Buffer.byteLength(body),
},
timeout: 10000,
};
const req = https.request(opts, (res) => {
res.resume(); // drain
log('debug', 'telemetry_sent', { status: res.statusCode, events: events.length });
});
req.on('error', (err) => {
log('debug', 'telemetry_error', { error: err.message });
});
req.on('timeout', () => req.destroy());
req.end(body);
}
// Send a DataDog log entry
// POSTs a one-element JSON array to the DataDog logs intake, authenticated with DD_API_KEY.
// Does nothing when telemetry is disabled. `model` falls back to a fixed default when falsy.
// The process_metrics values are constants or Math.random()-based numbers, apart from
// `uptime`, which is the number of seconds since session.startTime.
// Fire-and-forget: the response is drained and request errors are ignored without logging.
function sendDatadogLog(eventName, session, model) {
if (!TELEMETRY_ENABLED) return;
const uptime = (Date.now() - session.startTime) / 1000;
const entry = {
ddsource: 'nodejs',
ddtags: `event:${eventName},arch:x64,client_type:cli,model:${model || 'claude-sonnet-4-6'},platform:linux,user_type:external,version:${CLI_VERSION},version_base:${CLI_VERSION}`,
message: eventName,
service: 'claude-code',
hostname: 'claude-code',
env: 'external',
model: model || 'claude-sonnet-4-6',
session_id: session.sessionId,
user_type: 'external',
entrypoint: 'cli',
is_interactive: 'true',
client_type: 'cli',
process_metrics: {
uptime,
rss: 200_000_000 + Math.floor(Math.random() * 100_000_000),
heapTotal: 30_000_000 + Math.floor(Math.random() * 5_000_000),
heapUsed: 40_000_000 + Math.floor(Math.random() * 20_000_000),
external: 14_000_000 + Math.floor(Math.random() * 2_000_000),
arrayBuffers: Math.floor(Math.random() * 10_000),
constrainedMemory: 0,
cpuUsage: { user: 100_000 + Math.floor(Math.random() * 300_000), system: 20_000 + Math.floor(Math.random() * 80_000) },
},
platform: 'linux',
platform_raw: 'linux',
arch: 'x64',
node_version: process.version,
version: CLI_VERSION,
version_base: CLI_VERSION,
build_time: BUILD_TIME,
deployment_environment: 'unknown-linux',
vcs: 'git',
};
const body = JSON.stringify([entry]);
const opts = {
hostname: 'http-intake.logs.us5.datadoghq.com',
port: 443,
path: '/api/v2/logs',
method: 'POST',
headers: {
'Accept': 'application/json, text/plain, */*',
'Content-Type': 'application/json',
'User-Agent': 'axios/1.13.6',
'dd-api-key': DD_API_KEY,
'Content-Length': Buffer.byteLength(body),
},
timeout: 10000,
};
const req = https.request(opts, (res) => { res.resume(); });
req.on('error', () => {});
req.on('timeout', () => req.destroy());
req.end(body);
}
// Send telemetry before the request (simulates CLI startup + initialization events)
// The events are synthesized by this proxy from fixed event names; they are emitted
// unconditionally and do not report anything that actually happened in this process.
// The session is keyed by a hash of the x-forwarded-host header (or 'default') plus the
// last 16 characters of the authorization header.
function emitPreRequestTelemetry(reqHeaders, body) {
const accountSeed = reqHeaders['x-forwarded-host'] || 'default';
const deviceId = generateDeviceId(accountSeed + ':' + (reqHeaders['authorization'] || '').slice(-16));
const session = getOrCreateSession(deviceId);
session.requestCount++;
// Parse the actual model from the request body; keep the default if the body is not JSON
let model = 'claude-sonnet-4-6';
try {
const parsed = JSON.parse(body.toString());
if (parsed.model) model = parsed.model;
} catch (_) {}
const betas = reqHeaders['anthropic-beta'] || 'claude-code-20250219,context-1m-2025-08-07,interleaved-thinking-2025-05-14,redact-thinking-2026-02-12,context-management-2025-06-27,prompt-caching-scope-2026-01-05,effort-2025-11-24';
// First request of a session: send the full startup event sequence
// (per the original author's note, modelled on the 4-6 events seen in a CLI packet capture)
if (session.requestCount === 1) {
// First batch: startup plus MCP connection events (original note: the real CLI has several MCP servers)
const batch1 = [
buildEvent('tengu_started', session, model, betas),
buildEvent('tengu_init', session, model, betas),
buildEvent('tengu_mcp_server_connection_failed', session, model, betas),
buildEvent('tengu_mcp_server_connection_failed', session, model, betas),
buildEvent('tengu_mcp_server_connection_succeeded', session, model, betas),
buildEvent('tengu_mcp_server_connection_succeeded', session, model, betas),
];
sendTelemetryEvents(batch1);
sendDatadogLog('tengu_started', session, model);
sendDatadogLog('tengu_init', session, model);
// Second batch is sent after a random 25-35 s delay (original note: the real CLI waits about 30 seconds)
setTimeout(() => {
const batch2 = [
buildEvent('tengu_session_init', session, model, betas),
buildEvent('tengu_context_loaded', session, model, betas),
];
sendTelemetryEvents(batch2);
}, 25000 + Math.floor(Math.random() * 10000));
}
// Every request: send request_started
const events = [
buildEvent('tengu_api_request_started', session, model, betas),
];
sendTelemetryEvents(events);
}
// Send telemetry after the request
// Looks up (or creates) the session with the same key derivation as emitPreRequestTelemetry,
// then sends synthesized completion events. `statusCode` is accepted but not used in the body.
// If `body` is missing or is not JSON, the parse fails silently and the default model is kept.
function emitPostRequestTelemetry(reqHeaders, statusCode, body) {
const accountSeed = reqHeaders['x-forwarded-host'] || 'default';
const deviceId = generateDeviceId(accountSeed + ':' + (reqHeaders['authorization'] || '').slice(-16));
const session = getOrCreateSession(deviceId);
let model = 'claude-sonnet-4-6';
try {
const parsed = JSON.parse(body.toString());
if (parsed.model) model = parsed.model;
} catch (_) {}
const betas = reqHeaders['anthropic-beta'] || 'claude-code-20250219,context-1m-2025-08-07,interleaved-thinking-2025-05-14,redact-thinking-2026-02-12,context-management-2025-06-27,prompt-caching-scope-2026-01-05,effort-2025-11-24';
// Request-completed events
const events = [
buildEvent('tengu_api_request_completed', session, model, betas),
buildEvent('tengu_conversation_turn_completed', session, model, betas),
];
sendTelemetryEvents(events);
sendDatadogLog('tengu_api_request_completed', session, model);
// With probability 0.3, send one extra event after a random 2-7 s delay
// (original note: simulates user behaviour such as opening files or viewing search results)
if (Math.random() < 0.3) {
setTimeout(() => {
const extra = [
buildEvent('tengu_tool_use_completed', session, model, betas),
];
sendTelemetryEvents(extra);
}, 2000 + Math.floor(Math.random() * 5000));
}
}
// ─── H2 session management ───────────────────────────────
/**
 * Return a usable HTTP/2 client session for `host`, creating and caching one if needed.
 *
 * A cached session is reused while it is neither closed nor destroyed. Otherwise the
 * stale entry is dropped and a new session to `https://<host>` is opened.
 *
 * The session's lifecycle handlers only evict the cache entry while it still points at
 * that same session. Without this check, a stale session whose 'close' event arrives
 * after its replacement was cached would remove the replacement from the map.
 *
 * @param {string} host - Upstream host name, used as the cache key.
 * @returns {import('http2').ClientHttp2Session} The cached or newly created session.
 */
function getOrCreateH2Session(host) {
  const existing = h2Sessions.get(host);
  if (existing && !existing.closed && !existing.destroyed) return existing;
  if (existing) {
    h2Sessions.delete(host);
    try { existing.close(); } catch (_) {}
  }

  const session = http2.connect(`https://${host}`);
  const evict = () => {
    if (h2Sessions.get(host) === session) h2Sessions.delete(host);
  };
  const closeQuietly = () => {
    try { session.close(); } catch (_) {}
  };

  session.on('error', (err) => {
    log('warn', 'h2_session_error', { host, error: err.message });
    evict();
    closeQuietly();
  });
  session.on('close', evict);
  session.on('goaway', () => {
    evict();
    closeQuietly();
  });
  // Close the session after IDLE_TIMEOUT ms without activity.
  session.setTimeout(IDLE_TIMEOUT, () => {
    closeQuietly();
    evict();
  });

  h2Sessions.set(host, session);
  return session;
}
/**
 * Wait until an HTTP/2 session is ready to carry requests.
 *
 * Readiness is detected through `session.connecting === false`, which is the property
 * Http2Session documents. The previous check relied only on `session.connected`, so a
 * reused, already-connected session waited for a 'connect' event that never fires again
 * and failed with a timeout. The `connected` check is kept for compatibility.
 *
 * The timer and all listeners are removed as soon as the promise settles.
 *
 * @param {import('http2').ClientHttp2Session} session - Session to wait on.
 * @returns {Promise<void>} Resolves once connected. Rejects if the session is already
 *   closed or destroyed, on a session error, on close before connect, or after
 *   CONNECT_TIMEOUT ms.
 */
function waitForConnect(session) {
  if (session.destroyed || session.closed) {
    return Promise.reject(new Error('h2 session closed'));
  }
  if (session.connected || session.connecting === false) return Promise.resolve();

  return new Promise((resolve, reject) => {
    let timer = null;
    const cleanup = () => {
      clearTimeout(timer);
      session.removeListener('connect', onConnect);
      session.removeListener('error', onError);
      session.removeListener('close', onClose);
    };
    function onConnect() {
      cleanup();
      resolve();
    }
    function onError(err) {
      cleanup();
      reject(err);
    }
    function onClose() {
      cleanup();
      reject(new Error('h2 session closed before connect'));
    }
    timer = setTimeout(() => {
      cleanup();
      reject(new Error('h2 connect timeout'));
    }, CONNECT_TIMEOUT);
    session.once('connect', onConnect);
    session.once('error', onError);
    session.once('close', onClose);
  });
}
// ─── CONNECT tunnel ──────────────────────────────────────
/**
 * Open a TCP tunnel to `targetHost:targetPort` through an HTTP proxy using CONNECT.
 *
 * Changes from the previous version:
 *  - the promise is rejected when the proxy closes the connection before sending a
 *    complete response header (previously it never settled and the request hung);
 *  - the response header buffer is capped, so a misbehaving proxy cannot grow it forever;
 *  - brackets around an IPv6 proxy host are stripped before connecting;
 *  - the header is decoded as latin1 so multi-byte sequences split across chunks
 *    cannot corrupt the search for the header terminator.
 *
 * @param {string} proxyUrl - Proxy URL. Credentials in the URL are sent as Basic
 *   Proxy-Authorization. The port defaults to 80 when absent.
 * @param {string} targetHost - Host to tunnel to.
 * @param {number} targetPort - Port to tunnel to.
 * @returns {Promise<import('net').Socket>} Resolves with the connected socket after a
 *   200 response. Rejects on socket error, timeout, early close, an oversized header,
 *   or any non-200 status (`CONNECT <code>`).
 */
function connectViaProxy(proxyUrl, targetHost, targetPort) {
  const MAX_HEADER_CHARS = 64 * 1024;
  return new Promise((resolve, reject) => {
    const proxy = new URL(proxyUrl);
    let settled = false;
    const fail = (err) => {
      if (settled) return;
      settled = true;
      reject(err);
    };

    // URL.hostname keeps the brackets of an IPv6 literal; net.connect expects none.
    const proxyHost = proxy.hostname.replace(/^\[|\]$/g, '');
    const conn = net.connect(parseInt(proxy.port || '80', 10), proxyHost, () => {
      const auth = proxy.username
        ? `Proxy-Authorization: Basic ${Buffer.from(`${decodeURIComponent(proxy.username)}:${decodeURIComponent(proxy.password || '')}`).toString('base64')}\r\n`
        : '';
      conn.write(`CONNECT ${targetHost}:${targetPort} HTTP/1.1\r\nHost: ${targetHost}:${targetPort}\r\n${auth}\r\n`);
    });
    conn.once('error', fail);
    // After a successful resolve this is a no-op because `settled` is already true.
    conn.once('close', () => fail(new Error('CONNECT tunnel closed before response')));
    conn.setTimeout(CONNECT_TIMEOUT, () => conn.destroy(new Error('CONNECT timeout')));

    let buf = '';
    conn.on('data', function onData(chunk) {
      buf += chunk.toString('latin1');
      const idx = buf.indexOf('\r\n\r\n');
      if (idx === -1) {
        if (buf.length > MAX_HEADER_CHARS) {
          conn.removeListener('data', onData);
          fail(new Error('CONNECT response header too large'));
          conn.destroy();
        }
        return;
      }
      conn.removeListener('data', onData);
      const code = parseInt(buf.split(' ')[1], 10);
      if (code === 200) {
        settled = true;
        conn.setTimeout(0); // the tunnel is up; idle handling is left to the caller
        resolve(conn);
      } else {
        fail(new Error(`CONNECT ${code}`));
        conn.destroy();
      }
    });
  });
}
// ─── Request body collection ─────────────────────────────
/**
 * Buffer the complete body of an incoming request.
 *
 * The previous version resolved with whatever had arrived when the request errored, so
 * a truncated body could be forwarded upstream as if it were complete. It also never
 * settled if the stream closed without 'end'. Both cases now reject.
 *
 * @param {import('stream').Readable} req - Incoming request (or any readable emitter).
 * @returns {Promise<Buffer>} Resolves with the concatenated body on 'end'. Rejects with
 *   the stream error, or with a generic error when the stream closes before 'end'.
 */
function collectBody(req) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    let settled = false;
    const settle = (fn, value) => {
      if (settled) return;
      settled = true;
      fn(value);
    };
    req.on('data', (chunk) => chunks.push(chunk));
    req.on('end', () => settle(resolve, Buffer.concat(chunks)));
    req.on('error', (err) => settle(reject, err));
    // 'close' after 'end' is a no-op because the promise has already settled.
    req.on('close', () => settle(reject, new Error('request closed before body was fully received')));
  });
}
// ─── H1 proxy ────────────────────────────────────────────
/**
 * Forward one request to `targetHost` over HTTPS (HTTP/1.1) and stream the response to `res`.
 *
 * If the upstream drops the connection with 'socket hang up' within 2 s of the start,
 * the host is recorded in h2Hosts and the request is retried through sendViaH2.
 * When UPSTREAM_PROXY is set, the connection is tunnelled through it with CONNECT.
 *
 * Fix: an upstream response that is aborted mid-stream now destroys the client response.
 * `pipe` does not end or fail the destination when the source fails, and the server has
 * no socket timeout, so the client connection previously stayed open indefinitely.
 *
 * @param {string} targetHost - Upstream host; also used as the Host header and TLS servername.
 * @param {string} method - HTTP method.
 * @param {string} path - Request path including the query string.
 * @param {object} reqHeaders - Incoming request headers; hop-by-hop headers are removed from a copy.
 * @param {Buffer} body - Fully buffered request body.
 * @param {import('http').ServerResponse} res - Client response to write to.
 * @param {object} [savedHeaders] - Original headers handed to emitPostRequestTelemetry for
 *   paths containing '/v1/messages'.
 * @returns {Promise<string>} Resolves (never rejects) with 'ok' once response headers were
 *   relayed, 'h2' after a fallback through sendViaH2, or 'error' after a 502 was written.
 */
function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders) {
  return new Promise((resolve) => {
    const headers = { ...reqHeaders, host: targetHost };
    ['x-forwarded-host', 'connection', 'keep-alive', 'proxy-connection', 'transfer-encoding'].forEach((h) => delete headers[h]);
    if (body.length > 0) headers['content-length'] = String(body.length);
    const opts = { hostname: targetHost, port: 443, path, method, headers, servername: targetHost, timeout: CONNECT_TIMEOUT };
    const startTime = Date.now();

    const finish = (requestOpts) => {
      const proxyReq = https.request(requestOpts);
      proxyReq.on('response', (proxyRes) => {
        log('info', 'proxy_response', { host: targetHost, status: proxyRes.statusCode, path, proto: 'h1' });
        const rh = { ...proxyRes.headers };
        delete rh['connection'];
        delete rh['keep-alive'];
        res.writeHead(proxyRes.statusCode, rh);
        proxyRes.pipe(res, { end: true });

        // Tear down the client response if the upstream body ends prematurely.
        const abortDownstream = () => {
          if (!proxyRes.complete && !res.writableEnded) res.destroy();
        };
        proxyRes.on('error', (err) => {
          log('warn', 'h1_upstream_aborted', { error: err.message, host: targetHost, path });
          abortDownstream();
        });
        proxyRes.on('close', abortDownstream);

        // Emit telemetry once the response has started.
        if (path.includes('/v1/messages') && savedHeaders) {
          emitPostRequestTelemetry(savedHeaders, proxyRes.statusCode, body);
        }
        resolve('ok');
      });
      proxyReq.on('error', (err) => {
        // An immediate hang-up is treated as "this host refuses HTTP/1.1": retry over H2.
        if (err.message === 'socket hang up' && (Date.now() - startTime) < 2000) {
          log('info', 'h1_rejected_switching_to_h2', { host: targetHost });
          h2Hosts.add(targetHost);
          sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders).then(() => resolve('h2'));
          return;
        }
        log('error', 'h1_error', { error: err.message, host: targetHost, path });
        if (!res.headersSent) {
          res.writeHead(502);
          res.end(JSON.stringify({ error: err.message }));
        }
        resolve('error');
      });
      proxyReq.on('timeout', () => proxyReq.destroy(new Error('timeout')));
      proxyReq.end(body);
    };

    if (UPSTREAM_PROXY) {
      connectViaProxy(UPSTREAM_PROXY, targetHost, 443)
        .then((socket) => {
          opts.socket = socket;
          opts.agent = false;
          finish(opts);
        })
        .catch((err) => {
          log('error', 'tunnel_failed', { error: err.message });
          if (!res.headersSent) {
            res.writeHead(502);
            res.end('tunnel error');
          }
          resolve('error');
        });
    } else {
      finish(opts);
    }
  });
}
// ─── H2 proxy ────────────────────────────────────────────
/**
 * Forward one request to `targetHost` over a cached HTTP/2 session and relay the response.
 *
 * Changes from the previous version:
 *  - the upstream stream is piped into `res`, so a slow client applies backpressure
 *    instead of the proxy buffering the whole response through unchecked `res.write`;
 *  - when the client disconnects before the response is finished, the upstream stream is
 *    cancelled (NGHTTP2_CANCEL) rather than left running until it completes or times out.
 *
 * @param {string} targetHost - Upstream host, used as `:authority` and session cache key.
 * @param {string} method - HTTP method.
 * @param {string} path - Request path including the query string.
 * @param {object} reqHeaders - Incoming headers; connection-specific ones are dropped.
 * @param {Buffer} body - Fully buffered request body.
 * @param {import('http').ServerResponse} res - Client response to write to.
 * @param {object} [savedHeaders] - Original headers handed to emitPostRequestTelemetry for
 *   paths containing '/v1/messages'.
 * @returns {Promise<void>} Resolves once the stream has been started, or after a 502 was
 *   written because the session could not be set up. Never rejects.
 */
async function sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders) {
  try {
    const session = getOrCreateH2Session(targetHost);
    await waitForConnect(session);

    const headers = {};
    const skip = new Set(['host', 'connection', 'keep-alive', 'proxy-connection', 'transfer-encoding', 'upgrade', 'x-forwarded-host', 'http2-settings']);
    for (const [k, v] of Object.entries(reqHeaders)) {
      if (!skip.has(k.toLowerCase())) headers[k] = v;
    }
    headers[':method'] = method;
    headers[':path'] = path;
    headers[':authority'] = targetHost;
    headers[':scheme'] = 'https';
    if (body.length > 0) headers['content-length'] = String(body.length);

    const stream = session.request(headers);
    let responded = false;
    let clientGone = false;

    // 'close' also fires after a normal finish; only an unfinished response is an abort.
    res.on('close', () => {
      if (res.writableFinished) return;
      clientGone = true;
      if (!stream.closed && !stream.destroyed) stream.close(http2.constants.NGHTTP2_CANCEL);
    });

    stream.on('response', (h2h) => {
      responded = true;
      if (clientGone) return;
      const status = h2h[':status'] || 502;
      const rh = {};
      for (const [k, v] of Object.entries(h2h)) {
        if (!k.startsWith(':')) rh[k] = v; // pseudo-headers are not valid in HTTP/1.1
      }
      log('info', 'proxy_response', { host: targetHost, status, path, proto: 'h2' });
      res.writeHead(status, rh);
      stream.pipe(res);
      if (path.includes('/v1/messages') && savedHeaders) {
        emitPostRequestTelemetry(savedHeaders, status);
      }
    });
    stream.on('error', (err) => {
      // A protocol-level error makes the whole session suspect: drop it from the cache.
      if (err.message && err.message.includes('NGHTTP2')) {
        h2Sessions.delete(targetHost);
        try { session.close(); } catch (_) {}
      }
      if (clientGone) return;
      if (responded) {
        if (!res.writableEnded) res.end();
        return;
      }
      log('error', 'h2_error', { error: err.message, host: targetHost, path });
      if (!res.headersSent) {
        res.writeHead(502);
        res.end(JSON.stringify({ error: err.message }));
      }
    });
    stream.on('close', () => {
      if (clientGone) return;
      if (!responded && !res.headersSent) {
        log('warn', 'h2_no_response', { host: targetHost, path });
        res.writeHead(502);
        res.end('{"error":"h2_no_response"}');
      } else if (!res.writableEnded) {
        res.end();
      }
    });
    stream.setTimeout(CONNECT_TIMEOUT, () => stream.close());
    stream.end(body);
  } catch (err) {
    log('error', 'h2_exception', { error: err.message, host: targetHost });
    h2Sessions.delete(targetHost);
    if (!res.headersSent) {
      res.writeHead(502);
      res.end(JSON.stringify({ error: err.message }));
    }
  }
}
// ─── Request entry point ─────────────────────────────────
// Handle one incoming request: pick the upstream host (the x-forwarded-host header, else
// UPSTREAM_HOST), buffer the body, then forward over H2 if the host is in h2Hosts and
// over H1 otherwise.
// NOTE(review): the target host comes from a client-supplied header with no allow-list
// visible in this file; confirm that the listener is only reachable by trusted clients.
async function proxyRequest(req, res) {
const targetHost = req.headers['x-forwarded-host'] || UPSTREAM_HOST;
log('info', 'proxy_request', { host: targetHost, method: req.method, path: req.url });
// Keep a copy of the original headers for telemetry
const savedHeaders = { ...req.headers };
const body = await collectBody(req);
// Send telemetry before the request (only for /v1/messages requests)
if (req.url.includes('/v1/messages') && TELEMETRY_ENABLED) {
emitPreRequestTelemetry(savedHeaders, body);
// Random 50-200 ms delay before forwarding (original note: simulates real CLI behaviour)
await new Promise(r => setTimeout(r, 50 + Math.floor(Math.random() * 150)));
}
if (h2Hosts.has(targetHost)) {
await sendViaH2(targetHost, req.method, req.url, req.headers, body, res, savedHeaders);
} else {
await sendViaH1(targetHost, req.method, req.url, req.headers, body, res, savedHeaders);
}
}
// ─── HTTP server ─────────────────────────────────────────
/**
 * Plain-HTTP front end. A request whose URL is exactly HEALTH_PATH is answered locally
 * with a JSON status document; every other request goes to proxyRequest. A rejection
 * from proxyRequest is logged and answered with a 500 if no headers were sent yet.
 */
const server = http.createServer((req, res) => {
  if (req.url !== HEALTH_PATH) {
    proxyRequest(req, res).catch((err) => {
      log('error', 'unhandled', { error: err.message });
      if (res.headersSent) return;
      res.writeHead(500);
      res.end('internal error');
    });
    return;
  }

  res.writeHead(200, { 'content-type': 'application/json' });
  const health = {
    status: 'ok',
    node: process.version,
    openssl: process.versions.openssl,
    uptime: process.uptime(),
    h2Hosts: [...h2Hosts],
    telemetry: TELEMETRY_ENABLED,
    sessions: sessionStates.size,
  };
  res.end(JSON.stringify(health));
});
// No per-socket inactivity timeout: streamed responses may stay quiet for a long time.
server.timeout = 0;
server.keepAliveTimeout = IDLE_TIMEOUT;
server.headersTimeout = 60000;
// Without this handler a failed bind (e.g. EADDRINUSE) surfaces as an uncaught exception,
// which this file only logs, leaving a process that runs but never listens.
server.on('error', (err) => {
  log('error', 'server_error', { error: err.message, code: err.code });
  if (!server.listening) process.exit(1);
});
server.listen(LISTEN_PORT, LISTEN_HOST, () => {
  log('info', 'node-tls-proxy started', {
    listen: `${LISTEN_HOST}:${LISTEN_PORT}`,
    node: process.version,
    openssl: process.versions.openssl,
    telemetry: TELEMETRY_ENABLED,
  });
});
// Periodically remove expired sessions.
// Runs every 5 minutes and deletes any session whose startTime is more than one hour old.
// NOTE(review): the original comment described this as "1 hour without activity", but the
// check uses startTime, which is set once at creation, so it measures age, not inactivity.
setInterval(() => {
const now = Date.now();
for (const [id, state] of sessionStates) {
if (now - state.startTime > 3600_000) sessionStates.delete(id);
}
}, 300_000);
let stopping = false;
/**
 * Shut the proxy down gracefully; later calls are ignored.
 *
 * Closes all cached HTTP/2 sessions, stops accepting connections and exits with code 0
 * once the server has closed. Idle keep-alive connections are closed right away where the
 * runtime supports it. Otherwise they keep `server.close` pending for up to
 * keepAliveTimeout, and every shutdown would end in the forced exit below.
 *
 * @param {string} sig - Name of the signal that triggered the shutdown (logged only).
 */
function shutdown(sig) {
  if (stopping) return;
  stopping = true;
  log('info', 'shutdown', { signal: sig });
  for (const s of h2Sessions.values()) {
    try { s.close(); } catch (_) {}
  }
  h2Sessions.clear();
  server.close(() => process.exit(0));
  // Feature-detected because closeIdleConnections is not present in older Node versions.
  if (typeof server.closeIdleConnections === 'function') server.closeIdleConnections();
  // Hard deadline for connections that are still active.
  setTimeout(() => process.exit(1), 5000).unref();
}
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
// Last-resort hooks: log and keep running rather than crash the proxy.
process.on('uncaughtException', (e) => log('error', 'uncaught', { error: e.message }));
process.on('unhandledRejection', (r) => log('error', 'rejection', { error: String(r) }));