Agent Doctor
`agent_doctor` is a system-level diagnostic agent responsible for monitoring the health and responsiveness of all other agents within the swarm. It acts as a watchdog, periodically performing health checks and reporting the status of each agent's core processes.
⚙️ How it Works — Gatekeeper (SSH login monitor)
1. **Log Tailing**: The agent continuously follows the system's SSH authentication log file (`/var/log/secure` or `/var/log/auth.log`) for new entries.
2. **Event Parsing**: It looks for lines indicating a successful SSH login ("Accepted"). When a login event is detected, it parses the line to extract the username, source IP address, and authentication method (e.g. password or public key).
3. **GeoIP Enrichment**: For each login, it uses a local MaxMind GeoIP database to resolve the source IP address to a geographic location (city, region, and country).
4. **Alerting**: It formats the gathered information (user, IP, location, and time) into a structured alert message and dispatches it to the swarm's alert-handling agents.
🧩 Configuration
* **`log_path`** (Default: `/var/log/secure` or `/var/log/auth.log`): The absolute path to the SSH authentication log file. The agent auto-detects the correct path on most systems.
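* **`maxmind_db`** (Default: `maxmind/GeoLite2-City.mmdb` under the swarm install path): The path to the MaxMind GeoIP database file used for IP address lookups. If the configured file does not exist, the agent falls back to the default location.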
* **`always_alert`** (Default: `1`): When enabled (`1`), an alert is sent for every login. If disabled (`0`), it will only alert once per unique IP address within a 5-minute cooldown period.
* **`geoip_enabled`** (Default: `1`): Enables or disables the GeoIP lookup functionality.
🧭 Directive
matrix_directive = {
"universal_id": "matrix",
"name": "matrix",
"children": [
{
"universal_id": "gatekeeper-1",
"name": "gatekeeper",
"config": {
"always_alert": 0,
"geoip_enabled": 1
}
}
]
}
📦 Source
import sys
import os
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))
import json
import time
import subprocess
import ipaddress
from datetime import datetime
from matrixswarm.core.boot_agent import BootAgent
from matrixswarm.core.utils.swarm_sleep import interruptible_sleep
import geoip2.database
import requests
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject
class Agent(BootAgent):
    """Gatekeeper: watch the SSH auth log and alert the swarm on successful logins.

    Each ``Accepted <method> for <user> from <ip>`` line is parsed, optionally
    enriched with MaxMind GeoIP data, appended to a daily JSON-lines file under
    ``<comm_path>/gatekeeper`` and, subject to the cooldown policy, sent as an
    alert packet to every agent holding the ``hive.alert.send_alert_msg`` role.
    """

    # Matches the OpenSSH success record, e.g.
    #   "Accepted publickey for root from 203.0.113.5 port 52144 ssh2: RSA SHA256:..."
    # Anchoring on the whole phrase (instead of splitting the line on "for" /
    # "from") keeps the parse correct when the hostname or program prefix
    # happens to contain those words (e.g. a host called "platform").
    _ACCEPT_PATTERN = r"\bAccepted (?P<method>\S+) for (?P<user>\S+) from (?P<ip>\S+)"

    # Display labels for auth methods; other methods are shown verbatim.
    _AUTH_METHOD_LABELS = {"password": "password", "publickey": "public key"}

    def __init__(self):
        """Read config, resolve the auth-log and GeoIP DB paths, and create the output dir."""
        super().__init__()
        self.name = "Gatekeeper"
        cfg = self.tree_node.get("config", {})

        # An explicitly configured log_path wins; otherwise auto-detect the
        # distro's auth log (/var/log/secure on RHEL/CentOS, /var/log/auth.log
        # on Debian/Ubuntu). Previously auto-detection silently overrode config.
        configured_log = str(cfg.get("log_path") or "").strip()
        if configured_log:
            self.log_path = configured_log
        elif os.path.exists("/var/log/secure"):
            self.log_path = "/var/log/secure"
        elif os.path.exists("/var/log/auth.log"):
            self.log_path = "/var/log/auth.log"
        else:
            self.log_path = "/var/log/secure"

        self.maxmind_db = cfg.get("maxmind_db", "GeoLite2-City.mmdb")
        self.geoip_enabled = self._as_flag(cfg.get("geoip_enabled", 1))
        self.always_alert = self._as_flag(cfg.get("always_alert", 1))
        self.cooldown_sec = 300  # per-IP alert cooldown used when always_alert is off
        self.last_alerts = {}  # ip -> epoch seconds of the last alert sent for it

        cfg_db = str(cfg.get("maxmind_db", "")).strip()
        # Use the configured DB only if it names an existing file (absolute, or
        # relative to the process working directory); otherwise fall back to
        # the copy under <install_path>/maxmind.
        if cfg_db and os.path.isfile(cfg_db):
            self.mmdb_path = cfg_db
        else:
            self.mmdb_path = os.path.join(
                self.path_resolution["install_path"], "maxmind", "GeoLite2-City.mmdb"
            )

        self.log_dir = os.path.join(self.path_resolution["comm_path"], "gatekeeper")
        os.makedirs(self.log_dir, exist_ok=True)

    @staticmethod
    def _as_flag(value):
        """Interpret a config flag; accepts bools, ints and strings such as "0"/"false"."""
        if isinstance(value, str):
            return value.strip().lower() not in ("", "0", "false", "no", "off")
        return bool(value)

    def should_alert(self, key):
        """Return True if an alert for *key* (an IP) should be sent now.

        With ``always_alert`` enabled every call returns True. Otherwise an alert
        is allowed once per ``cooldown_sec`` per key, and the send time is recorded.
        """
        if self.always_alert:
            return True
        now = time.time()
        last = self.last_alerts.get(key, 0)
        if now - last > self.cooldown_sec:
            self.last_alerts[key] = now
            return True
        return False

    def resolve_ip(self, ip):
        """Look up *ip* in the MaxMind City DB.

        Returns:
            dict: ``ip``, ``city``, ``region`` and ``country``; the location
            fields are None when the DB is missing, the address is not
            publicly routable, or the lookup fails.
        """
        empty = {"ip": ip, "city": None, "region": None, "country": None}
        try:
            # Private/loopback/link-local addresses are never in the DB; skipping
            # them avoids an "address not found" error log on every LAN login.
            if not ipaddress.ip_address(ip).is_global:
                return empty
        except ValueError:
            return empty
        if not os.path.exists(self.mmdb_path):
            self.log(f"[GATEKEEPER][GEOIP] DB not found at {self.mmdb_path}")
            return empty
        try:
            # Context manager closes the mmdb file handle; the old code leaked
            # one Reader per login.
            with geoip2.database.Reader(self.mmdb_path) as reader:
                response = reader.city(ip)
        except Exception as e:
            self.log(f"[GATEKEEPER][GEOIP][ERROR] {e}")
            return empty
        return {
            "ip": ip,
            "city": response.city.name,
            "region": response.subdivisions[0].name if response.subdivisions else None,
            "country": response.country.name,
        }

    def drop_alert(self, info):
        """Send an "SSH Login Detected" alert for *info* to all alert-capable agents.

        Args:
            info (dict): Login record with user/ip/timestamp/auth_method/tty and
                optional city/country keys.
        """
        alert_nodes = self.get_nodes_by_role("hive.alert.send_alert_msg")
        if not alert_nodes:
            self.log("[GATEKEEPER][ALERT] No alert-compatible agents found.")
            return

        try:
            # Bounded timeout: this runs inside the tail loop, so a stalled HTTP
            # request would otherwise block all further login detection.
            response = requests.get("https://api.ipify.org", timeout=5)
            response.raise_for_status()
            server_ip = response.text.strip() or "Unknown"
        except requests.RequestException:
            server_ip = "Unknown"

        # Show only the location parts we actually have instead of "None, None".
        location_parts = [str(part) for part in (info.get("city"), info.get("country")) if part]
        location = ", ".join(location_parts) or "Unknown"

        msg_text = (
            f"🛡️ SSH Login Detected\n\n"
            f"• Server IP: {server_ip}\n"
            f"• User: {info.get('user')}\n"
            f"• IP: {info.get('ip')}\n"
            f"• Location: {location}\n"
            f"• Time: {info.get('timestamp')}\n"
            f"• Auth: {info.get('auth_method')}\n"
            f"• Terminal: {info.get('tty')}"
        )

        pk1 = self.get_delivery_packet("standard.command.packet")
        pk1.set_data({"handler": "cmd_send_alert_msg"})
        pk2 = self.get_delivery_packet("notify.alert.general")
        universal_id = self.command_line_args.get("universal_id", "unknown")
        pk2.set_data({
            "msg": msg_text,
            "universal_id": universal_id,
            "level": "critical",
            "cause": "SSH Login Detected",
            "origin": universal_id,
        })
        pk1.set_packet(pk2, "content")

        for node in alert_nodes:
            self.pass_packet(pk1, node["universal_id"])

    @classmethod
    def _parse_login_line(cls, line):
        """Parse one OpenSSH "Accepted" log line.

        Returns:
            dict | None: ``timestamp``, ``user``, ``ip`` and ``auth_method``, or
            None when the line is not a recognizable successful-login record.
        """
        import re  # local import: only needed on the (rare) login path

        match = re.search(cls._ACCEPT_PATTERN, line)
        if match is None:
            return None
        tokens = line.split()
        # Classic syslog lines start with "Mon DD HH:MM:SS"; RFC 3339 logs
        # (e.g. newer rsyslog defaults) start with one ISO-8601 token.
        if tokens and tokens[0][:4].isdigit() and "T" in tokens[0]:
            timestamp = tokens[0]
        else:
            timestamp = " ".join(tokens[0:3])
        method = match.group("method")
        return {
            "timestamp": timestamp,
            "user": match.group("user"),
            "ip": match.group("ip"),
            "auth_method": cls._AUTH_METHOD_LABELS.get(method, method),
        }

    def _handle_login_line(self, line):
        """Parse, enrich, persist and (if due) alert on one "Accepted" log line."""
        parsed = self._parse_login_line(line)
        if parsed is None:
            self.log(f"[GATEKEEPER][PARSER] Unrecognized login line: {line.strip()}")
            return
        ip = parsed["ip"]
        try:
            ipaddress.ip_address(ip)
        except ValueError:
            # Skip only this line; the old `return` from tail_log ended the
            # whole monitoring loop on the first malformed address.
            self.log(f"[GATEKEEPER][SKIP] Invalid IP: {ip}")
            return

        geo = self.resolve_ip(ip) if self.geoip_enabled else {"ip": ip}
        alert_data = {
            "user": parsed["user"],
            "ip": ip,
            "tty": "unknown",
            "auth_method": parsed["auth_method"],
            "timestamp": parsed["timestamp"],
            **geo,
        }
        # Every login is written to the audit file; only the alert is rate-limited.
        self.persist(alert_data)
        if self.should_alert(ip):
            self.drop_alert(alert_data)

    def tail_log(self):
        """Follow the auth log with ``tail -F`` and handle each successful login until tail exits."""
        self.log(f"[GATEKEEPER] Tailing: {self.log_path}")
        # stderr -> DEVNULL: it used to be piped but never read, so tail's
        # rotation notices could eventually fill the pipe and block tail.
        # errors="replace": auth logs can contain attacker-supplied non-UTF-8
        # bytes, which would otherwise raise out of the read loop.
        with subprocess.Popen(
            ["tail", "-n", "0", "-F", self.log_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            encoding="utf-8",
            errors="replace",
        ) as proc:
            for line in proc.stdout:
                if "Accepted" not in line or "from" not in line:
                    continue
                try:
                    self._handle_login_line(line)
                except Exception as e:
                    self.log(f"[GATEKEEPER][PARSER][ERROR] Failed to parse login line: {e}")

    def persist(self, data):
        """Append *data* as one JSON line to today's ``ssh_YYYY-MM-DD.log`` file."""
        fname = f"ssh_{self.today()}.log"
        path = os.path.join(self.log_dir, fname)
        with open(path, "a", encoding="utf-8") as f:
            f.write(json.dumps(data) + "\n")

    def today(self):
        """Return the local date as ``YYYY-MM-DD``."""
        return datetime.now().strftime("%Y-%m-%d")

    def worker(self, config: dict = None, identity: IdentityObject = None):
        """Run the (blocking) log tail, then pause briefly before the next worker cycle."""
        self.tail_log()
        interruptible_sleep(self, 10)
if __name__ == "__main__":
    # Launched directly by the swarm: build the Gatekeeper agent and hand control to its boot sequence.
    Agent().boot()
⚙️ How it Works
1. **Swarm Scan**: The doctor begins by scanning the central communication directory to compile a list of all registered agents.
2. **Phantom Detection**: For each agent found, it checks the pod directory for a `boot.json` that declares the agent's `universal_id`. If an agent's communication files are present but no matching boot file exists, the agent is flagged as a "phantom" and reported.
3. **Beacon Verification**: For legitimate agents, the doctor checks the agent's `hello.moto` directory for beacon files (e.g., `poke.heartbeat`, `poke.worker`, `poke.packet_listener`). These files are regularly updated by each agent's active threads.
4. **Consciousness Check**: It reads the timestamp from each beacon file and calculates the beacon's age. Based on this age, it classifies each thread as:
   * `✅`: Healthy and responsive.
   * `⚠️ Stale`: The beacon is older than the allowed age.
   * `💥 Expired`: The beacon is more than twice the allowed age.
   * `💀 Dead`: The thread has explicitly reported a fatal error.

   Missing beacons are reported as `❌ no beacon`, and threads that mark themselves unused are reported as `🟦 unused`.
5. **Reporting**: The doctor logs a consolidated status report for every agent in the swarm, giving a clear, immediate overview of the swarm's operational health.
🧩 Configuration
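* **`max_allowed_beacon_age`** (Default: `8`): The maximum age, in seconds, that a beacon can reach before it is considered "stale". A beacon older than twice this age is considered "expired". An individual beacon file may override this limit with its own `timeout` field.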
🧭 Directive
matrix_directive = {
"universal_id": "matrix",
"name": "matrix",
"children": [
{
"universal_id": "agent-doctor-1",
"name": "agent_doctor",
"config": {
"max_allowed_beacon_age": 10
}
}
]
}
📦 Source
# Authored by Daniel F MacDonald and ChatGPT aka The Generals
# Docstrings by Gemini
import sys
import os
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))
import time
import json
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject
from matrixswarm.core.boot_agent import BootAgent
from matrixswarm.core.utils.swarm_sleep import interruptible_sleep
class Agent(BootAgent):
    """
    The AgentDoctor is a diagnostic agent that monitors the health and status of all other agents in the swarm.

    On each worker cycle it does the following:
    - lists the agents under the comm directory;
    - flags "phantoms", meaning agents whose comm directory exists but for which
      no pod ``boot.json`` declares that universal_id;
    - logs the status of each remaining agent's beacon (``hello.moto/poke.*``) files.
    """

    DEFAULT_MAX_BEACON_AGE = 8  # seconds; used when config is missing or invalid

    def __init__(self):
        """Initialize the doctor and read ``max_allowed_beacon_age`` from the directive config."""
        super().__init__()
        self.name = "AgentDoctor"
        cfg = (getattr(self, "tree_node", None) or {}).get("config", {}) or {}
        # Previously hard-coded to 8 seconds, which ignored the documented config key.
        self.max_allowed_beacon_age = self._as_positive_number(
            cfg.get("max_allowed_beacon_age"), self.DEFAULT_MAX_BEACON_AGE
        )

    @staticmethod
    def _as_positive_number(value, default):
        """Coerce *value* to a positive number, returning *default* if that is impossible.

        Whole numbers come back as ``int`` so status messages read "8s", not "8.0s".
        """
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        if not number > 0:  # also rejects NaN
            return default
        return int(number) if number.is_integer() else number

    def pre_boot(self):
        """Logs a message indicating that the diagnostics module is armed and ready."""
        self.log("[DOCTOR] Swarm-wide diagnostics module armed.")

    def post_boot(self):
        """Logs messages indicating the start of monitoring and registration with the Matrix."""
        self.log("[DOCTOR] Monitoring active threads via intelligent beacon protocol.")
        self.log("[IDENTITY] Registering with Matrix...")
        # NOTE(review): the actual registration call is disabled, so the log line
        # above does not correspond to any action.
        #self.dispatch_identity_command()

    def _pod_universal_ids(self):
        """Collect the ``universal_id`` declared by every readable pod ``boot.json``.

        Returns:
            set[str] | None: The declared IDs, or None if the pod directory
            itself cannot be listed (phantom status is then unknowable).
        """
        pod_root = self.path_resolution["pod_path"]
        try:
            pod_dirs = os.listdir(pod_root)
        except OSError as e:
            self.log(f"[DOCTOR] Cannot list pod directory {pod_root}: {e}")
            return None
        declared = set()
        for pod_id in pod_dirs:
            boot_file = os.path.join(pod_root, pod_id, "boot.json")
            try:
                with open(boot_file, encoding="utf-8") as f:
                    boot_data = json.load(f)
            except (OSError, ValueError):
                # Missing, unreadable or malformed boot files are simply skipped.
                continue
            uid = boot_data.get("universal_id") if isinstance(boot_data, dict) else None
            if isinstance(uid, str):
                declared.add(uid)
        return declared

    def is_phantom(self, agent_id, pod_ids=None):
        """
        Checks if an agent is a 'phantom', meaning that its communication directory exists but no
        pod boot file declares it.

        Args:
            agent_id (str): The universal ID of the agent to check.
            pod_ids (set, optional): Precomputed result of ``_pod_universal_ids()``;
                pass it when checking many agents to avoid re-reading every boot file.

        Returns:
            bool: True if the agent is a phantom. False otherwise, including when
            the pod directory cannot be read and the answer is unknown.
        """
        if pod_ids is None:
            pod_ids = self._pod_universal_ids()
            if pod_ids is None:
                return False
        return agent_id not in pod_ids

    def read_poke_file(self, path):
        """
        Reads a 'poke' file, which holds either a bare epoch timestamp or a JSON beacon object.

        Args:
            path (str): The path to the poke file.

        Returns:
            dict: The beacon (``{"status": "alive", "last_seen": <float>}`` for bare
            timestamps), or ``{"status": "error", "error": <message>}`` on failure.
        """
        try:
            with open(path, "r", encoding="utf-8") as f:
                raw = f.read().strip()
            if raw.startswith("{"):
                return json.loads(raw)
            return {"status": "alive", "last_seen": float(raw)}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def verify_agent_consciousness(self, agent_id, threads=("heartbeat", "worker", "packet_listener")):
        """
        Verifies the status of an agent by checking the beacon files for its various threads.

        Args:
            agent_id (str): The universal ID of the agent to verify.
            threads (tuple, optional): Thread names to check. Defaults to ("heartbeat", "worker", "packet_listener").

        Returns:
            dict: Maps each thread name to a human-readable status string.
        """
        comm_path = os.path.join(self.path_resolution["comm_path"], agent_id)
        beacon_dir = os.path.join(comm_path, "hello.moto")
        now = time.time()
        status_report = {}

        if not os.path.isdir(beacon_dir):
            self.log(f"[DOCTOR] Missing hello.moto folder for {agent_id}")
            for thread in threads:
                status_report[thread] = "❌ no beacon (no hello.moto)"
            return status_report

        for thread in threads:
            poke_file = os.path.join(beacon_dir, f"poke.{thread}")
            if not os.path.exists(poke_file):
                status_report[thread] = "❌ no beacon"
                continue

            beacon = self.read_poke_file(poke_file)
            status = beacon.get("status", "unknown")
            # A beacon may carry its own timeout. Fall back to the configured limit
            # when it is absent or not a usable positive number; a non-numeric value
            # used to raise TypeError and abort the whole scan.
            timeout = self._as_positive_number(beacon.get("timeout"), self.max_allowed_beacon_age)

            if status == "alive":
                # A missing or garbled last_seen counts as epoch 0, i.e. "expired".
                last_seen = self._as_positive_number(beacon.get("last_seen"), 0)
                age = round(now - last_seen, 2)
                if age > timeout * 2:
                    status_report[thread] = f"💥 reflex expired ({age}s > {timeout}s)"
                elif age > timeout:
                    status_report[thread] = f"⚠️ stale ({age}s)"
                else:
                    status_report[thread] = f"✅ {age}s"
            elif status == "dead":
                err = beacon.get("error", "no error provided")
                status_report[thread] = f"💀 dead: {err}"
            elif status == "unused":
                status_report[thread] = "🟦 unused"
            elif status == "error":
                # read_poke_file failed; surface why instead of "unknown status".
                status_report[thread] = f"❗ unreadable beacon: {beacon.get('error', 'unknown error')}"
            else:
                status_report[thread] = f"❓ unknown status ({status})"

        return status_report

    def worker(self, config: dict = None, identity: IdentityObject = None):
        """
        The main worker loop for the AgentDoctor. It scans all agents in the swarm,
        checks their status, logs a report, and then sleeps for 30 seconds.

        Args:
            config (dict, optional): Configuration dictionary. Defaults to None.
            identity (IdentityObject, optional): Identity object for the agent. Defaults to None.
        """
        self.log("[DOCTOR] Beginning swarm scan...")
        self_id = self.command_line_args.get("universal_id")
        # Read every pod boot.json once per scan rather than once per agent.
        pod_ids = self._pod_universal_ids()

        for agent_id in self.get_agents_list():
            if agent_id == self_id:
                continue
            if pod_ids is not None and self.is_phantom(agent_id, pod_ids):
                self.log(f"🩺 {agent_id}\n  • 👻 phantom agent — comm exists, no pod detected")
                continue

            status = self.verify_agent_consciousness(agent_id)
            log_lines = [f"🩺 {agent_id}"]
            log_lines.extend(f"  • {thread:<16} {stat}" for thread, stat in status.items())
            self.log("\n".join(log_lines))

        interruptible_sleep(self, 30)

    def get_agents_list(self):
        """
        Retrieves the IDs of all agents in the communication directory.

        An entry counts as an agent when it is a directory that contains an
        ``incoming`` or ``hello.moto`` subdirectory.

        Returns:
            list: Agent universal IDs, sorted so reports appear in a stable order.
            The list is empty if the comm directory cannot be read.
        """
        comm_path = self.path_resolution.get("comm_path", "/matrix/ai/latest/comm")
        try:
            entries = sorted(os.listdir(comm_path))
        except OSError as e:
            self.log(f"[DOCTOR] Cannot list comm directory {comm_path}: {e}")
            return []
        return [
            agent_id
            for agent_id in entries
            if os.path.isdir(os.path.join(comm_path, agent_id, "incoming"))
            or os.path.isdir(os.path.join(comm_path, agent_id, "hello.moto"))
        ]
if __name__ == "__main__":
    # Script entry point: instantiate the AgentDoctor and start its boot sequence.
    Agent().boot()
Comments 0
Category: monitoring
Tags: #monitoring, #watchdog, #swarm, #agent, #devops, #health, #check, #diagnostics, #system, #management, #intelligence, #status
Version: v1.0.0
Author: matrixswarm
Views: 76
Added: July 29, 2025 · Updated: July 29, 2025