#!/usr/bin/python3
#
# Make sure to set your credentials in the ip_checker_config.ini file
#
"""VPN leak watchdog for a Docker-hosted VPN container.

The script periodically compares the host's public IP address with the public
IP address seen from inside the VPN container. When both are identical the VPN
tunnel is considered down: the dependent container is stopped and an alert is
sent through the configured notification service (ntfy or Pushover).
"""

import datetime
import json
import logging
import logging.handlers
import os
import subprocess
import time
from configparser import ConfigParser
from email.mime.text import MIMEText
from typing import Optional

import colorlog
import requests

# Root logger. Defined at module level so that every function below can log
# even when this file is imported instead of executed; handlers are attached
# by _configure_logging() when the script is run directly.
logger = logging.getLogger()

# Instantiate
config = ConfigParser()

# Parse existing file
config.read('ip_checker_config.ini')

# Config file values
app_token = config.get('Pushover', 'APP_TOKEN')
user_key = config.get('Pushover', 'USER_KEY')
nfty_url = config.get('Nfty', 'NFTY_URL')
alert_method = config.get('Settings', 'ALERT_METHOD')

# Docker container's name or ID
container_name = 'qbittorrent-openvpn'
dependent_container_name = 'qbittorrent'
get_ip_address = 'https://ipinfo.io/ip'

# Seconds to wait between two checks. The loop used to sleep a hard-coded
# 120 seconds and ignore this setting; the value is 120 so that the effective
# cadence is unchanged now that the loop actually reads it.
interval_seconds = 120

# Upper bound, in seconds, for every HTTP request and docker command, so that a
# stalled network call can never freeze the watchdog.
request_timeout_seconds = 15

logs_folder = 'logs'

# Create logs folder if it doesn't exist
os.makedirs(logs_folder, exist_ok=True)


# Getting the log file path
def get_log_file_path():
    """Return the path of the active log file inside the logs folder."""
    return os.path.join(logs_folder, 'ip_checker.log')


# Archive a log file under a dated name
def rotate_log_files(log_file):
    """Rename ``log_file`` to ``ip_checker_<YYYY-MM-DD>.log`` in the logs folder.

    The date used is the date at the moment of the call. The main loop does
    not call this helper: daily rotation is performed by the
    ``TimedRotatingFileHandler`` installed in ``_configure_logging``. It is
    kept for callers that want to archive a log file explicitly.

    Raises:
        OSError: if ``log_file`` does not exist or cannot be renamed.
    """
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d')
    rotated_log_path = os.path.join(logs_folder, f'ip_checker_{timestamp}.log')

    # Rename the current log file to the rotated log file
    os.rename(log_file, rotated_log_path)


# Formatting time
def format_remaining_time(remaining_time):
    """Format a ``datetime.timedelta`` as ``'<m> minutes <s> seconds'``.

    The whole duration is used (days are folded into the minutes count), and
    negative durations are reported as zero.
    """
    total_seconds = max(0, int(remaining_time.total_seconds()))
    minutes, seconds = divmod(total_seconds, 60)
    return f'{minutes} minutes {seconds} seconds'


# Getting external IP address from internet provider
def get_external_ip() -> Optional[str]:
    """Return the host's public IP address, or ``None`` if the lookup fails."""
    try:
        response = requests.get(get_ip_address, timeout=request_timeout_seconds)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.warning(f'[WARN] - Error fetching external IP: {e}')
        logger.info(' ')
        return None
    return response.text.strip()


# Getting container's current IP address
def get_vpn_container_ip(container_name) -> Optional[str]:
    """Return the public IP address seen from inside ``container_name``.

    Runs ``curl`` inside the container through ``docker exec``. Returns
    ``None`` when the command cannot be started, times out, or exits with a
    non-zero status.
    """
    cmd = ["docker", "exec", container_name, "curl", "-s", get_ip_address]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=request_timeout_seconds,
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # OSError: docker binary missing or not executable.
        # SubprocessError: includes TimeoutExpired.
        # ValueError: includes UnicodeDecodeError when decoding the output.
        logger.warning(f'[WARN] - Error: {e}')
        logger.info(' ')
        return None

    if result.returncode != 0:
        logger.warning(f'[WARN] - Error fetching VPN container IP: {result.stderr}')
        logger.info(' ')
        return None
    return result.stdout.strip()


# Function to stop the container that depends on the VPN connection.
def stop_dependent_container(container_name):
    """Stop the specified Docker container.

    Failures (non-zero exit status, timeout, docker not available) are logged
    as warnings and never raised, so the watchdog loop keeps running.
    """
    try:
        subprocess.run(
            ["docker", "stop", container_name],
            check=True,
            timeout=4 * request_timeout_seconds,
        )
    except (subprocess.SubprocessError, OSError) as e:
        logger.warning(f'[WARN] - Failed stopping container {container_name}: {e}')
        logger.info(' ')
        return
    logger.info(f'[INFO] - Container {container_name} stopped.')
    logger.info(' ')


