Files
broswer-automation/agent-livekit/mcp_chrome_client.py
nasir@endelospay.com d97cad1736 first commit
2025-08-12 02:54:17 +05:00

4167 lines
188 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MCP Chrome Client for LiveKit Integration
This module provides a client interface to the MCP Chrome server
with voice command processing capabilities.
"""
import asyncio
import aiohttp
import json
import logging
import subprocess
from typing import Dict, Any, Optional, List
import re
class MCPResponseHandler:
    """
    Handler for processing MCP tool responses and extracting target element information.

    Every method is static; the class only groups the helpers under one name.
    """

    # Values matching this can be written verbatim after '#' or '.' in a CSS
    # selector; anything else must go through a quoted attribute selector.
    _SIMPLE_CSS_IDENT = re.compile(r'^-?[A-Za-z_][A-Za-z0-9_-]*$')

    @staticmethod
    def _failure(error: str, is_mcp_error: bool, **extra: Any) -> Dict[str, Any]:
        """Build the uniform failure payload returned by parse_mcp_response."""
        payload: Dict[str, Any] = {
            "success": False,
            "error": error,
            "is_mcp_error": is_mcp_error,
            "target_element": None,
            "optimal_selector": None,
        }
        payload.update(extra)
        return payload

    @staticmethod
    def _quote_css_value(value: str) -> str:
        """Escape backslashes and single quotes for use inside a '...' CSS string."""
        return value.replace("\\", "\\\\").replace("'", "\\'")

    @staticmethod
    def parse_mcp_response(mcp_result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Parse MCP tool response and extract meaningful data including target element.

        Args:
            mcp_result: Raw MCP tool response. Expected to carry an optional
                ``isError`` flag and a ``content`` list whose first item has a
                ``text`` field holding a JSON document.

        Returns:
            A dict that always contains ``success``, ``is_mcp_error``,
            ``target_element`` and ``optimal_selector``. Failures additionally
            carry ``error`` (and ``raw_content`` when the text was not valid
            JSON); successes carry ``message``, ``results``, ``element_info``,
            ``navigation_occurred`` and the decoded ``raw_content``.
            This method never raises: unexpected shapes are reported as failures.
        """
        failure = MCPResponseHandler._failure
        try:
            content = mcp_result.get("content")

            # Tool-level failure: the first content item carries the message.
            if mcp_result.get("isError", False):
                error_message = "Unknown error"
                if content:
                    error_message = content[0].get("text", error_message)
                return failure(error_message, True)

            if not content:
                return failure("No content in MCP response", False)

            content_text = content[0].get("text", "")
            if not content_text:
                return failure("Empty content in MCP response", False)

            try:
                parsed_content = json.loads(content_text)
            except json.JSONDecodeError as e:
                return failure(
                    f"Invalid JSON in MCP response: {e}",
                    False,
                    raw_content=content_text,
                )

            target_element = parsed_content.get("targetElement", {})
            return {
                "success": parsed_content.get("success", False),
                "message": parsed_content.get("message", ""),
                "target_element": target_element,
                "optimal_selector": MCPResponseHandler.generate_optimal_selector(target_element),
                "results": parsed_content.get("results", []),
                "element_info": parsed_content.get("elementInfo", {}),
                "navigation_occurred": parsed_content.get("navigationOccurred", False),
                "raw_content": parsed_content,
                "is_mcp_error": False
            }
        except Exception as e:
            # Covers malformed shapes (non-dict result, non-dict content items,
            # JSON that decodes to something other than an object).
            logging.getLogger(__name__).error(f"Error parsing MCP response: {e}")
            return failure(f"Exception parsing MCP response: {str(e)}", False)

    @staticmethod
    def generate_optimal_selector(target_element: Dict[str, Any]) -> Optional[str]:
        """
        Generate the most specific and reliable CSS selector from target element info.

        Priority order (first applicable rule wins):
            1. ``id``
            2. ``name`` attribute together with the tag
            3. ``type`` attribute, for ``input`` tags only
            4. first class together with the tag
            5. bare tag name

        Attribute values that are missing, ``None`` or not strings are treated
        as absent. Ids and class names that are not plain CSS identifiers are
        emitted as quoted attribute selectors so the result stays valid.

        Args:
            target_element: Target element information from MCP response
                (keys read: ``id``, ``tagName``, ``className``, ``type``, ``name``).

        Returns:
            Optimal CSS selector string or None if no usable element info
        """
        if not target_element or not isinstance(target_element, dict):
            return None

        def text_of(key: str) -> str:
            # JSON null and non-string values (e.g. an SVG className object)
            # must not reach str methods.
            value = target_element.get(key)
            return value if isinstance(value, str) else ""

        is_simple = MCPResponseHandler._SIMPLE_CSS_IDENT.match
        quote = MCPResponseHandler._quote_css_value

        element_id = text_of("id")
        tag_name = text_of("tagName").lower()
        class_name = text_of("className")
        element_type = text_of("type")
        name_attr = text_of("name")

        # 1. ID: '#id' is only valid for plain identifiers ('#a.b' would mean
        #    id "a" with class "b"), otherwise fall back to [id='...'].
        if element_id:
            if is_simple(element_id):
                return f"#{element_id}"
            return f"[id='{quote(element_id)}']"

        # 2. Name attribute with tag
        if name_attr and tag_name:
            return f"{tag_name}[name='{quote(name_attr)}']"

        # 3. Type attribute, inputs only
        if element_type and tag_name == "input":
            return f"input[type='{quote(element_type)}']"

        # 4. First class with tag
        if class_name and tag_name:
            classes = class_name.split()
            if classes:
                first_class = classes[0]
                if is_simple(first_class):
                    return f"{tag_name}.{first_class}"
                # e.g. utility classes such as 'md:flex' are not valid after '.'
                return f"{tag_name}[class~='{quote(first_class)}']"

        # 5. Least specific fallback
        if tag_name:
            return tag_name
        return None
class MCPChromeClient:
"""Client for interacting with MCP Chrome server"""
def __init__(self, config: Dict[str, Any]):
    """
    Store the configuration and initialise connection, cache and parser state.

    No connection is opened here; ``session`` and ``process`` start as None.

    Args:
        config: Client settings. ``mcp_server_type`` (default ``'http'``) and
            ``mcp_server_url`` (default ``'http://127.0.0.1:12306/mcp'``) are
            read immediately; the whole mapping is kept on ``self.config``.
    """
    self.logger = logging.getLogger(__name__)

    # --- transport settings and connection handles -----------------------
    self.config = config
    self.server_type = config.get('mcp_server_type', 'http')
    self.server_url = config.get('mcp_server_url', 'http://127.0.0.1:12306/mcp')
    self.session: Optional[aiohttp.ClientSession] = None
    self.process: Optional[subprocess.Popen] = None
    self.session_id: Optional[str] = None

    # --- input field cache used by automatic detection --------------------
    self.cached_input_fields: Dict[str, Any] = {}
    self.current_page_url: Optional[str] = None
    self.auto_detect_inputs: bool = True

    # --- most recent target element, kept for selector reuse --------------
    self.last_target_element: Optional[Dict[str, Any]] = None
    self.last_optimal_selector: Optional[str] = None
    self.response_handler = MCPResponseHandler()

    # --- voice command grammar --------------------------------------------
    # Maps action name -> list of regexes. Both the order of the groups and
    # the order of the regexes inside a group are significant: the parser
    # walks them in insertion order and stops at the first match.
    # NOTE(review): 'fill_field_by_name' is iterated first and ends with a
    # pattern that matches any two words, so unless the parser special-cases
    # it, multi-word commands never reach the later groups.
    voice_patterns = {
        'fill_field_by_name': [
            # "fill/populate/set <field> with/to <value>"
            r'fill (?:the )?(.+?) (?:field )?with (.+)',
            r'populate (?:the )?(.+?) (?:field )?with (.+)',
            r'set (?:the )?(.+?) (?:field )?to (.+)',
            # "<verb> <value> in/to <field> field"
            r'enter (.+) in (?:the )?(.+?) (?:field|input|box|area)',
            r'input (.+) in (?:the )?(.+?) (?:field|input|box|area)',
            r'type (.+) in (?:the )?(.+?) (?:field|input|box|area)',
            r'write (.+) in (?:the )?(.+?) (?:field|input|box|area)',
            r'put (.+) in (?:the )?(.+?) (?:field|input|box|area)',
            r'add (.+) to (?:the )?(.+?) (?:field|input|box|area)',
            # "<field> field <value>" shorthand
            r'(.+?) field (.+)',  # "email field john@example.com"
            r'(.+?) input (.+)',  # "search input python"
            r'(.+?) box (.+)',  # "text box hello world"
            r'(.+?) area (.+)',  # "text area hello world"
            # Email
            r'(?:email|e-mail) (.+@.+)',  # "email john@example.com"
            r'(.+@.+) (?:in|for) (?:the )?email',  # "john@example.com in email"
            # Phone
            r'(?:phone|telephone|mobile) ([\d\-\+\(\)\s]+)',  # "phone 123-456-7890"
            r'([\d\-\+\(\)\s]{10,}) (?:in|for) (?:the )?phone',  # "123-456-7890 in phone"
            # Password
            r'(?:password|pass) (.+)',  # "password secret123"
            r'(.+) (?:in|for) (?:the )?password',  # "secret123 in password"
            # Username
            r'(?:username|user) (.+)',  # "username john_doe"
            r'(.+) (?:in|for) (?:the )?username',  # "john_doe in username"
            # Search
            r'search (?:for )?(.+)',  # "search for python"
            r'(.+) (?:in|for) (?:the )?search',  # "python in search"
            # Any "<field> <value>" pair (meant as the last resort)
            r'(.+?) (.+)',  # Generic field value pair
        ],
        'type_in_focused': [
            r'^type (.+)$',
            r'^enter (.+)$',
            r'^input (.+)$',
            r'^write (.+)$',
            r'^text (.+)$',
        ],
        'keyboard': [
            r'press (?:the )?(enter)(?:\s+key)?$',
            r'hit (?:the )?(enter)(?:\s+key)?$',
            r'press (?:the )?(.+) key',
            r'hit (?:the )?(.+) key',
            r'keyboard (.+)'
        ],
        'go_to_google': [
            r'^(?:go to )?google(?:\.com)?$',
            r'^open google(?:\.com)?$',
            r'^navigate to google(?:\.com)?$',
            r'^take me to google$',
            r'^show me google$'
        ],
        'go_to_facebook': [
            r'^(?:go to )?facebook(?:\.com)?$',
            r'^open facebook(?:\.com)?$',
            r'^navigate to facebook(?:\.com)?$',
            r'^take me to facebook$',
            r'^show me facebook$',
            r'^facbook$',  # frequent speech-recognition misspelling
            r'^face book$'  # split-word variant
        ],
        'go_to_twitter': [
            r'^(?:go to )?(?:twitter|tweets)(?:\.com)?$',
            r'^open (?:twitter|tweets)(?:\.com)?$',
            r'^navigate to (?:twitter|tweets)(?:\.com)?$',
            r'^take me to (?:twitter|tweets)$',
            r'^show me (?:twitter|tweets)$',
            r'^tweet$',  # singular form
            r'^x\.com$'  # current Twitter domain
        ],
        'navigate': [
            r'(?:go to|navigate to|open|visit|browse to|load) (.+)',
            r'take me to (.+)',
            r'show me (.+)',
            r'open up (.+)',
            r'pull up (.+)'
        ],
        'search_google': [
            r'search (?:google )?for (.+)',
            r'google search (.+)',
            r'find (.+) (?:on google|using google)',
            r'look up (.+)',
            r'search google for (.+)',
            r'google (.+)',
            r'search for (.+)',
            r'find information about (.+)',
            r'what is (.+)',
            r'tell me about (.+)'
        ],
        'click': [
            # Verb-led click commands
            r'click (?:on )?(?:the )?(.+?)(?:\s+button|\s+link|\s+element)?$',
            r'press (?:the )?(.+?)(?:\s+button|\s+link|\s+element)?$',
            r'tap (?:on )?(?:the )?(.+?)(?:\s+button|\s+link|\s+element)?$',
            r'select (?:the )?(.+?)(?:\s+button|\s+link|\s+element)?$',
            r'choose (?:the )?(.+?)(?:\s+button|\s+link|\s+element)?$',
            r'hit (?:the )?(.+?)(?:\s+button|\s+link|\s+element)?$',
            # Buttons
            r'(?:click|press|tap) (?:the )?(.+?) button',
            r'(?:click|press|tap) button (.+)',
            r'button (.+)',
            # Links
            r'(?:click|press|tap) (?:the )?(.+?) link',
            r'(?:click|press|tap) link (.+)',
            r'link (.+)',
            r'go to (.+)',
            # Login / submit
            r'(?:click|press|tap) (?:the )?(?:login|log in|sign in|submit)',
            r'(?:login|log in|sign in|submit)',
            # Common widgets
            r'(?:click|press|tap) (?:the )?(?:menu|dropdown|checkbox|radio)',
            r'(?:menu|dropdown|checkbox|radio)',
            # Anything else
            r'(?:click|press|tap) (.+)',
            r'activate (.+)',
            r'trigger (.+)'
        ],
        'type': [
            r'type (.+)',
            r'enter (.+)',
            r'input (.+)',
            r'write (.+)',
            r'fill in (.+)',
            r'put in (.+)',
            r'add (.+)'
        ],
        'scroll': [
            r'scroll (up|down|left|right)',
            r'scroll to (.+)',
            r'go (up|down)',
            r'move (up|down)',
            r'page (up|down)',
            r'scroll to the (top|bottom)',
            r'go to the (top|bottom)'
        ],
        'screenshot': [
            r'^take (?:a )?screenshot$',
            r'^capture (?:the )?screen$',
            r'^show me (?:the )?page$',
            r'^save (?:the )?page$',
            r'^grab (?:a )?screenshot$',
            r'^screenshot this$'
        ],
        'get_search_results': [
            r'^get search results$',
            r'^show (?:me )?(?:the )?results$',
            r'^what (?:are )?(?:the )?results$',
            r'^extract results$',
            r'^read (?:the )?results$',
            r'^what did (?:we|I) find$',
            r'^show what we found$'
        ],
        'get_page_content': [
            r'(?:get|show|read|extract) (?:the )?(?:page )?content',
            r'what(?:\'s| is) on (?:the|this) page',
            r'(?:show|tell) me what(?:\'s| is) on (?:the|this) page',
            r'read (?:the|this) page',
            r'extract (?:all )?text',
            r'get (?:all )?text content',
            r'what does (?:the|this) page say',
            r'page content',
            r'page text'
        ],
        'get_form_fields': [
            r'(?:get|show|find|list) (?:all )?(?:form )?fields',
            r'what fields are (?:on )?(?:the|this) page',
            r'(?:show|tell) me (?:the|all) (?:form )?fields',
            r'list (?:all )?inputs',
            r'find (?:all )?form elements',
            r'what can I fill (?:in|out)',
            r'available fields',
            r'form elements'
        ],
        'get_interactive_elements': [
            r'(?:get|show|find|list) (?:all )?(?:interactive|clickable) elements',
            r'what can I click',
            r'(?:show|tell) me (?:all )?(?:buttons|links)',
            r'list (?:all )?(?:buttons|links|clickable elements)',
            r'find (?:all )?clickable (?:elements|items)',
            r'available (?:buttons|links|actions)',
            r'interactive elements',
            r'clickable elements'
        ],
        'wait': [
            r'wait (?:for )?(\d+) seconds?',
            r'pause (?:for )?(\d+) seconds?',
            r'hold on (?:for )?(\d+) seconds?',
            r'give it (\d+) seconds?'
        ],
        'back': [
            r'^go back$',
            r'^back$',
            r'^previous page$',
            r'^navigate back$'
        ],
        'forward': [
            r'^go forward$',
            r'^forward$',
            r'^next page$',
            r'^navigate forward$'
        ],
        'refresh': [
            r'^refresh$',
            r'^reload$',
            r'^refresh (?:the )?page$',
            r'^reload (?:the )?page$'
        ]
    }
    self.command_patterns = voice_patterns
async def connect(self):
    """
    Connect to the MCP Chrome server.

    Uses the stdio transport when ``server_type`` is ``'stdio'`` and the
    HTTP transport for every other value.
    """
    use_stdio = self.server_type == 'stdio'
    open_transport = self._connect_stdio if use_stdio else self._connect_http
    await open_transport()
async def _connect_stdio(self):
    """
    Connect to MCP server via stdio by spawning it as a child process.

    The executable comes from config key ``mcp_server_command`` (default
    ``'node'``) and its arguments from ``mcp_server_args`` (default ``[]``).
    The child's stdin, stdout and stderr are all piped in text mode and the
    handle is stored on ``self.process``.

    Raises:
        Exception: whatever ``subprocess.Popen`` raised, after logging it.
    """
    try:
        executable = self.config.get('mcp_server_command', 'node')
        extra_args = self.config.get('mcp_server_args', [])
        argv = [executable] + extra_args
        self.process = subprocess.Popen(
            argv,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        self.logger.info("Connected to MCP Chrome server via stdio")
    except Exception as e:
        self.logger.error(f"Failed to connect to MCP server via stdio: {e}")
        raise
async def _connect_http(self):
    """
    Connect to MCP server via streamable-HTTP.

    Creates the aiohttp session (30 s total timeout, JSON/SSE ``Accept``
    header), POSTs the MCP ``initialize`` request, remembers the
    ``mcp-session-id`` response header when the server sends one, and then
    POSTs the ``notifications/initialized`` notification.

    Raises:
        Exception: on a non-200 handshake status, on a JSON body containing
            ``error``, or on any transport failure. In every failure case the
            session is closed and ``self.session`` is reset to None first.
    """
    self.session = aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=30),
        headers={
            'Content-Type': 'application/json',
            'Accept': 'application/json, text/event-stream'
        },
    )
    try:
        handshake = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {
                    "tools": {}
                },
                "clientInfo": {
                    "name": "LiveKit-Chrome-Agent",
                    "version": "1.0.0"
                }
            }
        }
        async with self.session.post(self.server_url, json=handshake) as response:
            if response.status != 200:
                raise Exception(f"Server connection failed: {response.status}")

            # The server may hand out a session ID through a response header.
            session_id = response.headers.get('mcp-session-id')
            if session_id:
                self.session_id = session_id
                self.logger.info(f"Connected to MCP Chrome server via streamable-HTTP with session ID: {session_id}")
            else:
                self.logger.info("Connected to MCP Chrome server via streamable-HTTP")

            # Only a JSON body is inspected for an initialization error.
            content_type = response.headers.get('content-type', '')
            if 'application/json' in content_type:
                body = await response.json()
                if "error" in body:
                    raise Exception(f"MCP initialization error: {body['error']}")
            elif 'text/event-stream' in content_type:
                # The SSE body is not read; a 200 is taken as confirmation.
                self.logger.info("Received SSE response, connection established")
            else:
                # Unknown body type: log a snippet for debugging and carry on.
                text_response = await response.text()
                self.logger.debug(f"Unexpected content type: {content_type}, response: {text_response[:200]}")

            # Complete the handshake with the "initialized" notification.
            notification = {
                "jsonrpc": "2.0",
                "method": "notifications/initialized"
            }
            notify_headers = {}
            if self.session_id:
                notify_headers['mcp-session-id'] = self.session_id
            async with self.session.post(
                self.server_url, json=notification, headers=notify_headers
            ) as init_response:
                # A rejected notification is only logged, never raised.
                if init_response.status not in [200, 204]:
                    self.logger.warning(f"Initialized notification failed with status: {init_response.status}")
    except Exception as e:
        self.logger.error(f"Failed to connect to MCP server via HTTP: {e}")
        if self.session:
            await self.session.close()
            self.session = None
        raise
