# _ __________________
# | | / / _/ ____/ _/ /
# | | / // // / __ / // /
# | |/ // // /_/ // // /___
# |___/___/\____/___/_____/
# © Uthmn 2025 under MIT license
import time
from datetime import datetime, timedelta
from typing import Union
import services.apt
import services.mail
import html as hypertext
import re
from socket import gethostname
import json
from os.path import exists
from services.logger import logger
import typer
import signal
# Pattern used to validate recipient e-mail addresses.
#   * Local part: dot-separated runs of letters, digits and the listed
#     punctuation, or a double-quoted string.
#   * Domain part: dot-separated labels of letters, digits and hyphens, or a
#     bracketed literal such as "[192.0.2.1]".
# re.VERBOSE makes the newlines surrounding the pattern insignificant;
# re.IGNORECASE lets the lowercase-only character classes accept uppercase.
# Looks like the widely circulated "RFC 5322" pattern - TODO confirm provenance.
# NOTE(review): in the bracketed-literal branch the class contains
# "\x21-\x5a\x53-\x7f"; the second range overlaps the first and also admits
# "[", "\" and "]" - possibly "\x5e-\x7f" was intended. Confirm before changing.
EMAIL_REGEX = re.compile(r"""
(?:[a-z0-9!#$%&'*+\x2f=?^_`\x7b-\x7d~\x2d]+(?:\.[a-z0-9!#$%&'*+\x2f=?^_`\x7b-\x7d~\x2d]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9\x2d]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9\x2d]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9\x2d]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])
""", re.VERBOSE | re.IGNORECASE)
# Version string of Vigil, printed by the --version option; update it on release.
__version__ = "1.0.0"
# Set to True by handle_shutdown() on SIGINT/SIGTERM; the scheduler loops poll
# it so they can stop between cycles.
shutdown_flag = False
def handle_shutdown(signum, frame):
    """Signal handler that asks the scheduler to stop.

    It does not exit the process itself: it logs the request and raises the
    module-level ``shutdown_flag`` so the polling loops can finish on their own.

    Args:
        signum: Number of the signal that was delivered (unused).
        frame: Stack frame that was interrupted (unused).
    """
    global shutdown_flag

    logger.info("Shutdown signal received; finishing current cycle and exiting...")
    shutdown_flag = True
# Ctrl-C (SIGINT) and termination requests (SIGTERM) share one handler, so
# both only raise the shutdown flag instead of killing the process outright.
for _shutdown_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_shutdown_signal, handle_shutdown)

# Typer application on which the commands and the root callback are registered.
app = typer.Typer()
def wait_until_interruptible(target_time):
    """Block until ``target_time`` is reached or a shutdown is requested.

    Args:
        target_time: ``datetime`` to wait for; it is compared against
            ``datetime.now()``.

    Returns:
        None. The caller must inspect ``shutdown_flag`` to learn which of the
        two conditions ended the wait.
    """
    while True:
        if shutdown_flag:
            return
        seconds_left = (target_time - datetime.now()).total_seconds()
        if seconds_left <= 0:
            return
        # Nap for at most half a second so a shutdown request is noticed quickly.
        time.sleep(min(seconds_left, 0.5))
