win b285fb7b2f fix: 对齐 Claude Code 2.1.88 源码指纹
- 1P event_logging/batch 添加 OAuth Bearer auth header
- DD hostname 改为固定 "claude-code"(与真实 CLI 一致)
- 事件名对齐真实 CLI: tengu_api_query/tengu_api_success/tengu_api_error/tengu_tool_use_success
- DD header 大小写改为 DD-API-KEY
- ResponseHeaderTimeout 300s → 600s(与真实 CLI 10min 超时对齐)
2026-04-01 17:47:40 +08:00

572 lines
18 KiB
Go

// Package telemetry simulates the real Claude Code CLI's OTEL telemetry events.
//
// Real CLI emits events to two channels:
// 1. Anthropic event_logging/batch (first-party events)
// 2. Datadog log intake (third-party observability)
//
// Ported from antigravity/node-tls-proxy/proxy.js — see that file for JS original.
package telemetry
import (
	"bytes"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"math"
	"math/rand"
	"net/http"
	"strings"
	"sync"
	"time"

	claude "github.com/Wei-Shaw/sub2api/internal/pkg/claude"
)
// ─── Constants ───────────────────────────────────────────
// Package-wide fixed values used when fabricating telemetry.
const (
// ddAPIKey is sent as the DD-API-KEY header on every Datadog log request.
ddAPIKey = "pubbbf48e6d78dae54bceaa4acf463299bf"
// fakeNodeVersion is reported as "node_version" in both telemetry channels.
fakeNodeVersion = "v24.3.0"
// buildTime is reported as "build_time" in both telemetry channels.
buildTime = "2026-03-31T01:39:46Z"
// sessionMaxAge is how long after its StartTime a session is kept before
// the background sweep deletes it.
sessionMaxAge = time.Hour
// sessionCleanup is the interval between background sweeps of the session map.
sessionCleanup = 5 * time.Minute
// telemetryTimeout is the overall timeout of the shared telemetry HTTP client.
telemetryTimeout = 10 * time.Second
)
// ─── Virtual Host Identity ───────────────────────────────
// Lookup tables for synthesising a host name: one entry from mbpNames is
// concatenated with one entry from mbpSuffix (e.g. "alex" + "-MBP").
// The chosen mbpNames entry also serves as the simulated user name.
var (
mbpNames = []string{"alex", "sam", "chris", "max", "lee", "kai", "jamie", "taylor", "morgan", "casey", "drew", "avery", "riley", "blake", "jordan", "ryan", "parker", "quinn", "reese", "cameron"}
mbpSuffix = []string{"-MBP", "-MacBook", "-MacBook-Pro", "-MacBook-Air", "s-MBP", "s-MacBook", "s-MacBook-Pro"}
)
// hostIdentity describes one simulated client machine. Every field is derived
// deterministically from a seed by generateHostIdentity.
type hostIdentity struct {
	// Machine and user naming.
	Hostname, Username string
	// Interactive environment: TERM-style terminal name and shell path.
	Terminal, Shell string
	// Hardware/OS fingerprint: UUID-shaped machine ID, CPU architecture,
	// dotted OS version and dotted kernel release.
	MachineID, Arch, OSVersion, KernelRelease string
	// Filesystem path of the simulated CLI executable.
	ExecPath string
	// Version and path reported in the ripgrep availability event.
	RipgrepVersion, RipgrepPath string
	// Number of simulated MCP servers and how many of them fail to connect.
	McpServerCount, McpFailCount int
}
// hashField returns the 32-byte SHA-256 digest of seed, a colon, and field
// concatenated in that order. It gives each named field its own stable
// pseudo-random bytes for a given seed.
func hashField(seed, field string) []byte {
	digest := sha256.New()
	digest.Write([]byte(seed))
	digest.Write([]byte(":"))
	digest.Write([]byte(field))
	return digest.Sum(nil)
}
// generateHostIdentity derives a complete simulated host profile from seed.
// Each attribute is chosen from bytes of hashField(seed, <attribute>), so the
// same seed always yields the same identity.
func generateHostIdentity(seed string) hostIdentity {
	// percent maps the first digest byte of a field onto a bucket in [0, 100).
	percent := func(field string) int {
		return int(hashField(seed, field)[0]) % 100
	}
	// upperHex renders bytes as upper-case hexadecimal.
	upperHex := func(b []byte) string {
		return strings.ToUpper(hex.EncodeToString(b))
	}

	// Host and user name share the same first-name pick.
	hostBytes := hashField(seed, "hostname")
	user := mbpNames[int(hostBytes[0])%len(mbpNames)]
	suffix := mbpSuffix[int(hostBytes[1])%len(mbpSuffix)]

	// Terminal: 75% / 13% / 8% / 4% split.
	terminal := "kitty"
	if p := percent("terminal"); p < 75 {
		terminal = "xterm-256color"
	} else if p < 88 {
		terminal = "screen-256color"
	} else if p < 96 {
		terminal = "alacritty"
	}

	// Shell: 65% / 17% / 11% / 7% split.
	shell := "/opt/homebrew/bin/fish"
	if p := percent("shell"); p < 65 {
		shell = "/bin/zsh"
	} else if p < 82 {
		shell = "/usr/local/bin/zsh"
	} else if p < 93 {
		shell = "/bin/bash"
	}

	// Machine ID: first 16 digest bytes laid out as 8-4-4-4-12 upper-case hex.
	idBytes := hashField(seed, "machine-id")
	machineID := strings.Join([]string{
		upperHex(idBytes[0:4]),
		upperHex(idBytes[4:6]),
		upperHex(idBytes[6:8]),
		upperHex(idBytes[8:10]),
		upperHex(idBytes[10:16]),
	}, "-")

	// OS version 13-15.x.y; the kernel major tracks it at +9 (macOS 13 = Darwin 22).
	osBytes := hashField(seed, "os")
	osMajor := 13 + int(osBytes[0])%3
	osVersion := fmt.Sprintf("%d.%d.%d", osMajor, int(osBytes[1])%8, int(osBytes[2])%5)
	kernel := fmt.Sprintf("%d.%d.%d", osMajor+9, int(osBytes[3])%7, int(osBytes[4])%3)

	// Architecture: buckets below 70 are arm64, the rest x64.
	arch := "x64"
	if percent("arch") < 70 {
		arch = "arm64"
	}

	// CLI install location: 40% / 30% / 20% / 10% split.
	home := "/Users/" + user
	execPath := home + "/.local/bin/claude"
	if p := percent("exec"); p < 40 {
		execPath = "/usr/local/bin/claude"
	} else if p < 70 {
		execPath = "/opt/homebrew/bin/claude"
	} else if p < 90 {
		execPath = home + "/.npm-global/bin/claude"
	}

	// Ripgrep version/path and MCP counts all come from the "ripgrep" digest.
	versions := []string{"14.1.1", "14.1.0", "14.0.3", "14.0.2", "13.0.0", "14.1.2", "14.0.1"}
	paths := []string{"/opt/homebrew/bin/rg", "/usr/local/bin/rg", home + "/.cargo/bin/rg", "/usr/bin/rg"}
	rgBytes := hashField(seed, "ripgrep")

	var id hostIdentity
	id.Hostname = user + suffix
	id.Username = user
	id.Terminal = terminal
	id.Shell = shell
	id.MachineID = machineID
	id.Arch = arch
	id.OSVersion = osVersion
	id.KernelRelease = kernel
	id.ExecPath = execPath
	id.RipgrepVersion = versions[int(rgBytes[0])%len(versions)]
	id.RipgrepPath = paths[int(rgBytes[1])%len(paths)]
	id.McpServerCount = int(rgBytes[2])%5 + 1
	id.McpFailCount = int(rgBytes[3]) % 3
	return id
}
// ─── Session State ───────────────────────────────────────
// sessionState is the per-device record kept in the sessions map.
type sessionState struct {
// SessionID is a random UUID assigned when the session is created.
SessionID string
// DeviceID is the key the session is stored under.
DeviceID string
// HostID is the simulated host profile generated from DeviceID.
HostID hostIdentity
// StartTime is the creation time; it drives both reported uptime and expiry.
StartTime time.Time
// RequestCount is incremented by EmitPreRequest on every call.
RequestCount int64
// RipgrepReported is set once the startup batch has been assembled.
RipgrepReported bool
}
// In-memory session registry keyed by device ID. sessionsMu must be held
// for every access to the sessions map.
var (
sessions = make(map[string]*sessionState)
sessionsMu sync.Mutex
)
func init() {
go func() {
ticker := time.NewTicker(sessionCleanup)
defer ticker.Stop()
for range ticker.C {
now := time.Now()
sessionsMu.Lock()
for k, s := range sessions {
if now.Sub(s.StartTime) > sessionMaxAge {
delete(sessions, k)
}
}
sessionsMu.Unlock()
}
}()
}
// generateDeviceID returns the lower-case hex SHA-256 of "device:" followed by
// accountSeed: a stable 64-character identifier for a given seed.
func generateDeviceID(accountSeed string) string {
	hasher := sha256.New()
	hasher.Write([]byte("device:"))
	hasher.Write([]byte(accountSeed))
	return hex.EncodeToString(hasher.Sum(nil))
}
// getOrCreateSession returns the session registered under deviceID, creating
// and registering a new one (fresh UUID, host identity derived from deviceID,
// StartTime set to now) when none exists. The lookup and insert happen under
// sessionsMu.
func getOrCreateSession(deviceID string) *sessionState {
	sessionsMu.Lock()
	defer sessionsMu.Unlock()

	existing, found := sessions[deviceID]
	if found {
		return existing
	}

	created := new(sessionState)
	created.SessionID = generateUUID()
	created.DeviceID = deviceID
	created.HostID = generateHostIdentity(deviceID)
	created.StartTime = time.Now()

	sessions[deviceID] = created
	return created
}
// generateUUID returns a random version-4 UUID in lower-case 8-4-4-4-12 form.
// The random bytes come from the math/rand package-level source.
func generateUUID() string {
	var raw [16]byte
	rand.Read(raw[:])
	raw[6] = (raw[6] & 0x0f) | 0x40 // version nibble = 4
	raw[8] = (raw[8] & 0x3f) | 0x80 // top two bits = 10 (variant)

	digits := hex.EncodeToString(raw[:])
	return digits[0:8] + "-" + digits[8:12] + "-" + digits[12:16] + "-" + digits[16:20] + "-" + digits[20:32]
}
// ─── Process Metrics Simulation ──────────────────────────
// buildProcessMetrics fabricates a process-metrics snapshot for a process
// that has been running for uptime seconds and returns it as base64-encoded
// JSON. Memory figures grow with uptime up to a cap and carry random noise;
// CPU counters grow linearly with uptime plus noise.
func buildProcessMetrics(uptime float64) string {
	// RSS: 180 MB floor, +50 KB per second of uptime capped at +200 MB, plus up to 80 MB noise.
	rssFloor := 180_000_000.0 + math.Min(uptime*50_000, 200_000_000)
	rss := int64(rssFloor + rand.Float64()*80_000_000)
	// Heap total is ~60% of RSS; heap used is 50-80% of heap total.
	heapTotal := int64(float64(rss)*0.6 + rand.Float64()*10_000_000)
	heapUsed := int64(float64(heapTotal)*0.5 + rand.Float64()*float64(heapTotal)*0.3)

	snapshot := make(map[string]any, 9)
	snapshot["uptime"] = uptime
	snapshot["rss"] = rss
	snapshot["heapTotal"] = heapTotal
	snapshot["heapUsed"] = heapUsed
	snapshot["external"] = 14_000_000 + rand.Intn(2_000_000)
	snapshot["arrayBuffers"] = rand.Intn(200_000)
	snapshot["constrainedMemory"] = 51539607552

	cpu := make(map[string]int64, 2)
	cpu["user"] = int64(uptime*10_000 + rand.Float64()*300_000)
	cpu["system"] = int64(uptime*2_000 + rand.Float64()*80_000)
	snapshot["cpuUsage"] = cpu
	snapshot["cpuPercent"] = rand.Float64() * 200

	encoded, _ := json.Marshal(snapshot)
	return base64.StdEncoding.EncodeToString(encoded)
}
// ─── Env Block ───────────────────────────────────────────
// buildEnvBlock assembles the "env" object attached to every first-party
// event. Only the terminal and architecture vary per host; everything else is
// a fixed value or a package/claude constant.
func buildEnvBlock(hostID hostIdentity) map[string]any {
	env := map[string]any{
		// Host-specific values.
		"terminal": hostID.Terminal,
		"arch":     hostID.Arch,

		// Platform description.
		"platform":               "darwin",
		"platform_raw":           "darwin",
		"deployment_environment": "unknown-darwin",

		// Runtime and tooling.
		"node_version":        fakeNodeVersion,
		"package_managers":    "npm,pnpm",
		"runtimes":            "deno,node",
		"is_running_with_bun": true,
		"vcs":                 "git",

		// CLI build information.
		"version":      claude.DefaultCLIVersion,
		"version_base": claude.DefaultCLIVersion,
		"build_time":   buildTime,
	}

	// Every remaining environment flag is reported as false.
	for _, flag := range []string{
		"is_ci",
		"is_claubbit",
		"is_github_action",
		"is_claude_code_action",
		"is_claude_ai_auth",
		"is_claude_code_remote",
		"is_conductor",
		"is_local_agent_mode",
	} {
		env[flag] = false
	}
	return env
}
// ─── Event Building ──────────────────────────────────────
// eventWrapper is the JSON envelope for one first-party telemetry event.
type eventWrapper struct {
// EventType is serialized as "event_type".
EventType string `json:"event_type"`
// EventData holds the event's fields and is serialized as "event_data".
EventData map[string]any `json:"event_data"`
}
// buildEvent creates one "ClaudeCodeInternalEvent" envelope for session.
//
// eventName becomes "event_name". An empty model or betas falls back to a
// built-in default. tsOverride, when non-empty, is used verbatim as
// "client_timestamp"; otherwise the current UTC time in RFC 3339 (nanosecond)
// form is used. Entries of extraData are copied in last and therefore
// override any standard field with the same key.
func buildEvent(eventName string, session *sessionState, model, betas string, extraData map[string]any, tsOverride string) eventWrapper {
	elapsed := time.Since(session.StartTime).Seconds()
	processBlob := buildProcessMetrics(elapsed)

	stamp := tsOverride
	if len(stamp) == 0 {
		stamp = time.Now().UTC().Format(time.RFC3339Nano)
	}
	if len(model) == 0 {
		model = "claude-sonnet-4-6"
	}
	if len(betas) == 0 {
		betas = "claude-code-20250219,interleaved-thinking-2025-05-14"
	}

	fields := make(map[string]any, 13+len(extraData))
	fields["event_name"] = eventName
	fields["client_timestamp"] = stamp
	fields["model"] = model
	fields["session_id"] = session.SessionID
	fields["user_type"] = "external"
	fields["betas"] = betas
	fields["env"] = buildEnvBlock(session.HostID)
	fields["entrypoint"] = "cli"
	fields["is_interactive"] = true
	fields["client_type"] = "cli"
	fields["process"] = processBlob
	fields["event_id"] = generateUUID()
	fields["device_id"] = session.DeviceID

	// Caller-supplied fields win over the defaults above.
	for key, value := range extraData {
		fields[key] = value
	}

	var wrapped eventWrapper
	wrapped.EventType = "ClaudeCodeInternalEvent"
	wrapped.EventData = fields
	return wrapped
}
// ─── Send Functions ──────────────────────────────────────
// httpClient is the single HTTP client shared by both telemetry senders;
// each request is bounded by telemetryTimeout.
var httpClient = &http.Client{Timeout: telemetryTimeout}
// sendTelemetryEvents POSTs events as one batch to the Anthropic
// event_logging/batch endpoint. It is best-effort: failures are logged at
// debug level and never returned.
//
// events is the batch to send; an empty batch is a no-op.
// session is accepted for signature compatibility and is not used here.
// authToken, when non-empty, is sent as "Authorization: Bearer <authToken>".
func sendTelemetryEvents(events []eventWrapper, session *sessionState, authToken string) {
	if len(events) == 0 {
		return
	}

	// Upper bound on how much of the response is read before closing.
	const maxDrainBytes = 64 << 10

	body, err := json.Marshal(map[string]any{"events": events})
	if err != nil {
		slog.Debug("telemetry_error", "error", err.Error())
		return
	}
	req, err := http.NewRequest("POST", "https://api.anthropic.com/api/event_logging/batch", bytes.NewReader(body))
	if err != nil {
		slog.Debug("telemetry_error", "error", err.Error())
		return
	}
	req.Header.Set("Accept", "application/json, text/plain, */*")
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", "claude-code/"+claude.DefaultCLIVersion)
	req.Header.Set("x-service-name", "claude-code")
	if authToken != "" {
		req.Header.Set("Authorization", "Bearer "+authToken)
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		slog.Debug("telemetry_error", "error", err.Error())
		return
	}
	defer resp.Body.Close()
	// Read the (small) response to EOF before closing so the transport can
	// return the connection to its idle pool instead of discarding it.
	_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrainBytes))

	slog.Debug("telemetry_sent", "status", resp.StatusCode, "events", len(events))
}
// sendDatadogLog POSTs a single log entry named eventName to the Datadog log
// intake. It is best-effort: failures are logged at debug level and never
// returned.
//
// session supplies the session ID, architecture and start time (for uptime).
// model is reported in the entry and its tags; an empty model falls back to
// "claude-sonnet-4-6".
func sendDatadogLog(eventName string, session *sessionState, model string) {
	// Upper bound on how much of the response is read before closing.
	const maxDrainBytes = 64 << 10

	hostID := session.HostID
	uptime := time.Since(session.StartTime).Seconds()
	if model == "" {
		model = "claude-sonnet-4-6"
	}

	// Fabricated memory/CPU figures: they grow with uptime and carry random noise.
	baseRss := 180_000_000.0 + math.Min(uptime*50_000, 200_000_000)
	rss := int64(baseRss + rand.Float64()*80_000_000)
	heapTotal := int64(float64(rss)*0.6 + rand.Float64()*10_000_000)
	heapUsed := int64(float64(heapTotal)*0.5 + rand.Float64()*float64(heapTotal)*0.3)
	pm := map[string]any{
		"uptime":            uptime,
		"rss":               rss,
		"heapTotal":         heapTotal,
		"heapUsed":          heapUsed,
		"external":          14_000_000 + rand.Intn(2_000_000),
		"arrayBuffers":      rand.Intn(10_000),
		"constrainedMemory": 0,
		"cpuUsage": map[string]int64{
			"user":   int64(uptime*10_000 + rand.Float64()*300_000),
			"system": int64(uptime*2_000 + rand.Float64()*80_000),
		},
	}

	entry := map[string]any{
		"ddsource":               "nodejs",
		"ddtags":                 fmt.Sprintf("event:%s,arch:%s,client_type:cli,model:%s,platform:darwin,user_type:external,version:%s,version_base:%s", eventName, hostID.Arch, model, claude.DefaultCLIVersion, claude.DefaultCLIVersion),
		"message":                eventName,
		"service":                "claude-code",
		"hostname":               "claude-code",
		"env":                    "external",
		"model":                  model,
		"session_id":             session.SessionID,
		"user_type":              "external",
		"entrypoint":             "cli",
		"is_interactive":         "true",
		"client_type":            "cli",
		"process_metrics":        pm,
		"platform":               "darwin",
		"platform_raw":           "darwin",
		"arch":                   hostID.Arch,
		"node_version":           fakeNodeVersion,
		"version":                claude.DefaultCLIVersion,
		"version_base":           claude.DefaultCLIVersion,
		"build_time":             buildTime,
		"deployment_environment": "unknown-darwin",
		"vcs":                    "git",
	}

	// The intake endpoint takes a JSON array of entries; we send exactly one.
	body, err := json.Marshal([]any{entry})
	if err != nil {
		slog.Debug("datadog_log_error", "error", err.Error())
		return
	}
	req, err := http.NewRequest("POST", "https://http-intake.logs.us5.datadoghq.com/api/v2/logs", bytes.NewReader(body))
	if err != nil {
		slog.Debug("datadog_log_error", "error", err.Error())
		return
	}
	req.Header.Set("Accept", "application/json, text/plain, */*")
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", "axios/1.13.6")
	req.Header.Set("DD-API-KEY", ddAPIKey)

	resp, err := httpClient.Do(req)
	if err != nil {
		slog.Debug("datadog_log_error", "error", err.Error())
		return
	}
	defer resp.Body.Close()
	// Read the (small) response to EOF before closing so the transport can
	// return the connection to its idle pool instead of discarding it.
	_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrainBytes))
}
// ─── Public API ──────────────────────────────────────────
// EmitPreRequest fires pre-request telemetry events for a /v1/messages request.
// All network sends happen in background goroutines; the call itself does not
// block on I/O.
//
// accountSeed should be a stable identifier for the account (e.g. account ID or OAuth token suffix).
// authHeader is the Authorization header value; its last 16 bytes are mixed
// into the device ID derivation.
// authToken is the raw OAuth token (without "Bearer " prefix) for 1P auth.
// model is the model name from the request body (e.g. "claude-sonnet-4-6");
// empty falls back to "claude-sonnet-4-6".
// betaHeader is the anthropic-beta header value; empty falls back to
// claude.DefaultBetaHeader.
//
// On the first request of a session a simulated startup sequence is emitted
// (started/init/ripgrep/MCP events, then a delayed second batch). Every
// request additionally emits a tengu_api_query event.
func EmitPreRequest(accountSeed, authHeader, authToken, model, betaHeader string) {
	authSuffix := authHeader
	if len(authSuffix) > 16 {
		authSuffix = authSuffix[len(authSuffix)-16:]
	}
	deviceID := generateDeviceID(accountSeed + ":" + authSuffix)
	session := getOrCreateSession(deviceID)

	// The session object is shared by every request that maps to the same
	// device, so the counter bump and the "is this the first request?"
	// decision must be one atomic step. Without the lock, concurrent callers
	// could lose an increment or both/neither observe RequestCount == 1.
	sessionsMu.Lock()
	session.RequestCount++
	firstRequest := session.RequestCount == 1
	if firstRequest {
		session.RipgrepReported = true
	}
	sessionsMu.Unlock()

	if model == "" {
		model = "claude-sonnet-4-6"
	}
	betas := betaHeader
	if betas == "" {
		betas = claude.DefaultBetaHeader
	}

	// First request: full startup sequence.
	if firstRequest {
		hostID := session.HostID
		baseTime := time.Now()
		// ts renders baseTime shifted by offsetMs so the startup events carry
		// increasing timestamps even though they are built at the same instant.
		ts := func(offsetMs int) string {
			return baseTime.Add(time.Duration(offsetMs) * time.Millisecond).UTC().Format(time.RFC3339Nano)
		}
		batch1 := []eventWrapper{
			buildEvent("tengu_started", session, model, betas, nil, ts(0)),
			buildEvent("tengu_init", session, model, betas, nil, ts(80+rand.Intn(120))),
			buildEvent("tengu_ripgrep_availability", session, model, betas, map[string]any{
				"ripgrep_available": true,
				"ripgrep_version":   hostID.RipgrepVersion,
				"ripgrep_path":      hostID.RipgrepPath,
			}, ts(200+rand.Intn(150))),
		}

		// MCP connection events. The generated fail count can exceed the
		// generated server count, so clamp it: never report more failures
		// than there are servers.
		mcpFailCount := hostID.McpFailCount
		if mcpFailCount > hostID.McpServerCount {
			mcpFailCount = hostID.McpServerCount
		}
		mcpSuccessCount := hostID.McpServerCount - mcpFailCount
		mcpOffset := 400
		for i := 0; i < mcpFailCount; i++ {
			mcpOffset += 100 + rand.Intn(300)
			batch1 = append(batch1, buildEvent("tengu_mcp_server_connection_failed", session, model, betas, nil, ts(mcpOffset)))
		}
		for i := 0; i < mcpSuccessCount; i++ {
			mcpOffset += 200 + rand.Intn(500)
			batch1 = append(batch1, buildEvent("tengu_mcp_server_connection_succeeded", session, model, betas, nil, ts(mcpOffset)))
		}

		go sendTelemetryEvents(batch1, session, authToken)
		go sendDatadogLog("tengu_started", session, model)
		go sendDatadogLog("tengu_init", session, model)

		// Delayed batch, sent 25-35s later (per the original author, this
		// mirrors the real CLI's timing).
		go func() {
			time.Sleep(time.Duration(25000+rand.Intn(10000)) * time.Millisecond)
			sendTelemetryEvents([]eventWrapper{
				buildEvent("tengu_session_init", session, model, betas, nil, ""),
				buildEvent("tengu_context_loaded", session, model, betas, nil, ""),
			}, session, authToken)
		}()
	}

	// Every request: tengu_api_query.
	go sendTelemetryEvents([]eventWrapper{
		buildEvent("tengu_api_query", session, model, betas, nil, ""),
	}, session, authToken)
}
// EmitPostRequest fires post-request telemetry events after upstream response.
func EmitPostRequest(accountSeed, authHeader, authToken, model, betaHeader string, statusCode int) {
authSuffix := authHeader
if len(authSuffix) > 16 {
authSuffix = authSuffix[len(authSuffix)-16:]
}
deviceID := generateDeviceID(accountSeed + ":" + authSuffix)
session := getOrCreateSession(deviceID)
if model == "" {
model = "claude-sonnet-4-6"
}
betas := betaHeader
if betas == "" {
betas = claude.DefaultBetaHeader
}
// Real CLI uses tengu_api_success on success, tengu_api_error on failure
if statusCode < 400 {
events := []eventWrapper{
buildEvent("tengu_api_success", session, model, betas, nil, ""),
}
go sendTelemetryEvents(events, session, authToken)
go sendDatadogLog("tengu_api_success", session, model)
} else {
var errMsg string
switch {
case statusCode == 429:
errMsg = "rate_limit_exceeded"
case statusCode == 529:
errMsg = "overloaded"
case statusCode >= 500:
errMsg = "server_error"
default:
errMsg = "client_error"
}
errEvent := buildEvent("tengu_api_error", session, model, betas, map[string]any{
"error_type": "TelemetrySafeError",
"error_code": statusCode,
"error_message": errMsg,
}, "")
go sendTelemetryEvents([]eventWrapper{errEvent}, session, authToken)
go sendDatadogLog("tengu_api_error", session, model)
}
// Random tool_use event (30% probability, 2-7s delay)
if rand.Float64() < 0.3 {
go func() {
time.Sleep(time.Duration(2000+rand.Intn(5000)) * time.Millisecond)
sendTelemetryEvents([]eventWrapper{
buildEvent("tengu_tool_use_success", session, model, betas, nil, ""),
}, session, authToken)
}()
}
}
// Jitter returns a random delay to inject before forwarding a request.
//
// 80% of calls take the fast path: 80ms plus an exponentially distributed
// extra (mean 90ms), capped so the result never exceeds 300ms. The remaining
// 20% take the slow path: uniform in [400ms, 1200ms). The result is truncated
// to whole milliseconds.
func Jitter() time.Duration {
	const (
		fastFloorMs = 80.0
		fastMeanMs  = 90.0
		fastCeilMs  = 300.0
		slowFloorMs = 400.0
		slowSpanMs  = 800.0
	)
	if rand.Float64() < 0.80 {
		// rand.ExpFloat64 never yields +Inf, unlike -math.Log(rand.Float64())
		// which does when the uniform draw is exactly 0. The cap enforces the
		// documented upper bound on the otherwise unbounded exponential tail.
		ms := math.Min(fastFloorMs+rand.ExpFloat64()*fastMeanMs, fastCeilMs)
		return time.Duration(ms) * time.Millisecond
	}
	ms := slowFloorMs + rand.Float64()*slowSpanMs
	return time.Duration(ms) * time.Millisecond
}