2157 lines
100 KiB
Python
2157 lines
100 KiB
Python
import json
|
|
import uuid
|
|
from livekit.agents import (llm, JobContext)
|
|
from typing import Annotated, Dict, Any, List
|
|
import logging
|
|
import requests
|
|
import datetime
|
|
import os
|
|
import re
|
|
logger = logging.getLogger("voice-agent")
|
|
Base_url = "http://hgh-copy.test"
|
|
|
|
class AssistantFnc(llm.FunctionContext):
|
|
"""
|
|
Function context for QubeCare's autonomous onboarding agent.
|
|
Handles the complete provider onboarding process through sequential steps:
|
|
1. Authentication base on check user if return true then login otherwise signup (signup/login)
|
|
2. Form Setup
|
|
3. Product Configuration
|
|
4. Product Configuration
|
|
5. Signature Setup
|
|
"""
|
|
|
|
def __init__(self, ctx: JobContext):
|
|
super().__init__()
|
|
self.ctx = ctx
|
|
self.accessToken = None
|
|
self.onboarding_stage = "authentication" # Track current onboarding stage
|
|
self.form_data = {} # Store form data during the process
|
|
self.product_data = {} # Store product data during the process
|
|
self.category_id = None # Replace with your actual category ID
|
|
self.username = None # Store username for later use
|
|
self.password = None # Store password for later use
|
|
self.user_email = None # Store email for later use
|
|
self.start_date = None
|
|
self.end_date = None
|
|
self.booking_data=None
|
|
metadata = json.loads(self.ctx.job.metadata)
|
|
token = metadata.get("token")
|
|
#self.accessToken='342|lRppO40lM2myq2E9aru4fMeN6RsFOyarlBacfmU8c904c6a3'
|
|
self.accessToken=token
|
|
self.timezone =metadata.get("timezone")
|
|
timezone_list = {
|
|
'America/New_York': 'EST',
|
|
'America/Los_Angeles': 'PST',
|
|
'America/Chicago': 'CST',
|
|
'America/Denver': 'MST',
|
|
'UTC': 'UTC',
|
|
}
|
|
|
|
self.getTimezone = timezone_list.get(self.timezone, self.timezone)
|
|
logger.info(f"self.getTimezone is {self.getTimezone}")
|
|
@llm.ai_callable()
|
|
async def getAppointment(self,
|
|
date: Annotated[
|
|
str, llm.TypeInfo(description="Specific date for appointments (MM-DD-YYYY)")
|
|
] = None,
|
|
start_date: Annotated[
|
|
str, llm.TypeInfo(description="Start date for appointments range (MM-DD-YYYY)")
|
|
] = None,
|
|
end_date: Annotated[
|
|
str, llm.TypeInfo(description="End date for appointments range (MM-DD-YYYY)")
|
|
] = None
|
|
):
|
|
"""Get appointments based on a specific date or date range"""
|
|
try:
|
|
# Prepare parameters
|
|
logger.info(f"Getting appointments with parameters: {self.accessToken}")
|
|
parameters = {}
|
|
|
|
# If specific date is provided, use it for both start and end date
|
|
if date:
|
|
try:
|
|
month, day, year = date.split('-')
|
|
formatted_date = f"{year}-{month}-{day}"
|
|
parameters["start_date"] = formatted_date
|
|
parameters["end_date"] = formatted_date
|
|
logger.info(f"Using specific date: {formatted_date}")
|
|
except ValueError:
|
|
# If already in YYYY-MM-DD format or other format, use as is
|
|
parameters["start_date"] = date
|
|
parameters["end_date"] = date
|
|
logger.info(f"Using specific date (as-is): {date}")
|
|
# Otherwise use date range if provided
|
|
else:
|
|
# If dates not provided, use current month's start and end dates
|
|
if not start_date or not end_date:
|
|
today = datetime.datetime.now()
|
|
first_day = datetime.datetime(today.year, today.month, 1)
|
|
# Get the last day of the current month
|
|
if today.month == 12:
|
|
last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1)
|
|
else:
|
|
last_day = datetime.datetime(today.year, today.month + 1, 1) - datetime.timedelta(days=1)
|
|
|
|
if not start_date:
|
|
start_date = first_day.strftime("%m-%d-%Y")
|
|
logger.info(f"Using default start date: {start_date}")
|
|
|
|
if not end_date:
|
|
end_date = last_day.strftime("%m-%d-%Y")
|
|
logger.info(f"Using default end date: {end_date}")
|
|
|
|
# Process start date
|
|
if start_date:
|
|
try:
|
|
month, day, year = start_date.split('-')
|
|
formatted_date = f"{year}-{month}-{day}"
|
|
parameters["start_date"] = formatted_date
|
|
except ValueError:
|
|
# If already in YYYY-MM-DD format or other format, use as is
|
|
parameters["start_date"] = start_date
|
|
|
|
# Process end date
|
|
if end_date:
|
|
try:
|
|
month, day, year = end_date.split('-')
|
|
formatted_date = f"{year}-{month}-{day}"
|
|
parameters["end_date"] = formatted_date
|
|
except ValueError:
|
|
# If already in YYYY-MM-DD format or other format, use as is
|
|
parameters["end_date"] = end_date
|
|
|
|
url = f"{Base_url}/api/assistant/get-appointment-list-date"
|
|
self.start_date= parameters["start_date"]
|
|
self.end_date= parameters["end_date"]
|
|
if self.accessToken:
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
response = requests.post(url, json=parameters, headers=headers)
|
|
logger.info(f"Response status code: {response.status_code}")
|
|
logger.info(f"Response content: {response.text}")
|
|
|
|
if response.status_code == 200:
|
|
logger.info("Appointments retrieved successfully")
|
|
appointments_data = response.json()
|
|
|
|
# Format the appointments for better presentation to the user
|
|
formatted_appointments = []
|
|
for appt in appointments_data.get('appointments', []):
|
|
appt_id = appt.get('id')
|
|
patient_id = appt.get('patient_id')
|
|
patient_name = appt.get('title', 'Unknown Patient')
|
|
appt_date = appt.get('date', 'Unknown Date')
|
|
appt_time = appt.get('start_time', 'Unknown Time')
|
|
|
|
formatted_appt = {
|
|
"appointment_id": appt_id,
|
|
"patient_id": patient_id,
|
|
"patient_name": patient_name,
|
|
"date": appt_date,
|
|
"time": appt_time,
|
|
"instruction": f"To call this patient, say 'Start call with appointment {appt_id}'"
|
|
}
|
|
formatted_appointments.append(formatted_appt)
|
|
|
|
# Add instructions for the agent
|
|
agent_instructions = {
|
|
"status": "success",
|
|
"message": "Appointments retrieved successfully",
|
|
"data": formatted_appointments,
|
|
"agent_instructions": "Present these appointments to the user and ask which one they would like to call. When they specify an appointment ID, use the start_call function with the corresponding patient_id and appointment_id."
|
|
}
|
|
|
|
return agent_instructions
|
|
else:
|
|
logger.error(f"Failed to retrieve appointments. Status code: {response.status_code}")
|
|
return {"status": "error", "message": f"Failed to retrieve appointments. Status code: {response.status_code}"}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting appointments: {str(e)}")
|
|
return {"error": f"Failed to get appointments: {str(e)}"}
|
|
|
|
@llm.ai_callable()
|
|
async def getAppointmentsByTimeframe(self,
|
|
timeframe: Annotated[
|
|
str, llm.TypeInfo(description="Standardized timeframe like 'today', 'tomorrow', 'this_week','current_week', 'next_week','current_date', 'current_month', etc.")
|
|
]
|
|
):
|
|
"""Get appointments for a standardized timeframe"""
|
|
try:
|
|
logger.info(f"Getting appointments for standardized timeframe: {timeframe}")
|
|
|
|
today = datetime.datetime.now()
|
|
|
|
# Calculate start and end dates based on the timeframe
|
|
if timeframe == "today" or timeframe == "current_date":
|
|
start_date = today.strftime("%m-%d-%Y")
|
|
end_date = start_date
|
|
|
|
elif timeframe == "tomorrow":
|
|
tomorrow = today + datetime.timedelta(days=1)
|
|
start_date = tomorrow.strftime("%m-%d-%Y")
|
|
end_date = start_date
|
|
|
|
elif timeframe == "this_week" or timeframe == "current_week":
|
|
# Get the start of the current week (Monday)
|
|
start_of_week = today - datetime.timedelta(days=today.weekday())
|
|
end_of_week = start_of_week + datetime.timedelta(days=6) # Sunday
|
|
start_date = start_of_week.strftime("%m-%d-%Y")
|
|
end_date = end_of_week.strftime("%m-%d-%Y")
|
|
|
|
elif timeframe == "next_week":
|
|
# Get the start of next week (next Monday)
|
|
days_until_next_monday = 7 - today.weekday()
|
|
if days_until_next_monday == 7:
|
|
days_until_next_monday = 0 # If today is Monday, get next Monday
|
|
start_of_next_week = today + datetime.timedelta(days=days_until_next_monday)
|
|
end_of_next_week = start_of_next_week + datetime.timedelta(days=6) # Sunday
|
|
start_date = start_of_next_week.strftime("%m-%d-%Y")
|
|
end_date = end_of_next_week.strftime("%m-%d-%Y")
|
|
|
|
elif timeframe == "last_week":
|
|
# Get the start of last week (previous Monday)
|
|
days_since_last_monday = today.weekday() + 7
|
|
start_of_last_week = today - datetime.timedelta(days=days_since_last_monday)
|
|
end_of_last_week = start_of_last_week + datetime.timedelta(days=6) # Sunday
|
|
start_date = start_of_last_week.strftime("%m-%d-%Y")
|
|
end_date = end_of_last_week.strftime("%m-%d-%Y")
|
|
|
|
elif timeframe == "this_month" or timeframe == "current_month":
|
|
# Get the first day of the current month
|
|
first_day = datetime.datetime(today.year, today.month, 1)
|
|
# Get the last day of the current month
|
|
if today.month == 12:
|
|
last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1)
|
|
else:
|
|
last_day = datetime.datetime(today.year, today.month + 1, 1) - datetime.timedelta(days=1)
|
|
|
|
start_date = first_day.strftime("%m-%d-%Y")
|
|
end_date = last_day.strftime("%m-%d-%Y")
|
|
|
|
elif timeframe == "next_month":
|
|
# Get the first day of the next month
|
|
if today.month == 12:
|
|
first_day = datetime.datetime(today.year + 1, 1, 1)
|
|
last_day = datetime.datetime(today.year + 1, 2, 1) - datetime.timedelta(days=1)
|
|
else:
|
|
first_day = datetime.datetime(today.year, today.month + 1, 1)
|
|
if today.month == 11: # December
|
|
last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1)
|
|
else:
|
|
last_day = datetime.datetime(today.year, today.month + 2, 1) - datetime.timedelta(days=1)
|
|
|
|
start_date = first_day.strftime("%m-%d-%Y")
|
|
end_date = last_day.strftime("%m-%d-%Y")
|
|
|
|
elif timeframe == "last_month":
|
|
# Get the first day of the last month
|
|
if today.month == 1:
|
|
first_day = datetime.datetime(today.year - 1, 12, 1)
|
|
last_day = datetime.datetime(today.year, 1, 1) - datetime.timedelta(days=1)
|
|
else:
|
|
first_day = datetime.datetime(today.year, today.month - 1, 1)
|
|
last_day = datetime.datetime(today.year, today.month, 1) - datetime.timedelta(days=1)
|
|
|
|
start_date = first_day.strftime("%m-%d-%Y")
|
|
end_date = last_day.strftime("%m-%d-%Y")
|
|
|
|
else:
|
|
# Default to current month if timeframe is not recognized
|
|
logger.warning(f"Unrecognized timeframe '{timeframe}', defaulting to current month")
|
|
first_day = datetime.datetime(today.year, today.month, 1)
|
|
if today.month == 12:
|
|
last_day = datetime.datetime(today.year + 1, 1, 1) - datetime.timedelta(days=1)
|
|
else:
|
|
last_day = datetime.datetime(today.year, today.month + 1, 1) - datetime.timedelta(days=1)
|
|
|
|
start_date = first_day.strftime("%m-%d-%Y")
|
|
end_date = last_day.strftime("%m-%d-%Y")
|
|
|
|
logger.info(f"Converted timeframe '{timeframe}' to date range: {start_date} to {end_date}")
|
|
|
|
# Use the existing getAppointment function with the calculated date range
|
|
return await self.getAppointment(start_date=start_date, end_date=end_date)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting appointments by timeframe: {str(e)}")
|
|
return {"error": f"Failed to get appointments for timeframe '{timeframe}': {str(e)}"}
|
|
|
|
@llm.ai_callable()
|
|
async def getAppointmentsByDate(self,
|
|
date_str: Annotated[
|
|
str, llm.TypeInfo(description="Date in natural language format like 'January 15', 'next Monday', 'May 3rd', etc.")
|
|
]
|
|
):
|
|
"""Get appointments for a specific date expressed in natural language"""
|
|
try:
|
|
logger.info(f"Getting appointments for natural date: {date_str}")
|
|
|
|
# Try to parse the natural language date
|
|
today = datetime.datetime.now()
|
|
|
|
# Handle common date formats
|
|
date_str_lower = date_str.lower().strip()
|
|
|
|
# Handle relative dates
|
|
if "today" in date_str_lower:
|
|
target_date = today
|
|
elif "tomorrow" in date_str_lower:
|
|
target_date = today + datetime.timedelta(days=1)
|
|
elif "yesterday" in date_str_lower:
|
|
target_date = today - datetime.timedelta(days=1)
|
|
elif "next monday" in date_str_lower:
|
|
days_until_next_monday = (7 - today.weekday()) % 7
|
|
if days_until_next_monday == 0:
|
|
days_until_next_monday = 7 # If today is Monday, get next Monday
|
|
target_date = today + datetime.timedelta(days=days_until_next_monday)
|
|
elif "next tuesday" in date_str_lower:
|
|
days_until_next_tuesday = (8 - today.weekday()) % 7
|
|
if days_until_next_tuesday == 0:
|
|
days_until_next_tuesday = 7
|
|
target_date = today + datetime.timedelta(days=days_until_next_tuesday)
|
|
elif "next wednesday" in date_str_lower:
|
|
days_until_next_wednesday = (9 - today.weekday()) % 7
|
|
if days_until_next_wednesday == 0:
|
|
days_until_next_wednesday = 7
|
|
target_date = today + datetime.timedelta(days=days_until_next_wednesday)
|
|
elif "next thursday" in date_str_lower:
|
|
days_until_next_thursday = (10 - today.weekday()) % 7
|
|
if days_until_next_thursday == 0:
|
|
days_until_next_thursday = 7
|
|
target_date = today + datetime.timedelta(days=days_until_next_thursday)
|
|
elif "next friday" in date_str_lower:
|
|
days_until_next_friday = (11 - today.weekday()) % 7
|
|
if days_until_next_friday == 0:
|
|
days_until_next_friday = 7
|
|
target_date = today + datetime.timedelta(days=days_until_next_friday)
|
|
elif "next saturday" in date_str_lower:
|
|
days_until_next_saturday = (12 - today.weekday()) % 7
|
|
if days_until_next_saturday == 0:
|
|
days_until_next_saturday = 7
|
|
target_date = today + datetime.timedelta(days=days_until_next_saturday)
|
|
elif "next sunday" in date_str_lower:
|
|
days_until_next_sunday = (13 - today.weekday()) % 7
|
|
if days_until_next_sunday == 0:
|
|
days_until_next_sunday = 7
|
|
target_date = today + datetime.timedelta(days=days_until_next_sunday)
|
|
# Handle current week days
|
|
elif "monday" in date_str_lower:
|
|
days_until_monday = (0 - today.weekday()) % 7
|
|
target_date = today + datetime.timedelta(days=days_until_monday)
|
|
elif "tuesday" in date_str_lower:
|
|
days_until_tuesday = (1 - today.weekday()) % 7
|
|
target_date = today + datetime.timedelta(days=days_until_tuesday)
|
|
elif "wednesday" in date_str_lower:
|
|
days_until_wednesday = (2 - today.weekday()) % 7
|
|
target_date = today + datetime.timedelta(days=days_until_wednesday)
|
|
elif "thursday" in date_str_lower:
|
|
days_until_thursday = (3 - today.weekday()) % 7
|
|
target_date = today + datetime.timedelta(days=days_until_thursday)
|
|
elif "friday" in date_str_lower:
|
|
days_until_friday = (4 - today.weekday()) % 7
|
|
target_date = today + datetime.timedelta(days=days_until_friday)
|
|
elif "saturday" in date_str_lower:
|
|
days_until_saturday = (5 - today.weekday()) % 7
|
|
target_date = today + datetime.timedelta(days=days_until_saturday)
|
|
elif "sunday" in date_str_lower:
|
|
days_until_sunday = (6 - today.weekday()) % 7
|
|
target_date = today + datetime.timedelta(days=days_until_sunday)
|
|
else:
|
|
# For more complex date strings, we'll use a simple approach
|
|
# This is a simplified approach - in a production system, you might want to use
|
|
# a more sophisticated date parsing library like dateparser
|
|
try:
|
|
# Try to parse common date formats
|
|
for fmt in ["%B %d", "%b %d", "%B %d %Y", "%b %d %Y", "%m/%d/%Y", "%m-%d-%Y", "%Y-%m-%d"]:
|
|
try:
|
|
parsed_date = datetime.datetime.strptime(date_str, fmt)
|
|
# If year is not specified, use current year
|
|
if fmt in ["%B %d", "%b %d"]:
|
|
parsed_date = parsed_date.replace(year=today.year)
|
|
target_date = parsed_date
|
|
break
|
|
except ValueError:
|
|
continue
|
|
else:
|
|
# If we couldn't parse the date, default to today
|
|
logger.warning(f"Could not parse date '{date_str}', defaulting to today")
|
|
target_date = today
|
|
except Exception as parse_error:
|
|
logger.error(f"Error parsing date '{date_str}': {str(parse_error)}")
|
|
target_date = today
|
|
|
|
# Format the date for the API call (MM-DD-YYYY)
|
|
formatted_date = target_date.strftime("%m-%d-%Y")
|
|
logger.info(f"Parsed natural date '{date_str}' to formatted date: {formatted_date}")
|
|
|
|
# Use the existing function to get appointments for this date
|
|
return await self.getAppointment(date=formatted_date)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting appointments by natural date: {str(e)}")
|
|
return {"error": f"Failed to get appointments for date '{date_str}': {str(e)}"}
|
|
@llm.ai_callable()
|
|
async def start_call_by_patient_name(self,
|
|
patient_name: Annotated[
|
|
str, llm.TypeInfo(description="Name of the patient to call")
|
|
],
|
|
date: Annotated[
|
|
str, llm.TypeInfo(description="Optional date to filter appointments (MM-DD-YYYY)")
|
|
] = None
|
|
):
|
|
"""Start a call with a patient by searching for their name in upcoming appointments.
|
|
If multiple appointments are found, the most recent one will be used."""
|
|
try:
|
|
logger.info(f"Starting call process for patient name: {patient_name}")
|
|
|
|
# First, get a list of appointments to search through
|
|
today = datetime.datetime.now()
|
|
if self.start_date and self.end_date:
|
|
# Use the provided date range
|
|
parameters = {
|
|
"start_date": self.start_date,
|
|
"end_date": self.end_date
|
|
}
|
|
else:
|
|
# If date is provided, use it; otherwise use today's date as start and a future date as end
|
|
if date:
|
|
try:
|
|
month, day, year = date.split('-')
|
|
formatted_date = f"{year}-{month}-{day}"
|
|
start_date = formatted_date
|
|
except ValueError:
|
|
# If already in YYYY-MM-DD format or other format, use as is
|
|
start_date = date
|
|
else:
|
|
# Use today as start date
|
|
start_date = today.strftime("%Y-%m-%d")
|
|
|
|
# Use a date 30 days in the future as end date if no specific date is provided
|
|
if not date:
|
|
end_date = (today + datetime.timedelta(days=30)).strftime("%Y-%m-%d")
|
|
else:
|
|
end_date = start_date
|
|
|
|
parameters = {
|
|
"start_date": start_date,
|
|
"end_date": end_date
|
|
}
|
|
logger.info(f"Response parameters: {parameters}")
|
|
url = f"{Base_url}/api/assistant/get-appointment-list-date"
|
|
|
|
if not self.accessToken:
|
|
return {
|
|
"status": "error",
|
|
"message": "Authentication token not available."
|
|
}
|
|
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
response = requests.post(url, json=parameters, headers=headers)
|
|
logger.info(f"Response status code: {response.status_code}")
|
|
|
|
if response.status_code != 200:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to retrieve appointments. Status code: {response.status_code}"
|
|
}
|
|
|
|
appointments_data = response.json()
|
|
appointments = appointments_data.get('appointments', [])
|
|
|
|
if not appointments:
|
|
return {
|
|
"status": "error",
|
|
"message": f"No appointments found for the specified date range."
|
|
}
|
|
|
|
# Search for appointments matching the patient name (case-insensitive partial match)
|
|
patient_name_lower = patient_name.lower()
|
|
matching_appointments = []
|
|
|
|
for appt in appointments:
|
|
appt_patient_name = appt.get('title', '').lower()
|
|
if patient_name_lower in appt_patient_name:
|
|
matching_appointments.append(appt)
|
|
|
|
if not matching_appointments:
|
|
return {
|
|
"status": "error",
|
|
"message": f"No appointments found for patient name '{patient_name}'. Please check the spelling or try using the appointment ID instead."
|
|
}
|
|
|
|
# Sort appointments by date (most recent first)
|
|
matching_appointments.sort(key=lambda x: x.get('date', ''), reverse=True)
|
|
|
|
# Use the first (most recent) matching appointment
|
|
selected_appointment = matching_appointments[0]
|
|
appointment_id = selected_appointment.get('id')
|
|
patient_id = selected_appointment.get('patient_id')
|
|
|
|
if not appointment_id or not patient_id:
|
|
return {
|
|
"status": "error",
|
|
"message": "The appointment information is incomplete. Please try using the appointment ID instead."
|
|
}
|
|
|
|
# Now we have both appointment_id and patient_id, proceed with the call
|
|
logger.info(f"Starting call with patient_id: {patient_id}, appointment_id: {appointment_id}")
|
|
|
|
# Show call interface for the user
|
|
participant = await self.ctx.wait_for_participant()
|
|
try:
|
|
resp = await self.ctx.room.local_participant.perform_rpc(
|
|
destination_identity=participant.identity,
|
|
method="startCall",
|
|
response_timeout=30, # 30 second timeout for the RPC call
|
|
payload=json.dumps({
|
|
"patient_id": patient_id,
|
|
"appointment_id": appointment_id,
|
|
}),
|
|
)
|
|
|
|
logger.info(f"Call interface displayed to user. Response: {json.dumps(resp)}")
|
|
return {
|
|
"status": "success",
|
|
"message": f"I've initiated a call with {selected_appointment.get('title', 'the patient')}. Please wait while the connection is established.",
|
|
"call_status": "connecting",
|
|
"appointment_details": {
|
|
"patient_name": selected_appointment.get('title'),
|
|
"date": selected_appointment.get('date'),
|
|
"time": selected_appointment.get('start_time')
|
|
}
|
|
}
|
|
except Exception as rpc_error:
|
|
logger.error(f"RPC call failed: {str(rpc_error)}")
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to start the call: {str(rpc_error)}"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting call by patient name: {str(e)}")
|
|
return {"status": "error", "message": f"Error starting call: {str(e)}"}
|
|
|
|
|
|
|
|
async def checkUserExist(self,
|
|
email: Annotated[
|
|
str, llm.TypeInfo(description="Email address of the user to check")
|
|
]
|
|
):
|
|
"""Check if a user exists in the system based on their email address"""
|
|
try:
|
|
# Store email for later use
|
|
self.user_email = email
|
|
|
|
user_info = {"email": email}
|
|
logger.info(f"Checking user existence: {user_info}")
|
|
url = f"{Base_url}/api/check-user"
|
|
|
|
response = requests.post(url, json=user_info)
|
|
|
|
if response.status_code == 200:
|
|
logger.info("User check completed successfully")
|
|
result = response.json()
|
|
user_exists = result.get('success', False) # Adjust based on actual API response
|
|
logger.info(f"User exists: {user_exists}")
|
|
|
|
# Return a boolean value directly
|
|
return user_exists
|
|
else:
|
|
logger.error(f"Failed to check user. Status code: {response.status_code}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error checking user existence: {str(e)}")
|
|
return False
|
|
|
|
@llm.ai_callable()
|
|
async def add_practitioner(self,
|
|
firstName: Annotated[
|
|
str, llm.TypeInfo(description="First name of the practitioner")
|
|
],
|
|
lastName: Annotated[
|
|
str, llm.TypeInfo(description="Last name of the practitioner")
|
|
],
|
|
emailAddress: Annotated[
|
|
str, llm.TypeInfo(description="Email address of the practitioner")
|
|
],
|
|
password: Annotated[
|
|
str, llm.TypeInfo(description="Password for the practitioner account")
|
|
],
|
|
gender: Annotated[
|
|
str, llm.TypeInfo(description="Gender of the practitioner (Male or Female)")
|
|
],
|
|
dateOfBirth: Annotated[
|
|
str, llm.TypeInfo(description="Date of birth in MM-DD-YYYY format")
|
|
] = None,
|
|
|
|
phoneNumber: Annotated[
|
|
str, llm.TypeInfo(description="Phone number of the practitioner")
|
|
] = None,
|
|
|
|
):
|
|
"""Add a new practitioner or doctor to the system with the specified parameters"""
|
|
try:
|
|
logger.info(f"Adding new practitioner: {firstName} {lastName}")
|
|
self.user_email = emailAddress
|
|
|
|
# Check if user exists first
|
|
user_exists = await self.checkUserExist(emailAddress)
|
|
|
|
if user_exists:
|
|
logger.info(f"User with email {emailAddress} already exists")
|
|
return {
|
|
"status": "error",
|
|
"message": f"A user with email {emailAddress} already exists. Please use a different email address."
|
|
}
|
|
|
|
# Prepare the practitioner data
|
|
practitioner_data = {
|
|
"firstName": firstName,
|
|
"lastName": lastName,
|
|
"emailAddress": emailAddress,
|
|
"username": emailAddress, # Default to email if username not provided
|
|
"type": "practitioner"
|
|
}
|
|
|
|
# Add optional fields if provided
|
|
if dateOfBirth:
|
|
practitioner_data["dateOfBirth"] = dateOfBirth
|
|
else:
|
|
practitioner_data["dateOfBirth"] = "1990-01-01" # Placeholder value
|
|
if gender:
|
|
practitioner_data["gender"] = gender
|
|
|
|
# Add phone number if provided, otherwise use a default
|
|
if phoneNumber:
|
|
practitioner_data["textMessageNumber"] = phoneNumber
|
|
else:
|
|
practitioner_data["textMessageNumber"] = '+1-123-456-7890'
|
|
|
|
if password:
|
|
practitioner_data["newUserPassword"] = password
|
|
practitioner_data["adminPassword"] = password
|
|
|
|
# Set default values for required fields that might be missing
|
|
practitioner_data.setdefault("sendEmail", False)
|
|
practitioner_data.setdefault("analytics", "All Reports")
|
|
|
|
url = f"{Base_url}/api/assistant/add-user"
|
|
|
|
if not self.accessToken:
|
|
return {
|
|
"status": "error",
|
|
"message": "Authentication token not available."
|
|
}
|
|
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
logger.info(f"Sending practitioner data: {json.dumps(practitioner_data)}")
|
|
response = requests.post(url, json=practitioner_data, headers=headers)
|
|
logger.info(f"Response status code: {response.status_code}")
|
|
logger.info(f"Response content: {response.text}")
|
|
|
|
if response.status_code == 200:
|
|
response_data = response.json()
|
|
return {
|
|
"status": "success",
|
|
"message": f"Successfully added practitioner {firstName} {lastName}",
|
|
"data": response_data
|
|
}
|
|
else:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to add practitioner. Status code: {response.status_code}",
|
|
"details": response.text
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding practitioner: {str(e)}")
|
|
return {"status": "error", "message": f"Error adding practitioner: {str(e)}"}
|
|
|
|
@llm.ai_callable()
|
|
async def add_practitioner_availability(self,
|
|
practitioner_id: Annotated[
|
|
int, llm.TypeInfo(description="ID of the practitioner to add availability for")
|
|
],
|
|
date: Annotated[
|
|
str, llm.TypeInfo(description="Start Date for the availability in YYYY-MM-DD format or natural language like '1 May 2025'")
|
|
],
|
|
start_time: Annotated[
|
|
str, llm.TypeInfo(description="Start time in format like '07:30 AM'")
|
|
],
|
|
until_date: Annotated[
|
|
str, llm.TypeInfo(description="End date for repeating availability in YYYY-MM-DD format or natural language like '1 May 2025'")
|
|
],
|
|
end_time: Annotated[
|
|
str, llm.TypeInfo(description="End time in format like '03:15 PM'")
|
|
],
|
|
repeats: Annotated[
|
|
bool, llm.TypeInfo(description="Whether this availability repeats")
|
|
] = True,
|
|
select_days: Annotated[
|
|
str, llm.TypeInfo(description="Comma-separated list of days when availability repeats (e.g., 'Monday,Tuesday')")
|
|
] = None,
|
|
|
|
comment: Annotated[
|
|
str, llm.TypeInfo(description="Optional comment about this availability")
|
|
] = ""
|
|
):
|
|
"""Add availability for a practitioner with the specified parameters"""
|
|
try:
|
|
logger.info(f"Adding availability for practitioner ID: {practitioner_id}")
|
|
|
|
start_date = self._parse_date_to_iso(date)
|
|
if not start_date:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Invalid date format: {date}. Please use YYYY-MM-DD format or natural language like '1 May 2025'."
|
|
}
|
|
|
|
# Parse and format the end date if provided
|
|
end_date = None
|
|
if until_date:
|
|
end_date = self._parse_date_to_iso(until_date)
|
|
if not end_date:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Invalid until_date format: {until_date}. Please use YYYY-MM-DD format or natural language."
|
|
}
|
|
else:
|
|
# Default to start date if no end date provided
|
|
end_date = start_date
|
|
|
|
# Format the start and end datetimes in ISO 8601 format with UTC timezone
|
|
start_datetime_result = self._format_datetime_start(start_date, start_time,self.getTimezone)
|
|
end_datetime_result = self._format_datetime_end(end_date, end_time,self.getTimezone)
|
|
|
|
if not start_datetime_result or not end_datetime_result:
|
|
return {
|
|
"status": "error",
|
|
"message": "Failed to format date and time correctly."
|
|
}
|
|
|
|
|
|
start_time_utc = start_datetime_result['time']
|
|
end_time_utc = end_datetime_result['time']
|
|
# start_time_24h = self._convert_time_to_24h(start_time_utc)
|
|
# end_time_24h = self._convert_time_to_24h(end_time_utc)
|
|
|
|
# # Format the start and end times for the API with UTC timezone
|
|
# # The API expects ISO format strings for start and end
|
|
# start_datetime = f"{start_date}T{start_time_24h}"
|
|
# end_datetime = f"{start_date}T{end_time_24h}"
|
|
|
|
# Prepare the availability data
|
|
availability_data = {
|
|
"practitioner_id": practitioner_id,
|
|
"shift_title": "",
|
|
"type": "availability",
|
|
"allDay": False,
|
|
"date": start_date, # Use the parsed ISO date
|
|
"start": start_datetime_result['formatted_string'],
|
|
"end": end_datetime_result['formatted_string'],
|
|
"start_time": start_time_utc, # Keep original format for display
|
|
"end_time": end_time_utc, # Keep original format for display
|
|
"service": [],
|
|
"facility": 0,
|
|
"billing_facility": 0,
|
|
"repeats": repeats,
|
|
"repeatFrequency": "",
|
|
"repeatUnit": "",
|
|
"comment": comment
|
|
}
|
|
|
|
|
|
|
|
# Add repeating information if applicable
|
|
if repeats:
|
|
if until_date:
|
|
availability_data["untilDate"] = end_date
|
|
else:
|
|
availability_data["untilDate"] = end_date # Default to the same date if not specified
|
|
|
|
availability_data["weakDay"] = True if select_days else False
|
|
|
|
if select_days:
|
|
# Convert comma-separated string to list
|
|
days_list = [day.strip() for day in select_days.split(',')]
|
|
availability_data["selectDays"] = days_list
|
|
|
|
logger.info(f"Response content: {availability_data}")
|
|
|
|
url = f"{Base_url}/api/assistant/provider-add-availability"
|
|
|
|
if not self.accessToken:
|
|
return {
|
|
"status": "error",
|
|
"message": "Authentication token not available."
|
|
}
|
|
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
logger.info(f"Sending availability data: {json.dumps(availability_data)}")
|
|
response = requests.post(url, json=availability_data, headers=headers)
|
|
logger.info(f"Response status code: {response.status_code}")
|
|
|
|
|
|
if response.status_code == 200:
|
|
response_data = response.json()
|
|
return {
|
|
"status": "success",
|
|
"message": f"Successfully added availability for practitioner ID {practitioner_id}",
|
|
"data": response_data
|
|
}
|
|
else:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to add availability. Status code: {response.status_code}",
|
|
"details": response.text
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error adding practitioner availability: {str(e)}")
|
|
return {"status": "error", "message": f"Error adding availability: {str(e)}"}
|
|
|
|
def _convert_time_to_24h(self, time_str):
|
|
"""Helper method to convert time from 12-hour format (e.g., '8:00 AM') to ISO 8601 format (e.g., '12:00:00.000Z')"""
|
|
try:
|
|
# Parse the time string
|
|
time_parts = time_str.split()
|
|
time_components = time_parts[0].split(':')
|
|
|
|
hour = int(time_components[0])
|
|
minute = int(time_components[1]) if len(time_components) > 1 else 0
|
|
|
|
# Adjust for AM/PM
|
|
if len(time_parts) > 1:
|
|
am_pm = time_parts[1].upper()
|
|
if am_pm == 'PM' and hour < 12:
|
|
hour += 12
|
|
elif am_pm == 'AM' and hour == 12:
|
|
hour = 0
|
|
|
|
# Format as ISO 8601 time with milliseconds and 'Z' suffix to indicate UTC timezone
|
|
# For this specific format, we're using a fixed UTC time without offset calculation
|
|
return f"{hour:02d}:{minute:02d}:00.000Z"
|
|
except Exception as e:
|
|
logger.error(f"Error converting time format: {str(e)}")
|
|
return time_str # Return original if conversion fails
|
|
|
|
@llm.ai_callable()
|
|
async def get_practitioners(self):
|
|
"""Get a list of all practitioners in the system"""
|
|
try:
|
|
logger.info("Getting list of practitioners")
|
|
|
|
url = f"{Base_url}/api/assistant/practitioners-list"
|
|
|
|
if not self.accessToken:
|
|
return {
|
|
"status": "error",
|
|
"message": "Authentication token not available."
|
|
}
|
|
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
response = requests.get(url, headers=headers)
|
|
|
|
logger.info(f"Response status code: {response.status_code}")
|
|
|
|
if response.status_code == 200:
|
|
practitioners_data = response.json()
|
|
logger.info(f"Response status code: {practitioners_data.get('data', [])}")
|
|
# Format the practitioners for better presentation
|
|
formatted_practitioners = []
|
|
for practitioner in practitioners_data.get('data', []):
|
|
formatted_practitioner = {
|
|
"id": practitioner.get('id'),
|
|
"name": f"{practitioner.get('fname', '')} {practitioner.get('lname', '')}",
|
|
"email": practitioner.get('email', '')
|
|
}
|
|
formatted_practitioners.append(formatted_practitioner)
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": "Successfully retrieved practitioners",
|
|
"data": formatted_practitioners
|
|
}
|
|
else:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to retrieve practitioners. Status code: {response.status_code}",
|
|
"details": response.text if hasattr(response, 'text') else None
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting practitioners: {str(e)}")
|
|
return {"status": "error", "message": f"Error getting practitioners: {str(e)}"}
|
|
def _parse_date_to_iso(self, date_str):
|
|
"""Helper method to parse various date formats to ISO format (YYYY-MM-DD)"""
|
|
try:
|
|
# Check if already in YYYY-MM-DD format
|
|
if re.match(r'^\d{4}-\d{2}-\d{2}$', date_str):
|
|
return date_str
|
|
|
|
# Try to parse common date formats
|
|
for fmt in [
|
|
"%d %B %Y", # 1 May 2025
|
|
"%d %b %Y", # 1 May 2025
|
|
"%B %d %Y", # May 1 2025
|
|
"%b %d %Y", # May 1 2025
|
|
"%m/%d/%Y", # 05/01/2025
|
|
"%m-%d-%Y", # 05-01-2025
|
|
"%d/%m/%Y", # 01/05/2025
|
|
"%d-%m-%Y" # 01-05-2025
|
|
]:
|
|
try:
|
|
parsed_date = datetime.datetime.strptime(date_str, fmt)
|
|
# Just return the formatted date without UTC conversion
|
|
return parsed_date.strftime("%Y-%m-%d")
|
|
except ValueError:
|
|
continue
|
|
|
|
# Try to handle natural language date expressions
|
|
date_str_lower = date_str.lower().strip()
|
|
today = datetime.datetime.now()
|
|
|
|
# Handle relative dates
|
|
if "today" in date_str_lower:
|
|
return today.strftime("%Y-%m-%d")
|
|
elif "tomorrow" in date_str_lower:
|
|
tomorrow = today + datetime.timedelta(days=1)
|
|
return tomorrow.strftime("%Y-%m-%d")
|
|
elif "yesterday" in date_str_lower:
|
|
yesterday = today - datetime.timedelta(days=1)
|
|
return yesterday.strftime("%Y-%m-%d")
|
|
|
|
# Handle day names
|
|
days = {"monday": 0, "tuesday": 1, "wednesday": 2, "thursday": 3,
|
|
"friday": 4, "saturday": 5, "sunday": 6}
|
|
|
|
for day_name, day_num in days.items():
|
|
if day_name in date_str_lower:
|
|
# Calculate days until the next occurrence of this day
|
|
days_ahead = (day_num - today.weekday()) % 7
|
|
if "next" in date_str_lower:
|
|
# If "next" is specified, add 7 days if it would otherwise be today
|
|
if days_ahead == 0:
|
|
days_ahead = 7
|
|
next_day = today + datetime.timedelta(days=days_ahead)
|
|
return next_day.strftime("%Y-%m-%d")
|
|
|
|
# Try to extract month and day from strings like "May 1" or "1 May"
|
|
months = {
|
|
"january": 1, "february": 2, "march": 3, "april": 4, "may": 5, "june": 6,
|
|
"july": 7, "august": 8, "september": 9, "october": 10, "november": 11, "december": 12,
|
|
"jan": 1, "feb": 2, "mar": 3, "apr": 4, "jun": 6, "jul": 7, "aug": 8,
|
|
"sep": 9, "sept": 9, "oct": 10, "nov": 11, "dec": 12
|
|
}
|
|
|
|
for month_name, month_num in months.items():
|
|
if month_name in date_str_lower:
|
|
# Try to extract the day number
|
|
day_match = re.search(r'\b(\d{1,2})(st|nd|rd|th)?\b', date_str_lower)
|
|
if day_match:
|
|
day = int(day_match.group(1))
|
|
# Determine year (current year, unless the date would be in the past)
|
|
year = today.year
|
|
if month_num < today.month or (month_num == today.month and day < today.day):
|
|
year += 1
|
|
|
|
try:
|
|
parsed_date = datetime.datetime(year, month_num, day)
|
|
return parsed_date.strftime("%Y-%m-%d")
|
|
except ValueError:
|
|
# Invalid date (e.g., February 30)
|
|
pass
|
|
|
|
# If all parsing attempts failed
|
|
logger.error(f"Could not parse date: {date_str}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error parsing date: {str(e)}")
|
|
return None
|
|
|
|
def _convert_to_utc(self, dt):
|
|
"""
|
|
This function is kept for compatibility but now simply returns the datetime object
|
|
without timezone conversion since we're only working with date formats
|
|
"""
|
|
return dt # Simply return the datetime object without conversion
|
|
|
|
|
|
def _format_datetime_start(self, date, time_str, timezone='EST'):
|
|
"""
|
|
Convert a date and time from a specific timezone to UTC format
|
|
Similar to formatDateTimeStart in JavaScript
|
|
|
|
Args:
|
|
date (str): Date in YYYY-MM-DD format
|
|
time_str (str): Time in format like '07:30 AM'
|
|
timezone (str): Source timezone (default: 'EST')
|
|
|
|
Returns:
|
|
dict: Dictionary containing:
|
|
- 'formatted_string': Formatted datetime string in ISO 8601 format with UTC timezone
|
|
- 'datetime_obj': Python datetime object in UTC timezone
|
|
- 'date': Date component in YYYY-MM-DD format
|
|
- 'time': Time component in HH:MM AM/PM format
|
|
"""
|
|
try:
|
|
# Use built-in datetime module
|
|
import datetime
|
|
|
|
# Log the input parameters for debugging
|
|
logger.info(f"_format_datetime_start called with date={date}, time_str={time_str}, timezone={timezone}")
|
|
|
|
# Parse the date and time
|
|
datetime_str = f"{date} {time_str}"
|
|
|
|
# Try different formats for the time
|
|
formats = [
|
|
"%Y-%m-%d %I:%M %p", # 07:30 AM
|
|
"%Y-%m-%d %H:%M", # 07:30 (24-hour)
|
|
"%Y-%m-%d %I:%M%p", # 07:30AM (no space)
|
|
"%Y-%m-%d %I:%M" # 07:30 (12-hour, no AM/PM)
|
|
]
|
|
|
|
dt = None
|
|
for fmt in formats:
|
|
try:
|
|
dt = datetime.datetime.strptime(datetime_str, fmt)
|
|
logger.info(f"Successfully parsed datetime with format: {fmt}")
|
|
break
|
|
except ValueError:
|
|
continue
|
|
|
|
if not dt:
|
|
raise ValueError(f"Could not parse datetime: {datetime_str}")
|
|
|
|
# Handle timezone conversion using a more robust approach
|
|
# First, create a naive datetime (without timezone)
|
|
naive_dt = dt
|
|
|
|
# Then, handle the timezone conversion manually
|
|
# For EST (UTC-5), add 5 hours to convert to UTC
|
|
timezone_offsets = {
|
|
'EST': -4, # Eastern Standard Time (UTC-5)
|
|
'EDT': -4, # Eastern Daylight Time (UTC-4)
|
|
#'CST': -6, # Central Standard Time (UTC-6)
|
|
'CST': -5, # Central Daylight Time (UTC-5)
|
|
'MST': -6, # Mountain Standard Time (UTC-7)
|
|
#'MDT': -6, # Mountain Daylight Time (UTC-6)
|
|
'PST': -7, # Pacific Standard Time (UTC-8)
|
|
'PDT': -7, # Pacific Daylight Time (UTC-7)
|
|
'CDT': -5 # Central Daylight Time (UTC-5)
|
|
}
|
|
|
|
# Get the UTC offset for the specified timezone
|
|
offset_hours = timezone_offsets.get(timezone.upper(), 0)
|
|
logger.info(f"Using timezone offset: {offset_hours} hours for timezone: {timezone}")
|
|
|
|
# Apply the offset to convert to UTC
|
|
utc_dt = naive_dt + datetime.timedelta(hours=-offset_hours)
|
|
|
|
# Format in ISO 8601 with milliseconds and Z suffix
|
|
formatted_datetime = utc_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
|
|
|
|
# Log detailed information about the conversion
|
|
logger.info(f"Original datetime: {dt.strftime('%Y-%m-%d %I:%M %p')} {timezone}")
|
|
logger.info(f"UTC datetime: {utc_dt.strftime('%Y-%m-%d %H:%M:%S')} UTC")
|
|
logger.info(f"Formatted ISO datetime: {formatted_datetime}")
|
|
logger.info(f"Date component: {utc_dt.strftime('%Y-%m-%d')}")
|
|
logger.info(f"Time component (AM/PM): {utc_dt.strftime('%I:%M %p')}")
|
|
logger.info(f"Time component (24h): {utc_dt.strftime('%H:%M:%S')}")
|
|
|
|
# Return a dictionary with both the formatted string and the datetime object
|
|
return {
|
|
'formatted_string': formatted_datetime,
|
|
'datetime_obj': utc_dt,
|
|
'date': utc_dt.strftime("%Y-%m-%d"),
|
|
'time': utc_dt.strftime("%I:%M %p") # Format time in AM/PM format
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error formatting datetime: {str(e)}")
|
|
return None
|
|
|
|
def _format_datetime_end(self, date, end_time, timezone='EST'):
|
|
"""
|
|
Convert a date and end time to UTC format
|
|
Similar to formatDateTimeEnd in JavaScript
|
|
|
|
Args:
|
|
date (str): Date in YYYY-MM-DD format
|
|
end_time (str): Time in format like '03:15 PM'
|
|
timezone (str, optional): Timezone name. Defaults to 'EST'.
|
|
|
|
Returns:
|
|
dict: Dictionary containing:
|
|
- 'formatted_string': Formatted datetime string in ISO 8601 format with UTC timezone
|
|
- 'datetime_obj': Python datetime object in UTC timezone
|
|
- 'date': Date component in YYYY-MM-DD format
|
|
- 'time': Time component in HH:MM:SS format
|
|
"""
|
|
try:
|
|
# Use built-in datetime module
|
|
import datetime
|
|
|
|
# Log the input parameters for debugging
|
|
logger.info(f"_format_datetime_end called with date={date}, end_time={end_time}, timezone={timezone}")
|
|
|
|
# Parse the date and time
|
|
datetime_str = f"{date} {end_time}"
|
|
# Try different formats for the time
|
|
formats = [
|
|
"%Y-%m-%d %I:%M %p", # 03:15 PM
|
|
"%Y-%m-%d %H:%M", # 15:15 (24-hour)
|
|
"%Y-%m-%d %I:%M%p", # 03:15PM (no space)
|
|
"%Y-%m-%d %I:%M" # 03:15 (12-hour, no AM/PM)
|
|
]
|
|
|
|
dt = None
|
|
for fmt in formats:
|
|
try:
|
|
dt = datetime.datetime.strptime(datetime_str, fmt)
|
|
logger.info(f"Successfully parsed datetime with format: {fmt}")
|
|
break
|
|
except ValueError:
|
|
continue
|
|
|
|
if not dt:
|
|
raise ValueError(f"Could not parse datetime: {datetime_str}")
|
|
|
|
# Handle timezone conversion using a more robust approach
|
|
# First, create a naive datetime (without timezone)
|
|
naive_dt = dt
|
|
|
|
# Then, handle the timezone conversion manually
|
|
# For EST (UTC-5), add 5 hours to convert to UTC
|
|
timezone_offsets = {
|
|
'EST': -4, # Eastern Standard Time (UTC-5)
|
|
'EDT': -4, # Eastern Daylight Time (UTC-4)
|
|
#'CST': -6, # Central Standard Time (UTC-6)
|
|
'CST': -5, # Central Daylight Time (UTC-5)
|
|
'MST': -6, # Mountain Standard Time (UTC-7)
|
|
#'MDT': -6, # Mountain Daylight Time (UTC-6)
|
|
'PST': -7, # Pacific Standard Time (UTC-8)
|
|
'PDT': -7, # Pacific Daylight Time (UTC-7)
|
|
'CDT': -5 # Central Daylight Time (UTC-5)
|
|
# 'EST': -4, # Eastern Daylight Time (UTC-4)
|
|
# 'EDT': -4, # Eastern Daylight Time (UTC-4)
|
|
# 'CST': -6, # Central Standard Time (UTC-6)
|
|
|
|
# 'MST': -7, # Mountain Standard Time (UTC-7)
|
|
# 'MDT': -6, # Mountain Daylight Time (UTC-6)
|
|
# 'PST': -8, # Pacific Standard Time (UTC-8)
|
|
# 'PDT': -7 # Pacific Daylight Time (UTC-7)
|
|
}
|
|
|
|
# Get the UTC offset for the specified timezone
|
|
offset_hours = timezone_offsets.get(timezone.upper(), 0)
|
|
logger.info(f"Using timezone offset: {offset_hours} hours for timezone: {timezone}")
|
|
|
|
# Apply the offset to convert to UTC
|
|
utc_dt = naive_dt + datetime.timedelta(hours=-offset_hours)
|
|
|
|
# Format in ISO 8601 with milliseconds and Z suffix
|
|
formatted_datetime = utc_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
|
|
|
|
# Log detailed information about the conversion
|
|
logger.info(f"Original datetime: {dt.strftime('%Y-%m-%d %I:%M %p')} {timezone}")
|
|
logger.info(f"UTC datetime: {utc_dt.strftime('%Y-%m-%d %H:%M:%S')} UTC")
|
|
logger.info(f"Formatted ISO datetime: {formatted_datetime}")
|
|
logger.info(f"Date component: {utc_dt.strftime('%Y-%m-%d')}")
|
|
logger.info(f"Time component (AM/PM): {utc_dt.strftime('%I:%M %p')}")
|
|
logger.info(f"Time component (24h): {utc_dt.strftime('%H:%M:%S')}")
|
|
|
|
# Return a dictionary with both the formatted string and the datetime object
|
|
return {
|
|
'formatted_string': formatted_datetime,
|
|
'datetime_obj': utc_dt,
|
|
'date': utc_dt.strftime("%Y-%m-%d"),
|
|
'time': utc_dt.strftime("%I:%M %p") # Format time in AM/PM format
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error formatting end datetime: {str(e)}")
|
|
return None
|
|
|
|
@llm.ai_callable()
|
|
async def addCategory(self,
|
|
categoryName: Annotated[
|
|
str, llm.TypeInfo(description="Name of the category to save")
|
|
]
|
|
):
|
|
"""Save a category with the given name and return status"""
|
|
try:
|
|
category_data = {
|
|
"name": categoryName
|
|
}
|
|
|
|
logger.info(f"Category data: {category_data}")
|
|
url = f"{Base_url}/api/assistant/save-category"
|
|
|
|
if self.accessToken:
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
response = requests.post(url, json=category_data, headers=headers)
|
|
|
|
if response.status_code == 200:
|
|
logger.info("Category data sent successfully")
|
|
category_id = response.json().get('category_id')
|
|
self.category_id=category_id
|
|
return {
|
|
"status": "success",
|
|
"message": "Category saved successfully",
|
|
|
|
}
|
|
|
|
else:
|
|
logger.error(f"Failed to save category. Status code: {response.status_code}")
|
|
return {"status": "error", "message": f"Failed to save category. Status code: {response.status_code}"}
|
|
else:
|
|
logger.error("No access token available. Please register or login first.")
|
|
return {"status": "error", "message": "Authentication required. Please register or login first."}
|
|
except Exception as e:
|
|
logger.error(f"Error saving category: {str(e)}")
|
|
return {"status": "error", "message": f"Error saving category: {str(e)}"}
|
|
|
|
@llm.ai_callable()
|
|
async def createProduct(self,
|
|
productName: Annotated[
|
|
str, llm.TypeInfo(description="Name of the product")
|
|
],
|
|
productCatName: Annotated[
|
|
str, llm.TypeInfo(description="Name of the product catgory")
|
|
],
|
|
productPrice: Annotated[
|
|
float, llm.TypeInfo(description="Price of the product")
|
|
],
|
|
productDescription: Annotated[
|
|
str, llm.TypeInfo(description="Description of the product")
|
|
],
|
|
prescription: Annotated[
|
|
str, llm.TypeInfo(description="prescrption of the product (yes, no)")
|
|
]
|
|
|
|
):
|
|
"""Create a new product with the specified details"""
|
|
try:
|
|
# Store product data for reference
|
|
self.product_data = {
|
|
"name": productName,
|
|
"price": productPrice,
|
|
"productCatName": productCatName,
|
|
"description": productDescription
|
|
}
|
|
logger.info(f"Creating product catagory: {productCatName}")
|
|
slug = productName.lower().replace(' ', '-').replace('&', 'and')
|
|
is_prescription = True if prescription.lower() == "yes" else False
|
|
product_info = {
|
|
"name": productName,
|
|
"slug": slug,
|
|
"description": productDescription,
|
|
"price": productPrice,
|
|
"type": "One Time",
|
|
"isPrescription": is_prescription,
|
|
"inTakeForm_id": "",
|
|
"questiioneriesForm_id": "",
|
|
"category_id": productCatName,
|
|
"sku": "",
|
|
"status": "active"
|
|
}
|
|
|
|
logger.info(f"Creating product: {product_info}")
|
|
url = f"{Base_url}/api/assistant/save-product"
|
|
|
|
if self.accessToken:
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
response = requests.post(url, json=product_info, headers=headers)
|
|
|
|
if response.status_code == 200:
|
|
logger.info("Product created successfully")
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": "Product created successfully",
|
|
}
|
|
else:
|
|
logger.error(f"Failed to create product. Status code: {response.status_code}")
|
|
return {"status": "error", "message": f"Failed to create product. Status code: {response.status_code}"}
|
|
else:
|
|
logger.error("No access token available. Please register or login first.")
|
|
return {"status": "error", "message": "Authentication required. Please register or login first."}
|
|
except Exception as e:
|
|
logger.error(f"Error creating product: {str(e)}")
|
|
return {"status": "error", "message": f"Error creating product: {str(e)}"}
|
|
|
|
|
|
@llm.ai_callable()
|
|
async def suggestFormFields(self,
|
|
formName: Annotated[
|
|
str, llm.TypeInfo(description="Name of the form")
|
|
],
|
|
formType: Annotated[
|
|
str, llm.TypeInfo(description="Type of form (charting-forms, document-forms, simple-forms, consent-forms)")
|
|
]
|
|
):
|
|
"""Suggest appropriate form fields based on form name and type"""
|
|
try:
|
|
# Store form metadata for later use
|
|
self.form_data = {
|
|
"name": formName,
|
|
"type": formType
|
|
}
|
|
|
|
suggested_fields = []
|
|
|
|
# Generate field suggestions based on form type
|
|
if "charting" in formType.lower():
|
|
suggested_fields = [
|
|
{"label": "Date of Visit", "type": "Date", "required": True},
|
|
{"label": "Chief Complaint", "type": "Textarea", "required": True},
|
|
{"label": "Vital Signs", "type": "Textarea", "required": False},
|
|
{"label": "Assessment", "type": "Textarea", "required": True},
|
|
{"label": "Treatment Plan", "type": "Textarea", "required": True},
|
|
{"label": "Provider Signature", "type": "Signature", "required": True}
|
|
]
|
|
|
|
# Add more specific fields based on form name
|
|
if "progress" in formName.lower():
|
|
suggested_fields.extend([
|
|
{"label": "Progress Notes", "type": "Textarea", "required": True},
|
|
{"label": "Follow-up Plan", "type": "Textarea", "required": True}
|
|
])
|
|
elif "initial" in formName.lower():
|
|
suggested_fields.extend([
|
|
{"label": "Medical History", "type": "Textarea", "required": True},
|
|
{"label": "Family History", "type": "Textarea", "required": False},
|
|
{"label": "Social History", "type": "Textarea", "required": False},
|
|
{"label": "Allergies", "type": "Textarea", "required": True}
|
|
])
|
|
|
|
elif "document" in formType.lower():
|
|
suggested_fields = [
|
|
{"label": "Document Title", "type": "Text", "required": True},
|
|
{"label": "Date", "type": "Date", "required": True},
|
|
{"label": "Document Content", "type": "Textarea", "required": True},
|
|
{"label": "Supporting Files", "type": "File", "required": False},
|
|
{"label": "Provider Signature", "type": "Signature", "required": True}
|
|
]
|
|
|
|
elif "consent" in formType.lower():
|
|
suggested_fields = [
|
|
{"label": "Date", "type": "Date", "required": True},
|
|
{"label": "Consent Statement", "type": "Textarea", "required": True},
|
|
{"label": "I understand and agree to the above", "type": "Checkbox", "required": True},
|
|
{"label": "Patient Signature", "type": "Signature", "required": True},
|
|
{"label": "Provider Signature", "type": "Signature", "required": True}
|
|
]
|
|
|
|
else: # simple-forms or default
|
|
suggested_fields = [
|
|
{"label": "Patient First Name", "type": "Text", "required": True, "name": "patient_custom_fname", "readonly": True},
|
|
{"label": "Patient Last Name", "type": "Text", "required": True, "name": "patient_custom_lname", "readonly": True},
|
|
{"label": "Patient Gender", "type": "Select", "required": True, "name": "patient_custom_gender_identity",
|
|
"options": ["Male", "Female", "Other"]},
|
|
{"label": "Patient DOB", "type": "date", "required": True, "name": "patient_custom_DOB"},
|
|
{"label": "Patient Email", "type": "Text", "required": True, "name": "patient_custom_email"},
|
|
{"label": "Patient Phone", "type": "Text", "required": True, "name": "patient_extra_data_preferred_phone"},
|
|
|
|
]
|
|
|
|
# Store suggested fields for later modification
|
|
self.form_data["suggested_fields"] = suggested_fields
|
|
|
|
return {
|
|
"status": "success",
|
|
"fields": suggested_fields,
|
|
"message": f"Suggested {len(suggested_fields)} fields for {formType} form: {formName}"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error suggesting form fields: {str(e)}")
|
|
return {"status": "error", "message": f"Error suggesting form fields: {str(e)}"}
|
|
|
|
@llm.ai_callable()
|
|
async def modifyFormFields(self,
|
|
modifications: Annotated[
|
|
str, llm.TypeInfo(description="JSON string of field modifications (add, remove, or change)")
|
|
]
|
|
):
|
|
"""Modify the suggested form fields based on user feedback"""
|
|
try:
|
|
if not self.form_data.get("suggested_fields"):
|
|
return {"status": "error", "message": "No suggested fields found. Please suggest fields first."}
|
|
|
|
# Parse the modifications
|
|
mods = json.loads(modifications)
|
|
current_fields = self.form_data["suggested_fields"]
|
|
|
|
# Handle additions
|
|
if "add" in mods:
|
|
for new_field in mods["add"]:
|
|
current_fields.append(new_field)
|
|
|
|
# Handle removals
|
|
if "remove" in mods:
|
|
labels_to_remove = [f.lower() for f in mods["remove"]]
|
|
current_fields = [f for f in current_fields if f["label"].lower() not in labels_to_remove]
|
|
|
|
# Handle changes
|
|
if "change" in mods:
|
|
for change in mods["change"]:
|
|
if "old_label" in change and "new_field" in change:
|
|
old_label = change["old_label"]
|
|
new_field = change["new_field"]
|
|
|
|
# Find and update the field
|
|
for i, field in enumerate(current_fields):
|
|
if field["label"].lower() == old_label.lower():
|
|
current_fields[i] = new_field
|
|
break
|
|
|
|
# Update the stored fields
|
|
self.form_data["suggested_fields"] = current_fields
|
|
|
|
return {
|
|
"status": "success",
|
|
"fields": current_fields,
|
|
"message": "Form fields modified successfully"
|
|
}
|
|
|
|
except json.JSONDecodeError:
|
|
logger.error("Invalid JSON format for modifications")
|
|
return {"status": "error", "message": "Invalid JSON format for modifications"}
|
|
except Exception as e:
|
|
logger.error(f"Error modifying form fields: {str(e)}")
|
|
return {"status": "error", "message": f"Error modifying form fields: {str(e)}"}
|
|
|
|
@llm.ai_callable()
|
|
async def saveForm(self,
|
|
formName: Annotated[
|
|
str, llm.TypeInfo(description="Name of the form to be saved")
|
|
],
|
|
formType: Annotated[
|
|
str, llm.TypeInfo(description="Type of form (charting-forms, document-forms, simple-forms, consent-forms)")
|
|
],
|
|
formFields: Annotated[
|
|
str, llm.TypeInfo(description="JSON string of form fields, each containing label, type, and options")
|
|
]
|
|
):
|
|
"""Save form configuration with fields and return status"""
|
|
try:
|
|
# Parse the JSON string to a list
|
|
logger.info(f"Form data: {json.dumps(formFields)}")
|
|
formeo_json_str = await self.convertToFormeoJson(formFields)
|
|
|
|
form_data = {
|
|
"data": formeo_json_str,
|
|
"name": formName,
|
|
"type": formType
|
|
}
|
|
|
|
logger.info(f"Saving form: {formName} of type {formType}")
|
|
logger.info(f"Form data: {json.dumps(form_data)}")
|
|
url = f"{Base_url}/api/assistant/store-form"
|
|
|
|
if self.accessToken:
|
|
headers = {
|
|
"Authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
response = requests.post(url, json=form_data, headers=headers)
|
|
logger.info(f"Response Status Code: {response.status_code}")
|
|
logger.info(f"Response Content: {response.text}")
|
|
if response.status_code == 200:
|
|
logger.info("Form saved successfully")
|
|
self.onboarding_stage = "product_setup" # Advance to next stage
|
|
return {
|
|
"status": "success",
|
|
"message": "Form saved successfully",
|
|
"next_stage": "product_setup"
|
|
}
|
|
else:
|
|
logger.error(f"Failed to save form. Status code: {response.status_code}")
|
|
return {"status": "error", "message": f"Failed to save form. Status code: {response.status_code}"}
|
|
else:
|
|
logger.error("No access token available. Please register or login first.")
|
|
return {"status": "error", "message": "Authentication required. Please register or login first."}
|
|
except json.JSONDecodeError:
|
|
logger.error("Invalid JSON format for formFields")
|
|
return {"status": "error", "message": "Invalid JSON format for form fields"}
|
|
except Exception as e:
|
|
logger.error(f"Error saving form: {str(e)}")
|
|
return {"status": "error", "message": f"Error saving form: {str(e)}"}
|
|
|
|
@llm.ai_callable()
|
|
async def bookAppointment(self,
|
|
practitioner_id: Annotated[
|
|
str, llm.TypeInfo(description="ID of the practitioner to check availability for")
|
|
]
|
|
):
|
|
"""Book an appointment process by checking a practitioner's availability first"""
|
|
try:
|
|
logger.info(f"Starting appointment booking process for practitioner ID: {practitioner_id}")
|
|
|
|
# First, check if the practitioner exists and has availability
|
|
availability_result = await self.check_practitioner_availability(
|
|
practitioner_id=practitioner_id
|
|
)
|
|
|
|
if availability_result.get("status") == "error":
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to check practitioner availability: {availability_result.get('message')}",
|
|
"details": availability_result
|
|
}
|
|
|
|
# Initialize booking_data as a dictionary if it doesn't exist
|
|
|
|
self.booking_data = {}
|
|
|
|
# Store the practitioner ID for the next steps
|
|
self.booking_data["practitioner_id"] = practitioner_id
|
|
|
|
|
|
|
|
# Get the formatted available dates
|
|
available_dates = availability_result.get("allowed_dates", [])
|
|
|
|
|
|
|
|
if not available_dates:
|
|
return {
|
|
"status": "error",
|
|
"message": f"No available slots found for practitioner {practitioner_id} in the current month.",
|
|
"next_step": "Try a different practitioner or a different month"
|
|
}
|
|
|
|
# Store all slots data for future reference
|
|
self.booking_data["all_slots_data"] = availability_result.get("all_slots_data", [])
|
|
available_slots = availability_result.get("all_slots_data", [])
|
|
logger.info(f"Available dates for practitioner ID {practitioner_id}: {available_slots}")
|
|
return {
|
|
"status": "success",
|
|
"message": f"Found availability for practitioner {practitioner_id} on {len(available_dates)} dates",
|
|
"available_dates": available_dates,
|
|
"next_step": "select_date",
|
|
"instruction": "Please select one of the available date to proceed with booking"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error starting appointment booking: {str(e)}")
|
|
return {"status": "error", "message": f"Error starting appointment booking: {str(e)}"}
|
|
@llm.ai_callable()
|
|
async def find_patient(self,
|
|
search_term: Annotated[
|
|
str, llm.TypeInfo(description="Name, email, or ID of the patient to search for")
|
|
]
|
|
):
|
|
"""Find a patient by name, email, or ID"""
|
|
try:
|
|
logger.info(f"Searching for patient with term: {search_term}")
|
|
|
|
url = f"{Base_url}/api/emr/patients-list"
|
|
|
|
if not self.accessToken:
|
|
return {
|
|
"status": "error",
|
|
"message": "Authentication token not available."
|
|
}
|
|
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
params = {
|
|
"search": search_term
|
|
}
|
|
|
|
response = requests.get(url, headers=headers, params=params)
|
|
logger.info(f"Response status code: {response.status_code}")
|
|
|
|
if response.status_code == 200:
|
|
patients_data = response.json()
|
|
patients = patients_data.get('data', [])
|
|
|
|
if not patients:
|
|
return {
|
|
"status": "error",
|
|
"message": f"No patients found matching '{search_term}'."
|
|
}
|
|
|
|
# Format the patients for better presentation
|
|
formatted_patients = []
|
|
for patient in patients:
|
|
formatted_patient = {
|
|
"id": patient.get('id'),
|
|
"name": f"{patient.get('fname', '')} {patient.get('lname', '')}",
|
|
"email": patient.get('email', ''),
|
|
}
|
|
formatted_patients.append(formatted_patient)
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": f"Found {len(formatted_patients)} patients matching '{search_term}'",
|
|
"patients": formatted_patients
|
|
}
|
|
else:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to search patients. Status code: {response.status_code}",
|
|
"details": response.text
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching patients: {str(e)}")
|
|
return {"status": "error", "message": f"Error searching patients: {str(e)}"}
|
|
|
|
|
|
@llm.ai_callable()
|
|
async def check_practitioner_availability(self,
|
|
practitioner_id: Annotated[
|
|
str, llm.TypeInfo(description="ID of the practitioner to check availability for")
|
|
],
|
|
):
|
|
"""Check a practitioner's availability get all available dates"""
|
|
try:
|
|
# If no date is provided, use current month
|
|
today = datetime.datetime.now()
|
|
date = today.strftime("%Y-%m-%d")
|
|
logger.info(f"No date provided, using current date: {date}")
|
|
|
|
logger.info(f"Checking availability for practitioner ID: {practitioner_id} on date: {date}")
|
|
|
|
# Parse and format the date
|
|
formatted_date = self._parse_date_to_iso(date)
|
|
if not formatted_date:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Invalid date format: {date}. Please use YYYY-MM-DD format or natural language."
|
|
}
|
|
|
|
# Extract month from the formatted date
|
|
try:
|
|
date_obj = datetime.datetime.strptime(formatted_date, "%Y-%m-%d")
|
|
month = date_obj.strftime("%m") # Get month as 01-12 string
|
|
year = date_obj.strftime("%Y") # Get year as YYYY string
|
|
except ValueError:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to extract month from date: {formatted_date}"
|
|
}
|
|
|
|
url = f"{Base_url}/api/assistant/get-available-slots-data/{practitioner_id}"
|
|
|
|
if not self.accessToken:
|
|
return {
|
|
"status": "error",
|
|
"message": "Authentication token not available."
|
|
}
|
|
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
params = {
|
|
"month": month,
|
|
"timezone": self.getTimezone or "EST"
|
|
}
|
|
|
|
logger.info(f"Requesting availability with params: {params}")
|
|
response = requests.post(url, headers=headers, json=params)
|
|
|
|
|
|
if response.status_code == 200:
|
|
availability_data = response.json()
|
|
|
|
# The response is a dictionary with dates as keys and arrays of slots as values
|
|
all_dates = availability_data
|
|
|
|
if not all_dates:
|
|
return {
|
|
"status": "error",
|
|
"message": f"No availability found for practitioner {practitioner_id} in {month}/{year}"
|
|
}
|
|
|
|
# Extract the date keys (similar to Object.keys() in JavaScript)
|
|
date_keys = list(all_dates.keys())
|
|
|
|
# Format the dates (similar to the JavaScript map operation)
|
|
allowed_dates = []
|
|
for date_key in date_keys:
|
|
# Only include dates that have available slots
|
|
if date_key in all_dates and all_dates[date_key] and len(all_dates[date_key]) > 0:
|
|
# Format date from YYYY-MM-DD to MM-DD-YYYY for display
|
|
try:
|
|
date_obj = datetime.datetime.strptime(date_key, "%Y-%m-%d")
|
|
formatted_display = date_obj.strftime("%m-%d-%Y")
|
|
|
|
allowed_dates.append({
|
|
"title": formatted_display, # Display format MM-DD-YYYY
|
|
"value": date_key, # Original date value YYYY-MM-DD
|
|
"slots_count": len(all_dates[date_key]) # Number of available slots
|
|
})
|
|
|
|
except ValueError:
|
|
logger.warning(f"Could not format date: {date_key}")
|
|
|
|
# Sort dates chronologically
|
|
allowed_dates.sort(key=lambda x: x["value"])
|
|
logger.info(f"Checking availability {allowed_dates}")
|
|
return {
|
|
"status": "success",
|
|
"message": f"Found availability on {len(allowed_dates)} dates for practitioner {practitioner_id}",
|
|
"allowed_dates": allowed_dates,
|
|
"all_slots_data": all_dates # Keep the full data for reference
|
|
}
|
|
else:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to retrieve availability. Status code: {response.status_code}",
|
|
"details": response.text
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking practitioner availability: {str(e)}")
|
|
return {"status": "error", "message": f"Error checking practitioner availability: {str(e)}"}
|
|
|
|
|
|
@llm.ai_callable()
|
|
async def select_appointment_date(self,
|
|
date: Annotated[
|
|
str, llm.TypeInfo(description="Date for the appointment in YYYY-MM-DD format or natural language")
|
|
]
|
|
):
|
|
"""Select a date for the appointment after checking practitioner availability"""
|
|
try:
|
|
if not hasattr(self, 'booking_data') or not self.booking_data:
|
|
return {
|
|
"status": "error",
|
|
"message": "No active booking process. Please start by selecting a practitioner first.",
|
|
"next_step": "bookAppointment"
|
|
}
|
|
|
|
practitioner_id = self.booking_data.get("practitioner_id")
|
|
if not practitioner_id:
|
|
return {
|
|
"status": "error",
|
|
"message": "Practitioner ID not found in booking data. Please start the booking process again.",
|
|
"next_step": "bookAppointment"
|
|
}
|
|
|
|
# Parse the date if it's in natural language format
|
|
parsed_date = self._parse_date_to_iso(date) if hasattr(self, '_parse_date_to_iso') else date
|
|
if not parsed_date:
|
|
return {
|
|
"status": "error",
|
|
"message": f"Invalid date format: {date}. Please use YYYY-MM-DD format or natural language.",
|
|
"next_step": "select_appointment_date"
|
|
}
|
|
|
|
logger.info(f"Selected date for appointment: {parsed_date}")
|
|
|
|
# Check if the selected date is in the available dates
|
|
all_slots_data = self.booking_data.get("all_slots_data", {})
|
|
|
|
if not all_slots_data:
|
|
return {
|
|
"status": "error",
|
|
"message": "No available slots data found. Please check practitioner availability first.",
|
|
"next_step": "bookAppointment"
|
|
}
|
|
|
|
# Check if the date exists in the available slots
|
|
if parsed_date not in all_slots_data:
|
|
# Try to find the date in the formatted_datetime fields
|
|
found = False
|
|
for date_key, slots in all_slots_data.items():
|
|
for slot in slots:
|
|
if parsed_date in slot.get("formatted_datetime", ""):
|
|
parsed_date = date_key
|
|
found = True
|
|
break
|
|
if found:
|
|
break
|
|
|
|
if not found:
|
|
return {
|
|
"status": "error",
|
|
"message": f"No available slots found for the selected date: {parsed_date}",
|
|
"next_step": "select_appointment_date",
|
|
"instruction": "Please select a different date from the available dates"
|
|
}
|
|
|
|
# Get the available time slots for the selected date
|
|
available_slots = all_slots_data.get(parsed_date, [])
|
|
|
|
if not available_slots or len(available_slots) == 0:
|
|
return {
|
|
"status": "error",
|
|
"message": f"No available time slots found for the selected date: {parsed_date}",
|
|
"next_step": "select_appointment_date",
|
|
"instruction": "Please select a different date from the available dates"
|
|
}
|
|
|
|
# Format the time slots for display
|
|
formatted_slots = []
|
|
for slot in available_slots:
|
|
start_time = slot.get("start_time", "")
|
|
formatted_datetime = slot.get("formatted_datetime", "")
|
|
|
|
# Extract time from formatted_datetime
|
|
time_part = ""
|
|
if " " in formatted_datetime:
|
|
_, time_part = formatted_datetime.split(" ", 1)
|
|
|
|
formatted_slots.append({
|
|
"start_time": start_time,
|
|
"formatted_time": time_part,
|
|
"value": start_time, # Keep original ISO format for backend
|
|
"display": time_part or start_time
|
|
})
|
|
|
|
# Store the selected date in booking data
|
|
self.booking_data["selected_date"] = parsed_date
|
|
self.booking_data["available_slots"] = formatted_slots
|
|
|
|
# Format the date for display
|
|
try:
|
|
date_obj = datetime.datetime.strptime(parsed_date, "%Y-%m-%d")
|
|
formatted_display_date = date_obj.strftime("%A, %B %d, %Y") # e.g., "Monday, January 1, 2023"
|
|
except ValueError:
|
|
formatted_display_date = parsed_date
|
|
|
|
logger.info(f"Selected date for appointment: {parsed_date} ={formatted_display_date}={formatted_slots}")
|
|
return {
|
|
"status": "success",
|
|
"message": f"Found {len(formatted_slots)} available time slots on {formatted_display_date}",
|
|
"date": parsed_date,
|
|
"formatted_date": formatted_display_date,
|
|
"available_slots": formatted_slots,
|
|
"next_step": "select_appointment_time",
|
|
"instruction": "Please select one of the available time slots to proceed with booking"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error selecting appointment date: {str(e)}")
|
|
return {"status": "error", "message": f"Error selecting appointment date: {str(e)}"}
|
|
|
|
@llm.ai_callable()
|
|
async def select_appointment_time_and_patient(self,
|
|
time: Annotated[
|
|
str, llm.TypeInfo(description="Time for the appointment in HH:MM AM/PM format")
|
|
],
|
|
patient_id: Annotated[
|
|
str, llm.TypeInfo(description="ID of the patient for the appointment")
|
|
]
|
|
):
|
|
"""Select a time slot and patient for the appointment after selecting a date"""
|
|
try:
|
|
if not hasattr(self, 'booking_data') or not self.booking_data.get("practitioner_id") or not self.booking_data.get("selected_date"):
|
|
return {
|
|
"status": "error",
|
|
"message": "Incomplete booking information. Please select a practitioner and date first.",
|
|
"next_step": "start_appointment_booking"
|
|
}
|
|
|
|
practitioner_id = self.booking_data.get("practitioner_id")
|
|
selected_date = self.booking_data.get("selected_date")
|
|
available_slots = self.booking_data.get("available_slots", [])
|
|
|
|
logger.info(f"Selecting time {time} for date {selected_date} with practitioner {practitioner_id}")
|
|
logger.info(f"Available slots: {available_slots}")
|
|
|
|
# Find the matching time slot from our stored available slots
|
|
selected_slot = None
|
|
for slot in available_slots:
|
|
formatted_time = slot.get("formatted_time", "")
|
|
display_time = slot.get("display", "")
|
|
|
|
# Try to match with either formatted_time or display
|
|
if self._time_matches(formatted_time, time) or self._time_matches(display_time, time):
|
|
selected_slot = slot
|
|
logger.info(f"Found matching slot: {slot}")
|
|
break
|
|
|
|
if not selected_slot:
|
|
return {
|
|
"status": "error",
|
|
"message": f"No available slot found at {time} on {selected_date}",
|
|
"available_times": [slot.get("display") for slot in available_slots],
|
|
"next_step": "select_appointment_time",
|
|
"instruction": "Please select from one of the available times"
|
|
}
|
|
|
|
# Verify that the patient exists
|
|
patient_result = await self.find_patient(search_term=patient_id)
|
|
|
|
if patient_result.get("status") != "success":
|
|
return {
|
|
"status": "error",
|
|
"message": f"Failed to verify patient ID: {patient_id}",
|
|
"details": patient_result,
|
|
"next_step": "find_patient",
|
|
"instruction": "Please provide a valid patient ID or search for a patient"
|
|
}
|
|
|
|
# Store the selected time and patient
|
|
self.booking_data["time"] = time
|
|
self.booking_data["display_time"] = selected_slot.get("display", time)
|
|
self.booking_data["start_time"] = selected_slot.get("start_time") # This is the ISO format time for the backend
|
|
self.booking_data["patient_id"] = patient_id
|
|
self.booking_data["selected_slot"] = selected_slot
|
|
|
|
# Get patient details for confirmation
|
|
patient_details = None
|
|
if patient_result.get("patients") and len(patient_result.get("patients")) > 0:
|
|
for patient in patient_result.get("patients"):
|
|
if str(patient.get("id")) == str(patient_id):
|
|
patient_details = patient
|
|
break
|
|
|
|
# Now we have all the information needed to book the appointment
|
|
return {
|
|
"status": "success",
|
|
"message": f"Selected time {selected_slot.get('display')} on {selected_date} for patient {patient_details.get('name') if patient_details else patient_id}",
|
|
"booking_details": {
|
|
"practitioner_id": practitioner_id,
|
|
"date": selected_date,
|
|
"display_time": selected_slot.get("display", time),
|
|
"start_time": selected_slot.get("start_time"), # ISO format time for backend
|
|
"patient_id": patient_id,
|
|
"patient_name": patient_details.get("name") if patient_details else "Unknown"
|
|
},
|
|
"next_step": "confirm_booking",
|
|
"instruction": "Please confirm the booking details or provide appointment type and notes"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error selecting appointment time and patient: {str(e)}")
|
|
return {"status": "error", "message": f"Error selecting appointment time and patient: {str(e)}"}
|
|
|
|
|
|
@llm.ai_callable()
|
|
async def confirm_booking(self,
|
|
confirm: Annotated[
|
|
bool, llm.TypeInfo(description="Confirm the booking (true) or cancel (false)")
|
|
],
|
|
appointment_type: Annotated[
|
|
str, llm.TypeInfo(description="Type of appointment ('Telehealth Visit','Follow Up','Lab Review','Initial Consult', 'Office Visit', etc.)")
|
|
] = "Consultation",
|
|
notes: Annotated[
|
|
str, llm.TypeInfo(description="Any additional notes for the appointment")
|
|
] = "",
|
|
service: Annotated[
|
|
str, llm.TypeInfo(description="Service type (e.g., 'Patient Followup', 'New Patient', etc.)")
|
|
] = "Patient Followup",
|
|
payment_type: Annotated[
|
|
str, llm.TypeInfo(description="Payment type ('Cash Only', 'Insurance Only','Cash and Insurance','Auto','Workers','Prepaid')")
|
|
] = "Cash Only",
|
|
|
|
|
|
):
|
|
"""Confirm and finalize the appointment booking with optional appointment type and notes"""
|
|
try:
|
|
if not hasattr(self, 'booking_data') or not self.booking_data.get("practitioner_id") or not self.booking_data.get("selected_date") or not self.booking_data.get("start_time"):
|
|
return {
|
|
"status": "error",
|
|
"message": "Incomplete booking information. Please complete the booking process first.",
|
|
"next_step": "start_appointment_booking"
|
|
}
|
|
|
|
if not confirm:
|
|
# Clear booking data and cancel the process
|
|
self.booking_data = {}
|
|
return {
|
|
"status": "info",
|
|
"message": "Booking process cancelled.",
|
|
"next_step": "start_appointment_booking"
|
|
}
|
|
|
|
# Extract all required booking information
|
|
practitioner_id = self.booking_data.get("practitioner_id")
|
|
patient_id = self.booking_data.get("patient_id")
|
|
appointment_date = self.booking_data.get("selected_date")
|
|
start_time = self.booking_data.get("start_time")
|
|
display_time = self.booking_data.get("display_time", "")
|
|
|
|
logger.info(f"Confirming appointment booking for patient {patient_id} with practitioner {practitioner_id}")
|
|
logger.info(f"Date: {appointment_date}, Time: {start_time} ({display_time})")
|
|
logger.info(f"Type: {appointment_type}, Notes: {notes}")
|
|
|
|
# Calculate end time (assuming 30 minutes appointment duration)
|
|
# This is a placeholder - you may need to adjust based on your requirements
|
|
end_time = "09:00 PM" # Default end time
|
|
|
|
# Get patient name for title
|
|
patient_name = "Patient Appointment"
|
|
try:
|
|
patient_result = await self.find_patient(search_term=patient_id)
|
|
if patient_result.get("status") == "success" and patient_result.get("patients"):
|
|
for patient in patient_result.get("patients"):
|
|
if str(patient.get("id")) == str(patient_id):
|
|
patient_name = patient.get("name", "Patient Appointment")
|
|
break
|
|
except Exception as e:
|
|
logger.warning(f"Could not retrieve patient name: {str(e)}")
|
|
|
|
# Prepare the booking data for API request according to the required payload structure
|
|
booking_data = {
|
|
"practitioner_id": practitioner_id,
|
|
"patient_id": patient_id,
|
|
"title": patient_name,
|
|
"date": appointment_date,
|
|
"start_time": start_time, # Use the ISO format time from the slot
|
|
"end_time": end_time,
|
|
"appointment_type": appointment_type,
|
|
"service": service,
|
|
"payment_type": payment_type,
|
|
"location": '',
|
|
"room": '',
|
|
"notes": notes,
|
|
"status": "ON_TIME"
|
|
}
|
|
|
|
url = f"{Base_url}/api/assistant/book-appointment"
|
|
|
|
if not self.accessToken:
|
|
return {
|
|
"status": "error",
|
|
"message": "Authentication token not available.",
|
|
"next_step": "login"
|
|
}
|
|
|
|
headers = {
|
|
"authorization": f"Bearer {self.accessToken}",
|
|
"accesstoken": f"{self.accessToken}",
|
|
"content-type": "application/json",
|
|
"accept": "application/json"
|
|
}
|
|
|
|
logger.info(f"Sending booking request with data: {booking_data}")
|
|
response = requests.post(url, headers=headers, json=booking_data)
|
|
logger.info(f"Booking response status code: {response.status_code}")
|
|
logger.info(f"Booking response content: {response.text}")
|
|
|
|
if response.status_code == 200:
|
|
booking_result = response.json()
|
|
|
|
# Format the date and time for display
|
|
try:
|
|
date_obj = datetime.datetime.strptime(appointment_date, "%Y-%m-%d")
|
|
formatted_date = date_obj.strftime("%A, %B %d, %Y") # e.g., "Monday, January 1, 2023"
|
|
except ValueError:
|
|
formatted_date = appointment_date
|
|
|
|
# Clear booking data as the process is complete
|
|
self.booking_data = {}
|
|
|
|
return {
|
|
"status": "success",
|
|
"message": f"Appointment successfully booked!",
|
|
"appointment_details": {
|
|
"id": booking_result.get("data", {}).get("id", ""),
|
|
"practitioner_id": practitioner_id,
|
|
"patient_id": patient_id,
|
|
"patient_name": patient_name,
|
|
"date": formatted_date,
|
|
"time": display_time or start_time,
|
|
"appointment_type": appointment_type,
|
|
"service": service,
|
|
"payment_type": payment_type,
|
|
"notes": notes
|
|
},
|
|
"confirmation": f"Appointment confirmed for {patient_name} on {formatted_date} at {display_time or start_time}",
|
|
"next_step": "appointment_booked"
|
|
}
|
|
else:
|
|
error_message = "Failed to book appointment"
|
|
try:
|
|
error_data = response.json()
|
|
if "message" in error_data:
|
|
error_message = error_data["message"]
|
|
except:
|
|
error_message = f"Failed to book appointment. Status code: {response.status_code}"
|
|
|
|
return {
|
|
"status": "error",
|
|
"message": error_message,
|
|
"details": response.text,
|
|
"next_step": "retry_booking"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error confirming appointment booking: {str(e)}")
|
|
return {
|
|
"status": "error",
|
|
"message": f"Error confirming appointment booking: {str(e)}",
|
|
"next_step": "retry_booking"
|
|
}
|
|
|
|
|
|
|
|
def _time_matches(self, slot_time, input_time):
|
|
"""Helper function to compare if two time strings match, handling different formats"""
|
|
try:
|
|
# Normalize both times to 24-hour format for comparison
|
|
slot_time_obj = datetime.datetime.strptime(slot_time, "%I:%M %p")
|
|
slot_time_24h = slot_time_obj.strftime("%H:%M")
|
|
|
|
input_time_obj = datetime.datetime.strptime(input_time, "%I:%M %p")
|
|
input_time_24h = input_time_obj.strftime("%H:%M")
|
|
|
|
return slot_time_24h == input_time_24h
|
|
except ValueError:
|
|
# If parsing fails, do a direct string comparison
|
|
return slot_time.strip() == input_time.strip() |