async def disconnect(self):
    """
    Disconnect from the MCP Chrome server.

    Closes the HTTP session if there is one, then shuts down the stdio child
    process if there is one: ``terminate()``, wait up to 5 seconds, and
    ``kill()`` if it has not exited by then. Both handles are reset to None.

    The process is cleaned up even when closing the session raises; that
    exception still propagates afterwards.
    """
    try:
        if self.session:
            await self.session.close()
    finally:
        self.session = None
        process, self.process = self.process, None
        if process:
            process.terminate()
            try:
                # Popen.wait blocks, so run it off the event loop.
                await asyncio.to_thread(process.wait, timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                # Reap the killed child so it does not linger as a zombie.
                await asyncio.to_thread(process.wait)
            finally:
                # Release the three pipes opened for the child.
                for pipe in (process.stdin, process.stdout, process.stderr):
                    if pipe is None:
                        continue
                    try:
                        pipe.close()
                    except OSError:
                        # Best effort: flushing stdin of a dead child may
                        # fail with a broken pipe; nothing left to do.
                        pass
async def validate_browser_connection(self) -> Dict[str, Any]:
    """
    Validate that the browser is connected and responsive.

    Runs three checks in order: an HTTP session exists, a
    ``chrome_get_web_content`` call for the ``title`` selector succeeds, and
    a second ``chrome_get_web_content`` call with ``format: "url"`` returns
    a ``url``. Failures are collected instead of raised.

    Returns:
        Dict with ``mcp_connected``, ``browser_responsive``,
        ``page_accessible``, ``current_url``, ``page_title`` and ``errors``
        (a list of messages). Without a session only the first check runs.
    """
    report = {
        "mcp_connected": False,
        "browser_responsive": False,
        "page_accessible": False,
        "current_url": None,
        "page_title": None,
        "errors": []
    }
    problems = report["errors"]
    try:
        # Check 1: is there an MCP session at all?
        if not self.session:
            problems.append("MCP server not connected")
            self.logger.error("❌ MCP server connection: FAILED")
            return report
        report["mcp_connected"] = True
        self.logger.info("✅ MCP server connection: OK")

        # Check 2: does the browser answer a cheap content request?
        try:
            title_result = await self._call_mcp_tool("chrome_get_web_content", {
                "selector": "title",
                "textOnly": True
            })
            report["browser_responsive"] = True
            self.logger.info("✅ Browser responsiveness: OK")
            if title_result.get("content"):
                items = title_result["content"]
                if isinstance(items, list) and len(items) > 0:
                    report["page_title"] = items[0].get("text", "Unknown")
                    report["page_accessible"] = True
                    self.logger.info(f"✅ Page accessible: {report['page_title']}")
        except Exception as e:
            problems.append(f"Browser not responsive: {e}")
            self.logger.error(f"❌ Browser responsiveness: FAILED - {e}")

        # Check 3: current URL (attempted even if check 2 failed).
        try:
            url_result = await self._call_mcp_tool("chrome_get_web_content", {
                "format": "url"
            })
            if url_result.get("url"):
                report["current_url"] = url_result["url"]
                self.logger.info(f"✅ Current URL: {report['current_url']}")
        except Exception as e:
            problems.append(f"Could not get current URL: {e}")
            self.logger.warning(f"⚠️ Could not get current URL: {e}")
    except Exception as e:
        problems.append(f"Validation failed: {e}")
        self.logger.error(f"💥 Browser validation failed: {e}")
    return report
async def execute_voice_command(self, command: str) -> str:
    """
    Execute a voice command and return the result with enhanced logging.

    The command is parsed into an action plus parameters and then executed.
    Nothing is raised to the caller: an unrecognised command and any
    exception are both turned into a human-readable message.

    Args:
        command: The spoken command text.

    Returns:
        The action's result string, or an explanatory "❓"/"💥" message.
    """
    try:
        self.logger.info(f"🎤 VOICE COMMAND: '{command}'")
        action, params = self._parse_voice_command(command)
        if action:
            self.logger.info(f"📋 PARSED COMMAND: action='{action}', params={params}")
            outcome = await self._execute_action(action, params)
            # Only the first 100 characters of the result go to the log.
            self.logger.info(f"✅ COMMAND COMPLETED: '{command}' -> {outcome[:100]}...")
            return outcome
        self.logger.warning(f"❓ COMMAND NOT UNDERSTOOD: '{command}'")
        return f"❓ I didn't understand the command: {command}"
    except Exception as e:
        self.logger.error(f"💥 VOICE COMMAND ERROR: '{command}' failed with: {e}")
        return f"💥 Error executing command: {str(e)}"
def _parse_voice_command(self, command: str) -> tuple[Optional[str], Dict[str, Any]]:
    """
    Parse a voice command into action and parameters.

    The groups in ``self.command_patterns`` are tried in insertion order and
    the first regex that matches (case-insensitively, anywhere in the
    command) wins. The generic two-word pattern ``(.+?) (.+)`` is the one
    exception: it matches every multi-word command, so it is only tried after
    every other pattern of every action has failed.

    For ``fill_field_by_name`` the field/value order is taken from the
    pattern's own structure when it is explicit ("fill X with Y",
    "enter Y in X field"); the content heuristics (``@`` means e-mail value,
    10+ phone characters means phone value, well-known field names) are only
    used for the ambiguous patterns. Patterns with a single capture group
    take the field name from the keyword the pattern is written for.

    Args:
        command: Raw text of the spoken command.

    Returns:
        ``(action, params)``, or ``(None, {})`` when nothing matches.
        ``params`` is ``{'field_name': ..., 'value': ...}`` for
        ``fill_field_by_name`` (field name lower-cased, value in its original
        case), ``{}`` for the three content-retrieval actions, and
        ``{'text': ...}`` otherwise. ``text`` is lower-cased except for the
        ``type`` and ``type_in_focused`` actions, whose text is typed verbatim.
    """
    catch_all_patterns = (r'(.+?) (.+)',)
    common_field_names = (
        'email', 'e-mail', 'password', 'pass', 'phone', 'telephone',
        'mobile', 'name', 'username', 'user', 'search', 'query',
    )
    # Keywords identifying which field a single-group pattern targets.
    single_group_fields = ('email', 'phone', 'password', 'username', 'search')
    parameterless_actions = ('get_page_content', 'get_form_fields', 'get_interactive_elements')
    verbatim_text_actions = ('type', 'type_in_focused')
    phone_like = re.compile(r'[\d\-\+\(\)\s]{10,}')

    # Matching is case-insensitive already, so the command is not lower-cased:
    # doing so would corrupt passwords and other text the user wants entered.
    command = command.strip()

    def fill_params(pattern: str, groups: tuple) -> Dict[str, Any]:
        if len(groups) >= 2:
            first = (groups[0] or '').strip()
            second = (groups[1] or '').strip()
            first_is_phone = bool(phone_like.match(first))
            second_is_phone = bool(phone_like.match(second))
            if ') in ' in pattern or ') to ' in pattern:
                # "enter <value> in <field> field" / "add <value> to <field> field"
                field, value = second, first
            elif pattern.endswith(('with (.+)', 'to (.+)')):
                # "fill <field> with <value>" / "set <field> to <value>"
                field, value = first, second
            elif '@' in second and '@' not in first:
                field, value = first, second
            elif '@' in first and '@' not in second:
                field, value = second, first
            elif second_is_phone and not first_is_phone:
                field, value = first, second
            elif first_is_phone and not second_is_phone:
                field, value = second, first
            elif first.lower() in common_field_names:
                field, value = first, second
            elif second.lower() in common_field_names:
                field, value = second, first
            else:
                field, value = first, second
            return {'field_name': field.lower(), 'value': value}
        if len(groups) == 1:
            value = (groups[0] or '').strip()
            for keyword in single_group_fields:
                if keyword in pattern:
                    return {'field_name': keyword, 'value': value}
            # Unknown single-group pattern: guess the field from the value.
            if '@' in value:
                return {'field_name': 'email', 'value': value}
            if phone_like.match(value):
                return {'field_name': 'phone', 'value': value}
            return {'field_name': 'search', 'value': value}
        return {'field_name': '', 'value': ''}

    def build_params(action: str, pattern: str, match) -> Dict[str, Any]:
        if action == 'fill_field_by_name':
            return fill_params(pattern, match.groups())
        if action in parameterless_actions:
            return {}
        text = match.group(1).strip() if match.groups() else ''
        if action not in verbatim_text_actions:
            text = text.lower()
        return {'text': text}

    deferred = []
    for action, patterns in self.command_patterns.items():
        for pattern in patterns:
            if pattern in catch_all_patterns:
                deferred.append((action, pattern))
                continue
            match = re.search(pattern, command, re.IGNORECASE)
            if match:
                return action, build_params(action, pattern, match)

    # Last resort: the patterns that would have matched almost anything.
    for action, pattern in deferred:
        match = re.search(pattern, command, re.IGNORECASE)
        if match:
            return action, build_params(action, pattern, match)
    return None, {}
async def _execute_action(self, action: str, params: Dict[str, Any]) -> str:
    """
    Execute a specific action with parameters.

    Routes to the stdio executor when ``server_type`` is ``'stdio'`` and to
    the HTTP executor for every other value, returning its result string.
    """
    use_stdio = self.server_type == 'stdio'
    executor = self._execute_action_stdio if use_stdio else self._execute_action_http
    return await executor(action, params)
async def _execute_action_stdio(self, action: str, params: Dict[str, Any]) -> str:
    """
    Execute action via stdio (simplified for now).

    The stdio transport does not speak the MCP protocol yet, so every action
    except ``wait`` only returns a "Would ..." placeholder message. ``wait``
    really sleeps for ``int(params['text'])`` seconds.

    ``search_google`` (the name the voice parser emits) is accepted alongside
    the older ``search`` name. Actions not listed here yield
    ``"Unknown action: <action>"``.

    Args:
        action: Action name produced by the voice command parser.
        params: Action parameters; text-based actions read ``params['text']``.

    Returns:
        A status message. Errors while handling the action (for example a
        missing ``text`` parameter) are logged and returned as
        ``"Error executing <action>: <error>"`` rather than raised.

    Raises:
        Exception: if no stdio process has been started.
    """
    if not self.process:
        raise Exception("Not connected to MCP server")

    # Placeholders that embed the command text.
    text_templates = {
        'navigate': "Would navigate to {}",
        'click': "Would click on {}",
        'type': "Would type: {}",
        'scroll': "Would scroll {}",
        'search': "Would search for {}",
        'search_google': "Would search for {}",
        'keyboard': "Would press key: {}",
    }
    # Placeholders that take no parameter.
    fixed_messages = {
        'go_to_google': "Would open Google",
        'go_to_facebook': "Would open Facebook",
        'go_to_twitter': "Would open Twitter/X",
        'screenshot': "Would take screenshot",
        'back': "Would go back",
        'forward': "Would go forward",
        'refresh': "Would refresh page",
    }
    not_implemented = "(stdio mode - not implemented yet)"

    try:
        if action == 'wait':
            await asyncio.sleep(int(params['text']))
            return f"Waited for {params['text']} seconds"
        if action in text_templates:
            return f"{text_templates[action].format(params['text'])} {not_implemented}"
        if action in fixed_messages:
            return f"{fixed_messages[action]} {not_implemented}"
        return f"Unknown action: {action}"
    except Exception as e:
        self.logger.error(f"Error executing action {action}: {e}")
        return f"Error executing {action}: {str(e)}"
async def _execute_action_http(self, action: str, params: Dict[str, Any]) -> str:
    """
    Execute action via HTTP using MCP tools.

    Looks the action up in the routing tables below and awaits the matching
    handler method. Handlers are resolved by name only for the action being
    executed.

    Args:
        action: Action name produced by the voice command parser.
        params: Action parameters (``text``, or ``field_name``/``value`` for
            ``fill_field_by_name``).

    Returns:
        The handler's result string, ``"Unknown action: <action>"`` for an
        unrouted action, or ``"Error executing <action>: <error>"`` when the
        handler (or the parameter lookup) raises.

    Raises:
        Exception: if there is no HTTP session.
    """
    if not self.session:
        raise Exception("Not connected to MCP server")

    # Handlers that receive params['text'].
    text_handlers = {
        'navigate': '_navigate_mcp',
        'search_google': '_search_google_mcp',
        # smart click: enhanced discovery with fallback
        'click': 'smart_click_with_target_tracking',
        'type': '_type_text_mcp',
        'type_in_focused': '_type_in_focused_element',
        'scroll': '_scroll_mcp',
        'keyboard': '_keyboard_mcp',
    }
    # Handlers that take no argument.
    plain_handlers = {
        'go_to_google': '_go_to_google_mcp',
        'go_to_facebook': '_go_to_facebook_mcp',
        'go_to_twitter': '_go_to_twitter_mcp',
        'screenshot': '_take_screenshot_mcp',
        'get_search_results': '_get_search_results_mcp',
        'get_page_content': '_get_page_content_mcp',
        'get_form_fields': '_get_form_fields_mcp',
        'get_interactive_elements': '_get_interactive_elements_mcp',
        'back': '_go_back_mcp',
        'forward': '_go_forward_mcp',
        'refresh': '_refresh_mcp',
    }

    try:
        if action in text_handlers:
            handler = getattr(self, text_handlers[action])
            return await handler(params['text'])
        if action in plain_handlers:
            handler = getattr(self, plain_handlers[action])
            return await handler()
        if action == 'fill_field_by_name':
            # smart fill: enhanced discovery with fallback
            return await self.smart_fill_with_target_tracking(params['field_name'], params['value'])
        if action == 'wait':
            return await self._wait(int(params['text']))
        return f"Unknown action: {action}"
    except Exception as e:
        self.logger.error(f"Error executing action {action}: {e}")
        return f"Error executing {action}: {str(e)}"
async def _call_mcp_tool(self, tool_name: str, args: Dict[str, Any]) -> Dict[str, Any]:
    """
    Call an MCP tool and return the result with retry logic and enhanced logging.

    Sends a JSON-RPC ``tools/call`` request over the HTTP session, adding the
    ``mcp-session-id`` header when a session ID is known. The reply may be
    plain JSON or a server-sent-event stream; for SSE the first ``data:``
    line that decodes as JSON is used. Any failure (HTTP status other than
    200, undecodable body, JSON-RPC ``error``) is retried, up to three
    attempts one second apart.

    For ``chrome_click_element``, ``chrome_fill_or_select`` and
    ``chrome_keyboard`` the result is additionally parsed and, when it
    reports success with a target element, ``last_target_element`` and
    ``last_optimal_selector`` are updated.

    Args:
        tool_name: Name of the MCP tool to invoke.
        args: Arguments passed to the tool.

    Returns:
        The ``result`` member of the JSON-RPC reply (``{}`` if absent). A
        tool-level failure reported via ``isError`` inside the result is NOT
        raised here; callers must inspect the result themselves.

    Raises:
        Exception: if there is no session, or once all attempts have failed
            (the last underlying error is chained as the cause).
    """
    if not self.session:
        raise Exception("Not connected to MCP server")

    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": tool_name,
            "arguments": args
        }
    }

    # Browser-mutating tools are logged at INFO, everything else at DEBUG.
    is_browser_action = tool_name in ["chrome_click_element", "chrome_fill_or_select", "chrome_keyboard"]
    if is_browser_action:
        self.logger.info(f"🔧 MCP TOOL CALL: {tool_name} with args: {args}")
    else:
        self.logger.debug(f"🔧 MCP TOOL CALL: {tool_name} with args: {args}")

    retry_attempts = 3
    retry_delay = 1.0
    for attempt in range(retry_attempts):
        try:
            self.logger.debug(f"📡 HTTP REQUEST: Calling MCP tool {tool_name} (attempt {attempt + 1})")
            headers = {}
            if self.session_id:
                headers['mcp-session-id'] = self.session_id

            async with self.session.post(self.server_url, json=payload, headers=headers) as response:
                if response.status != 200:
                    error_text = await response.text()
                    self.logger.error(f"❌ HTTP ERROR: {response.status} - {error_text}")
                    raise Exception(f"HTTP {response.status}: {error_text}")

                content_type = response.headers.get('content-type', '')
                if 'application/json' in content_type:
                    result = await response.json()
                elif 'text/event-stream' in content_type:
                    text_response = await response.text()
                    result = None
                    # splitlines() copes with CRLF; SSE allows "data:" with or
                    # without a following space.
                    for line in text_response.splitlines():
                        if not line.startswith('data:'):
                            continue
                        try:
                            result = json.loads(line[len('data:'):].strip())
                            break
                        except json.JSONDecodeError:
                            continue
                    if result is None:
                        self.logger.error(f"❌ SSE PARSE ERROR: No valid JSON in response: {text_response[:200]}")
                        raise Exception(f"No valid JSON found in SSE response: {text_response[:200]}")
                else:
                    # Unknown content type: still try JSON. content_type=None
                    # disables aiohttp's content-type check, which would
                    # otherwise reject the body before decoding it.
                    try:
                        result = await response.json(content_type=None)
                    except Exception:
                        text_response = await response.text()
                        self.logger.error(f"❌ JSON PARSE ERROR: Unexpected content type {content_type}: {text_response[:200]}")
                        raise Exception(f"Unexpected content type {content_type}: {text_response[:200]}")

                if not isinstance(result, dict):
                    raise Exception(f"Unexpected MCP response payload: {str(result)[:200]}")

                # JSON-RPC level error
                if "error" in result:
                    error_msg = result['error']
                    if isinstance(error_msg, dict):
                        error_msg = error_msg.get('message', str(error_msg))
                    self.logger.error(f"❌ MCP TOOL ERROR: {tool_name} failed with error: {error_msg}")
                    raise Exception(f"MCP tool error: {error_msg}")

                tool_result = result.get("result", {})
                if is_browser_action:
                    self.logger.info(f"✅ MCP TOOL SUCCESS: {tool_name} completed successfully")
                    self.logger.debug(f"📝 MCP RESULT: {tool_result}")
                    # Remember which element the tool acted on for later reuse.
                    parsed_response = self.response_handler.parse_mcp_response(tool_result)
                    if parsed_response["success"] and parsed_response["target_element"]:
                        self.last_target_element = parsed_response["target_element"]
                        self.last_optimal_selector = parsed_response["optimal_selector"]
                        self.logger.info(f"🎯 TARGET ELEMENT: {self.last_target_element}")
                        self.logger.info(f"🔍 OPTIMAL SELECTOR: {self.last_optimal_selector}")
                else:
                    self.logger.debug(f"✅ MCP TOOL SUCCESS: {tool_name} completed")
                return tool_result
        except Exception as e:
            self.logger.warning(f"⚠️ MCP RETRY: Tool call attempt {attempt + 1} failed: {e}")
            if attempt == retry_attempts - 1:
                self.logger.error(f"❌ MCP FINAL FAILURE: Tool {tool_name} failed after {retry_attempts} attempts: {str(e)}")
                raise Exception(f"MCP tool {tool_name} failed after {retry_attempts} attempts: {str(e)}") from e
            await asyncio.sleep(retry_delay)
    # Not reachable with retry_attempts >= 1; kept as a defensive default.
    return {}
async def fill_using_target_element(self, value: str, fallback_selectors: Optional[List[str]] = None) -> str:
    """
    Fill a field using the last discovered target element information.

    Tries ``self.last_optimal_selector`` first (when set) and then each
    fallback selector in order, stopping at the first one for which the
    ``chrome_fill_or_select`` tool succeeds. A tool result flagged with
    ``isError`` counts as a failure, just like a raised exception, so the
    next selector is tried.

    Args:
        value: Value to fill in the field
        fallback_selectors: Selectors to try if the target element selector
            is missing or fails

    Returns:
        Result message: "✅ ..." naming the selector that worked, or "❌ ..."
        when no selector worked. Never raises.
    """
    async def fill(selector: str) -> None:
        result = await self._call_mcp_tool("chrome_fill_or_select", {
            "selector": selector,
            "value": value
        })
        # Tool-level failures come back as a normal result with isError set.
        if isinstance(result, dict) and result.get("isError"):
            content = result.get("content")
            first = content[0] if isinstance(content, list) and content else None
            detail = first.get("text") if isinstance(first, dict) else None
            raise Exception(detail or "MCP tool reported an error")

    try:
        # First priority: the selector derived from the last target element
        if self.last_optimal_selector:
            self.logger.info(f"🎯 Using target element selector: {self.last_optimal_selector}")
            try:
                await fill(self.last_optimal_selector)
                return f"✅ Filled using target element selector '{self.last_optimal_selector}' with value: '{value}'"
            except Exception as e:
                self.logger.warning(f"⚠️ Target element selector failed: {e}")

        # Second priority: caller-supplied fallbacks, in order
        for selector in fallback_selectors or []:
            try:
                self.logger.info(f"🔄 Trying fallback selector: {selector}")
                await fill(selector)
                return f"✅ Filled using fallback selector '{selector}' with value: '{value}'"
            except Exception as e:
                self.logger.debug(f"Fallback selector '{selector}' failed: {e}")

        return "❌ No valid selectors available for filling"
    except Exception as e:
        self.logger.error(f"Error in fill_using_target_element: {e}")
        return f"❌ Error filling field: {str(e)}"
