diff --git a/tools/node-tls-proxy/proxy.js b/tools/node-tls-proxy/proxy.js index 2ea4e266..55079672 100644 --- a/tools/node-tls-proxy/proxy.js +++ b/tools/node-tls-proxy/proxy.js @@ -19,222 +19,172 @@ const log = (level, msg, extra = {}) => { }; const HEALTH_PATH = '/__health'; +const h2Hosts = new Set(); // 已知需要 H2 的主机 +const h2Sessions = new Map(); // H2 session 缓存 -// ─── 协议缓存:记录哪些主机需要 H2 ────────────────────── -// 首次请求用 H1,如果秒挂(socket hang up < 2s)自动切 H2 并缓存 -const h2Hosts = new Set(); - -// ─── H2 会话池 ────────────────────────────────────────── -const h2Sessions = new Map(); - -function getH2Session(host) { +// ─── H2 session 管理 ──────────────────────────────────── +function getOrCreateH2Session(host) { const existing = h2Sessions.get(host); if (existing && !existing.closed && !existing.destroyed) return existing; + if (existing) { try { existing.close(); } catch (_) {} } const session = http2.connect(`https://${host}`); session.on('error', (err) => { log('warn', 'h2_session_error', { host, error: err.message }); h2Sessions.delete(host); + try { session.close(); } catch (_) {} }); session.on('close', () => h2Sessions.delete(host)); - session.setTimeout(IDLE_TIMEOUT, () => { - session.close(); - h2Sessions.delete(host); - }); + session.on('goaway', () => { h2Sessions.delete(host); try { session.close(); } catch (_) {} }); + session.setTimeout(IDLE_TIMEOUT, () => { session.close(); h2Sessions.delete(host); }); h2Sessions.set(host, session); return session; } +function waitForConnect(session) { + if (session.connected) return Promise.resolve(); + return new Promise((resolve, reject) => { + session.once('connect', resolve); + session.once('error', reject); + const t = setTimeout(() => reject(new Error('h2 connect timeout')), CONNECT_TIMEOUT); + session.once('connect', () => clearTimeout(t)); + }); +} + // ─── CONNECT 隧道 ──────────────────────────────────────── function connectViaProxy(proxyUrl, targetHost, targetPort) { return new Promise((resolve, reject) => { const proxy = new URL(proxyUrl); const conn = net.connect(parseInt(proxy.port || '80', 10), proxy.hostname, () => { const auth = proxy.username - ? `Proxy-Authorization: Basic ${Buffer.from( - `${decodeURIComponent(proxy.username)}:${decodeURIComponent(proxy.password || '')}` - ).toString('base64')}\r\n` + ? `Proxy-Authorization: Basic ${Buffer.from(`${decodeURIComponent(proxy.username)}:${decodeURIComponent(proxy.password || '')}`).toString('base64')}\r\n` : ''; - conn.write( - `CONNECT ${targetHost}:${targetPort} HTTP/1.1\r\n` + - `Host: ${targetHost}:${targetPort}\r\n` + auth + '\r\n' - ); + conn.write(`CONNECT ${targetHost}:${targetPort} HTTP/1.1\r\nHost: ${targetHost}:${targetPort}\r\n${auth}\r\n`); }); conn.once('error', reject); - conn.setTimeout(CONNECT_TIMEOUT, () => conn.destroy(new Error('proxy CONNECT timeout'))); - + conn.setTimeout(CONNECT_TIMEOUT, () => conn.destroy(new Error('CONNECT timeout'))); let buf = ''; - const onData = (chunk) => { + conn.on('data', function onData(chunk) { buf += chunk.toString(); const idx = buf.indexOf('\r\n\r\n'); if (idx === -1) return; conn.removeListener('data', onData); const code = parseInt(buf.split(' ')[1], 10); - if (code === 200) { - conn.setTimeout(0); - const rest = buf.slice(idx + 4); - if (rest.length > 0) conn.unshift(Buffer.from(rest)); - resolve(conn); - } else { - conn.destroy(); - reject(new Error(`CONNECT failed: ${code}`)); - } - }; - conn.on('data', onData); + if (code === 200) { conn.setTimeout(0); resolve(conn); } + else { conn.destroy(); reject(new Error(`CONNECT ${code}`)); } + }); + }); +} + +// ─── 收集请求体 ────────────────────────────────────────── +function collectBody(req) { + return new Promise((resolve) => { + const chunks = []; + req.on('data', (c) => chunks.push(c)); + req.on('end', () => resolve(Buffer.concat(chunks))); + req.on('error', () => resolve(Buffer.concat(chunks))); }); } // ─── H1 代理 ───────────────────────────────────────────── -function proxyViaH1(targetHost, req, res) { +function sendViaH1(targetHost, method, path, reqHeaders, body, res) { return new Promise((resolve) => { - const headers = { ...req.headers }; - headers.host = targetHost; - delete headers['x-forwarded-host']; - delete headers['connection']; - delete headers['keep-alive']; - delete headers['proxy-connection']; - delete headers['transfer-encoding']; - - const opts = { - hostname: targetHost, port: 443, path: req.url, - method: req.method, headers, servername: targetHost, - timeout: CONNECT_TIMEOUT, - }; + const headers = { ...reqHeaders, host: targetHost }; + ['x-forwarded-host', 'connection', 'keep-alive', 'proxy-connection', 'transfer-encoding'].forEach(h => delete headers[h]); + if (body.length > 0) headers['content-length'] = String(body.length); + const opts = { hostname: targetHost, port: 443, path, method, headers, servername: targetHost, timeout: CONNECT_TIMEOUT }; const startTime = Date.now(); - let proxyReq; - - const doRequest = (requestOpts) => { - proxyReq = https.request(requestOpts); + const finish = (requestOpts) => { + const proxyReq = https.request(requestOpts); proxyReq.on('response', (proxyRes) => { - log('info', 'proxy_response', { host: targetHost, status: proxyRes.statusCode, path: req.url, proto: 'h1' }); + log('info', 'proxy_response', { host: targetHost, status: proxyRes.statusCode, path, proto: 'h1' }); const rh = { ...proxyRes.headers }; - delete rh['connection']; - delete rh['keep-alive']; + delete rh['connection']; delete rh['keep-alive']; res.writeHead(proxyRes.statusCode, rh); proxyRes.pipe(res, { end: true }); - proxyRes.on('error', (e) => { log('error', 'h1_response_error', { error: e.message }); res.end(); }); resolve('ok'); }); - proxyReq.on('error', (err) => { - const elapsed = Date.now() - startTime; - // socket hang up < 2 秒 = 服务器拒绝 H1,切换到 H2 - if (err.message === 'socket hang up' && elapsed < 2000) { - log('info', 'h1_rejected_switching_to_h2', { host: targetHost, elapsed }); + if (err.message === 'socket hang up' && (Date.now() - startTime) < 2000) { + log('info', 'h1_rejected_switching_to_h2', { host: targetHost }); h2Hosts.add(targetHost); - proxyViaH2(targetHost, req, res); - resolve('h2_fallback'); + sendViaH2(targetHost, method, path, reqHeaders, body, res).then(() => resolve('h2')); return; } - log('error', 'h1_upstream_error', { error: err.message, host: targetHost, path: req.url }); - if (!res.headersSent) { - res.writeHead(502, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ error: 'upstream_error', message: err.message })); - } + log('error', 'h1_error', { error: err.message, host: targetHost, path }); + if (!res.headersSent) { res.writeHead(502); res.end(JSON.stringify({ error: err.message })); } resolve('error'); }); - - proxyReq.on('timeout', () => proxyReq.destroy(new Error('upstream timeout'))); - req.on('close', () => { if (!proxyReq.destroyed) proxyReq.destroy(); }); - req.pipe(proxyReq, { end: true }); + proxyReq.on('timeout', () => proxyReq.destroy(new Error('timeout'))); + proxyReq.end(body); }; if (UPSTREAM_PROXY) { connectViaProxy(UPSTREAM_PROXY, targetHost, 443) - .then((socket) => { opts.socket = socket; opts.agent = false; doRequest(opts); }) - .catch((err) => { - log('error', 'proxy_tunnel_failed', { error: err.message, host: targetHost }); - if (!res.headersSent) { - res.writeHead(502, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ error: 'proxy_tunnel_error' })); - } - resolve('error'); - }); + .then((socket) => { opts.socket = socket; opts.agent = false; finish(opts); }) + .catch((err) => { log('error', 'tunnel_failed', { error: err.message }); if (!res.headersSent) { res.writeHead(502); res.end('tunnel error'); } resolve('error'); }); } else { - doRequest(opts); + finish(opts); } }); } // ─── H2 代理 ───────────────────────────────────────────── -function proxyViaH2(targetHost, req, res) { +async function sendViaH2(targetHost, method, path, reqHeaders, body, res) { try { - const session = getH2Session(targetHost); + const session = getOrCreateH2Session(targetHost); + await waitForConnect(session); const headers = {}; - // 只拷贝合法的 H2 头(跳过 H1 专用头和连接头) - const skipHeaders = new Set([ - 'host', 'connection', 'keep-alive', 'proxy-connection', - 'transfer-encoding', 'upgrade', 'x-forwarded-host', - 'http2-settings', - ]); - for (const [k, v] of Object.entries(req.headers)) { - if (!skipHeaders.has(k.toLowerCase())) { - headers[k] = v; - } + const skip = new Set(['host','connection','keep-alive','proxy-connection','transfer-encoding','upgrade','x-forwarded-host','http2-settings']); + for (const [k, v] of Object.entries(reqHeaders)) { + if (!skip.has(k.toLowerCase())) headers[k] = v; } - - // H2 伪头 - headers[':method'] = req.method; - headers[':path'] = req.url; + headers[':method'] = method; + headers[':path'] = path; headers[':authority'] = targetHost; headers[':scheme'] = 'https'; + if (body.length > 0) headers['content-length'] = String(body.length); - const h2Stream = session.request(headers); - + const stream = session.request(headers); let responded = false; - h2Stream.on('response', (h2Headers) => { + stream.on('response', (h2h) => { responded = true; - const status = h2Headers[':status'] || 502; - const respHeaders = {}; - for (const [k, v] of Object.entries(h2Headers)) { - if (!k.startsWith(':')) respHeaders[k] = v; - } - log('info', 'proxy_response', { host: targetHost, status, path: req.url, proto: 'h2' }); - res.writeHead(status, respHeaders); - h2Stream.pipe(res, { end: true }); + const status = h2h[':status'] || 502; + const rh = {}; + for (const [k, v] of Object.entries(h2h)) { if (!k.startsWith(':')) rh[k] = v; } + log('info', 'proxy_response', { host: targetHost, status, path, proto: 'h2' }); + res.writeHead(status, rh); + stream.on('data', (c) => res.write(c)); + stream.on('end', () => res.end()); }); - h2Stream.on('error', (err) => { - log('error', 'h2_stream_error', { error: err.message, host: targetHost, path: req.url }); - h2Sessions.delete(targetHost); // 清理坏 session + stream.on('error', (err) => { + if (err.message && err.message.includes('NGHTTP2')) { + h2Sessions.delete(targetHost); + try { session.close(); } catch (_) {} + } + if (responded) { if (!res.writableEnded) res.end(); return; } + log('error', 'h2_error', { error: err.message, host: targetHost, path }); + if (!res.headersSent) { res.writeHead(502); res.end(JSON.stringify({ error: err.message })); } + }); + + stream.on('close', () => { if (!responded && !res.headersSent) { - res.writeHead(502, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ error: 'h2_error', message: err.message })); - } + log('warn', 'h2_no_response', { host: targetHost, path }); + res.writeHead(502); res.end('{"error":"h2_no_response"}'); + } else if (!res.writableEnded) { res.end(); } }); - h2Stream.on('close', () => { - if (!responded && !res.headersSent) { - log('warn', 'h2_stream_closed_no_response', { host: targetHost, path: req.url }); - res.writeHead(502, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ error: 'h2_no_response' })); - } - }); - - // 超时 - h2Stream.setTimeout(CONNECT_TIMEOUT, () => { - log('warn', 'h2_timeout', { host: targetHost, path: req.url }); - h2Stream.close(); - }); - - req.on('close', () => { - if (!h2Stream.destroyed) h2Stream.close(); - }); - - // pipe 请求体 - req.pipe(h2Stream, { end: true }); - + stream.setTimeout(CONNECT_TIMEOUT, () => stream.close()); + stream.end(body); } catch (err) { - log('error', 'h2_proxy_exception', { error: err.message, host: targetHost }); + log('error', 'h2_exception', { error: err.message, host: targetHost }); h2Sessions.delete(targetHost); - if (!res.headersSent) { - res.writeHead(502, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ error: 'h2_exception', message: err.message })); - } + if (!res.headersSent) { res.writeHead(502); res.end(JSON.stringify({ error: err.message })); } } } @@ -243,64 +193,44 @@ async function proxyRequest(req, res) { const targetHost = req.headers['x-forwarded-host'] || UPSTREAM_HOST; log('info', 'proxy_request', { host: targetHost, method: req.method, path: req.url }); - // 已知需要 H2 的主机直接走 H2 - if (h2Hosts.has(targetHost)) { - proxyViaH2(targetHost, req, res); - return; - } + const body = await collectBody(req); - // 首次请求走 H1,如果秒挂自动切 H2 - await proxyViaH1(targetHost, req, res); + if (h2Hosts.has(targetHost)) { + await sendViaH2(targetHost, req.method, req.url, req.headers, body, res); + } else { + await sendViaH1(targetHost, req.method, req.url, req.headers, body, res); + } } // ─── HTTP 服务器 ───────────────────────────────────────── const server = http.createServer((req, res) => { if (req.url === HEALTH_PATH) { res.writeHead(200, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ - status: 'ok', - upstream: UPSTREAM_HOST, - node: process.version, - openssl: process.versions.openssl, - uptime: process.uptime(), - h2Hosts: [...h2Hosts], - })); + res.end(JSON.stringify({ status: 'ok', node: process.version, openssl: process.versions.openssl, uptime: process.uptime(), h2Hosts: [...h2Hosts] })); return; } proxyRequest(req, res).catch((err) => { - log('error', 'unhandled_error', { error: err.message }); - if (!res.headersSent) { - res.writeHead(500, { 'content-type': 'application/json' }); - res.end(JSON.stringify({ error: 'internal_error' })); - } + log('error', 'unhandled', { error: err.message }); + if (!res.headersSent) { res.writeHead(500); res.end('internal error'); } }); }); server.timeout = 0; server.keepAliveTimeout = IDLE_TIMEOUT; server.headersTimeout = 60000; - server.listen(LISTEN_PORT, LISTEN_HOST, () => { - log('info', 'node-tls-proxy started', { - listen: `${LISTEN_HOST}:${LISTEN_PORT}`, - upstream: `${UPSTREAM_HOST}:443`, - proxy: UPSTREAM_PROXY || '(direct)', - node: process.version, - openssl: process.versions.openssl, - }); + log('info', 'node-tls-proxy started', { listen: `${LISTEN_HOST}:${LISTEN_PORT}`, node: process.version, openssl: process.versions.openssl }); }); -let shuttingDown = false; -function shutdown(signal) { - if (shuttingDown) return; - shuttingDown = true; - log('info', `received ${signal}, shutting down`); - for (const s of h2Sessions.values()) s.close(); +let stopping = false; +function shutdown(sig) { + if (stopping) return; stopping = true; + for (const s of h2Sessions.values()) try { s.close(); } catch (_) {} h2Sessions.clear(); server.close(() => process.exit(0)); setTimeout(() => process.exit(1), 5000); } process.on('SIGTERM', () => shutdown('SIGTERM')); process.on('SIGINT', () => shutdown('SIGINT')); -process.on('uncaughtException', (err) => log('error', 'uncaught', { error: err.message, stack: err.stack })); -process.on('unhandledRejection', (r) => log('error', 'unhandled_rejection', { error: String(r) })); +process.on('uncaughtException', (e) => log('error', 'uncaught', { error: e.message })); +process.on('unhandledRejection', (r) => log('error', 'rejection', { error: String(r) }));