253 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			253 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/python3
 | |
| #
 | |
| #
 | |
| # Make sure to set you credentials in the ip_checker_config.ini file
 | |
| #
 | |
| #
 | |
| 
 | |
| import os
 | |
| import time
 | |
| import json
 | |
| import logging
 | |
| import datetime
 | |
| import colorlog
 | |
| import requests
 | |
| import subprocess
 | |
| import logging.handlers
 | |
| from configparser import ConfigParser
 | |
| from email.mime.text import MIMEText
 | |
| 
 | |
| # Instantiate
 | |
| config = ConfigParser()
 | |
| 
 | |
| # Parse existing file
 | |
| config.read('ip_checker_config.ini')
 | |
| 
 | |
| # config file values
 | |
| app_token = config.get('Pushover', 'APP_TOKEN')
 | |
| user_key = config.get('Pushover', 'USER_KEY')
 | |
| nfty_url = config.get('Nfty', 'NFTY_URL')
 | |
| alert_method = config.get('Settings', 'ALERT_METHOD') 
 | |
| 
 | |
| # Docker container's name or ID
 | |
| container_name = 'qbittorrent-openvpn'
 | |
| dependent_container_name = 'qbittorrent'
 | |
| 
 | |
| get_ip_address = 'https://ipinfo.io/ip'
 | |
| 
 | |
| interval_seconds = 60
 | |
| logs_folder = 'logs'
 | |
| 
 | |
| # Create logs folder if it doesn't exist
 | |
| if not os.path.exists(logs_folder):
 | |
|     os.makedirs(logs_folder)
 | |
| 
 | |
| # Getting the log file path
 | |
| def get_log_file_path():
 | |
|     return os.path.join(logs_folder, 'ip_checker.log')
 | |
| 
 | |
| # Rotate and archive log files every 24hrs
 | |
| def rotate_log_files(log_file):
 | |
|     now = datetime.datetime.now()
 | |
|     timestamp = now.strftime('%Y-%m-%d')
 | |
|     rotated_log_file = f'{timestamp}.log'
 | |
|     rotated_log_file = f'ip_checker_{timestamp}.log'
 | |
|     rotated_log_path = os.path.join(logs_folder, rotated_log_file)
 | |
| 
 | |
|     # Rename the current log file to the rotated log file
 | |
|     os.rename(log_file, rotated_log_path)
 | |
| 
 | |
| # Formating time
 | |
| def format_remaining_time(remaining_time):
 | |
|     minutes, seconds = divmod(remaining_time.seconds, 60)
 | |
|     return f'{minutes} minutes {seconds} seconds'
 | |
| 
 | |
| # Getitng external IP address from internet provider
 | |
| def get_external_ip():
 | |
|     try:
 | |
|         response = requests.get(f'{get_ip_address}')
 | |
|         response.raise_for_status()
 | |
|         return response.text.strip()
 | |
|     except requests.RequestException as e:
 | |
|         logger.warning(f'[WARN] - Error fetching external IP: {e}')
 | |
|         logger.info(' ')
 | |
|         return None
 | |
| 
 | |
| # Getting containers current IP address 
 | |
| def get_vpn_container_ip(container_name):
 | |
|     try:
 | |
|         cmd = ["docker", "exec", container_name, "curl", "-s", get_ip_address]
 | |
|         result = subprocess.run(cmd, capture_output=True, text=True)
 | |
|         if result.returncode != 0:
 | |
|             logger.warning(f'[WARN] - Error fetching VPN container IP: {result.stderr}')
 | |
|             logger.info(' ')
 | |
|             return None
 | |
|         return result.stdout.strip()
 | |
|     except Exception as e:
 | |
|         logger.warning(f'[WARN] - Error: {e}')
 | |
|         logger.info(' ')
 | |
|         return None
 | |
| 
 | |
| # Function to stop the container that depends on VPN connnection. 
 | |
| def stop_dependent_container(container_name):
 | |
|     """
 | |
|     Stop the specified Docker container.
 | |
|     """
 | |
|     try:
 | |
|         subprocess.run(["docker", "stop", container_name], check=True)
 | |
|         logger.info(f'[INFO] - Container {container_name} stopped.')
 | |
|         logger.info(' ')
 | |
|     except subprocess.CalledProcessError as e:
 | |
|         logger.warning(f'[WARN] - Failed stopping container {container_name}: {e}')
 | |
|         logger.info(' ')
 | |
| 
 | |
| # Notifacation alert for ntfy
 | |
| def send_ntfy_alert(message, urgent=False):
 | |
|     """
 | |
|     Send a notification using ntfy.
 | |
|     """
 | |
|     try:
 | |
|         topic = "autoheal" if urgent else "regular"
 | |
|         response = requests.post(f'{nfty_url}', data=message)
 | |
|         response.raise_for_status()
 | |
|         logger.info('[INFO] - nty notification sent.')
 | |
|         logger.info(' ')
 | |
|     except requests.RequestException as e:
 | |
|         logger.warning(f'[WARN] - Failed to send ntfy notification: {e}')
 | |
|         logger.info(' ')
 | |
| 
 | |
| # Notifacation alert for pushover
 | |
| def send_pushover_alert(app_token, user_key):
 | |
| 
 | |
|     # Define notification title and message
 | |
|     title = "** IP CHECKER ALERT **"
 | |
|     message = f"<font color=#ed2c39><b>URGENT!!</b></font> VPN is down! IPs match"
 | |
| 
 | |
|     try:
 | |
|         url = "https://api.pushover.net/1/messages.json"
 | |
