Inspector / ip_checker

> adding scripts to repo
This commit is contained in:
2023-11-29 00:41:16 -08:00
parent 65714a45d8
commit 1032a63d8c
2 changed files with 483 additions and 0 deletions

273
ip_checker/ip_checker.py Executable file
View File

@ -0,0 +1,273 @@
#!/usr/bin/python3
#
#
#
#
#
import os
import time
import json
import logging
import datetime
import colorlog
import requests
import subprocess
import logging.handlers
from configparser import ConfigParser
from email.mime.text import MIMEText
# Instantiate
config = ConfigParser()
# Parse existing file
config.read('ip_checker_config.ini')
# config file values
app_token = config.get('Pushover', 'APP_TOKEN')
user_key = config.get('Pushover', 'USER_KEY')
nfty_url = config.get('Nfty', 'NFTY_URL')
alert_method = config.get('Settings', 'ALERT_METHOD')
# Docker container's name or ID
container_name = 'qbittorrent-openvpn'
dependent_container_name = 'qbittorrent'
get_ip_address = 'https://ipinfo.io/ip'
interval_seconds = 60
logs_folder = 'logs'
# Create logs folder if it doesn't exist
if not os.path.exists(logs_folder):
os.makedirs(logs_folder)
def get_log_file_path():
return os.path.join(logs_folder, 'ip_checker.log')
def rotate_log_files(log_file):
now = datetime.datetime.now()
timestamp = now.strftime('%Y-%m-%d')
rotated_log_file = f'{timestamp}.log'
rotated_log_file = f'ip_checker_{timestamp}.log'
rotated_log_path = os.path.join(logs_folder, rotated_log_file)
# Rename the current log file to the rotated log file
os.rename(log_file, rotated_log_path)
def format_remaining_time(remaining_time):
minutes, seconds = divmod(remaining_time.seconds, 60)
return f'{minutes} minutes {seconds} seconds'
def get_external_ip():
try:
response = requests.get(f'{get_ip_address}')
response.raise_for_status()
return response.text.strip()
except requests.RequestException as e:
logger.warning(f'[WARN] - Error fetching external IP: {e}')
logger.info(' ')
return None
def get_vpn_container_ip(container_name):
try:
cmd = ["docker", "exec", container_name, "curl", "-s", get_ip_address]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f'[WARN] - Error fetching VPN container IP: {result.stderr}')
logger.info(' ')
return None
return result.stdout.strip()
except Exception as e:
logger.warning(f'[WARN] - Error: {e}')
logger.info(' ')
return None
def stop_dependent_container(container_name):
"""
Stop the specified Docker container.
"""
try:
subprocess.run(["docker", "stop", container_name], check=True)
logger.info(f'[INFO] - Container {container_name} stopped.')
logger.info(' ')
except subprocess.CalledProcessError as e:
logger.warning(f'[WARN] - Failed stopping container {container_name}: {e}')
logger.info(' ')
def send_ntfy_alert(message, urgent=False):
"""
Send a notification using ntfy.
"""
try:
topic = "autoheal" if urgent else "regular"
response = requests.post(f'{nfty_url}', data=message)
response.raise_for_status()
logger.info('[INFO] - nty notification sent.')
logger.info(' ')
except requests.RequestException as e:
logger.warning(f'[WARN] - Failed to send ntfy notification: {e}')
logger.info(' ')
def send_pushover_alert(app_token, user_key):
# Define notification title and message
title = "** IP CHECKER ALERT **"
message = f"<font color=#ed2c39><b>URGENT!!</b></font> VPN is down! IPs match"
try:
url = "https://api.pushover.net/1/messages.json"
data = {
"token": app_token,
"user": user_key,
"title": title,
"html": "1",
"message": message,
}
response = requests.post(url, data=data)
if response.status_code == 200:
logger.info('[INFO] - Pushover notification sent.')
else:
logger.warning(f'[WARN] - Failed to send Pushover notification: {response.text}')
except Exception as e:
logger.warning(f'[WARN] - Failed to send Pushover notification: {e}')
logger.info(' ')
def send_alert(alert_method):
if alert_method == 'ntfy':
# Send urgent ntfy notification
send_ntfy_alert("Urgent: VPN is down! IPs match.", urgent=True)
elif alert_method == 'pushover':
# Send urgent pushover notification
send_pushover_alert(app_token, user_key)
# Stop the dependent container
stop_dependent_container(dependent_container_name)
logger.critical(f'[CRIT] - Urgent: VPN is down! IPs match.')
logger.info(' ')
def main():
global last_log_date # Declare as global to modify the variable outside local scope
# Variable to keep track of the last log file creation date
last_log_date = datetime.datetime.now().strftime('%Y-%m-%d')
while True:
isp_ip = get_external_ip()
vpn_container_ip = get_vpn_container_ip(container_name)
# Check if a new log file needs to be created after rotation
current_log_date = datetime.datetime.now().strftime('%Y-%m-%d')
if current_log_date != last_log_date:
# Perform log rotation if it's a new day
last_log_date = current_log_date
rotate_log_files(log_file)
# Update the file handler with the new log file path
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
# Remove any existing file handlers and add the new file handler
for old_handler in logger.handlers[:]:
if isinstance(old_handler, logging.FileHandler):
logger.removeHandler(old_handler)
logger.addHandler(file_handler)
if isp_ip and vpn_container_ip:
if isp_ip == vpn_container_ip:
send_alert(alert_method)
else:
# if both isp and vpn dont match just create a log entry that includes date, time, and IP
logger.info(f'[INFO] - VPN IP: {vpn_container_ip}')
logger.info(' ')
# Wait for 2 minutes before next check
time.sleep(120)
if __name__ == "__main__":
# Variable to keep track of the last log file creation date
last_log_date = None
# Set the logging level to INFO
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Create a colorlog formatter
formatter = colorlog.ColoredFormatter(
'%(log_color)s%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %I:%M:%S %p',
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'white,bg_red',
}
)
# Create a stream handler for console output
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# Create a file handler for the current log file
log_file = get_log_file_path()
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Create a timed rotating file handler for log rotation
rotating_file_handler = logging.handlers.TimedRotatingFileHandler(
log_file,
when='midnight',
interval=1,
backupCount=365,
atTime=datetime.time(0, 0) # Rotate at midnight
)
rotating_file_handler.setFormatter(formatter)
logger.addHandler(rotating_file_handler)
main()