Gatekeeper

August 2, 2025 AT 11:01 PM (updated: 1 month ago)

`gatekeeper` is a specialized sentinel agent that monitors SSH login activity. It provides real-time alerts for new SSH sessions, enriched with geographic location data to help identify unauthorized access attempts.

⚙️ How it Works

1. **Log Tailing**: The agent actively monitors the system's authentication log file (`/var/log/secure` or `/var/log/auth.log`) for new entries. 2. **Event Parsing**: It specifically looks for lines indicating a successful SSH login ("Accepted"). When a login event is detected, it parses the line to extract the username, source IP address, and authentication method (password or public key). 3. **GeoIP Enrichment**: For each login, it uses a local MaxMind GeoIP database to resolve the source IP address to a geographic location (city and country). 4. **Alerting**: It formats the gathered information—user, IP, location, and time—into a structured alert message and dispatches it to designated alert-handling agents within the swarm.

🧩 Configuration

* **`log_path`** (Default: `/var/log/secure` or `/var/log/auth.log`): The absolute path to the SSH authentication log file. The agent auto-detects the correct path on most systems.
* **`maxmind_db`** (Default: `GeoLite2-City.mmdb`): The path to the MaxMind GeoIP database file used for IP address lookups.
* **`always_alert`** (Default: `1`): When enabled (`1`), an alert is sent for every login. If disabled (`0`), it will only alert once per unique IP address within a 5-minute cooldown period.
* **`geoip_enabled`** (Default: `1`): Enables or disables the GeoIP lookup functionality.

🧭 Directive

matrix_directive = {
    "universal_id": "matrix",
    "name": "matrix",
    "children": [
        {
            "universal_id": "gatekeeper-1",
            "name": "gatekeeper",
            "config": {
                "always_alert": 0,
                "geoip_enabled": 1
            }
        }
    ]
}

📦 Source

# Authored by Daniel F MacDonald and ChatGPT aka The Generals
# Docstrings by Gemini
import sys
import os
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))
import json
import time
import subprocess
import ipaddress
from datetime import datetime
from matrixswarm.core.boot_agent import BootAgent
from matrixswarm.core.utils.swarm_sleep import interruptible_sleep
import geoip2.database
import requests
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject

