223 lines
7.1 KiB
Python
223 lines
7.1 KiB
Python
# _ __________________
|
|
# | | / / _/ ____/ _/ /
|
|
# | | / // // / __ / // /
|
|
# | |/ // // /_/ // // /___
|
|
# |___/___/\____/___/_____/
|
|
# © Uthmn 2025 under MIT license
|
|
|
|
import time
|
|
from datetime import datetime, timedelta
|
|
from typing import Union
|
|
|
|
import services.apt
|
|
import services.mail
|
|
|
|
from socket import gethostname
|
|
|
|
import json
|
|
from os.path import dirname, join, exists
|
|
|
|
import typer
|
|
|
|
app = typer.Typer()
|
|
|
|
def wait_until(target_time):
|
|
now = datetime.now()
|
|
seconds_to_wait = (target_time - now).total_seconds()
|
|
if seconds_to_wait > 0:
|
|
time.sleep(seconds_to_wait)
|
|
|
|
def schedule(mode, hour, minute=0, receiver_email=None):
|
|
print(f"Scheduler started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - Mode: {mode}")
|
|
while True:
|
|
try:
|
|
now = datetime.now()
|
|
if mode == "daily":
|
|
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
|
if target <= now:
|
|
target += timedelta(days=1)
|
|
elif mode == "hourly":
|
|
target = now.replace(minute=minute, second=0, microsecond=0)
|
|
if target <= now:
|
|
target += timedelta(hours=1)
|
|
else:
|
|
raise ValueError("Mode must be 'daily' or 'hourly'.")
|
|
|
|
wait_until(target)
|
|
generate_email(receiver_email)
|
|
except Exception as e:
|
|
print(f"Error in scheduled task: {e}")
|
|
time.sleep(60) # Wait before retrying
|
|
|
|
@app.command()
|
|
def serve(receiver_email: str = None, check_hourly: bool = False, check_daily: str = "18"):
|
|
"""
|
|
Runs scheduled checks and sends emails.
|
|
|
|
- check_hourly=True → every hour
|
|
- check_daily=int → hour of day for daily email
|
|
"""
|
|
|
|
if check_hourly and check_daily:
|
|
check_daily = False
|
|
|
|
if check_hourly:
|
|
schedule("hourly", 0, receiver_email=receiver_email)
|
|
elif check_daily:
|
|
# Convert CLI string to int
|
|
if check_daily.lower() == "true":
|
|
hour = 18
|
|
else:
|
|
try:
|
|
hour = int(check_daily)
|
|
except ValueError:
|
|
print(f"Invalid check_daily value: {check_daily}")
|
|
exit(1)
|
|
schedule("daily", hour, receiver_email=receiver_email)
|
|
else:
|
|
print("No schedule selected.")
|
|
|
|
@app.command()
|
|
def now(receiver_email: str = None):
|
|
"""
|
|
Checks for apt upgrades and emails them then exits.
|
|
"""
|
|
generate_email(receiver_email)
|
|
|
|
def generate_email(receiver_emails: Union[list, None]):
|
|
services.apt.require_root()
|
|
if not services.apt.detect_apt():
|
|
print("Apt not found on this system.")
|
|
exit(1)
|
|
updates = services.apt.check_updates()
|
|
|
|
# For testing
|
|
#updates = {
|
|
# "vim": {
|
|
# "installed_version": "8.2.3400",
|
|
# "latest_version": "8.2.3400",
|
|
# "repo": "universe",
|
|
# "security": True
|
|
# },
|
|
# "vim-gtk3": {
|
|
# "installed_version": "8.2.3400",
|
|
# "latest_version": "8.2.3400",
|
|
# "repo": "universe",
|
|
# "security": False
|
|
# }
|
|
#}
|
|
|
|
receiver_email = []
|
|
if receiver_emails:
|
|
if "," in receiver_emails:
|
|
receiver_email = [email.strip() for email in receiver_emails.split(",")]
|
|
else:
|
|
receiver_email = [receiver_emails.strip()]
|
|
|
|
# If no CLI emails are provided
|
|
if not receiver_email:
|
|
users_json_path = join(dirname(__file__), "users.json")
|
|
if exists(users_json_path):
|
|
with open(users_json_path, "r") as f:
|
|
receiver_email = json.load(f)
|
|
else:
|
|
print("No email address provided and users.json not found.")
|
|
exit(1)
|
|
|
|
# Get how many security updates are available
|
|
security_updates = 0
|
|
for package in updates:
|
|
if updates[package]["security"]:
|
|
security_updates += 1
|
|
|
|
# Get how many total updates are available
|
|
total_updates = len(updates)
|
|
# Get how many general updates are available
|
|
general_updates = total_updates - security_updates
|
|
|
|
# Check if there are any updates at all
|
|
if total_updates == 0:
|
|
print("No updates available.")
|
|
return
|
|
|
|
# Get system hostname
|
|
hostname = gethostname()
|
|
|
|
# Create email subject
|
|
subject = f"{hostname}> {security_updates} security updates, {general_updates} general updates available"
|
|
|
|
# Build security updates section
|
|
security_chunks = ""
|
|
|
|
for package in updates:
|
|
if not updates[package]["security"]:
|
|
continue
|
|
chunk = f'''
|
|
<tr>
|
|
<td style="border: 1px solid #ddd; padding: 8px;">{package}</td>
|
|
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["installed_version"]}</td>
|
|
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["latest_version"]}</td>
|
|
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["repo"]}</td>
|
|
</tr>
|
|
'''
|
|
security_chunks += chunk
|
|
|
|
security = ""
|
|
if security_updates > 0:
|
|
security = f'''
|
|
<h1 style="font-family: Arial, sans-serif;">Security</h1>
|
|
<table style="border-collapse: collapse; width: 100%; font-family: Arial, sans-serif;">
|
|
<tr>
|
|
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Package</th>
|
|
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Installed</th>
|
|
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Latest</th>
|
|
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Repository</th>
|
|
</tr>
|
|
{security_chunks}
|
|
</table>
|
|
'''
|
|
|
|
# Build general updates section
|
|
general_chunks = ""
|
|
|
|
for package in updates:
|
|
if updates[package]["security"]:
|
|
continue
|
|
chunk = f'''
|
|
<tr>
|
|
<td style="border: 1px solid #ddd; padding: 8px;">{package}</td>
|
|
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["installed_version"]}</td>
|
|
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["latest_version"]}</td>
|
|
<td style="border: 1px solid #ddd; padding: 8px;">{updates[package]["repo"]}</td>
|
|
</tr>
|
|
'''
|
|
general_chunks += chunk
|
|
|
|
general = ""
|
|
if general_updates > 0:
|
|
general = f'''
|
|
<h1 style="font-family: Arial, sans-serif;">General</h1>
|
|
<table style="border-collapse: collapse; width: 100%; font-family: Arial, sans-serif;">
|
|
<tr>
|
|
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Package</th>
|
|
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Installed</th>
|
|
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Latest</th>
|
|
<th style="border: 1px solid #ddd; background-color: #f2f2f2; padding: 8px;">Repository</th>
|
|
</tr>
|
|
{general_chunks}
|
|
</table>
|
|
'''
|
|
|
|
html = security + general
|
|
|
|
services.mail.send_email(receiver_email, subject, html)
|
|
|
|
@app.callback()
|
|
def callback():
|
|
"""
|
|
Checks apt for upgrades and emails them
|
|
"""
|
|
|
|
if __name__ == "__main__":
|
|
app()
|