def schedule(mode, hour, minute=0, receiver_email=None):
    """Run generate_email() at a fixed time every day or every hour until shutdown.

    Args:
        mode: ``"daily"`` (fire at ``hour:minute`` each day) or ``"hourly"``
            (fire at ``minute`` past every hour; ``hour`` is ignored).
        hour: Hour of the day, 0-23. Only used in daily mode.
        minute: Minute of the hour, 0-59.
        receiver_email: Passed through unchanged to generate_email().

    Raises:
        ValueError: If ``mode`` is not ``"daily"``/``"hourly"`` or if
            ``hour``/``minute`` are out of range.
    """
    # Validate once, before entering the loop. Inside the loop these errors
    # would be caught by the broad handler below and retried every minute
    # forever, although they can never succeed.
    if mode not in ("daily", "hourly"):
        raise ValueError("Mode must be 'daily' or 'hourly'.")
    if mode == "daily" and not 0 <= hour <= 23:
        raise ValueError(f"Hour must be between 0 and 23, got {hour}.")
    if not 0 <= minute <= 59:
        raise ValueError(f"Minute must be between 0 and 59, got {minute}.")

    logger.info(f"Scheduler started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - Mode: {mode}")
    while not shutdown_flag:
        try:
            current = datetime.now()
            if mode == "daily":
                target = current.replace(hour=hour, minute=minute, second=0, microsecond=0)
                period = timedelta(days=1)
            else:
                target = current.replace(minute=minute, second=0, microsecond=0)
                period = timedelta(hours=1)
            # The slot for the current period has already passed: use the next one.
            if target <= current:
                target += period
            logger.info(f"Next check scheduled for {target.strftime('%Y-%m-%d %H:%M:%S')}")
            wait_until_interruptible(target)
            if shutdown_flag:
                break
            generate_email(receiver_email)
        except Exception as e:
            # Keep the scheduler alive across a failed cycle.
            logger.error(f"Error in scheduled task: {e}", exc_info=True)
            # Back off for a minute, but keep polling the shutdown flag so a
            # SIGINT/SIGTERM during the back-off is honoured promptly.
            wait_until_interruptible(datetime.now() + timedelta(seconds=60))
    logger.info("Scheduler exited cleanly.")
@app.command()
def serve(receiver_email: str = None, check_hourly: bool = False, check_daily: str = "18"):
    """
    Runs scheduled checks and sends emails.
    - check_hourly=True → every hour
    - check_daily=int → hour of day for daily email
    """
    # Hourly mode takes precedence over whatever check_daily holds.
    if check_hourly:
        schedule("hourly", 0, receiver_email=receiver_email)
        return

    # Normalise the CLI value; None, "" and "false" (any case) all mean that
    # the daily schedule is switched off.
    daily = "" if check_daily is None else str(check_daily).strip()
    if daily.lower() in ("", "false"):
        logger.error("No schedule selected.")
        return

    # "true" selects the default hour; anything else must be an hour number.
    if daily.lower() == "true":
        hour = 18
    else:
        try:
            hour = int(daily)
        except ValueError:
            logger.error(f"Invalid check_daily value: {check_daily}")
            raise typer.Exit(1)

    # Reject hours that datetime.replace() cannot represent before they reach
    # the scheduler loop.
    if not 0 <= hour <= 23:
        logger.error(f"Invalid check_daily value: {check_daily}")
        raise typer.Exit(1)

    schedule("daily", hour, receiver_email=receiver_email)
@app.command()
def now(receiver_email: str = None):
    """
    Checks for apt upgrades and emails them then exits.
    """
    # One-shot variant of the scheduled check: a single report, no loop.
    generate_email(receiver_email)
