sub2api/backend/internal/service/lsrpc_handler.go
win 9da079a5ee
Some checks failed
CI / test (push) Failing after 3s
CI / frontend (push) Failing after 3s
CI / golangci-lint (push) Failing after 3s
Security Scan / backend-security (push) Failing after 3s
Security Scan / frontend-security (push) Failing after 5s
CI / windsurf-platform (macos-latest) (push) Has been cancelled
CI / windsurf-platform (windows-latest) (push) Has been cancelled
x
2026-04-27 19:01:41 +08:00

354 lines
12 KiB
Go

package service
import (
"context"
"fmt"
"io/fs"
"log/slog"
"net/http"
"os"
"path/filepath"
"time"
connect "connectrpc.com/connect"
"github.com/Wei-Shaw/sub2api/internal/gen/language_server_pb"
"github.com/Wei-Shaw/sub2api/internal/gen/language_server_pbconnect"
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"google.golang.org/protobuf/types/known/timestamppb"
)
const upstreamLSRPCBaseURL = "https://cloudcode-pa.googleapis.com"
// LSRPCHandler implements LanguageServerServiceHandler by proxying to the real upstream
// lsrpc service using OAuth tokens obtained from AntigravityGatewayService.
// File RPCs (ReadFile/WriteFile/ReadDir/etc.) operate on the local filesystem.
type LSRPCHandler struct {
language_server_pbconnect.UnimplementedLanguageServerServiceHandler
antigravitySvc *AntigravityGatewayService
accountRepo AccountRepository
logger *slog.Logger
}
// NewLSRPCHandler creates a new LSRPCHandler.
func NewLSRPCHandler(
antigravitySvc *AntigravityGatewayService,
accountRepo AccountRepository,
logger *slog.Logger,
) *LSRPCHandler {
if logger == nil {
logger = slog.Default()
}
return &LSRPCHandler{
antigravitySvc: antigravitySvc,
accountRepo: accountRepo,
logger: logger,
}
}
// upstreamClient creates a connectrpc client to the real lsrpc upstream,
// authenticated with the OAuth token from the given account.
func (h *LSRPCHandler) upstreamClient(ctx context.Context) (language_server_pbconnect.LanguageServerServiceClient, error) {
accounts, err := h.accountRepo.ListByPlatform(ctx, PlatformAntigravity)
if err != nil || len(accounts) == 0 {
return nil, fmt.Errorf("no antigravity accounts available: %w", err)
}
account := &accounts[0]
tokenProvider := h.antigravitySvc.GetTokenProvider()
if tokenProvider == nil {
return nil, fmt.Errorf("antigravity token provider not configured")
}
accessToken, err := tokenProvider.GetAccessToken(ctx, account)
if err != nil {
return nil, fmt.Errorf("failed to get access token: %w", err)
}
httpClient := &http.Client{
Timeout: 5 * time.Minute,
Transport: &bearerTransport{
base: http.DefaultTransport,
token: accessToken,
},
}
client := language_server_pbconnect.NewLanguageServerServiceClient(
httpClient,
upstreamLSRPCBaseURL,
connect.WithGRPC(),
)
return client, nil
}
// bearerTransport injects Authorization: Bearer <token> into every request.
type bearerTransport struct {
base http.RoundTripper
token string
}
func (t *bearerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
clone := req.Clone(req.Context())
clone.Header.Set("Authorization", "Bearer "+t.token)
return t.base.RoundTrip(clone)
}
// ============================================================================
// Cascade RPCs — proxied to real upstream
// ============================================================================
func (h *LSRPCHandler) StartCascade(
ctx context.Context,
req *connect.Request[language_server_pb.StartCascadeRequest],
) (*connect.Response[language_server_pb.StartCascadeResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeUnavailable, err)
}
return client.StartCascade(ctx, req)
}
func (h *LSRPCHandler) SendUserCascadeMessage(
ctx context.Context,
req *connect.Request[language_server_pb.SendUserCascadeMessageRequest],
stream *connect.ServerStream[language_server_pb.CascadeReactiveUpdate],
) error {
client, err := h.upstreamClient(ctx)
if err != nil {
return connect.NewError(connect.CodeUnavailable, err)
}
upstreamStream, err := client.SendUserCascadeMessage(ctx, req)
if err != nil {
return err
}
defer upstreamStream.Close()
for upstreamStream.Receive() {
if err := stream.Send(upstreamStream.Msg()); err != nil {
return err
}
}
return upstreamStream.Err()
}
func (h *LSRPCHandler) CancelCascadeInvocation(
ctx context.Context,
req *connect.Request[language_server_pb.CancelCascadeInvocationRequest],
) (*connect.Response[language_server_pb.CancelCascadeInvocationResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeUnavailable, err)
}
return client.CancelCascadeInvocation(ctx, req)
}
func (h *LSRPCHandler) AcknowledgeCascadeCodeEdit(
ctx context.Context,
req *connect.Request[language_server_pb.AcknowledgeCascadeCodeEditRequest],
) (*connect.Response[language_server_pb.AcknowledgeCascadeCodeEditResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
return nil, connect.NewError(connect.CodeUnavailable, err)
}
return client.AcknowledgeCascadeCodeEdit(ctx, req)
}
// ============================================================================
// Model config RPCs — proxied to real upstream
// ============================================================================
func (h *LSRPCHandler) GetCascadeModelConfigs(
ctx context.Context,
req *connect.Request[language_server_pb.GetCascadeModelConfigsRequest],
) (*connect.Response[language_server_pb.GetCascadeModelConfigsResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
// Fall back to static list when upstream unavailable.
return connect.NewResponse(&language_server_pb.GetCascadeModelConfigsResponse{
Models: staticCascadeModels(),
}), nil
}
resp, err := client.GetCascadeModelConfigs(ctx, req)
if err != nil {
return connect.NewResponse(&language_server_pb.GetCascadeModelConfigsResponse{
Models: staticCascadeModels(),
}), nil
}
return resp, nil
}
func (h *LSRPCHandler) GetCommandModelConfigs(
ctx context.Context,
req *connect.Request[language_server_pb.GetCommandModelConfigsRequest],
) (*connect.Response[language_server_pb.GetCommandModelConfigsResponse], error) {
client, err := h.upstreamClient(ctx)
if err != nil {
return connect.NewResponse(&language_server_pb.GetCommandModelConfigsResponse{
Models: staticCascadeModels(),
}), nil
}
resp, err := client.GetCommandModelConfigs(ctx, req)
if err != nil {
return connect.NewResponse(&language_server_pb.GetCommandModelConfigsResponse{
Models: staticCascadeModels(),
}), nil
}
return resp, nil
}
// staticCascadeModels returns a hard-coded model list as fallback.
func staticCascadeModels() []*language_server_pb.ModelConfig {
return []*language_server_pb.ModelConfig{
{Name: "claude-opus-4-7", DisplayName: "Claude Opus 4.7", MaxTokens: 200000, SupportsThinking: true, ThinkingBudget: 32000, SupportsImages: true, Provider: "anthropic"},
{Name: "claude-opus-4-6", DisplayName: "Claude Opus 4.6", MaxTokens: 200000, SupportsThinking: true, ThinkingBudget: 32000, SupportsImages: true, Provider: "anthropic"},
{Name: "claude-sonnet-4-6", DisplayName: "Claude Sonnet 4.6", MaxTokens: 200000, SupportsImages: true, Provider: "anthropic"},
{Name: "claude-haiku-4-5", DisplayName: "Claude Haiku 4.5", MaxTokens: 200000, SupportsImages: true, Provider: "anthropic"},
}
}
// ============================================================================
// File RPCs — local filesystem implementation
// ============================================================================
func (h *LSRPCHandler) ReadFile(
ctx context.Context,
req *connect.Request[language_server_pb.ReadFileRequest],
) (*connect.Response[language_server_pb.ReadFileResponse], error) {
path := req.Msg.GetPath()
data, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("file not found: %s", path))
}
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&language_server_pb.ReadFileResponse{
Content: string(data),
}), nil
}
func (h *LSRPCHandler) WriteFile(
ctx context.Context,
req *connect.Request[language_server_pb.WriteFileRequest],
) (*connect.Response[language_server_pb.WriteFileResponse], error) {
path := req.Msg.GetPath()
if req.Msg.GetCreateParent() {
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
}
if err := os.WriteFile(path, []byte(req.Msg.GetContent()), 0o644); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&language_server_pb.WriteFileResponse{}), nil
}
func (h *LSRPCHandler) ReadDir(
ctx context.Context,
req *connect.Request[language_server_pb.ReadDirRequest],
) (*connect.Response[language_server_pb.ReadDirResponse], error) {
path := req.Msg.GetPath()
entries, err := os.ReadDir(path)
if err != nil {
if os.IsNotExist(err) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("directory not found: %s", path))
}
return nil, connect.NewError(connect.CodeInternal, err)
}
files := make([]*language_server_pb.FileInfo, 0, len(entries))
for _, entry := range entries {
info, err := entry.Info()
if err != nil {
continue
}
files = append(files, fileInfoFromOS(entry.Name(), info))
}
return connect.NewResponse(&language_server_pb.ReadDirResponse{
Files: files,
}), nil
}
func (h *LSRPCHandler) DeleteFileOrDirectory(
ctx context.Context,
req *connect.Request[language_server_pb.DeleteFileOrDirectoryRequest],
) (*connect.Response[language_server_pb.DeleteFileOrDirectoryResponse], error) {
path := req.Msg.GetPath()
if err := os.RemoveAll(path); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&language_server_pb.DeleteFileOrDirectoryResponse{}), nil
}
func (h *LSRPCHandler) StatUri(
ctx context.Context,
req *connect.Request[language_server_pb.StatUriRequest],
) (*connect.Response[language_server_pb.StatUriResponse], error) {
path := req.Msg.GetPath()
info, err := os.Stat(path)
if err != nil {
if os.IsNotExist(err) {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("path not found: %s", path))
}
return nil, connect.NewError(connect.CodeInternal, err)
}
return connect.NewResponse(&language_server_pb.StatUriResponse{
FileInfo: fileInfoFromOS(info.Name(), info),
}), nil
}
func (h *LSRPCHandler) WatchDirectory(
ctx context.Context,
req *connect.Request[language_server_pb.WatchDirectoryRequest],
stream *connect.ServerStream[language_server_pb.WatchDirectoryResponse],
) error {
// Block until context is cancelled — real FS watching requires fsnotify which
// is not in the dependency graph yet. This satisfies the interface contract
// without crashing; the client will get an EOF when the connection closes.
<-ctx.Done()
return nil
}
// ============================================================================
// Health RPCs
// ============================================================================
func (h *LSRPCHandler) Heartbeat(
ctx context.Context,
req *connect.Request[language_server_pb.HeartbeatRequest],
) (*connect.Response[language_server_pb.HeartbeatResponse], error) {
return connect.NewResponse(&language_server_pb.HeartbeatResponse{
Healthy: true,
Version: "sub2api",
}), nil
}
func (h *LSRPCHandler) GetStatus(
ctx context.Context,
req *connect.Request[language_server_pb.GetStatusRequest],
) (*connect.Response[language_server_pb.GetStatusResponse], error) {
return connect.NewResponse(&language_server_pb.GetStatusResponse{
Status: "running",
Version: antigravity.BaseURL,
}), nil
}
// ============================================================================
// Helpers
// ============================================================================
func fileInfoFromOS(name string, info fs.FileInfo) *language_server_pb.FileInfo {
t := language_server_pb.FileInfo_FILE
if info.IsDir() {
t = language_server_pb.FileInfo_DIRECTORY
} else if info.Mode()&os.ModeSymlink != 0 {
t = language_server_pb.FileInfo_SYMLINK
}
return &language_server_pb.FileInfo{
Path: name,
Type: t,
Size: info.Size(),
ModifiedTime: timestamppb.New(info.ModTime()),
}
}