win 75c3c01f46 feat: 遥测模拟 — 模拟 Claude CLI 的 event_logging + DataDog 日志
基于真实 Claude CLI 2.1.81 抓包数据实现:
- POST api.anthropic.com/api/event_logging/batch(请求前后自动发送)
- POST http-intake.logs.us5.datadoghq.com/api/v2/logs
- 事件类型:tengu_started, tengu_init, tengu_api_request_started/completed
- 每个账号独立 session_id + device_id
- process_metrics base64 编码(匹配真实格式)
- 可通过 TELEMETRY_ENABLED=false 关闭
2026-03-25 11:37:27 +08:00

495 lines
19 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

'use strict';
const http = require('http');
const https = require('https');
const http2 = require('http2');
const net = require('net');
const crypto = require('crypto');
// ─── 配置 ───────────────────────────────────────────────
/**
 * Reads an integer setting from the environment.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset, empty,
 *   or does not start with a parseable integer.
 * @returns {number} The parsed integer, or `fallback`. Never NaN, so the result
 *   is safe to hand to `listen()` / `setTimeout()`.
 */
function envInt(name, fallback) {
  const parsed = Number.parseInt(process.env[name] || '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

// Default upstream host used when a request carries no x-forwarded-host header.
const UPSTREAM_HOST = process.env.UPSTREAM_HOST || 'api.anthropic.com';
// Local listen address of the proxy.
const LISTEN_PORT = envInt('PROXY_PORT', 3456);
const LISTEN_HOST = process.env.PROXY_HOST || '127.0.0.1';
// Optional HTTP proxy URL; when set, HTTP/1.1 upstream requests are tunnelled through it via CONNECT.
const UPSTREAM_PROXY = process.env.UPSTREAM_PROXY || '';
// Timeouts in milliseconds.
const CONNECT_TIMEOUT = envInt('CONNECT_TIMEOUT', 30000);
const IDLE_TIMEOUT = envInt('IDLE_TIMEOUT', 600000);
const TELEMETRY_ENABLED = process.env.TELEMETRY_ENABLED !== 'false'; // enabled by default
const DD_API_KEY = process.env.DD_API_KEY || 'pubbbf48e6d78dae54bceaa4acf463299bf';
const CLI_VERSION = process.env.CLI_VERSION || '2.1.81';
const BUILD_TIME = process.env.BUILD_TIME || '2026-03-20T21:26:18Z';
/**
 * Writes one JSON log line to stderr.
 *
 * @param {string} level - Severity label stored under `level`.
 * @param {string} msg - Message stored under `msg`.
 * @param {object} [extra] - Additional fields merged into the entry; keys in
 *   `extra` override `time`, `level` and `msg` because they are spread last.
 */
const log = (level, msg, extra = {}) => {
  const timestamp = new Date().toISOString();
  const line = JSON.stringify({ time: timestamp, level, msg, ...extra });
  process.stderr.write(`${line}\n`);
};
// Request path that the HTTP server answers locally with a JSON status document instead of proxying.
const HEALTH_PATH = '/__health';
// Upstream hosts whose HTTP/1.1 attempt was rejected; requests to these hosts are sent over HTTP/2.
const h2Hosts = new Set();
// Cache of HTTP/2 client sessions, keyed by upstream host.
const h2Sessions = new Map();
// ─── 遥测模拟 ────────────────────────────────────────────
// 每个 device_id 的会话状态
// Per-device session state, keyed by device id.
const sessionStates = new Map();

/**
 * Returns the session state for a device id, creating it on first use.
 *
 * @param {string} deviceId - Key identifying the device.
 * @returns {{sessionId: string, deviceId: string, startTime: number, requestCount: number}}
 *   The cached state object (the same instance on every call for one id).
 */
function getOrCreateSession(deviceId) {
  let state = sessionStates.get(deviceId);
  if (state === undefined) {
    state = {
      sessionId: crypto.randomUUID(),
      deviceId,
      startTime: Date.now(),
      requestCount: 0,
    };
    sessionStates.set(deviceId, state);
  }
  return state;
}
/**
 * Derives a stable device id from a seed string.
 *
 * @param {string} accountSeed - Seed value; the same seed always maps to the same id.
 * @returns {string} Hex-encoded SHA-256 digest of `device:<accountSeed>`.
 */
function generateDeviceId(accountSeed) {
  const hasher = crypto.createHash('sha256');
  hasher.update(`device:${accountSeed}`);
  return hasher.digest('hex');
}
/**
 * Builds the `env` object embedded in every event produced by buildEvent().
 *
 * All values are fixed literals except `node_version` (taken from
 * `process.version`), `version` / `version_base` (CLI_VERSION) and
 * `build_time` (BUILD_TIME).
 *
 * @returns {object} A new object on every call.
 */
function buildEnvBlock() {
return {
platform: 'linux',
node_version: process.version,
terminal: 'xterm-256color',
package_managers: 'npm',
runtimes: 'node',
is_running_with_bun: false,
is_ci: false,
is_claubbit: false,
is_github_action: false,
is_claude_code_action: false,
is_claude_ai_auth: false,
version: CLI_VERSION,
arch: 'x64',
is_claude_code_remote: false,
deployment_environment: 'unknown-linux',
is_conductor: false,
version_base: CLI_VERSION,
build_time: BUILD_TIME,
is_local_agent_mode: false,
vcs: 'git',
platform_raw: 'linux',
};
}
/**
 * Produces a base64-encoded JSON document of process metrics.
 *
 * Only `uptime` comes from the caller; every other numeric field is a
 * pseudo-random value drawn from a fixed range on each call.
 *
 * @param {number} uptime - Value stored verbatim under `uptime`.
 * @returns {string} Base64 encoding of the JSON-serialized metrics object.
 */
function buildProcessMetrics(uptime) {
  // base + uniformly distributed integer in [0, spread)
  const jitter = (base, spread) => base + Math.floor(Math.random() * spread);
  const metrics = {
    uptime,
    rss: jitter(200_000_000, 100_000_000),
    heapTotal: jitter(30_000_000, 5_000_000),
    heapUsed: jitter(40_000_000, 20_000_000),
    external: jitter(14_000_000, 2_000_000),
    arrayBuffers: jitter(0, 10_000),
    constrainedMemory: 0,
    cpuUsage: {
      user: jitter(100_000, 300_000),
      system: jitter(20_000, 80_000),
    },
    cpuPercent: Math.random() * 200,
  };
  const json = JSON.stringify(metrics);
  return Buffer.from(json).toString('base64');
}
/**
 * Assembles one event object for the event-logging batch payload.
 *
 * @param {string} eventName - Stored under `event_data.event_name`.
 * @param {{sessionId: string, deviceId: string, startTime: number}} session - Session state;
 *   uptime is computed in seconds from `startTime`.
 * @param {string} [model] - Model name; falls back to 'claude-sonnet-4-6' when falsy.
 * @param {string} [betas] - Comma-separated beta list; falls back to a fixed default when falsy.
 * @returns {{event_type: string, event_data: object}} A new event with a fresh `event_id`.
 */
function buildEvent(eventName, session, model, betas) {
  const uptimeSeconds = (Date.now() - session.startTime) / 1000;
  const eventData = {
    event_name: eventName,
    client_timestamp: new Date().toISOString(),
    model: model || 'claude-sonnet-4-6',
    session_id: session.sessionId,
    user_type: 'external',
    betas: betas || 'claude-code-20250219,interleaved-thinking-2025-05-14',
    env: buildEnvBlock(),
    entrypoint: 'cli',
    is_interactive: true,
    client_type: 'cli',
    process: buildProcessMetrics(uptimeSeconds),
    event_id: crypto.randomUUID(),
    device_id: session.deviceId,
  };
  return { event_type: 'ClaudeCodeInternalEvent', event_data: eventData };
}
// 发送遥测到 api.anthropic.com/api/event_logging/batch
/**
 * POSTs a batch of events to api.anthropic.com/api/event_logging/batch.
 *
 * Fire-and-forget: returns immediately, never throws for network failures,
 * and only logs the outcome at debug level. Does nothing when telemetry is
 * disabled or the batch is empty.
 *
 * @param {object[]} events - Events to send, serialized as `{ events }`.
 * @returns {void}
 */
function sendTelemetryEvents(events) {
  if (!TELEMETRY_ENABLED) return;
  if (events.length === 0) return;

  const payload = JSON.stringify({ events });
  const headers = {
    'Accept': 'application/json, text/plain, */*',
    'Content-Type': 'application/json',
    'User-Agent': `claude-code/${CLI_VERSION}`,
    'x-service-name': 'claude-code',
    'Content-Length': Buffer.byteLength(payload),
  };
  const request = https.request(
    {
      hostname: 'api.anthropic.com',
      port: 443,
      path: '/api/event_logging/batch',
      method: 'POST',
      headers,
      timeout: 10000,
    },
    (response) => {
      response.resume(); // discard the body so the socket is released
      log('debug', 'telemetry_sent', { status: response.statusCode, events: events.length });
    },
  );
  request.on('error', (err) => {
    log('debug', 'telemetry_error', { error: err.message });
  });
  // A timeout only signals inactivity; the request has to be torn down explicitly.
  request.on('timeout', () => request.destroy());
  request.end(payload);
}
// 发送 DataDog 日志
/**
 * POSTs a single log entry to the DataDog logs intake endpoint.
 *
 * Fire-and-forget: returns immediately and ignores both the response body and
 * any request error. Does nothing when telemetry is disabled.
 *
 * @param {string} eventName - Used as the log `message` and the `event:` tag.
 * @param {{sessionId: string, startTime: number}} session - Session state; uptime is
 *   computed in seconds from `startTime`.
 * @param {string} [model] - Model name; falls back to 'claude-sonnet-4-6' when falsy.
 * @returns {void}
 */
function sendDatadogLog(eventName, session, model) {
  if (!TELEMETRY_ENABLED) return;

  const uptime = (Date.now() - session.startTime) / 1000;
  const modelName = model || 'claude-sonnet-4-6';
  // base + uniformly distributed integer in [0, spread)
  const jitter = (base, spread) => base + Math.floor(Math.random() * spread);
  const tags = [
    `event:${eventName}`,
    'arch:x64',
    'client_type:cli',
    `model:${modelName}`,
    'platform:linux',
    'user_type:external',
    `version:${CLI_VERSION}`,
    `version_base:${CLI_VERSION}`,
  ];
  const entry = {
    ddsource: 'nodejs',
    ddtags: tags.join(','),
    message: eventName,
    service: 'claude-code',
    hostname: 'claude-code',
    env: 'external',
    model: modelName,
    session_id: session.sessionId,
    user_type: 'external',
    entrypoint: 'cli',
    is_interactive: 'true',
    client_type: 'cli',
    process_metrics: {
      uptime,
      rss: jitter(200_000_000, 100_000_000),
      heapTotal: jitter(30_000_000, 5_000_000),
      heapUsed: jitter(40_000_000, 20_000_000),
      external: jitter(14_000_000, 2_000_000),
      arrayBuffers: jitter(0, 10_000),
      constrainedMemory: 0,
      cpuUsage: {
        user: jitter(100_000, 300_000),
        system: jitter(20_000, 80_000),
      },
    },
    platform: 'linux',
    platform_raw: 'linux',
    arch: 'x64',
    node_version: process.version,
    version: CLI_VERSION,
    version_base: CLI_VERSION,
    build_time: BUILD_TIME,
    deployment_environment: 'unknown-linux',
    vcs: 'git',
  };

  const payload = JSON.stringify([entry]);
  const request = https.request(
    {
      hostname: 'http-intake.logs.us5.datadoghq.com',
      port: 443,
      path: '/api/v2/logs',
      method: 'POST',
      headers: {
        'Accept': 'application/json, text/plain, */*',
        'Content-Type': 'application/json',
        'User-Agent': 'axios/1.13.6',
        'dd-api-key': DD_API_KEY,
        'Content-Length': Buffer.byteLength(payload),
      },
      timeout: 10000,
    },
    (response) => { response.resume(); },
  );
  // Best effort: delivery failures are intentionally ignored.
  request.on('error', () => {});
  request.on('timeout', () => request.destroy());
  request.end(payload);
}
// 请求前发遥测(模拟 CLI 启动 + 初始化事件)
/**
 * Emits the events sent before a request is forwarded upstream.
 *
 * The session is looked up by a device id derived from the `x-forwarded-host`
 * header (or 'default') plus the last 16 characters of the `authorization`
 * header. On a session's first request, three start-up events and two DataDog
 * logs are sent first; every request then sends `tengu_api_request_started`.
 *
 * @param {object} reqHeaders - Incoming request headers (lower-cased keys).
 * @returns {void}
 */
function emitPreRequestTelemetry(reqHeaders) {
  const tokenTail = (reqHeaders['authorization'] || '').slice(-16);
  const accountSeed = reqHeaders['x-forwarded-host'] || 'default';
  const session = getOrCreateSession(generateDeviceId(accountSeed + ':' + tokenTail));
  session.requestCount += 1;

  const model = 'claude-sonnet-4-6';
  const betas = reqHeaders['anthropic-beta'] || 'claude-code-20250219,interleaved-thinking-2025-05-14';
  const makeEvent = (name) => buildEvent(name, session, model, betas);

  // First request of this session: send the start-up events.
  const isFirstRequest = session.requestCount === 1;
  if (isFirstRequest) {
    const startupNames = ['tengu_started', 'tengu_init', 'tengu_mcp_server_connection_succeeded'];
    sendTelemetryEvents(startupNames.map((name) => makeEvent(name)));
    sendDatadogLog('tengu_started', session, model);
    sendDatadogLog('tengu_init', session, model);
  }

  // Every request: send the request-started event.
  sendTelemetryEvents([makeEvent('tengu_api_request_started')]);
}
// 请求后发遥测
/**
 * Emits the events sent once the upstream response has started.
 *
 * Resolves the session the same way as the pre-request emitter, then sends a
 * `tengu_api_request_completed` event and a matching DataDog log.
 *
 * @param {object} reqHeaders - Incoming request headers (lower-cased keys).
 * @param {number} statusCode - Upstream status code; accepted but currently unused.
 * @returns {void}
 */
function emitPostRequestTelemetry(reqHeaders, statusCode) {
  const tokenTail = (reqHeaders['authorization'] || '').slice(-16);
  const accountSeed = reqHeaders['x-forwarded-host'] || 'default';
  const session = getOrCreateSession(generateDeviceId(accountSeed + ':' + tokenTail));

  const model = 'claude-sonnet-4-6';
  const betas = reqHeaders['anthropic-beta'] || 'claude-code-20250219,interleaved-thinking-2025-05-14';
  const completed = buildEvent('tengu_api_request_completed', session, model, betas);

  sendTelemetryEvents([completed]);
  sendDatadogLog('tengu_api_request_completed', session, model);
}
// ─── H2 session 管理 ────────────────────────────────────
/**
 * Returns the cached HTTP/2 session for `host`, opening a new one when there
 * is none or the cached one is closed/destroyed.
 *
 * The session removes itself from the cache on error, GOAWAY, close, or after
 * IDLE_TIMEOUT ms of inactivity. Eviction is identity-checked: handlers of an
 * old session only remove the cache entry if it still points at that session,
 * so a late `close` from a replaced session cannot evict its successor.
 *
 * @param {string} host - Upstream authority (host or host:port).
 * @returns {import('http2').ClientHttp2Session} A session that may still be connecting.
 */
function getOrCreateH2Session(host) {
  const existing = h2Sessions.get(host);
  if (existing && !existing.closed && !existing.destroyed) return existing;
  if (existing) { try { existing.close(); } catch (_) {} }

  const session = http2.connect(`https://${host}`);
  // Remove the cache entry only while it still refers to this session.
  const evict = () => {
    if (h2Sessions.get(host) === session) h2Sessions.delete(host);
  };
  const closeQuietly = () => {
    try { session.close(); } catch (_) {}
  };

  session.on('error', (err) => {
    log('warn', 'h2_session_error', { host, error: err.message });
    evict();
    closeQuietly();
  });
  session.on('close', evict);
  session.on('goaway', () => { evict(); closeQuietly(); });
  session.setTimeout(IDLE_TIMEOUT, () => { closeQuietly(); evict(); });

  h2Sessions.set(host, session);
  return session;
}
/**
 * Resolves once an HTTP/2 session is ready to carry requests.
 *
 * Http2Session exposes `connecting` (true until the `connect` event), not
 * `connected`. Readiness is therefore detected via `connecting === false`, so
 * a reused, already-connected session resolves immediately instead of waiting
 * for a `connect` event that will never fire again. A truthy `connected` is
 * still honoured for callers passing other session-like objects.
 *
 * @param {import('http2').ClientHttp2Session} session - Session to wait for.
 * @returns {Promise<void>} Resolves when connected. Rejects with the session's
 *   error, with 'h2 session closed' if it is or becomes closed first, or with
 *   'h2 connect timeout' after CONNECT_TIMEOUT ms.
 */
function waitForConnect(session) {
  if (session.destroyed || session.closed) {
    return Promise.reject(new Error('h2 session closed'));
  }
  if (session.connected || session.connecting === false) return Promise.resolve();

  return new Promise((resolve, reject) => {
    let timer = null;
    // Detach everything on the first outcome so neither the timer nor the
    // listeners outlive this wait.
    const cleanup = () => {
      clearTimeout(timer);
      session.removeListener('connect', onConnect);
      session.removeListener('error', onError);
      session.removeListener('close', onClose);
    };
    function onConnect() { cleanup(); resolve(); }
    function onError(err) { cleanup(); reject(err); }
    function onClose() { cleanup(); reject(new Error('h2 session closed')); }

    timer = setTimeout(() => {
      cleanup();
      reject(new Error('h2 connect timeout'));
    }, CONNECT_TIMEOUT);
    session.once('connect', onConnect);
    session.once('error', onError);
    session.once('close', onClose);
  });
}
// ─── CONNECT 隧道 ────────────────────────────────────────
/**
 * Opens a TCP tunnel to `targetHost:targetPort` through an HTTP proxy using CONNECT.
 *
 * @param {string} proxyUrl - Proxy URL; optional `user:pass@` credentials are sent
 *   as Basic Proxy-Authorization. The port defaults to 80 when omitted.
 * @param {string} targetHost - Host to tunnel to.
 * @param {number} targetPort - Port to tunnel to.
 * @returns {Promise<import('net').Socket>} Resolves with the connected socket after
 *   the proxy answers 200. Rejects on socket error, on CONNECT_TIMEOUT ms of
 *   inactivity, on a non-200 status (`CONNECT <code>`), or when the proxy closes
 *   the connection before sending a complete response header.
 */
function connectViaProxy(proxyUrl, targetHost, targetPort) {
  return new Promise((resolve, reject) => {
    const proxy = new URL(proxyUrl);
    let settled = false;
    let buf = '';

    const fail = (err) => {
      if (settled) return;
      settled = true;
      reject(err);
    };
    // Without this, a proxy that hangs up mid-handshake would leave the promise pending forever.
    const onClose = () => fail(new Error('CONNECT tunnel closed before proxy responded'));

    const conn = net.connect(parseInt(proxy.port || '80', 10), proxy.hostname, () => {
      const auth = proxy.username
        ? `Proxy-Authorization: Basic ${Buffer.from(`${decodeURIComponent(proxy.username)}:${decodeURIComponent(proxy.password || '')}`).toString('base64')}\r\n`
        : '';
      conn.write(`CONNECT ${targetHost}:${targetPort} HTTP/1.1\r\nHost: ${targetHost}:${targetPort}\r\n${auth}\r\n`);
    });

    function onData(chunk) {
      buf += chunk.toString();
      // Wait until the full response header block has arrived.
      if (buf.indexOf('\r\n\r\n') === -1) return;
      conn.removeListener('data', onData);
      conn.removeListener('close', onClose);
      // Status line looks like "HTTP/1.1 200 Connection established".
      const code = parseInt(buf.split(' ')[1], 10);
      if (code === 200) {
        conn.setTimeout(0); // the handshake timeout no longer applies to the tunnel
        settled = true;
        resolve(conn);
      } else {
        conn.destroy();
        fail(new Error(`CONNECT ${code}`));
      }
    }

    conn.once('error', fail);
    conn.once('close', onClose);
    conn.setTimeout(CONNECT_TIMEOUT, () => conn.destroy(new Error('CONNECT timeout')));
    conn.on('data', onData);
  });
}
// ─── 收集请求体 ──────────────────────────────────────────
/**
 * Buffers a readable stream's entire payload in memory.
 *
 * @param {import('stream').Readable} req - Stream to read (the incoming request).
 * @returns {Promise<Buffer>} Resolves with everything received. Never rejects:
 *   on a stream error it resolves with the bytes received up to that point.
 */
function collectBody(req) {
  return new Promise((resolve) => {
    const parts = [];
    const finish = () => resolve(Buffer.concat(parts));
    req.on('data', (part) => { parts.push(part); });
    req.on('end', finish);
    req.on('error', finish);
  });
}
// ─── H1 代理 ─────────────────────────────────────────────
/**
 * Forwards a request to `targetHost` over HTTPS (HTTP/1.1) and streams the
 * upstream response into `res`.
 *
 * When UPSTREAM_PROXY is set, the request travels over a CONNECT tunnel. If
 * the upstream hangs up within 2 s of the attempt, the host is remembered in
 * `h2Hosts` and the request is retried over HTTP/2.
 *
 * The two sides are tied together once the response has started: an aborted
 * or truncated upstream response destroys `res` rather than leaving the
 * client waiting, and a client that disconnects mid-response releases the
 * upstream response.
 *
 * @param {string} targetHost - Upstream host (port 443 is used).
 * @param {string} method - HTTP method.
 * @param {string} path - Request path including query string.
 * @param {object} reqHeaders - Incoming request headers; hop-by-hop headers are dropped.
 * @param {Buffer} body - Fully buffered request body.
 * @param {import('http').ServerResponse} res - Client response to write to.
 * @param {object} [savedHeaders] - Original headers, passed to the post-request
 *   telemetry emitter for `/v1/messages` paths.
 * @returns {Promise<'ok'|'h2'|'error'>} Resolves (never rejects) when response
 *   headers have been relayed ('ok'), after the HTTP/2 fallback ran ('h2'), or
 *   after a failure was reported to the client ('error').
 */
function sendViaH1(targetHost, method, path, reqHeaders, body, res, savedHeaders) {
  return new Promise((resolve) => {
    const headers = { ...reqHeaders, host: targetHost };
    for (const name of ['x-forwarded-host', 'connection', 'keep-alive', 'proxy-connection', 'transfer-encoding']) {
      delete headers[name];
    }
    // The body was buffered, so its exact length is known.
    if (body.length > 0) headers['content-length'] = String(body.length);
    const opts = { hostname: targetHost, port: 443, path, method, headers, servername: targetHost, timeout: CONNECT_TIMEOUT };
    const startTime = Date.now();

    const finish = (requestOpts) => {
      const proxyReq = https.request(requestOpts);
      proxyReq.on('response', (proxyRes) => {
        log('info', 'proxy_response', { host: targetHost, status: proxyRes.statusCode, path, proto: 'h1' });
        const rh = { ...proxyRes.headers };
        delete rh['connection']; delete rh['keep-alive'];
        res.writeHead(proxyRes.statusCode, rh);

        // An upstream abort surfaces as 'error' on the response; without a
        // listener it would become an uncaught exception.
        proxyRes.on('error', (err) => {
          log('warn', 'h1_upstream_aborted', { error: err.message, host: targetHost, path });
        });
        // Upstream ended before the message was complete: tear the client
        // response down so the client sees the failure instead of hanging.
        proxyRes.on('close', () => {
          if (!proxyRes.complete && !res.writableEnded) res.destroy();
        });
        // Client went away mid-response: stop reading from upstream.
        res.on('close', () => {
          if (!proxyRes.complete) proxyRes.destroy();
        });

        proxyRes.pipe(res, { end: true });
        // Emit post-request telemetry once the response has started.
        if (path.includes('/v1/messages') && savedHeaders) {
          emitPostRequestTelemetry(savedHeaders, proxyRes.statusCode);
        }
        resolve('ok');
      });
      proxyReq.on('error', (err) => {
        // An immediate hang-up is treated as "this host only speaks HTTP/2".
        if (err.message === 'socket hang up' && (Date.now() - startTime) < 2000) {
          log('info', 'h1_rejected_switching_to_h2', { host: targetHost });
          h2Hosts.add(targetHost);
          sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders).then(() => resolve('h2'));
          return;
        }
        log('error', 'h1_error', { error: err.message, host: targetHost, path });
        if (!res.headersSent) {
          res.writeHead(502);
          res.end(JSON.stringify({ error: err.message }));
        } else if (!res.writableEnded) {
          // Headers are already out, so the only honest signal left is to cut the response.
          res.destroy();
        }
        resolve('error');
      });
      proxyReq.on('timeout', () => proxyReq.destroy(new Error('timeout')));
      proxyReq.end(body);
    };

    if (UPSTREAM_PROXY) {
      connectViaProxy(UPSTREAM_PROXY, targetHost, 443)
        .then((socket) => { opts.socket = socket; opts.agent = false; finish(opts); })
        .catch((err) => {
          log('error', 'tunnel_failed', { error: err.message });
          if (!res.headersSent) { res.writeHead(502); res.end('tunnel error'); }
          resolve('error');
        });
    } else {
      finish(opts);
    }
  });
}
// ─── H2 代理 ─────────────────────────────────────────────
/**
 * Forwards a request to `targetHost` over a (cached) HTTP/2 session and
 * streams the upstream response into `res`.
 *
 * The response body is piped, so a slow client applies backpressure to the
 * upstream stream instead of growing an unbounded buffer. If the client
 * disconnects before the response is complete, the upstream stream is
 * cancelled. The session cache entry is only dropped when it still refers to
 * the session used by this call.
 *
 * @param {string} targetHost - Upstream authority.
 * @param {string} method - HTTP method.
 * @param {string} path - Request path including query string.
 * @param {object} reqHeaders - Incoming request headers; connection-specific headers are dropped.
 * @param {Buffer} body - Fully buffered request body.
 * @param {import('http').ServerResponse} res - Client response to write to.
 * @param {object} [savedHeaders] - Original headers, passed to the post-request
 *   telemetry emitter for `/v1/messages` paths.
 * @returns {Promise<void>} Resolves once the request has been written (not when
 *   the response finishes). Never rejects; failures are reported to the client as 502.
 */
async function sendViaH2(targetHost, method, path, reqHeaders, body, res, savedHeaders) {
  let session;
  // Drop the cached session only if the cache still points at the one we used.
  const forgetSession = () => {
    if (session && h2Sessions.get(targetHost) === session) h2Sessions.delete(targetHost);
  };
  try {
    session = getOrCreateH2Session(targetHost);
    await waitForConnect(session);

    const headers = {};
    // Connection-specific headers are not allowed in HTTP/2 requests.
    const skip = new Set(['host', 'connection', 'keep-alive', 'proxy-connection', 'transfer-encoding', 'upgrade', 'x-forwarded-host', 'http2-settings']);
    for (const [k, v] of Object.entries(reqHeaders)) {
      if (!skip.has(k.toLowerCase())) headers[k] = v;
    }
    headers[':method'] = method;
    headers[':path'] = path;
    headers[':authority'] = targetHost;
    headers[':scheme'] = 'https';
    if (body.length > 0) headers['content-length'] = String(body.length);

    const stream = session.request(headers);
    let responded = false;

    // Client went away before the upstream finished: cancel the stream so it
    // does not keep running (and consuming the session) for nobody.
    const cancelOnClientClose = () => {
      if (!stream.closed && !stream.readableEnded) stream.close(http2.constants.NGHTTP2_CANCEL);
    };
    res.once('close', cancelOnClientClose);

    stream.on('response', (h2h) => {
      responded = true;
      const status = h2h[':status'] || 502;
      const rh = {};
      // Pseudo-headers (":status") are not valid HTTP/1.1 response headers.
      for (const [k, v] of Object.entries(h2h)) { if (!k.startsWith(':')) rh[k] = v; }
      log('info', 'proxy_response', { host: targetHost, status, path, proto: 'h2' });
      res.writeHead(status, rh);
      stream.pipe(res);
      if (path.includes('/v1/messages') && savedHeaders) {
        emitPostRequestTelemetry(savedHeaders, status);
      }
    });
    stream.on('error', (err) => {
      // Protocol-level failures mean the whole session is suspect.
      if (err.message && err.message.includes('NGHTTP2')) {
        forgetSession();
        try { session.close(); } catch (_) {}
      }
      if (responded) { if (!res.writableEnded) res.end(); return; }
      log('error', 'h2_error', { error: err.message, host: targetHost, path });
      if (!res.headersSent) { res.writeHead(502); res.end(JSON.stringify({ error: err.message })); }
    });
    stream.on('close', () => {
      res.removeListener('close', cancelOnClientClose);
      if (!responded && !res.headersSent) {
        log('warn', 'h2_no_response', { host: targetHost, path });
        res.writeHead(502); res.end('{"error":"h2_no_response"}');
      } else if (!res.writableEnded) { res.end(); }
    });
    stream.setTimeout(CONNECT_TIMEOUT, () => stream.close());
    stream.end(body);
  } catch (err) {
    log('error', 'h2_exception', { error: err.message, host: targetHost });
    forgetSession();
    if (!res.headersSent) { res.writeHead(502); res.end(JSON.stringify({ error: err.message })); }
  }
}
// ─── 请求入口 ─────────────────────────────────────────────
/**
 * Entry point for every proxied request.
 *
 * Picks the upstream host from `x-forwarded-host` (falling back to
 * UPSTREAM_HOST), buffers the request body, optionally emits pre-request
 * telemetry followed by a random 50-200 ms pause for `/v1/messages` requests,
 * and then forwards over HTTP/2 when the host is listed in `h2Hosts`, or over
 * HTTP/1.1 otherwise.
 *
 * @param {import('http').IncomingMessage} req - Incoming client request.
 * @param {import('http').ServerResponse} res - Client response.
 * @returns {Promise<void>} Settles when the chosen forwarder's promise settles.
 */
async function proxyRequest(req, res) {
  const { headers, method, url } = req;
  const targetHost = headers['x-forwarded-host'] || UPSTREAM_HOST;
  log('info', 'proxy_request', { host: targetHost, method, path: url });

  // Snapshot of the original headers, handed to the telemetry emitters.
  const savedHeaders = { ...headers };
  const body = await collectBody(req);

  // Pre-request telemetry applies to /v1/messages requests only.
  const wantsTelemetry = req.url.includes('/v1/messages') && TELEMETRY_ENABLED;
  if (wantsTelemetry) {
    emitPreRequestTelemetry(savedHeaders);
    // Random 50-200 ms pause before forwarding.
    const pauseMs = 50 + Math.floor(Math.random() * 150);
    await new Promise((wake) => setTimeout(wake, pauseMs));
  }

  const forward = h2Hosts.has(targetHost) ? sendViaH2 : sendViaH1;
  await forward(targetHost, req.method, req.url, req.headers, body, res, savedHeaders);
}
// ─── HTTP 服务器 ─────────────────────────────────────────
// Plain-HTTP front end: answers the health path locally and hands every other
// request to proxyRequest().
const server = http.createServer((req, res) => {
// Health check: reports runtime info, the hosts switched to HTTP/2, whether
// telemetry is enabled, and the number of tracked sessions.
if (req.url === HEALTH_PATH) {
res.writeHead(200, { 'content-type': 'application/json' });
res.end(JSON.stringify({
status: 'ok', node: process.version, openssl: process.versions.openssl,
uptime: process.uptime(), h2Hosts: [...h2Hosts],
telemetry: TELEMETRY_ENABLED, sessions: sessionStates.size,
}));
return;
}
// Last-resort handler: anything proxyRequest() rejects with becomes a 500,
// provided no response headers have been sent yet.
proxyRequest(req, res).catch((err) => {
log('error', 'unhandled', { error: err.message });
if (!res.headersSent) { res.writeHead(500); res.end('internal error'); }
});
});
// 0 turns off the server's socket inactivity timeout — presumably so that
// long-running streamed responses are not cut off; confirm intent.
server.timeout = 0;
// Idle keep-alive connections are kept open for IDLE_TIMEOUT ms.
server.keepAliveTimeout = IDLE_TIMEOUT;
// Clients get 60 s to send the complete request headers.
server.headersTimeout = 60000;
server.listen(LISTEN_PORT, LISTEN_HOST, () => {
log('info', 'node-tls-proxy started', {
listen: `${LISTEN_HOST}:${LISTEN_PORT}`, node: process.version, openssl: process.versions.openssl,
telemetry: TELEMETRY_ENABLED,
});
});
// Every 5 minutes, drop sessions that were created more than 1 hour ago.
// NOTE(review): expiry is measured from `startTime` (session creation), not
// from the last request, so an actively used session is also dropped after an
// hour and recreated on its next request — confirm this is intended.
setInterval(() => {
const now = Date.now();
for (const [id, state] of sessionStates) {
if (now - state.startTime > 3600_000) sessionStates.delete(id);
}
}, 300_000);
// Set once shutdown has begun so repeated signals are ignored.
let stopping = false;

/**
 * Gracefully stops the proxy: closes cached HTTP/2 sessions, stops accepting
 * connections, and exits with code 0 once the server has closed. If that
 * takes longer than 5 s the process exits with code 1.
 *
 * @param {string} sig - Name of the signal that triggered the shutdown (logged).
 * @returns {void}
 */
function shutdown(sig) {
  if (stopping) return;
  stopping = true;
  log('info', 'shutdown', { signal: sig });

  for (const s of h2Sessions.values()) {
    try { s.close(); } catch (_) {}
  }
  h2Sessions.clear();

  server.close(() => process.exit(0));
  // server.close() waits for open connections; idle keep-alive sockets would
  // otherwise hold it open until keepAliveTimeout and force the exit(1) path.
  // closeIdleConnections() is not available on older Node versions.
  if (typeof server.closeIdleConnections === 'function') server.closeIdleConnections();

  // Hard deadline; unref'd so the timer itself never keeps the process alive.
  setTimeout(() => process.exit(1), 5000).unref();
}

process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
// Uncaught errors are logged and the process keeps running.
process.on('uncaughtException', (e) => log('error', 'uncaught', { error: e.message }));
process.on('unhandledRejection', (r) => log('error', 'rejection', { error: String(r) }));