calendar-chatbot / tools /modify_event_tool.py
taha-the-data-scientist's picture
Upload 4 files
465fb49 verified
raw
history blame
23.4 kB
from agentpro.tools import Tool
from typing import Any, List, Tuple, Optional
import datetime
import dateutil.parser
import re
class ModifyEventTool(Tool):
"""
Tool to change the date, time, and/or duration of an existing Google Calendar event.
Accepts natural‐language commands such as:
• “Shift the first meeting on Saturday to Monday”
• “Shift the first meeting on Saturday to 5 AM”
• “Reschedule all my meetings from Saturday to Sunday.”
• “Reschedule the second meeting from Monday to Wednesday at 10 AM.”
• “Shift the first meeting on 7th June to 9th June.”
• “Change my first meeting on June 10, 2025, to June 12, 2025, at 2 PM.”
• “Reschedule all of my tomorrow meetings to Sunday.”
• “Move my ‘Team Sync’ meeting today from 3 PM to 4 PM.”
• “Change ‘Project Discussion’ meeting tomorrow to 2:30 PM.”
• “Reschedule my third appointment tomorrow to Friday at 11 AM.”
"""
# ── 1) Class attributes ─────────────────────────────────────────────────────────────────
name: str = "Modify Event"
description: str = (
"Change the date, time, and/or duration of an existing Google Calendar event. "
"Supports natural‐language like 'Shift the first meeting on Saturday to 5 AM.'"
)
action_type: str = "modify_event"
input_format: str = (
"Natural‐language command describing which event(s) to modify and how.\n"
"Examples:\n"
" • “Shift the first meeting on Saturday to Monday.”\n"
" • “Shift the first meeting on Saturday to 5 AM.”\n"
" • “Shift the first meeting on 7th June to 9th June.”\n"
" • “Reschedule all of my tomorrow meetings to Sunday.”\n"
" • “Move my ‘Team Sync’ meeting today from 3 PM to 4 PM.”\n"
" • “Change ‘Project Discussion’ meeting tomorrow to 2:30 PM.”\n"
" • “Reschedule the second meeting from Monday to Wednesday at 10 AM.”\n"
" • “Change my first meeting on June 10, 2025, to June 12, 2025, at 2 PM.”\n"
" • “Reschedule my third appointment tomorrow to Friday at 11 AM.”\n"
)
# ── 2) We expect a Google Calendar “service” to be passed in at instantiation ───────────
service: Any
# ── 3) Main entry point ─────────────────────────────────────────────────────────────────
def run(self, input_text: Any) -> str:
"""
Parse a natural‐language modification command, identify target event(s), and update them.
Returns a human‐readable confirmation or an error message if something goes wrong.
"""
text = str(input_text).strip()
# 1) Split into “source” (which event(s) to modify) vs “target” (new date/time)
source_part, target_part = self._split_source_target(text)
if source_part is None or target_part is None:
return (
"Sorry, I couldn't identify which part of your command specifies the modification. "
"Please use a format like “Shift the first meeting on Saturday to 5 AM.”"
)
# 2) From source_part, extract ordinal (“first”, “second”, “last”, “all”), title (if any),
# and source_date_spec (weekday/explicit date/today/tomorrow).
ordinal = self._extract_ordinal(source_part)
title = self._extract_title(source_part)
source_date_spec = self._extract_date_spec(source_part)
if source_date_spec is None:
return (
"Sorry, I couldn't determine which day or date you meant. "
"Please specify a weekday (e.g., 'Monday'), 'today', 'tomorrow', or an explicit date (e.g., 'June 10, 2025')."
)
# 3) Resolve source_date_spec → a concrete `datetime.date`
source_date = self._resolve_date(source_date_spec)
if source_date is None:
return f"Sorry, I couldn’t parse the date '{source_date_spec}'."
# 4) Fetch all non‐all‐day events on that source_date
events = self._fetch_events_on_date(source_date)
if not events:
return f"You have no non‐all‐day events on {source_date.strftime('%B %d, %Y')}."
# 5) Select which events to modify based on ordinal/title
target_events = self._select_target_events(events, ordinal, title)
if isinstance(target_events, str):
# an error message string
return target_events
if not target_events:
return f"No events found matching that specification on {source_date.strftime('%B %d, %Y')}."
# 6) Parse the “target” spec to determine new_date_spec and new_time_spec
new_date_spec, new_time_spec = self._parse_target_part(target_part)
# 7) Resolve new_date_spec → `datetime.date` (if given). If omitted, keep original date.
new_date: Optional[datetime.date] = None
if new_date_spec:
new_date = self._resolve_date(new_date_spec)
if new_date is None:
return f"Sorry, I couldn’t parse the target date '{new_date_spec}'."
# 8) Resolve new_time_spec → either (new_start_time, new_end_time) or (new_start_time, None).
new_start_time: Optional[datetime.time] = None
new_end_time: Optional[datetime.time] = None
if new_time_spec:
parsed = self._resolve_time_spec(new_time_spec)
if parsed is None:
return (
f"Sorry, I couldn’t parse the target time '{new_time_spec}'. "
f"Please specify like 'at 2 PM', 'to 4 PM', or 'from 3 PM to 5 PM'."
)
new_start_time, new_end_time = parsed
# 9) For each selected event, compute new start/end datetimes (preserving duration logic)
updates: List[Tuple[str, datetime.datetime, datetime.datetime, datetime.datetime, datetime.datetime]] = []
# Each tuple: (event_summary, old_start_dt, old_end_dt, new_start_dt, new_end_dt)
for ev in target_events:
old_start_dt = datetime.datetime.fromisoformat(
ev["start"]["dateTime"].replace("Z", "+00:00")
)
old_end_dt = datetime.datetime.fromisoformat(
ev["end"]["dateTime"].replace("Z", "+00:00")
)
original_duration = old_end_dt - old_start_dt
# Determine which date to apply: either new_date or old_start_dt.date()
apply_date = new_date if new_date else old_start_dt.date()
# Keep the same tzinfo as the original event
tzinfo = old_start_dt.tzinfo
if new_start_time:
# Case A: New time is provided (could be start-only or start+end)
new_start_dt = datetime.datetime.combine(apply_date, new_start_time, tzinfo=tzinfo)
if new_end_time:
new_end_dt = datetime.datetime.combine(apply_date, new_end_time, tzinfo=tzinfo)
else:
# Single new_start: preserve original duration
new_end_dt = new_start_dt + original_duration
else:
# Case B: No new time provided → keep original start/end times but shift date if needed
original_start_time = old_start_dt.time()
original_end_time = old_end_dt.time()
new_start_dt = datetime.datetime.combine(apply_date, original_start_time, tzinfo=tzinfo)
new_end_dt = datetime.datetime.combine(apply_date, original_end_time, tzinfo=tzinfo)
# 10) Update the event in Google Calendar
_ = self._update_event(
event_id=ev["id"],
new_start_iso=new_start_dt.isoformat(),
new_end_iso=new_end_dt.isoformat(),
)
updates.append((ev.get("summary", "(no title)"), old_start_dt, old_end_dt, new_start_dt, new_end_dt))
# 11) Return a confirmation message
return self._format_confirmation(updates)
# ────────────────────────────────────────────────────────────────────────────────────────────
def _split_source_target(self, text: str) -> Tuple[Optional[str], Optional[str]]:
"""
Naively split the input_text into source_part (which events) vs target_part (new date/time).
We look for the first occurrence of ' to ' that is not part of a 'from X to Y' time range.
"""
lowered = text.lower()
# If there's a "from X to Y" time‐range, skip that " to " and find the next " to "
time_range_match = re.search(
r"\bfrom\s+\d{1,2}(:\d{2})?\s*(am|pm)?\s+to\s+\d{1,2}(:\d{2})?\s*(am|pm)?",
lowered
)
if time_range_match:
span_start, span_end = time_range_match.span()
next_to = lowered.find(" to ", span_end)
if next_to != -1:
before = text[:next_to]
after = text[next_to + len(" to "):]
return before.strip(), after.strip()
# Fallback: split on the first ' to '
parts = re.split(r"\s+to\s+", text, maxsplit=1)
if len(parts) == 2:
return parts[0].strip(), parts[1].strip()
return None, None
def _extract_ordinal(self, text: str) -> Optional[str]:
"""
Extract ordinal keyword (first, second, third, ..., last, all).
Returns the matched keyword (lowercased) or None if none found.
"""
match = re.search(
r"\b(first|second|third|fourth|fifth|sixth|seventh|eighth|ninth|tenth|last|all)\b",
text,
re.IGNORECASE
)
return match.group(1).lower() if match else None
def _extract_title(self, text: str) -> Optional[str]:
"""
If the user specified an event title in quotes (single or double),
return that substring (without quotes). Otherwise, None.
"""
match = re.search(r"[‘'“\"]([^‘'”\"]+)[’'”\"]", text)
if match:
return match.group(1).strip()
return None
def _extract_date_spec(self, text: str) -> Optional[str]:
"""
Return the substring that indicates a source date: 'today', 'tomorrow',
a weekday name, or an explicit date. Otherwise, None.
"""
lowered = text.lower()
if "today" in lowered:
return "today"
if "tomorrow" in lowered:
return "tomorrow"
# Weekday names
weekdays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
for wd in weekdays:
if re.search(rf"\b{wd}\b", lowered):
return wd
# Check if there's an explicit date phrase: must contain a month name or ISO format
if re.search(r"\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered) \
or re.search(r"\b\d{4}-\d{2}-\d{2}\b", text) \
or re.search(r"\b\d{1,2}(st|nd|rd|th)?\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered):
# We'll hand the full text to dateutil.parse later
return text
return None
def _resolve_date(self, spec: str) -> Optional[datetime.date]:
"""
Convert specifications like 'today', 'tomorrow', 'Saturday',
'7th June', 'June 10, 2025' into a datetime.date object.
"""
spec_lower = spec.strip().lower()
today = datetime.date.today()
if spec_lower == "today":
return today
if spec_lower == "tomorrow":
return today + datetime.timedelta(days=1)
# Weekday resolution: find next occurrence (this week or next)
weekdays_map = {
"monday": 0,
"tuesday": 1,
"wednesday": 2,
"thursday": 3,
"friday": 4,
"saturday": 5,
"sunday": 6,
}
if spec_lower in weekdays_map:
target_wd = weekdays_map[spec_lower]
today_wd = today.weekday() # Monday=0 ... Sunday=6
if target_wd >= today_wd:
delta = target_wd - today_wd
else:
delta = 7 - (today_wd - target_wd)
return today + datetime.timedelta(days=delta)
# Try explicit date parsing (assume current year if omitted)
try:
parsed = dateutil.parser.parse(
spec,
fuzzy=True,
default=datetime.datetime(today.year, 1, 1),
)
return parsed.date()
except (ValueError, OverflowError):
return None
def _fetch_events_on_date(self, date_obj: datetime.date) -> List[dict]:
"""
Fetch all non‐all‐day events on the provided date (UTC midnight to next midnight).
Returns a list of event dicts (as returned by Google Calendar API).
"""
start_of_day = datetime.datetime.combine(date_obj, datetime.time.min).isoformat() + "Z"
end_of_day = (datetime.datetime.combine(date_obj, datetime.time.min)
+ datetime.timedelta(days=1)).isoformat() + "Z"
events_res = (
self.service.events()
.list(
calendarId="primary",
timeMin=start_of_day,
timeMax=end_of_day,
singleEvents=True,
orderBy="startTime"
)
.execute()
)
items = events_res.get("items", [])
# Filter out all‐day events (those have 'start.date' instead of 'start.dateTime')
non_all_day = [ev for ev in items if ev.get("start", {}).get("dateTime")]
return non_all_day
def _select_target_events(
self,
events: List[dict],
ordinal: Optional[str],
title: Optional[str]
) -> Any:
"""
Given a list of events on the same date, choose which to modify based on:
- If title is provided: filter by case‐insensitive substring match.
• If exactly one match → return [that_event].
• If multiple:
- If ordinal == 'first' or 'last' → pick earliest or latest among those matches.
- Else → return an error string prompting clarification.
• If no match → return [].
- If no title but ordinal provided:
- Sort all events by start time.
- 'first' → [earliest], 'second' → [second-earliest], 'last' → [latest], 'all' → all.
- If the specified ordinal index is out of range → return [].
- If neither title nor ordinal → return an error string asking for clarification.
"""
# Title-based selection
if title:
matches = [
ev for ev in events
if title.lower() in (ev.get("summary", "") or "").lower()
]
if not matches:
return []
if len(matches) == 1:
return matches
# Multiple matches and ordinal present?
if ordinal in ("first", "last"):
sorted_by_time = sorted(
matches,
key=lambda ev: datetime.datetime.fromisoformat(
ev["start"]["dateTime"].replace("Z", "+00:00")
)
)
return [sorted_by_time[0]] if ordinal == "first" else [sorted_by_time[-1]]
return (
f"Multiple events match '{title}'. Which one did you mean? "
f"You can say 'first {title}' or 'last {title}'."
)
# No title; rely on ordinal
if ordinal:
sorted_all = sorted(
events,
key=lambda ev: datetime.datetime.fromisoformat(
ev["start"]["dateTime"].replace("Z", "+00:00")
)
)
if ordinal == "all":
return sorted_all
if ordinal == "first":
return [sorted_all[0]] if sorted_all else []
if ordinal == "last":
return [sorted_all[-1]] if sorted_all else []
ord_map = {
"second": 1,
"third": 2,
"fourth": 3,
"fifth": 4,
"sixth": 5,
"seventh": 6,
"eighth": 7,
"ninth": 8,
"tenth": 9
}
if ordinal in ord_map:
idx = ord_map[ordinal]
return [sorted_all[idx]] if idx < len(sorted_all) else []
return []
# Neither title nor ordinal → ambiguous
return (
"Please specify which event(s) to modify (e.g., 'first meeting', "
"'last appointment', 'all meetings', or include the title in quotes)."
)
def _parse_target_part(self, text: str) -> Tuple[Optional[str], Optional[str]]:
"""
Given the target_part (everything after 'to'), determine:
- new_date_spec (like 'Monday', 'June 12, 2025', 'Friday') OR None if no date.
- new_time_spec (like '5 AM', '2:30 PM', '3 PM to 4 PM') OR None if no time.
Strategy:
1) Look explicitly for date keywords first:
• 'today' or 'tomorrow'
• weekday names
• explicit date phrases containing a month name or ISO format (YYYY-MM-DD)
2) Only if one of those appears do we set new_date_spec. Otherwise, new_date_spec stays None.
3) Independently, look for time‐range patterns or single times ("at 5 PM", "5 AM", etc.).
4) Return (date_spec, time_spec). If neither is found, return (None, None).
"""
lowered = text.lower()
# 1) Identify new_date_spec (only if a date keyword is present)
new_date_spec: Optional[str] = None
if "today" in lowered:
new_date_spec = "today"
elif "tomorrow" in lowered:
new_date_spec = "tomorrow"
else:
# Weekday names
weekdays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
for wd in weekdays:
if re.search(rf"\b{wd}\b", lowered):
new_date_spec = wd
break
# If still None, check for explicit date phrase: must contain a month name or ISO format
if new_date_spec is None:
if re.search(r"\b(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered) \
or re.search(r"\b\d{4}-\d{2}-\d{2}\b", text) \
or re.search(r"\b\d{1,2}(st|nd|rd|th)?\s+(?:jan|feb|mar|apr|may|jun|jul|aug|sep|oct|nov|dec)\w*\b", lowered):
new_date_spec = text
# 2) Identify new_time_spec (if any)
# Look for 'from X to Y' or single time patterns like 'at X', 'X AM/PM', etc.
time_pattern = re.compile(
r"(?P<from>from\s+)?"
r"(?P<h1>\d{1,2}(:\d{2})?\s*(am|pm))"
r"\s*(to\s*(?P<h2>\d{1,2}(:\d{2})?\s*(am|pm)))?",
re.IGNORECASE
)
time_match = time_pattern.search(text)
new_time_spec: Optional[str] = time_match.group(0) if time_match else None
return new_date_spec, new_time_spec
def _resolve_time_spec(self, spec: str) -> Optional[Tuple[datetime.time, Optional[datetime.time]]]:
"""
Convert strings like '3 PM to 4 PM', 'at 2:30 PM', '2 PM', 'from 3 PM to 5 PM'
into (new_start_time, new_end_time) where new_end_time may be None (meaning preserve original duration).
"""
spec = spec.strip().lower()
# Find all time tokens in the spec
times = re.findall(r"(\d{1,2}(:\d{2})?\s*(am|pm))", spec, re.IGNORECASE)
parsed_times: List[datetime.time] = []
for ttuple in times:
tstr = ttuple[0]
try:
dt = dateutil.parser.parse(tstr)
parsed_times.append(dt.time())
except (ValueError, OverflowError):
continue
if not parsed_times:
return None
if len(parsed_times) == 1:
# Single new time: assume new start; preserve duration later
return parsed_times[0], None
# Two times: first is new_start, second is new_end
return parsed_times[0], parsed_times[1]
def _update_event(self, event_id: str, new_start_iso: str, new_end_iso: str) -> dict:
"""
Call Google Calendar API to patch the event's start and end times.
Returns the patched event resource.
"""
updated = (
self.service.events()
.patch(
calendarId="primary",
eventId=event_id,
body={
"start": {"dateTime": new_start_iso},
"end": {"dateTime": new_end_iso},
}
)
.execute()
)
return updated
def _format_confirmation(
self,
updates: List[Tuple[str, datetime.datetime, datetime.datetime, datetime.datetime, datetime.datetime]]
) -> str:
"""
Given a list of tuples:
(event_summary, old_start_dt, old_end_dt, new_start_dt, new_end_dt)
produce a combined, human‐readable confirmation string.
"""
lines: List[str] = []
for summary, old_start, old_end, new_start, new_end in updates:
old_fmt = (
f"{old_start.strftime('%B %d, %Y %I:%M %p').lstrip('0')}–"
f"{old_end.strftime('%I:%M %p').lstrip('0')}"
)
new_fmt = (
f"{new_start.strftime('%B %d, %Y %I:%M %p').lstrip('0')}–"
f"{new_end.strftime('%I:%M %p').lstrip('0')}"
)
lines.append(
f"The meeting \"{summary}\" originally scheduled for {old_fmt} has been rescheduled to {new_fmt}."
)
return " ".join(lines)