Files
broswer-automation/app/remote-server/src/index.ts

488 lines
17 KiB
TypeScript

import Fastify from 'fastify';
import cors from '@fastify/cors';
import websocket from '@fastify/websocket';
import { pino } from 'pino';
import chalk from 'chalk';
import { randomUUID } from 'node:crypto';
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { MCPRemoteServer } from './server/mcp-remote-server.js';
const logger = pino({
level: 'info',
});
async function startServer() {
const fastify = Fastify({
logger: true,
});
// Register CORS
await fastify.register(cors, {
origin: true,
credentials: true,
});
// Register WebSocket support
await fastify.register(websocket);
// Create MCP Remote Server instance
const mcpServer = new MCPRemoteServer(logger);
// Transport mapping for streaming connections
const transportsMap: Map<string, StreamableHTTPServerTransport | SSEServerTransport> = new Map();
// Health check endpoint
fastify.get('/health', async (request, reply) => {
return { status: 'ok', timestamp: new Date().toISOString() };
});
// SSE endpoint for streaming MCP communication
fastify.get('/sse', async (request, reply) => {
try {
// Set SSE headers
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control',
});
// Create SSE transport
const transport = new SSEServerTransport('/messages', reply.raw);
transportsMap.set(transport.sessionId, transport);
reply.raw.on('close', () => {
transportsMap.delete(transport.sessionId);
logger.info(`SSE connection closed for session: ${transport.sessionId}`);
});
// Start the transport first
await transport.start();
// Connect the MCP server to this transport
await mcpServer.connectTransport(transport);
// Hijack the reply to prevent Fastify from sending additional headers
reply.hijack();
} catch (error) {
logger.error('Error setting up SSE connection:', error);
if (!reply.sent && !reply.raw.headersSent) {
reply.code(500).send({ error: 'Internal server error' });
}
}
});
// POST /messages: Handle SSE POST messages
fastify.post('/messages', async (request, reply) => {
const sessionId = request.headers['x-session-id'] as string | undefined;
const transport = sessionId ? (transportsMap.get(sessionId) as SSEServerTransport) : undefined;
if (!transport) {
reply.code(400).send({ error: 'Invalid session ID for SSE' });
return;
}
try {
await transport.handlePostMessage(request.raw, reply.raw, request.body);
} catch (error) {
logger.error('Error handling SSE POST message:', error);
if (!reply.sent) {
reply.code(500).send({ error: 'Internal server error' });
}
}
});
// POST /mcp: Handle client-to-server messages for streamable HTTP
fastify.post('/mcp', async (request, reply) => {
// Extract session ID and user ID from headers for routing
const sessionId = request.headers['mcp-session-id'] as string | undefined;
const userId = request.headers['chrome-user-id'] as string | undefined;
let transport: StreamableHTTPServerTransport | undefined = transportsMap.get(
sessionId || '',
) as StreamableHTTPServerTransport;
if (transport) {
// Transport found, use existing one
} else if (!sessionId && isInitializeRequest(request.body)) {
// Create new session for initialization request
const newSessionId = randomUUID();
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => newSessionId,
onsessioninitialized: (initializedSessionId) => {
if (transport && initializedSessionId === newSessionId) {
transportsMap.set(initializedSessionId, transport);
logger.info(`New streamable HTTP session initialized: ${initializedSessionId}`);
}
},
});
// Connect the MCP server to this transport
await mcpServer.connectTransport(transport);
} else {
reply.code(400).send({ error: 'Invalid session or missing initialization' });
return;
}
try {
// Set user context for routing if user ID is provided
if (userId) {
mcpServer.setUserContext(userId, sessionId);
logger.info(
`🎯 [MCP] User context set for request - User: ${userId}, Session: ${sessionId}`,
);
}
await transport.handleRequest(request.raw, reply.raw, request.body);
if (!reply.sent) {
reply.hijack(); // Prevent Fastify from automatically sending response
}
} catch (error) {
logger.error('Error handling streamable HTTP POST request:', error);
if (!reply.sent) {
reply.code(500).send({ error: 'Internal server error' });
}
}
});
// GET /mcp: Handle SSE stream for streamable HTTP
fastify.get('/mcp', async (request, reply) => {
const sessionId = request.headers['mcp-session-id'] as string | undefined;
const transport = sessionId
? (transportsMap.get(sessionId) as StreamableHTTPServerTransport)
: undefined;
if (!transport) {
reply.code(400).send({ error: 'Invalid session ID' });
return;
}
reply.raw.setHeader('Content-Type', 'text/event-stream');
reply.raw.setHeader('Cache-Control', 'no-cache');
reply.raw.setHeader('Connection', 'keep-alive');
reply.raw.setHeader('Access-Control-Allow-Origin', '*');
reply.raw.flushHeaders();
try {
await transport.handleRequest(request.raw, reply.raw);
if (!reply.sent) {
reply.hijack(); // Prevent Fastify from automatically sending response
}
} catch (error) {
logger.error('Error handling streamable HTTP GET request:', error);
if (!reply.raw.writableEnded) {
reply.raw.end();
}
}
request.socket.on('close', () => {
logger.info(`Streamable HTTP client disconnected for session: ${sessionId}`);
});
});
// DELETE /mcp: Handle session termination for streamable HTTP
fastify.delete('/mcp', async (request, reply) => {
const sessionId = request.headers['mcp-session-id'] as string | undefined;
const transport = sessionId
? (transportsMap.get(sessionId) as StreamableHTTPServerTransport)
: undefined;
if (!transport) {
reply.code(400).send({ error: 'Invalid session ID' });
return;
}
try {
await transport.handleRequest(request.raw, reply.raw);
if (sessionId) {
transportsMap.delete(sessionId);
logger.info(`Streamable HTTP session terminated: ${sessionId}`);
}
if (!reply.sent) {
reply.code(204).send();
}
} catch (error) {
logger.error('Error handling streamable HTTP DELETE request:', error);
if (!reply.sent) {
reply.code(500).send({ error: 'Internal server error' });
}
}
});
// WebSocket endpoint for MCP communication
fastify.register(async function (fastify) {
fastify.get('/ws/mcp', { websocket: true }, (connection: any, req) => {
logger.info('New MCP WebSocket connection established');
// Set up ping/pong to keep connection alive
const pingInterval = setInterval(() => {
if (connection.readyState === connection.OPEN) {
connection.ping();
}
}, 30000); // Ping every 30 seconds
connection.on('pong', () => {
logger.debug('Received pong from MCP client');
});
connection.on('message', async (message: any) => {
try {
const data = JSON.parse(message.toString());
console.log('🔵 [MCP WebSocket] Received message:', JSON.stringify(data, null, 2));
logger.info('🔵 [MCP WebSocket] Received message:', {
method: data.method,
id: data.id,
params: data.params,
fullMessage: data,
});
// Handle MCP protocol messages with proper ID preservation
let response;
if (data.method === 'tools/list') {
try {
const toolsResponse = await mcpServer.handleMessage(data);
response = {
jsonrpc: '2.0',
id: data.id,
result: toolsResponse,
};
} catch (error) {
response = {
jsonrpc: '2.0',
id: data.id,
error: {
code: -32603,
message: error instanceof Error ? error.message : 'Internal error',
},
};
}
} else if (data.method === 'tools/call') {
try {
const toolResponse = await mcpServer.handleMessage(data);
response = {
jsonrpc: '2.0',
id: data.id,
result: toolResponse,
};
} catch (error) {
response = {
jsonrpc: '2.0',
id: data.id,
error: {
code: -32603,
message: error instanceof Error ? error.message : 'Tool execution failed',
},
};
}
} else {
// Unknown method
response = {
jsonrpc: '2.0',
id: data.id,
error: {
code: -32601,
message: 'Method not found',
},
};
}
if (response) {
logger.info('🟢 [MCP WebSocket] Sending response:', {
id: response.id,
hasResult: !!response.result,
hasError: !!response.error,
fullResponse: response,
});
connection.send(JSON.stringify(response));
}
} catch (error) {
console.error('🚨 [MCP WebSocket] Error handling MCP message:');
console.error('Error message:', error instanceof Error ? error.message : String(error));
console.error('Error stack:', error instanceof Error ? error.stack : undefined);
console.error('Full error:', error);
logger.error('🚨 [MCP WebSocket] Error handling MCP message:', {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
errorType: typeof error,
fullError: error,
});
// Send error response with proper MCP format
const errorResponse = {
jsonrpc: '2.0',
id: null, // Use null if we can't parse the original ID
error: {
code: -32700,
message: 'Parse error',
},
};
connection.send(JSON.stringify(errorResponse));
}
});
connection.on('close', () => {
logger.info('MCP WebSocket connection closed');
clearInterval(pingInterval);
});
connection.on('error', (error: any) => {
logger.error('WebSocket error:', error);
clearInterval(pingInterval);
});
});
});
// WebSocket endpoint for Chrome extension connections
fastify.register(async function (fastify) {
fastify.get('/chrome', { websocket: true }, (connection: any, req) => {
logger.info('New Chrome extension WebSocket connection established');
// Set up ping/pong to keep Chrome extension connection alive
const chromeExtensionPingInterval = setInterval(() => {
if (connection.readyState === connection.OPEN) {
connection.ping();
}
}, 30000); // Ping every 30 seconds
// Create a connection wrapper for the Chrome tools
const connectionWrapper = {
socket: connection,
send: (data: string) => connection.send(data),
on: (event: string, handler: Function) => connection.on(event, handler),
off: (event: string, handler: Function) => connection.off(event, handler),
get readyState() {
// WebSocket states: 0=CONNECTING, 1=OPEN, 2=CLOSING, 3=CLOSED
return connection.readyState || 1; // Default to OPEN if not available
},
};
// Extract user information from connection headers or query params
const userAgent = req.headers['user-agent'] || 'Unknown';
const ipAddress = req.headers['x-forwarded-for'] || req.socket?.remoteAddress || 'Unknown';
// Initialize with temporary user ID (will be updated when Chrome extension sends connection_info)
let currentUserId = `temp_user_${Date.now()}_${Math.random().toString(36).substring(2, 8)}`;
// Register this connection with the Chrome tools with session management
const sessionInfo = mcpServer.registerChromeExtension(connectionWrapper, currentUserId, {
userAgent,
ipAddress,
connectedAt: new Date().toISOString(),
connectionType: 'anonymous',
});
logger.info('🟢 [Chrome Extension] Connection registered:', sessionInfo);
connection.on('message', async (message: any) => {
try {
const data = JSON.parse(message.toString());
// Handle connection info message
if (data.type === 'connection_info') {
logger.info('🔗 [Chrome Extension] Received connection info:', data);
// Update user ID if provided by Chrome extension
if (data.userId && data.userId !== sessionInfo.userId) {
logger.info(
`🔄 [Chrome Extension] Updating user ID from ${sessionInfo.userId} to ${data.userId}`,
);
// Update the session with the Chrome extension's user ID
const updatedSessionInfo = mcpServer.updateChromeExtensionUserId(
connectionWrapper,
data.userId,
);
if (updatedSessionInfo) {
// Update our local reference
Object.assign(sessionInfo, updatedSessionInfo);
logger.info(
`✅ [Chrome Extension] User ID updated successfully: ${sessionInfo.userId}`,
);
}
}
// Send session info back to extension
const sessionResponse = {
type: 'session_info',
sessionInfo: {
userId: sessionInfo.userId,
sessionId: sessionInfo.sessionId,
connectionId: sessionInfo.connectionId,
},
timestamp: Date.now(),
};
connection.send(JSON.stringify(sessionResponse));
return;
}
logger.info('🟡 [Chrome Extension] Received message:', {
action: data.action,
id: data.id,
type: data.type,
sessionId: sessionInfo.sessionId,
userId: sessionInfo.userId,
fullMessage: data,
});
// Handle responses from Chrome extension
mcpServer.handleChromeResponse(data);
} catch (error) {
logger.error('Error handling Chrome extension message:', error);
}
});
connection.on('close', () => {
logger.info('Chrome extension WebSocket connection closed');
clearInterval(chromeExtensionPingInterval);
mcpServer.unregisterChromeExtension(connectionWrapper);
});
connection.on('error', (error: any) => {
logger.error('Chrome extension WebSocket error:', error);
clearInterval(chromeExtensionPingInterval);
mcpServer.unregisterChromeExtension(connectionWrapper);
});
});
});
// Start the server
const port = process.env.PORT ? parseInt(process.env.PORT) : 3001;
const host = process.env.HOST || '0.0.0.0';
try {
await fastify.listen({ port, host });
console.log(chalk.green(`🚀 MCP Remote Server started successfully!`));
console.log(chalk.blue(`📡 Server running at: http://${host}:${port}`));
console.log(chalk.blue(`🌊 Streaming HTTP endpoint: http://${host}:${port}/mcp`));
console.log(chalk.blue(`📡 SSE endpoint: http://${host}:${port}/sse`));
console.log(chalk.blue(`🔌 WebSocket endpoint: ws://${host}:${port}/ws/mcp`));
console.log(chalk.blue(`🔌 Chrome extension endpoint: ws://${host}:${port}/chrome`));
console.log(chalk.yellow(`💡 Use 'npm run start:server' to start the server`));
} catch (err) {
console.error('Error starting server:', err);
logger.error('Error starting server:', err);
process.exit(1);
}
}
// Handle graceful shutdown
process.on('SIGINT', () => {
console.log(chalk.yellow('\n🛑 Shutting down server...'));
process.exit(0);
});
process.on('SIGTERM', () => {
console.log(chalk.yellow('\n🛑 Shutting down server...'));
process.exit(0);
});
startServer().catch((error) => {
console.error('Failed to start server:', error);
logger.error('Failed to start server:', error);
process.exit(1);
});