MySQL Watchdog

July 29, 2025 at 04:37 AM

The `mysql_watchdog` is a service monitor designed to ensure a MySQL or MariaDB database server remains online and healthy.

⚙️ How it Works

The agent periodically checks the database service with `systemctl is-active` and verifies that it is listening on its configured network port (a helper for checking the socket file is also included). When a check fails, it sends a detailed alert packet to agents holding the configured alert role (for example `hive.alert.send_alert_msg`), emits a structured status report to agents holding the configured report role, and attempts to restart the service. If restarts fail repeatedly, the watchdog disables itself to prevent a restart loop.
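
For orientation, here is a condensed sketch of the check-and-restart cycle, simplified from the full agent source below; the service name, port, and interval are stand-ins for the corresponding config keys.

import subprocess
import time

SERVICE = "mariadb"   # stand-in for the service_name config key
PORT = 3306           # stand-in for the mysql_port config key

def service_active() -> bool:
    # systemctl exits 0 when the unit is active
    return subprocess.run(
        ["systemctl", "is-active", "--quiet", SERVICE], check=False
    ).returncode == 0

def port_listening() -> bool:
    # ss -ltn lists listening TCP sockets; look for the configured port
    out = subprocess.check_output(["ss", "-ltn"], text=True)
    for line in out.splitlines()[1:]:
        cols = line.split()
        if len(cols) >= 4 and cols[3].rsplit(":", 1)[-1] == str(PORT):
            return True
    return False

while True:
    if not (service_active() and port_listening()):
        # the real agent alerts, reports, and rate-limits before this step
        subprocess.run(["systemctl", "restart", SERVICE], check=False)
    time.sleep(20)  # stand-in for check_interval_sec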

🧩 Configuration

* **`check_interval_sec`** (Default: `20`): How often, in seconds, to check the service.
* **`mysql_port`** (Default: `3306`): The network port the database should be listening on.
* **`socket_path`** (Default: `/var/run/mysqld/mysqld.sock`): The path to the MySQL socket file.
* **`service_name`** (Default: `"mysql"`): The name of the service for `systemctl` (use `"mariadb"` for MariaDB).
* **`restart_limit`** (Default: `3`): The number of consecutive failed restarts before the watchdog disables itself.
* **`alert_to_role`** (Default: none): The swarm role that receives human-readable alerts (e.g. `hive.alert.send_alert_msg`). Alerts are skipped if unset.
* **`report_to_role`** (Default: none): The swarm role that receives structured status-event reports. Reports are skipped if unset.
* **`alert_cooldown`** (Default: `300`): Minimum number of seconds between repeated alerts of the same type.

🧭 Directive

matrix_directive = {
    "universal_id": "matrix",
    "name": "matrix",
    "children": [
        {
            "universal_id": "mysql-sentry-1",
            "name": "mysql_watchdog",
            "config": {
                "service_name": "mariadb"
            }
        }
    ]
}
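
To enable alerting and reporting, the same node can carry those keys as well. The sketch below assumes receiver agents with the named roles exist in the tree; the report role string is illustrative, while `hive.alert.send_alert_msg` matches the role mentioned above.

matrix_directive = {
    "universal_id": "matrix",
    "name": "matrix",
    "children": [
        {
            "universal_id": "mysql-sentry-1",
            "name": "mysql_watchdog",
            "config": {
                "service_name": "mariadb",
                "check_interval_sec": 20,
                "restart_limit": 3,
                # role names are illustrative; matching receivers must exist
                "alert_to_role": "hive.alert.send_alert_msg",
                "report_to_role": "hive.reports.status_ingest",
                "alert_cooldown": 300
            }
        }
    ]
}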

📦 Source

#Authored by Daniel F MacDonald and ChatGPT aka The Generals
#Gemini: docstrings and code enhancements.
import sys
import os
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))

import requests
import subprocess
import time
from datetime import datetime
from matrixswarm.core.boot_agent import BootAgent
from matrixswarm.core.utils.swarm_sleep import interruptible_sleep
from matrixswarm.core.mixin.agent_summary_mixin import AgentSummaryMixin
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject

class Agent(BootAgent, AgentSummaryMixin):
    """
    A watchdog agent that monitors a MySQL/MariaDB service. It checks if the service is running
    and listening on its designated port, attempts to restart it upon failure, and sends alerts
    and structured data reports about its status.
    """
    def __init__(self):
        """Initializes the MySQLWatchdog agent, setting up configuration parameters and statistics tracking."""
        super().__init__()
        self.name = "MySQLWatchdog"
        self.last_restart = None
        self.failed_restart_count = 0
        self.disabled = False
        cfg = self.tree_node.get("config", {})
        self.interval = cfg.get("check_interval_sec", 20)
        self.mysql_port = cfg.get("mysql_port", 3306)
        self.socket_path = cfg.get("socket_path", "/var/run/mysqld/mysqld.sock")
        self.failed_restart_limit = cfg.get("restart_limit", 3)
        self.alert_role = cfg.get("alert_to_role", None)
        self.report_role = cfg.get("report_to_role", None)
        self.alert_cooldown = cfg.get("alert_cooldown", 300)
        self.alert_thresholds = cfg.get("alert_thresholds", {"uptime_pct_min": 90, "slow_restart_sec": 10})
        self.service_name = cfg.get("service_name", "mysql")
        self.comm_targets = cfg.get("comm_targets", [])
        self.stats = {
            "date": self.today(),
            "restarts": 0,
            "uptime_sec": 0,
            "downtime_sec": 0,
            "last_status": None,
            "last_status_change": time.time(),
            "last_state": None
        }
        self.last_alerts = {}

    def today(self):
        """
        Returns the current date as a string in YYYY-MM-DD format.

        Returns:
            str: The current date.
        """
        return datetime.now().strftime("%Y-%m-%d")

    def is_mysql_running(self):
        """
        Checks if the MySQL service is active using systemd.

        Returns:
            bool: True if the service is running, False otherwise.
        """
        try:
            result = subprocess.run(
                ["systemctl", "is-active", "--quiet", self.service_name],
                check=False
            )
            return result.returncode == 0
        except Exception as e:
            self.log(f"[WATCHDOG][ERROR] Failed to check MySQL status: {e}")
            return False

    def restart_mysql(self):
        """
        Attempts to restart the MySQL service. If restarts fail repeatedly,
        it disables itself to prevent a restart loop.
        """
        if self.disabled:
            self.log("[WATCHDOG][DISABLED] Agent is disabled due to repeated failures.")
            return

        self.log("[WATCHDOG] Attempting to restart MySQL...")
        try:
            subprocess.run(["systemctl", "restart", self.service_name], check=True)
            self.log("[WATCHDOG] ✅ MySQL successfully restarted.")
            self.post_restart_check()
            self.last_restart = time.time()
            self.stats["restarts"] += 1
            self.failed_restart_count = 0  # reset on success
        except Exception as e:
            self.failed_restart_count += 1
            self.log(f"[WATCHDOG][FAIL] Restart failed: {e}")
            if self.failed_restart_count >= self.failed_restart_limit:
                self.disabled = True
                self.send_simple_alert("🛑 MySQL watchdog disabled after repeated restart failures.")
                self.log("[WATCHDOG][DISABLED] Max restart attempts reached. Watchdog disabled.")

    def update_status_metrics(self, is_running):
        """
        Updates the uptime and downtime statistics based on the current service status.

        Args:
            is_running (bool): The current running state of the service.
        """
        now = time.time()
        last = self.stats.get("last_status")
        elapsed = now - self.stats.get("last_status_change", now)
        # Accumulate the elapsed interval into uptime or downtime based on the previous state; skip on first run
        if last is not None:
            if last:
                self.stats["uptime_sec"] += elapsed
            else:
                self.stats["downtime_sec"] += elapsed

        self.stats["last_status"] = is_running
        self.stats["last_status_change"] = now

    def is_socket_accessible(self):
        """
        Checks if the MySQL socket file exists.

        Returns:
            bool: True if the socket exists, False otherwise.
        """
        return os.path.exists(self.socket_path)

    def is_mysql_listening(self):
        """
        Checks if any process is listening on the configured MySQL port.

        Returns:
            bool: True if the port is being listened on, False otherwise.
        """
        try:
            out = subprocess.check_output(["ss", "-ltn"], text=True)
            # Match the exact port in the Local Address:Port column so that,
            # for example, 33060 is not mistaken for 3306.
            for line in out.splitlines()[1:]:
                cols = line.split()
                if len(cols) >= 4 and cols[3].rsplit(":", 1)[-1] == str(self.mysql_port):
                    return True
            return False
        except Exception as e:
            self.log(f"[WATCHDOG][ERROR] Failed to scan ports: {e}")
            return False

    def worker_pre(self):
        """Logs the systemd unit being watched before the main worker loop starts."""
        self.log(f"[WATCHDOG] Watching systemd unit: {self.service_name}")

    def worker(self, config: dict = None, identity: IdentityObject = None):
        """
        The main worker loop. It checks the health of the MySQL service, handles state
        changes (e.g., failure, recovery), triggers restarts, and sends alerts/reports.

        Args:
            config (dict, optional): Configuration dictionary. Defaults to None.
            identity (IdentityObject, optional): Identity object for the agent. Defaults to None.
        """
        # Handle daily summary/report roll
        self.maybe_roll_day("mysql")

        # Health check
        is_healthy = self.is_mysql_running() and self.is_mysql_listening()
        last_state = self.stats.get("last_state")

        # First run: establish baseline
        if last_state is None:
            self.log(f"[WATCHDOG] Establishing baseline status for {self.service_name}...")
            self.stats["last_state"] = is_healthy
            self.stats["last_status_change"] = time.time()
            interruptible_sleep(self, self.interval)
            return

        # If state changed
        if is_healthy != last_state:
            self.update_status_metrics(is_healthy)

            if is_healthy:
                # Service just recovered
                self.log(f"[WATCHDOG] ✅ {self.service_name} has recovered.")
                self.send_simple_alert(f"✅ {self.service_name.capitalize()} has recovered and is now online.")
                self.send_data_report("RECOVERED", "INFO", "Service is back online and ports are open.")
            else:
                # Service just failed
                self.log(f"[WATCHDOG] ❌ {self.service_name} is NOT healthy.")
                diagnostics = self.collect_mysql_diagnostics()
                if self.should_alert("mysql-down"):
                    self.send_simple_alert(f"❌ {self.service_name.capitalize()} is DOWN. Attempting restart...")
                self.send_data_report(
                    status="DOWN", severity="CRITICAL",
                    details=f"Service {self.service_name} is not running or ports are not open.",
                    metrics=diagnostics
                )
                self.restart_mysql()

            # Always update last_state after alert/report
            self.stats["last_state"] = is_healthy
        else:
            # Stable, just accumulate
            self.update_status_metrics(is_healthy)
            if hasattr(self, "debug") and getattr(self.debug, "is_enabled", lambda: False)():
                self.log(f"[WATCHDOG] {'✅' if is_healthy else '❌'} {self.service_name} status is stable.")

        interruptible_sleep(self, self.interval)

    def should_alert(self, key):
        """
        Determines if an alert should be sent based on a cooldown period to avoid alert fatigue.

        Args:
            key (str): A unique key for the alert type.

        Returns:
            bool: True if an alert should be sent, False otherwise.
        """
        now = time.time()
        last = self.last_alerts.get(key, 0)
        if now - last > self.alert_cooldown:
            self.last_alerts[key] = now
            return True
        return False

    def post_restart_check(self):
        """
        Performs a check after a restart attempt to ensure the service
        is listening on its port.
        """
        time.sleep(5)
        if not self.is_mysql_listening():
            self.log(f"[WATCHDOG][CRIT] MySQL restarted but port {self.mysql_port} is still not listening.")
            self.send_simple_alert(f"🚨 MySQL restarted but never began listening on port {self.mysql_port}.")

    def send_simple_alert(self, message):
        """
        Sends a formatted, human-readable alert to agents with the designated alert role.

        Args:
            message (str): The core alert message to send.
        """
        if not self.alert_role: return
        alert_nodes = self.get_nodes_by_role(self.alert_role)
        if not alert_nodes: return

        pk1 = self.get_delivery_packet("standard.command.packet")
        pk1.set_data({"handler": "cmd_send_alert_msg"})
        try: server_ip = requests.get("https://api.ipify.org").text.strip()
        except Exception: server_ip = "Unknown"
        pk2 = self.get_delivery_packet("notify.alert.general")
        pk2.set_data({
            "server_ip": server_ip, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "universal_id": self.command_line_args.get("universal_id"), "level": "critical",
            "msg": message, "formatted_msg": f"📦 MySQL Watchdog\n{message}",
            "cause": "MySQL Sentinel Alert", "origin": self.command_line_args.get("universal_id")
        })
        pk1.set_packet(pk2, "content")
        for node in alert_nodes: self.pass_packet(pk1, node["universal_id"])

    def send_data_report(self, status, severity, details="", metrics=None):
        """
        Sends a structured data packet with detailed status and diagnostic information
        to agents with the designated reporting role.

        Args:
            status (str): The current status (e.g., "DOWN", "RECOVERED").
            severity (str): The severity level (e.g., "CRITICAL", "INFO").
            details (str, optional): A human-readable description of the event.
            metrics (dict, optional): A dictionary of diagnostic information.
        """
        if not self.report_role: return
        report_nodes = self.get_nodes_by_role(self.report_role)
        if not report_nodes: return

        pk1 = self.get_delivery_packet("standard.command.packet")
        pk1.set_data({"handler": "cmd_ingest_status_report"})
        pk2 = self.get_delivery_packet("standard.status.event.packet")
        pk2.set_data({
            "source_agent": self.command_line_args.get("universal_id"),
            "service_name": "mysql", "status": status, "details": details,
            "severity": severity, "metrics": metrics if metrics is not None else {}
        })
        pk1.set_packet(pk2, "content")
        for node in report_nodes:
            self.pass_packet(pk1, node["universal_id"])

    def collect_mysql_diagnostics(self):
        """
        Gathers MySQL-specific diagnostics, such as systemd status and recent log entries,
        at the moment of failure.

        Returns:
            dict: A dictionary containing diagnostic information.
        """
        info = {}
        # Get systemd status summary
        try:
            info['systemd_status'] = subprocess.check_output(
                ["systemctl", "status", self.service_name], text=True, stderr=subprocess.STDOUT
            ).strip()
        except Exception as e:
            info['systemd_status'] = f"Error: {e}"
        # Error log tail from common locations
        for log_path in ["/var/log/mysql/error.log", "/var/log/mariadb/mariadb.log"]:
            if os.path.exists(log_path):
                try: info['error_log'] = subprocess.check_output(["tail", "-n", "20", log_path], text=True)
                except Exception as e: info['error_log'] = f"Error: {e}"
                break
        return info

if __name__ == "__main__":
    agent = Agent()
    agent.boot()

Category: monitoring

Tags: #monitoring, #watchdog, #devops, #database, #service, #monitor, #health, #check, #availability, #mysql, #mariadb, #high, #autorestart

Version: v1.0.0

Author: matrixswarm

Added: July 29, 2025

Updated: July 29, 2025