Log Watcher

`log_watcher` is a flexible and robust agent designed to monitor any specified log file in real-time. It tails the file, intelligently classifies new entries based on custom rules, and reports them to the swarm for analysis, making it a foundational sensor for system and application monitoring.
⚙️ How it Works
1. **Log Tailing**: The agent continuously monitors a specified log file for new lines. It is built to handle log rotation gracefully, automatically detecting when a file has been rotated and seamlessly switching to the new file without losing its place. 2. **Severity Classification**: Using a configurable set of rules (`severity_rules`), the agent scans each new log line for keywords. Based on these keywords, it assigns a severity level (`INFO`, `WARNING`, `CRITICAL`) to the event. 3. **Report Dispatch**: Once a line is processed and classified, the agent packages the information—including the original log line, the assigned severity, and the service name—into a standardized status report. 4. **Targeted Reporting**: The report is sent to all agents within the swarm that have the designated `report_to_role` (e.g., `hive.forensics.data_feed`), ensuring that the data is delivered to the correct analytical agents like `forensic_detective`.
🧭 Integration with Forensic Detective
`log_watcher` is a primary data source for `forensic_detective`. This integration allows the swarm to turn raw log data into intelligent, correlated security alerts.
* **Triggering Analysis**: When `log_watcher` sends a report with a `CRITICAL` severity, it triggers `forensic_detective` to begin a new investigation.
* **Specialized Investigators**: The `forensic_detective` uses the `service_name` from the log watcher's report to load a specialized "investigator" module. For `log_watcher`, this is the `generic_log` investigator.
* **Contextual Correlation**: The `generic_log` investigator analyzes the critical log line and correlates it with recent `WARNING` level events from the same log file, providing crucial context about what led up to the critical event. This turns a single error line into a meaningful narrative, such as "Critical error preceded by multiple failed login attempts."
🧩 Configuration
* **`log_path`** (Required): The absolute path to the log file you want to monitor (e.g., `/var/log/auth.log`, `/var/log/nginx/access.log`).
* **`service_name`** (Default: `generic_log`): A unique name for the service generating the log. This name is used by `forensic_detective` to load the correct investigator factory. For this agent, it should typically be kept as `generic_log`.
* **`report_to_role`** (Default: `hive.forensics.data_feed`): The swarm role that should receive the status reports. This should point to your `forensic_detective` agents.
* **`severity_rules`** (Required): A dictionary that defines the keywords used to classify log lines. You can define keywords for `CRITICAL` and `WARNING` levels. Any line not matching these is considered `INFO`.
🧭 Directive
matrix_directive = {
"universal_id": "matrix",
"name": "matrix",
"children": [
{
"universal_id": "auth-log-monitor-1",
"name": "log_watcher",
"config": {
"log_path": "/var/log/auth.log",
"service_name": "generic_log",
"report_to_role": "hive.forensics.data_feed",
"severity_rules": {
"CRITICAL": [
"session opened for user root"
],
"WARNING": [
"failed password",
"invalid user",
"authentication failure"
]
}
}
}
]
}
📦 Source
# log_watcher.py
# Authored by Daniel F MacDonald and Gemini
import os
import sys
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))
import time
from matrixswarm.core.boot_agent import BootAgent
from matrixswarm.core.utils.swarm_sleep import interruptible_sleep
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject
class Agent(BootAgent):
"""
A robust agent that tails a log file, parses new entries, and sends
them as status reports to the swarm for real-time analysis.
"""
def __init__(self):
"""Initializes the agent and its log watching configuration."""
super().__init__()
self.AGENT_VERSION = "1.0.0"
config = self.tree_node.get("config", {})
self.log_path = config.get("log_path")
self.service_name = config.get("service_name", "generic.log")
self.report_to_role = config.get("report_to_role", "hive.forensics.data_feed")
# Rules for classifying log severity based on keywords
self.severity_rules = config.get("severity_rules", {
"CRITICAL": ["fatal", "critical", "segfault", "segmentation fault"],
"WARNING": ["error", "warn", "warning", "denied", "failed"],
"INFO": [] # Default
})
# State tracking for log rotation
self._current_inode = None
self._log_file = None
def post_boot(self):
self.log(f"{self.NAME} v{self.AGENT_VERSION} – your logs tell many secrets...")
def worker_pre(self):
self.log(f"Beginning to watch log file: {self.log_path}")
self._open_log_file()
def worker(self, config: dict = None, identity: IdentityObject = None):
"""
Main worker loop that tails the log file and handles rotation.
"""
try:
if not self.log_path or not os.path.exists(self.log_path):
self.log(f"Log path '{self.log_path}' is invalid or not found. Worker will idle.", level="ERROR")
return
try:
# Check for log rotation
if not self._is_file_still_valid():
self.log("Log rotation detected. Re-opening file handle.", level="INFO")
self._open_log_file()
# Read new lines
line = self._log_file.readline()
if line:
self._process_line(line)
else:
# If no new line, sleep briefly to avoid busy-waiting
interruptible_sleep(self, 1) # CHANGED: Was 0.5, must be an integer
except Exception as e:
self.log("Error during log watch cycle.", error=e, level="ERROR")
interruptible_sleep(self, 10) # Longer sleep on error
except Exception as e:
self.log(error=e, block="main_try")
def _open_log_file(self):
"""Opens or re-opens the log file and seeks to the end."""
try:
if self._log_file:
self._log_file.close()
self._log_file = open(self.log_path, 'r')
self._current_inode = os.fstat(self._log_file.fileno()).st_ino
self._log_file.seek(0, 2) # Go to the end of the file
except Exception as e:
self.log(error=e, block="main_try")
def _is_file_still_valid(self):
"""Checks if the log file has been rotated."""
try:
return os.stat(self.log_path).st_ino == self._current_inode
except FileNotFoundError:
return False
def _process_line(self, line: str):
"""Parses a log line and sends it as a status report."""
try:
line = line.strip()
if not line:
return
severity = "INFO" # Default severity
line_lower = line.lower()
for level, keywords in self.severity_rules.items():
if any(keyword in line_lower for keyword in keywords):
severity = level
break
report = {
"source_agent": self.command_line_args.get("universal_id"),
"service_name": self.service_name,
"status": "log_entry_detected",
"severity": severity,
"details": {
"timestamp": time.time(),
"log_line": line
}
}
self._send_report(report)
except Exception as e:
self.log(error=e, block="main_try")
def _send_report(self, report_content: dict):
"""Constructs and sends a status report packet."""
try:
report_nodes = self.get_nodes_by_role(self.report_to_role)
if not report_nodes:
return
pk = self.get_delivery_packet("standard.command.packet")
pk.set_data({
"handler": "cmd_ingest_status_report",
"content": report_content
})
for node in report_nodes:
self.pass_packet(pk, node["universal_id"])
if self.debug.is_enabled():
self.log(f"Sent '{report_content['severity']}' log entry for '{self.service_name}'.")
except Exception as e:
self.log("Failed to send log report.", error=e, block="_send_report")
if __name__ == "__main__":
agent = Agent()
agent.boot()
Comments 0
Category: monitoring
Tags: #monitoring, #security, #real-time, #forensics, #analysis, #file, #log, #sensor, #logging, #watcher, #rotation, #investigator
Version: v1.0.0
Author: matrixswarm
Views: 2
Added: July 31, 2025Updated: August 3, 2025