async def click_using_target_element(self, fallback_selectors: Optional[List[str]] = None) -> str:
    """
    Click an element using the last discovered target element information.

    Tries ``self.last_optimal_selector`` first (when set) and then each
    fallback selector in order, stopping at the first one for which the
    ``chrome_click_element`` tool succeeds. A tool result flagged with
    ``isError`` counts as a failure, just like a raised exception, so the
    next selector is tried.

    Args:
        fallback_selectors: Selectors to try if the target element selector
            is missing or fails

    Returns:
        Result message: "✅ ..." naming the selector that worked, or "❌ ..."
        when no selector worked. Never raises.
    """
    async def click(selector: str) -> None:
        result = await self._call_mcp_tool("chrome_click_element", {
            "selector": selector
        })
        # Tool-level failures come back as a normal result with isError set.
        if isinstance(result, dict) and result.get("isError"):
            content = result.get("content")
            first = content[0] if isinstance(content, list) and content else None
            detail = first.get("text") if isinstance(first, dict) else None
            raise Exception(detail or "MCP tool reported an error")

    try:
        # First priority: the selector derived from the last target element
        if self.last_optimal_selector:
            self.logger.info(f"🎯 Clicking target element: {self.last_optimal_selector}")
            try:
                await click(self.last_optimal_selector)
                return f"✅ Clicked target element: {self.last_optimal_selector}"
            except Exception as e:
                self.logger.warning(f"⚠️ Target element click failed: {e}")

        # Second priority: caller-supplied fallbacks, in order
        for selector in fallback_selectors or []:
            try:
                self.logger.info(f"🔄 Trying fallback click selector: {selector}")
                await click(selector)
                return f"✅ Clicked using fallback selector: {selector}"
            except Exception as e:
                self.logger.debug(f"Fallback click selector '{selector}' failed: {e}")

        return "❌ No valid selectors available for clicking"
    except Exception as e:
        self.logger.error(f"Error in click_using_target_element: {e}")
        return f"❌ Error clicking element: {str(e)}"
async def _navigate_mcp(self, url: str) -> str:
    """
    Navigate to a URL using MCP chrome_navigate tool.

    ``https://`` is prepended when the URL does not already start with
    ``http://`` or ``https://``. On success the URL is stored in
    ``current_page_url`` and, when ``auto_detect_inputs`` is enabled, input
    field detection runs after a 2 second pause. A tool result flagged with
    ``isError`` is treated as a failed navigation.

    Args:
        url: Address to open, with or without scheme.

    Returns:
        ``"Navigated to <url>"`` or ``"Failed to navigate to <url>: <error>"``.
        Never raises.
    """
    # Add protocol if missing
    if not url.startswith(('http://', 'https://')):
        url = f"https://{url}"
    try:
        result = await self._call_mcp_tool("chrome_navigate", {"url": url})
        # Tool-level failures come back as a normal result with isError set;
        # without this check they would be reported as a successful navigation.
        if isinstance(result, dict) and result.get("isError"):
            content = result.get("content")
            first = content[0] if isinstance(content, list) and content else None
            detail = first.get("text") if isinstance(first, dict) else None
            raise Exception(detail or "MCP tool reported an error")

        self.current_page_url = url
        if self.auto_detect_inputs:
            await asyncio.sleep(2)  # Wait for page to load
            await self._auto_detect_input_fields()
        return f"Navigated to {url}"
    except Exception as e:
        return f"Failed to navigate to {url}: {str(e)}"
async def _click_mcp(self, selector: str) -> str:
    """
    Click on an element using MCP chrome_click_element tool.

    A tool result flagged with ``isError`` is reported as a failed click.

    Args:
        selector: CSS selector of the element to click.

    Returns:
        ``"Clicked on <selector>"`` or
        ``"Failed to click on <selector>: <error>"``. Never raises.
    """
    try:
        result = await self._call_mcp_tool("chrome_click_element", {"selector": selector})
        # Tool-level failures come back as a normal result with isError set.
        if isinstance(result, dict) and result.get("isError"):
            content = result.get("content")
            first = content[0] if isinstance(content, list) and content else None
            detail = first.get("text") if isinstance(first, dict) else None
            raise Exception(detail or "MCP tool reported an error")
        return f"Clicked on {selector}"
    except Exception as e:
        return f"Failed to click on {selector}: {str(e)}"
async def _type_text_mcp(self, text: str) -> str:
"""Type text using MCP chrome_fill_or_select tool"""
try:
# Try to use focused element first, then fallback to common input selectors
selectors = [
"input:focus, textarea:focus, [contenteditable]:focus",
"input[name='q'], textarea[name='q']", # Google search box
"input[type='search'], input[type='text']", # General search/text inputs
"input:not([type]), textarea" # Any input without type or textarea
]
for selector in selectors:
try:
result = await self._call_mcp_tool("chrome_fill_or_select", {
"selector": selector,
"value": text
})
return f"Typed: {text}"
except Exception:
continue
return f"Failed to find suitable input field to type: {text}"
except Exception as e:
return f"Failed to type text: {str(e)}"
async def _keyboard_mcp(self, key: str) -> str:
"""Press a keyboard key using MCP chrome_keyboard tool"""
try:
# Normalize key names for common variations
key_map = {
"enter": "Enter",
"return": "Enter",
"space": " ",
"spacebar": " ",
"tab": "Tab",
"escape": "Escape",
"esc": "Escape",
"backspace": "Backspace",
"delete": "Delete",
"up": "ArrowUp",
"down": "ArrowDown",
"left": "ArrowLeft",
"right": "ArrowRight",
"page up": "PageUp",
"page down": "PageDown",
"home": "Home",
"end": "End"
}
# Handle compound keys (like ctrl+a, shift+tab, etc.)
if '+' in key:
# Split compound key and normalize each part
parts = [part.strip() for part in key.split('+')]
normalized_parts = []
for part in parts:
# Normalize modifier keys
if part.lower() in ['ctrl', 'control']:
normalized_parts.append('Control')
elif part.lower() in ['shift']:
normalized_parts.append('Shift')
elif part.lower() in ['alt']:
normalized_parts.append('Alt')
elif part.lower() in ['cmd', 'command', 'meta']:
normalized_parts.append('Meta')
else:
# Use the key map for the actual key
normalized_parts.append(key_map.get(part.lower(), part))
normalized_key = '+'.join(normalized_parts)
else:
# Single key - use the key map
normalized_key = key_map.get(key.lower().strip(), key)
# Try both "keys" and "key" parameters as different MCP servers may expect different formats
try:
result = await self._call_mcp_tool("chrome_keyboard", {"keys": normalized_key})
except Exception:
# Fallback to "key" parameter
result = await self._call_mcp_tool("chrome_keyboard", {"key": normalized_key})
return f"Pressed key: {normalized_key}"
except Exception as e:
return f"Failed to press key '{key}': {str(e)}"
async def _scroll_mcp(self, direction: str) -> str:
"""Scroll the page using keyboard commands"""
try:
key_map = {
"up": "ArrowUp",
"down": "ArrowDown",
"left": "ArrowLeft",
"right": "ArrowRight"
}
key = key_map.get(direction.lower(), "ArrowDown")
result = await self._call_mcp_tool("chrome_keyboard", {"key": key})
return f"Scrolled {direction}"
except Exception as e:
return f"Failed to scroll: {str(e)}"
async def _take_screenshot_mcp(self) -> str:
"""Take a screenshot using MCP chrome_screenshot tool"""
try:
result = await self._call_mcp_tool("chrome_screenshot", {"fullPage": True})
return "Screenshot taken successfully"
except Exception as e:
return f"Failed to take screenshot: {str(e)}"
async def _wait(self, seconds: int) -> str:
"""Wait for a specified number of seconds"""
await asyncio.sleep(seconds)
return f"Waited for {seconds} seconds"
async def _go_to_google_mcp(self) -> str:
"""Open Google using MCP chrome_navigate tool"""
try:
result = await self._call_mcp_tool("chrome_navigate", {"url": "https://www.google.com"})
return "Opened Google"
except Exception as e:
return f"Failed to open Google: {str(e)}"
async def _go_to_facebook_mcp(self) -> str:
"""Open Facebook using MCP chrome_navigate tool"""
try:
result = await self._call_mcp_tool("chrome_navigate", {"url": "https://www.facebook.com"})
return "Opened Facebook"
except Exception as e:
return f"Failed to open Facebook: {str(e)}"
async def _go_to_twitter_mcp(self) -> str:
"""Open Twitter/X using MCP chrome_navigate tool"""
try:
result = await self._call_mcp_tool("chrome_navigate", {"url": "https://www.x.com"})
return "Opened Twitter (X)"
except Exception as e:
return f"Failed to open Twitter: {str(e)}"
async def _search_google_mcp(self, query: str) -> str:
    """Run a Google search for ``query`` and return the extracted results.

    Opens Google, then walks a list of search-box selectors. For the first
    selector whose whole sequence succeeds it clicks the box, selects any
    existing text, fills in the query, and submits by clicking a search
    button (falling back to the Enter key when no button can be clicked).
    The results are then read with ``_get_search_results_mcp``.

    Returns:
        The text from ``_get_search_results_mcp``, a message saying the
        search box was not found, or an error message.
    """
    try:
        # Start from the Google home page and give it time to load.
        await self._go_to_google_mcp()
        await asyncio.sleep(3)

        # Google renders the query box as a textarea, hence these selectors.
        search_selectors = [
            "#APjFqb",  # Main Google search box ID
            "textarea[name='q']",  # Google search textarea
            "[role='combobox']",  # Role-based selector
            ".gLFyf",  # Google search box class
            "textarea[aria-label*='Search']",  # Aria-label based
        ]
        # A button click is preferred over Enter, which may only open
        # autocomplete instead of submitting.
        search_button_selectors = [
            "input[value='Google Search']",
            "button[aria-label*='Google Search']",
            "input[type='submit'][value*='Google Search']",
            ".gNO89b",  # Google Search button class
            "center input[type='submit']:first-of-type",  # First submit button in center
        ]

        for selector in search_selectors:
            try:
                # Focus the box, select its current content, then fill it.
                await self._call_mcp_tool("chrome_click_element", {"selector": selector})
                await asyncio.sleep(0.5)
                await self._call_mcp_tool("chrome_keyboard", {"keys": "Control+a"})
                await asyncio.sleep(0.2)
                await self._call_mcp_tool("chrome_fill_or_select", {
                    "selector": selector,
                    "value": query
                })
                await asyncio.sleep(1)

                for button_selector in search_button_selectors:
                    try:
                        await self._call_mcp_tool("chrome_click_element", {"selector": button_selector})
                        self.logger.info(f"Successfully clicked search button: {button_selector}")
                        break
                    except Exception as e:
                        self.logger.debug(f"Failed to click button {button_selector}: {e}")
                else:
                    # No button could be clicked: Enter key as last resort.
                    await self._call_mcp_tool("chrome_keyboard", {"keys": "Enter"})
                    self.logger.info("Fallback: used Enter key for search")

                await asyncio.sleep(5)  # Wait longer for search results to load
                self.logger.info(f"Successfully performed search using selector: {selector}")
            except Exception as e:
                self.logger.debug(f"Failed to search with selector {selector}: {e}")
                continue
            break
        else:
            return f"Failed to find search input field on Google for query: '{query}'"

        # Get search results
        return await self._get_search_results_mcp()
    except Exception as e:
        self.logger.error(f"Error searching Google: {e}")
        return f"Error searching Google for '{query}': {str(e)}"
async def _get_search_results_mcp(self) -> str:
"""Extract search results from the current page using MCP tools"""
try:
# Try multiple selectors for Google search results (Google's structure changes frequently)
result_selectors = [
".tF2Cxc", # Current Google search result container
".g", # Traditional Google search result
"#rso .g", # Results container with .g class
"[data-ved]", # Elements with data-ved attribute (Google results)
".yuRUbf", # Google result link container
"#search .g", # Search container with .g class
".rc", # Another Google result class
".r" # Simple result class
]
content = []
successful_selector = None
for selector in result_selectors:
try:
result = await self._call_mcp_tool("chrome_get_web_content", {
"selector": selector,
"textOnly": False
})
temp_content = result.get("content", [])
# Check if we got valid content (not error messages)
if temp_content and not any("Error" in str(item) for item in temp_content):
content = temp_content
successful_selector = selector
self.logger.info(f"Successfully extracted results using selector: {selector}")
break
else:
self.logger.debug(f"No valid content found for selector: {selector}")
except Exception as e:
self.logger.debug(f"Failed to get content with selector {selector}: {e}")
continue
if not content:
# If no results found, try to get any text content from the page
try:
result = await self._call_mcp_tool("chrome_get_web_content", {
"selector": "body",
"textOnly": True
})
page_content = result.get("content", [])
if page_content:
page_text = str(page_content[0]).lower()
if "no results found" in page_text or "did not match" in page_text:
return "No search results found for this query"
elif "search" in page_text:
return "Search was performed but could not extract structured results. The page may have loaded but results are in an unexpected format."
return "No search results found on this page"
except Exception:
return "No search results found on this page"
# Parse the content to extract search results
formatted_results = []
for i, item in enumerate(content[:10], 1): # Limit to top 10 results
try:
# Handle different content formats
if isinstance(item, dict):
text_content = item.get("text", "")
href = item.get("href", "")
else:
text_content = str(item)
href = ""
if not text_content.strip():
continue
# For Google search results, the text content is often JSON
# Try to parse it if it looks like JSON
if text_content.startswith('{"success":true'):
try:
import json
data = json.loads(text_content)
actual_content = data.get("textContent", "")
if actual_content:
text_content = actual_content
except json.JSONDecodeError:
pass # Use original text_content
# Try to extract title, URL, and snippet from the text
lines = [line.strip() for line in text_content.split('\n') if line.strip()]
if not lines:
continue
# For Google results, often the first line is the title
# and subsequent lines are the snippet
title = lines[0] if lines else "No title"
# Skip very short titles that might be navigation elements
if len(title) < 10 and len(lines) > 1:
title = lines[1] if len(lines) > 1 else title
# Extract URL from the text content (Google shows URLs in the results)
extracted_url = "URL not available"
# Look for URLs in the text content
import re
url_patterns = [
r'https?://[^\s\n]+', # Standard HTTP URLs
r'[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(?:/[^\s\n]*)?', # Domain-based URLs
r'[a-zA-Z0-9.-]+\.(?:com|org|net|edu|gov|io|co\.uk|de|fr|jp)(?:\s*\s*[^\n]*)?' # Common TLDs with separator
]
for pattern in url_patterns:
matches = re.findall(pattern, text_content)
if matches:
# Take the first URL found
found_url = matches[0].strip()
# Clean up the URL (remove and trailing text)
found_url = found_url.split('')[0].strip()
if not found_url.startswith('http'):
found_url = 'https://' + found_url
extracted_url = found_url
break
# Get snippet from remaining lines (skip URL lines)
snippet_lines = []
for line in lines[1:]:
# Skip lines that are just URLs or domain names
if not re.match(r'^https?://', line) and not re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}', line):
snippet_lines.append(line)
snippet = ' '.join(snippet_lines[:3]) if snippet_lines else "No description"
# Clean up title and snippet
title = title[:100] + "..." if len(title) > 100 else title
snippet = snippet[:200] + "..." if len(snippet) > 200 else snippet
# Skip results that are too generic or empty
if title.lower() in ['no title', 'gmail', 'images'] or len(title.strip()) < 5:
continue
# Use extracted URL or href if available
url = href if href else extracted_url
formatted_results.append(f"{i}. {title}\n {snippet}\n {url}")
except Exception as e:
self.logger.debug(f"Error processing result item {i}: {e}")
continue
if formatted_results:
return f"Search Results (using {successful_selector}):\n\n" + "\n\n".join(formatted_results)
else:
return f"Found {len(content)} search result elements but could not extract readable content"
except Exception as e:
return f"Failed to extract search results: {str(e)}"
async def _go_back_mcp(self) -> str:
"""Navigate back in browser history using MCP tools"""
try:
await self._call_mcp_tool("chrome_keyboard", {"key": "Alt+Left"})
return "Navigated back to previous page"
except Exception as e:
self.logger.error(f"Error going back: {e}")
return f"Error going back: {str(e)}"
async def _go_forward_mcp(self) -> str:
"""Navigate forward in browser history using MCP tools"""
try:
await self._call_mcp_tool("chrome_keyboard", {"key": "Alt+Right"})
return "Navigated forward to next page"
except Exception as e:
self.logger.error(f"Error going forward: {e}")
return f"Error going forward: {str(e)}"
async def _refresh_mcp(self) -> str:
"""Refresh the current page using MCP tools"""
try:
await self._call_mcp_tool("chrome_keyboard", {"key": "F5"})
return "Page refreshed successfully"
except Exception as e:
self.logger.error(f"Error refreshing page: {e}")
return f"Error refreshing page: {str(e)}"
async def get_form_fields(self) -> str:
    """List the form fields on the current page.

    Method 1 asks ``chrome_get_interactive_elements`` for inputs, textareas
    and selects and reads the ``elements`` list from the JSON text in the
    first content item (or directly from the result as a fallback).
    Method 2 runs only when Method 1 finds nothing: it executes a script via
    ``chrome_execute_script`` that collects every ``input``, ``textarea``
    and ``select`` in the DOM.

    Returns:
        A numbered, human-readable description of each field, a
        "No form fields found" message, or an error message.
    """
    try:
        # Method 1: Get all interactive elements that are form fields
        result = await self._call_mcp_tool("chrome_get_interactive_elements", {
            "types": ["input", "textarea", "select"]
        })
        elements = []
        if result:
            # Parse the nested JSON response from MCP tool.
            # The module-level ``json`` is used on purpose: a function-local
            # import here would make ``json`` unbound in Method 2 whenever
            # this branch is not taken.
            try:
                if "content" in result and result["content"]:
                    content_text = result["content"][0].get("text", "")
                    if content_text:
                        parsed_data = json.loads(content_text)
                        elements = parsed_data.get("elements", [])
                else:
                    # Fallback: try direct access for backward compatibility
                    elements = result.get("elements", [])
            except (json.JSONDecodeError, KeyError, IndexError) as e:
                self.logger.error(f"Error parsing MCP response: {e}")
                elements = result.get("elements", [])

        # Method 2: If no elements found, try enhanced detection with JavaScript
        if not elements:
            self.logger.info("No elements found with standard method, trying enhanced detection...")
            try:
                enhanced_result = await self._call_mcp_tool("chrome_execute_script", {
                    "script": """
                        function findAllFormElements() {
                            const elements = [];
                            // Find all input elements
                            document.querySelectorAll('input, textarea, select').forEach((el, index) => {
                                const rect = el.getBoundingClientRect();
                                const isVisible = rect.width > 0 && rect.height > 0 &&
                                    window.getComputedStyle(el).display !== 'none' &&
                                    window.getComputedStyle(el).visibility !== 'hidden';
                                elements.push({
                                    tag: el.tagName.toLowerCase(),
                                    type: el.type || 'text',
                                    name: el.name || '',
                                    id: el.id || '',
                                    placeholder: el.placeholder || '',
                                    value: el.value || '',
                                    className: el.className || '',
                                    selector: generateSelector(el),
                                    visible: isVisible,
                                    required: el.required || false,
                                    disabled: el.disabled || false
                                });
                            });
                            function generateSelector(element) {
                                if (element.id) return '#' + element.id;
                                if (element.name) return `[name="${element.name}"]`;
                                if (element.className) {
                                    const classes = element.className.split(' ').filter(c => c.length > 0);
                                    if (classes.length > 0) return '.' + classes.join('.');
                                }
                                return element.tagName.toLowerCase() + ':nth-of-type(' +
                                    (Array.from(element.parentNode.children).indexOf(element) + 1) + ')';
                            }
                            return elements;
                        }
                        return findAllFormElements();
                    """
                })
                if enhanced_result and "content" in enhanced_result:
                    content_text = enhanced_result["content"][0].get("text", "")
                    if content_text:
                        parsed = json.loads(content_text)
                        # NOTE(review): the script returns a list; a dict is
                        # unwrapped in case the tool uses the same
                        # {"elements": [...]} envelope as Method 1 - confirm.
                        if isinstance(parsed, dict):
                            parsed = parsed.get("elements", [])
                        elements = parsed
                        self.logger.info(f"Enhanced detection found {len(elements)} elements")
            except Exception as e:
                self.logger.error(f"Enhanced detection failed: {e}")

        if not elements:
            return "No form fields found on the current page"

        # Format the form fields information
        form_fields = []
        for i, element in enumerate(elements, 1):
            label = element.get("label", "")
            field_type = element.get("type", "")
            name = element.get("name", "")
            element_id = element.get("id", "")
            placeholder = element.get("placeholder", "")

            # Build the description from whichever attributes are present.
            parts = [f"Field {i}: "]
            if label:
                parts.append(f"'{label}' ")
            if field_type:
                parts.append(f"({field_type}) ")
            if name:
                parts.append(f"name='{name}' ")
            if element_id:
                parts.append(f"id='{element_id}' ")
            if placeholder:
                parts.append(f"placeholder='{placeholder}' ")
            if element.get("required", False):
                parts.append("(required) ")
            parts.append(f"selector: {element.get('selector', '')}")
            form_fields.append("".join(parts))

        return f"Found {len(form_fields)} form fields:\n\n" + "\n".join(form_fields)
    except Exception as e:
        self.logger.error(f"Error getting form fields: {e}")
        return f"Error getting form fields: {str(e)}"
