vigil/main.py

221 lines
6.9 KiB
Python

# _ __________________
# | | / / _/ ____/ _/ /
# | | / // // / __ / // /
# | |/ // // /_/ // // /___
# |___/___/\____/___/_____/
# © Uthmn 2025 under MIT license
import time
from datetime import datetime, timedelta
from typing import Union
import services.apt
import services.mail
from socket import gethostname
import json
from os.path import dirname, join, exists
import typer
# FIXME: Error handling
app = typer.Typer()
def wait_until(target_time):
now = datetime.now()
seconds_to_wait = (target_time - now).total_seconds()
if seconds_to_wait > 0:
time.sleep(seconds_to_wait)
def schedule(mode, hour, minute=0, receiver_email=None):
print(f"Scheduler started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - Mode: {mode}")
while True:
now = datetime.now()
if mode == "daily":
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
elif mode == "hourly":
target = now.replace(minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(hours=1)
else:
raise ValueError("Mode must be 'daily' or 'hourly'.")
wait_until(target)
generate_email(receiver_email)
@app.command()
def serve(receiver_email: str = None, check_hourly: bool = False, check_daily: str = "18"):
"""
Runs scheduled checks and sends emails.
- check_hourly=True → every hour
- check_daily=int → hour of day for daily email
"""
if check_hourly and check_daily:
check_daily = False
if check_hourly:
schedule("hourly", 0, receiver_email=receiver_email)
elif check_daily:
# Convert CLI string to int
if check_daily.lower() == "true":
hour = 18
else:
try:
hour = int(check_daily)
except ValueError:
print(f"Invalid check_daily value: {check_daily}")
exit(1)
schedule("daily", hour, receiver_email=receiver_email)
else:
print("No schedule selected.")
@app.command()
def now(receiver_email: str = None):
"""
Checks for apt upgrades and emails them then exits.
"""
generate_email(receiver_email)
def generate_email(receiver_emails: Union[list, None]):
services.apt.require_root()
if not services.apt.detect_apt():
print("Apt not found on this system.")
exit(1)
updates = services.apt.check_updates()
# For testing
#updates = {
# "vim": {
# "installed_version": "8.2.3400",
# "latest_version": "8.2.3400",
# "repo": "universe",
# "security": True
# },
# "vim-gtk3": {
# "installed_version": "8.2.3400",
# "latest_version": "8.2.3400",
# "repo": "universe",
# "security": False
# }
#}
receiver_email = []
if receiver_emails:
if "," in receiver_emails:
receiver_email = [email.strip() for email in receiver_emails.split(",")]
else:
receiver_email = [receiver_emails.strip()]
# If no CLI emails are provided
if not receiver_email:
users_json_path = join(dirname(__file__), "users.json")
if exists(users_json_path):
with open(users_json_path, "r") as f:
receiver_email = json.load(f)
else:
print("No email address provided and users.json not found.")
exit(1)
# Get how many security updates are available
security_updates = 0
for package in updates:
if updates[package]["security"]:
security_updates += 1
# Get how many total updates are available
total_updates = len(updates)
# Get how many general updates are available
general_updates = total_updates - security_updates
# Check if there are any updates at all
if total_updates == 0:
print("No updates available.")
return
# Get system hostname
hostname = gethostname()
# Create email subject
subject = f"{hostname}> {security_updates} security updates, {general_updates} general updates available"
# Build security updates section
security_chunks = ""
for package in updates:
if not updates[package]["security"]:
continue
chunk = f'''
<tr>
<td style="border: 1px solid #ddd; padding: 8px;">{package}</td>
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["installed_version"]}</td>
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["latest_version"]}</td>
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["repo"]}</td>
</tr>
'''
security_chunks += chunk
security = ""
if security_updates > 0:
security = f'''
<h1 style="font-family: Arial, sans-serif;">Security</h1>
<table style="border-collapse: collapse; width: 100%; font-family: Arial, sans-serif;">
<tr>
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Package</th>
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Installed</th>
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Latest</th>
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Repository</th>
</tr>
{security_chunks}
</table>
'''
# Build general updates section
general_chunks = ""
for package in updates:
if updates[package]["security"]:
continue
chunk = f'''
<tr>
<td style="border: 1px solid #ddd; padding: 8px;">{package}</td>
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["installed_version"]}</td>
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["latest_version"]}</td>
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["repo"]}</td>
</tr>
'''
general_chunks += chunk
general = ""
if general_updates > 0:
general = f'''
<h1 style="font-family: Arial, sans-serif;">General</h1>
<table style="border-collapse: collapse; width: 100%; font-family: Arial, sans-serif;">
<tr>
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Package</th>
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Installed</th>
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Latest</th>
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Repository</th>
</tr>
{general_chunks}
</table>
'''
html = security + general
services.mail.send_email(receiver_email, subject, html)
@app.callback()
def callback():
"""
Checks apt for upgrades and emails them
"""
if __name__ == "__main__":
app()