Log Watcher

July 31, 2025 (updated October 27, 2025)

The Log Watcher agent is a system-level sentry designed to patrol critical service logs, extract structured digests, and optionally forward those digests to the Oracle agent for deeper, AI-based anomaly analysis. It runs quietly in the background, generating periodic reports that summarize recent activity across web, mail, and security daemons — then dispatches alerts to the Alert Hive when potential issues surface.

⚙️ How it Works

1. **Collection Phase**: The agent loads one or more collector modules (httpd, sshd, fail2ban, systemd, postfix, dovecot, etc.). Each collector scans configured log paths, respecting `rotate_depth` and `max_lines`, and compiles summaries.
2. **Digest Generation**: Every patrol cycle (default 6 hours) or manual command (`cmd_generate_system_log_digest`) runs a fresh digest cycle (a payload sketch follows this list). The agent formats results into a readable, delimited block and sends them to the Phoenix GUI via `CallbackDispatcher`.
3. **Oracle Integration (optional)**: When `enable_oracle` is true, the digest is passed to the Oracle agent. Oracle performs semantic analysis (e.g., anomaly detection or summarization) and returns its response to `cmd_oracle_response`. The Log Watcher attaches that analysis to the digest and republishes it to Phoenix.
4. **Alert Dispatch**: If Oracle times out or detects an anomaly, the agent forwards a notification to any node with the `hive.alert` role, using the `notify.alert.general` packet type.
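
A manual digest is requested with a `standard.command.packet` whose handler is `cmd_generate_system_log_digest`. Here is a minimal sketch of the `content` payload, using only fields the handler in the Source section reads; the session id and token values are illustrative:

{
    "collectors": ["sshd", "fail2ban"],          # optional: overrides the configured collector set
    "use_oracle": True,                          # queue the summary for Oracle analysis
    "include_details": True,                     # full per-key detail vs. summary-only
    "session_id": "gui-session-1",               # illustrative GUI session id
    "return_handler": "logwatch_panel.update",
    "token": "log_digest_example"                # illustrative; defaults to a generated query_id
}

Oracle replies return through `cmd_oracle_response` carrying a `query_id` and a `response` string; the pending entry stored under that `query_id` restores the token and session, so the analysis lands in the same GUI feed as the original digest.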

🧭 Integration

The Log Watcher integrates seamlessly with the Phoenix GUI and Oracle ecosystem:

* **Phoenix Panel** → `log_watcher.log_watcher` provides a visual digest feed.

* **Oracle Role** → receives prompts for AI analysis.

* **Alert Role** → handles escalation or external notifications.

It relies on the swarm's built-in RPC and callback mechanisms, so all output is routed through the `PhoenixCallbackDispatcher` to the configured RPC router role (`hive.rpc` by default).

🧩 Configuration

* **`check_interval_sec`** (Default: `30`): Seconds between worker wake-ups; each pass emits a liveness beacon and checks the patrol and Oracle timers.
* **`patrol_interval_hours`** (Default: `6`): Hours between automatic patrol digest cycles.
* **`collectors`** (Default: `httpd`, `sshd`): The collector modules to load; each entry carries its own `paths`, `rotate_depth`, and `max_lines`.
* **`enable_oracle`** (Default: `0`): When truthy, patrol digests are forwarded to Oracle for analysis.
* **`oracle_role`** (Default: `hive.oracle`): The swarm role that receives analysis prompts.
* **`oracle_timeout`** (Default: `120`): Seconds to wait for an Oracle reply before expiring the query and dispatching an alert.
* **`alert_role`** (Default: `hive.alert`): The swarm role that receives timeout and anomaly notifications.

The Directive below shows a complete working example.

🧭 Directive

{
    "universal_id": "triclops-1",
    "name": "log_watcher",
    "tags": {
        "packet_signing": {
            "in": True,
            "out": True
        }
    },
    "config": {
        "ui": {
            "agent_tree": {"emoji": "👁️👁️👁️"},
            "panel": ["log_watcher.log_watcher"]
        },
        "service-manager": [{
            "role": ["logwatch.generate.digest@cmd_generate_system_log_digest"]
        }],
        "collectors": {
            "httpd":    {"paths": ["/var/log/httpd/error_log"], "rotate_depth": 1, "max_lines": 1000},
            "sshd":     {"paths": ["/var/log/secure"], "rotate_depth": 1, "max_lines": 1000},
            "fail2ban": {"paths": ["/var/log/fail2ban.log"], "rotate_depth": 1, "max_lines": 500},
            "systemd":  {"paths": ["/var/log/messages"], "rotate_depth": 1, "max_lines": 500},
            "postfix":  {"paths": ["/var/log/maillog"], "rotate_depth": 1, "max_lines": 500},
            "dovecot":  {"paths": ["/var/log/maillog"], "rotate_depth": 1, "max_lines": 500}
        },
        "enable_oracle": 1,
        "oracle_role": "hive.oracle",
        "oracle_timeout": 120,
        "patrol_interval_hours": 6,
        "alert_role": "hive.alert"
    }
}

📦 Source

# Authored by Daniel F MacDonald and ChatGPT-5 aka The Generals
# ChatGPT-3 Docstrings
import os, sys, time, json, uuid, importlib, threading
from Crypto.PublicKey import RSA

sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))

from core.python_core.boot_agent import BootAgent
from core.python_core.utils.swarm_sleep import interruptible_sleep
from core.python_core.class_lib.gui.callback_dispatcher import PhoenixCallbackDispatcher, CallbackCtx
from core.python_core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject
from core.python_core.utils.crypto_utils import pem_fix


class Agent(BootAgent):

    def __init__(self):
        super().__init__()


        try:
            cfg = self.tree_node.get("config", {})

            self.AGENT_VERSION = "2.0.0"
            self._interval = cfg.get("check_interval_sec", 30)
            self._patrol_interval = cfg.get("patrol_interval_hours", 6) * 3600
            self._last_patrol = 0
            self.enable_oracle = bool(cfg.get("enable_oracle", 0))
            self.oracle_role = cfg.get("oracle_role", "hive.oracle")
            self.alert_role = cfg.get("alert_role", "hive.alert")
            self._last_alert = 0
            self._alert_cooldown = 0

            self.oracle_timeout = int(cfg.get("oracle_timeout", 120))
            self.collectors = cfg.get("collectors", ["httpd", "sshd"])
            self._rpc_role = self.tree_node.get("rpc_router_role", "hive.rpc")

            self.oracle_stack = {}

            # encryption
            self._signing_keys = cfg.get('security', {}).get('signing', {})
            self._has_signing_keys = bool(self._signing_keys.get('privkey')) and bool(self._signing_keys.get('remote_pubkey'))

            if self._has_signing_keys:
                priv_pem = self._signing_keys.get("privkey")
                priv_pem = pem_fix(priv_pem)
                self._signing_key_obj = RSA.import_key(priv_pem.encode() if isinstance(priv_pem, str) else priv_pem)

            self._serial_num = self.tree_node.get('serial', {})

            self._emit_beacon = self.check_for_thread_poke("worker", timeout=self._interval * 2, emit_to_file_interval=10)

        except Exception as e:
            self.log(error=e, block="main_try")

    # ------------------------------------------------------------------
    def post_boot(self):
        self.log(f"{self.NAME} v{self.AGENT_VERSION} — Oracle-aware patrol online.")

    def worker(self, config=None, identity:IdentityObject=None):
        """
        Main scheduled loop.

        Wakes every ``self._interval`` seconds and:
        • Emits a beacon so Phoenix’s liveness watchdog doesn’t mark it stale
        • Flushes expired Oracle requests from ``self.oracle_stack``
        • Triggers :py:meth:`_run_digest_cycle` if the patrol timer has elapsed.

        Notes
        -----
        Runs inside the MatrixSwarm thread harness; any uncaught exception is
        logged and the loop sleeps briefly before continuing.
        """
        try:
            self._emit_beacon()
            now = time.time()
            self._check_oracle_timeouts()
            self.oracle_stack = {k: v for k, v in self.oracle_stack.items()
                                 if now - v["timestamp"] < self.oracle_timeout}

            if self.enable_oracle and (now - self._last_patrol) >= self._patrol_interval:
                self._last_patrol = now
                self._run_digest_cycle(use_oracle=True, patrol=True)
        except Exception as e:
            self.log(error=e, block="main_try", level="ERROR")

        interruptible_sleep(self, self._interval)

    # ------------------------------------------------------------------
    def run_collectors(self):
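        """Import each configured collector module and return a dict of
        ``{collector_name: collected_summary}``. Failing collectors are
        logged and skipped so one bad module can't sink the patrol."""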

        results = {}
        for name in self.collectors:
            try:
                mod = importlib.import_module(f"log_watcher.factory.collectors.{name}")
                results[name] = mod.collect()
            except Exception as e:
                self.log(f"[COLLECTOR][ERROR] {name}: {e}", level="ERROR")
        return results

    # ------------------------------------------------------------------
    def cmd_generate_system_log_digest(self, content, packet, identity=None):
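        """GUI command handler: run the collectors on demand, stream the
        formatted digest back under the caller's token, and optionally
        queue the raw summary for Oracle analysis with a timeout watchdog."""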
        try:
            requested_collectors = content.get("collectors")
            if requested_collectors:
                self.collectors = requested_collectors

            use_oracle = bool(content.get("use_oracle", False))
            session_id = content.get("session_id")
            return_handler = content.get("return_handler")
            include_details = bool(content.get("include_details", True))

            # ⚙ preserve GUI token across whole path
            query_id = f"log_digest_{int(time.time())}"
            token = content.get("token", query_id)

            summary = self.run_collectors()
            formatted = self._render_digest(summary, use_full_format=include_details)

            # immediate digest to GUI, tagged with same token
            self._broadcast_output(
                token=token,
                session_id=session_id,
                offset=0,
                lines=formatted.splitlines(),
                return_handler=return_handler,
            )

            if not use_oracle:
                return

            # store oracle job with same token
            self.oracle_stack[query_id] = {
                "timestamp": time.time(),
                "payload": summary,
                "session_id": session_id,
                "return_handler": return_handler,
                "token": token,
            }

            self._send_to_oracle(summary, query_id)
            threading.Thread(
                target=self._oracle_timeout_watchdog,
                args=(query_id, self.oracle_timeout),
                daemon=True,
            ).start()

        except Exception as e:
            self.log(error=e, block="main_try", level="ERROR")



    def _oracle_timeout_watchdog(self, query_id, timeout):
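        """Background thread: poll for up to ``timeout`` seconds; if the
        query is still pending in ``oracle_stack``, pop it and tell the
        GUI the base digest stands without analysis."""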

        try:
            start = time.time()
            while time.time() - start < timeout:
                time.sleep(2)
                if query_id not in self.oracle_stack:
                    return  # Oracle responded in time

            entry = self.oracle_stack.pop(query_id, None)
            if not entry:
                return

            lines = [
                "⚠ Oracle timeout — no response received in time.",
                "Returning base digest without analysis.",
            ]

            self._broadcast_output(
                token=entry["token"],
                session_id=entry["session_id"],
                offset=0,
                lines=lines,
                return_handler=entry["return_handler"],
            )

        except Exception as e:
            self.log(error=e, block="main_try", level="ERROR")


    def _run_digest_cycle(self, use_oracle=False, session_id=None, return_handler=None, patrol=False):
        """Runs collector cycle manually or during patrol, with optional Oracle analysis."""

        try:
            include_details = True  # default verbosity
            token = f"patrol_{uuid.uuid4().hex[:8]}" if patrol else f"manual_{uuid.uuid4().hex[:8]}"

            summary = self.run_collectors()
            formatted = self._render_digest(summary, use_full_format=include_details)

            # Only broadcast to cockpit when it's an interactive (manual) run
            if not patrol and session_id:
                self._broadcast_output(
                    token=token,
                    session_id=session_id,
                    offset=0,
                    lines=formatted.splitlines(),
                    return_handler=return_handler,
                )

            if not use_oracle:
                return

            # prevent stacking duplicate patrol queries
            if patrol and len(self.oracle_stack) > 3:
                self.log("[LOGWATCH] Throttling patrol Oracle submissions.")
                # trim oldest entries
                oldest = sorted(self.oracle_stack.keys())[:-2]
                for oid in oldest:
                    self.oracle_stack.pop(oid, None)
                return

            # avoid duplicate summaries
            if any(v.get("payload") == summary for v in self.oracle_stack.values()):
                self.log("[LOGWATCH] Identical digest already pending; skipping.")
                return

            query_id = f"log_digest_{int(time.time())}"
            self.oracle_stack[query_id] = {
                "timestamp": time.time(),
                "payload": summary,
                "session_id": session_id,
                "return_handler": return_handler or "logwatch_panel.update",
                "token": token,
                "patrol": patrol,
            }

            # send job to Oracle
            self._send_to_oracle(summary, query_id)

            # spin watchdog for timeout
            threading.Thread(
                target=self._oracle_timeout_watchdog,
                args=(query_id, self.oracle_timeout),
                daemon=True,
            ).start()

        except Exception as e:
            self.log(error=e, block="main_try", level="ERROR")

    def _render_digest(self, summary: dict, use_full_format: bool = True) -> str:
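        """Format the collector summary into a delimited, human-readable
        block: one Begin/End section per collector, with per-key detail
        lists when ``use_full_format`` is set."""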
        out = []
        for name, section in summary.items():

            try:
                header = f"--------------------- {name.upper()} Begin ------------------------"
                footer = f"---------------------- {name.upper()} End -------------------------"

                out.append(header)
                if not use_full_format:
                    out.append(section.get("summary", "(no summary)"))
                else:
                    for key, values in section.items():
                        if key == "summary":
                            out.append(section["summary"])
                            continue
                        if isinstance(values, list):
                            out.append(f"\n## {key.upper()} ##")
                            out.extend(values)
                out.append(footer)
                out.append("")  # spacing

            except Exception as e:
                self.log(error=e, block="main_try", level="ERROR")

        return "\n".join(out)

    # ------------------------------------------------------------------
    def _send_to_oracle(self, summary, query_id):
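        """Wrap the digest in a ``cmd_msg_prompt`` command packet and
        deliver it to an agent holding ``oracle_role``, asking for the
        reply to come back through ``cmd_oracle_response``."""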

        try:
            endpoints = self.get_nodes_by_role(self.oracle_role, return_count=1)
            if not endpoints:
                self.log(f"[LOGWATCH] No Oracle agents found for role {self.oracle_role}.")
                return

            prompt = (
                "Analyze the following system log digest for anomalies or potential issues.\n\n"
                f"{json.dumps(summary, indent=2)}"
            )
            pk = self.get_delivery_packet("standard.command.packet")
            pk.set_data({
                "handler": "cmd_msg_prompt",
                "content": {
                    "prompt": prompt,
                    "query_id": query_id,
                    "target_universal_id": self.command_line_args.get("universal_id"),
                    "return_handler": "cmd_oracle_response",
                },
            })

            for ep in endpoints:
                pk.set_payload_item("handler", ep.get_handler())
                self.pass_packet(pk, ep.get_universal_id())

            self.log(f"[LOGWATCH] Digest sent to Oracle (query {query_id}).")

        except Exception as e:
            self.log(error=e, block="main_try", level="ERROR")


    # ------------------------------------------------------------------
    def cmd_oracle_response(self, content, packet, identity=None):
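        """Callback for Oracle replies: match ``query_id`` to its pending
        entry, then republish the analysis to the GUI under the original
        token and session."""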
        try:
            query_id = content.get("query_id")
            entry = self.oracle_stack.pop(query_id, None)
            if not entry:
                self.log(f"[ORACLE] Received unknown query_id: {query_id}")
                return

            token = entry["token"]
            lines = [
                "=== ORACLE ANALYSIS BEGIN ===",
                content.get("response", "").strip(),
                "=== ORACLE ANALYSIS END ===",
            ]
            self._broadcast_output(
                token=token,
                session_id=entry["session_id"],
                offset=0,
                lines=lines,
                return_handler=entry["return_handler"],
            )
        except Exception as e:
            self.log(error=e, block="main_try", level="ERROR")

    # ------------------------------------------------------------------
    def _check_oracle_timeouts(self):
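        """Expire pending Oracle queries older than ``oracle_timeout``,
        notify the GUI for each, and raise a swarm alert."""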
        now = time.time()
        expired = [qid for qid, v in self.oracle_stack.items() if now - v["timestamp"] > self.oracle_timeout]
        try:
            for qid in expired:
                entry = self.oracle_stack.pop(qid)
                msg = f"[LOGWATCH] Oracle timeout — no response for {qid}."
                self._broadcast_output(
                    token=entry.get("token", qid),
                    session_id=entry.get("session_id"),
                    offset=0,
                    lines=[msg],
                    return_handler="logwatch_panel.update",
                )
                self._send_alert(msg, qid)
        except Exception as e:
            self.log(error=e, block="main_try", level="ERROR")


    # ------------------------------------------------------------------
    def _send_alert(self, message, incident_id):
        """Push an alert if Oracle finds or times out on a concern."""

        try:

            if not message or not str(message).strip():
                self.log(f"[LOGWATCH][ALERT] Empty message for incident {incident_id}, skipping.")
                return
            if time.time() - self._last_alert < self._alert_cooldown:
                self.log("[LOGWATCH][ALERT] Cooldown active, skipping duplicate alert.")
                return
            self._last_alert = time.time()

            endpoints = self.get_nodes_by_role(self.alert_role)
            if not endpoints:
                self.log("[LOGWATCH][ALERT] No alert agents found.")
                return

            pk = self.get_delivery_packet("notify.alert.general")
            pk.set_data({
                "msg": message,
                "cause": "Oracle-Analyzed Digest",
                "origin": self.command_line_args.get("universal_id"),
            })
            cmd = self.get_delivery_packet("standard.command.packet")
            cmd.set_data({"handler": "cmd_send_alert_msg"})
            cmd.set_packet(pk, "content")

            for ep in endpoints:
                cmd.set_payload_item("handler", ep.get_handler())
                self.pass_packet(cmd, ep.get_universal_id())

            self.log(f"[LOGWATCH][ALERT] Dispatched alert for {incident_id}.")

        except Exception as e:
            self.log(error=e, block="main_try", level="ERROR")

    # ------------------------------------------------------------------
    def _broadcast_output(self, token, session_id, offset, lines, return_handler):
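        """Ship a line-oriented payload to the Phoenix GUI by dispatching
        through ``PhoenixCallbackDispatcher`` over the swarm RPC role."""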
        try:
            ctx = (
                CallbackCtx(agent=self)
                .set_rpc_role(self._rpc_role)
                .set_response_handler(return_handler)
                .set_confirm_response(True)
                .set_session_id(session_id)
                .set_token(token)
            )
            payload = {
                "session_id": session_id,
                "token": token,
                "start_line": offset,
                "lines": lines,
                "next_offset": offset + len(lines),
                "timestamp": int(time.time()),
            }

            dispatcher = PhoenixCallbackDispatcher(self)
            dispatcher.dispatch(ctx=ctx, content=payload)
        except Exception as e:
            self.log("[LOGWATCH][ERROR] broadcast_output failed", error=e)

if __name__ == "__main__":
    agent = Agent()
    agent.boot()

Category: monitoring

Tags: #monitoring, #security, #real-time, #forensics, #analysis, #file, #log, #sensor, #logging, #watcher, #rotation, #investigator

Version: v1.0.0

Author: matrixswarm

Added: July 31, 2025

Updated: October 27, 2025