Almost finished, just have to do a quick check for everything to work, add error handling (+retries), polish it up and then make sure all features implemented fully.
This commit is contained in:
parent
7c43ce18f2
commit
14bd208378
87
main.py
87
main.py
@ -6,6 +6,8 @@
|
||||
# © Uthmn 2025 under MIT license
|
||||
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Union
|
||||
|
||||
import services.apt
|
||||
import services.mail
|
||||
@ -21,30 +23,66 @@ import typer
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
def wait_until(target_time):
|
||||
now = datetime.now()
|
||||
seconds_to_wait = (target_time - now).total_seconds()
|
||||
if seconds_to_wait > 0:
|
||||
time.sleep(seconds_to_wait)
|
||||
|
||||
def schedule(mode, hour, minute=0, receiver_email=None):
|
||||
print(f"Scheduler started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - Mode: {mode}")
|
||||
while True:
|
||||
now = datetime.now()
|
||||
if mode == "daily":
|
||||
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
||||
if target <= now:
|
||||
target += timedelta(days=1)
|
||||
elif mode == "hourly":
|
||||
target = now.replace(minute=minute, second=0, microsecond=0)
|
||||
if target <= now:
|
||||
target += timedelta(hours=1)
|
||||
else:
|
||||
raise ValueError("Mode must be 'daily' or 'hourly'.")
|
||||
|
||||
wait_until(target)
|
||||
generate_email(receiver_email)
|
||||
|
||||
@app.command()
|
||||
def serve(receiver_email: str = None, check_interval: int = 24):
|
||||
def serve(receiver_email: str = None, check_hourly: bool = False, check_daily: str = "18"):
|
||||
"""
|
||||
Checks for updates at a fixed interval and sends email at that interval as a daemon. Doesn't email without updates.
|
||||
Accepts check_interval as a parameter in hours (default 24)
|
||||
Runs scheduled checks and sends emails.
|
||||
|
||||
- check_hourly=True → every hour
|
||||
- check_daily=int → hour of day for daily email
|
||||
"""
|
||||
|
||||
if check_hourly and check_daily:
|
||||
check_daily = False
|
||||
|
||||
if check_hourly:
|
||||
schedule("hourly", 0, receiver_email=receiver_email)
|
||||
elif check_daily:
|
||||
# Convert CLI string to int
|
||||
if check_daily.lower() == "true":
|
||||
hour = 18
|
||||
else:
|
||||
try:
|
||||
hour = int(check_daily)
|
||||
except ValueError:
|
||||
print(f"Invalid check_daily value: {check_daily}")
|
||||
exit(1)
|
||||
schedule("daily", hour, receiver_email=receiver_email)
|
||||
else:
|
||||
print("No schedule selected.")
|
||||
|
||||
@app.command()
|
||||
def now(receiver_email: str = None):
|
||||
# Comma seperate
|
||||
# FIXME: Fix if nonetype
|
||||
receiver_emails = []
|
||||
if receiver_email:
|
||||
if "," in receiver_email:
|
||||
receiver_emails = [email.strip() for email in receiver_email.split(",")]
|
||||
else:
|
||||
receiver_emails = [receiver_email.strip()]
|
||||
|
||||
"""
|
||||
Checks for apt upgrades and emails them then exits.
|
||||
"""
|
||||
generate_email(receiver_emails)
|
||||
generate_email(receiver_email)
|
||||
|
||||
def generate_email(receiver_email: list):
|
||||
def generate_email(receiver_emails: Union[list, None]):
|
||||
services.apt.require_root()
|
||||
if not services.apt.detect_apt():
|
||||
print("Apt not found on this system.")
|
||||
@ -67,14 +105,21 @@ def generate_email(receiver_email: list):
|
||||
# }
|
||||
#}
|
||||
|
||||
if exists(join(dirname(__file__), "users.json")):
|
||||
with open(join(dirname(__file__), "users.json"), "r") as f:
|
||||
users = json.load(f)
|
||||
receiver_email = []
|
||||
if receiver_emails:
|
||||
if "," in receiver_emails:
|
||||
receiver_email = [email.strip() for email in receiver_emails.split(",")]
|
||||
else:
|
||||
receiver_email = [receiver_emails.strip()]
|
||||
|
||||
receiver_email = users
|
||||
else:
|
||||
if receiver_email is None:
|
||||
print("No email address provided.")
|
||||
# If no CLI emails are provided
|
||||
if not receiver_email:
|
||||
users_json_path = join(dirname(__file__), "users.json")
|
||||
if exists(users_json_path):
|
||||
with open(users_json_path, "r") as f:
|
||||
receiver_email = json.load(f)
|
||||
else:
|
||||
print("No email address provided and users.json not found.")
|
||||
exit(1)
|
||||
|
||||
# Get how many security updates are available
|
||||
|
||||
Loading…
Reference in New Issue
Block a user