|         data = {
 | |
|             "token": app_token,
 | |
|             "user": user_key,
 | |
|             "title": title,
 | |
|             "html": "1",
 | |
|             "message": message,
 | |
|         }
 | |
|         response = requests.post(url, data=data)
 | |
|         if response.status_code == 200:
 | |
|             logger.info('[INFO] - Pushover notification sent.')
 | |
|         else:
 | |
|             logger.warning(f'[WARN] - Failed to send Pushover notification: {response.text}')
 | |
|     except Exception as e:
 | |
|         logger.warning(f'[WARN] - Failed to send Pushover notification: {e}')
 | |
| 
 | |
|     logger.info(' ')
 | |
| 
 | |
| 
 | |
| # Function to send an alert to the specified notification service 
 | |
| def send_alert(alert_method):
 | |
|     if alert_method == 'ntfy':
 | |
|         # Send urgent ntfy notification
 | |
|         send_ntfy_alert("Urgent: VPN is down! IPs match.", urgent=True)
 | |
| 
 | |
|     elif alert_method == 'pushover':
 | |
|         # Send urgent pushover notification
 | |
|         send_pushover_alert(app_token, user_key)
 | |
| 
 | |
|     # Stop the dependent container
 | |
|     stop_dependent_container(dependent_container_name)
 | |
| 
 | |
|     logger.critical(f'[CRIT] - Urgent: VPN is down! IPs match.')
 | |
|     logger.info(' ')
 | |
| 
 | |
| def main():
 | |
|     global last_log_date  # Declare as global to modify the variable outside local scope
 | |
| 
 | |
|     # Variable to keep track of the last log file creation date
 | |
|     last_log_date = datetime.datetime.now().strftime('%Y-%m-%d')
 | |
| 
 | |
|     while True:
 | |
|         isp_ip = get_external_ip()
 | |
|         vpn_container_ip = get_vpn_container_ip(container_name)
 | |
| 
 | |
|         # Check if a new log file needs to be created after rotation
 | |
|         current_log_date = datetime.datetime.now().strftime('%Y-%m-%d')
 | |
|         if current_log_date != last_log_date:
 | |
|             # Perform log rotation if it's a new day
 | |
|             last_log_date = current_log_date
 | |
|             rotate_log_files(log_file)
 | |
| 
 | |
|             # Update the file handler with the new log file path
 | |
|             file_handler = logging.FileHandler(log_file)
 | |
|             file_handler.setLevel(logging.INFO)
 | |
|             file_handler.setFormatter(formatter)
 | |
| 
 | |
|             # Remove any existing file handlers and add the new file handler
 | |
|             for old_handler in logger.handlers[:]:
 | |
|                 if isinstance(old_handler, logging.FileHandler):
 | |
|                     logger.removeHandler(old_handler)
 | |
|             logger.addHandler(file_handler)
 | |
| 
 | |
|         if isp_ip and vpn_container_ip:
 | |
|             if isp_ip == vpn_container_ip:
 | |
|                 send_alert(alert_method)
 | |
|             else:
 | |
|                 # if both isp and vpn dont match just create a log entry that includes date, time, and IP
 | |
|                 logger.info(f'[INFO] - VPN IP: {vpn_container_ip}')
 | |
|                 logger.info(' ')
 | |
| 
 | |
|         # Wait for 2 minutes before next check
 | |
|         time.sleep(120)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
| 
 | |
| 
 | |
|     # Variable to keep track of the last log file creation date
 | |
|     last_log_date = None  
 | |
| 
 | |
|     # Set the logging level to INFO
 | |
|     logger = logging.getLogger()
 | |
|     logger.setLevel(logging.INFO)
 | |
| 
 | |
|     # Create a colorlog formatter
 | |
|     formatter = colorlog.ColoredFormatter(
 | |
|         '%(log_color)s%(asctime)s - %(message)s',
 | |
|         datefmt='%Y-%m-%d %I:%M:%S %p',
 | |
|         log_colors={
 | |
|             'DEBUG': 'cyan',
 | |
|             'INFO': 'green',
 | |
|             'WARNING': 'yellow',
 | |
|             'ERROR': 'red',
 | |
|             'CRITICAL': 'white,bg_red',
 | |
|         }
 | |
|     )
 | |
| 
 | |
|     # Create a stream handler for console output
 | |
|     console_handler = logging.StreamHandler()
 | |
|     console_handler.setLevel(logging.INFO)
 | |
|     console_handler.setFormatter(formatter)
 | |
|     logger.addHandler(console_handler)
 | |
| 
 | |
|     # Create a file handler for the current log file
 | |
|     log_file = get_log_file_path()
 | |
|     file_handler = logging.FileHandler(log_file)
 | |
|     file_handler.setLevel(logging.INFO)
 | |
|     file_handler.setFormatter(formatter)
 | |
|     logger.addHandler(file_handler)
 | |
| 
 | |
|     # Create a timed rotating file handler for log rotation
 | |
|     rotating_file_handler = logging.handlers.TimedRotatingFileHandler(
 | |
|         log_file,
 | |
|         when='midnight',
 | |
|         interval=1,
 | |
|         backupCount=365,
 | |
|         atTime=datetime.time(0, 0)  # Rotate at midnight
 | |
|     )
 | |
|     rotating_file_handler.setFormatter(formatter)
 | |
|     logger.addHandler(rotating_file_handler)
 | |
| 
 | |
| 
 | |
| 
 | |
|     main()
 | |
| 
 | |
| 
 |