def generate_email(receiver_emails: Union[str, list, None]):
    """Check apt for pending upgrades and mail a report to the recipients.

    Args:
        receiver_emails: Recipient addresses, either as one comma-separated
            string (the CLI form) or as a list of strings. When empty or
            None, the list stored in /etc/vigil/users.json is used instead.

    Returns:
        None. Nothing is sent when there are no pending updates.

    Raises:
        typer.Exit: With exit code 1 when apt is missing, the update check
            fails, no valid recipient can be determined, or sending fails.
    """
    services.apt.require_root()
    if not services.apt.detect_apt():
        logger.error("Apt not found on this system.")
        raise typer.Exit(1)
    try:
        updates = services.apt.check_updates()
    except Exception as e:
        logger.error(f"Failed to check for updates: {e}", exc_info=True)
        raise typer.Exit(1)

    # Explicit recipients win; the users.json file is only a fallback.
    recipients = _parse_recipients(receiver_emails)
    if not recipients:
        recipients = _load_recipients_from_file("/etc/vigil/users.json")
    if not recipients:
        # Never call the mailer with an empty recipient list.
        logger.error("No recipient email addresses configured.")
        raise typer.Exit(1)

    # fullmatch() so that trailing text after a valid-looking address
    # (for example an injected newline or a second address) is rejected.
    for address in recipients:
        if not EMAIL_REGEX.fullmatch(address):
            logger.error(f"Invalid email address: {address}")
            raise typer.Exit(1)

    total_updates = len(updates)
    if total_updates == 0:
        logger.info("No updates available.")
        return
    security_updates = sum(1 for package in updates if updates[package]["security"])
    general_updates = total_updates - security_updates

    hostname = gethostname()
    subject = f"{hostname}> {security_updates} security updates, {general_updates} general updates available"

    # Security section first, then general; empty sections are omitted.
    sections = []
    if security_updates > 0:
        sections.append(_render_section("Security", _render_rows(updates, True)))
    if general_updates > 0:
        sections.append(_render_section("General", _render_rows(updates, False)))
    html = "".join(sections)

    try:
        services.mail.send_email(recipients, subject, html)
    except Exception as e:
        logger.error(f"Failed to send email notification: {e}", exc_info=True)
        raise typer.Exit(1)
    logger.info(f"Update notification sent: {security_updates} security, {general_updates} general updates to {len(recipients)} recipient(s)")


def _parse_recipients(raw):
    """Turn a comma-separated string or an iterable into a list of stripped, non-empty addresses."""
    if not raw:
        return []
    parts = raw.split(",") if isinstance(raw, str) else raw
    stripped = (str(part).strip() for part in parts)
    # Drop blanks so a trailing comma does not produce an "invalid address".
    return [address for address in stripped if address]


def _load_recipients_from_file(path):
    """Read the fallback recipient list from a JSON file, exiting with code 1 on any problem."""
    # Open directly instead of checking exists() first: no race, and
    # unreadable files are reported instead of crashing.
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        logger.error(f"No email address provided and {path} not found.")
        raise typer.Exit(1)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        logger.error(f"Invalid JSON in users.json: {e}")
        raise typer.Exit(1)
    except OSError as e:
        logger.error(f"Could not read {path}: {e}")
        raise typer.Exit(1)
    if not isinstance(data, list):
        logger.error("users.json must contain a list of email addresses")
        raise typer.Exit(1)
    return _parse_recipients(data)


def _render_rows(updates, security):
    """Render one escaped table row per package whose security flag matches ``security``."""
    rows = []
    for package in updates:
        info = updates[package]
        if bool(info["security"]) != security:
            continue
        name = hypertext.escape(package)
        installed = hypertext.escape(info["installed_version"])
        latest = hypertext.escape(info["latest_version"])
        repo = hypertext.escape(info["repo"])
        rows.append(f"\n| {name} |\n{installed} |\n{latest} |\n{repo} |\n")
    return "".join(rows)


def _render_section(title, rows):
    """Wrap pre-rendered rows in a titled section with the column header."""
    return (
        f"\n{title}\n"
        "| Package |\n"
        "Installed |\n"
        "Latest |\n"
        "Repository |\n"
        f"{rows}\n"
    )
def version_callback(value: bool):
    """Print the Vigil version and stop when the version flag was given.

    Args:
        value: Parsed value of the version option; any falsy value means the
            flag was not supplied and nothing happens.

    Raises:
        typer.Exit: After printing the version, to end the program.
    """
    if not value:
        return
    typer.echo(f"Vigil {__version__}")
    raise typer.Exit()
# Root callback: carries the application help text and the --version/-v flag.
@app.callback()
def callback(
    version: bool = typer.Option(
        None, "--version", "-v",
        help="Show version and exit",
        # Eager, so version_callback handles the flag before other parameters.
        is_eager=True,
        callback=version_callback,
    ),
):
    """
    Checks apt for upgrades and emails them
    """
# Start the Typer CLI only when this file is run as a script, not on import.
if __name__ == "__main__":
    app()