# _ __________________ # | | / / _/ ____/ _/ / # | | / // // / __ / // / # | |/ // // /_/ // // /___ # |___/___/\____/___/_____/ # © Uthmn 2025 under MIT license import time from datetime import datetime, timedelta from typing import Union import services.apt import services.mail import html as hypertext import re from socket import gethostname import json from os.path import exists from services.logger import logger import typer import signal # Its beautiful ;) EMAIL_REGEX = re.compile(r""" (?:[a-z0-9!#$%&'*+\x2f=?^_`\x7b-\x7d~\x2d]+(?:\.[a-z0-9!#$%&'*+\x2f=?^_`\x7b-\x7d~\x2d]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9\x2d]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9\x2d]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9\x2d]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\]) """, re.VERBOSE | re.IGNORECASE) # Change for version of Vigil __version__ = "1.0.0" shutdown_flag = False def handle_shutdown(signum, frame): global shutdown_flag logger.info("Shutdown signal received; finishing current cycle and exiting...") shutdown_flag = True signal.signal(signal.SIGINT, handle_shutdown) signal.signal(signal.SIGTERM, handle_shutdown) app = typer.Typer() def wait_until_interruptible(target_time): global shutdown_flag while not shutdown_flag: now = datetime.now() remaining = (target_time - now).total_seconds() if remaining <= 0: return # sleep in small chunks so shutdown is responsive time.sleep(min(remaining, 0.5)) def schedule(mode, hour, minute=0, receiver_email=None): global shutdown_flag logger.info(f"Scheduler started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - Mode: {mode}") while not shutdown_flag: try: now = datetime.now() if mode == "daily": target = now.replace(hour=hour, minute=minute, second=0, microsecond=0) if target <= now: target += timedelta(days=1) elif mode == "hourly": target = now.replace(minute=minute, second=0, microsecond=0) if target <= now: target += timedelta(hours=1) else: raise ValueError("Mode must be 'daily' or 'hourly'.") logger.info(f"Next check scheduled for {target.strftime('%Y-%m-%d %H:%M:%S')}") wait_until_interruptible(target) if shutdown_flag: break generate_email(receiver_email) except Exception as e: logger.error(f"Error in scheduled task: {e}", exc_info=True) time.sleep(60) logger.info("Scheduler exited cleanly.") @app.command() def serve(receiver_email: str = None, check_hourly: bool = False, check_daily: str = "18"): """ Runs scheduled checks and sends emails. - check_hourly=True → every hour - check_daily=int → hour of day for daily email """ if check_hourly and check_daily not in [False, "false", "False", None]: check_daily = False if check_hourly: schedule("hourly", 0, receiver_email=receiver_email) elif check_daily: # Convert CLI string to int if check_daily.lower() == "true": hour = 18 else: try: hour = int(check_daily) except ValueError: logger.error(f"Invalid check_daily value: {check_daily}") raise typer.Exit(1) schedule("daily", hour, receiver_email=receiver_email) else: logger.error("No schedule selected.") @app.command() def now(receiver_email: str = None): """ Checks for apt upgrades and emails them then exits. """ generate_email(receiver_email) def generate_email(receiver_emails: Union[list, None]): services.apt.require_root() if not services.apt.detect_apt(): logger.error("Apt not found on this system.") raise typer.Exit(1) try: updates = services.apt.check_updates() except Exception as e: logger.error(f"Failed to check for updates: {e}", exc_info=True) raise typer.Exit(1) receiver_email = [] if receiver_emails: if "," in receiver_emails: receiver_email = [email.strip() for email in receiver_emails.split(",")] else: receiver_email = [receiver_emails.strip()] # If no CLI emails are provided if not receiver_email: users_json_path = "/etc/vigil/users.json" if exists(users_json_path): try: with open(users_json_path, "r") as f: data = json.load(f) if not isinstance(data, list): logger.error("users.json must contain a list of email addresses") raise typer.Exit(1) receiver_email = [str(email).strip() for email in data] except json.JSONDecodeError as e: logger.error(f"Invalid JSON in users.json: {e}") raise typer.Exit(1) else: logger.error("No email address provided and /etc/vigil/users.json not found.") raise typer.Exit(1) # Validate email address(es) for email in receiver_email: if not EMAIL_REGEX.match(email): logger.error(f"Invalid email address: {email}") raise typer.Exit(1) # Get how many security updates are available security_updates = 0 for package in updates: if updates[package]["security"]: security_updates += 1 # Get how many total updates are available total_updates = len(updates) # Get how many general updates are available general_updates = total_updates - security_updates # Check if there are any updates at all if total_updates == 0: logger.info("No updates available.") return # Get system hostname hostname = gethostname() # Create email subject subject = f"{hostname}> {security_updates} security updates, {general_updates} general updates available" # Build security updates section security_chunks = "" for package in updates: if not updates[package]["security"]: continue chunk = f''' {hypertext.escape(package)} {hypertext.escape(updates[package]["installed_version"])} {hypertext.escape(updates[package]["latest_version"])} {hypertext.escape(updates[package]["repo"])} ''' security_chunks += chunk security = "" if security_updates > 0: security = f'''

Security

{security_chunks}
Package Installed Latest Repository
''' # Build general updates section general_chunks = "" for package in updates: if updates[package]["security"]: continue chunk = f''' {hypertext.escape(package)} {hypertext.escape(updates[package]["installed_version"])} {hypertext.escape(updates[package]["latest_version"])} {hypertext.escape(updates[package]["repo"])} ''' general_chunks += chunk general = "" if general_updates > 0: general = f'''

General

{general_chunks}
Package Installed Latest Repository
''' html = security + general try: services.mail.send_email(receiver_email, subject, html) logger.info(f"Update notification sent: {security_updates} security, {general_updates} general updates to {len(receiver_email)} recipient(s)") except Exception as e: logger.error(f"Failed to send email notification: {e}", exc_info=True) raise typer.Exit(1) def version_callback(value: bool): if value: typer.echo(f"Vigil {__version__}") raise typer.Exit() @app.callback() def callback( version: bool = typer.Option( None, "--version", "-v", callback=version_callback, is_eager=True, help="Show version and exit" ) ): """ Checks apt for upgrades and emails them """ if __name__ == "__main__": app()