vigil/services/mail.py

191 lines
6.1 KiB
Python

# _ __________________
# | | / / _/ ____/ _/ /
# | | / // // / __ / // /
# | |/ // // /_/ // // /___
# |___/___/\____/___/_____/
# © Uthmn 2025 under MIT license
import smtplib
from email.mime.text import MIMEText
from imaplib import IMAP4_SSL
from email import message_from_bytes
from dotenv import load_dotenv
from os import getenv
from os.path import dirname, join, exists
import time
# Exit if .env does not exist
if not exists(join(dirname(__file__), "../.env")):
print("Please create a .env file in the root directory.")
exit(1)
# Load environment variables from .env file
dotenv_path = join(dirname(__file__), "../.env")
load_dotenv(dotenv_path)
# SMTP server settings
SMTP_SERVER = getenv("SMTP_SERVER")
SMTP_PORT = getenv("SMTP_PORT")
SMTP_USERNAME = getenv("SMTP_USERNAME")
SMTP_PASSWORD = getenv("SMTP_PASSWORD")
# IMAP server settings
IMAP_SERVER = getenv("IMAP_SERVER")
IMAP_PORT = getenv("IMAP_PORT")
IMAP_USERNAME = getenv("IMAP_USERNAME")
IMAP_PASSWORD = getenv("IMAP_PASSWORD")
# Retry settings
MAX_RETRIES = 3
RETRY_DELAY = 2 # in seconds
# Check if all environment variables are set
if not SMTP_SERVER or not SMTP_PORT or not SMTP_USERNAME or not SMTP_PASSWORD or not IMAP_SERVER or not IMAP_PORT or not IMAP_USERNAME or not IMAP_PASSWORD:
print("Please set all environment variables in .env file.")
exit(1)
# Make sure SMTP and IMAP ports are integers
SMTP_PORT = int(SMTP_PORT)
IMAP_PORT = int(IMAP_PORT)
def connect_smtp_with_retry():
"""Connect to SMTP server with retry"""
for attempt in range(MAX_RETRIES):
try:
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT)
server.login(SMTP_USERNAME, SMTP_PASSWORD)
return server
except Exception as e:
if attempt < MAX_RETRIES - 1:
wait_time = RETRY_DELAY * (2 ** attempt) # Exponential backoff
print(f"SMTP connection attempt {attempt + 1} failed: {e}")
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
print(f"Failed to connect to SMTP after {MAX_RETRIES} attempts: {e}")
raise
def connect_imap_with_retry():
"""Connect to IMAP server with retry"""
for attempt in range(MAX_RETRIES):
try:
M = IMAP4_SSL(host=IMAP_SERVER, port=IMAP_PORT)
M.login(IMAP_USERNAME, IMAP_PASSWORD)
return M
except Exception as e:
if attempt < MAX_RETRIES - 1:
wait_time = RETRY_DELAY * (2 ** attempt) # Exponential backoff
print(f"IMAP connection attempt {attempt + 1} failed: {e}")
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
print(f"Failed to connect to IMAP after {MAX_RETRIES} attempts: {e}")
raise
# Test all connections
try:
server = connect_smtp_with_retry()
server.quit()
except Exception as e:
print(f"Initial SMTP connection test failed: {e}")
exit(1)
try:
M = connect_imap_with_retry()
M.logout()
except Exception as e:
print(f"Initial IMAP connection test failed: {e}")
exit(1)
def send_email(receiver_emails, subject, body):
sender_email = SMTP_USERNAME
# Create the email
message = MIMEText(body, "html")
message["Subject"] = subject
message["From"] = sender_email
message["To"] = ", ".join(receiver_emails)
# Connect using SSL with retry
try:
server = connect_smtp_with_retry()
try:
server.sendmail(sender_email, receiver_emails, message.as_string())
print("HTML email sent successfully!")
finally:
server.quit()
except Exception as e:
print(f"Failed to send email: {e}")
def receive_emails(folder="INBOX", limit=50):
emails = {}
try:
M = connect_imap_with_retry()
try:
M.select(folder)
# Get all message IDs
typ, data = M.search(None, 'ALL')
if typ != 'OK' or not data[0]:
print("No messages found.")
return emails
all_ids = data[0].split()
last_ids = all_ids[-limit:] # Take only the last 'limit' messages
for num in last_ids:
typ, msg_data = M.fetch(num, '(RFC822)')
if typ != 'OK':
print(f"Failed to fetch message {num}")
continue
# Parse the email
msg = message_from_bytes(msg_data[0][1])
msg_id = msg.get('Message-ID') or num.decode()
# Extract headers
subject = msg.get('Subject', '')
sender = msg.get('From', '')
date = msg.get('Date', '')
# Extract plain text and HTML
body_text = ""
body_html = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if "attachment" in content_disposition:
continue
payload = part.get_payload(decode=True)
if payload:
payload = payload.decode(errors="ignore")
if content_type == "text/plain":
body_text = payload
elif content_type == "text/html":
body_html = payload
else:
payload = msg.get_payload(decode=True)
if payload:
body_text = payload.decode(errors="ignore")
emails[msg_id] = {
"subject": subject,
"from": sender,
"date": date,
"body_text": body_text,
"body_html": body_html
}
finally:
M.close()
M.logout()
except Exception as e:
print(f"Error receiving emails: {e}")
return emails