async def fill_form_field(self, field_selector: str, value: str) -> str:
    """Fill the form field matching ``field_selector`` with ``value``.

    Runs three MCP tool calls in order: click the field to focus it, send
    ``Control+a`` to select its current content, then
    ``chrome_fill_or_select`` with the new value. Short pauses follow the
    first two calls.

    Returns:
        A "Successfully filled field ..." message, or an
        "Error filling form field ..." message (also logged) if any call
        raises.
    """
    # (tool name, arguments, pause after the call in seconds or None)
    steps = (
        ("chrome_click_element", {"selector": field_selector}, 0.3),
        ("chrome_keyboard", {"keys": "Control+a"}, 0.1),
        ("chrome_fill_or_select", {"selector": field_selector, "value": value}, None),
    )
    try:
        for tool_name, tool_args, pause in steps:
            await self._call_mcp_tool(tool_name, tool_args)
            if pause is not None:
                await asyncio.sleep(pause)
    except Exception as exc:
        self.logger.error(f"Error filling form field: {exc}")
        return f"Error filling form field '{field_selector}': {str(exc)}"
    return f"Successfully filled field '{field_selector}' with value: '{value}'"
async def get_form_field_info(self, field_selector: str) -> str:
    """Describe the form field matching ``field_selector``.

    Reads the element with ``chrome_get_web_content`` and formats the first
    content item: for a dict, every truthy entry except ``content`` and
    ``textContent`` becomes a ``"Key: value"`` line; anything else is shown
    as a single ``"Content: ..."`` line.

    Returns:
        The formatted description, a "not found" message when the tool
        returns no content, or an error message (also logged).
    """
    hidden_keys = ('content', 'textContent')
    try:
        response = await self._call_mcp_tool("chrome_get_web_content", {
            "selector": field_selector,
            "textOnly": False
        })
        payload = response.get("content") if response else None
        if not payload:
            return f"Form field '{field_selector}' not found"

        field_data = payload[0] if isinstance(payload, list) else payload
        details = [f"Selector: {field_selector}"]
        if isinstance(field_data, dict):
            details.extend(
                f"{attr.capitalize()}: {attr_value}"
                for attr, attr_value in field_data.items()
                if attr_value and attr not in hidden_keys
            )
        else:
            details.append(f"Content: {str(field_data)}")
        return "Form field information:\n" + "\n".join(details)
    except Exception as exc:
        self.logger.error(f"Error getting form field info: {exc}")
        return f"Error getting form field info for '{field_selector}': {str(exc)}"
async def fill_form_step_by_step(self, form_data: str) -> str:
    """Fill several form fields one at a time from a JSON object.

    Args:
        form_data: JSON text of an object mapping field selectors to the
            values to enter, e.g. ``'{"#email": "a@b.c"}'``. Values are
            converted with ``str()`` before filling.

    Returns:
        A summary line with the number of fields actually filled, followed
        by one detail line per field. Invalid JSON or a non-object payload
        produces an explanatory message instead.
    """
    try:
        # Parse the form data
        try:
            data = json.loads(form_data)
        except json.JSONDecodeError:
            return f"Invalid JSON format in form_data: {form_data}"
        if not isinstance(data, dict):
            return "Form data must be a JSON object with field selectors as keys and values as values"

        results = []
        successful_fields = 0
        last_index = len(data) - 1
        for index, (field_selector, value) in enumerate(data.items()):
            try:
                self.logger.info(f"Filling field '{field_selector}' with value '{value}'")
                result = await self.fill_form_field(field_selector, str(value))
                results.append(f"{field_selector}: {result}")
                # fill_form_field reports failure through its return text
                # rather than by raising, so success has to be read from the
                # message (same check fill_qubecare_login uses).
                if "Successfully filled" in result:
                    successful_fields += 1
            except Exception as e:
                results.append(f"{field_selector}: Error - {str(e)}")
                self.logger.error(f"Error filling field {field_selector}: {e}")
            # Small delay between fields (none needed after the last one)
            if index < last_index:
                await asyncio.sleep(0.5)

        summary = f"Form filling completed: {successful_fields}/{len(data)} fields filled successfully"
        return f"{summary}\n\nDetails:\n" + "\n".join(results)
    except Exception as e:
        self.logger.error(f"Error in step-by-step form filling: {e}")
        return f"Error in step-by-step form filling: {str(e)}"
async def fill_qubecare_login(self, email: str, password: str) -> str:
    """Fill the email and password fields of the QuBeCare login form.

    After a two-second pause, two selector strategies are tried in order
    (common login selectors, then placeholder/aria-label based ones). Within
    a strategy each still-unfilled field is attempted with every candidate
    selector through ``fill_form_field`` until one reports success.

    Returns:
        A summary with the number of fields filled (out of 2) and one line
        per field, or an error message (also logged).
    """
    try:
        self.logger.info("Starting QuBeCare login form filling...")
        # Wait for page to load completely
        await asyncio.sleep(2)

        # Each strategy is (email selectors, password selectors).
        strategies = (
            # Strategy 1: Common login selectors
            (
                [
                    "input[type='email']",
                    "input[name='email']",
                    "input[name='username']",
                    "input[name='login']",
                    "#email",
                    "#username",
                    "#login",
                    ".email",
                    ".username",
                ],
                [
                    "input[type='password']",
                    "input[name='password']",
                    "#password",
                    ".password",
                ],
            ),
            # Strategy 2: placeholder / aria-label based selectors
            (
                [
                    "input[placeholder*='email']",
                    "input[placeholder*='Email']",
                    "input[aria-label*='email']",
                    "input[aria-label*='Email']",
                ],
                [
                    "input[placeholder*='password']",
                    "input[placeholder*='Password']",
                    "input[aria-label*='password']",
                    "input[aria-label*='Password']",
                ],
            ),
        )

        filled = {"Email": False, "Password": False}
        for strategy_num, (email_selectors, password_selectors) in enumerate(strategies, 1):
            self.logger.info(f"Trying strategy {strategy_num}...")
            targets = (
                ("Email", email_selectors, email),
                ("Password", password_selectors, password),
            )
            for label, selectors, text in targets:
                if filled[label]:
                    continue
                for candidate in selectors:
                    try:
                        outcome = await self.fill_form_field(candidate, text)
                    except Exception as exc:
                        self.logger.debug(f"{label} selector {candidate} failed: {exc}")
                        continue
                    if "Successfully filled" in outcome:
                        self.logger.info(f"{label} filled with selector: {candidate}")
                        filled[label] = True
                        break
            if all(filled.values()):
                break

        # Summary
        results = [
            f"✓ {label} field filled successfully" if done
            else f"✗ Could not find or fill {label.lower()} field"
            for label, done in filled.items()
        ]
        success_count = sum(filled.values())
        summary = f"QuBeCare login form filling: {success_count}/2 fields filled successfully"
        return f"{summary}\n\nDetails:\n" + "\n".join(results)
    except Exception as e:
        self.logger.error(f"Error filling QuBeCare login form: {e}")
        return f"Error filling QuBeCare login form: {str(e)}"
async def submit_form(self, form_selector: str = "form") -> str:
    """Submit a form on the current page.

    First tries to click a submit control, walking a fixed list of button
    selectors and returning on the first click that does not raise. If none
    works, clicks ``form_selector`` and presses Enter.

    Args:
        form_selector: Selector clicked before pressing Enter in the
            fallback path.

    Returns:
        A message naming the method that worked, a message saying no way to
        submit was found, or an error message (also logged).
    """
    try:
        # Method 1: click a submit button
        button_selectors = (
            "input[type='submit']",
            "button[type='submit']",
            "button:contains('Submit')",
            "button:contains('Send')",
            "button:contains('Save')",
            "input[value*='Submit']",
            "input[value*='Send']",
            ".submit-btn",
            ".btn-submit",
        )
        for candidate in button_selectors:
            try:
                await self._call_mcp_tool("chrome_click_element", {"selector": candidate})
            except Exception:
                continue
            return f"Form submitted successfully by clicking submit button: {candidate}"

        # Method 2: focus the form and press Enter
        try:
            await self._call_mcp_tool("chrome_click_element", {"selector": form_selector})
            await asyncio.sleep(0.2)
            await self._call_mcp_tool("chrome_keyboard", {"keys": "Enter"})
            return f"Form submitted successfully using Enter key on: {form_selector}"
        except Exception:
            return "Could not find a way to submit the form. Please check if there's a submit button or try manually."
    except Exception as e:
        self.logger.error(f"Error submitting form: {e}")
        return f"Error submitting form: {str(e)}"
async def _auto_detect_input_fields(self) -> None:
"""Automatically detect and cache all input fields on the current page"""
try:
self.logger.info("Auto-detecting all input fields on current page...")
# Get all interactive elements including all input types
result = await self._call_mcp_tool("chrome_get_interactive_elements", {
"types": ["input", "textarea", "select", "button"]
})
if not result:
self.logger.debug("No input fields found during auto-detection")
return
# Parse the nested JSON response from MCP tool
elements = []
try:
if "content" in result and result["content"]:
content_text = result["content"][0].get("text", "")
if content_text:
import json
parsed_data = json.loads(content_text)
elements = parsed_data.get("elements", [])
self.logger.debug(f"Parsed {len(elements)} elements from MCP response")
else:
# Fallback: try direct access for backward compatibility
elements = result.get("elements", [])
except (json.JSONDecodeError, KeyError, IndexError) as e:
self.logger.error(f"Error parsing MCP response: {e}")
# Fallback: try direct access
elements = result.get("elements", [])
if not elements:
self.logger.debug("No input field elements found during auto-detection")
return
# Cache all input fields with enhanced metadata
self.cached_input_fields = {}
for element in elements:
field_info = {
"selector": element.get("selector", ""),
"type": element.get("type", ""),
"name": element.get("name", ""),
"id": element.get("id", ""),
"placeholder": element.get("placeholder", ""),
"value": element.get("value", ""),
"required": element.get("required", False),
"label": element.get("label", ""),
"aria_label": element.get("aria-label", ""),
"title": element.get("title", "")
}
# Create multiple lookup keys for flexible field matching
lookup_keys = []
# Add name-based keys
if field_info["name"]:
lookup_keys.extend([
field_info["name"].lower(),
field_info["name"].lower().replace("_", " "),
field_info["name"].lower().replace("-", " ")
])
# Add ID-based keys
if field_info["id"]:
lookup_keys.extend([
field_info["id"].lower(),
field_info["id"].lower().replace("_", " "),
field_info["id"].lower().replace("-", " ")
])
# Add label-based keys
if field_info["label"]:
lookup_keys.append(field_info["label"].lower())
# Add aria-label keys
if field_info["aria_label"]:
lookup_keys.append(field_info["aria_label"].lower())
# Add placeholder-based keys
if field_info["placeholder"]:
lookup_keys.append(field_info["placeholder"].lower())
# Add type-based keys for all input types
field_type = field_info["type"].lower()
if field_type:
lookup_keys.append(field_type)
# Add variations of the type
if field_type == "email":
lookup_keys.extend(["mail", "e-mail"])
elif field_type == "tel":
lookup_keys.extend(["phone", "telephone"])
elif field_type == "search":
lookup_keys.extend(["find", "query", "q"])
# Add common field name patterns (expanded for all input types)
common_patterns = {
"email": ["email", "e-mail", "mail", "email address"],
"password": ["password", "pass", "pwd"],
"phone": ["phone", "telephone", "tel", "mobile", "cell"],
"name": ["name", "full name", "username", "user name"],
"first name": ["first name", "firstname", "fname"],
"last name": ["last name", "lastname", "lname", "surname"],
"address": ["address", "street", "location"],
"city": ["city", "town"],
"zip": ["zip", "postal", "postcode", "zip code"],
"country": ["country", "nation"],
"state": ["state", "province", "region"],
"message": ["message", "comment", "description", "notes"],
"subject": ["subject", "title", "topic"],
"search": ["search", "find", "query", "q", "lookup"],
"text": ["text", "input", "field"],
"number": ["number", "num", "amount", "quantity"],
"date": ["date", "when", "time"],
"url": ["url", "link", "website", "site"],
"file": ["file", "upload", "attach", "document"],
"checkbox": ["check", "checkbox", "tick", "select"],
"radio": ["radio", "option", "choice"],
"submit": ["submit", "send", "save", "go", "enter"],
"button": ["button", "click", "press"]
}
# Match field to common patterns
for pattern_key, pattern_values in common_patterns.items():
for lookup_key in lookup_keys:
if any(pattern in lookup_key for pattern in pattern_values):
lookup_keys.append(pattern_key)
break
# Store field info under all lookup keys
for key in lookup_keys:
if key and key not in self.cached_input_fields:
self.cached_input_fields[key] = field_info
self.logger.info(f"Auto-detected {len(elements)} input fields with {len(self.cached_input_fields)} lookup keys")
except Exception as e:
self.logger.error(f"Error during auto input field detection: {e}")
async def fill_field_by_name(self, field_name: str, value: str) -> str:
    """Fill a field identified by a spoken/descriptive name, without cache.

    Four real-time strategies are attempted in order, each announced in the
    log: dynamic form-field discovery, enhanced detection with up to three
    retries, page-content analysis, and a direct MCP element search. The
    message of the first strategy reporting ``success`` is returned.

    Returns:
        The successful strategy's message, a "Could not find field" message
        when all four fail, or an error message (also logged).
    """
    try:
        field_name_lower = field_name.lower().strip()
        self.logger.info(f"Starting REAL-TIME form filling for field: '{field_name}' with value: '{value}' (NO CACHE)")

        # (log line announcing the step, coroutine factory for the step)
        attempts = (
            (
                "Getting real-time form elements using MCP tools...",
                lambda: self._discover_form_fields_dynamically(field_name, value),
            ),
            (
                "Real-time discovery failed, trying enhanced detection with retry...",
                lambda: self._enhanced_field_detection_with_retry(field_name, value, max_retries=3),
            ),
            (
                "Enhanced detection failed, trying real-time content analysis...",
                lambda: self._analyze_page_content_for_field(field_name, value),
            ),
            (
                "All methods failed, trying direct MCP element search...",
                lambda: self._direct_mcp_element_search(field_name, value),
            ),
        )
        for announcement, attempt in attempts:
            self.logger.info(announcement)
            outcome = await attempt()
            if outcome["success"]:
                return outcome["message"]

        return f"✗ Could not find field '{field_name}' using real-time MCP discovery methods."
    except Exception as exc:
        self.logger.error(f"Error filling field by name: {exc}")
        return f"Error filling field '{field_name}': {str(exc)}"
async def fill_input_field(self, field_selector: str, value: str) -> str:
    """Fill an input field, falling back through three fill strategies.

    Steps:
      1. Click the field to focus it.
      2. Read the field with ``chrome_get_web_content``; when its type is a
         text-like input, send ``Control+a`` to select the current content.
         Failure to read the type is logged and ignored.
      3. Fill via ``fill_using_target_element`` (with ``field_selector`` as
         fallback selector). If that does not report success, call
         ``chrome_fill_or_select`` directly.
      4. If step 3 raises, type the value one character at a time with
         ``chrome_keyboard``.

    Returns:
        A success message from whichever strategy worked, or an
        "Error filling input field ..." message (also logged).
    """
    # NOTE(review): success is recognised by the check-mark emoji (U+2705)
    # that the sibling target-element helper puts at the start of its
    # success messages; fill_using_target_element is presumed to follow the
    # same convention - confirm.
    success_marker = "\u2705"
    text_like_types = ["text", "email", "password", "search", "tel", "url", "number", "textarea"]
    # Characters that must be sent by key name rather than literally.
    special_keys = {' ': "Space", '\n': "Enter", '\t': "Tab"}
    try:
        # First click to focus the field - this will capture target element info
        await self._call_mcp_tool("chrome_click_element", {"selector": field_selector})
        await asyncio.sleep(0.3)

        # Clear existing content for input fields (not for buttons)
        try:
            field_info_result = await self._call_mcp_tool("chrome_get_web_content", {
                "selector": field_selector,
                "textOnly": False
            })
            field_type = "text"  # default
            if field_info_result and field_info_result.get("content"):
                raw_content = field_info_result["content"]
                content = raw_content[0] if isinstance(raw_content, list) else raw_content
                if isinstance(content, dict):
                    field_type = content.get("type", "text").lower()
            # Only clear content for input fields that accept text
            if field_type in text_like_types:
                await self._call_mcp_tool("chrome_keyboard", {"keys": "Control+a"})
                await asyncio.sleep(0.1)
        except Exception as e:
            self.logger.debug(f"Could not determine field type, proceeding with fill: {e}")

        try:
            # Use target element approach with fallback to original selector
            result = await self.fill_using_target_element(value, [field_selector])
            if success_marker in result:
                return result
            # Target element approach failed: fill the selector directly.
            await self._call_mcp_tool("chrome_fill_or_select", {
                "selector": field_selector,
                "value": value
            })
            return f"Successfully filled field '{field_selector}' with value: '{value}'"
        except Exception as e1:
            self.logger.debug(f"fill_or_select failed, trying keyboard input: {e1}")
            # Fallback: type character by character
            try:
                # Select any existing content first so typing replaces it
                await self._call_mcp_tool("chrome_keyboard", {"keys": "Control+a"})
                await asyncio.sleep(0.1)
                for char in value:
                    await self._call_mcp_tool("chrome_keyboard", {"keys": special_keys.get(char, char)})
                    await asyncio.sleep(0.05)  # Small delay between characters
                return f"Successfully typed into field '{field_selector}' with value: '{value}'"
            except Exception as e2:
                self.logger.error(f"Both fill methods failed: fill_or_select={e1}, keyboard={e2}")
                raise
    except Exception as e:
        self.logger.error(f"Error filling input field: {e}")
        return f"Error filling input field '{field_selector}': {str(e)}"