class Agent(BootAgent):
    def __init__(self):
        super().__init__()
        self.name = "Gatekeeper"
        cfg = self.tree_node.get("config", {})

        if os.path.exists("/var/log/secure"):
            self.log_path = "/var/log/secure"
        elif os.path.exists("/var/log/auth.log"):
            self.log_path = "/var/log/auth.log"
        else:
            self.log_path = cfg.get("log_path", "/var/log/secure") # Debian/Ubuntu — change to /var/log/secure on RHEL/CentOS

        self.maxmind_db = cfg.get("maxmind_db", "GeoLite2-City.mmdb")
        self.geoip_enabled = cfg.get("geoip_enabled", 1)
        self.always_alert = bool(cfg.get("always_alert", 1))
        self.cooldown_sec = 300
        self.last_alerts = {}

        cfg_db = str(cfg.get("maxmind_db", "")).strip()

        # If it's an absolute path or a path relative to install_path
        if cfg_db and os.path.isfile(cfg_db):
            self.mmdb_path = cfg_db
        else:
            self.mmdb_path = os.path.join(self.path_resolution["install_path"], "maxmind", "GeoLite2-City.mmdb")

        self.log_dir = os.path.join(self.path_resolution["comm_path"], "gatekeeper")
        os.makedirs(self.log_dir, exist_ok=True)

    def should_alert(self, key):

        if self.always_alert:
            return True

        now = time.time()
        last = self.last_alerts.get(key, 0)
        if now - last > self.cooldown_sec:
            self.last_alerts[key] = now
            return True
        return False

    def resolve_ip(self, ip):
        if not os.path.exists(self.mmdb_path):
            self.log(f"[GATEKEEPER][GEOIP] DB not found at {self.mmdb_path}")
            return {"ip": ip, "city": None, "region": None, "country": None}

        try:
            reader = geoip2.database.Reader(self.mmdb_path)
            response = reader.city(ip)
            return {
                "ip": ip,
                "city": response.city.name,
                "region": response.subdivisions[0].name if response.subdivisions else None,
                "country": response.country.name
            }
        except Exception as e:
            self.log(f"[GATEKEEPER][GEOIP][ERROR] {e}")
            return {"ip": ip}

    def drop_alert(self, info):

        pk1 = self.get_delivery_packet("standard.command.packet")
        pk1.set_data({"handler": "cmd_send_alert_msg"})

        pk2 = self.get_delivery_packet("notify.alert.general")

        try:
            server_ip = requests.get("https://api.ipify.org").text.strip()
        except Exception:
            server_ip = "Unknown"

        # Force inject message
        msg_text = (
            f"🛡️ SSH Login Detected\n\n"
            f"• Server IP: {server_ip}\n"
            f"• User: {info.get('user')}\n"
            f"• IP: {info.get('ip')}\n"
            f"• Location: {info.get('city')}, {info.get('country')}\n"
            f"• Time: {info.get('timestamp')}\n"
            f"• Auth: {info.get('auth_method')}\n"
            f"• Terminal: {info.get('tty')}"
        )

        pk2.set_data({
            "msg": msg_text,
            "universal_id": self.command_line_args.get("universal_id", "unknown"),
            "level": "critical",
            "cause": "SSH Login Detected",
            "origin": self.command_line_args.get("universal_id", "unknown")
        })

        pk1.set_packet(pk2, "content")

        alert_nodes = self.get_nodes_by_role("hive.alert.send_alert_msg")
        if not alert_nodes:
            self.log("[WATCHDOG][ALERT] No alert-compatible agents found.")
            return

        for node in alert_nodes:
            self.pass_packet(pk1, node["universal_id"])

    def tail_log(self):
        self.log(f"[GATEKEEPER] Tailing: {self.log_path}")
        with subprocess.Popen(["tail", "-n", "0", "-F", self.log_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) as proc:
            for line in proc.stdout:
                if "Accepted" in line and "from" in line:
                    try:
                        timestamp = " ".join(line.strip().split()[0:3])
                        if "password" in line:
                            auth_method = "password"
                        elif "publickey" in line:
                            auth_method = "public key"
                        else:
                            auth_method = "unknown"

                        user = line.split("for")[1].split("from")[0].strip()
                        ip = line.split("from")[1].split()[0].strip()

                        try:
                            ipaddress.ip_address(ip)
                        except ValueError:
                            self.log(f"[GATEKEEPER][SKIP] Invalid IP: {ip}")
                            return

                        tty = "unknown"
                        geo = self.resolve_ip(ip)
                        alert_data = {
                            "user": user,
                            "ip": ip,
                            "tty": tty,
                            "auth_method": auth_method,
                            "timestamp": timestamp,
                            **geo
                        }

                        if self.should_alert(ip):
                            self.drop_alert(alert_data)

                        self.persist(alert_data)

                    except Exception as e:
                        self.log(f"[GATEKEEPER][PARSER][ERROR] Failed to parse login line: {e}")

    def persist(self, data):
        fname = f"ssh_{self.today()}.log"
        path = os.path.join(self.log_dir, fname)
        with open(path, "a", encoding="utf-8") as f:
            f.write(json.dumps(data) + "\n")

    def today(self):
        return datetime.now().strftime("%Y-%m-%d")

    def worker(self, config:dict = None, identity:IdentityObject = None):
        self.tail_log()
        interruptible_sleep(self, 10)

if __name__ == "__main__":
    agent = Agent()
    agent.boot()

Comments 0

Category: security

Tags: #security, #agent, #automation, #cybersecurity, #ssh, #sentinel, #ssh-monitoring, #login-alerts, #access-control, #real-time-alerts, #geolocation, #security-auditing, #threat-detection, #infosec

Version: v1.0.0

Author: matrixswarm

Views: 62

Added: August 2, 2025

Updated: August 2, 2025