commit 8eed508e6d39fb1481a700128cd335ba23f1c1b8 Author: nasir@endelospay.com Date: Tue May 20 01:22:10 2025 +0500 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9c7133c --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.myenv +.env.local +__pycache__ \ No newline at end of file diff --git a/AssistantFnc.py b/AssistantFnc.py new file mode 100644 index 0000000..c9762b6 --- /dev/null +++ b/AssistantFnc.py @@ -0,0 +1,2314 @@ +import json +import uuid +from livekit.agents import (llm, JobContext) +from typing import Annotated, Dict, Any, List +import logging +import requests +import datetime +import os +import re +logger = logging.getLogger("voice-agent") +Base_url = "http://hgh-copy.test" + +class AssistantFnc(llm.FunctionContext): + """ + Function context for QubeCare's autonomous onboarding agent. + Handles the complete provider onboarding process through sequential steps: + 1. Authentication base on check user if return true then login otherwise signup (signup/login) + 2. Form Setup + 3. Product Configuration + 4. Product Configuration + 5. Signature Setup + """ + + def __init__(self, ctx: JobContext): + super().__init__() + self.ctx = ctx + self.accessToken = None + self.onboarding_stage = "authentication" # Track current onboarding stage + self.form_data = {} # Store form data during the process + self.product_data = {} # Store product data during the process + self.category_id = None # Replace with your actual category ID + self.username = None # Store username for later use + self.password = None # Store password for later use + self.user_email = None # Store email for later use + self.start_date = None + self.end_date = None + self.booking_data=None + metadata = json.loads(self.ctx.job.metadata) + token = metadata.get("token") + #self.accessToken='342|lRppO40lM2myq2E9aru4fMeN6RsFOyarlBacfmU8c904c6a3' + self.accessToken=token + self.timezone =metadata.get("timezone") + timezone_list = { + 'America/New_York': 'EST', + 'America/Los_Angeles': 'PST', + 'America/Chicago': 'CST', + 'America/Denver': 'MST', + 'UTC': 'UTC', + } + + self.getTimezone = timezone_list.get(self.timezone, self.timezone) + logger.info(f"self.getTimezone is {self.getTimezone}") + @llm.ai_callable() + async def getAppointment(self, + date: Annotated[ + str, llm.TypeInfo(description="Specific date for appointments (MM-DD-YYYY)") + ] = None, + start_date: Annotated[ + str, llm.TypeInfo(description="Start date for appointments range (MM-DD-YYYY)") + ] = None, + end_date: Annotated[ + str, llm.TypeInfo(description="End date for appointments range (MM-DD-YYYY)") + ] = None + ): + """Get appointments based on a specific date or date range""" + try: + # Prepare parameters + logger.info(f"Getting appointments with parameters: {self.accessToken}") + parameters = {} + + # If specific date is provided, use it for both start and end date + if date: + try: + month, day, year = date.split('-') + formatted_date = f"{year}-{month}-{day}" + parameters["start_date"] = formatted_date + parameters["end_date"] = formatted_date + logger.info(f"Using specific date: {formatted_date}") + except ValueError: + # If already in YYYY-MM-DD format or other format, use as is + parameters["start_date"] = date + parameters["end_date"] = date + logger.info(f"Using specific date (as-is): {date}") + # Otherwise use date range if provided + else: + # If dates not provided, use current month's start and end dates + if not start_date or not end_date: + today = datetime.datetime.now() + first_day = datetime.datetime(today.year, today.month, 1) + # Get the last day of the current month + if today.month == 12: + last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1) + else: + last_day = datetime.datetime(today.year, today.month + 1, 1) - datetime.timedelta(days=1) + + if not start_date: + start_date = first_day.strftime("%m-%d-%Y") + logger.info(f"Using default start date: {start_date}") + + if not end_date: + end_date = last_day.strftime("%m-%d-%Y") + logger.info(f"Using default end date: {end_date}") + + # Process start date + if start_date: + try: + month, day, year = start_date.split('-') + formatted_date = f"{year}-{month}-{day}" + parameters["start_date"] = formatted_date + except ValueError: + # If already in YYYY-MM-DD format or other format, use as is + parameters["start_date"] = start_date + + # Process end date + if end_date: + try: + month, day, year = end_date.split('-') + formatted_date = f"{year}-{month}-{day}" + parameters["end_date"] = formatted_date + except ValueError: + # If already in YYYY-MM-DD format or other format, use as is + parameters["end_date"] = end_date + + url = f"{Base_url}/api/assistant/get-appointment-list-date" + self.start_date= parameters["start_date"] + self.end_date= parameters["end_date"] + if self.accessToken: + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + response = requests.post(url, json=parameters, headers=headers) + logger.info(f"Response status code: {response.status_code}") + logger.info(f"Response content: {response.text}") + + if response.status_code == 200: + logger.info("Appointments retrieved successfully") + appointments_data = response.json() + + # Format the appointments for better presentation to the user + formatted_appointments = [] + for appt in appointments_data.get('appointments', []): + appt_id = appt.get('id') + patient_id = appt.get('patient_id') + patient_name = appt.get('title', 'Unknown Patient') + appt_date = appt.get('date', 'Unknown Date') + appt_time = appt.get('start_time', 'Unknown Time') + + formatted_appt = { + "appointment_id": appt_id, + "patient_id": patient_id, + "patient_name": patient_name, + "date": appt_date, + "time": appt_time, + "instruction": f"To call this patient, say 'Start call with appointment {appt_id}'" + } + formatted_appointments.append(formatted_appt) + + # Add instructions for the agent + agent_instructions = { + "status": "success", + "message": "Appointments retrieved successfully", + "data": formatted_appointments, + "agent_instructions": "Present these appointments to the user and ask which one they would like to call. When they specify an appointment ID, use the start_call function with the corresponding patient_id and appointment_id." + } + + return agent_instructions + else: + logger.error(f"Failed to retrieve appointments. Status code: {response.status_code}") + return {"status": "error", "message": f"Failed to retrieve appointments. Status code: {response.status_code}"} + + except Exception as e: + logger.error(f"Error getting appointments: {str(e)}") + return {"error": f"Failed to get appointments: {str(e)}"} + + @llm.ai_callable() + async def getAppointmentsByTimeframe(self, + timeframe: Annotated[ + str, llm.TypeInfo(description="Standardized timeframe like 'today', 'tomorrow', 'this_week','current_week', 'next_week','current_date', 'current_month', etc.") + ] + ): + """Get appointments for a standardized timeframe""" + try: + logger.info(f"Getting appointments for standardized timeframe: {timeframe}") + + today = datetime.datetime.now() + + # Calculate start and end dates based on the timeframe + if timeframe == "today" or timeframe == "current_date": + start_date = today.strftime("%m-%d-%Y") + end_date = start_date + + elif timeframe == "tomorrow": + tomorrow = today + datetime.timedelta(days=1) + start_date = tomorrow.strftime("%m-%d-%Y") + end_date = start_date + + elif timeframe == "this_week" or timeframe == "current_week": + # Get the start of the current week (Monday) + start_of_week = today - datetime.timedelta(days=today.weekday()) + end_of_week = start_of_week + datetime.timedelta(days=6) # Sunday + start_date = start_of_week.strftime("%m-%d-%Y") + end_date = end_of_week.strftime("%m-%d-%Y") + + elif timeframe == "next_week": + # Get the start of next week (next Monday) + days_until_next_monday = 7 - today.weekday() + if days_until_next_monday == 7: + days_until_next_monday = 0 # If today is Monday, get next Monday + start_of_next_week = today + datetime.timedelta(days=days_until_next_monday) + end_of_next_week = start_of_next_week + datetime.timedelta(days=6) # Sunday + start_date = start_of_next_week.strftime("%m-%d-%Y") + end_date = end_of_next_week.strftime("%m-%d-%Y") + + elif timeframe == "last_week": + # Get the start of last week (previous Monday) + days_since_last_monday = today.weekday() + 7 + start_of_last_week = today - datetime.timedelta(days=days_since_last_monday) + end_of_last_week = start_of_last_week + datetime.timedelta(days=6) # Sunday + start_date = start_of_last_week.strftime("%m-%d-%Y") + end_date = end_of_last_week.strftime("%m-%d-%Y") + + elif timeframe == "this_month" or timeframe == "current_month": + # Get the first day of the current month + first_day = datetime.datetime(today.year, today.month, 1) + # Get the last day of the current month + if today.month == 12: + last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1) + else: + last_day = datetime.datetime(today.year, today.month + 1, 1) - datetime.timedelta(days=1) + + start_date = first_day.strftime("%m-%d-%Y") + end_date = last_day.strftime("%m-%d-%Y") + + elif timeframe == "next_month": + # Get the first day of the next month + if today.month == 12: + first_day = datetime.datetime(today.year + 1, 1, 1) + last_day = datetime.datetime(today.year + 1, 2, 1) - datetime.timedelta(days=1) + else: + first_day = datetime.datetime(today.year, today.month + 1, 1) + if today.month == 11: # December + last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1) + else: + last_day = datetime.datetime(today.year, today.month + 2, 1) - datetime.timedelta(days=1) + + start_date = first_day.strftime("%m-%d-%Y") + end_date = last_day.strftime("%m-%d-%Y") + + elif timeframe == "last_month": + # Get the first day of the last month + if today.month == 1: + first_day = datetime.datetime(today.year - 1, 12, 1) + last_day = datetime.datetime(today.year, 1, 1) - datetime.timedelta(days=1) + else: + first_day = datetime.datetime(today.year, today.month - 1, 1) + last_day = datetime.datetime(today.year, today.month, 1) - datetime.timedelta(days=1) + + start_date = first_day.strftime("%m-%d-%Y") + end_date = last_day.strftime("%m-%d-%Y") + + else: + # Default to current month if timeframe is not recognized + logger.warning(f"Unrecognized timeframe '{timeframe}', defaulting to current month") + first_day = datetime.datetime(today.year, today.month, 1) + if today.month == 12: + last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1) + else: + last_day = datetime.datetime(today.year, today.month + 1, 1) - datetime.timedelta(days=1) + + start_date = first_day.strftime("%m-%d-%Y") + end_date = last_day.strftime("%m-%d-%Y") + + logger.info(f"Converted timeframe '{timeframe}' to date range: {start_date} to {end_date}") + + # Use the existing getAppointment function with the calculated date range + return await self.getAppointment(start_date=start_date, end_date=end_date) + + except Exception as e: + logger.error(f"Error getting appointments by timeframe: {str(e)}") + return {"error": f"Failed to get appointments for timeframe '{timeframe}': {str(e)}"} + + @llm.ai_callable() + async def getAppointmentsByDate(self, + date_str: Annotated[ + str, llm.TypeInfo(description="Date in natural language format like 'January 15', 'next Monday', 'May 3rd', etc.") + ] + ): + """Get appointments for a specific date expressed in natural language""" + try: + logger.info(f"Getting appointments for natural date: {date_str}") + + # Try to parse the natural language date + today = datetime.datetime.now() + + # Handle common date formats + date_str_lower = date_str.lower().strip() + + # Handle relative dates + if "today" in date_str_lower: + target_date = today + elif "tomorrow" in date_str_lower: + target_date = today + datetime.timedelta(days=1) + elif "yesterday" in date_str_lower: + target_date = today - datetime.timedelta(days=1) + elif "next monday" in date_str_lower: + days_until_next_monday = (7 - today.weekday()) % 7 + if days_until_next_monday == 0: + days_until_next_monday = 7 # If today is Monday, get next Monday + target_date = today + datetime.timedelta(days=days_until_next_monday) + elif "next tuesday" in date_str_lower: + days_until_next_tuesday = (8 - today.weekday()) % 7 + if days_until_next_tuesday == 0: + days_until_next_tuesday = 7 + target_date = today + datetime.timedelta(days=days_until_next_tuesday) + elif "next wednesday" in date_str_lower: + days_until_next_wednesday = (9 - today.weekday()) % 7 + if days_until_next_wednesday == 0: + days_until_next_wednesday = 7 + target_date = today + datetime.timedelta(days=days_until_next_wednesday) + elif "next thursday" in date_str_lower: + days_until_next_thursday = (10 - today.weekday()) % 7 + if days_until_next_thursday == 0: + days_until_next_thursday = 7 + target_date = today + datetime.timedelta(days=days_until_next_thursday) + elif "next friday" in date_str_lower: + days_until_next_friday = (11 - today.weekday()) % 7 + if days_until_next_friday == 0: + days_until_next_friday = 7 + target_date = today + datetime.timedelta(days=days_until_next_friday) + elif "next saturday" in date_str_lower: + days_until_next_saturday = (12 - today.weekday()) % 7 + if days_until_next_saturday == 0: + days_until_next_saturday = 7 + target_date = today + datetime.timedelta(days=days_until_next_saturday) + elif "next sunday" in date_str_lower: + days_until_next_sunday = (13 - today.weekday()) % 7 + if days_until_next_sunday == 0: + days_until_next_sunday = 7 + target_date = today + datetime.timedelta(days=days_until_next_sunday) + # Handle current week days + elif "monday" in date_str_lower: + days_until_monday = (0 - today.weekday()) % 7 + target_date = today + datetime.timedelta(days=days_until_monday) + elif "tuesday" in date_str_lower: + days_until_tuesday = (1 - today.weekday()) % 7 + target_date = today + datetime.timedelta(days=days_until_tuesday) + elif "wednesday" in date_str_lower: + days_until_wednesday = (2 - today.weekday()) % 7 + target_date = today + datetime.timedelta(days=days_until_wednesday) + elif "thursday" in date_str_lower: + days_until_thursday = (3 - today.weekday()) % 7 + target_date = today + datetime.timedelta(days=days_until_thursday) + elif "friday" in date_str_lower: + days_until_friday = (4 - today.weekday()) % 7 + target_date = today + datetime.timedelta(days=days_until_friday) + elif "saturday" in date_str_lower: + days_until_saturday = (5 - today.weekday()) % 7 + target_date = today + datetime.timedelta(days=days_until_saturday) + elif "sunday" in date_str_lower: + days_until_sunday = (6 - today.weekday()) % 7 + target_date = today + datetime.timedelta(days=days_until_sunday) + else: + # For more complex date strings, we'll use a simple approach + # This is a simplified approach - in a production system, you might want to use + # a more sophisticated date parsing library like dateparser + try: + # Try to parse common date formats + for fmt in ["%B %d", "%b %d", "%B %d %Y", "%b %d %Y", "%m/%d/%Y", "%m-%d-%Y", "%Y-%m-%d"]: + try: + parsed_date = datetime.datetime.strptime(date_str, fmt) + # If year is not specified, use current year + if fmt in ["%B %d", "%b %d"]: + parsed_date = parsed_date.replace(year=today.year) + target_date = parsed_date + break + except ValueError: + continue + else: + # If we couldn't parse the date, default to today + logger.warning(f"Could not parse date '{date_str}', defaulting to today") + target_date = today + except Exception as parse_error: + logger.error(f"Error parsing date '{date_str}': {str(parse_error)}") + target_date = today + + # Format the date for the API call (MM-DD-YYYY) + formatted_date = target_date.strftime("%m-%d-%Y") + logger.info(f"Parsed natural date '{date_str}' to formatted date: {formatted_date}") + + # Use the existing function to get appointments for this date + return await self.getAppointment(date=formatted_date) + + except Exception as e: + logger.error(f"Error getting appointments by natural date: {str(e)}") + return {"error": f"Failed to get appointments for date '{date_str}': {str(e)}"} + @llm.ai_callable() + async def start_call_by_patient_name(self, + patient_name: Annotated[ + str, llm.TypeInfo(description="Name of the patient to call") + ], + date: Annotated[ + str, llm.TypeInfo(description="Optional date to filter appointments (MM-DD-YYYY)") + ] = None + ): + """Start a call with a patient by searching for their name in upcoming appointments. + If multiple appointments are found, the most recent one will be used.""" + try: + logger.info(f"Starting call process for patient name: {patient_name}") + + # First, get a list of appointments to search through + today = datetime.datetime.now() + if self.start_date and self.end_date: + # Use the provided date range + parameters = { + "start_date": self.start_date, + "end_date": self.end_date + } + else: + # If date is provided, use it; otherwise use today's date as start and a future date as end + if date: + try: + month, day, year = date.split('-') + formatted_date = f"{year}-{month}-{day}" + start_date = formatted_date + except ValueError: + # If already in YYYY-MM-DD format or other format, use as is + start_date = date + else: + # Use today as start date + start_date = today.strftime("%Y-%m-%d") + + # Use a date 30 days in the future as end date if no specific date is provided + if not date: + end_date = (today + datetime.timedelta(days=30)).strftime("%Y-%m-%d") + else: + end_date = start_date + + parameters = { + "start_date": start_date, + "end_date": end_date + } + logger.info(f"Response parameters: {parameters}") + url = f"{Base_url}/api/assistant/get-appointment-list-date" + + if not self.accessToken: + return { + "status": "error", + "message": "Authentication token not available." + } + + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + response = requests.post(url, json=parameters, headers=headers) + logger.info(f"Response status code: {response.status_code}") + + if response.status_code != 200: + return { + "status": "error", + "message": f"Failed to retrieve appointments. Status code: {response.status_code}" + } + + appointments_data = response.json() + appointments = appointments_data.get('appointments', []) + + if not appointments: + return { + "status": "error", + "message": f"No appointments found for the specified date range." + } + + # Search for appointments matching the patient name (case-insensitive partial match) + patient_name_lower = patient_name.lower() + matching_appointments = [] + + for appt in appointments: + appt_patient_name = appt.get('title', '').lower() + if patient_name_lower in appt_patient_name: + matching_appointments.append(appt) + + if not matching_appointments: + return { + "status": "error", + "message": f"No appointments found for patient name '{patient_name}'. Please check the spelling or try using the appointment ID instead." + } + + # Sort appointments by date (most recent first) + matching_appointments.sort(key=lambda x: x.get('date', ''), reverse=True) + + # Use the first (most recent) matching appointment + selected_appointment = matching_appointments[0] + appointment_id = selected_appointment.get('id') + patient_id = selected_appointment.get('patient_id') + + if not appointment_id or not patient_id: + return { + "status": "error", + "message": "The appointment information is incomplete. Please try using the appointment ID instead." + } + + # Now we have both appointment_id and patient_id, proceed with the call + logger.info(f"Starting call with patient_id: {patient_id}, appointment_id: {appointment_id}") + + # Show call interface for the user + participant = await self.ctx.wait_for_participant() + try: + resp = await self.ctx.room.local_participant.perform_rpc( + destination_identity=participant.identity, + method="startCall", + response_timeout=30, # 30 second timeout for the RPC call + payload=json.dumps({ + "patient_id": patient_id, + "appointment_id": appointment_id, + }), + ) + + logger.info(f"Call interface displayed to user. Response: {json.dumps(resp)}") + return { + "status": "success", + "message": f"I've initiated a call with {selected_appointment.get('title', 'the patient')}. Please wait while the connection is established.", + "call_status": "connecting", + "appointment_details": { + "patient_name": selected_appointment.get('title'), + "date": selected_appointment.get('date'), + "time": selected_appointment.get('start_time') + } + } + except Exception as rpc_error: + logger.error(f"RPC call failed: {str(rpc_error)}") + return { + "status": "error", + "message": f"Failed to start the call: {str(rpc_error)}" + } + + except Exception as e: + logger.error(f"Error starting call by patient name: {str(e)}") + return {"status": "error", "message": f"Error starting call: {str(e)}"} + + + + async def checkUserExist(self, + email: Annotated[ + str, llm.TypeInfo(description="Email address of the user to check") + ] + ): + """Check if a user exists in the system based on their email address""" + try: + # Store email for later use + self.user_email = email + + user_info = {"email": email} + logger.info(f"Checking user existence: {user_info}") + url = f"{Base_url}/api/check-user" + + response = requests.post(url, json=user_info) + + if response.status_code == 200: + logger.info("User check completed successfully") + result = response.json() + user_exists = result.get('success', False) # Adjust based on actual API response + logger.info(f"User exists: {user_exists}") + + # Return a boolean value directly + return user_exists + else: + logger.error(f"Failed to check user. Status code: {response.status_code}") + return False + except Exception as e: + logger.error(f"Error checking user existence: {str(e)}") + return False + + @llm.ai_callable() + async def add_practitioner(self, + firstName: Annotated[ + str, llm.TypeInfo(description="First name of the practitioner") + ], + lastName: Annotated[ + str, llm.TypeInfo(description="Last name of the practitioner") + ], + emailAddress: Annotated[ + str, llm.TypeInfo(description="Email address of the practitioner") + ], + password: Annotated[ + str, llm.TypeInfo(description="Password for the practitioner account") + ], + gender: Annotated[ + str, llm.TypeInfo(description="Gender of the practitioner (Male or Female)") + ], + dateOfBirth: Annotated[ + str, llm.TypeInfo(description="Date of birth in MM-DD-YYYY format") + ] = None, + + phoneNumber: Annotated[ + str, llm.TypeInfo(description="Phone number of the practitioner") + ] = None, + + ): + """Add a new practitioner or doctor to the system with the specified parameters""" + try: + logger.info(f"Adding new practitioner: {firstName} {lastName}") + self.user_email = emailAddress + + # Check if user exists first + user_exists = await self.checkUserExist(emailAddress) + + if user_exists: + logger.info(f"User with email {emailAddress} already exists") + return { + "status": "error", + "message": f"A user with email {emailAddress} already exists. Please use a different email address." + } + + # Prepare the practitioner data + practitioner_data = { + "firstName": firstName, + "lastName": lastName, + "emailAddress": emailAddress, + "username": emailAddress, # Default to email if username not provided + "type": "practitioner" + } + + # Add optional fields if provided + if dateOfBirth: + practitioner_data["dateOfBirth"] = dateOfBirth + else: + practitioner_data["dateOfBirth"] = "1990-01-01" # Placeholder value + if gender: + practitioner_data["gender"] = gender + + # Add phone number if provided, otherwise use a default + if phoneNumber: + practitioner_data["textMessageNumber"] = phoneNumber + else: + practitioner_data["textMessageNumber"] = '+1-123-456-7890' + + if password: + practitioner_data["newUserPassword"] = password + practitioner_data["adminPassword"] = password + + # Set default values for required fields that might be missing + practitioner_data.setdefault("sendEmail", False) + practitioner_data.setdefault("analytics", "All Reports") + + url = f"{Base_url}/api/assistant/add-user" + + if not self.accessToken: + return { + "status": "error", + "message": "Authentication token not available." + } + + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + logger.info(f"Sending practitioner data: {json.dumps(practitioner_data)}") + response = requests.post(url, json=practitioner_data, headers=headers) + logger.info(f"Response status code: {response.status_code}") + logger.info(f"Response content: {response.text}") + + if response.status_code == 200: + response_data = response.json() + return { + "status": "success", + "message": f"Successfully added practitioner {firstName} {lastName}", + "data": response_data + } + else: + return { + "status": "error", + "message": f"Failed to add practitioner. Status code: {response.status_code}", + "details": response.text + } + + except Exception as e: + logger.error(f"Error adding practitioner: {str(e)}") + return {"status": "error", "message": f"Error adding practitioner: {str(e)}"} + + @llm.ai_callable() + async def add_practitioner_availability(self, + practitioner_id: Annotated[ + int, llm.TypeInfo(description="ID of the practitioner to add availability for") + ], + date: Annotated[ + str, llm.TypeInfo(description="Start Date for the availability in YYYY-MM-DD format or natural language like '1 May 2025'") + ], + start_time: Annotated[ + str, llm.TypeInfo(description="Start time in format like '07:30 AM'") + ], + until_date: Annotated[ + str, llm.TypeInfo(description="End date for repeating availability in YYYY-MM-DD format or natural language like '1 May 2025'") + ], + end_time: Annotated[ + str, llm.TypeInfo(description="End time in format like '03:15 PM'") + ], + repeats: Annotated[ + bool, llm.TypeInfo(description="Whether this availability repeats") + ] = True, + select_days: Annotated[ + str, llm.TypeInfo(description="Comma-separated list of days when availability repeats (e.g., 'Monday,Tuesday')") + ] = None, + + comment: Annotated[ + str, llm.TypeInfo(description="Optional comment about this availability") + ] = "" + ): + """Add availability for a practitioner with the specified parameters""" + try: + logger.info(f"Adding availability for practitioner ID: {practitioner_id}") + + start_date = self._parse_date_to_iso(date) + if not start_date: + return { + "status": "error", + "message": f"Invalid date format: {date}. Please use YYYY-MM-DD format or natural language like '1 May 2025'." + } + + # Parse and format the end date if provided + end_date = None + if until_date: + end_date = self._parse_date_to_iso(until_date) + if not end_date: + return { + "status": "error", + "message": f"Invalid until_date format: {until_date}. Please use YYYY-MM-DD format or natural language." + } + else: + # Default to start date if no end date provided + end_date = start_date + + # Format the start and end datetimes in ISO 8601 format with UTC timezone + start_datetime_result = self._format_datetime_start(start_date, start_time,self.getTimezone) + end_datetime_result = self._format_datetime_end(end_date, end_time,self.getTimezone) + + if not start_datetime_result or not end_datetime_result: + return { + "status": "error", + "message": "Failed to format date and time correctly." + } + + + start_time_utc = start_datetime_result['time'] + end_time_utc = end_datetime_result['time'] + # start_time_24h = self._convert_time_to_24h(start_time_utc) + # end_time_24h = self._convert_time_to_24h(end_time_utc) + + # # Format the start and end times for the API with UTC timezone + # # The API expects ISO format strings for start and end + # start_datetime = f"{start_date}T{start_time_24h}" + # end_datetime = f"{start_date}T{end_time_24h}" + + # Prepare the availability data + availability_data = { + "practitioner_id": practitioner_id, + "shift_title": "", + "type": "availability", + "allDay": False, + "date": start_date, # Use the parsed ISO date + "start": start_datetime_result['formatted_string'], + "end": end_datetime_result['formatted_string'], + "start_time": start_time_utc, # Keep original format for display + "end_time": end_time_utc, # Keep original format for display + "service": [], + "facility": 0, + "billing_facility": 0, + "repeats": repeats, + "repeatFrequency": "", + "repeatUnit": "", + "comment": comment + } + + + + # Add repeating information if applicable + if repeats: + if until_date: + availability_data["untilDate"] = end_date + else: + availability_data["untilDate"] = end_date # Default to the same date if not specified + + availability_data["weakDay"] = True if select_days else False + + if select_days: + # Convert comma-separated string to list + days_list = [day.strip() for day in select_days.split(',')] + availability_data["selectDays"] = days_list + + logger.info(f"Response content: {availability_data}") + + url = f"{Base_url}/api/assistant/provider-add-availability" + + if not self.accessToken: + return { + "status": "error", + "message": "Authentication token not available." + } + + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + logger.info(f"Sending availability data: {json.dumps(availability_data)}") + response = requests.post(url, json=availability_data, headers=headers) + logger.info(f"Response status code: {response.status_code}") + + + if response.status_code == 200: + response_data = response.json() + return { + "status": "success", + "message": f"Successfully added availability for practitioner ID {practitioner_id}", + "data": response_data + } + else: + return { + "status": "error", + "message": f"Failed to add availability. Status code: {response.status_code}", + "details": response.text + } + + except Exception as e: + logger.error(f"Error adding practitioner availability: {str(e)}") + return {"status": "error", "message": f"Error adding availability: {str(e)}"} + + def _convert_time_to_24h(self, time_str): + """Helper method to convert time from 12-hour format (e.g., '8:00 AM') to ISO 8601 format (e.g., '12:00:00.000Z')""" + try: + # Parse the time string + time_parts = time_str.split() + time_components = time_parts[0].split(':') + + hour = int(time_components[0]) + minute = int(time_components[1]) if len(time_components) > 1 else 0 + + # Adjust for AM/PM + if len(time_parts) > 1: + am_pm = time_parts[1].upper() + if am_pm == 'PM' and hour < 12: + hour += 12 + elif am_pm == 'AM' and hour == 12: + hour = 0 + + # Format as ISO 8601 time with milliseconds and 'Z' suffix to indicate UTC timezone + # For this specific format, we're using a fixed UTC time without offset calculation + return f"{hour:02d}:{minute:02d}:00.000Z" + except Exception as e: + logger.error(f"Error converting time format: {str(e)}") + return time_str # Return original if conversion fails + + @llm.ai_callable() + async def get_practitioners(self): + """Get a list of all practitioners in the system""" + try: + logger.info("Getting list of practitioners") + + url = f"{Base_url}/api/assistant/practitioners-list" + + if not self.accessToken: + return { + "status": "error", + "message": "Authentication token not available." + } + + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + response = requests.get(url, headers=headers) + + logger.info(f"Response status code: {response.status_code}") + + if response.status_code == 200: + practitioners_data = response.json() + logger.info(f"Response status code: {practitioners_data.get('data', [])}") + # Format the practitioners for better presentation + formatted_practitioners = [] + for practitioner in practitioners_data.get('data', []): + formatted_practitioner = { + "id": practitioner.get('id'), + "name": f"{practitioner.get('fname', '')} {practitioner.get('lname', '')}", + "email": practitioner.get('email', '') + } + formatted_practitioners.append(formatted_practitioner) + + return { + "status": "success", + "message": "Successfully retrieved practitioners", + "data": formatted_practitioners + } + else: + return { + "status": "error", + "message": f"Failed to retrieve practitioners. Status code: {response.status_code}", + "details": response.text if hasattr(response, 'text') else None + } + + except Exception as e: + logger.error(f"Error getting practitioners: {str(e)}") + return {"status": "error", "message": f"Error getting practitioners: {str(e)}"} + def _parse_date_to_iso(self, date_str): + """Helper method to parse various date formats to ISO format (YYYY-MM-DD)""" + try: + # Check if already in YYYY-MM-DD format + if re.match(r'^\d{4}-\d{2}-\d{2}$', date_str): + return date_str + + # Try to parse common date formats + for fmt in [ + "%d %B %Y", # 1 May 2025 + "%d %b %Y", # 1 May 2025 + "%B %d %Y", # May 1 2025 + "%b %d %Y", # May 1 2025 + "%m/%d/%Y", # 05/01/2025 + "%m-%d-%Y", # 05-01-2025 + "%d/%m/%Y", # 01/05/2025 + "%d-%m-%Y" # 01-05-2025 + ]: + try: + parsed_date = datetime.datetime.strptime(date_str, fmt) + # Just return the formatted date without UTC conversion + return parsed_date.strftime("%Y-%m-%d") + except ValueError: + continue + + # Try to handle natural language date expressions + date_str_lower = date_str.lower().strip() + today = datetime.datetime.now() + + # Handle relative dates + if "today" in date_str_lower: + return today.strftime("%Y-%m-%d") + elif "tomorrow" in date_str_lower: + tomorrow = today + datetime.timedelta(days=1) + return tomorrow.strftime("%Y-%m-%d") + elif "yesterday" in date_str_lower: + yesterday = today - datetime.timedelta(days=1) + return yesterday.strftime("%Y-%m-%d") + + # Handle day names + days = {"monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3, + "friday": 4, "saturday": 5, "sunday": 6} + + for day_name, day_num in days.items(): + if day_name in date_str_lower: + # Calculate days until the next occurrence of this day + days_ahead = (day_num - today.weekday()) % 7 + if "next" in date_str_lower: + # If "next" is specified, add 7 days if it would otherwise be today + if days_ahead == 0: + days_ahead = 7 + next_day = today + datetime.timedelta(days=days_ahead) + return next_day.strftime("%Y-%m-%d") + + # Try to extract month and day from strings like "May 1" or "1 May" + months = { + "january": 1, "february": 2, "march": 3, "april": 4, "may": 5, "june": 6, + "july": 7, "august": 8, "september": 9, "october": 10, "november": 11, "december": 12, + "jan": 1, "feb": 2, "mar": 3, "apr": 4, "jun": 6, "jul": 7, "aug": 8, + "sep": 9, "sept": 9, "oct": 10, "nov": 11, "dec": 12 + } + + for month_name, month_num in months.items(): + if month_name in date_str_lower: + # Try to extract the day number + day_match = re.search(r'\b(\d{1,2})(st|nd|rd|th)?\b', date_str_lower) + if day_match: + day = int(day_match.group(1)) + # Determine year (current year, unless the date would be in the past) + year = today.year + if month_num < today.month or (month_num == today.month and day < today.day): + year += 1 + + try: + parsed_date = datetime.datetime(year, month_num, day) + return parsed_date.strftime("%Y-%m-%d") + except ValueError: + # Invalid date (e.g., February 30) + pass + + # If all parsing attempts failed + logger.error(f"Could not parse date: {date_str}") + return None + + except Exception as e: + logger.error(f"Error parsing date: {str(e)}") + return None + + def _convert_to_utc(self, dt): + """ + This function is kept for compatibility but now simply returns the datetime object + without timezone conversion since we're only working with date formats + """ + return dt # Simply return the datetime object without conversion + + + def _format_datetime_start(self, date, time_str, timezone='EST'): + """ + Convert a date and time from a specific timezone to UTC format + Similar to formatDateTimeStart in JavaScript + + Args: + date (str): Date in YYYY-MM-DD format + time_str (str): Time in format like '07:30 AM' + timezone (str): Source timezone (default: 'EST') + + Returns: + dict: Dictionary containing: + - 'formatted_string': Formatted datetime string in ISO 8601 format with UTC timezone + - 'datetime_obj': Python datetime object in UTC timezone + - 'date': Date component in YYYY-MM-DD format + - 'time': Time component in HH:MM AM/PM format + """ + try: + # Use built-in datetime module + import datetime + + # Log the input parameters for debugging + logger.info(f"_format_datetime_start called with date={date}, time_str={time_str}, timezone={timezone}") + + # Parse the date and time + datetime_str = f"{date} {time_str}" + + # Try different formats for the time + formats = [ + "%Y-%m-%d %I:%M %p", # 07:30 AM + "%Y-%m-%d %H:%M", # 07:30 (24-hour) + "%Y-%m-%d %I:%M%p", # 07:30AM (no space) + "%Y-%m-%d %I:%M" # 07:30 (12-hour, no AM/PM) + ] + + dt = None + for fmt in formats: + try: + dt = datetime.datetime.strptime(datetime_str, fmt) + logger.info(f"Successfully parsed datetime with format: {fmt}") + break + except ValueError: + continue + + if not dt: + raise ValueError(f"Could not parse datetime: {datetime_str}") + + # Handle timezone conversion using a more robust approach + # First, create a naive datetime (without timezone) + naive_dt = dt + + # Then, handle the timezone conversion manually + # For EST (UTC-5), add 5 hours to convert to UTC + timezone_offsets = { + 'EST': -4, # Eastern Standard Time (UTC-5) + 'EDT': -4, # Eastern Daylight Time (UTC-4) + #'CST': -6, # Central Standard Time (UTC-6) + 'CST': -5, # Central Daylight Time (UTC-5) + 'MST': -6, # Mountain Standard Time (UTC-7) + #'MDT': -6, # Mountain Daylight Time (UTC-6) + 'PST': -7, # Pacific Standard Time (UTC-8) + 'PDT': -7, # Pacific Daylight Time (UTC-7) + 'CDT': -5 # Central Daylight Time (UTC-5) + } + + # Get the UTC offset for the specified timezone + offset_hours = timezone_offsets.get(timezone.upper(), 0) + logger.info(f"Using timezone offset: {offset_hours} hours for timezone: {timezone}") + + # Apply the offset to convert to UTC + utc_dt = naive_dt + datetime.timedelta(hours=-offset_hours) + + # Format in ISO 8601 with milliseconds and Z suffix + formatted_datetime = utc_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z") + + # Log detailed information about the conversion + logger.info(f"Original datetime: {dt.strftime('%Y-%m-%d %I:%M %p')} {timezone}") + logger.info(f"UTC datetime: {utc_dt.strftime('%Y-%m-%d %H:%M:%S')} UTC") + logger.info(f"Formatted ISO datetime: {formatted_datetime}") + logger.info(f"Date component: {utc_dt.strftime('%Y-%m-%d')}") + logger.info(f"Time component (AM/PM): {utc_dt.strftime('%I:%M %p')}") + logger.info(f"Time component (24h): {utc_dt.strftime('%H:%M:%S')}") + + # Return a dictionary with both the formatted string and the datetime object + return { + 'formatted_string': formatted_datetime, + 'datetime_obj': utc_dt, + 'date': utc_dt.strftime("%Y-%m-%d"), + 'time': utc_dt.strftime("%I:%M %p") # Format time in AM/PM format + } + except Exception as e: + logger.error(f"Error formatting datetime: {str(e)}") + return None + + def _format_datetime_end(self, date, end_time, timezone='EST'): + """ + Convert a date and end time to UTC format + Similar to formatDateTimeEnd in JavaScript + + Args: + date (str): Date in YYYY-MM-DD format + end_time (str): Time in format like '03:15 PM' + timezone (str, optional): Timezone name. Defaults to 'EST'. + + Returns: + dict: Dictionary containing: + - 'formatted_string': Formatted datetime string in ISO 8601 format with UTC timezone + - 'datetime_obj': Python datetime object in UTC timezone + - 'date': Date component in YYYY-MM-DD format + - 'time': Time component in HH:MM:SS format + """ + try: + # Use built-in datetime module + import datetime + + # Log the input parameters for debugging + logger.info(f"_format_datetime_end called with date={date}, end_time={end_time}, timezone={timezone}") + + # Parse the date and time + datetime_str = f"{date} {end_time}" + # Try different formats for the time + formats = [ + "%Y-%m-%d %I:%M %p", # 03:15 PM + "%Y-%m-%d %H:%M", # 15:15 (24-hour) + "%Y-%m-%d %I:%M%p", # 03:15PM (no space) + "%Y-%m-%d %I:%M" # 03:15 (12-hour, no AM/PM) + ] + + dt = None + for fmt in formats: + try: + dt = datetime.datetime.strptime(datetime_str, fmt) + logger.info(f"Successfully parsed datetime with format: {fmt}") + break + except ValueError: + continue + + if not dt: + raise ValueError(f"Could not parse datetime: {datetime_str}") + + # Handle timezone conversion using a more robust approach + # First, create a naive datetime (without timezone) + naive_dt = dt + + # Then, handle the timezone conversion manually + # For EST (UTC-5), add 5 hours to convert to UTC + timezone_offsets = { + 'EST': -4, # Eastern Standard Time (UTC-5) + 'EDT': -4, # Eastern Daylight Time (UTC-4) + #'CST': -6, # Central Standard Time (UTC-6) + 'CST': -5, # Central Daylight Time (UTC-5) + 'MST': -6, # Mountain Standard Time (UTC-7) + #'MDT': -6, # Mountain Daylight Time (UTC-6) + 'PST': -7, # Pacific Standard Time (UTC-8) + 'PDT': -7, # Pacific Daylight Time (UTC-7) + 'CDT': -5 # Central Daylight Time (UTC-5) + # 'EST': -4, # Eastern Daylight Time (UTC-4) + # 'EDT': -4, # Eastern Daylight Time (UTC-4) + # 'CST': -6, # Central Standard Time (UTC-6) + + # 'MST': -7, # Mountain Standard Time (UTC-7) + # 'MDT': -6, # Mountain Daylight Time (UTC-6) + # 'PST': -8, # Pacific Standard Time (UTC-8) + # 'PDT': -7 # Pacific Daylight Time (UTC-7) + } + + # Get the UTC offset for the specified timezone + offset_hours = timezone_offsets.get(timezone.upper(), 0) + logger.info(f"Using timezone offset: {offset_hours} hours for timezone: {timezone}") + + # Apply the offset to convert to UTC + utc_dt = naive_dt + datetime.timedelta(hours=-offset_hours) + + # Format in ISO 8601 with milliseconds and Z suffix + formatted_datetime = utc_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z") + + # Log detailed information about the conversion + logger.info(f"Original datetime: {dt.strftime('%Y-%m-%d %I:%M %p')} {timezone}") + logger.info(f"UTC datetime: {utc_dt.strftime('%Y-%m-%d %H:%M:%S')} UTC") + logger.info(f"Formatted ISO datetime: {formatted_datetime}") + logger.info(f"Date component: {utc_dt.strftime('%Y-%m-%d')}") + logger.info(f"Time component (AM/PM): {utc_dt.strftime('%I:%M %p')}") + logger.info(f"Time component (24h): {utc_dt.strftime('%H:%M:%S')}") + + # Return a dictionary with both the formatted string and the datetime object + return { + 'formatted_string': formatted_datetime, + 'datetime_obj': utc_dt, + 'date': utc_dt.strftime("%Y-%m-%d"), + 'time': utc_dt.strftime("%I:%M %p") # Format time in AM/PM format + } + except Exception as e: + logger.error(f"Error formatting end datetime: {str(e)}") + return None + + @llm.ai_callable() + async def addCategory(self, + categoryName: Annotated[ + str, llm.TypeInfo(description="Name of the category to save") + ] + ): + """Save a category with the given name and return status""" + try: + category_data = { + "name": categoryName + } + + logger.info(f"Category data: {category_data}") + url = f"{Base_url}/api/assistant/save-category" + + if self.accessToken: + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + response = requests.post(url, json=category_data, headers=headers) + + if response.status_code == 200: + logger.info("Category data sent successfully") + category_id = response.json().get('category_id') + self.category_id=category_id + return { + "status": "success", + "message": "Category saved successfully", + + } + + else: + logger.error(f"Failed to save category. Status code: {response.status_code}") + return {"status": "error", "message": f"Failed to save category. Status code: {response.status_code}"} + else: + logger.error("No access token available. Please register or login first.") + return {"status": "error", "message": "Authentication required. Please register or login first."} + except Exception as e: + logger.error(f"Error saving category: {str(e)}") + return {"status": "error", "message": f"Error saving category: {str(e)}"} + + @llm.ai_callable() + async def createProduct(self, + productName: Annotated[ + str, llm.TypeInfo(description="Name of the product") + ], + productCatName: Annotated[ + str, llm.TypeInfo(description="Name of the product catgory") + ], + productPrice: Annotated[ + float, llm.TypeInfo(description="Price of the product") + ], + productDescription: Annotated[ + str, llm.TypeInfo(description="Description of the product") + ], + prescription: Annotated[ + str, llm.TypeInfo(description="prescrption of the product (yes, no)") + ] + + ): + """Create a new product with the specified details""" + try: + # Store product data for reference + self.product_data = { + "name": productName, + "price": productPrice, + "productCatName": productCatName, + "description": productDescription + } + logger.info(f"Creating product catagory: {productCatName}") + slug = productName.lower().replace(' ', '-').replace('&', 'and') + is_prescription = True if prescription.lower() == "yes" else False + product_info = { + "name": productName, + "slug": slug, + "description": productDescription, + "price": productPrice, + "type": "One Time", + "isPrescription": is_prescription, + "inTakeForm_id": "", + "questiioneriesForm_id": "", + "category_id": productCatName, + "sku": "", + "status": "active" + } + + logger.info(f"Creating product: {product_info}") + url = f"{Base_url}/api/assistant/save-product" + + if self.accessToken: + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + response = requests.post(url, json=product_info, headers=headers) + + if response.status_code == 200: + logger.info("Product created successfully") + + return { + "status": "success", + "message": "Product created successfully", + } + else: + logger.error(f"Failed to create product. Status code: {response.status_code}") + return {"status": "error", "message": f"Failed to create product. Status code: {response.status_code}"} + else: + logger.error("No access token available. Please register or login first.") + return {"status": "error", "message": "Authentication required. Please register or login first."} + except Exception as e: + logger.error(f"Error creating product: {str(e)}") + return {"status": "error", "message": f"Error creating product: {str(e)}"} + + + @llm.ai_callable() + async def suggestFormFields(self, + formName: Annotated[ + str, llm.TypeInfo(description="Name of the form") + ], + formType: Annotated[ + str, llm.TypeInfo(description="Type of form (charting-forms, document-forms, simple-forms, consent-forms)") + ] + ): + """Suggest appropriate form fields based on form name and type""" + try: + # Store form metadata for later use + self.form_data = { + "name": formName, + "type": formType + } + + suggested_fields = [] + + # Generate field suggestions based on form type + if "charting" in formType.lower(): + suggested_fields = [ + {"label": "Date of Visit", "type": "Date", "required": True}, + {"label": "Chief Complaint", "type": "Textarea", "required": True}, + {"label": "Vital Signs", "type": "Textarea", "required": False}, + {"label": "Assessment", "type": "Textarea", "required": True}, + {"label": "Treatment Plan", "type": "Textarea", "required": True}, + {"label": "Provider Signature", "type": "Signature", "required": True} + ] + + # Add more specific fields based on form name + if "progress" in formName.lower(): + suggested_fields.extend([ + {"label": "Progress Notes", "type": "Textarea", "required": True}, + {"label": "Follow-up Plan", "type": "Textarea", "required": True} + ]) + elif "initial" in formName.lower(): + suggested_fields.extend([ + {"label": "Medical History", "type": "Textarea", "required": True}, + {"label": "Family History", "type": "Textarea", "required": False}, + {"label": "Social History", "type": "Textarea", "required": False}, + {"label": "Allergies", "type": "Textarea", "required": True} + ]) + + elif "document" in formType.lower(): + suggested_fields = [ + {"label": "Document Title", "type": "Text", "required": True}, + {"label": "Date", "type": "Date", "required": True}, + {"label": "Document Content", "type": "Textarea", "required": True}, + {"label": "Supporting Files", "type": "File", "required": False}, + {"label": "Provider Signature", "type": "Signature", "required": True} + ] + + elif "consent" in formType.lower(): + suggested_fields = [ + {"label": "Date", "type": "Date", "required": True}, + {"label": "Consent Statement", "type": "Textarea", "required": True}, + {"label": "I understand and agree to the above", "type": "Checkbox", "required": True}, + {"label": "Patient Signature", "type": "Signature", "required": True}, + {"label": "Provider Signature", "type": "Signature", "required": True} + ] + + else: # simple-forms or default + suggested_fields = [ + {"label": "Patient First Name", "type": "Text", "required": True, "name": "patient_custom_fname", "readonly": True}, + {"label": "Patient Last Name", "type": "Text", "required": True, "name": "patient_custom_lname", "readonly": True}, + {"label": "Patient Gender", "type": "Select", "required": True, "name": "patient_custom_gender_identity", + "options": ["Male", "Female", "Other"]}, + {"label": "Patient DOB", "type": "date", "required": True, "name": "patient_custom_DOB"}, + {"label": "Patient Email", "type": "Text", "required": True, "name": "patient_custom_email"}, + {"label": "Patient Phone", "type": "Text", "required": True, "name": "patient_extra_data_preferred_phone"}, + + ] + + # Store suggested fields for later modification + self.form_data["suggested_fields"] = suggested_fields + + return { + "status": "success", + "fields": suggested_fields, + "message": f"Suggested {len(suggested_fields)} fields for {formType} form: {formName}" + } + + except Exception as e: + logger.error(f"Error suggesting form fields: {str(e)}") + return {"status": "error", "message": f"Error suggesting form fields: {str(e)}"} + + @llm.ai_callable() + async def modifyFormFields(self, + modifications: Annotated[ + str, llm.TypeInfo(description="JSON string of field modifications (add, remove, or change)") + ] + ): + """Modify the suggested form fields based on user feedback""" + try: + if not self.form_data.get("suggested_fields"): + return {"status": "error", "message": "No suggested fields found. Please suggest fields first."} + + # Parse the modifications + mods = json.loads(modifications) + current_fields = self.form_data["suggested_fields"] + + # Handle additions + if "add" in mods: + for new_field in mods["add"]: + current_fields.append(new_field) + + # Handle removals + if "remove" in mods: + labels_to_remove = [f.lower() for f in mods["remove"]] + current_fields = [f for f in current_fields if f["label"].lower() not in labels_to_remove] + + # Handle changes + if "change" in mods: + for change in mods["change"]: + if "old_label" in change and "new_field" in change: + old_label = change["old_label"] + new_field = change["new_field"] + + # Find and update the field + for i, field in enumerate(current_fields): + if field["label"].lower() == old_label.lower(): + current_fields[i] = new_field + break + + # Update the stored fields + self.form_data["suggested_fields"] = current_fields + + return { + "status": "success", + "fields": current_fields, + "message": "Form fields modified successfully" + } + + except json.JSONDecodeError: + logger.error("Invalid JSON format for modifications") + return {"status": "error", "message": "Invalid JSON format for modifications"} + except Exception as e: + logger.error(f"Error modifying form fields: {str(e)}") + return {"status": "error", "message": f"Error modifying form fields: {str(e)}"} + + @llm.ai_callable() + async def saveForm(self, + formName: Annotated[ + str, llm.TypeInfo(description="Name of the form to be saved") + ], + formType: Annotated[ + str, llm.TypeInfo(description="Type of form (charting-forms, document-forms, simple-forms, consent-forms)") + ], + formFields: Annotated[ + str, llm.TypeInfo(description="JSON string of form fields, each containing label, type, and options") + ] + ): + """Save form configuration with fields and return status""" + try: + # Parse the JSON string to a list + logger.info(f"Form data: {json.dumps(formFields)}") + formeo_json_str = await self.convertToFormeoJson(formFields) + + form_data = { + "data": formeo_json_str, + "name": formName, + "type": formType + } + + logger.info(f"Saving form: {formName} of type {formType}") + logger.info(f"Form data: {json.dumps(form_data)}") + url = f"{Base_url}/api/assistant/store-form" + + if self.accessToken: + headers = { + "Authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + response = requests.post(url, json=form_data, headers=headers) + logger.info(f"Response Status Code: {response.status_code}") + logger.info(f"Response Content: {response.text}") + if response.status_code == 200: + logger.info("Form saved successfully") + self.onboarding_stage = "product_setup" # Advance to next stage + return { + "status": "success", + "message": "Form saved successfully", + "next_stage": "product_setup" + } + else: + logger.error(f"Failed to save form. Status code: {response.status_code}") + return {"status": "error", "message": f"Failed to save form. Status code: {response.status_code}"} + else: + logger.error("No access token available. Please register or login first.") + return {"status": "error", "message": "Authentication required. Please register or login first."} + except json.JSONDecodeError: + logger.error("Invalid JSON format for formFields") + return {"status": "error", "message": "Invalid JSON format for form fields"} + except Exception as e: + logger.error(f"Error saving form: {str(e)}") + return {"status": "error", "message": f"Error saving form: {str(e)}"} + + @llm.ai_callable() + async def bookAppointment(self, + practitioner_id: Annotated[ + str, llm.TypeInfo(description="ID of the practitioner to check availability for") + ] + ): + """Book an appointment process by checking a practitioner's availability first""" + try: + logger.info(f"Starting appointment booking process for practitioner ID: {practitioner_id}") + + # First, check if the practitioner exists and has availability + availability_result = await self.check_practitioner_availability( + practitioner_id=practitioner_id + ) + + if availability_result.get("status") == "error": + return { + "status": "error", + "message": f"Failed to check practitioner availability: {availability_result.get('message')}", + "details": availability_result + } + + # Initialize booking_data as a dictionary if it doesn't exist + + self.booking_data = {} + + # Store the practitioner ID for the next steps + self.booking_data["practitioner_id"] = practitioner_id + + + + # Get the formatted available dates + available_dates = availability_result.get("allowed_dates", []) + + + + if not available_dates: + return { + "status": "error", + "message": f"No available slots found for practitioner {practitioner_id} in the current month.", + "next_step": "Try a different practitioner or a different month" + } + + # Store all slots data for future reference + self.booking_data["all_slots_data"] = availability_result.get("all_slots_data", []) + available_slots = availability_result.get("all_slots_data", []) + logger.info(f"Available dates for practitioner ID {practitioner_id}: {available_slots}") + return { + "status": "success", + "message": f"Found availability for practitioner {practitioner_id} on {len(available_dates)} dates", + "available_dates": available_dates, + "next_step": "select_date", + "instruction": "Please select one of the available date to proceed with booking" + } + + except Exception as e: + logger.error(f"Error starting appointment booking: {str(e)}") + return {"status": "error", "message": f"Error starting appointment booking: {str(e)}"} + @llm.ai_callable() + async def find_patient(self, + search_term: Annotated[ + str, llm.TypeInfo(description="Name, email, or ID of the patient to search for") + ] + ): + """Find a patient by name, email, or ID""" + try: + logger.info(f"Searching for patient with term: {search_term}") + + url = f"{Base_url}/api/emr/patients-list" + + if not self.accessToken: + return { + "status": "error", + "message": "Authentication token not available." + } + + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + params = { + "search": search_term + } + + response = requests.get(url, headers=headers, params=params) + logger.info(f"Response status code: {response.status_code}") + + if response.status_code == 200: + patients_data = response.json() + patients = patients_data.get('data', []) + + if not patients: + return { + "status": "error", + "message": f"No patients found matching '{search_term}'." + } + + # Format the patients for better presentation + formatted_patients = [] + for patient in patients: + formatted_patient = { + "id": patient.get('id'), + "name": f"{patient.get('fname', '')} {patient.get('lname', '')}", + "email": patient.get('email', ''), + } + formatted_patients.append(formatted_patient) + + return { + "status": "success", + "message": f"Found {len(formatted_patients)} patients matching '{search_term}'", + "patients": formatted_patients + } + else: + return { + "status": "error", + "message": f"Failed to search patients. Status code: {response.status_code}", + "details": response.text + } + + except Exception as e: + logger.error(f"Error searching patients: {str(e)}") + return {"status": "error", "message": f"Error searching patients: {str(e)}"} + + + @llm.ai_callable() + async def check_practitioner_availability(self, + practitioner_id: Annotated[ + str, llm.TypeInfo(description="ID of the practitioner to check availability for") + ], + ): + """Check a practitioner's availability get all available dates""" + try: + # If no date is provided, use current month + today = datetime.datetime.now() + date = today.strftime("%Y-%m-%d") + logger.info(f"No date provided, using current date: {date}") + + logger.info(f"Checking availability for practitioner ID: {practitioner_id} on date: {date}") + + # Parse and format the date + formatted_date = self._parse_date_to_iso(date) + if not formatted_date: + return { + "status": "error", + "message": f"Invalid date format: {date}. Please use YYYY-MM-DD format or natural language." + } + + # Extract month from the formatted date + try: + date_obj = datetime.datetime.strptime(formatted_date, "%Y-%m-%d") + month = date_obj.strftime("%m") # Get month as 01-12 string + year = date_obj.strftime("%Y") # Get year as YYYY string + except ValueError: + return { + "status": "error", + "message": f"Failed to extract month from date: {formatted_date}" + } + + url = f"{Base_url}/api/assistant/get-available-slots-data/{practitioner_id}" + + if not self.accessToken: + return { + "status": "error", + "message": "Authentication token not available." + } + + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + params = { + "month": month, + "timezone": self.getTimezone or "EST" + } + + logger.info(f"Requesting availability with params: {params}") + response = requests.post(url, headers=headers, json=params) + + + if response.status_code == 200: + availability_data = response.json() + + # The response is a dictionary with dates as keys and arrays of slots as values + all_dates = availability_data + + if not all_dates: + return { + "status": "error", + "message": f"No availability found for practitioner {practitioner_id} in {month}/{year}" + } + + # Extract the date keys (similar to Object.keys() in JavaScript) + date_keys = list(all_dates.keys()) + + # Format the dates (similar to the JavaScript map operation) + allowed_dates = [] + for date_key in date_keys: + # Only include dates that have available slots + if date_key in all_dates and all_dates[date_key] and len(all_dates[date_key]) > 0: + # Format date from YYYY-MM-DD to MM-DD-YYYY for display + try: + date_obj = datetime.datetime.strptime(date_key, "%Y-%m-%d") + formatted_display = date_obj.strftime("%m-%d-%Y") + + allowed_dates.append({ + "title": formatted_display, # Display format MM-DD-YYYY + "value": date_key, # Original date value YYYY-MM-DD + "slots_count": len(all_dates[date_key]) # Number of available slots + }) + + except ValueError: + logger.warning(f"Could not format date: {date_key}") + + # Sort dates chronologically + allowed_dates.sort(key=lambda x: x["value"]) + logger.info(f"Checking availability {allowed_dates}") + return { + "status": "success", + "message": f"Found availability on {len(allowed_dates)} dates for practitioner {practitioner_id}", + "allowed_dates": allowed_dates, + "all_slots_data": all_dates # Keep the full data for reference + } + else: + return { + "status": "error", + "message": f"Failed to retrieve availability. Status code: {response.status_code}", + "details": response.text + } + + except Exception as e: + logger.error(f"Error checking practitioner availability: {str(e)}") + return {"status": "error", "message": f"Error checking practitioner availability: {str(e)}"} + + + @llm.ai_callable() + async def select_appointment_date(self, + date: Annotated[ + str, llm.TypeInfo(description="Date for the appointment in YYYY-MM-DD format or natural language") + ] + ): + """Select a date for the appointment after checking practitioner availability""" + try: + if not hasattr(self, 'booking_data') or not self.booking_data: + return { + "status": "error", + "message": "No active booking process. Please start by selecting a practitioner first.", + "next_step": "bookAppointment" + } + + practitioner_id = self.booking_data.get("practitioner_id") + if not practitioner_id: + return { + "status": "error", + "message": "Practitioner ID not found in booking data. Please start the booking process again.", + "next_step": "bookAppointment" + } + + # Parse the date if it's in natural language format + parsed_date = self._parse_date_to_iso(date) if hasattr(self, '_parse_date_to_iso') else date + if not parsed_date: + return { + "status": "error", + "message": f"Invalid date format: {date}. Please use YYYY-MM-DD format or natural language.", + "next_step": "select_appointment_date" + } + + logger.info(f"Selected date for appointment: {parsed_date}") + + # Check if the selected date is in the available dates + all_slots_data = self.booking_data.get("all_slots_data", {}) + + if not all_slots_data: + return { + "status": "error", + "message": "No available slots data found. Please check practitioner availability first.", + "next_step": "bookAppointment" + } + + # Check if the date exists in the available slots + if parsed_date not in all_slots_data: + # Try to find the date in the formatted_datetime fields + found = False + for date_key, slots in all_slots_data.items(): + for slot in slots: + if parsed_date in slot.get("formatted_datetime", ""): + parsed_date = date_key + found = True + break + if found: + break + + if not found: + return { + "status": "error", + "message": f"No available slots found for the selected date: {parsed_date}", + "next_step": "select_appointment_date", + "instruction": "Please select a different date from the available dates" + } + + # Get the available time slots for the selected date + available_slots = all_slots_data.get(parsed_date, []) + + if not available_slots or len(available_slots) == 0: + return { + "status": "error", + "message": f"No available time slots found for the selected date: {parsed_date}", + "next_step": "select_appointment_date", + "instruction": "Please select a different date from the available dates" + } + + # Format the time slots for display + formatted_slots = [] + for slot in available_slots: + start_time = slot.get("start_time", "") + formatted_datetime = slot.get("formatted_datetime", "") + + # Extract time from formatted_datetime + time_part = "" + if " " in formatted_datetime: + _, time_part = formatted_datetime.split(" ", 1) + + formatted_slots.append({ + "start_time": start_time, + "formatted_time": time_part, + "value": start_time, # Keep original ISO format for backend + "display": time_part or start_time + }) + + # Store the selected date in booking data + self.booking_data["selected_date"] = parsed_date + self.booking_data["available_slots"] = formatted_slots + + # Format the date for display + try: + date_obj = datetime.datetime.strptime(parsed_date, "%Y-%m-%d") + formatted_display_date = date_obj.strftime("%A, %B %d, %Y") # e.g., "Monday, January 1, 2023" + except ValueError: + formatted_display_date = parsed_date + + logger.info(f"Selected date for appointment: {parsed_date} ={formatted_display_date}={formatted_slots}") + return { + "status": "success", + "message": f"Found {len(formatted_slots)} available time slots on {formatted_display_date}", + "date": parsed_date, + "formatted_date": formatted_display_date, + "available_slots": formatted_slots, + "next_step": "select_appointment_time", + "instruction": "Please select one of the available time slots to proceed with booking" + } + + except Exception as e: + logger.error(f"Error selecting appointment date: {str(e)}") + return {"status": "error", "message": f"Error selecting appointment date: {str(e)}"} + + @llm.ai_callable() + async def select_appointment_time_and_patient(self, + time: Annotated[ + str, llm.TypeInfo(description="Time for the appointment in HH:MM AM/PM format") + ], + patient_id: Annotated[ + str, llm.TypeInfo(description="ID of the patient for the appointment") + ] + ): + """Select a time slot and patient for the appointment after selecting a date""" + try: + if not hasattr(self, 'booking_data') or not self.booking_data.get("practitioner_id") or not self.booking_data.get("selected_date"): + return { + "status": "error", + "message": "Incomplete booking information. Please select a practitioner and date first.", + "next_step": "start_appointment_booking" + } + + practitioner_id = self.booking_data.get("practitioner_id") + selected_date = self.booking_data.get("selected_date") + available_slots = self.booking_data.get("available_slots", []) + + logger.info(f"Selecting time {time} for date {selected_date} with practitioner {practitioner_id}") + logger.info(f"Available slots: {available_slots}") + + # Find the matching time slot from our stored available slots + selected_slot = None + for slot in available_slots: + formatted_time = slot.get("formatted_time", "") + display_time = slot.get("display", "") + + # Try to match with either formatted_time or display + if self._time_matches(formatted_time, time) or self._time_matches(display_time, time): + selected_slot = slot + logger.info(f"Found matching slot: {slot}") + break + + if not selected_slot: + return { + "status": "error", + "message": f"No available slot found at {time} on {selected_date}", + "available_times": [slot.get("display") for slot in available_slots], + "next_step": "select_appointment_time", + "instruction": "Please select from one of the available times" + } + + # Verify that the patient exists + patient_result = await self.find_patient(search_term=patient_id) + + if patient_result.get("status") != "success": + return { + "status": "error", + "message": f"Failed to verify patient ID: {patient_id}", + "details": patient_result, + "next_step": "find_patient", + "instruction": "Please provide a valid patient ID or search for a patient" + } + + # Store the selected time and patient + self.booking_data["time"] = time + self.booking_data["display_time"] = selected_slot.get("display", time) + self.booking_data["start_time"] = selected_slot.get("start_time") # This is the ISO format time for the backend + self.booking_data["patient_id"] = patient_id + self.booking_data["selected_slot"] = selected_slot + + # Get patient details for confirmation + patient_details = None + if patient_result.get("patients") and len(patient_result.get("patients")) > 0: + for patient in patient_result.get("patients"): + if str(patient.get("id")) == str(patient_id): + patient_details = patient + break + + # Now we have all the information needed to book the appointment + return { + "status": "success", + "message": f"Selected time {selected_slot.get('display')} on {selected_date} for patient {patient_details.get('name') if patient_details else patient_id}", + "booking_details": { + "practitioner_id": practitioner_id, + "date": selected_date, + "display_time": selected_slot.get("display", time), + "start_time": selected_slot.get("start_time"), # ISO format time for backend + "patient_id": patient_id, + "patient_name": patient_details.get("name") if patient_details else "Unknown" + }, + "next_step": "confirm_booking", + "instruction": "Please confirm the booking details or provide appointment type and notes" + } + + except Exception as e: + logger.error(f"Error selecting appointment time and patient: {str(e)}") + return {"status": "error", "message": f"Error selecting appointment time and patient: {str(e)}"} + + + @llm.ai_callable() + async def confirm_booking(self, + confirm: Annotated[ + bool, llm.TypeInfo(description="Confirm the booking (true) or cancel (false)") + ], + appointment_type: Annotated[ + str, llm.TypeInfo(description="Type of appointment ('Telehealth Visit','Follow Up','Lab Review','Initial Consult', 'Office Visit', etc.)") + ] = "Consultation", + notes: Annotated[ + str, llm.TypeInfo(description="Any additional notes for the appointment") + ] = "", + service: Annotated[ + str, llm.TypeInfo(description="Service type (e.g., 'Patient Followup', 'New Patient', etc.)") + ] = "Patient Followup", + payment_type: Annotated[ + str, llm.TypeInfo(description="Payment type ('Cash Only', 'Insurance Only','Cash and Insurance','Auto','Workers','Prepaid')") + ] = "Cash Only", + + + ): + """Confirm and finalize the appointment booking with optional appointment type and notes""" + try: + if not hasattr(self, 'booking_data') or not self.booking_data.get("practitioner_id") or not self.booking_data.get("selected_date") or not self.booking_data.get("start_time"): + return { + "status": "error", + "message": "Incomplete booking information. Please complete the booking process first.", + "next_step": "start_appointment_booking" + } + + if not confirm: + # Clear booking data and cancel the process + self.booking_data = {} + return { + "status": "info", + "message": "Booking process cancelled.", + "next_step": "start_appointment_booking" + } + + # Extract all required booking information + practitioner_id = self.booking_data.get("practitioner_id") + patient_id = self.booking_data.get("patient_id") + appointment_date = self.booking_data.get("selected_date") + start_time = self.booking_data.get("start_time") + display_time = self.booking_data.get("display_time", "") + + logger.info(f"Confirming appointment booking for patient {patient_id} with practitioner {practitioner_id}") + logger.info(f"Date: {appointment_date}, Time: {start_time} ({display_time})") + logger.info(f"Type: {appointment_type}, Notes: {notes}") + + # Calculate end time (assuming 30 minutes appointment duration) + # This is a placeholder - you may need to adjust based on your requirements + end_time = "09:00 PM" # Default end time + + # Get patient name for title + patient_name = "Patient Appointment" + try: + patient_result = await self.find_patient(search_term=patient_id) + if patient_result.get("status") == "success" and patient_result.get("patients"): + for patient in patient_result.get("patients"): + if str(patient.get("id")) == str(patient_id): + patient_name = patient.get("name", "Patient Appointment") + break + except Exception as e: + logger.warning(f"Could not retrieve patient name: {str(e)}") + + # Prepare the booking data for API request according to the required payload structure + booking_data = { + "practitioner_id": practitioner_id, + "patient_id": patient_id, + "title": patient_name, + "date": appointment_date, + "start_time": start_time, # Use the ISO format time from the slot + "end_time": end_time, + "appointment_type": appointment_type, + "service": service, + "payment_type": payment_type, + "location": '', + "room": '', + "notes": notes, + "status": "ON_TIME" + } + + url = f"{Base_url}/api/assistant/book-appointment" + + if not self.accessToken: + return { + "status": "error", + "message": "Authentication token not available.", + "next_step": "login" + } + + headers = { + "authorization": f"Bearer {self.accessToken}", + "accesstoken": f"{self.accessToken}", + "content-type": "application/json", + "accept": "application/json" + } + + logger.info(f"Sending booking request with data: {booking_data}") + response = requests.post(url, headers=headers, json=booking_data) + logger.info(f"Booking response status code: {response.status_code}") + logger.info(f"Booking response content: {response.text}") + + if response.status_code == 200: + booking_result = response.json() + + # Format the date and time for display + try: + date_obj = datetime.datetime.strptime(appointment_date, "%Y-%m-%d") + formatted_date = date_obj.strftime("%A, %B %d, %Y") # e.g., "Monday, January 1, 2023" + except ValueError: + formatted_date = appointment_date + + # Clear booking data as the process is complete + self.booking_data = {} + + return { + "status": "success", + "message": f"Appointment successfully booked!", + "appointment_details": { + "id": booking_result.get("data", {}).get("id", ""), + "practitioner_id": practitioner_id, + "patient_id": patient_id, + "patient_name": patient_name, + "date": formatted_date, + "time": display_time or start_time, + "appointment_type": appointment_type, + "service": service, + "payment_type": payment_type, + "notes": notes + }, + "confirmation": f"Appointment confirmed for {patient_name} on {formatted_date} at {display_time or start_time}", + "next_step": "appointment_booked" + } + else: + error_message = "Failed to book appointment" + try: + error_data = response.json() + if "message" in error_data: + error_message = error_data["message"] + except: + error_message = f"Failed to book appointment. Status code: {response.status_code}" + + return { + "status": "error", + "message": error_message, + "details": response.text, + "next_step": "retry_booking" + } + + except Exception as e: + logger.error(f"Error confirming appointment booking: {str(e)}") + return { + "status": "error", + "message": f"Error confirming appointment booking: {str(e)}", + "next_step": "retry_booking" + } + + # @llm.ai_callable() + # async def saveAppointment(self, + # practitioner_id: Annotated[ + # str, llm.TypeInfo(description="ID of the practitioner to book with") + # ], + # patient_id: Annotated[ + # str, llm.TypeInfo(description="ID of the patient for the appointment") + # ], + # appointment_date: Annotated[ + # str, llm.TypeInfo(description="Date for the appointment in YYYY-MM-DD format or natural language") + # ], + # appointment_time: Annotated[ + # str, llm.TypeInfo(description="Time for the appointment in HH:MM AM/PM format") + # ], + # appointment_type: Annotated[ + # str, llm.TypeInfo(description="Type of appointment (e.g., 'Consultation', 'Follow-up', etc.)") + # ] = "Consultation", + # notes: Annotated[ + # str, llm.TypeInfo(description="Any additional notes for the appointment") + # ] = "" + # ): + # """Book an appointment for a patient with a practitioner on base availablity.""" + # try: + # logger.info(f"Booking appointment for patient {patient_id} with practitioner {practitioner_id} on {appointment_date} at {appointment_time}") + + # # Parse the date if it's in natural language format + # parsed_date = self._parse_date_to_iso(appointment_date) + # if parsed_date: + # appointment_date = parsed_date + # logger.info(f"Parsed appointment date to: {appointment_date}") + + # # First, check if the slot is available + # availability_result = await self.check_practitioner_availability( + # practitioner_id=practitioner_id, + # ) + + # if availability_result.get("status") != "success": + # # If no slots available on the requested date, suggest alternative dates + # if availability_result.get("status") == "info" and availability_result.get("available_dates"): + # available_dates = availability_result.get("available_dates", {}) + # date_suggestions = list(available_dates.keys())[:5] # Suggest up to 5 alternative dates + + # return { + # "status": "error", + # "message": f"No available slots found on {appointment_date}", + # "suggested_dates": date_suggestions, + # "details": "Please try booking on one of the suggested dates" + # } + # else: + # return { + # "status": "error", + # "message": f"Failed to verify availability: {availability_result.get('message')}", + # "details": availability_result + # } + + # # Find the matching time slot + # available_slots = availability_result.get("available_slots", []) + # selected_slot = None + + # # Normalize the input time for comparison + # try: + # # Try multiple time formats for flexibility + # time_formats = ["%I:%M %p", "%H:%M", "%I:%M%p", "%I.%M %p", "%I.%M%p"] + # input_time_obj = None + + # for fmt in time_formats: + # try: + # input_time_obj = datetime.datetime.strptime(appointment_time, fmt) + # logger.info(f"Successfully parsed time with format: {fmt}") + # break + # except ValueError: + # continue + + # if not input_time_obj: + # raise ValueError(f"Could not parse time: {appointment_time}") + + # input_time_24h = input_time_obj.strftime("%H:%M") + # input_time_12h = input_time_obj.strftime("%I:%M %p") + # logger.info(f"Normalized appointment time: 24h={input_time_24h}, 12h={input_time_12h}") + + # except ValueError: + # return { + # "status": "error", + # "message": f"Invalid time format: {appointment_time}. Please use format like '2:30 PM'.", + # "available_formats": ["1:30 PM", "13:30", "1:30PM", "1.30 PM"] + # } + + # # Find the matching slot + # for slot in available_slots: + # slot_datetime = slot.get("formatted_datetime", "") + # # Extract just the time portion from the formatted datetime (e.g., "2025-05-15 02:30 PM") + # if " " in slot_datetime: + # slot_date, slot_time = slot_datetime.split(" ", 1) + # if self._time_matches(slot_time, input_time_12h): + # selected_slot = slot + # logger.info(f"Found matching slot: {slot_datetime}") + # break + + # if not selected_slot: + # # If no exact match found, suggest available times + # available_times = [slot.get("formatted_datetime").split(" ", 1)[1] + # for slot in available_slots if " " in slot.get("formatted_datetime", "")] + + # return { + # "status": "error", + # "message": f"No available slot found at {appointment_time} on {appointment_date}", + # "available_times": available_times, + # "suggestion": "Please select from one of the available times" + # } + + # # Now book the appointment using the selected slot + # url = f"{Base_url}/api/assistant/book-appointment" + + # if not self.accessToken: + # return { + # "status": "error", + # "message": "Authentication token not available." + # } + + # headers = { + # "authorization": f"Bearer {self.accessToken}", + # "accesstoken": f"{self.accessToken}", + # "content-type": "application/json", + # "accept": "application/json" + # } + + # # Prepare the booking data + # booking_data = { + # "practitioner_id": practitioner_id, + # "patient_id": patient_id, + # "appointment_date": appointment_date, + # "start_time": selected_slot.get("start_time"), # Use the ISO format time from the slot + # "appointment_type": appointment_type, + # "notes": notes + # } + + # logger.info(f"Sending booking request with data: {booking_data}") + # response = requests.post(url, headers=headers, json=booking_data) + # logger.info(f"Booking response status code: {response.status_code}") + + # if response.status_code == 200: + # booking_result = response.json() + # return { + # "status": "success", + # "message": f"Successfully booked appointment on {appointment_date} at {appointment_time}", + # "appointment_details": booking_result.get("data", {}), + # "confirmation": f"Appointment confirmed for {appointment_date} at {appointment_time} with appointment type: {appointment_type}" + # } + # else: + # return { + # "status": "error", + # "message": f"Failed to book appointment. Status code: {response.status_code}", + # "details": response.text + # } + + # except Exception as e: + # logger.error(f"Error booking appointment: {str(e)}") + # return {"status": "error", "message": f"Error booking appointment: {str(e)}"} + + def _time_matches(self, slot_time, input_time): + """Helper function to compare if two time strings match, handling different formats""" + try: + # Normalize both times to 24-hour format for comparison + slot_time_obj = datetime.datetime.strptime(slot_time, "%I:%M %p") + slot_time_24h = slot_time_obj.strftime("%H:%M") + + input_time_obj = datetime.datetime.strptime(input_time, "%I:%M %p") + input_time_24h = input_time_obj.strftime("%H:%M") + + return slot_time_24h == input_time_24h + except ValueError: + # If parsing fails, do a direct string comparison + return slot_time.strip() == input_time.strip() \ No newline at end of file diff --git a/agent.py b/agent.py new file mode 100644 index 0000000..0321190 --- /dev/null +++ b/agent.py @@ -0,0 +1,123 @@ +import json +import logging +import asyncio +import uuid +from AssistantFnc import AssistantFnc +from typing import Annotated +from dotenv import load_dotenv +from livekit.agents import ( + AutoSubscribe, + JobContext, + JobProcess, + WorkerOptions, + cli, + llm, + metrics, +) +from PIL import Image +from livekit.agents.pipeline import VoicePipelineAgent +from livekit.plugins import cartesia, openai, deepgram, silero, turn_detector, anthropic +import io +import base64 +from livekit.agents.llm import ChatMessage, ChatImage +import os +from jira import JIRA +from livekit.agents.utils.images import encode, EncodeOptions, ResizeOptions + +# Load environment variables +load_dotenv('.env.local') +logger = logging.getLogger("voice-agent") + + +def prewarm(proc: JobProcess): + proc.userdata["vad"] = silero.VAD.load() + +import json + +# When receiving the data from the client + +async def entrypoint(ctx: JobContext): + + metadata = json.loads(ctx.job.metadata) + token = metadata.get("token") + logger.info(f"Room metadata is {metadata}") + + initial_ctx = llm.ChatContext().append( + role="system", + text=( + """You are an AI assistant designed to support healthcare providers. Your role is to assist with clinical decision-making, patient management, documentation, scheduling, and communication, using all available tools and resources. Prioritize patient safety, evidence-based practice, and confidentiality. Always act as a supportive, efficient, and knowledgeable assistant, but defer final decisions to the licensed healthcare provider. + +When performing any action that requires multiple pieces of information: +1. Ask for details one by one in a conversational manner rather than requesting all information at once +2. Confirm each piece of information before moving to the next question +3. Summarize all collected information before executing the final action +4. For forms or complex data entry, guide the user through each field step by step +5. If the user provides multiple pieces of information at once, acknowledge them and confirm before proceeding + +For example, when adding a practitioner: +- First ask for the practitioner's name +- Then ask for their email address +- Then ask for additional details like gender, date of birth, etc. +- Confirm all details before submitting + +This approach makes the interaction more natural and ensures all necessary information is collected accurately.""" + ), + ) + _active_tasks = [] + + + + fnc_ctx = AssistantFnc(ctx=ctx) + chunks = [] + + + + logger.info(f"connecting to room {ctx.room.name}") + await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) + + + participant = await ctx.wait_for_participant() + + + logger.info(f"starting voice assistant for participant {participant.identity}") + + agent = VoicePipelineAgent( + vad=ctx.proc.userdata["vad"], + stt=deepgram.STT(), + llm=openai.LLM(), + tts=deepgram.TTS(), + turn_detector=turn_detector.EOUModel(), + # minimum delay for endpointing, used when turn detector believes the user is done with their turn + min_endpointing_delay=0.5, + # maximum delay for endpointing, used when turn detector does not believe the user is done with their turn + max_endpointing_delay=10.0, + max_nested_fnc_calls= 3, + chat_ctx=initial_ctx, + fnc_ctx=fnc_ctx, + ) + + usage_collector = metrics.UsageCollector() + + @agent.on("metrics_collected") + def on_metrics_collected(agent_metrics: metrics.AgentMetrics): + metrics.log_metrics(agent_metrics) + usage_collector.collect(agent_metrics) + + agent.start(ctx.room, participant) + + # The agent should be polite and greet the user when it joins :) + await agent.say("Hey, I am Adi, how can I help you today?", allow_interruptions=True) + + + + + + +if __name__ == "__main__": + cli.run_app( + WorkerOptions( + entrypoint_fnc=entrypoint, + prewarm_fnc=prewarm, + agent_name='Provider_DashBoard_Assistant', + ), + ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..750c535 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +livekit-agents>=0.12.11,<1.0.0 +livekit-plugins-openai>=0.10.17,<1.0.0 +livekit-plugins-cartesia>=0.4.7,<1.0.0 +livekit-plugins-deepgram>=0.6.17,<1.0.0 +livekit-plugins-silero>=0.7.4,<1.0.0 +livekit-plugins-turn-detector>=0.4.0,<1.0.0 +livekit-plugins-anthropic>=0.2.13,<1.0.0 +jira~=3.8.0 +python-dotenv~=1.0 diff --git a/sample.json b/sample.json new file mode 100644 index 0000000..838a3ef --- /dev/null +++ b/sample.json @@ -0,0 +1,34 @@ +{ + "getAppointment": { + "method": "POST", + "url": "/api/get-appointment-list-date", + "fields": [ + { + "name": "date", + "description": "date of appointment" + }, + { + "name": "patient_name", + "description": "patient's name" + } + ], + "payload": { + "start_date": "2025-04-01", + "end_date": "2025-04-30" + } + }, + "start_call": { + "method": "POST", + "url": "/api/get-appointment-list-date", + "fields": [ + { + "name": "patient_id", + "description": "Id of patient" + }, + { + "name": "appoinment_id", + "description": "appoinment id of patient" + } + ] + } +}