async def enhanced_element_discovery_with_fallback(self, element_description: str, action_type: str = "fill", value: str = "") -> Dict[str, Any]:
    """
    Resolve an element description to a CSS selector using three discovery tiers.

    Tiers, tried in order until one yields a selector:
        1. chrome_get_interactive_elements queried with the description;
           the first returned element carrying a non-empty "selector" wins.
        2. chrome_get_web_content for the whole page, with the selector
           chosen by _extract_selector_from_web_content.
        3. The first three candidates from _generate_intelligent_selectors,
           each probed with chrome_get_web_content.

    Args:
        element_description: Description of element to find (e.g., "username", "login button")
        action_type: Type of action ("fill", "click"); only tier 2 consults it
        value: Value to fill (for fill actions); accepted but never read here

    Returns:
        On success: {"success": True, "selector", "method", "element"} where
        method is "interactive_elements", "web_content" or
        "intelligent_generation". On failure: {"success": False, "error",
        "method"} with method "none" (nothing matched) or "error"
        (unexpected exception).
    """
    try:
        self.logger.info(f"🔍 ENHANCED DISCOVERY: Looking for '{element_description}' for {action_type} action")

        # ---- Tier 1: ask the browser for interactive elements ----
        self.logger.info("📋 Step 1: Trying chrome_get_interactive_elements...")
        try:
            listing = await self._call_mcp_tool(
                "chrome_get_interactive_elements",
                {"textQuery": element_description},
            )
            if not listing.get("isError", False):
                candidates = []
                try:
                    # Preferred shape: a JSON document in content[0].text
                    if "content" in listing and listing["content"]:
                        payload = listing["content"][0].get("text", "")
                        if payload:
                            candidates = json.loads(payload).get("elements", [])
                except (json.JSONDecodeError, KeyError, IndexError):
                    # Alternative shape: elements directly on the response
                    candidates = listing.get("elements", [])

                if candidates:
                    for candidate in candidates:
                        css = candidate.get("selector", "")
                        if not css:
                            continue
                        self.logger.info(f"✅ Found element with interactive discovery: {css}")
                        return {
                            "success": True,
                            "selector": css,
                            "method": "interactive_elements",
                            "element": candidate,
                        }
            self.logger.warning("⚠️ chrome_get_interactive_elements failed or returned no elements")
        except Exception as exc:
            self.logger.warning(f"⚠️ chrome_get_interactive_elements error: {exc}")

        # ---- Tier 2: derive a selector from the page content ----
        self.logger.info("🔄 Step 2: Falling back to chrome_get_web_content...")
        try:
            page = await self._call_mcp_tool("chrome_get_web_content", {"textOnly": False})
            if not page.get("isError", False):
                css = await self._extract_selector_from_web_content(page, element_description, action_type)
                if css:
                    self.logger.info(f"✅ Found element with web content discovery: {css}")
                    return {
                        "success": True,
                        "selector": css,
                        "method": "web_content",
                        "element": {"selector": css},
                    }
            self.logger.warning("⚠️ chrome_get_web_content failed or no suitable selector found")
        except Exception as exc:
            self.logger.warning(f"⚠️ chrome_get_web_content error: {exc}")

        # ---- Tier 3: probe generated selectors (first three only) ----
        self.logger.info("🎯 Step 3: Trying intelligent selector generation...")
        guesses = self._generate_intelligent_selectors(element_description)
        for css in guesses[:3]:
            try:
                probe = await self._call_mcp_tool(
                    "chrome_get_web_content",
                    {"selector": css, "textOnly": False},
                )
                if probe and not probe.get("isError", False) and probe.get("content"):
                    self.logger.info(f"✅ Found element with intelligent selector: {css}")
                    return {
                        "success": True,
                        "selector": css,
                        "method": "intelligent_generation",
                        "element": {"selector": css},
                    }
            except Exception as exc:
                self.logger.debug(f"Intelligent selector '{css}' failed: {exc}")
                continue

        return {
            "success": False,
            "error": f"Could not find element '{element_description}' using any discovery method",
            "method": "none",
        }
    except Exception as exc:
        self.logger.error(f"Error in enhanced_element_discovery_with_fallback: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "method": "error",
        }
async def _extract_selector_from_web_content(self, web_content_result: Dict[str, Any], element_description: str, action_type: str) -> Optional[str]:
    """
    Pick a CSS selector for the described element from a page-content response.

    The description is lower-cased and tested, in order, against keyword
    groups for the requested action; the first group with a keyword
    contained in the description supplies the candidate selectors handed to
    _find_selector_in_content. A generic candidate list is used when no
    group matches.

    Args:
        web_content_result: Result from chrome_get_web_content
        element_description: Description of element to find
        action_type: Type of action ("fill", "click")

    Returns:
        The selector chosen by _find_selector_in_content, or None when the
        response carries no text, the action type is neither "fill" nor
        "click", or an error occurs.
    """
    # (keywords, candidate selectors) pairs; order is significant.
    fill_rules = (
        (("username", "user"), ["input[name*='user']", "input[id*='user']", "input[type='text']"]),
        (("email", "mail"), ["input[type='email']", "input[name*='email']", "input[id*='email']"]),
        (("password", "pass"), ["input[type='password']", "input[name*='password']", "input[id*='pass']"]),
        (("search",), ["input[type='search']", "input[name='q']", "textarea[name='q']"]),
        (("phone", "tel"), ["input[type='tel']", "input[name*='phone']", "input[name*='tel']"]),
    )
    fill_generic = ["input[type='text']", "input", "textarea"]
    click_rules = (
        (("login",), ["button[type='submit']", "input[type='submit']", "button", "[role='button']"]),
        (("submit",), ["button[type='submit']", "input[type='submit']", "button"]),
        (("button",), ["button", "input[type='button']", "[role='button']"]),
        (("link",), ["a", "[role='link']"]),
    )
    click_generic = ["button", "a", "[role='button']", "input[type='submit']"]

    try:
        # Pull the textual payload out of the first content item.
        page_text = ""
        if "content" in web_content_result and web_content_result["content"]:
            first_item = web_content_result["content"][0]
            page_text = first_item.get("text", "") if isinstance(first_item, dict) else str(first_item)
        if not page_text:
            return None

        wanted = element_description.lower()
        if action_type == "fill":
            rules, generic = fill_rules, fill_generic
        elif action_type == "click":
            rules, generic = click_rules, click_generic
        else:
            return None

        for keywords, candidates in rules:
            if any(keyword in wanted for keyword in keywords):
                return self._find_selector_in_content(page_text, candidates)
        return self._find_selector_in_content(page_text, generic)
    except Exception as exc:
        self.logger.error(f"Error extracting selector from web content: {exc}")
        return None
def _find_selector_in_content(self, content: str, selectors: List[str]) -> Optional[str]:
    """
    Return the first candidate selector that plausibly matches the page content.

    This is a textual heuristic, not a DOM query. Each selector is tested
    in order:
        * a selector mentioning "input", "button" or "textarea" matches when
          that word occurs anywhere in the content (case-insensitive);
        * pure id/class selectors ("#..." / "....") are skipped because they
          cannot be validated from raw text;
        * any other attribute selector matches when its attribute name
          (without a CSS operator prefix such as "*", "^", "$", "~", "|")
          occurs in the content.

    Args:
        content: Web page content
        selectors: List of selectors to check

    Returns:
        The first matching selector; otherwise the first selector in the
        list as a best-effort fallback, or None when the list is empty.
    """
    try:
        # Lower-case once instead of once per selector.
        haystack = (content or "").lower()
        for selector in selectors:
            if "input" in selector and "input" in haystack:
                return selector
            elif "button" in selector and "button" in haystack:
                return selector
            elif "textarea" in selector and "textarea" in haystack:
                return selector
            elif selector.startswith(("#", ".")):
                # ID or class selectors - harder to validate from content
                continue
            elif "[" in selector:
                # Capture only the attribute name: stop at whitespace, at
                # "=", and at the operator characters of "*=", "^=", "$=",
                # "~=", "|=" so "a[href*='x']" yields "href", not "href*".
                attr_match = re.search(r'\[\s*([^\s~|^$*=\]]+)', selector)
                if attr_match and attr_match.group(1).lower() in haystack:
                    return selector
        # If no specific match, return the first selector as fallback
        return selectors[0] if selectors else None
    except Exception as e:
        self.logger.error(f"Error finding selector in content: {e}")
        return selectors[0] if selectors else None
async def smart_fill_with_target_tracking(self, field_name: str, value: str) -> str:
    """
    Find a field by description and fill it, reporting the outcome as text.

    Process:
        1. Reset last_target_element / last_optimal_selector.
        2. Resolve a selector via enhanced_element_discovery_with_fallback.
        3. Click the selector, send Control+a, then call
           chrome_fill_or_select. A response flagged "isError" counts as a
           failed fill, not a success.
        4. If step 3 fails and last_optimal_selector is set, retry through
           fill_using_target_element with selectors derived from
           last_target_element.
        5. Otherwise fall back to fill_field_by_name.

    Args:
        field_name: Name or description of the field to find
        value: Value to fill in the field

    Returns:
        Result message with details about the operation; failures are
        reported in the message rather than raised.
    """
    try:
        self.logger.info(f"🎯 SMART FILL: Starting enhanced filling for '{field_name}' with '{value}'")

        # Clear previous target element to start fresh
        self.last_target_element = None
        self.last_optimal_selector = None

        # Step 1: Use enhanced discovery with fallback mechanism
        self.logger.info("🔍 Step 1: Using enhanced discovery with fallback...")
        discovery_result = await self.enhanced_element_discovery_with_fallback(field_name, "fill", value)

        if discovery_result["success"]:
            selector = discovery_result["selector"]
            method = discovery_result["method"]
            self.logger.info(f"✅ Element found using {method}: {selector}")

            # Step 2: Try to fill the field using the discovered selector
            try:
                # Click first so the field has focus before typing
                await self._call_mcp_tool("chrome_click_element", {"selector": selector})
                await asyncio.sleep(0.3)

                # Select existing content so the fill replaces it
                await self._call_mcp_tool("chrome_keyboard", {"keys": "Control+a"})
                await asyncio.sleep(0.1)

                # NOTE(review): last_target_element is presumably updated by
                # _call_mcp_tool as a side effect - confirm.
                fill_result = await self._call_mcp_tool("chrome_fill_or_select", {
                    "selector": selector,
                    "value": value
                })
                # MCP tools can signal failure in-band; route that through
                # the same recovery path as a raised exception.
                if isinstance(fill_result, dict) and fill_result.get("isError", False):
                    raise RuntimeError(f"chrome_fill_or_select reported an error for selector '{selector}'")

                return f"🎯 ENHANCED FILL SUCCESS: Filled '{field_name}' using {method} method\n🔍 Selector: {selector}\n📍 Target Element: {self.last_target_element}"

            except Exception as e:
                self.logger.warning(f"⚠️ Direct fill failed: {e}")

                # Fallback to target element approach if available
                if self.last_optimal_selector:
                    fallback_selectors = self._generate_fallback_selectors_from_target()
                    fill_result = await self.fill_using_target_element(value, fallback_selectors)
                    # NOTE(review): `"" in <str>` is always True, so any string
                    # result is reported as success here. The intended success
                    # marker appears to be missing - confirm and restore.
                    if "" in fill_result:
                        return f"🔄 FALLBACK SUCCESS: {fill_result}"

        # Step 3: If enhanced discovery failed, try traditional methods
        self.logger.info("🔄 Step 2: Enhanced discovery failed, trying traditional methods...")
        traditional_result = await self.fill_field_by_name(field_name, value)

        # NOTE(review): `"" not in <str>` is always False, so this branch can
        # never report success as written. The intended failure marker
        # appears to be missing - confirm and restore.
        if "" not in traditional_result and "Error" not in traditional_result:
            return f"🔄 TRADITIONAL SUCCESS: {traditional_result}"

        return f"❌ SMART FILL FAILED: Could not find or fill field '{field_name}' using any method\n🔍 Discovery Error: {discovery_result.get('error', 'Unknown error')}"

    except Exception as e:
        self.logger.error(f"Error in smart_fill_with_target_tracking: {e}")
        return f"❌ Error in smart fill: {str(e)}"
def _generate_fallback_selectors_from_target(self) -> List[str]:
    """
    Build alternative CSS selectors from the last recorded target element.

    Reads self.last_target_element (a mapping) and emits, in this order:
    an id selector, tag+name and bare name selectors, tag.class selectors
    for the first two classes, and an input[type=...] selector. A missing
    "tagName" defaults to "input".

    Returns:
        List of fallback selectors; empty when no target element is stored
    """
    target = self.last_target_element
    if not target:
        return []

    selectors = []

    if target.get("id"):
        selectors.append(f"#{target['id']}")

    if target.get("name"):
        tag = target.get("tagName", "input").lower()
        name = target["name"]
        selectors += [f"{tag}[name='{name}']", f"[name='{name}']"]

    if target.get("className"):
        tag = target.get("tagName", "input").lower()
        # Only the first two class names are used
        selectors += [f"{tag}.{css_class}" for css_class in target["className"].split()[:2]]

    if target.get("type"):
        selectors.append(f"input[type='{target['type']}']")

    return selectors
async def smart_click_with_target_tracking(self, element_description: str) -> str:
    """
    Find an element by description and click it, reporting the outcome as text.

    Process:
        1. Reset last_target_element / last_optimal_selector.
        2. Resolve a selector via enhanced_element_discovery_with_fallback.
        3. Call chrome_click_element. A response flagged "isError" counts
           as a failed click, not a success.
        4. If step 3 fails and last_optimal_selector is set, retry through
           click_using_target_element with selectors derived from
           last_target_element.
        5. Otherwise fall back to _smart_click_mcp.

    Args:
        element_description: Description of element to click (e.g., "login button", "submit")

    Returns:
        Result message with details about the operation; failures are
        reported in the message rather than raised.
    """
    try:
        self.logger.info(f"🎯 SMART CLICK: Starting enhanced clicking for '{element_description}'")

        # Clear previous target element to start fresh
        self.last_target_element = None
        self.last_optimal_selector = None

        # Step 1: Use enhanced discovery with fallback mechanism
        self.logger.info("🔍 Step 1: Using enhanced discovery with fallback...")
        discovery_result = await self.enhanced_element_discovery_with_fallback(element_description, "click")

        if discovery_result["success"]:
            selector = discovery_result["selector"]
            method = discovery_result["method"]
            self.logger.info(f"✅ Element found using {method}: {selector}")

            # Step 2: Try to click the element using the discovered selector
            try:
                # NOTE(review): last_target_element is presumably updated by
                # _call_mcp_tool as a side effect - confirm.
                click_result = await self._call_mcp_tool("chrome_click_element", {"selector": selector})
                # MCP tools can signal failure in-band; route that through
                # the same recovery path as a raised exception.
                if isinstance(click_result, dict) and click_result.get("isError", False):
                    raise RuntimeError(f"chrome_click_element reported an error for selector '{selector}'")

                return f"🎯 ENHANCED CLICK SUCCESS: Clicked '{element_description}' using {method} method\n🔍 Selector: {selector}\n📍 Target Element: {self.last_target_element}"

            except Exception as e:
                self.logger.warning(f"⚠️ Direct click failed: {e}")

                # Fallback to target element approach if available
                if self.last_optimal_selector:
                    fallback_selectors = self._generate_fallback_selectors_from_target()
                    click_result = await self.click_using_target_element(fallback_selectors)
                    # NOTE(review): `"" in <str>` is always True, so any string
                    # result is reported as success here. The intended success
                    # marker appears to be missing - confirm and restore.
                    if "" in click_result:
                        return f"🔄 FALLBACK SUCCESS: {click_result}"

        # Step 3: If enhanced discovery failed, try traditional smart click
        self.logger.info("🔄 Step 2: Enhanced discovery failed, trying traditional smart click...")
        traditional_result = await self._smart_click_mcp(element_description)

        # NOTE(review): `"" not in <str>` is always False, so this branch can
        # never report success as written. The intended failure marker
        # appears to be missing - confirm and restore.
        if "" not in traditional_result and "Error" not in traditional_result:
            return f"🔄 TRADITIONAL SUCCESS: {traditional_result}"

        return f"❌ SMART CLICK FAILED: Could not find or click element '{element_description}' using any method\n🔍 Discovery Error: {discovery_result.get('error', 'Unknown error')}"

    except Exception as e:
        self.logger.error(f"Error in smart_click_with_target_tracking: {e}")
        return f"❌ Error in smart click: {str(e)}"
async def get_cached_input_fields(self) -> str:
    """
    Describe the cached input fields as a human-readable report.

    When the cache is empty, _auto_detect_input_fields is awaited first.
    Several lookup keys may point at the same field, so entries are grouped
    by selector: each field is listed once with up to five of its lookup
    keys, its type, a "(required)" flag and its selector.

    Returns:
        The formatted report, a "no fields" message when the cache stays
        empty, or an error message when anything raises.
    """
    try:
        if not self.cached_input_fields:
            await self._auto_detect_input_fields()

        if not self.cached_input_fields:
            return "No input fields found on the current page"

        # One pass over the cache: remember the first info dict seen for
        # each selector and collect every lookup key that maps to it.
        field_by_selector = {}
        names_by_selector = {}
        for key, field_info in self.cached_input_fields.items():
            selector = field_info["selector"]
            field_by_selector.setdefault(selector, field_info)
            names_by_selector.setdefault(selector, []).append(f"'{key}'")

        input_fields = []
        for i, (selector, field_info) in enumerate(field_by_selector.items(), 1):
            field_names = names_by_selector[selector]
            # Show at most five names; an ellipsis marks truncation
            description = f"Field {i}: "
            description += f"Names: {', '.join(field_names[:5])}{'...' if len(field_names) > 5 else ''} "
            if field_info["type"]:
                description += f"({field_info['type']}) "
            if field_info["required"]:
                description += "(required) "
            description += f"selector: {field_info['selector']}"
            input_fields.append(description)

        return f"Cached input fields ({len(field_by_selector)} fields, {len(self.cached_input_fields)} lookup keys):\n\n" + "\n".join(input_fields)

    except Exception as e:
        self.logger.error(f"Error getting cached input fields: {e}")
        return f"Error getting cached input fields: {str(e)}"
async def refresh_input_fields(self) -> str:
    """
    Discard the input-field cache, re-run detection and report the result.

    Returns:
        The report produced by get_cached_input_fields, or an error
        message when detection or reporting raises.
    """
    try:
        # Start from an empty cache so stale entries cannot survive
        self.cached_input_fields = {}
        await self._auto_detect_input_fields()
        report = await self.get_cached_input_fields()
    except Exception as exc:
        self.logger.error(f"Error refreshing input fields: {exc}")
        return f"Error refreshing input fields: {str(exc)}"
    return report
async def _enhanced_field_detection_and_fill(self, field_name: str, value: str) -> Optional[str]:
    """
    Find and fill a field by probing generated selectors, then label analysis.

    Steps:
        1. Fetch the page text; give up if the page returns no content.
        2. For each selector from _generate_intelligent_selectors, probe it
           with chrome_get_web_content and, when the probe returns content
           without an error flag, attempt the fill. A fill that comes back
           as an "Error..." message moves on to the next selector.
        3. If no selector worked, delegate to _find_field_by_label_analysis.

    Args:
        field_name: Name or description of the field to find
        value: Value to fill in the field

    Returns:
        A "✓ Filled ..." message on success (or whatever label analysis
        returned), otherwise None.
    """
    try:
        self.logger.info(f"Starting enhanced field detection for '{field_name}'")

        # Step 1: make sure the page is readable before probing selectors
        page_content_result = await self._call_mcp_tool("chrome_get_web_content", {
            "textOnly": True
        })
        if not page_content_result or not page_content_result.get("content"):
            self.logger.debug("Could not get page content for enhanced detection")
            return None

        # Step 2: try intelligent selector generation based on field name
        for selector in self._generate_intelligent_selectors(field_name):
            try:
                # Test if selector exists
                test_result = await self._call_mcp_tool("chrome_get_web_content", {
                    "selector": selector,
                    "textOnly": False
                })
                if not test_result or test_result.get("isError", False) or not test_result.get("content"):
                    continue

                fill_result = await self.fill_input_field(selector, value)
                # fill_input_field reports failure as an "Error ..." string
                # instead of raising, so inspect the message before
                # declaring success.
                if isinstance(fill_result, str) and fill_result.startswith("Error"):
                    self.logger.debug(f"Enhanced selector '{selector}' failed: {fill_result}")
                    continue

                self.logger.info(f"Successfully filled field using enhanced detection with selector: {selector}")
                return f"✓ Filled '{field_name}' field (enhanced detection): {fill_result}"
            except Exception as e:
                self.logger.debug(f"Enhanced selector '{selector}' failed: {e}")
                continue

        # Step 3: try to find fields by analyzing labels and surrounding text
        label_based_result = await self._find_field_by_label_analysis(field_name, value)
        if label_based_result:
            return label_based_result

        self.logger.info(f"Enhanced field detection failed for '{field_name}'")
        return None

    except Exception as e:
        self.logger.error(f"Error in enhanced field detection: {e}")
        return None
