Files
provider-dashboard-agent/AssistantFnc.py
nasir@endelospay.com 2f2e1008a9 fix
2025-05-21 01:04:20 +05:00

2677 lines
124 KiB
Python

import json
import uuid
from livekit.agents import (llm, JobContext)
from typing import Annotated, Dict, Any, List
import logging
import requests
import datetime
import os
import re
logger = logging.getLogger("voice-agent")
Base_url = "http://hgh-copy.test"
class AssistantFnc(llm.FunctionContext):
"""
Function context for QubeCare's autonomous onboarding agent.
Handles the complete provider onboarding process through sequential steps:
1. Authentication base on check user if return true then login otherwise signup (signup/login)
2. Form Setup
3. Product Configuration
4. Product Configuration
5. Signature Setup
"""
def __init__(self, ctx: JobContext):
super().__init__()
self.ctx = ctx
self.accessToken = None
self.onboarding_stage = "authentication" # Track current onboarding stage
self.form_data = {} # Store form data during the process
self.product_data = {} # Store product data during the process
self.category_id = None # Replace with your actual category ID
self.username = None # Store username for later use
self.password = None # Store password for later use
self.user_email = None # Store email for later use
self.start_date = None
self.end_date = None
self.booking_data=None
metadata = json.loads(self.ctx.job.metadata)
token = metadata.get("token")
#self.accessToken='342|lRppO40lM2myq2E9aru4fMeN6RsFOyarlBacfmU8c904c6a3'
self.accessToken=token
self.timezone =metadata.get("timezone")
timezone_list = {
'America/New_York': 'EST',
'America/Los_Angeles': 'PST',
'America/Chicago': 'CST',
'America/Denver': 'MST',
'UTC': 'UTC',
}
self.getTimezone = timezone_list.get(self.timezone, self.timezone)
logger.info(f"self.getTimezone is {self.getTimezone}")
@llm.ai_callable()
async def getAppointment(self,
date: Annotated[
str, llm.TypeInfo(description="Specific date for appointments (MM-DD-YYYY)")
] = None,
start_date: Annotated[
str, llm.TypeInfo(description="Start date for appointments range (MM-DD-YYYY)")
] = None,
end_date: Annotated[
str, llm.TypeInfo(description="End date for appointments range (MM-DD-YYYY)")
] = None
):
"""Get appointments based on a specific date or date range"""
try:
# Prepare parameters
logger.info(f"Getting appointments with parameters: {self.accessToken}")
parameters = {}
# If specific date is provided, use it for both start and end date
if date:
try:
month, day, year = date.split('-')
formatted_date = f"{year}-{month}-{day}"
parameters["start_date"] = formatted_date
parameters["end_date"] = formatted_date
logger.info(f"Using specific date: {formatted_date}")
except ValueError:
# If already in YYYY-MM-DD format or other format, use as is
parameters["start_date"] = date
parameters["end_date"] = date
logger.info(f"Using specific date (as-is): {date}")
# Otherwise use date range if provided
else:
# If dates not provided, use current month's start and end dates
if not start_date or not end_date:
today = datetime.datetime.now()
first_day = datetime.datetime(today.year, today.month, 1)
# Get the last day of the current month
if today.month == 12:
last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1)
else:
last_day = datetime.datetime(today.year, today.month + 1, 1) - datetime.timedelta(days=1)
if not start_date:
start_date = first_day.strftime("%m-%d-%Y")
logger.info(f"Using default start date: {start_date}")
if not end_date:
end_date = last_day.strftime("%m-%d-%Y")
logger.info(f"Using default end date: {end_date}")
# Process start date
if start_date:
try:
month, day, year = start_date.split('-')
formatted_date = f"{year}-{month}-{day}"
parameters["start_date"] = formatted_date
except ValueError:
# If already in YYYY-MM-DD format or other format, use as is
parameters["start_date"] = start_date
# Process end date
if end_date:
try:
month, day, year = end_date.split('-')
formatted_date = f"{year}-{month}-{day}"
parameters["end_date"] = formatted_date
except ValueError:
# If already in YYYY-MM-DD format or other format, use as is
parameters["end_date"] = end_date
url = f"{Base_url}/api/assistant/get-appointment-list-date"
self.start_date= parameters["start_date"]
self.end_date= parameters["end_date"]
if self.accessToken:
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
response = requests.post(url, json=parameters, headers=headers)
logger.info(f"Response status code: {response.status_code}")
logger.info(f"Response content: {response.text}")
if response.status_code == 200:
logger.info("Appointments retrieved successfully")
appointments_data = response.json()
# Format the appointments for better presentation to the user
formatted_appointments = []
for appt in appointments_data.get('appointments', []):
appt_id = appt.get('id')
patient_id = appt.get('patient_id')
patient_name = appt.get('title', 'Unknown Patient')
appt_date = appt.get('date', 'Unknown Date')
appt_time = appt.get('start_time', 'Unknown Time')
formatted_appt = {
"appointment_id": appt_id,
"patient_id": patient_id,
"patient_name": patient_name,
"date": appt_date,
"time": appt_time,
"instruction": f"To call this patient, say 'Start call with appointment {appt_id}'"
}
formatted_appointments.append(formatted_appt)
# Add instructions for the agent
agent_instructions = {
"status": "success",
"message": "Appointments retrieved successfully",
"data": formatted_appointments,
"agent_instructions": "Present these appointments to the user and ask which one they would like to call. When they specify an appointment ID, use the start_call function with the corresponding patient_id and appointment_id."
}
return agent_instructions
else:
logger.error(f"Failed to retrieve appointments. Status code: {response.status_code}")
return {"status": "error", "message": f"Failed to retrieve appointments. Status code: {response.status_code}"}
except Exception as e:
logger.error(f"Error getting appointments: {str(e)}")
return {"error": f"Failed to get appointments: {str(e)}"}
@llm.ai_callable()
async def getAppointmentsByTimeframe(self,
timeframe: Annotated[
str, llm.TypeInfo(description="Standardized timeframe like 'today', 'tomorrow', 'this_week','current_week', 'next_week','current_date', 'current_month', etc.")
]
):
"""Get appointments for a standardized timeframe"""
try:
logger.info(f"Getting appointments for standardized timeframe: {timeframe}")
today = datetime.datetime.now()
# Calculate start and end dates based on the timeframe
if timeframe == "today" or timeframe == "current_date":
start_date = today.strftime("%m-%d-%Y")
end_date = start_date
elif timeframe == "tomorrow":
tomorrow = today + datetime.timedelta(days=1)
start_date = tomorrow.strftime("%m-%d-%Y")
end_date = start_date
elif timeframe == "this_week" or timeframe == "current_week":
# Get the start of the current week (Monday)
start_of_week = today - datetime.timedelta(days=today.weekday())
end_of_week = start_of_week + datetime.timedelta(days=6) # Sunday
start_date = start_of_week.strftime("%m-%d-%Y")
end_date = end_of_week.strftime("%m-%d-%Y")
elif timeframe == "next_week":
# Get the start of next week (next Monday)
days_until_next_monday = 7 - today.weekday()
if days_until_next_monday == 7:
days_until_next_monday = 0 # If today is Monday, get next Monday
start_of_next_week = today + datetime.timedelta(days=days_until_next_monday)
end_of_next_week = start_of_next_week + datetime.timedelta(days=6) # Sunday
start_date = start_of_next_week.strftime("%m-%d-%Y")
end_date = end_of_next_week.strftime("%m-%d-%Y")
elif timeframe == "last_week":
# Get the start of last week (previous Monday)
days_since_last_monday = today.weekday() + 7
start_of_last_week = today - datetime.timedelta(days=days_since_last_monday)
end_of_last_week = start_of_last_week + datetime.timedelta(days=6) # Sunday
start_date = start_of_last_week.strftime("%m-%d-%Y")
end_date = end_of_last_week.strftime("%m-%d-%Y")
elif timeframe == "this_month" or timeframe == "current_month":
# Get the first day of the current month
first_day = datetime.datetime(today.year, today.month, 1)
# Get the last day of the current month
if today.month == 12:
last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1)
else:
last_day = datetime.datetime(today.year, today.month + 1, 1) - datetime.timedelta(days=1)
start_date = first_day.strftime("%m-%d-%Y")
end_date = last_day.strftime("%m-%d-%Y")
elif timeframe == "next_month":
# Get the first day of the next month
if today.month == 12:
first_day = datetime.datetime(today.year + 1, 1, 1)
last_day = datetime.datetime(today.year + 1, 2, 1) - datetime.timedelta(days=1)
else:
first_day = datetime.datetime(today.year, today.month + 1, 1)
if today.month == 11: # December
last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1)
else:
last_day = datetime.datetime(today.year, today.month + 2, 1) - datetime.timedelta(days=1)
start_date = first_day.strftime("%m-%d-%Y")
end_date = last_day.strftime("%m-%d-%Y")
elif timeframe == "last_month":
# Get the first day of the last month
if today.month == 1:
first_day = datetime.datetime(today.year - 1, 12, 1)
last_day = datetime.datetime(today.year, 1, 1) - datetime.timedelta(days=1)
else:
first_day = datetime.datetime(today.year, today.month - 1, 1)
last_day = datetime.datetime(today.year, today.month, 1) - datetime.timedelta(days=1)
start_date = first_day.strftime("%m-%d-%Y")
end_date = last_day.strftime("%m-%d-%Y")
else:
# Default to current month if timeframe is not recognized
logger.warning(f"Unrecognized timeframe '{timeframe}', defaulting to current month")
first_day = datetime.datetime(today.year, today.month, 1)
if today.month == 12:
last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1)
else:
last_day = datetime.datetime(today.year, today.month + 1, 1) - datetime.timedelta(days=1)
start_date = first_day.strftime("%m-%d-%Y")
end_date = last_day.strftime("%m-%d-%Y")
logger.info(f"Converted timeframe '{timeframe}' to date range: {start_date} to {end_date}")
# Use the existing getAppointment function with the calculated date range
return await self.getAppointment(start_date=start_date, end_date=end_date)
except Exception as e:
logger.error(f"Error getting appointments by timeframe: {str(e)}")
return {"error": f"Failed to get appointments for timeframe '{timeframe}': {str(e)}"}
@llm.ai_callable()
async def getAppointmentsByDate(self,
date_str: Annotated[
str, llm.TypeInfo(description="Date in natural language format like 'January 15', 'next Monday', 'May 3rd', etc.")
]
):
"""Get appointments for a specific date expressed in natural language"""
try:
logger.info(f"Getting appointments for natural date: {date_str}")
# Try to parse the natural language date
today = datetime.datetime.now()
# Handle common date formats
date_str_lower = date_str.lower().strip()
# Handle relative dates
if "today" in date_str_lower:
target_date = today
elif "tomorrow" in date_str_lower:
target_date = today + datetime.timedelta(days=1)
elif "yesterday" in date_str_lower:
target_date = today - datetime.timedelta(days=1)
elif "next monday" in date_str_lower:
days_until_next_monday = (7 - today.weekday()) % 7
if days_until_next_monday == 0:
days_until_next_monday = 7 # If today is Monday, get next Monday
target_date = today + datetime.timedelta(days=days_until_next_monday)
elif "next tuesday" in date_str_lower:
days_until_next_tuesday = (8 - today.weekday()) % 7
if days_until_next_tuesday == 0:
days_until_next_tuesday = 7
target_date = today + datetime.timedelta(days=days_until_next_tuesday)
elif "next wednesday" in date_str_lower:
days_until_next_wednesday = (9 - today.weekday()) % 7
if days_until_next_wednesday == 0:
days_until_next_wednesday = 7
target_date = today + datetime.timedelta(days=days_until_next_wednesday)
elif "next thursday" in date_str_lower:
days_until_next_thursday = (10 - today.weekday()) % 7
if days_until_next_thursday == 0:
days_until_next_thursday = 7
target_date = today + datetime.timedelta(days=days_until_next_thursday)
elif "next friday" in date_str_lower:
days_until_next_friday = (11 - today.weekday()) % 7
if days_until_next_friday == 0:
days_until_next_friday = 7
target_date = today + datetime.timedelta(days=days_until_next_friday)
elif "next saturday" in date_str_lower:
days_until_next_saturday = (12 - today.weekday()) % 7
if days_until_next_saturday == 0:
days_until_next_saturday = 7
target_date = today + datetime.timedelta(days=days_until_next_saturday)
elif "next sunday" in date_str_lower:
days_until_next_sunday = (13 - today.weekday()) % 7
if days_until_next_sunday == 0:
days_until_next_sunday = 7
target_date = today + datetime.timedelta(days=days_until_next_sunday)
# Handle current week days
elif "monday" in date_str_lower:
days_until_monday = (0 - today.weekday()) % 7
target_date = today + datetime.timedelta(days=days_until_monday)
elif "tuesday" in date_str_lower:
days_until_tuesday = (1 - today.weekday()) % 7
target_date = today + datetime.timedelta(days=days_until_tuesday)
elif "wednesday" in date_str_lower:
days_until_wednesday = (2 - today.weekday()) % 7
target_date = today + datetime.timedelta(days=days_until_wednesday)
elif "thursday" in date_str_lower:
days_until_thursday = (3 - today.weekday()) % 7
target_date = today + datetime.timedelta(days=days_until_thursday)
elif "friday" in date_str_lower:
days_until_friday = (4 - today.weekday()) % 7
target_date = today + datetime.timedelta(days=days_until_friday)
elif "saturday" in date_str_lower:
days_until_saturday = (5 - today.weekday()) % 7
target_date = today + datetime.timedelta(days=days_until_saturday)
elif "sunday" in date_str_lower:
days_until_sunday = (6 - today.weekday()) % 7
target_date = today + datetime.timedelta(days=days_until_sunday)
else:
# For more complex date strings, we'll use a simple approach
# This is a simplified approach - in a production system, you might want to use
# a more sophisticated date parsing library like dateparser
try:
# Try to parse common date formats
for fmt in ["%B %d", "%b %d", "%B %d %Y", "%b %d %Y", "%m/%d/%Y", "%m-%d-%Y", "%Y-%m-%d"]:
try:
parsed_date = datetime.datetime.strptime(date_str, fmt)
# If year is not specified, use current year
if fmt in ["%B %d", "%b %d"]:
parsed_date = parsed_date.replace(year=today.year)
target_date = parsed_date
break
except ValueError:
continue
else:
# If we couldn't parse the date, default to today
logger.warning(f"Could not parse date '{date_str}', defaulting to today")
target_date = today
except Exception as parse_error:
logger.error(f"Error parsing date '{date_str}': {str(parse_error)}")
target_date = today
# Format the date for the API call (MM-DD-YYYY)
formatted_date = target_date.strftime("%m-%d-%Y")
logger.info(f"Parsed natural date '{date_str}' to formatted date: {formatted_date}")
# Use the existing function to get appointments for this date
return await self.getAppointment(date=formatted_date)
except Exception as e:
logger.error(f"Error getting appointments by natural date: {str(e)}")
return {"error": f"Failed to get appointments for date '{date_str}': {str(e)}"}
@llm.ai_callable()
async def start_call_by_patient_name(self,
patient_name: Annotated[
str, llm.TypeInfo(description="Name of the patient to call")
],
date: Annotated[
str, llm.TypeInfo(description="Optional date to filter appointments (MM-DD-YYYY)")
] = None
):
"""Start a call with a patient by searching for their name in upcoming appointments.
If multiple appointments are found, the most recent one will be used."""
try:
logger.info(f"Starting call process for patient name: {patient_name}")
# First, get a list of appointments to search through
today = datetime.datetime.now()
if self.start_date and self.end_date:
# Use the provided date range
parameters = {
"start_date": self.start_date,
"end_date": self.end_date
}
else:
# If date is provided, use it; otherwise use today's date as start and a future date as end
if date:
try:
month, day, year = date.split('-')
formatted_date = f"{year}-{month}-{day}"
start_date = formatted_date
except ValueError:
# If already in YYYY-MM-DD format or other format, use as is
start_date = date
else:
# Use today as start date
start_date = today.strftime("%Y-%m-%d")
# Use a date 30 days in the future as end date if no specific date is provided
if not date:
end_date = (today + datetime.timedelta(days=30)).strftime("%Y-%m-%d")
else:
end_date = start_date
parameters = {
"start_date": start_date,
"end_date": end_date
}
logger.info(f"Response parameters: {parameters}")
url = f"{Base_url}/api/assistant/get-appointment-list-date"
if not self.accessToken:
return {
"status": "error",
"message": "Authentication token not available."
}
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
response = requests.post(url, json=parameters, headers=headers)
logger.info(f"Response status code: {response.status_code}")
if response.status_code != 200:
return {
"status": "error",
"message": f"Failed to retrieve appointments. Status code: {response.status_code}"
}
appointments_data = response.json()
appointments = appointments_data.get('appointments', [])
if not appointments:
return {
"status": "error",
"message": f"No appointments found for the specified date range."
}
# Search for appointments matching the patient name (case-insensitive partial match)
patient_name_lower = patient_name.lower()
matching_appointments = []
for appt in appointments:
appt_patient_name = appt.get('title', '').lower()
if patient_name_lower in appt_patient_name:
matching_appointments.append(appt)
if not matching_appointments:
return {
"status": "error",
"message": f"No appointments found for patient name '{patient_name}'. Please check the spelling or try using the appointment ID instead."
}
# Sort appointments by date (most recent first)
matching_appointments.sort(key=lambda x: x.get('date', ''), reverse=True)
# Use the first (most recent) matching appointment
selected_appointment = matching_appointments[0]
appointment_id = selected_appointment.get('id')
patient_id = selected_appointment.get('patient_id')
if not appointment_id or not patient_id:
return {
"status": "error",
"message": "The appointment information is incomplete. Please try using the appointment ID instead."
}
# Now we have both appointment_id and patient_id, proceed with the call
logger.info(f"Starting call with patient_id: {patient_id}, appointment_id: {appointment_id}")
# Show call interface for the user
participant = await self.ctx.wait_for_participant()
try:
resp = await self.ctx.room.local_participant.perform_rpc(
destination_identity=participant.identity,
method="startCall",
response_timeout=30, # 30 second timeout for the RPC call
payload=json.dumps({
"patient_id": patient_id,
"appointment_id": appointment_id,
}),
)
logger.info(f"Call interface displayed to user. Response: {json.dumps(resp)}")
return {
"status": "success",
"message": f"I've initiated a call with {selected_appointment.get('title', 'the patient')}. Please wait while the connection is established.",
"call_status": "connecting",
"appointment_details": {
"patient_name": selected_appointment.get('title'),
"date": selected_appointment.get('date'),
"time": selected_appointment.get('start_time')
}
}
except Exception as rpc_error:
logger.error(f"RPC call failed: {str(rpc_error)}")
return {
"status": "error",
"message": f"Failed to start the call: {str(rpc_error)}"
}
except Exception as e:
logger.error(f"Error starting call by patient name: {str(e)}")
return {"status": "error", "message": f"Error starting call: {str(e)}"}
async def checkUserExist(self,
email: Annotated[
str, llm.TypeInfo(description="Email address of the user to check")
]
):
"""Check if a user exists in the system based on their email address"""
try:
# Store email for later use
self.user_email = email
user_info = {"email": email}
logger.info(f"Checking user existence: {user_info}")
url = f"{Base_url}/api/check-user"
response = requests.post(url, json=user_info)
if response.status_code == 200:
logger.info("User check completed successfully")
result = response.json()
user_exists = result.get('success', False) # Adjust based on actual API response
logger.info(f"User exists: {user_exists}")
# Return a boolean value directly
return user_exists
else:
logger.error(f"Failed to check user. Status code: {response.status_code}")
return False
except Exception as e:
logger.error(f"Error checking user existence: {str(e)}")
return False
@llm.ai_callable()
async def add_practitioner(self,
firstName: Annotated[
str, llm.TypeInfo(description="First name of the practitioner")
],
lastName: Annotated[
str, llm.TypeInfo(description="Last name of the practitioner")
],
emailAddress: Annotated[
str, llm.TypeInfo(description="Email address of the practitioner")
],
password: Annotated[
str, llm.TypeInfo(description="Password for the practitioner account")
],
gender: Annotated[
str, llm.TypeInfo(description="Gender of the practitioner (Male or Female)")
],
dateOfBirth: Annotated[
str, llm.TypeInfo(description="Date of birth in MM-DD-YYYY format")
] = None,
phoneNumber: Annotated[
str, llm.TypeInfo(description="Phone number of the practitioner")
] = None,
):
"""Add a new practitioner or doctor to the system with the specified parameters"""
try:
logger.info(f"Adding new practitioner: {firstName} {lastName}")
self.user_email = emailAddress
# Check if user exists first
user_exists = await self.checkUserExist(emailAddress)
if user_exists:
logger.info(f"User with email {emailAddress} already exists")
return {
"status": "error",
"message": f"A user with email {emailAddress} already exists. Please use a different email address."
}
# Prepare the practitioner data
practitioner_data = {
"firstName": firstName,
"lastName": lastName,
"emailAddress": emailAddress,
"username": emailAddress, # Default to email if username not provided
"type": "practitioner"
}
# Add optional fields if provided
if dateOfBirth:
practitioner_data["dateOfBirth"] = dateOfBirth
else:
practitioner_data["dateOfBirth"] = "1990-01-01" # Placeholder value
if gender:
practitioner_data["gender"] = gender
# Add phone number if provided, otherwise use a default
if phoneNumber:
practitioner_data["textMessageNumber"] = phoneNumber
else:
practitioner_data["textMessageNumber"] = '+1-123-456-7890'
if password:
practitioner_data["newUserPassword"] = password
practitioner_data["adminPassword"] = password
# Set default values for required fields that might be missing
practitioner_data.setdefault("sendEmail", False)
practitioner_data.setdefault("analytics", "All Reports")
url = f"{Base_url}/api/assistant/add-user"
if not self.accessToken:
return {
"status": "error",
"message": "Authentication token not available."
}
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
logger.info(f"Sending practitioner data: {json.dumps(practitioner_data)}")
response = requests.post(url, json=practitioner_data, headers=headers)
logger.info(f"Response status code: {response.status_code}")
logger.info(f"Response content: {response.text}")
if response.status_code == 200:
response_data = response.json()
return {
"status": "success",
"message": f"Successfully added practitioner {firstName} {lastName}",
"data": response_data
}
else:
return {
"status": "error",
"message": f"Failed to add practitioner. Status code: {response.status_code}",
"details": response.text
}
except Exception as e:
logger.error(f"Error adding practitioner: {str(e)}")
return {"status": "error", "message": f"Error adding practitioner: {str(e)}"}
@llm.ai_callable()
async def add_practitioner_availability(self,
practitioner_id: Annotated[
int, llm.TypeInfo(description="ID of the practitioner to add availability for")
],
date: Annotated[
str, llm.TypeInfo(description="Start Date for the availability in YYYY-MM-DD format or natural language like '1 May 2025'")
],
start_time: Annotated[
str, llm.TypeInfo(description="Start time in format like '07:30 AM'")
],
until_date: Annotated[
str, llm.TypeInfo(description="End date for repeating availability in YYYY-MM-DD format or natural language like '1 May 2025'")
],
end_time: Annotated[
str, llm.TypeInfo(description="End time in format like '03:15 PM'")
],
repeats: Annotated[
bool, llm.TypeInfo(description="Whether this availability repeats")
] = True,
select_days: Annotated[
str, llm.TypeInfo(description="Comma-separated list of days when availability repeats (e.g., 'Monday,Tuesday')")
] = None,
comment: Annotated[
str, llm.TypeInfo(description="Optional comment about this availability")
] = ""
):
"""Add availability for a practitioner with the specified parameters"""
try:
logger.info(f"Adding availability for practitioner ID: {practitioner_id}")
start_date = self._parse_date_to_iso(date)
if not start_date:
return {
"status": "error",
"message": f"Invalid date format: {date}. Please use YYYY-MM-DD format or natural language like '1 May 2025'."
}
# Parse and format the end date if provided
end_date = None
if until_date:
end_date = self._parse_date_to_iso(until_date)
if not end_date:
return {
"status": "error",
"message": f"Invalid until_date format: {until_date}. Please use YYYY-MM-DD format or natural language."
}
else:
# Default to start date if no end date provided
end_date = start_date
# Format the start and end datetimes in ISO 8601 format with UTC timezone
start_datetime_result = self._format_datetime_start(start_date, start_time,self.getTimezone)
end_datetime_result = self._format_datetime_end(end_date, end_time,self.getTimezone)
if not start_datetime_result or not end_datetime_result:
return {
"status": "error",
"message": "Failed to format date and time correctly."
}
start_time_utc = start_datetime_result['time']
end_time_utc = end_datetime_result['time']
# start_time_24h = self._convert_time_to_24h(start_time_utc)
# end_time_24h = self._convert_time_to_24h(end_time_utc)
# # Format the start and end times for the API with UTC timezone
# # The API expects ISO format strings for start and end
# start_datetime = f"{start_date}T{start_time_24h}"
# end_datetime = f"{start_date}T{end_time_24h}"
# Prepare the availability data
availability_data = {
"practitioner_id": practitioner_id,
"shift_title": "",
"type": "availability",
"allDay": False,
"date": start_date, # Use the parsed ISO date
"start": start_datetime_result['formatted_string'],
"end": end_datetime_result['formatted_string'],
"start_time": start_time_utc, # Keep original format for display
"end_time": end_time_utc, # Keep original format for display
"service": [],
"facility": 0,
"billing_facility": 0,
"repeats": repeats,
"repeatFrequency": "",
"repeatUnit": "",
"comment": comment
}
# Add repeating information if applicable
if repeats:
if until_date:
availability_data["untilDate"] = end_date
else:
availability_data["untilDate"] = end_date # Default to the same date if not specified
availability_data["weakDay"] = True if select_days else False
if select_days:
# Convert comma-separated string to list
days_list = [day.strip() for day in select_days.split(',')]
availability_data["selectDays"] = days_list
logger.info(f"Response content: {availability_data}")
url = f"{Base_url}/api/assistant/provider-add-availability"
if not self.accessToken:
return {
"status": "error",
"message": "Authentication token not available."
}
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
logger.info(f"Sending availability data: {json.dumps(availability_data)}")
response = requests.post(url, json=availability_data, headers=headers)
logger.info(f"Response status code: {response.status_code}")
if response.status_code == 200:
response_data = response.json()
return {
"status": "success",
"message": f"Successfully added availability for practitioner ID {practitioner_id}",
"data": response_data
}
else:
return {
"status": "error",
"message": f"Failed to add availability. Status code: {response.status_code}",
"details": response.text
}
except Exception as e:
logger.error(f"Error adding practitioner availability: {str(e)}")
return {"status": "error", "message": f"Error adding availability: {str(e)}"}
def _convert_time_to_24h(self, time_str):
"""Helper method to convert time from 12-hour format (e.g., '8:00 AM') to ISO 8601 format (e.g., '12:00:00.000Z')"""
try:
# Parse the time string
time_parts = time_str.split()
time_components = time_parts[0].split(':')
hour = int(time_components[0])
minute = int(time_components[1]) if len(time_components) > 1 else 0
# Adjust for AM/PM
if len(time_parts) > 1:
am_pm = time_parts[1].upper()
if am_pm == 'PM' and hour < 12:
hour += 12
elif am_pm == 'AM' and hour == 12:
hour = 0
# Format as ISO 8601 time with milliseconds and 'Z' suffix to indicate UTC timezone
# For this specific format, we're using a fixed UTC time without offset calculation
return f"{hour:02d}:{minute:02d}:00.000Z"
except Exception as e:
logger.error(f"Error converting time format: {str(e)}")
return time_str # Return original if conversion fails
@llm.ai_callable()
async def get_practitioners(self):
"""Get a list of all practitioners in the system"""
try:
logger.info("Getting list of practitioners")
url = f"{Base_url}/api/assistant/practitioners-list"
if not self.accessToken:
return {
"status": "error",
"message": "Authentication token not available."
}
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
response = requests.get(url, headers=headers)
logger.info(f"Response status code: {response.status_code}")
if response.status_code == 200:
practitioners_data = response.json()
logger.info(f"Response status code: {practitioners_data.get('data', [])}")
# Format the practitioners for better presentation
formatted_practitioners = []
for practitioner in practitioners_data.get('data', []):
formatted_practitioner = {
"id": practitioner.get('id'),
"name": f"{practitioner.get('fname', '')} {practitioner.get('lname', '')}",
"email": practitioner.get('email', '')
}
formatted_practitioners.append(formatted_practitioner)
return {
"status": "success",
"message": "Successfully retrieved practitioners",
"data": formatted_practitioners
}
else:
return {
"status": "error",
"message": f"Failed to retrieve practitioners. Status code: {response.status_code}",
"details": response.text if hasattr(response, 'text') else None
}
except Exception as e:
logger.error(f"Error getting practitioners: {str(e)}")
return {"status": "error", "message": f"Error getting practitioners: {str(e)}"}
def _parse_date_to_iso(self, date_str):
"""Helper method to parse various date formats to ISO format (YYYY-MM-DD)"""
try:
# Check if already in YYYY-MM-DD format
if re.match(r'^\d{4}-\d{2}-\d{2}$', date_str):
return date_str
# Try to parse common date formats
for fmt in [
"%d %B %Y", # 1 May 2025
"%d %b %Y", # 1 May 2025
"%B %d %Y", # May 1 2025
"%b %d %Y", # May 1 2025
"%m/%d/%Y", # 05/01/2025
"%m-%d-%Y", # 05-01-2025
"%d/%m/%Y", # 01/05/2025
"%d-%m-%Y" # 01-05-2025
]:
try:
parsed_date = datetime.datetime.strptime(date_str, fmt)
# Just return the formatted date without UTC conversion
return parsed_date.strftime("%Y-%m-%d")
except ValueError:
continue
# Try to handle natural language date expressions
date_str_lower = date_str.lower().strip()
today = datetime.datetime.now()
# Handle relative dates
if "today" in date_str_lower:
return today.strftime("%Y-%m-%d")
elif "tomorrow" in date_str_lower:
tomorrow = today + datetime.timedelta(days=1)
return tomorrow.strftime("%Y-%m-%d")
elif "yesterday" in date_str_lower:
yesterday = today - datetime.timedelta(days=1)
return yesterday.strftime("%Y-%m-%d")
# Handle day names
days = {"monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3,
"friday": 4, "saturday": 5, "sunday": 6}
for day_name, day_num in days.items():
if day_name in date_str_lower:
# Calculate days until the next occurrence of this day
days_ahead = (day_num - today.weekday()) % 7
if "next" in date_str_lower:
# If "next" is specified, add 7 days if it would otherwise be today
if days_ahead == 0:
days_ahead = 7
next_day = today + datetime.timedelta(days=days_ahead)
return next_day.strftime("%Y-%m-%d")
# Try to extract month and day from strings like "May 1" or "1 May"
months = {
"january": 1, "february": 2, "march": 3, "april": 4, "may": 5, "june": 6,
"july": 7, "august": 8, "september": 9, "october": 10, "november": 11, "december": 12,
"jan": 1, "feb": 2, "mar": 3, "apr": 4, "jun": 6, "jul": 7, "aug": 8,
"sep": 9, "sept": 9, "oct": 10, "nov": 11, "dec": 12
}
for month_name, month_num in months.items():
if month_name in date_str_lower:
# Try to extract the day number
day_match = re.search(r'\b(\d{1,2})(st|nd|rd|th)?\b', date_str_lower)
if day_match:
day = int(day_match.group(1))
# Determine year (current year, unless the date would be in the past)
year = today.year
if month_num < today.month or (month_num == today.month and day < today.day):
year += 1
try:
parsed_date = datetime.datetime(year, month_num, day)
return parsed_date.strftime("%Y-%m-%d")
except ValueError:
# Invalid date (e.g., February 30)
pass
# If all parsing attempts failed
logger.error(f"Could not parse date: {date_str}")
return None
except Exception as e:
logger.error(f"Error parsing date: {str(e)}")
return None
def _convert_to_utc(self, dt):
"""
This function is kept for compatibility but now simply returns the datetime object
without timezone conversion since we're only working with date formats
"""
return dt # Simply return the datetime object without conversion
def _format_datetime_start(self, date, time_str, timezone='EST'):
"""
Convert a date and time from a specific timezone to UTC format
Similar to formatDateTimeStart in JavaScript
Args:
date (str): Date in YYYY-MM-DD format
time_str (str): Time in format like '07:30 AM'
timezone (str): Source timezone (default: 'EST')
Returns:
dict: Dictionary containing:
- 'formatted_string': Formatted datetime string in ISO 8601 format with UTC timezone
- 'datetime_obj': Python datetime object in UTC timezone
- 'date': Date component in YYYY-MM-DD format
- 'time': Time component in HH:MM AM/PM format
"""
try:
# Use built-in datetime module
import datetime
# Log the input parameters for debugging
logger.info(f"_format_datetime_start called with date={date}, time_str={time_str}, timezone={timezone}")
# Parse the date and time
datetime_str = f"{date} {time_str}"
# Try different formats for the time
formats = [
"%Y-%m-%d %I:%M %p", # 07:30 AM
"%Y-%m-%d %H:%M", # 07:30 (24-hour)
"%Y-%m-%d %I:%M%p", # 07:30AM (no space)
"%Y-%m-%d %I:%M" # 07:30 (12-hour, no AM/PM)
]
dt = None
for fmt in formats:
try:
dt = datetime.datetime.strptime(datetime_str, fmt)
logger.info(f"Successfully parsed datetime with format: {fmt}")
break
except ValueError:
continue
if not dt:
raise ValueError(f"Could not parse datetime: {datetime_str}")
# Handle timezone conversion using a more robust approach
# First, create a naive datetime (without timezone)
naive_dt = dt
# Then, handle the timezone conversion manually
# For EST (UTC-5), add 5 hours to convert to UTC
timezone_offsets = {
'EST': -4, # Eastern Standard Time (UTC-5)
'EDT': -4, # Eastern Daylight Time (UTC-4)
#'CST': -6, # Central Standard Time (UTC-6)
'CST': -5, # Central Daylight Time (UTC-5)
'MST': -6, # Mountain Standard Time (UTC-7)
#'MDT': -6, # Mountain Daylight Time (UTC-6)
'PST': -7, # Pacific Standard Time (UTC-8)
'PDT': -7, # Pacific Daylight Time (UTC-7)
'CDT': -5 # Central Daylight Time (UTC-5)
}
# Get the UTC offset for the specified timezone
offset_hours = timezone_offsets.get(timezone.upper(), 0)
logger.info(f"Using timezone offset: {offset_hours} hours for timezone: {timezone}")
# Apply the offset to convert to UTC
utc_dt = naive_dt + datetime.timedelta(hours=-offset_hours)
# Format in ISO 8601 with milliseconds and Z suffix
formatted_datetime = utc_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
# Log detailed information about the conversion
logger.info(f"Original datetime: {dt.strftime('%Y-%m-%d %I:%M %p')} {timezone}")
logger.info(f"UTC datetime: {utc_dt.strftime('%Y-%m-%d %H:%M:%S')} UTC")
logger.info(f"Formatted ISO datetime: {formatted_datetime}")
logger.info(f"Date component: {utc_dt.strftime('%Y-%m-%d')}")
logger.info(f"Time component (AM/PM): {utc_dt.strftime('%I:%M %p')}")
logger.info(f"Time component (24h): {utc_dt.strftime('%H:%M:%S')}")
# Return a dictionary with both the formatted string and the datetime object
return {
'formatted_string': formatted_datetime,
'datetime_obj': utc_dt,
'date': utc_dt.strftime("%Y-%m-%d"),
'time': utc_dt.strftime("%I:%M %p") # Format time in AM/PM format
}
except Exception as e:
logger.error(f"Error formatting datetime: {str(e)}")
return None
def _format_datetime_end(self, date, end_time, timezone='EST'):
"""
Convert a date and end time to UTC format
Similar to formatDateTimeEnd in JavaScript
Args:
date (str): Date in YYYY-MM-DD format
end_time (str): Time in format like '03:15 PM'
timezone (str, optional): Timezone name. Defaults to 'EST'.
Returns:
dict: Dictionary containing:
- 'formatted_string': Formatted datetime string in ISO 8601 format with UTC timezone
- 'datetime_obj': Python datetime object in UTC timezone
- 'date': Date component in YYYY-MM-DD format
- 'time': Time component in HH:MM:SS format
"""
try:
# Use built-in datetime module
import datetime
# Log the input parameters for debugging
logger.info(f"_format_datetime_end called with date={date}, end_time={end_time}, timezone={timezone}")
# Parse the date and time
datetime_str = f"{date} {end_time}"
# Try different formats for the time
formats = [
"%Y-%m-%d %I:%M %p", # 03:15 PM
"%Y-%m-%d %H:%M", # 15:15 (24-hour)
"%Y-%m-%d %I:%M%p", # 03:15PM (no space)
"%Y-%m-%d %I:%M" # 03:15 (12-hour, no AM/PM)
]
dt = None
for fmt in formats:
try:
dt = datetime.datetime.strptime(datetime_str, fmt)
logger.info(f"Successfully parsed datetime with format: {fmt}")
break
except ValueError:
continue
if not dt:
raise ValueError(f"Could not parse datetime: {datetime_str}")
# Handle timezone conversion using a more robust approach
# First, create a naive datetime (without timezone)
naive_dt = dt
# Then, handle the timezone conversion manually
# For EST (UTC-5), add 5 hours to convert to UTC
timezone_offsets = {
'EST': -4, # Eastern Standard Time (UTC-5)
'EDT': -4, # Eastern Daylight Time (UTC-4)
#'CST': -6, # Central Standard Time (UTC-6)
'CST': -5, # Central Daylight Time (UTC-5)
'MST': -6, # Mountain Standard Time (UTC-7)
#'MDT': -6, # Mountain Daylight Time (UTC-6)
'PST': -7, # Pacific Standard Time (UTC-8)
'PDT': -7, # Pacific Daylight Time (UTC-7)
'CDT': -5 # Central Daylight Time (UTC-5)
# 'EST': -4, # Eastern Daylight Time (UTC-4)
# 'EDT': -4, # Eastern Daylight Time (UTC-4)
# 'CST': -6, # Central Standard Time (UTC-6)
# 'MST': -7, # Mountain Standard Time (UTC-7)
# 'MDT': -6, # Mountain Daylight Time (UTC-6)
# 'PST': -8, # Pacific Standard Time (UTC-8)
# 'PDT': -7 # Pacific Daylight Time (UTC-7)
}
# Get the UTC offset for the specified timezone
offset_hours = timezone_offsets.get(timezone.upper(), 0)
logger.info(f"Using timezone offset: {offset_hours} hours for timezone: {timezone}")
# Apply the offset to convert to UTC
utc_dt = naive_dt + datetime.timedelta(hours=-offset_hours)
# Format in ISO 8601 with milliseconds and Z suffix
formatted_datetime = utc_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
# Log detailed information about the conversion
logger.info(f"Original datetime: {dt.strftime('%Y-%m-%d %I:%M %p')} {timezone}")
logger.info(f"UTC datetime: {utc_dt.strftime('%Y-%m-%d %H:%M:%S')} UTC")
logger.info(f"Formatted ISO datetime: {formatted_datetime}")
logger.info(f"Date component: {utc_dt.strftime('%Y-%m-%d')}")
logger.info(f"Time component (AM/PM): {utc_dt.strftime('%I:%M %p')}")
logger.info(f"Time component (24h): {utc_dt.strftime('%H:%M:%S')}")
# Return a dictionary with both the formatted string and the datetime object
return {
'formatted_string': formatted_datetime,
'datetime_obj': utc_dt,
'date': utc_dt.strftime("%Y-%m-%d"),
'time': utc_dt.strftime("%I:%M %p") # Format time in AM/PM format
}
except Exception as e:
logger.error(f"Error formatting end datetime: {str(e)}")
return None
@llm.ai_callable()
async def addCategory(self,
categoryName: Annotated[
str, llm.TypeInfo(description="Name of the category to save")
]
):
"""Save a category with the given name and return status"""
try:
category_data = {
"name": categoryName
}
logger.info(f"Category data: {category_data}")
url = f"{Base_url}/api/assistant/save-category"
if self.accessToken:
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
response = requests.post(url, json=category_data, headers=headers)
if response.status_code == 200:
logger.info("Category data sent successfully")
category_id = response.json().get('category_id')
self.category_id=category_id
return {
"status": "success",
"message": "Category saved successfully",
}
else:
logger.error(f"Failed to save category. Status code: {response.status_code}")
return {"status": "error", "message": f"Failed to save category. Status code: {response.status_code}"}
else:
logger.error("No access token available. Please register or login first.")
return {"status": "error", "message": "Authentication required. Please register or login first."}
except Exception as e:
logger.error(f"Error saving category: {str(e)}")
return {"status": "error", "message": f"Error saving category: {str(e)}"}
@llm.ai_callable()
async def createProduct(self,
productName: Annotated[
str, llm.TypeInfo(description="Name of the product")
],
productCatName: Annotated[
str, llm.TypeInfo(description="Name of the product catgory")
],
productPrice: Annotated[
float, llm.TypeInfo(description="Price of the product")
],
productDescription: Annotated[
str, llm.TypeInfo(description="Description of the product")
],
prescription: Annotated[
str, llm.TypeInfo(description="prescrption of the product (yes, no)")
]
):
"""Create a new product with the specified details"""
try:
# Store product data for reference
self.product_data = {
"name": productName,
"price": productPrice,
"productCatName": productCatName,
"description": productDescription
}
logger.info(f"Creating product catagory: {productCatName}")
slug = productName.lower().replace(' ', '-').replace('&', 'and')
is_prescription = True if prescription.lower() == "yes" else False
product_info = {
"name": productName,
"slug": slug,
"description": productDescription,
"price": productPrice,
"type": "One Time",
"isPrescription": is_prescription,
"inTakeForm_id": "",
"questiioneriesForm_id": "",
"category_id": productCatName,
"sku": "",
"status": "active"
}
logger.info(f"Creating product: {product_info}")
url = f"{Base_url}/api/assistant/save-product"
if self.accessToken:
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
response = requests.post(url, json=product_info, headers=headers)
if response.status_code == 200:
logger.info("Product created successfully")
return {
"status": "success",
"message": "Product created successfully",
}
else:
logger.error(f"Failed to create product. Status code: {response.status_code}")
return {"status": "error", "message": f"Failed to create product. Status code: {response.status_code}"}
else:
logger.error("No access token available. Please register or login first.")
return {"status": "error", "message": "Authentication required. Please register or login first."}
except Exception as e:
logger.error(f"Error creating product: {str(e)}")
return {"status": "error", "message": f"Error creating product: {str(e)}"}
@llm.ai_callable()
async def suggestFormFields(self,
formName: Annotated[
str, llm.TypeInfo(description="Name of the form")
],
formType: Annotated[
str, llm.TypeInfo(description="Type of form (charting-forms, document-forms, simple-forms, consent-forms)")
]
):
"""Suggest appropriate form fields based on form name and type"""
try:
# Store form metadata for later use
self.form_data = {
"name": formName,
"type": formType
}
suggested_fields = []
# Generate field suggestions based on form type
if "charting" in formType.lower():
suggested_fields = [
{"label": "Date of Visit", "type": "Date", "required": True},
{"label": "Chief Complaint", "type": "Textarea", "required": True},
{"label": "Vital Signs", "type": "Textarea", "required": False},
{"label": "Assessment", "type": "Textarea", "required": True},
{"label": "Treatment Plan", "type": "Textarea", "required": True},
{"label": "Provider Signature", "type": "Signature", "required": True}
]
# Add more specific fields based on form name
if "progress" in formName.lower():
suggested_fields.extend([
{"label": "Progress Notes", "type": "Textarea", "required": True},
{"label": "Follow-up Plan", "type": "Textarea", "required": True}
])
elif "initial" in formName.lower():
suggested_fields.extend([
{"label": "Medical History", "type": "Textarea", "required": True},
{"label": "Family History", "type": "Textarea", "required": False},
{"label": "Social History", "type": "Textarea", "required": False},
{"label": "Allergies", "type": "Textarea", "required": True}
])
elif "document" in formType.lower():
suggested_fields = [
{"label": "Document Title", "type": "Text", "required": True},
{"label": "Date", "type": "Date", "required": True},
{"label": "Document Content", "type": "Textarea", "required": True},
{"label": "Supporting Files", "type": "File", "required": False},
{"label": "Provider Signature", "type": "Signature", "required": True}
]
elif "consent" in formType.lower():
suggested_fields = [
{"label": "Date", "type": "Date", "required": True},
{"label": "Consent Statement", "type": "Textarea", "required": True},
{"label": "I understand and agree to the above", "type": "Checkbox", "required": True},
{"label": "Patient Signature", "type": "Signature", "required": True},
{"label": "Provider Signature", "type": "Signature", "required": True}
]
else: # simple-forms or default
suggested_fields = [
{"label": "Patient First Name", "type": "Text", "required": True, "name": "patient_custom_fname", "readonly": True},
{"label": "Patient Last Name", "type": "Text", "required": True, "name": "patient_custom_lname", "readonly": True},
{"label": "Patient Gender", "type": "Select", "required": True, "name": "patient_custom_gender_identity",
"options": ["Male", "Female", "Other"]},
{"label": "Patient DOB", "type": "date", "required": True, "name": "patient_custom_DOB"},
{"label": "Patient Email", "type": "Text", "required": True, "name": "patient_custom_email"},
{"label": "Patient Phone", "type": "Text", "required": True, "name": "patient_extra_data_preferred_phone"},
]
# Store suggested fields for later modification
self.form_data["suggested_fields"] = suggested_fields
return {
"status": "success",
"fields": suggested_fields,
"message": f"Suggested {len(suggested_fields)} fields for {formType} form: {formName}"
}
except Exception as e:
logger.error(f"Error suggesting form fields: {str(e)}")
return {"status": "error", "message": f"Error suggesting form fields: {str(e)}"}
@llm.ai_callable()
async def modifyFormFields(self,
modifications: Annotated[
str, llm.TypeInfo(description="JSON string of field modifications (add, remove, or change)")
]
):
"""Modify the suggested form fields based on user feedback"""
try:
if not self.form_data.get("suggested_fields"):
return {"status": "error", "message": "No suggested fields found. Please suggest fields first."}
# Parse the modifications
mods = json.loads(modifications)
current_fields = self.form_data["suggested_fields"]
# Handle additions
if "add" in mods:
for new_field in mods["add"]:
current_fields.append(new_field)
# Handle removals
if "remove" in mods:
labels_to_remove = [f.lower() for f in mods["remove"]]
current_fields = [f for f in current_fields if f["label"].lower() not in labels_to_remove]
# Handle changes
if "change" in mods:
for change in mods["change"]:
if "old_label" in change and "new_field" in change:
old_label = change["old_label"]
new_field = change["new_field"]
# Find and update the field
for i, field in enumerate(current_fields):
if field["label"].lower() == old_label.lower():
current_fields[i] = new_field
break
# Update the stored fields
self.form_data["suggested_fields"] = current_fields
return {
"status": "success",
"fields": current_fields,
"message": "Form fields modified successfully"
}
except json.JSONDecodeError:
logger.error("Invalid JSON format for modifications")
return {"status": "error", "message": "Invalid JSON format for modifications"}
except Exception as e:
logger.error(f"Error modifying form fields: {str(e)}")
return {"status": "error", "message": f"Error modifying form fields: {str(e)}"}
@llm.ai_callable()
async def saveForm(self,
formName: Annotated[
str, llm.TypeInfo(description="Name of the form to be saved")
],
formType: Annotated[
str, llm.TypeInfo(description="Type of form (charting-forms, document-forms, simple-forms, consent-forms)")
],
formFields: Annotated[
str, llm.TypeInfo(description="JSON string of form fields, each containing label, type, and options")
]
):
"""Save form configuration with fields and return status"""
try:
# Parse the JSON string to a list
logger.info(f"Form data: {json.dumps(formFields)}")
formeo_json_str = await self.convertToFormeoJson(formFields)
form_data = {
"data": formeo_json_str,
"name": formName,
"type": formType
}
logger.info(f"Saving form: {formName} of type {formType}")
logger.info(f"Form data: {json.dumps(form_data)}")
url = f"{Base_url}/api/assistant/store-form"
if self.accessToken:
headers = {
"Authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
response = requests.post(url, json=form_data, headers=headers)
logger.info(f"Response Status Code: {response.status_code}")
logger.info(f"Response Content: {response.text}")
if response.status_code == 200:
logger.info("Form saved successfully")
self.onboarding_stage = "product_setup" # Advance to next stage
return {
"status": "success",
"message": "Form saved successfully",
"next_stage": "product_setup"
}
else:
logger.error(f"Failed to save form. Status code: {response.status_code}")
return {"status": "error", "message": f"Failed to save form. Status code: {response.status_code}"}
else:
logger.error("No access token available. Please register or login first.")
return {"status": "error", "message": "Authentication required. Please register or login first."}
except json.JSONDecodeError:
logger.error("Invalid JSON format for formFields")
return {"status": "error", "message": "Invalid JSON format for form fields"}
except Exception as e:
logger.error(f"Error saving form: {str(e)}")
return {"status": "error", "message": f"Error saving form: {str(e)}"}
async def convertToFormeoJson(self, formFields):
"""Convert simple form fields JSON to Formeo JSON structure"""
try:
# Parse the input JSON
fields_list = json.loads(formFields)
# Generate unique IDs for the main structure
form_id = str(uuid.uuid4())
stage_id = str(uuid.uuid4())
# Initialize the structure with empty collections
formeo_json = {
"id": form_id,
"stages": {
stage_id: {
"children": [],
"id": stage_id
}
},
"rows": {},
"columns": {},
"fields": {}
}
# Create rows for each field
row_ids = []
for field_data in fields_list:
# Generate IDs for each component
row_id = str(uuid.uuid4())
column_id = str(uuid.uuid4())
field_id = str(uuid.uuid4())
# Add row ID to the stage children and row_ids list
formeo_json["stages"][stage_id]["children"].append(row_id)
row_ids.append(row_id)
# Create the row
formeo_json["rows"][row_id] = {
"config": {
"fieldset": False,
"legend": "",
"inputGroup": False
},
"children": [column_id],
"className": ["formeo-row"],
"id": row_id
}
# Create the column
formeo_json["columns"][column_id] = {
"config": {
"width": "100%"
},
"children": [field_id],
"className": ["formeo-column"],
"id": column_id
}
# Get field type and label
field_type = field_data.get("type", "Text")
field_label = field_data.get("label", "Field")
field_options = field_data.get("options", [])
field_required = field_data.get("required", False)
field_name = field_data.get("name", f"key_{uuid.uuid4().hex[:20]}")
field_readonly = field_data.get("readonly", False)
field_placeholder = field_data.get("placeholder", "")
# Create the appropriate field based on type
if field_type.lower() == "text":
if "patient first name" in field_label.lower():
formeo_json["fields"][field_id] = {
"tag": "input",
"config": {
"label": field_label,
"disabledAttrs": ["type"],
"lockedAttrs": ["className"],
"controlId": "patient-first-name"
},
"meta": {
"group": "common",
"icon": "👤",
"id": "patient-first-name"
},
"attrs": {
"className": "patient-first-name",
"type": "text",
"name": "patient_custom_fname",
"readonly": True,
"required": field_required,
"placeholder": "Enter patient first name"
},
"id": field_id
}
elif "patient last name" in field_label.lower():
formeo_json["fields"][field_id] = {
"tag": "input",
"config": {
"label": field_label,
"disabledAttrs": ["type"],
"lockedAttrs": ["className"],
"controlId": "patient-last-name"
},
"meta": {
"group": "common",
"icon": "👤",
"id": "patient-last-name"
},
"attrs": {
"className": "patient-last-name",
"type": "text",
"name": "patient_custom_lname",
"readonly": True,
"required": field_required,
"placeholder": "Enter patient last name"
},
"id": field_id
}
elif "patient email" in field_label.lower():
formeo_json["fields"][field_id] = {
"tag": "input",
"config": {
"label": field_label,
"disabledAttrs": ["type"],
"lockedAttrs": ["className"],
"controlId": "patient-email"
},
"meta": {
"group": "common",
"icon": "📧",
"id": "patient-email"
},
"attrs": {
"className": "patient-email",
"type": "email",
"name": "patient_custom_email",
"readonly": True,
"required": field_required,
"placeholder": "Enter patient email"
},
"id": field_id
}
elif "patient phone" in field_label.lower():
formeo_json["fields"][field_id] = {
"tag": "input",
"config": {
"label": field_label,
"disabledAttrs": ["type"],
"lockedAttrs": ["className"],
"controlId": "patient-phone"
},
"meta": {
"group": "common",
"icon": "📞",
"id": "patient-phone"
},
"attrs": {
"className": "patient-phone",
"type": "tel",
"name": "patient_extra_data_preferred_phone",
"required": field_required,
"placeholder": "Enter patient phone number"
},
"id": field_id
}
elif "tags" in field_label.lower():
formeo_json["fields"][field_id] = {
"tag": "input",
"config": {
"label": field_label,
"disabledAttrs": ["type"],
"lockedAttrs": ["className"],
"controlId": "tag-input"
},
"meta": {
"group": "common",
"icon": "🏷️",
"id": "tag-input"
},
"attrs": {
"className": "tags-input",
"type": "text",
"name": field_name,
"required": field_required,
"placeholder": "Add tags..."
},
"id": field_id
}
else:
field_attrs = {
"required": field_required,
"type": "text",
"className": "form-control",
"name": field_name
}
if field_readonly:
field_attrs["readonly"] = True
if field_placeholder:
field_attrs["placeholder"] = field_placeholder
formeo_json["fields"][field_id] = {
"tag": "input",
"attrs": field_attrs,
"config": {
"label": field_label,
"controlId": "text-input"
},
"id": field_id
}
elif field_type.lower() == "textarea":
formeo_json["fields"][field_id] = {
"tag": "textarea",
"attrs": {
"required": field_required,
"className": "form-control",
"name": f"key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "textarea"
},
"id": field_id
}
elif field_type.lower() == "number":
formeo_json["fields"][field_id] = {
"tag": "input",
"attrs": {
"required": field_required,
"type": "number",
"className": "form-control",
"name": f"key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "number"
},
"id": field_id
}
elif field_type.lower() == "date":
if "patient dob" in field_label.lower():
formeo_json["fields"][field_id] = {
"tag": "input",
"config": {
"label": field_label,
"disabledAttrs": ["type"],
"lockedAttrs": ["className"],
"controlId": "patient-dob"
},
"meta": {
"group": "common",
"icon": '📅',
"id": "patient-dob"
},
"attrs": {
"className": "patient-dob",
"type": "date",
"name": "patient_custom_DOB",
"required": field_required,
"placeholder": "Enter patient DOB"
},
"id": field_id
}
else:
formeo_json["fields"][field_id] = {
"tag": "input",
"attrs": {
"required": field_required,
"type": "date",
"className": "form-control",
"name": f"key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "date-input"
},
"id": field_id
}
elif field_type.lower() == "select" or field_type.lower() == "dropdown":
if "patient gender" in field_label.lower():
formeo_json["fields"][field_id] = {
"tag": "select",
"config": {
"label": "Patient Gender",
"disabledAttrs": ["type"],
"lockedAttrs": ["className"]
},
"meta": {
"group": "common",
"icon": "🚻",
"id": "patient-gender"
},
"attrs": {
"className": "patient-gender",
"name": "patient_custom_gender_identity",
"required": True
},
"options": [
{"label": "Male", "value": "Male"},
{"label": "Female", "value": "Female"},
{"label": "Other", "value": "other"}
],
"id": field_id
}
else:
select_field = {
"tag": "select",
"attrs": {
"required": field_required,
"className": "form-control",
"name": f"key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "select"
},
"options": [{"label": opt, "value": opt} for opt in field_options],
"id": field_id
}
formeo_json["fields"][field_id] = select_field
elif field_type.lower() == "checkbox":
formeo_json["fields"][field_id] = {
"tag": "input",
"attrs": {
"required": field_required,
"type": "checkbox",
"className": "form-check-input",
"name": f"key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "checkbox"
},
"id": field_id
}
elif field_type.lower() == "radio":
radio_field = {
"tag": "div",
"attrs": {
"className": "radio-group",
"name": f"key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "radio-group"
},
"content": "",
"id": field_id
}
# Add radio options
radio_content = "<div class='radio-options'>"
for opt in field_options:
radio_id = f"radio_{uuid.uuid4().hex[:8]}"
radio_content += f"""
<div class="form-check">
<input class="form-check-input" type="radio" name="{field_label.lower().replace(' ', '_')}" id="{radio_id}" value="{opt}">
<label class="form-check-label" for="{radio_id}">{opt}</label>
</div>
"""
radio_content += "</div>"
radio_field["content"] = radio_content
formeo_json["fields"][field_id] = radio_field
elif field_type.lower() == "signature":
formeo_json["fields"][field_id] = {
"tag": "signature-pad",
"attrs": {
"className": "signature-pad",
"name": f"signature_key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "signature",
"disabledAttrs": ["type"],
"lockedAttrs": ["className"]
},
"meta": {
"group": "common",
"icon": "✍️",
"id": "signature"
},
"children": [
{
"tag": "div",
"attrs": {
"className": "canvas-wrapper"
},
"children": [
{
"tag": "canvas"
},
{
"tag": "input",
"attrs": {
"type": "hidden",
"name": "signature-data",
"className": "signature-hidden-input"
}
}
]
},
{
"tag": "div",
"attrs": {
"className": "signature-pad--footer"
},
"children": [
{
"tag": "div",
"attrs": {
"className": "signature-pad--actions"
},
"children": [
{
"tag": "div",
"attrs": {
"className": "column"
},
"children": [
{
"tag": "button",
"attrs": {
"type": "button",
"className": "button clear",
"data-action": "clear"
},
"content": "Clear"
}
]
}
]
}
]
}
],
"id": field_id
}
elif field_type.lower() == "file" or field_type.lower() == "upload":
formeo_json["fields"][field_id] = {
"tag": "input",
"attrs": {
"required": field_required,
"type": "file",
"className": "form-control",
"name": f"key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "file-input"
},
"id": field_id
}
elif field_type.lower() == "drag-drop-file":
formeo_json["fields"][field_id] = {
"tag": "drag-drop-file",
"attrs": {
"className": "drag-drop-file-container",
"required": field_required,
"type": "file",
"name": f"key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "drag-drop-file",
"disabledAttrs": ["type"],
"lockedAttrs": ["className"]
},
"meta": {
"group": "common",
"icon": "📂",
"id": "drag-drop-file"
},
"content": """
<div class="drag-drop-container">
<div class="drag-drop-area">
<p>Drag and drop or click to upload ID</p>
<p class="file-types">Accepted: JPEG, PNG, PDF (Max 5MB)</p>
<input type="file" class="file-input" accept=".jpeg,.jpg,.png,.pdf" style="opacity: 0; position: absolute; z-index: -1;" />
<div class="file-list"></div>
</div>
</div>
""",
"id": field_id
}
else: # Default to text input for unknown types
formeo_json["fields"][field_id] = {
"tag": "input",
"attrs": {
"required": field_required,
"type": "text",
"className": "form-control",
"name": f"key_{uuid.uuid4().hex[:20]}"
},
"config": {
"label": field_label,
"controlId": "text-input"
},
"id": field_id
}
# Convert to JSON string
logger.info(f"Form data formeo_json: {formeo_json}")
return formeo_json
except Exception as e:
logger.error(f"Error converting to Formeo JSON: {str(e)}")
return json.dumps({})
@llm.ai_callable()
async def bookAppointment(self,
practitioner_id: Annotated[
str, llm.TypeInfo(description="ID of the practitioner to check availability for")
]
):
"""Book an appointment process by checking a practitioner's availability first"""
try:
logger.info(f"Starting appointment booking process for practitioner ID: {practitioner_id}")
# First, check if the practitioner exists and has availability
availability_result = await self.check_practitioner_availability(
practitioner_id=practitioner_id
)
if availability_result.get("status") == "error":
return {
"status": "error",
"message": f"Failed to check practitioner availability: {availability_result.get('message')}",
"details": availability_result
}
# Initialize booking_data as a dictionary if it doesn't exist
self.booking_data = {}
# Store the practitioner ID for the next steps
self.booking_data["practitioner_id"] = practitioner_id
# Get the formatted available dates
available_dates = availability_result.get("allowed_dates", [])
if not available_dates:
return {
"status": "error",
"message": f"No available slots found for practitioner {practitioner_id} in the current month.",
"next_step": "Try a different practitioner or a different month"
}
# Store all slots data for future reference
self.booking_data["all_slots_data"] = availability_result.get("all_slots_data", [])
available_slots = availability_result.get("all_slots_data", [])
logger.info(f"Available dates for practitioner ID {practitioner_id}: {available_slots}")
return {
"status": "success",
"message": f"Found availability for practitioner {practitioner_id} on {len(available_dates)} dates",
"available_dates": available_dates,
"next_step": "select_date",
"instruction": "Please select one of the available date to proceed with booking"
}
except Exception as e:
logger.error(f"Error starting appointment booking: {str(e)}")
return {"status": "error", "message": f"Error starting appointment booking: {str(e)}"}
@llm.ai_callable()
async def find_patient(self,
search_term: Annotated[
str, llm.TypeInfo(description="Name, email, or ID of the patient to search for")
]
):
"""Find a patient by name, email, or ID"""
try:
logger.info(f"Searching for patient with term: {search_term}")
url = f"{Base_url}/api/emr/patients-list"
if not self.accessToken:
return {
"status": "error",
"message": "Authentication token not available."
}
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
params = {
"search": search_term
}
response = requests.get(url, headers=headers, params=params)
logger.info(f"Response status code: {response.status_code}")
if response.status_code == 200:
patients_data = response.json()
patients = patients_data.get('data', [])
if not patients:
return {
"status": "error",
"message": f"No patients found matching '{search_term}'."
}
# Format the patients for better presentation
formatted_patients = []
for patient in patients:
formatted_patient = {
"id": patient.get('id'),
"name": f"{patient.get('fname', '')} {patient.get('lname', '')}",
"email": patient.get('email', ''),
}
formatted_patients.append(formatted_patient)
return {
"status": "success",
"message": f"Found {len(formatted_patients)} patients matching '{search_term}'",
"patients": formatted_patients
}
else:
return {
"status": "error",
"message": f"Failed to search patients. Status code: {response.status_code}",
"details": response.text
}
except Exception as e:
logger.error(f"Error searching patients: {str(e)}")
return {"status": "error", "message": f"Error searching patients: {str(e)}"}
@llm.ai_callable()
async def check_practitioner_availability(self,
practitioner_id: Annotated[
str, llm.TypeInfo(description="ID of the practitioner to check availability for")
],
):
"""Check a practitioner's availability get all available dates"""
try:
# If no date is provided, use current month
today = datetime.datetime.now()
date = today.strftime("%Y-%m-%d")
logger.info(f"No date provided, using current date: {date}")
logger.info(f"Checking availability for practitioner ID: {practitioner_id} on date: {date}")
# Parse and format the date
formatted_date = self._parse_date_to_iso(date)
if not formatted_date:
return {
"status": "error",
"message": f"Invalid date format: {date}. Please use YYYY-MM-DD format or natural language."
}
# Extract month from the formatted date
try:
date_obj = datetime.datetime.strptime(formatted_date, "%Y-%m-%d")
month = date_obj.strftime("%m") # Get month as 01-12 string
year = date_obj.strftime("%Y") # Get year as YYYY string
except ValueError:
return {
"status": "error",
"message": f"Failed to extract month from date: {formatted_date}"
}
url = f"{Base_url}/api/assistant/get-available-slots-data/{practitioner_id}"
if not self.accessToken:
return {
"status": "error",
"message": "Authentication token not available."
}
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
params = {
"month": month,
"timezone": self.getTimezone or "EST"
}
logger.info(f"Requesting availability with params: {params}")
response = requests.post(url, headers=headers, json=params)
if response.status_code == 200:
availability_data = response.json()
# The response is a dictionary with dates as keys and arrays of slots as values
all_dates = availability_data
if not all_dates:
return {
"status": "error",
"message": f"No availability found for practitioner {practitioner_id} in {month}/{year}"
}
# Extract the date keys (similar to Object.keys() in JavaScript)
date_keys = list(all_dates.keys())
# Format the dates (similar to the JavaScript map operation)
allowed_dates = []
for date_key in date_keys:
# Only include dates that have available slots
if date_key in all_dates and all_dates[date_key] and len(all_dates[date_key]) > 0:
# Format date from YYYY-MM-DD to MM-DD-YYYY for display
try:
date_obj = datetime.datetime.strptime(date_key, "%Y-%m-%d")
formatted_display = date_obj.strftime("%m-%d-%Y")
allowed_dates.append({
"title": formatted_display, # Display format MM-DD-YYYY
"value": date_key, # Original date value YYYY-MM-DD
"slots_count": len(all_dates[date_key]) # Number of available slots
})
except ValueError:
logger.warning(f"Could not format date: {date_key}")
# Sort dates chronologically
allowed_dates.sort(key=lambda x: x["value"])
logger.info(f"Checking availability {allowed_dates}")
return {
"status": "success",
"message": f"Found availability on {len(allowed_dates)} dates for practitioner {practitioner_id}",
"allowed_dates": allowed_dates,
"all_slots_data": all_dates # Keep the full data for reference
}
else:
return {
"status": "error",
"message": f"Failed to retrieve availability. Status code: {response.status_code}",
"details": response.text
}
except Exception as e:
logger.error(f"Error checking practitioner availability: {str(e)}")
return {"status": "error", "message": f"Error checking practitioner availability: {str(e)}"}
@llm.ai_callable()
async def select_appointment_date(self,
date: Annotated[
str, llm.TypeInfo(description="Date for the appointment in YYYY-MM-DD format or natural language")
]
):
"""Select a date for the appointment after checking practitioner availability"""
try:
if not hasattr(self, 'booking_data') or not self.booking_data:
return {
"status": "error",
"message": "No active booking process. Please start by selecting a practitioner first.",
"next_step": "bookAppointment"
}
practitioner_id = self.booking_data.get("practitioner_id")
if not practitioner_id:
return {
"status": "error",
"message": "Practitioner ID not found in booking data. Please start the booking process again.",
"next_step": "bookAppointment"
}
# Parse the date if it's in natural language format
parsed_date = self._parse_date_to_iso(date) if hasattr(self, '_parse_date_to_iso') else date
if not parsed_date:
return {
"status": "error",
"message": f"Invalid date format: {date}. Please use YYYY-MM-DD format or natural language.",
"next_step": "select_appointment_date"
}
logger.info(f"Selected date for appointment: {parsed_date}")
# Check if the selected date is in the available dates
all_slots_data = self.booking_data.get("all_slots_data", {})
if not all_slots_data:
return {
"status": "error",
"message": "No available slots data found. Please check practitioner availability first.",
"next_step": "bookAppointment"
}
# Check if the date exists in the available slots
if parsed_date not in all_slots_data:
# Try to find the date in the formatted_datetime fields
found = False
for date_key, slots in all_slots_data.items():
for slot in slots:
if parsed_date in slot.get("formatted_datetime", ""):
parsed_date = date_key
found = True
break
if found:
break
if not found:
return {
"status": "error",
"message": f"No available slots found for the selected date: {parsed_date}",
"next_step": "select_appointment_date",
"instruction": "Please select a different date from the available dates"
}
# Get the available time slots for the selected date
available_slots = all_slots_data.get(parsed_date, [])
if not available_slots or len(available_slots) == 0:
return {
"status": "error",
"message": f"No available time slots found for the selected date: {parsed_date}",
"next_step": "select_appointment_date",
"instruction": "Please select a different date from the available dates"
}
# Format the time slots for display
formatted_slots = []
for slot in available_slots:
start_time = slot.get("start_time", "")
formatted_datetime = slot.get("formatted_datetime", "")
# Extract time from formatted_datetime
time_part = ""
if " " in formatted_datetime:
_, time_part = formatted_datetime.split(" ", 1)
formatted_slots.append({
"start_time": start_time,
"formatted_time": time_part,
"value": start_time, # Keep original ISO format for backend
"display": time_part or start_time
})
# Store the selected date in booking data
self.booking_data["selected_date"] = parsed_date
self.booking_data["available_slots"] = formatted_slots
# Format the date for display
try:
date_obj = datetime.datetime.strptime(parsed_date, "%Y-%m-%d")
formatted_display_date = date_obj.strftime("%A, %B %d, %Y") # e.g., "Monday, January 1, 2023"
except ValueError:
formatted_display_date = parsed_date
logger.info(f"Selected date for appointment: {parsed_date} ={formatted_display_date}={formatted_slots}")
return {
"status": "success",
"message": f"Found {len(formatted_slots)} available time slots on {formatted_display_date}",
"date": parsed_date,
"formatted_date": formatted_display_date,
"available_slots": formatted_slots,
"next_step": "select_appointment_time",
"instruction": "Please select one of the available time slots to proceed with booking"
}
except Exception as e:
logger.error(f"Error selecting appointment date: {str(e)}")
return {"status": "error", "message": f"Error selecting appointment date: {str(e)}"}
@llm.ai_callable()
async def select_appointment_time_and_patient(self,
time: Annotated[
str, llm.TypeInfo(description="Time for the appointment in HH:MM AM/PM format")
],
patient_id: Annotated[
str, llm.TypeInfo(description="ID of the patient for the appointment")
]
):
"""Select a time slot and patient for the appointment after selecting a date"""
try:
if not hasattr(self, 'booking_data') or not self.booking_data.get("practitioner_id") or not self.booking_data.get("selected_date"):
return {
"status": "error",
"message": "Incomplete booking information. Please select a practitioner and date first.",
"next_step": "start_appointment_booking"
}
practitioner_id = self.booking_data.get("practitioner_id")
selected_date = self.booking_data.get("selected_date")
available_slots = self.booking_data.get("available_slots", [])
logger.info(f"Selecting time {time} for date {selected_date} with practitioner {practitioner_id}")
logger.info(f"Available slots: {available_slots}")
# Find the matching time slot from our stored available slots
selected_slot = None
for slot in available_slots:
formatted_time = slot.get("formatted_time", "")
display_time = slot.get("display", "")
# Try to match with either formatted_time or display
if self._time_matches(formatted_time, time) or self._time_matches(display_time, time):
selected_slot = slot
logger.info(f"Found matching slot: {slot}")
break
if not selected_slot:
return {
"status": "error",
"message": f"No available slot found at {time} on {selected_date}",
"available_times": [slot.get("display") for slot in available_slots],
"next_step": "select_appointment_time",
"instruction": "Please select from one of the available times"
}
# Verify that the patient exists
patient_result = await self.find_patient(search_term=patient_id)
if patient_result.get("status") != "success":
return {
"status": "error",
"message": f"Failed to verify patient ID: {patient_id}",
"details": patient_result,
"next_step": "find_patient",
"instruction": "Please provide a valid patient ID or search for a patient"
}
# Store the selected time and patient
self.booking_data["time"] = time
self.booking_data["display_time"] = selected_slot.get("display", time)
self.booking_data["start_time"] = selected_slot.get("start_time") # This is the ISO format time for the backend
self.booking_data["patient_id"] = patient_id
self.booking_data["selected_slot"] = selected_slot
# Get patient details for confirmation
patient_details = None
if patient_result.get("patients") and len(patient_result.get("patients")) > 0:
for patient in patient_result.get("patients"):
if str(patient.get("id")) == str(patient_id):
patient_details = patient
break
# Now we have all the information needed to book the appointment
return {
"status": "success",
"message": f"Selected time {selected_slot.get('display')} on {selected_date} for patient {patient_details.get('name') if patient_details else patient_id}",
"booking_details": {
"practitioner_id": practitioner_id,
"date": selected_date,
"display_time": selected_slot.get("display", time),
"start_time": selected_slot.get("start_time"), # ISO format time for backend
"patient_id": patient_id,
"patient_name": patient_details.get("name") if patient_details else "Unknown"
},
"next_step": "confirm_booking",
"instruction": "Please confirm the booking details or provide appointment type and notes"
}
except Exception as e:
logger.error(f"Error selecting appointment time and patient: {str(e)}")
return {"status": "error", "message": f"Error selecting appointment time and patient: {str(e)}"}
@llm.ai_callable()
async def confirm_booking(self,
confirm: Annotated[
bool, llm.TypeInfo(description="Confirm the booking (true) or cancel (false)")
],
appointment_type: Annotated[
str, llm.TypeInfo(description="Type of appointment ('Telehealth Visit','Follow Up','Lab Review','Initial Consult', 'Office Visit', etc.)")
] = "Consultation",
notes: Annotated[
str, llm.TypeInfo(description="Any additional notes for the appointment")
] = "",
service: Annotated[
str, llm.TypeInfo(description="Service type (e.g., 'Patient Followup', 'New Patient', etc.)")
] = "Patient Followup",
payment_type: Annotated[
str, llm.TypeInfo(description="Payment type ('Cash Only', 'Insurance Only','Cash and Insurance','Auto','Workers','Prepaid')")
] = "Cash Only",
):
"""Confirm and finalize the appointment booking with optional appointment type and notes"""
try:
if not hasattr(self, 'booking_data') or not self.booking_data.get("practitioner_id") or not self.booking_data.get("selected_date") or not self.booking_data.get("start_time"):
return {
"status": "error",
"message": "Incomplete booking information. Please complete the booking process first.",
"next_step": "start_appointment_booking"
}
if not confirm:
# Clear booking data and cancel the process
self.booking_data = {}
return {
"status": "info",
"message": "Booking process cancelled.",
"next_step": "start_appointment_booking"
}
# Extract all required booking information
practitioner_id = self.booking_data.get("practitioner_id")
patient_id = self.booking_data.get("patient_id")
appointment_date = self.booking_data.get("selected_date")
start_time = self.booking_data.get("start_time")
display_time = self.booking_data.get("display_time", "")
logger.info(f"Confirming appointment booking for patient {patient_id} with practitioner {practitioner_id}")
logger.info(f"Date: {appointment_date}, Time: {start_time} ({display_time})")
logger.info(f"Type: {appointment_type}, Notes: {notes}")
# Calculate end time (assuming 30 minutes appointment duration)
# This is a placeholder - you may need to adjust based on your requirements
end_time = "09:00 PM" # Default end time
# Get patient name for title
patient_name = "Patient Appointment"
try:
patient_result = await self.find_patient(search_term=patient_id)
if patient_result.get("status") == "success" and patient_result.get("patients"):
for patient in patient_result.get("patients"):
if str(patient.get("id")) == str(patient_id):
patient_name = patient.get("name", "Patient Appointment")
break
except Exception as e:
logger.warning(f"Could not retrieve patient name: {str(e)}")
# Prepare the booking data for API request according to the required payload structure
booking_data = {
"practitioner_id": practitioner_id,
"patient_id": patient_id,
"title": patient_name,
"date": appointment_date,
"start_time": start_time, # Use the ISO format time from the slot
"end_time": end_time,
"appointment_type": appointment_type,
"service": service,
"payment_type": payment_type,
"location": '',
"room": '',
"notes": notes,
"status": "ON_TIME"
}
url = f"{Base_url}/api/assistant/book-appointment"
if not self.accessToken:
return {
"status": "error",
"message": "Authentication token not available.",
"next_step": "login"
}
headers = {
"authorization": f"Bearer {self.accessToken}",
"accesstoken": f"{self.accessToken}",
"content-type": "application/json",
"accept": "application/json"
}
logger.info(f"Sending booking request with data: {booking_data}")
response = requests.post(url, headers=headers, json=booking_data)
logger.info(f"Booking response status code: {response.status_code}")
logger.info(f"Booking response content: {response.text}")
if response.status_code == 200:
booking_result = response.json()
# Format the date and time for display
try:
date_obj = datetime.datetime.strptime(appointment_date, "%Y-%m-%d")
formatted_date = date_obj.strftime("%A, %B %d, %Y") # e.g., "Monday, January 1, 2023"
except ValueError:
formatted_date = appointment_date
# Clear booking data as the process is complete
self.booking_data = {}
return {
"status": "success",
"message": f"Appointment successfully booked!",
"appointment_details": {
"id": booking_result.get("data", {}).get("id", ""),
"practitioner_id": practitioner_id,
"patient_id": patient_id,
"patient_name": patient_name,
"date": formatted_date,
"time": display_time or start_time,
"appointment_type": appointment_type,
"service": service,
"payment_type": payment_type,
"notes": notes
},
"confirmation": f"Appointment confirmed for {patient_name} on {formatted_date} at {display_time or start_time}",
"next_step": "appointment_booked"
}
else:
error_message = "Failed to book appointment"
try:
error_data = response.json()
if "message" in error_data:
error_message = error_data["message"]
except:
error_message = f"Failed to book appointment. Status code: {response.status_code}"
return {
"status": "error",
"message": error_message,
"details": response.text,
"next_step": "retry_booking"
}
except Exception as e:
logger.error(f"Error confirming appointment booking: {str(e)}")
return {
"status": "error",
"message": f"Error confirming appointment booking: {str(e)}",
"next_step": "retry_booking"
}
def _time_matches(self, slot_time, input_time):
"""Helper function to compare if two time strings match, handling different formats"""
try:
# Normalize both times to 24-hour format for comparison
slot_time_obj = datetime.datetime.strptime(slot_time, "%I:%M %p")
slot_time_24h = slot_time_obj.strftime("%H:%M")
input_time_obj = datetime.datetime.strptime(input_time, "%I:%M %p")
input_time_24h = input_time_obj.strftime("%H:%M")
return slot_time_24h == input_time_24h
except ValueError:
# If parsing fails, do a direct string comparison
return slot_time.strip() == input_time.strip()