Compare commits
3 Commits
1d98a136c7
...
e1e0d7b230
| Author | SHA1 | Date | |
|---|---|---|---|
| e1e0d7b230 | |||
| 564abcb371 | |||
| cb22c6b6ba |
24
main.py
Normal file
24
main.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
# _ __________________
|
||||||
|
# | | / / _/ ____/ _/ /
|
||||||
|
# | | / // // / __ / // /
|
||||||
|
# | |/ // // /_/ // // /___
|
||||||
|
# |___/___/\____/___/_____/
|
||||||
|
# © Uthmn 2025 under MIT license
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
import services.apt
|
||||||
|
import services.mail
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from os import getenv
|
||||||
|
from os.path import dirname, join
|
||||||
|
|
||||||
|
# Load environment variables from .env file
|
||||||
|
dotenv_path = join(dirname(__file__), ".env")
|
||||||
|
load_dotenv(dotenv_path)
|
||||||
|
|
||||||
|
# TODO: Flag --now to run temporarily and just execute hourly tasks instantly then exit
|
||||||
|
# TODO: Logging
|
||||||
|
# TODO: Email alerts + structure
|
||||||
|
# TODO: Use typer for CLI
|
||||||
10
requirements.txt
Normal file
10
requirements.txt
Normal file
@ -0,0 +1,10 @@
|
|||||||
|
click==8.3.0
|
||||||
|
dotenv==0.9.9
|
||||||
|
markdown-it-py==4.0.0
|
||||||
|
mdurl==0.1.2
|
||||||
|
Pygments==2.19.2
|
||||||
|
python-dotenv==1.1.1
|
||||||
|
rich==14.2.0
|
||||||
|
shellingham==1.5.4
|
||||||
|
typer==0.19.2
|
||||||
|
typing_extensions==4.15.0
|
||||||
76
services/apt.py
Normal file
76
services/apt.py
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
# _ __________________
|
||||||
|
# | | / / _/ ____/ _/ /
|
||||||
|
# | | / // // / __ / // /
|
||||||
|
# | |/ // // /_/ // // /___
|
||||||
|
# |___/___/\____/___/_____/
|
||||||
|
# © Uthmn 2025 under MIT license
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import re
|
||||||
|
|
||||||
|
def require_root():
|
||||||
|
if os.geteuid() != 0:
|
||||||
|
print("This script requires root privileges. Please run with sudo.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
def detect_apt():
|
||||||
|
try:
|
||||||
|
subprocess.run(["apt", "--version"], capture_output=True, text=True, check=True)
|
||||||
|
return True
|
||||||
|
except FileNotFoundError:
|
||||||
|
return False
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
raise RuntimeError(f"Error running apt: {e.stderr or e}")
|
||||||
|
|
||||||
|
def check_updates():
|
||||||
|
# Update package list
|
||||||
|
try:
|
||||||
|
subprocess.run(["apt", "update"], capture_output=True, text=True, check=True)
|
||||||
|
except FileNotFoundError:
|
||||||
|
raise FileNotFoundError("apt not found on this system.")
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
raise RuntimeError(f"Error running 'apt update': {e.stderr or e.stdout or e}")
|
||||||
|
|
||||||
|
# List upgradable packages
|
||||||
|
try:
|
||||||
|
result = subprocess.run(
|
||||||
|
["apt", "list", "--upgradable"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
check=True
|
||||||
|
)
|
||||||
|
upgrades = result.stdout.strip()
|
||||||
|
lines = [line for line in upgrades.splitlines() if line.strip()]
|
||||||
|
|
||||||
|
if len(lines) <= 1: # Only the header
|
||||||
|
return {} # No upgrades
|
||||||
|
|
||||||
|
package_dict = {}
|
||||||
|
for line in lines[1:]: # skip header
|
||||||
|
# Example line format:
|
||||||
|
# docker-ce/focal-security 5:24.0.1~3-0~ubuntu-focal amd64 [upgradable from: 5:23.0.0~3-0~ubuntu-focal]
|
||||||
|
match = re.match(
|
||||||
|
r"([^/]+)/([^\s]+)\s+([^\s]+)\s+[^\s]+\s+\[upgradable from: ([^\]]+)\]", line
|
||||||
|
)
|
||||||
|
if match:
|
||||||
|
name, repo, latest_version, installed_version = match.groups()
|
||||||
|
# Detect security update based on repo name containing "security"
|
||||||
|
is_security = "security" in repo.lower()
|
||||||
|
package_dict[name] = {
|
||||||
|
"installed_version": installed_version,
|
||||||
|
"latest_version": latest_version,
|
||||||
|
"repo": repo,
|
||||||
|
"security": is_security
|
||||||
|
}
|
||||||
|
return package_dict
|
||||||
|
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
raise RuntimeError(f"Error running 'apt list --upgradable': {e.stderr or e.stdout or e}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
require_root()
|
||||||
|
if detect_apt():
|
||||||
|
updates = check_updates()
|
||||||
|
print(updates)
|
||||||
121
services/mail.py
Normal file
121
services/mail.py
Normal file
@ -0,0 +1,121 @@
|
|||||||
|
# _ __________________
|
||||||
|
# | | / / _/ ____/ _/ /
|
||||||
|
# | | / // // / __ / // /
|
||||||
|
# | |/ // // /_/ // // /___
|
||||||
|
# |___/___/\____/___/_____/
|
||||||
|
# © Uthmn 2025 under MIT license
|
||||||
|
|
||||||
|
import smtplib
|
||||||
|
from email.mime.text import MIMEText
|
||||||
|
|
||||||
|
from imaplib import IMAP4_SSL
|
||||||
|
from email import message_from_bytes
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from os import getenv
|
||||||
|
from os.path import dirname, join
|
||||||
|
|
||||||
|
# Load environment variables from .env file
|
||||||
|
dotenv_path = join(dirname(__file__), "../.env")
|
||||||
|
load_dotenv(dotenv_path)
|
||||||
|
|
||||||
|
# SMTP server settings
|
||||||
|
SMTP_SERVER = getenv("SMTP_SERVER")
|
||||||
|
SMTP_PORT = int(getenv("SMTP_PORT"))
|
||||||
|
SMTP_USERNAME = getenv("SMTP_USERNAME")
|
||||||
|
SMTP_PASSWORD = getenv("SMTP_PASSWORD")
|
||||||
|
|
||||||
|
# IMAP server settings
|
||||||
|
IMAP_SERVER = getenv("IMAP_SERVER")
|
||||||
|
IMAP_PORT = int(getenv("IMAP_PORT"))
|
||||||
|
IMAP_USERNAME = getenv("IMAP_USERNAME")
|
||||||
|
IMAP_PASSWORD = getenv("IMAP_PASSWORD")
|
||||||
|
|
||||||
|
def send_email(receiver_email, subject, body):
|
||||||
|
sender_email = SMTP_USERNAME
|
||||||
|
|
||||||
|
# Create the email
|
||||||
|
message = MIMEText(body, "html")
|
||||||
|
message["Subject"] = subject
|
||||||
|
message["From"] = sender_email
|
||||||
|
message["To"] = receiver_email
|
||||||
|
|
||||||
|
# Connect using SSL
|
||||||
|
try:
|
||||||
|
with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
|
||||||
|
server.login(SMTP_USERNAME, SMTP_PASSWORD)
|
||||||
|
server.sendmail(sender_email, receiver_email, message.as_string())
|
||||||
|
print("HTML email sent successfully!")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Failed to send email: {e}")
|
||||||
|
|
||||||
|
def receive_emails(folder="INBOX", limit=50):
|
||||||
|
emails = {}
|
||||||
|
|
||||||
|
try:
|
||||||
|
M = IMAP4_SSL(host=IMAP_SERVER, port=IMAP_PORT)
|
||||||
|
M.login(IMAP_USERNAME, IMAP_PASSWORD)
|
||||||
|
M.select(folder)
|
||||||
|
|
||||||
|
# Get all message IDs
|
||||||
|
typ, data = M.search(None, 'ALL')
|
||||||
|
if typ != 'OK' or not data[0]:
|
||||||
|
print("No messages found.")
|
||||||
|
return emails
|
||||||
|
|
||||||
|
all_ids = data[0].split()
|
||||||
|
last_ids = all_ids[-limit:] # Take only the last 'limit' messages
|
||||||
|
|
||||||
|
for num in last_ids:
|
||||||
|
typ, msg_data = M.fetch(num, '(RFC822)')
|
||||||
|
if typ != 'OK':
|
||||||
|
print(f"Failed to fetch message {num}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Parse the email
|
||||||
|
msg = message_from_bytes(msg_data[0][1])
|
||||||
|
msg_id = msg.get('Message-ID') or num.decode()
|
||||||
|
|
||||||
|
# Extract headers
|
||||||
|
subject = msg.get('Subject', '')
|
||||||
|
sender = msg.get('From', '')
|
||||||
|
date = msg.get('Date', '')
|
||||||
|
|
||||||
|
# Extract plain text and HTML
|
||||||
|
body_text = ""
|
||||||
|
body_html = ""
|
||||||
|
if msg.is_multipart():
|
||||||
|
for part in msg.walk():
|
||||||
|
content_type = part.get_content_type()
|
||||||
|
content_disposition = str(part.get("Content-Disposition"))
|
||||||
|
if "attachment" in content_disposition:
|
||||||
|
continue
|
||||||
|
payload = part.get_payload(decode=True)
|
||||||
|
if payload:
|
||||||
|
payload = payload.decode(errors="ignore")
|
||||||
|
if content_type == "text/plain":
|
||||||
|
body_text = payload
|
||||||
|
elif content_type == "text/html":
|
||||||
|
body_html = payload
|
||||||
|
else:
|
||||||
|
payload = msg.get_payload(decode=True)
|
||||||
|
if payload:
|
||||||
|
body_text = payload.decode(errors="ignore")
|
||||||
|
|
||||||
|
emails[msg_id] = {
|
||||||
|
"subject": subject,
|
||||||
|
"from": sender,
|
||||||
|
"date": date,
|
||||||
|
"body_text": body_text,
|
||||||
|
"body_html": body_html
|
||||||
|
}
|
||||||
|
|
||||||
|
M.close()
|
||||||
|
M.logout()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error receiving emails: {e}")
|
||||||
|
|
||||||
|
return emails
|
||||||
|
|
||||||
|
print(receive_emails())
|
||||||
Loading…
Reference in New Issue
Block a user