def _generate_intelligent_selectors(self, field_name: str) -> list:
"""Generate intelligent CSS selectors based on field name"""
field_name_lower = field_name.lower().strip()
field_variations = [
field_name_lower,
field_name_lower.replace(" ", ""),
field_name_lower.replace(" ", "_"),
field_name_lower.replace(" ", "-"),
field_name_lower.replace("_", ""),
field_name_lower.replace("-", ""),
field_name_lower.replace("_", "-"),
field_name_lower.replace("-", "_")
]
selectors = []
# Generate selectors for each variation
for variation in field_variations:
# Direct attribute selectors
selectors.extend([
f"input[name='{variation}']",
f"input[id='{variation}']",
f"input[placeholder*='{variation}']",
f"textarea[name='{variation}']",
f"textarea[id='{variation}']",
f"select[name='{variation}']",
f"select[id='{variation}']",
f"input[data-testid*='{variation}']",
f"input[data-test*='{variation}']",
f"input[class*='{variation}']",
f"[aria-label*='{variation}']",
f"[aria-labelledby*='{variation}']"
])
# Partial match selectors
selectors.extend([
f"input[name*='{variation}']",
f"input[id*='{variation}']",
f"textarea[name*='{variation}']",
f"textarea[id*='{variation}']",
f"select[name*='{variation}']",
f"select[id*='{variation}']"
])
# Common field type patterns
if any(keyword in field_name_lower for keyword in ['email', 'mail']):
selectors.extend([
"input[type='email']",
"input[name*='email']",
"input[id*='email']"
])
if any(keyword in field_name_lower for keyword in ['password', 'pass']):
selectors.extend([
"input[type='password']",
"input[name*='password']",
"input[id*='password']"
])
if any(keyword in field_name_lower for keyword in ['username', 'user', 'login']):
selectors.extend([
"input[name*='username']",
"input[name*='user']",
"input[name*='login']",
"input[id*='username']",
"input[id*='user']",
"input[id*='login']"
])
# Remove duplicates while preserving order
unique_selectors = []
seen = set()
for selector in selectors:
if selector not in seen:
unique_selectors.append(selector)
seen.add(selector)
return unique_selectors
async def _find_field_by_label_analysis(self, field_name: str, value: str) -> Optional[str]:
    """
    Locate a field through its label or placeholder text and fill it.

    Two passes are made:
        1. Every element reported by chrome_get_interactive_elements is
           matched on its "text", "placeholder" and "ariaLabel" values.
        2. The page HTML is scanned for <label> elements whose content
           mentions the field name; the label's own "for" attribute gives
           the id of the input to fill.

    A fill that comes back as an "Error..." message is treated as a failure
    and the search continues.

    Args:
        field_name: Name or description of the field to find
        value: Value to fill in the field

    Returns:
        A "✓ Filled ..." message for the first fill that succeeds, or None
        when nothing matched, every fill failed, or an error occurred.
    """
    def fill_failed(outcome) -> bool:
        # fill_input_field reports failure as an "Error ..." string
        return isinstance(outcome, str) and outcome.startswith("Error")

    try:
        field_name_lower = field_name.lower().strip()
        self.logger.info(f"Analyzing labels for field '{field_name}'")

        # Get all interactive elements to analyze their context
        interactive_result = await self._call_mcp_tool("chrome_get_interactive_elements", {
            "types": ["input", "textarea", "select"]
        })
        if not interactive_result:
            return None

        # Parse the interactive elements response. The module-level `json`
        # is used on purpose: importing it inside this function would make
        # the name local and break the except clause below whenever the
        # failure happens before the import statement has run.
        elements = []
        try:
            if "content" in interactive_result and interactive_result["content"]:
                content_text = interactive_result["content"][0].get("text", "")
                if content_text:
                    parsed_data = json.loads(content_text)
                    elements = parsed_data.get("elements", [])
        except (json.JSONDecodeError, KeyError, IndexError):
            elements = interactive_result.get("elements", [])

        # Pass 1: match on each element's own descriptive text
        keywords = [field_name_lower, field_name_lower.replace(" ", "")]
        for element in elements:
            try:
                element_text = ""
                if "text" in element:
                    element_text += element["text"].lower()
                if "placeholder" in element:
                    element_text += " " + element["placeholder"].lower()
                if "ariaLabel" in element:
                    element_text += " " + element["ariaLabel"].lower()

                if not any(keyword in element_text for keyword in keywords):
                    continue
                selector = element.get("selector")
                if not selector:
                    continue
                try:
                    fill_result = await self.fill_input_field(selector, value)
                except Exception as e:
                    self.logger.debug(f"Failed to fill field with selector '{selector}': {e}")
                    continue
                if fill_failed(fill_result):
                    self.logger.debug(f"Failed to fill field with selector '{selector}': {fill_result}")
                    continue
                self.logger.info(f"Successfully filled field using label analysis with selector: {selector}")
                return f"✓ Filled '{field_name}' field (label analysis): {fill_result}"
            except Exception as e:
                self.logger.debug(f"Error analyzing element: {e}")
                continue

        # Pass 2: look for <label for="..."> elements mentioning the field
        try:
            html_result = await self._call_mcp_tool("chrome_get_web_content", {
                "textOnly": False
            })
            if html_result and html_result.get("content"):
                html_content = str(html_result["content"][0])
                # Capture each label separately (opening-tag attributes and
                # inner content) so a match can never span from one label's
                # opening tag into a later label's text.
                label_pattern = r'<label\b([^>]*)>(.*?)</label>'
                for attrs, inner in re.findall(label_pattern, html_content, re.IGNORECASE | re.DOTALL):
                    if field_name_lower not in inner.lower():
                        continue
                    # Extract 'for' attribute if present (not e.g. data-for)
                    for_match = re.search(r'(?<![\w-])for\s*=\s*["\']([^"\']+)["\']', attrs, re.IGNORECASE)
                    if not for_match:
                        continue
                    input_id = for_match.group(1)
                    try:
                        fill_result = await self.fill_input_field(f"#{input_id}", value)
                    except Exception:
                        continue
                    if fill_failed(fill_result):
                        continue
                    self.logger.info(f"Successfully filled field using label 'for' attribute: #{input_id}")
                    return f"✓ Filled '{field_name}' field (label for): {fill_result}"
        except Exception as e:
            self.logger.debug(f"Error in HTML label analysis: {e}")

        return None

    except Exception as e:
        self.logger.error(f"Error in label analysis: {e}")
        return None
async def execute_field_workflow(self, field_name: str, field_value: str, actions: list = None, max_retries: int = 3) -> dict:
    """
    Run the detect -> fill -> follow-up-actions workflow for one field.

    Stages:
        1. _workflow_detect_and_fill_field locates and fills the field.
        2. When actions are supplied, _execute_workflow_actions runs them;
           the workflow succeeds only if every action not marked
           required=False succeeded.

    Args:
        field_name: Name or identifier of the field to find
        field_value: Value to fill in the field
        actions: List of actions to execute after successful field filling
                Format: [{"type": "submit", "selector": "form"}, {"type": "click", "selector": "button"}]
        max_retries: Maximum number of detection attempts

    Returns:
        Dictionary with keys "success", "field_filled", "actions_executed",
        "detection_method", "errors", "execution_time" (seconds on the
        event-loop clock) and "field_selector". Errors are collected in
        "errors" rather than raised.
    """
    loop = asyncio.get_event_loop()
    started_at = loop.time()
    results = {
        "success": False,
        "field_filled": False,
        "actions_executed": [],
        "detection_method": None,
        "errors": [],
        "execution_time": 0.0,
        "field_selector": None
    }
    follow_ups = [] if actions is None else actions

    try:
        self.logger.info(f"Starting enhanced field workflow for '{field_name}'")

        # Stage 1: detection + fill
        detection = await self._workflow_detect_and_fill_field(field_name, field_value, max_retries)
        if not detection["success"]:
            results["errors"].append(f"Field detection failed: {detection.get('error', 'Unknown error')}")
            # execution_time is stamped by the finally clause below
            return results

        results["field_filled"] = True
        results["detection_method"] = detection["method"]
        results["field_selector"] = detection.get("selector")
        self.logger.info(f"Successfully filled field '{field_name}' using {detection['method']}")

        # Stage 2: post-fill actions
        if not follow_ups:
            results["success"] = True
        else:
            outcomes = await self._execute_workflow_actions(follow_ups)
            results["actions_executed"] = outcomes

            # Optional actions (required=False) do not affect the verdict
            all_required_ok = all(
                outcome["success"] for outcome in outcomes
                if outcome.get("required", True)
            )
            results["success"] = all_required_ok
            if not all_required_ok:
                results["errors"].extend(
                    f"Action failed: {outcome.get('error', 'Unknown error')}"
                    for outcome in outcomes if not outcome["success"]
                )
    except Exception as exc:
        self.logger.error(f"Workflow execution error: {exc}")
        results["errors"].append(f"Workflow error: {str(exc)}")
    finally:
        results["execution_time"] = loop.time() - started_at

    return results
async def _workflow_detect_and_fill_field(self, field_name: str, field_value: str, max_retries: int) -> dict:
    """
    Run the detection strategies, in order, until one fills the field.

    Strategy order (tried afresh on every attempt):
        1. cached_fields
        2. enhanced_detection
        3. label_analysis
        4. content_analysis
        5. fallback_patterns

    A strategy that raises is logged at debug level and skipped. Between
    attempts the coroutine sleeps for one second.

    Returns:
        The winning strategy's result dict with "method" set to the
        strategy name, or {"success": False, "error": ...} once all
        attempts are exhausted.
    """
    pipeline = (
        ("cached_fields", self._try_cached_field_detection),
        ("enhanced_detection", self._try_enhanced_field_detection),
        ("label_analysis", self._try_label_field_detection),
        ("content_analysis", self._try_content_field_detection),
        ("fallback_patterns", self._try_fallback_field_detection),
    )

    for attempt_no in range(1, max_retries + 1):
        self.logger.info(f"Field detection attempt {attempt_no}/{max_retries} for '{field_name}'")

        for label, strategy in pipeline:
            try:
                outcome = await strategy(field_name, field_value)
                if outcome["success"]:
                    outcome["method"] = label
                    return outcome
            except Exception as exc:
                self.logger.debug(f"Strategy {label} failed: {exc}")
                continue

        # Pause before the next round, but not after the final one
        if attempt_no < max_retries:
            await asyncio.sleep(1.0)

    return {
        "success": False,
        "error": f"All detection strategies failed after {max_retries} attempts"
    }
async def _try_cached_field_detection(self, field_name: str, field_value: str) -> dict:
    """
    Fill the field using the selector stored in the input-field cache.

    The cache is keyed by the lower-cased, trimmed field name and is
    populated through _auto_detect_input_fields when empty. A fill that
    comes back as an "Error..." message is reported as a failure.

    Returns:
        {"success": True, "selector", "result", "confidence": 0.9} on
        success, otherwise {"success": False, "error": ...}.
    """
    try:
        cache_key = field_name.lower().strip()

        # Refresh cache if empty
        if not self.cached_input_fields:
            await self._auto_detect_input_fields()

        if cache_key not in self.cached_input_fields:
            return {"success": False, "error": "Field not found in cache"}

        selector = self.cached_input_fields[cache_key]["selector"]
        result = await self.fill_input_field(selector, field_value)

        # fill_input_field reports failure as an "Error ..." string rather
        # than raising, so a returned value alone does not prove success.
        if isinstance(result, str) and result.startswith("Error"):
            return {"success": False, "error": result}

        return {
            "success": True,
            "selector": selector,
            "result": result,
            "confidence": 0.9
        }
    except Exception as e:
        return {"success": False, "error": str(e)}