# Notification alert for ntfy
def send_ntfy_alert(message, urgent=False):
    """Send a notification using ntfy.

    Args:
        message: Text posted as the notification body.
        urgent: When true, the request carries the ntfy ``Priority: urgent``
            header; otherwise the server's default priority applies.
    """
    headers = {"Priority": "urgent"} if urgent else {}
    try:
        response = requests.post(
            nfty_url,
            data=message,
            headers=headers,
            timeout=request_timeout_seconds,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        logger.warning(f'[WARN] - Failed to send ntfy notification: {e}')
        logger.info(' ')
        return
    logger.info('[INFO] - ntfy notification sent.')
    logger.info(' ')


# Notification alert for pushover
def send_pushover_alert(app_token, user_key):
    """Send the "VPN is down" alert through the Pushover messages API.

    Args:
        app_token: Pushover application token.
        user_key: Pushover user (or group) key that receives the alert.
    """
    # Define notification title and message
    title = "** IP CHECKER ALERT **"
    message = "URGENT!! VPN is down! IPs match"

    url = "https://api.pushover.net/1/messages.json"
    data = {
        "token": app_token,
        "user": user_key,
        "title": title,
        "html": "1",
        "message": message,
    }
    try:
        response = requests.post(url, data=data, timeout=request_timeout_seconds)
    except requests.RequestException as e:
        logger.warning(f'[WARN] - Failed to send Pushover notification: {e}')
    else:
        if response.status_code == 200:
            logger.info('[INFO] - Pushover notification sent.')
        else:
            logger.warning(f'[WARN] - Failed to send Pushover notification: {response.text}')
    logger.info(' ')


# Function to react to a VPN outage: stop the dependent container and alert
def send_alert(alert_method):
    """Stop the dependent container and notify through ``alert_method``.

    The container is stopped before any notification is attempted so that a
    slow or failing notification service cannot delay the protective action.

    Args:
        alert_method: ``'ntfy'`` or ``'pushover'``. Any other value is logged
            as a warning and no notification is sent.
    """
    # Stop the dependent container first: this is the action that matters.
    stop_dependent_container(dependent_container_name)
    logger.critical('[CRIT] - Urgent: VPN is down! IPs match.')
    logger.info(' ')

    if alert_method == 'ntfy':
        # Send urgent ntfy notification
        send_ntfy_alert("Urgent: VPN is down! IPs match.", urgent=True)
    elif alert_method == 'pushover':
        # Send urgent pushover notification
        send_pushover_alert(app_token, user_key)
    else:
        logger.warning(f'[WARN] - Unknown alert method {alert_method!r}; no notification sent.')
        logger.info(' ')


def _configure_logging():
    """Attach a colored console handler and a daily-rotating file handler.

    A single ``TimedRotatingFileHandler`` owns the log file: it rolls the file
    over at midnight (archives are named ``ip_checker.log.<YYYY-MM-DD>``) and
    keeps at most 365 archives. Using one handler avoids writing every record
    to the file twice and avoids renaming a file another handler has open.
    """
    # Set the logging level to INFO
    logger.setLevel(logging.INFO)

    # Create a colorlog formatter
    formatter = colorlog.ColoredFormatter(
        '%(log_color)s%(asctime)s - %(message)s',
        datefmt='%Y-%m-%d %I:%M:%S %p',
        log_colors={
            'DEBUG': 'cyan',
            'INFO': 'green',
            'WARNING': 'yellow',
            'ERROR': 'red',
            'CRITICAL': 'white,bg_red',
        },
    )

    # Create a stream handler for console output
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # Create a timed rotating file handler for the log file and its rotation
    rotating_file_handler = logging.handlers.TimedRotatingFileHandler(
        get_log_file_path(),
        when='midnight',
        interval=1,
        backupCount=365,
        atTime=datetime.time(0, 0),  # Rotate at midnight
    )
    rotating_file_handler.setLevel(logging.INFO)
    rotating_file_handler.setFormatter(formatter)
    logger.addHandler(rotating_file_handler)


def main():
    """Run the watchdog loop forever.

    Each iteration fetches the host's public IP and the VPN container's public
    IP. When both are available and equal, ``send_alert`` is triggered; when
    they differ, the VPN IP is logged. If either lookup failed, the iteration
    takes no action (the failure has already been logged by the lookup).
    """
    while True:
        isp_ip = get_external_ip()
        vpn_container_ip = get_vpn_container_ip(container_name)

        if isp_ip and vpn_container_ip:
            if isp_ip == vpn_container_ip:
                send_alert(alert_method)
            else:
                # The ISP and VPN addresses differ: just record the VPN IP
                logger.info(f'[INFO] - VPN IP: {vpn_container_ip}')
                logger.info(' ')

        # Wait before the next check
        time.sleep(interval_seconds)


if __name__ == "__main__":
    _configure_logging()
    main()