Compare commits

..

No commits in common. "7ea3b264d137bf7619acc561cef5b92a6c10e070" and "14bd20837867ae347ec60ce7785e029e0cb8217a" have entirely different histories.

4 changed files with 77 additions and 407 deletions

258
README.md
View File

@ -1,257 +1 @@
# Vigil
![Gitea Last Commit](https://img.shields.io/gitea/last-commit/uthmn/vigil?gitea_url=https%3A%2F%2Fgit.uthmn.com&link=https%3A%2F%2Fgit.uthmn.com%2Fufatih%2Fvigil%2Fcommits%2Fbranch%2Fmain)
[![MIT License](https://img.shields.io/badge/License-MIT-green.svg)](https://choosealicense.com/licenses/mit/)
Vigil is a simple Python based program for set and forget servers that checks for apt updates at fixed intervals and emails you when updates are available, allowing you to upgrade the server using replies.
Vigil is designed for stability and security, avoiding keeping credentials in the memory of the server for extended periods and helping keep your server up to date.
## Table of Contents
- [Getting Started](#getting-started)
- [Prerequisites](#prerequisites)
- [Installation and Running](#installation-and-running)
- [Installation](#installation)
- [Using the install script (recommended)](#using-the-install-script-recommended)
- [Using pipx](#using-pipx)
- [Using Docker](#using-docker)
- [Usage](#usage)
- [Running Vigil](#running-vigil)
- [1. Scheduled Mode (serve)](#1-scheduled-mode-serve)
- [2. Immediate Mode (now)](#2-immediate-mode-now)
- [Email Configuration](#email-configuration)
- [Systemd Integration](#systemd-integration)
- [Notes](#notes)
## Getting Started
These instructions will help you run Vigil on your own server using the git repository, if you want to use this software in production, refer to [Installation](#installation) instead.
### Prerequisites
- Python 3.8+
- Git
- SMTP server
- IMAP server
### Installation and Running
First clone the repository into a new directory:
```bash
git clone https://git.uthmn.com/ufatih/vigil.git
```
Now change into the directory:
```bash
cd vigil
```
Then create a `.env` file in the root directory with the following contents:
```bash
SMTP_SERVER=
SMTP_PORT=
SMTP_USERNAME=
SMTP_PASSWORD=
IMAP_SERVER=
IMAP_PORT=
IMAP_USERNAME=
IMAP_PASSWORD=
```
Fill in the values for the environment variables with the correct values for your SMTP and IMAP servers.
Now create a `users.json` file in the root directory with a list of email addresses to receive and manage server updates, i.e:
```json
[
"email1@example.com",
"email2@example.com"
]
```
Now we can initialise a new virtual environment and install the required packages:
```bash
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
Finally, we can run the program:
```bash
sudo .venv/bin/python3 main.py serve
```
This will check for updates every hour and send an email to the provided email addresses.
## Installation
### Using the install script (recommended)
The easiest way to install Vigil is by using the interactive install script.
This script automatically checks for required dependencies, offers to install any that are missing, and can optionally configure Vigil to start automatically on boot via **systemd**.
Run the following command:
```bash
curl -sSL https://git.uthmn.com/ufatih/vigil/raw/main/install.sh | sudo bash
```
You can also review the script before executing it:
```bash
curl -sSL https://git.uthmn.com/ufatih/vigil/raw/main/install.sh -o install.sh
less install.sh
sudo bash install.sh
rm install.sh
```
You can now run Vigil using the following command:
```bash
vigil
```
### Using pipx
If you prefer to use pipx, you can install Vigil using the following command:
```bash
pipx install vigil
```
You can now run Vigil using the following command:
```bash
vigil
```
### Using docker
> [!WARN]
> The docker run command will not work yet and needs to be updated when released.
If you prefer to use Docker, you can download the latest Docker image from the [Docker Hub](https://hub.docker.com/r/uthmn/vigil)
```bash
docker pull uthmn/vigil
```
You can now run Vigil using the following command:
```bash
docker run -it --rm uthmn/vigil
```
## Usage
Once installed, Vigil can be run in two modes:
**Scheduled mode (recommended)**: Runs continuously and checks for updates at a fixed interval (hourly or daily).
**Immediate mode**: Performs a one-off update check and exits.
### Running Vigil
If installed via the install script or pipx, simply use:
```bash
vigil serve
```
If running from the repository directly:
```bash
sudo .venv/bin/python3 main.py serve
```
Vigil provides two main commands:
### 1. Scheduled Mode (serve)
Runs Vigil continuously, checking for updates based on the provided schedule and emailing the results.
```bash
vigil serve [OPTIONS]
```
#### Options (serve)
Option | Description | Default
--- | --- | ---
--receiver-email | Comma-separated list of email addresses to send updates to | Reads from users.json if not provided
--check-hourly | Enables hourly update checks | False
--check-daily <hour> | Enables daily update checks at a specific UTC hour (e.g. --check-daily 18) | 18
#### Examples
Check for updates every hour:
```bash
sudo vigil serve --check-hourly --receiver-email "admin@example.com, security@example.com"
```
Check for updates daily at 18:00 UTC:
```bash
sudo vigil serve --check-daily 18
```
### 2. Immediate Mode (now)
Performs a single update check, emails the results, and exits.
```bash
vigil now [OPTIONS]
```
#### Options (now)
Option | Description | Default
--- | --- | ---
--receiver-email | Comma-separated list of email addresses to send updates to | Reads from users.json
#### Example
```bash
sudo vigil now --receiver-email "admin@example.com"
```
### Email Configuration
If --receiver-email is not provided, Vigil looks for a file named users.json in the same directory as the executable, containing a list of recipient email addresses:
```json
[
"email1@example.com",
"email2@example.com"
]
```
### Systemd Integration
If you installed Vigil using the install script, it can optionally create a systemd service for automatic startup at boot.
You can manually control the service as follows:
```bash
sudo systemctl start vigil
sudo systemctl stop vigil
sudo systemctl enable vigil
sudo systemctl status vigil
```
This ensures Vigil runs continuously and automatically checks for updates at your configured interval.
### Notes
Vigil requires root privileges to check and list available APT updates.
When running inside Docker, environment variables and configuration files (.env, users.json) should be mounted or baked into the image.
Logs and errors are printed to standard output (and journalctl if running as a service).
# vigil

32
main.py
View File

@ -19,6 +19,8 @@ from os.path import dirname, join, exists
import typer
# FIXME: Error handling
app = typer.Typer()
def wait_until(target_time):
@ -30,24 +32,20 @@ def wait_until(target_time):
def schedule(mode, hour, minute=0, receiver_email=None):
print(f"Scheduler started at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - Mode: {mode}")
while True:
try:
now = datetime.now()
if mode == "daily":
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
elif mode == "hourly":
target = now.replace(minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(hours=1)
else:
raise ValueError("Mode must be 'daily' or 'hourly'.")
now = datetime.now()
if mode == "daily":
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
elif mode == "hourly":
target = now.replace(minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(hours=1)
else:
raise ValueError("Mode must be 'daily' or 'hourly'.")
wait_until(target)
generate_email(receiver_email)
except Exception as e:
print(f"Error in scheduled task: {e}")
time.sleep(60) # Wait before retrying
wait_until(target)
generate_email(receiver_email)
@app.command()
def serve(receiver_email: str = None, check_hourly: bool = False, check_daily: str = "18"):

View File

@ -27,7 +27,7 @@ def detect_apt():
def check_updates():
# Update package list
try:
subprocess.run(["apt", "update"], capture_output=True, text=True, check=True, timeout=300)
subprocess.run(["apt", "update"], capture_output=True, text=True, check=True)
except FileNotFoundError:
raise FileNotFoundError("apt not found on this system.")
except subprocess.CalledProcessError as e:
@ -39,8 +39,7 @@ def check_updates():
["apt", "list", "--upgradable"],
capture_output=True,
text=True,
check=True,
timeout=60,
check=True
)
upgrades = result.stdout.strip()
lines = [line for line in upgrades.splitlines() if line.strip()]

View File

@ -13,13 +13,7 @@ from email import message_from_bytes
from dotenv import load_dotenv
from os import getenv
from os.path import dirname, join, exists
import time
# Exit if .env does not exist
if not exists(join(dirname(__file__), "../.env")):
print("Please create a .env file in the root directory.")
exit(1)
from os.path import dirname, join
# Load environment variables from .env file
dotenv_path = join(dirname(__file__), "../.env")
@ -27,78 +21,16 @@ load_dotenv(dotenv_path)
# SMTP server settings
SMTP_SERVER = getenv("SMTP_SERVER")
SMTP_PORT = getenv("SMTP_PORT")
SMTP_PORT = int(getenv("SMTP_PORT"))
SMTP_USERNAME = getenv("SMTP_USERNAME")
SMTP_PASSWORD = getenv("SMTP_PASSWORD")
# IMAP server settings
IMAP_SERVER = getenv("IMAP_SERVER")
IMAP_PORT = getenv("IMAP_PORT")
IMAP_PORT = int(getenv("IMAP_PORT"))
IMAP_USERNAME = getenv("IMAP_USERNAME")
IMAP_PASSWORD = getenv("IMAP_PASSWORD")
# Retry settings
MAX_RETRIES = 3
RETRY_DELAY = 2 # in seconds
# Check if all environment variables are set
if not SMTP_SERVER or not SMTP_PORT or not SMTP_USERNAME or not SMTP_PASSWORD or not IMAP_SERVER or not IMAP_PORT or not IMAP_USERNAME or not IMAP_PASSWORD:
print("Please set all environment variables in .env file.")
exit(1)
# Make sure SMTP and IMAP ports are integers
SMTP_PORT = int(SMTP_PORT)
IMAP_PORT = int(IMAP_PORT)
def connect_smtp_with_retry():
"""Connect to SMTP server with retry"""
for attempt in range(MAX_RETRIES):
try:
server = smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT)
server.login(SMTP_USERNAME, SMTP_PASSWORD)
return server
except Exception as e:
if attempt < MAX_RETRIES - 1:
wait_time = RETRY_DELAY * (2 ** attempt) # Exponential backoff
print(f"SMTP connection attempt {attempt + 1} failed: {e}")
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
print(f"Failed to connect to SMTP after {MAX_RETRIES} attempts: {e}")
raise
def connect_imap_with_retry():
"""Connect to IMAP server with retry"""
for attempt in range(MAX_RETRIES):
try:
M = IMAP4_SSL(host=IMAP_SERVER, port=IMAP_PORT)
M.login(IMAP_USERNAME, IMAP_PASSWORD)
return M
except Exception as e:
if attempt < MAX_RETRIES - 1:
wait_time = RETRY_DELAY * (2 ** attempt) # Exponential backoff
print(f"IMAP connection attempt {attempt + 1} failed: {e}")
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
print(f"Failed to connect to IMAP after {MAX_RETRIES} attempts: {e}")
raise
# Test all connections
try:
server = connect_smtp_with_retry()
server.quit()
except Exception as e:
print(f"Initial SMTP connection test failed: {e}")
exit(1)
try:
M = connect_imap_with_retry()
M.logout()
except Exception as e:
print(f"Initial IMAP connection test failed: {e}")
exit(1)
def send_email(receiver_emails, subject, body):
sender_email = SMTP_USERNAME
@ -108,14 +40,12 @@ def send_email(receiver_emails, subject, body):
message["From"] = sender_email
message["To"] = ", ".join(receiver_emails)
# Connect using SSL with retry
# Connect using SSL
try:
server = connect_smtp_with_retry()
try:
with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT) as server:
server.login(SMTP_USERNAME, SMTP_PASSWORD)
server.sendmail(sender_email, receiver_emails, message.as_string())
print("HTML email sent successfully!")
finally:
server.quit()
print("HTML email sent successfully!")
except Exception as e:
print(f"Failed to send email: {e}")
@ -123,66 +53,65 @@ def receive_emails(folder="INBOX", limit=50):
emails = {}
try:
M = connect_imap_with_retry()
M = IMAP4_SSL(host=IMAP_SERVER, port=IMAP_PORT)
M.login(IMAP_USERNAME, IMAP_PASSWORD)
M.select(folder)
try:
M.select(folder)
# Get all message IDs
typ, data = M.search(None, 'ALL')
if typ != 'OK' or not data[0]:
print("No messages found.")
return emails
# Get all message IDs
typ, data = M.search(None, 'ALL')
if typ != 'OK' or not data[0]:
print("No messages found.")
return emails
all_ids = data[0].split()
last_ids = all_ids[-limit:] # Take only the last 'limit' messages
all_ids = data[0].split()
last_ids = all_ids[-limit:] # Take only the last 'limit' messages
for num in last_ids:
typ, msg_data = M.fetch(num, '(RFC822)')
if typ != 'OK':
print(f"Failed to fetch message {num}")
continue
for num in last_ids:
typ, msg_data = M.fetch(num, '(RFC822)')
if typ != 'OK':
print(f"Failed to fetch message {num}")
continue
# Parse the email
msg = message_from_bytes(msg_data[0][1])
msg_id = msg.get('Message-ID') or num.decode()
# Parse the email
msg = message_from_bytes(msg_data[0][1])
msg_id = msg.get('Message-ID') or num.decode()
# Extract headers
subject = msg.get('Subject', '')
sender = msg.get('From', '')
date = msg.get('Date', '')
# Extract headers
subject = msg.get('Subject', '')
sender = msg.get('From', '')
date = msg.get('Date', '')
# Extract plain text and HTML
body_text = ""
body_html = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if "attachment" in content_disposition:
continue
payload = part.get_payload(decode=True)
if payload:
payload = payload.decode(errors="ignore")
if content_type == "text/plain":
body_text = payload
elif content_type == "text/html":
body_html = payload
else:
payload = msg.get_payload(decode=True)
# Extract plain text and HTML
body_text = ""
body_html = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get("Content-Disposition"))
if "attachment" in content_disposition:
continue
payload = part.get_payload(decode=True)
if payload:
body_text = payload.decode(errors="ignore")
payload = payload.decode(errors="ignore")
if content_type == "text/plain":
body_text = payload
elif content_type == "text/html":
body_html = payload
else:
payload = msg.get_payload(decode=True)
if payload:
body_text = payload.decode(errors="ignore")
emails[msg_id] = {
"subject": subject,
"from": sender,
"date": date,
"body_text": body_text,
"body_html": body_html
}
finally:
M.close()
M.logout()
emails[msg_id] = {
"subject": subject,
"from": sender,
"date": date,
"body_text": body_text,
"body_html": body_html
}
M.close()
M.logout()
except Exception as e:
print(f"Error receiving emails: {e}")