async def _try_enhanced_field_detection(self, field_name: str, field_value: str) -> dict:
    """
    Strategy wrapper around _enhanced_field_detection_and_fill.

    Returns:
        {"success": True, "result", "confidence": 0.8} when the helper
        returned a message, otherwise {"success": False, "error": ...}.
        Exceptions are converted into a failure dict.
    """
    try:
        outcome = await self._enhanced_field_detection_and_fill(field_name, field_value)
        # NOTE(review): `"" in outcome` is always True for a string, so this
        # is effectively a non-empty check. The intended success marker
        # appears to be missing - confirm and restore.
        if not (outcome and "" in outcome):
            return {"success": False, "error": "Enhanced detection did not find field"}
        return {
            "success": True,
            "result": outcome,
            "confidence": 0.8
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
async def _try_label_field_detection(self, field_name: str, field_value: str) -> dict:
    """
    Strategy wrapper around _find_field_by_label_analysis.

    Returns:
        {"success": True, "result", "confidence": 0.7} when the helper
        returned a message, otherwise {"success": False, "error": ...}.
        Exceptions are converted into a failure dict.
    """
    try:
        outcome = await self._find_field_by_label_analysis(field_name, field_value)
        # NOTE(review): `"" in outcome` is always True for a string, so this
        # is effectively a non-empty check. The intended success marker
        # appears to be missing - confirm and restore.
        if not (outcome and "" in outcome):
            return {"success": False, "error": "Label analysis did not find field"}
        return {
            "success": True,
            "result": outcome,
            "confidence": 0.7
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
async def _try_content_field_detection(self, field_name: str, field_value: str) -> dict:
    """
    Find the field by matching keywords against page text and form elements.

    The page text is fetched first; only when it mentions the field name
    (or a spacing variant of it) are the interactive form elements fetched.
    Elements are read from the response's top-level "elements" key or,
    failing that, from the JSON document in content[0].text. The first
    element whose string form mentions a keyword and whose fill does not
    come back as an "Error..." message wins.

    Returns:
        {"success": True, "selector", "result", "confidence": 0.6} on
        success, otherwise {"success": False, "error": ...}.
    """
    try:
        # Get page content for analysis
        page_content = await self._call_mcp_tool("chrome_get_web_content", {"textOnly": True})
        if not page_content or not page_content.get("content"):
            return {"success": False, "error": "Could not get page content"}

        # Analyze content for field-related keywords
        content_text = str(page_content["content"][0]).lower()
        name_lower = field_name.lower()
        field_keywords = [
            name_lower,
            name_lower.replace(" ", ""),
            name_lower.replace("_", " "),
            name_lower.replace("-", " ")
        ]

        # Look for form elements if keywords are found in content
        if any(keyword in content_text for keyword in field_keywords):
            form_elements = await self._call_mcp_tool("chrome_get_interactive_elements", {
                "types": ["input", "textarea", "select"]
            })

            elements = []
            if form_elements:
                elements = form_elements.get("elements") or []
                if not elements:
                    # Same response shape the other element parsers in this
                    # class read: a JSON document in content[0].text.
                    try:
                        content_items = form_elements.get("content")
                        if content_items:
                            payload = content_items[0].get("text", "")
                            if payload:
                                elements = json.loads(payload).get("elements", []) or []
                    except (json.JSONDecodeError, AttributeError, KeyError, IndexError, TypeError):
                        elements = []

            for element in elements:
                if not isinstance(element, dict):
                    continue
                element_text = str(element).lower()
                if not any(keyword in element_text for keyword in field_keywords):
                    continue
                selector = element.get("selector")
                if not selector:
                    continue
                try:
                    result = await self.fill_input_field(selector, field_value)
                except Exception:
                    continue
                # fill_input_field reports failure as an "Error ..." string
                if isinstance(result, str) and result.startswith("Error"):
                    continue
                return {
                    "success": True,
                    "selector": selector,
                    "result": result,
                    "confidence": 0.6
                }

        return {"success": False, "error": "Content analysis did not find matching field"}

    except Exception as e:
        return {"success": False, "error": str(e)}
async def _try_fallback_field_detection(self, field_name: str, field_value: str) -> dict:
    """
    Last-resort strategy: fill the first generic form control that accepts it.

    A fixed list of broad selectors is probed in order with
    chrome_get_web_content. A selector is used only when the probe returns
    content without an error flag and the subsequent fill does not come
    back as an "Error..." message. field_name is not consulted.

    Returns:
        {"success": True, "selector", "result", "confidence": 0.3} on
        success, otherwise {"success": False, "error": ...}.
    """
    try:
        # Common fallback selectors, broadest first
        fallback_selectors = [
            "input:not([type='hidden']):not([type='submit']):not([type='button'])",
            "textarea",
            "select",
            "input[type='text']",
            "input[type='email']",
            "input[type='password']",
            "input:first-of-type",
            "form input:first-child",
            "[contenteditable='true']"
        ]

        for selector in fallback_selectors:
            try:
                # Check if element exists
                test_result = await self._call_mcp_tool("chrome_get_web_content", {
                    "selector": selector,
                    "textOnly": False
                })
                if not test_result or test_result.get("isError", False) or not test_result.get("content"):
                    continue

                result = await self.fill_input_field(selector, field_value)
                # fill_input_field reports failure as an "Error ..." string
                # rather than raising; keep looking in that case.
                if isinstance(result, str) and result.startswith("Error"):
                    continue

                return {
                    "success": True,
                    "selector": selector,
                    "result": result,
                    "confidence": 0.3
                }
            except Exception:
                continue

        return {"success": False, "error": "No fallback patterns worked"}

    except Exception as e:
        return {"success": False, "error": str(e)}
async def _execute_workflow_actions(self, actions: list) -> list:
    """
    Run a sequence of follow-up actions and report the outcome of each one.

    Supported action types:
    - submit: click ``target`` if given, otherwise press Enter
    - click: click the ``target`` selector
    - navigate: navigate to the ``target`` URL
    - wait: sleep for ``target`` seconds (1.0 when no target is given)
    - keyboard: send the ``target`` keys

    Each action dict may also carry ``delay`` (seconds to sleep before the
    action) and ``required`` (default True). A failing action is recorded and
    logged; the remaining actions are still executed.

    Returns:
        One result dict per action with ``action_index``, ``action_type``,
        ``target``, ``success``, ``required`` and ``error``.
    """
    # Action types that cannot run without a target, and the error to raise.
    target_required_errors = {
        "click": "Click action requires a target selector",
        "navigate": "Navigate action requires a target URL",
        "keyboard": "Keyboard action requires target keys",
    }
    total = len(actions)
    outcomes = []
    for position, spec in enumerate(actions):
        kind = spec.get("type", "").lower()
        target = spec.get("target", "")
        pause = spec.get("delay", 0.0)
        is_required = spec.get("required", True)
        self.logger.info(f"Executing action {position+1}/{total}: {kind}")
        outcome = {
            "action_index": position,
            "action_type": kind,
            "target": target,
            "success": False,
            "required": is_required,
            "error": None,
        }
        try:
            # Optional pause before the action itself.
            if pause > 0:
                await asyncio.sleep(pause)

            if kind in target_required_errors and not target:
                raise ValueError(target_required_errors[kind])

            if kind == "submit":
                if target:
                    await self._call_mcp_tool("chrome_click_element", {"selector": target})
                else:
                    # No explicit submit control: fall back to pressing Enter.
                    await self._call_mcp_tool("chrome_keyboard", {"keys": "Enter"})
            elif kind == "click":
                await self._call_mcp_tool("chrome_click_element", {"selector": target})
            elif kind == "navigate":
                await self._navigate_mcp(target)
            elif kind == "wait":
                seconds = float(target) if target else 1.0
                await asyncio.sleep(seconds)
            elif kind == "keyboard":
                await self._call_mcp_tool("chrome_keyboard", {"keys": target})
            else:
                raise ValueError(f"Unknown action type: {kind}")

            outcome["success"] = True
        except Exception as e:
            self.logger.error(f"Action {kind} failed: {e}")
            outcome["error"] = str(e)
            # Required failures are only logged; the workflow keeps going.
            if is_required:
                self.logger.warning(f"Required action {kind} failed, continuing with remaining actions")
        outcomes.append(outcome)
    return outcomes
# Legacy methods for backward compatibility
async def get_cached_form_fields(self) -> str:
    """Backward-compatible alias that forwards to ``get_cached_input_fields``."""
    cached_fields = await self.get_cached_input_fields()
    return cached_fields
async def refresh_form_fields(self) -> str:
    """Backward-compatible alias that forwards to ``refresh_input_fields``."""
    refreshed_fields = await self.refresh_input_fields()
    return refreshed_fields
async def _auto_detect_form_fields(self) -> None:
    """Backward-compatible alias that forwards to ``_auto_detect_input_fields``."""
    detect = self._auto_detect_input_fields
    await detect()
async def _type_in_focused_element(self, text: str) -> str:
    """
    Type text into the focused element, or into the first input that can be focused.

    The text is first sent key by key to whatever currently has focus. If that
    raises, a list of input selectors is tried in order: each is clicked,
    its content selected with Control+a, and the text typed over it. When no
    selector works the call is delegated to ``_type_text_mcp``.

    Args:
        text: Text to type. Space, newline and tab are sent as the named keys
            ``Space``, ``Enter`` and ``Tab``.

    Returns:
        A status message describing where the text was typed, or an error
        message if an unexpected exception occurred.
    """
    # Characters that are sent as named keys instead of literal text.
    named_keys = {" ": "Space", "\n": "Enter", "\t": "Tab"}

    async def send_text() -> None:
        """Send ``text`` one key at a time with a short pause after each key."""
        for char in text:
            await self._call_mcp_tool("chrome_keyboard", {"keys": named_keys.get(char, char)})
            await asyncio.sleep(0.05)  # Small delay between characters

    try:
        # First try typing directly - this works if an element is already focused.
        try:
            await send_text()
            return f"✓ Typed text: '{text}' in focused element"
        except Exception as e:
            self.logger.debug(f"Direct typing failed, trying to find input field: {e}")

        # Standard CSS only: the former ':visible' suffix is a jQuery extension
        # that a CSS selector engine rejects, so those selectors could never match.
        input_selectors = [
            "input:focus, textarea:focus, [contenteditable]:focus",  # Already focused
            "input[type='text'], input[type='search'], textarea",  # Text inputs
            "input:not([type])",  # Input without type
            "input[type='email'], input[type='password']",  # Common input types
            "[contenteditable='true']",  # Contenteditable elements
            "input:not([type='hidden']), textarea",  # Any non-hidden input
        ]
        for selector in input_selectors:
            try:
                # Click to focus the input
                await self._call_mcp_tool("chrome_click_element", {"selector": selector})
                await asyncio.sleep(0.3)
                # Select existing content so the typed text replaces it
                await self._call_mcp_tool("chrome_keyboard", {"keys": "Control+a"})
                await asyncio.sleep(0.1)
                await send_text()
                return f"✓ Typed text: '{text}' in input field (selector: {selector})"
            except Exception as selector_error:
                self.logger.debug(f"Typing via selector '{selector}' failed: {selector_error}")
                continue

        # Last resort: try the old fill method
        return await self._type_text_mcp(text)
    except Exception as e:
        self.logger.error(f"Error typing in focused element: {e}")
        return f"Error typing text: {str(e)}"
async def _discover_form_fields_dynamically(self, field_name: str, value: str) -> dict:
    """
    Find and fill a form field using live MCP queries rather than cached data.

    Two strategies run in order:
    1. ``chrome_get_interactive_elements`` lists input/textarea/select
       elements; each one accepted by ``_is_field_match`` is filled until a
       fill succeeds.
    2. ``chrome_get_content_web_form`` content is handed to
       ``_parse_form_content_for_field`` and the resulting selector is filled.

    Returns:
        On success a dict with ``success``, ``message``, ``method`` and
        ``selector``; otherwise ``success`` False and a ``message``.
    """
    try:
        wanted = field_name.lower().strip()
        self.logger.info(f"Starting dynamic discovery for field: '{field_name}'")

        # Strategy 1: match against the live list of interactive form elements.
        try:
            listing = await self._call_mcp_tool(
                "chrome_get_interactive_elements",
                {"types": ["input", "textarea", "select"]},
            )
            if listing and "elements" in listing:
                candidates = listing["elements"]
                self.logger.info(f"Found {len(candidates)} interactive form elements")
                for candidate in candidates:
                    if not self._is_field_match(candidate, wanted):
                        continue
                    selector = self._extract_best_selector(candidate)
                    if not selector:
                        continue
                    try:
                        fill_result = await self.fill_input_field(selector, value)
                    except Exception as fill_error:
                        self.logger.debug(f"Failed to fill with selector {selector}: {fill_error}")
                        continue
                    self.logger.info(f"Successfully filled field using dynamic discovery: {selector}")
                    return {
                        "success": True,
                        "message": f"✓ Filled '{field_name}' field using dynamic discovery: {fill_result}",
                        "method": "interactive_elements",
                        "selector": selector,
                    }
        except Exception as e:
            self.logger.debug(f"chrome_get_interactive_elements failed: {e}")

        # Strategy 2: derive a selector from the form-specific content.
        try:
            form_result = await self._call_mcp_tool("chrome_get_content_web_form", {})
            if form_result and "content" in form_result:
                self.logger.info("Retrieved form content for analysis")
                selector = self._parse_form_content_for_field(form_result["content"], wanted)
                if selector:
                    try:
                        fill_result = await self.fill_input_field(selector, value)
                    except Exception as fill_error:
                        self.logger.debug(f"Failed to fill with form content selector {selector}: {fill_error}")
                    else:
                        self.logger.info(f"Successfully filled field using form content analysis: {selector}")
                        return {
                            "success": True,
                            "message": f"✓ Filled '{field_name}' field using form content analysis: {fill_result}",
                            "method": "form_content",
                            "selector": selector,
                        }
        except Exception as e:
            self.logger.debug(f"chrome_get_content_web_form failed: {e}")

        return {"success": False, "message": "Dynamic discovery failed"}
    except Exception as e:
        self.logger.error(f"Error in dynamic form field discovery: {e}")
        return {"success": False, "message": f"Error in dynamic discovery: {str(e)}"}
def _is_field_match(self, element: dict, field_name_lower: str) -> bool:
    """
    Check whether an element matches the requested field name.

    The lower-cased field name and a few spelling variations of it (spaces,
    underscores and hyphens removed or swapped) are searched as substrings in
    the element's name, id, placeholder, aria-label, class and text content.
    A few well-known names (email, password, search, phone, name) additionally
    match on the element's ``type`` or ``name``.

    Args:
        element: Element description with optional ``attributes`` and
            ``textContent`` entries.
        field_name_lower: Requested field name, already lower-cased.

    Returns:
        True when any variation matches; False otherwise, including when the
        field name is empty (an empty string would otherwise match everything).
    """
    def text_of(raw) -> str:
        # Missing or non-string attribute values are treated as empty text.
        return raw.lower() if isinstance(raw, str) else ""

    attrs = element.get("attributes") or {}
    name = text_of(attrs.get("name"))
    id_attr = text_of(attrs.get("id"))
    placeholder = text_of(attrs.get("placeholder"))
    aria_label = text_of(attrs.get("aria-label"))
    class_attr = text_of(attrs.get("class"))
    type_attr = text_of(attrs.get("type"))
    text_content = text_of(element.get("textContent"))
    searchable = (name, id_attr, placeholder, aria_label, class_attr, text_content)

    base = field_name_lower or ""
    # Ordered, de-duplicated variations; blank ones are dropped because
    # "" is a substring of every attribute value.
    variations = dict.fromkeys(
        candidate
        for candidate in (
            base,
            base.replace(" ", ""),
            base.replace("_", ""),
            base.replace("-", ""),
            base.replace(" ", "_"),
            base.replace(" ", "-"),
        )
        if candidate.strip()
    )

    for variation in variations:
        if any(variation in haystack for haystack in searchable):
            return True
        # Special handling for common field types
        if variation in ("email", "mail") and ("email" in name or "mail" in name or type_attr == "email"):
            return True
        if variation in ("password", "pass") and (type_attr == "password" or "password" in name):
            return True
        if variation == "search" and (type_attr == "search" or "search" in name or "search" in placeholder):
            return True
        if variation in ("phone", "tel") and (type_attr == "tel" or "phone" in name or "tel" in name):
            return True
        if variation in ("name", "username", "user") and ("name" in name or "user" in name):
            return True
    return False
def _extract_best_selector(self, element: dict) -> str:
    """
    Build a CSS selector for an element, preferring the most reliable attribute.

    Priority order: id > name > type > first usable class > placeholder >
    aria-label > bare tag name. Attribute values are escaped so quotes or
    backslashes cannot break the selector, and ``#id`` / ``.class`` shorthand
    is only used when the value is a plain identifier (otherwise an attribute
    selector is emitted for ids, and unusable class tokens are skipped).

    Args:
        element: Element description with optional ``tagName`` and
            ``attributes`` entries.

    Returns:
        The selector string; an empty string when the element has neither a
        tag name nor any usable attribute.
    """
    attrs = element.get("attributes") or {}
    tag_name = (element.get("tagName") or "").lower()
    self.logger.debug(f"🔧 SELECTOR GENERATION: tag='{tag_name}', attrs={attrs}")

    def quoted(raw) -> str:
        # Escape for use inside a single-quoted CSS string.
        return str(raw).replace("\\", "\\\\").replace("'", "\\'")

    def is_plain_identifier(raw) -> bool:
        # Conservative check: values that are safe after '#' or '.' unescaped.
        return isinstance(raw, str) and re.fullmatch(r"-?[A-Za-z_][\w-]*", raw) is not None

    if attrs.get("id"):
        element_id = attrs["id"]
        if is_plain_identifier(element_id):
            selector = f"#{element_id}"
        else:
            # e.g. "user.name" or "1st": '#...' would be invalid or mean something else.
            selector = f"{tag_name}[id='{quoted(element_id)}']"
        self.logger.debug(f"🎯 SELECTOR: Using ID selector: {selector}")
        return selector

    # NOTE: a separate type+name branch used to follow this one but could
    # never run, because any element with a name returns here.
    if attrs.get("name"):
        selector = f"{tag_name}[name='{quoted(attrs['name'])}']"
        self.logger.debug(f"🎯 SELECTOR: Using name selector: {selector}")
        return selector

    if attrs.get("type"):
        selector = f"{tag_name}[type='{quoted(attrs['type'])}']"
        self.logger.debug(f"🎯 SELECTOR: Using type selector: {selector}")
        return selector

    class_value = attrs.get("class")
    if isinstance(class_value, str):
        # Use the first class token that is valid without escaping.
        first_class = next((token for token in class_value.split() if is_plain_identifier(token)), "")
        if first_class:
            selector = f"{tag_name}.{first_class}"
            self.logger.debug(f"🎯 SELECTOR: Using class selector: {selector}")
            return selector

    if attrs.get("placeholder"):
        selector = f"{tag_name}[placeholder='{quoted(attrs['placeholder'])}']"
        self.logger.debug(f"🎯 SELECTOR: Using placeholder selector: {selector}")
        return selector

    if attrs.get("aria-label"):
        selector = f"{tag_name}[aria-label='{quoted(attrs['aria-label'])}']"
        self.logger.debug(f"🎯 SELECTOR: Using aria-label selector: {selector}")
        return selector

    # Fallback to tag name (least reliable)
    selector = tag_name
    self.logger.debug(f"⚠️ SELECTOR: Using fallback tag selector: {selector}")
    return selector
def _parse_form_content_for_field(self, form_content: list, field_name_lower: str) -> str:
    """
    Produce a candidate selector for the requested field.

    The selector is a substring match on the input's ``name`` attribute:
    ``input[name*='<field name>']``.

    NOTE(review): ``form_content`` is accepted for interface compatibility but
    is not consulted. The previous implementation flattened it to text and
    then discarded the result, always returning this same selector; analysing
    the content remains a possible enhancement.

    Args:
        form_content: Form content returned by the MCP form tool (unused).
        field_name_lower: Requested field name, already lower-cased.

    Returns:
        The selector, or an empty string when the field name is blank
        (an empty ``*=`` value can never match an element).
    """
    if not isinstance(field_name_lower, str) or not field_name_lower.strip():
        return ""
    # Escape so a quote in the field name cannot terminate the CSS string.
    escaped = field_name_lower.replace("\\", "\\\\").replace("'", "\\'")
    return f"input[name*='{escaped}']"
async def _enhanced_field_detection_with_retry(self, field_name: str, value: str, max_retries: int = 3) -> dict:
    """
    Repeatedly look for the field among interactive elements, loosening the match each round.

    Every round fetches input/textarea/select/button elements, asks
    ``_is_flexible_field_match`` (passing the zero-based round index) which
    ones qualify, and tries to fill each qualifying element until one
    succeeds. Rounds are separated by a one-second pause.

    Returns:
        On success a dict with ``success``, ``message``, ``method``
        (``enhanced_retry_<round>``) and ``selector``; otherwise ``success``
        False and a ``message``.
    """
    wanted = field_name.lower().strip()
    final_round = max_retries - 1
    for attempt in range(max_retries):
        round_no = attempt + 1
        try:
            self.logger.info(f"Enhanced detection attempt {round_no}/{max_retries} for field: '{field_name}'")
            try:
                listing = await self._call_mcp_tool(
                    "chrome_get_interactive_elements",
                    {"types": ["input", "textarea", "select", "button"]},
                )
                if listing and "elements" in listing:
                    for candidate in listing["elements"]:
                        # The round index controls how permissive the matcher is.
                        if not self._is_flexible_field_match(candidate, wanted, attempt):
                            continue
                        selector = self._extract_best_selector(candidate)
                        if not selector:
                            continue
                        try:
                            fill_result = await self.fill_input_field(selector, value)
                        except Exception as fill_error:
                            self.logger.debug(f"Failed to fill with enhanced selector {selector}: {fill_error}")
                            continue
                        return {
                            "success": True,
                            "message": f"✓ Filled '{field_name}' field using enhanced detection (attempt {round_no}): {fill_result}",
                            "method": f"enhanced_retry_{round_no}",
                            "selector": selector,
                        }
            except Exception as e:
                self.logger.debug(f"Enhanced detection attempt {round_no} failed: {e}")

            # Pause between rounds, but not after the last one.
            if attempt < final_round:
                await asyncio.sleep(1)
        except Exception as e:
            self.logger.debug(f"Enhanced detection attempt {round_no} error: {e}")
    return {"success": False, "message": "Enhanced detection with retry failed"}
def _is_flexible_field_match(self, element: dict, field_name_lower: str, attempt: int) -> bool:
    """
    Match an element against a field name, more permissively on later attempts.

    - attempt 0: the whole field name must occur in name, id, placeholder or
      aria-label.
    - attempt 1: any whitespace-separated word of the field name may occur in
      those attributes, the class, or the text content.
    - attempt 2 and above: filler words (field, input, box, text, enter, type)
      are removed from the field name before matching, and as a last resort
      email/password/search names match on the element's ``type``.

    Args:
        element: Element description with optional ``attributes`` and
            ``textContent`` entries.
        field_name_lower: Requested field name, already lower-cased.
        attempt: Zero-based retry index.

    Returns:
        True on a match; False otherwise, including for a blank field name
        (an empty string would otherwise match every element on attempt 0).
    """
    if not isinstance(field_name_lower, str) or not field_name_lower.strip():
        return False

    def text_of(raw) -> str:
        # Missing or non-string attribute values are treated as empty text.
        return raw.lower() if isinstance(raw, str) else ""

    attrs = element.get("attributes") or {}
    name = text_of(attrs.get("name"))
    id_attr = text_of(attrs.get("id"))
    placeholder = text_of(attrs.get("placeholder"))
    aria_label = text_of(attrs.get("aria-label"))
    class_attr = text_of(attrs.get("class"))
    type_attr = text_of(attrs.get("type"))
    text_content = text_of(element.get("textContent"))

    # Attempt 0: the full field name must appear in an identifying attribute.
    if attempt == 0:
        return any(field_name_lower in haystack for haystack in (name, id_attr, placeholder, aria_label))

    # Attempt 1: any single word of the field name may appear.
    if attempt == 1:
        haystacks = (name, id_attr, placeholder, aria_label, class_attr, text_content)
        return any(part in haystack for part in field_name_lower.split() for haystack in haystacks)

    # Attempt 2+: very flexible matching.
    if attempt >= 2:
        # Remove common filler words and try matching on what is left.
        field_clean = field_name_lower
        for word in ("field", "input", "box", "text", "enter", "type"):
            field_clean = field_clean.replace(word, "").strip()
        if field_clean and any(
            field_clean in haystack for haystack in (name, id_attr, placeholder, aria_label, class_attr)
        ):
            return True
        # Type-based matching as last resort
        if field_name_lower in ("email", "mail") and type_attr == "email":
            return True
        if field_name_lower in ("password", "pass") and type_attr == "password":
            return True
        if field_name_lower == "search" and type_attr == "search":
            return True
    return False
async def _analyze_page_content_for_field(self, field_name: str, value: str) -> dict:
    """
    Final fallback: probe name-derived selectors against the page and fill the first hit.

    The page content is fetched once (only its presence is checked), then each
    selector from ``_generate_intelligent_selectors_from_content`` is probed
    with ``chrome_get_web_content`` and filled when the probe returns content.

    Returns:
        On success a dict with ``success``, ``message``, ``method``
        (``content_analysis``) and ``selector``; otherwise ``success`` False
        and a ``message``.
    """
    try:
        wanted = field_name.lower().strip()
        self.logger.info(f"Starting content analysis for field: '{field_name}'")
        try:
            page = await self._call_mcp_tool("chrome_get_web_content", {"textOnly": False})
            if not page or "content" not in page:
                return {"success": False, "message": "Could not get page content for analysis"}

            for selector in self._generate_intelligent_selectors_from_content(wanted):
                try:
                    # Probe first so we only try to fill selectors that resolve.
                    probe = await self._call_mcp_tool(
                        "chrome_get_web_content",
                        {"selector": selector, "textOnly": False},
                    )
                    if not (probe and probe.get("content")):
                        continue
                    fill_result = await self.fill_input_field(selector, value)
                except Exception as selector_error:
                    self.logger.debug(f"Content analysis selector '{selector}' failed: {selector_error}")
                    continue
                self.logger.info(f"Successfully filled field using content analysis: {selector}")
                return {
                    "success": True,
                    "message": f"✓ Filled '{field_name}' field using content analysis: {fill_result}",
                    "method": "content_analysis",
                    "selector": selector,
                }
        except Exception as e:
            self.logger.debug(f"Content analysis failed: {e}")
        return {"success": False, "message": "Content analysis failed to find field"}
    except Exception as e:
        self.logger.error(f"Error in content analysis: {e}")
        return {"success": False, "message": f"Error in content analysis: {str(e)}"}
def _generate_intelligent_selectors_from_content(self, field_name_lower: str) -> list:
    """
    Generate candidate CSS selectors for a field name, most specific patterns first.

    For each distinct spelling variation of the name (spaces, underscores and
    hyphens removed or swapped) attribute-substring selectors are produced for
    name, id, placeholder, aria-label and class. ``.variation`` and
    ``#variation`` are only produced when the variation is a plain identifier,
    since e.g. ``.first name`` would be parsed as a descendant selector.
    Well-known names (email, password, search, phone, name) get extra
    type-specific selectors.

    Args:
        field_name_lower: Requested field name, already lower-cased.

    Returns:
        Selectors in priority order without duplicates; an empty list for a
        blank field name that has no type-specific selectors.
    """
    def quoted(raw: str) -> str:
        # Escape for use inside a single-quoted CSS string.
        return raw.replace("\\", "\\\\").replace("'", "\\'")

    base = field_name_lower or ""
    # Ordered, de-duplicated variations; blank ones are skipped because an
    # empty substring selector can never match.
    variations = dict.fromkeys(
        candidate
        for candidate in (
            base,
            base.replace(" ", ""),
            base.replace("_", ""),
            base.replace("-", ""),
            base.replace(" ", "_"),
            base.replace(" ", "-"),
        )
        if candidate.strip()
    )

    selectors = []
    for variation in variations:
        value = quoted(variation)
        selectors.extend([
            f"input[name*='{value}']",
            f"input[id*='{value}']",
            f"input[placeholder*='{value}']",
            f"textarea[name*='{value}']",
            f"textarea[id*='{value}']",
            f"select[name*='{value}']",
            f"[aria-label*='{value}']",
        ])
        if re.fullmatch(r"-?[A-Za-z_][\w-]*", variation):
            selectors.extend([f".{variation}", f"#{variation}"])
        selectors.extend([
            f"input[class*='{value}']",
            f"textarea[class*='{value}']",
        ])

    # Add type-specific selectors
    if base in ("email", "mail"):
        selectors.extend([
            "input[type='email']",
            "input[name*='email']",
            "input[name*='mail']",
        ])
    elif base in ("password", "pass"):
        selectors.extend([
            "input[type='password']",
            "input[name*='password']",
            "input[name*='pass']",
        ])
    elif base == "search":
        selectors.extend([
            "input[type='search']",
            "input[name*='search']",
            "input[name='q']",
            "textarea[name='q']",
        ])
    elif base in ("phone", "tel"):
        selectors.extend([
            "input[type='tel']",
            "input[name*='phone']",
            "input[name*='tel']",
        ])
    elif base in ("name", "username", "user"):
        selectors.extend([
            "input[name*='name']",
            "input[name*='user']",
        ])

    # Each selector costs the caller MCP round trips, so drop repeats while
    # keeping first-occurrence order.
    return list(dict.fromkeys(selectors))
async def _direct_mcp_element_search(self, field_name: str, value: str) -> dict:
    """
    Final fallback that searches for the field using only live MCP tool calls.

    Two strategies run in order:
    1. All interactive elements are fetched (no type filter) and each one
       accepted by ``_is_very_flexible_match`` is filled until a fill succeeds.
    2. If the page has any input/textarea/select content, every selector from
       ``_generate_common_selectors`` is probed and the first hit is filled.

    Returns:
        On success a dict with ``success``, ``message``, ``method`` and
        ``selector``; otherwise ``success`` False and a ``message``.
    """
    try:
        wanted = field_name.lower().strip()
        self.logger.info(f"Starting direct MCP element search for field: '{field_name}'")

        # Strategy 1: scan every interactive element with loose matching.
        try:
            listing = await self._call_mcp_tool("chrome_get_interactive_elements", {})
            if listing and "elements" in listing:
                candidates = listing["elements"]
                self.logger.info(f"Found {len(candidates)} total interactive elements")
                for candidate in candidates:
                    if not self._is_very_flexible_match(candidate, wanted):
                        continue
                    selector = self._extract_best_selector(candidate)
                    if not selector:
                        continue
                    try:
                        fill_result = await self.fill_input_field(selector, value)
                    except Exception as fill_error:
                        self.logger.debug(f"Direct search selector {selector} failed: {fill_error}")
                        continue
                    self.logger.info(f"Successfully filled using direct search: {selector}")
                    return {
                        "success": True,
                        "message": f"✓ Filled '{field_name}' using direct MCP search: {fill_result}",
                        "method": "direct_mcp_search",
                        "selector": selector,
                    }
        except Exception as e:
            self.logger.debug(f"Direct MCP element search failed: {e}")

        # Strategy 2: probe name-derived selectors if the page has form controls.
        try:
            any_inputs = await self._call_mcp_tool(
                "chrome_get_web_content",
                {"selector": "input, textarea, select", "textOnly": False},
            )
            if any_inputs and any_inputs.get("content"):
                self.logger.info("Found input elements via web content search")
                for selector in self._generate_common_selectors(wanted):
                    try:
                        probe = await self._call_mcp_tool(
                            "chrome_get_web_content",
                            {"selector": selector, "textOnly": False},
                        )
                        if not (probe and probe.get("content")):
                            continue
                        fill_result = await self.fill_input_field(selector, value)
                    except Exception as selector_error:
                        self.logger.debug(f"Common selector {selector} failed: {selector_error}")
                        continue
                    self.logger.info(f"Successfully filled using common selector: {selector}")
                    return {
                        "success": True,
                        "message": f"✓ Filled '{field_name}' using common selector: {fill_result}",
                        "method": "common_selector",
                        "selector": selector,
                    }
        except Exception as e:
            self.logger.debug(f"Web content search failed: {e}")

        return {"success": False, "message": "Direct MCP search failed"}
    except Exception as e:
        self.logger.error(f"Error in direct MCP element search: {e}")
        return {"success": False, "message": f"Error in direct search: {str(e)}"}
def _is_very_flexible_match(self, element: dict, field_name_lower: str) -> bool:
    """
    Loosest matcher used by the direct search: accept nearly any related form element.

    Only input, textarea and select elements are considered. The field name is
    split on spaces, hyphens and underscores, and any part longer than two
    characters that occurs in the element's name, id, placeholder, aria-label,
    class, title or text content counts as a match. Otherwise a few well-known
    names match on the element's ``type`` attribute.
    """
    attrs = element.get("attributes", {})
    tag = element.get("tagName", "").lower()
    text_content = element.get("textContent", "").lower()

    # Only consider form elements
    if tag not in ("input", "textarea", "select"):
        return False

    searched_keys = ("name", "id", "placeholder", "aria-label", "class", "title")
    haystack = " ".join([attrs.get(key, "") for key in searched_keys] + [text_content]).lower()

    words = field_name_lower.replace("-", " ").replace("_", " ").split()
    # Parts of one or two characters are too ambiguous to match on.
    if any(len(word) > 2 and word in haystack for word in words):
        return True

    # Type-based matching for common fields
    expected_type_by_alias = {
        "email": "email",
        "mail": "email",
        "password": "password",
        "pass": "password",
        "search": "search",
        "query": "search",
        "phone": "tel",
        "tel": "tel",
    }
    type_attr = attrs.get("type", "").lower()
    return expected_type_by_alias.get(field_name_lower) == type_attr
def _generate_common_selectors(self, field_name_lower: str) -> list:
    """
    Build an ordered, duplicate-free list of CSS selectors for a field name.

    Exact and substring attribute selectors are produced for every non-empty
    spelling variation of the name, followed by extra selectors for well-known
    names (email, password, search, phone, name).
    """
    def selectors_for(variation: str) -> list:
        return [
            f"input[name='{variation}']",
            f"input[id='{variation}']",
            f"textarea[name='{variation}']",
            f"textarea[id='{variation}']",
            f"select[name='{variation}']",
            f"select[id='{variation}']",
            f"#{variation}",
            f".{variation}",
            f"input[name*='{variation}']",
            f"input[id*='{variation}']",
            f"input[placeholder*='{variation}']",
            f"[aria-label*='{variation}']",
        ]

    spellings = (
        field_name_lower,
        field_name_lower.replace(" ", ""),
        field_name_lower.replace("_", ""),
        field_name_lower.replace("-", ""),
        field_name_lower.replace(" ", "_"),
        field_name_lower.replace(" ", "-"),
    )
    collected = []
    for spelling in spellings:
        if not spelling:
            # An empty variation would only yield selectors that match nothing useful.
            continue
        collected += selectors_for(spelling)

    # Extra selectors keyed by well-known field name.
    email_extras = [
        "input[type='email']",
        "input[name*='email']",
        "input[name*='mail']",
        "input[id*='email']",
        "input[id*='mail']",
    ]
    password_extras = [
        "input[type='password']",
        "input[name*='password']",
        "input[name*='pass']",
    ]
    search_extras = [
        "input[type='search']",
        "input[name*='search']",
        "input[name='q']",
        "textarea[name='q']",
        "[role='searchbox']",
    ]
    phone_extras = [
        "input[type='tel']",
        "input[name*='phone']",
        "input[name*='tel']",
    ]
    name_extras = [
        "input[name*='name']",
        "input[name*='user']",
        "input[id*='name']",
        "input[id*='user']",
    ]
    extras_by_alias = {
        "email": email_extras,
        "mail": email_extras,
        "password": password_extras,
        "pass": password_extras,
        "search": search_extras,
        "query": search_extras,
        "phone": phone_extras,
        "tel": phone_extras,
        "name": name_extras,
        "username": name_extras,
        "user": name_extras,
    }
    collected += extras_by_alias.get(field_name_lower, [])

    # dict keys keep first-insertion order, which removes duplicates in place.
    return list(dict.fromkeys(collected))
async def _smart_click_mcp(self, element_description: str) -> str:
    """
    Click the element that best matches a free-text description.

    Steps, in order, stopping at the first successful click:
    1. Fetch button/a/input/select elements, keep those accepted by
       ``_element_matches_description`` and click them in page order.
    2. If the description contains ``#``, ``.``, ``[`` or ``]``, use it
       directly as a CSS selector.
    3. Try selector patterns built from the description (value, aria-label,
       title). The description is escaped before interpolation so quotes in
       it cannot break the selector.

    Args:
        element_description: Visible text, label, attribute value or CSS
            selector identifying the element to click.

    Returns:
        A status message describing the click that succeeded, or a failure
        message when nothing could be clicked.
    """
    def css_quoted(raw: str) -> str:
        # Escape for use inside a single-quoted CSS string.
        return raw.replace("\\", "\\\\").replace("'", "\\'")

    try:
        self.logger.info(f"🔍 SELECTOR SEARCH: Looking for clickable element matching '{element_description}'")
        # First try to find interactive elements
        self.logger.debug("📋 Step 1: Getting interactive elements from page")
        interactive_result = await self._call_mcp_tool("chrome_get_interactive_elements", {
            "types": ["button", "a", "input", "select"]
        })
        if interactive_result and "elements" in interactive_result:
            elements = interactive_result["elements"]
            self.logger.info(f"📊 Found {len(elements)} interactive elements on page")

            # The per-element dump is only built when it will actually be logged.
            if self.logger.isEnabledFor(logging.DEBUG):
                logged_attrs = ("id", "class", "name", "type", "aria-label", "title", "value")
                for i, element in enumerate(elements):
                    element_info = {
                        "index": i,
                        "tag": element.get("tagName", "unknown"),
                        "text": element.get("textContent", "")[:50],
                        "attributes": {k: v for k, v in element.get("attributes", {}).items() if k in logged_attrs}
                    }
                    self.logger.debug(f"🔍 Element {i}: {element_info}")

            # Look for elements that match the description
            matching_elements = []
            for i, element in enumerate(elements):
                if not self._element_matches_description(element, element_description):
                    continue
                selector = self._extract_best_selector(element)
                if selector:
                    matching_elements.append({
                        "index": i,
                        "element": element,
                        "selector": selector,
                        "match_reason": self._get_match_reason(element, element_description)
                    })

            if matching_elements:
                self.logger.info(f"✅ Found {len(matching_elements)} matching elements:")
                for match in matching_elements:
                    self.logger.info(f" 🎯 Match {match['index']}: selector='{match['selector']}', reason='{match['match_reason']}'")

                # Try the first match, then the remaining ones as alternatives.
                for position, match in enumerate(matching_elements):
                    selector = match["selector"]
                    is_first = position == 0
                    if is_first:
                        self.logger.info(f"🚀 EXECUTING CLICK: Using selector '{selector}' (reason: {match['match_reason']})")
                    else:
                        self.logger.info(f"🔄 RETRY: Trying alternative selector '{selector}'")
                    try:
                        result = await self._call_mcp_tool("chrome_click_element", {"selector": selector})
                    except Exception as click_error:
                        if is_first:
                            self.logger.error(f"❌ CLICK FAILED: Error clicking selector '{selector}': {click_error}")
                        else:
                            self.logger.debug(f"❌ Alternative selector '{selector}' also failed: {click_error}")
                        continue
                    if is_first:
                        self.logger.info(f"✅ CLICK SUCCESS: Clicked on '{element_description}' using selector: {selector}")
                        self.logger.debug(f"📝 MCP Result: {result}")
                        return f"✅ Clicked on '{element_description}' using selector: {selector} (reason: {match['match_reason']})"
                    self.logger.info(f"✅ RETRY SUCCESS: Clicked using alternative selector: {selector}")
                    return f"✅ Clicked on '{element_description}' using alternative selector: {selector}"

                # If all matching elements failed, continue to fallback methods
                self.logger.warning(f"⚠️ All {len(matching_elements)} matching elements failed to click")
            else:
                self.logger.warning(f"⚠️ No elements matched description '{element_description}' in interactive elements")

        # Fallback to direct selector if description looks like a CSS selector
        if any(char in element_description for char in ['#', '.', '[', ']']):
            self.logger.info(f"🔧 FALLBACK 1: Treating '{element_description}' as direct CSS selector")
            try:
                result = await self._call_mcp_tool("chrome_click_element", {"selector": element_description})
                self.logger.info(f"✅ DIRECT SELECTOR SUCCESS: Clicked using direct selector: {element_description}")
                return f"✅ Clicked on element with direct selector: {element_description}"
            except Exception as direct_error:
                self.logger.error(f"❌ DIRECT SELECTOR FAILED: {direct_error}")

        # Try common button/link patterns
        self.logger.info(f"🔧 FALLBACK 2: Trying common selector patterns for '{element_description}'")
        escaped = css_quoted(element_description)
        # NOTE(review): ':contains()' is a jQuery extension, not standard CSS;
        # these two patterns presumably only work if the click tool resolves
        # selectors with jQuery-style support - confirm or remove.
        common_selectors = [
            f"button:contains('{escaped}')",
            f"a:contains('{escaped}')",
            f"input[value*='{escaped}']",
            f"[aria-label*='{escaped}']",
            f"[title*='{escaped}']"
        ]
        for i, selector in enumerate(common_selectors):
            try:
                self.logger.debug(f"🔍 Trying pattern {i+1}/{len(common_selectors)}: {selector}")
                result = await self._call_mcp_tool("chrome_click_element", {"selector": selector})
                self.logger.info(f"✅ PATTERN SUCCESS: Clicked using pattern: {selector}")
                return f"✅ Clicked on '{element_description}' using pattern: {selector}"
            except Exception as pattern_error:
                self.logger.debug(f"❌ Pattern failed: {pattern_error}")
                continue

        self.logger.error(f"❌ ALL METHODS FAILED: Could not find or click element matching: {element_description}")
        return f"❌ Could not find clickable element matching: {element_description}"
    except Exception as e:
        self.logger.error(f"💥 CRITICAL ERROR in smart click: {str(e)}")
        return f"💥 Error in smart click: {str(e)}"
def _element_matches_description(self, element: dict, description: str) -> bool:
    """
    Check whether an element's text or any string attribute contains the description.

    The comparison is a case-insensitive substring test against the element's
    ``textContent`` and every string-valued entry of its ``attributes``
    (which covers value, aria-label and title).

    Args:
        element: Element description with optional ``textContent`` and
            ``attributes`` entries.
        description: Text to look for.

    Returns:
        True on a match; False otherwise, including for a blank description
        (an empty string would otherwise match every element).
    """
    if not isinstance(description, str) or not description.strip():
        return False
    needle = description.lower()

    # Check text content
    text_content = element.get("textContent")
    if isinstance(text_content, str) and needle in text_content.lower():
        return True

    # Check attributes; non-string values are ignored rather than lower-cased.
    attrs = element.get("attributes") or {}
    return any(
        isinstance(attr_value, str) and needle in attr_value.lower()
        for attr_value in attrs.values()
    )
def _get_match_reason(self, element: dict, description: str) -> str:
    """
    Describe why an element matches the description (for debugging output).

    Reports the text content (first 30 characters) and every string attribute
    that contains the description, case-insensitively. Each reason is listed
    once, in the order text content first and then attributes in their
    original order.

    Returns:
        The reasons joined with ``"; "``, or ``"unknown_match"`` when none
        was found.
    """
    needle = description.lower()
    reasons = []

    # Check text content
    text_content = element.get("textContent")
    if isinstance(text_content, str) and needle in text_content.lower():
        reasons.append(f"text_content='{text_content.lower()[:30]}...'")

    # Check attributes. This already covers value, aria-label and title, so
    # they are not checked a second time (which used to duplicate reasons).
    attrs = element.get("attributes") or {}
    for attr_name, attr_value in attrs.items():
        if isinstance(attr_value, str) and needle in attr_value.lower():
            reasons.append(f"{attr_name}='{attr_value}'")

    return "; ".join(reasons) if reasons else "unknown_match"
async def _get_page_content_mcp(self) -> str:
"""Get page content using MCP chrome_get_web_content tool"""
try:
result = await self._call_mcp_tool("chrome_get_web_content", {
"format": "text"
})
if result and "content" in result:
content = result["content"]
if isinstance(content, list) and len(content) > 0:
text_content = content[0].get("text", "")
return f"Page content retrieved:\n{text_content[:1000]}..." if len(text_content) > 1000 else f"Page content:\n{text_content}"
else:
return str(content)
else:
return "No content found on the page"
except Exception as e:
return f"Error getting page content: {str(e)}"
async def _get_form_fields_mcp(self) -> str:
"""Get form fields using MCP chrome_get_interactive_elements tool"""
try:
result = await self._call_mcp_tool("chrome_get_interactive_elements", {
"types": ["input", "textarea", "select"]
})
if result and "elements" in result:
elements = result["elements"]
if not elements:
return "No form fields found on the page"
field_info = []
for element in elements:
attrs = element.get("attributes", {})
tag_name = element.get("tagName", "").lower()
field_desc = f"- {tag_name}"
if "name" in attrs:
field_desc += f" (name: {attrs['name']})"
if "id" in attrs:
field_desc += f" (id: {attrs['id']})"
if "type" in attrs:
field_desc += f" (type: {attrs['type']})"
if "placeholder" in attrs:
field_desc += f" (placeholder: {attrs['placeholder']})"
field_info.append(field_desc)
return f"Found {len(elements)} form fields:\n" + "\n".join(field_info[:10])
else:
return "No form fields found"
except Exception as e:
return f"Error getting form fields: {str(e)}"
async def _get_interactive_elements_mcp(self) -> str:
"""Get interactive elements using MCP chrome_get_interactive_elements tool"""
try:
result = await self._call_mcp_tool("chrome_get_interactive_elements", {
"types": ["button", "a", "input", "select"]
})
if result and "elements" in result:
elements = result["elements"]
if not elements:
return "No interactive elements found on the page"
element_info = []
for element in elements:
attrs = element.get("attributes", {})
tag_name = element.get("tagName", "").lower()
text_content = element.get("textContent", "").strip()
element_desc = f"- {tag_name}"
if text_content:
element_desc += f" '{text_content[:50]}'"
if "id" in attrs:
element_desc += f" (id: {attrs['id']})"
if "class" in attrs:
element_desc += f" (class: {attrs['class'][:30]})"
element_info.append(element_desc)
return f"Found {len(elements)} interactive elements:\n" + "\n".join(element_info[:15])
else:
return "No interactive elements found"
except Exception as e:
return f"Error getting interactive elements: {str(e)}"
async def process_natural_language_command(self, command: str) -> str:
    """
    Turn a spoken/natural-language command into an executed action.

    The command is first handed to ``_parse_voice_command``; if that yields
    no action, ``_infer_action_from_context`` is tried. A resolved action is
    run through ``_execute_action`` and its result is passed to
    ``_format_response_for_voice``.

    Args:
        command: The raw command text.

    Returns:
        The formatted response, a hint message when no action could be
        determined, or an error message if anything raised.
    """
    try:
        self.logger.info(f"Processing natural language command: {command}")

        action, params = self._parse_voice_command(command)
        if not action:
            # Direct parsing found nothing; fall back to the heuristics.
            action, params = self._infer_action_from_context(command)

        if not action:
            return f"I didn't understand the command: {command}. Try saying something like 'fill email with john@example.com' or 'click login button'."

        outcome = await self._execute_action(action, params)
        return self._format_response_for_voice(action, outcome, params)
    except Exception as exc:
        self.logger.error(f"Error processing natural language command: {exc}")
        return f"Error processing command: {str(exc)}"
def _infer_action_from_context(self, command: str) -> tuple[Optional[str], Dict[str, Any]]:
"""Infer action from command context when direct parsing fails"""
command_lower = command.lower().strip()
# Email detection
if '@' in command and any(word in command_lower for word in ['email', 'mail']):
email_match = re.search(r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', command)
if email_match:
return 'fill_field_by_name', {'field_name': 'email', 'value': email_match.group(1)}
# Phone number detection
phone_match = re.search(r'([\d\-\+\(\)\s]{10,})', command)
if phone_match and any(word in command_lower for word in ['phone', 'number', 'mobile', 'telephone']):
return 'fill_field_by_name', {'field_name': 'phone', 'value': phone_match.group(1)}
# Password detection
if any(word in command_lower for word in ['password', 'pass']):
# Extract potential password (non-space sequence after password keyword)
password_match = re.search(r'(?:password|pass)\s+(\S+)', command_lower)
if password_match:
return 'fill_field_by_name', {'field_name': 'password', 'value': password_match.group(1)}
# Button/link click detection
if any(word in command_lower for word in ['button', 'link', 'click', 'press', 'tap']):
# Extract button/link text
for pattern in [r'(?:click|press|tap)\s+(?:on\s+)?(?:the\s+)?(.+)', r'(.+)\s+(?:button|link)']:
match = re.search(pattern, command_lower)
if match:
return 'click', {'text': match.group(1).strip()}
# Search detection
if any(word in command_lower for word in ['search', 'find', 'look']):
search_match = re.search(r'(?:search|find|look)\s+(?:for\s+)?(.+)', command_lower)
if search_match:
return 'fill_field_by_name', {'field_name': 'search', 'value': search_match.group(1)}
return None, {}
def _format_response_for_voice(self, action: str, result: str, params: Dict[str, Any]) -> str:
"""Format response for voice output with context"""
try:
if action == 'fill_field_by_name':
field_name = params.get('field_name', 'field')
value = params.get('value', '')
if 'success' in result.lower() or 'filled' in result.lower():
return f"Successfully filled {field_name} field with {value[:20]}{'...' if len(value) > 20 else ''}"
else:
return f"Could not fill {field_name} field. {result}"
elif action == 'click':
element = params.get('text', 'element')
if 'success' in result.lower() or 'clicked' in result.lower():
return f"Successfully clicked {element}"
else:
return f"Could not click {element}. {result}"
elif action in ['get_page_content', 'get_form_fields', 'get_interactive_elements']:
return result
else:
return result
except Exception:
return result