Spaces:
Running
Running
import imaplib | |
import email | |
import os | |
__all__ = ['GetEmails'] | |
class GetEmails: | |
dependencies = ['os'] | |
inputSchema = { | |
"name": "GetEmails", | |
"description": "Reads emails from an IMAP server.", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"num_emails": { | |
"type": "integer", | |
"description": "Number of emails to retrieve (default: 5).", | |
"default": 5 | |
} | |
}, | |
"required": [] | |
} | |
} | |
def __init__(self): | |
# Replace with your actual email provider's IMAP server address | |
self.imap_server = "imap.gmail.com" # e.g., imap.gmail.com | |
self.email_address = os.environ.get("EMAIL_ADDRESS") # Replace with your email address | |
self.password = os.environ.get("EMAIL_PASSWORD") # Replace with your password or app password | |
def run(self, **kwargs): | |
num_emails = kwargs.get("num_emails", 5) | |
try: | |
# Connect to the IMAP server | |
mail = imaplib.IMAP4_SSL(self.imap_server) | |
mail.login(self.email_address, self.password) | |
mail.select("inbox") # You can select other mailboxes like "sent" | |
# Search for emails (ALL = all emails, or use specific criteria) | |
result, data = mail.search(None, "ALL") | |
mail_ids = data[0].split() | |
# Get the most recent emails | |
recent_emails = mail_ids[-num_emails:] | |
email_messages = [] | |
for num in recent_emails: | |
result, msg_data = mail.fetch(num, '(RFC822)') | |
raw_email = msg_data[0][1] | |
msg = email.message_from_bytes(raw_email) | |
# Decode email headers and body | |
try: | |
subject = email.header.decode_header(msg["Subject"])[0][0] | |
if isinstance(subject, bytes): | |
subject = subject.decode() | |
except: | |
subject = "No Subject" | |
try: | |
from_ = email.header.decode_header(msg.get("From"))[0][0] | |
if isinstance(from_, bytes): | |
from_ = from_.decode() | |
except: | |
from_ = "Unknown Sender" | |
# Extract the body of the email | |
body = "" # Initialize body to an empty string | |
if msg.is_multipart(): | |
for part in msg.walk(): | |
ctype = part.get_content_type() | |
cdispo = str(part.get("Content-Disposition")) | |
if ctype == "text/plain" and "attachment" not in cdispo: | |
body = part.get_payload(decode=True).decode() | |
break | |
else: | |
body = msg.get_payload(decode=True).decode() | |
email_messages.append({ | |
"subject": subject, | |
"from": from_, | |
"body": body | |
}) | |
mail.close() | |
mail.logout() | |
return { | |
"status": "success", | |
"message": "Emails retrieved successfully.", | |
"output": email_messages | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"An error occurred: {str(e)}", | |
"output": None | |
} | |