Network Health Monitor
The NetworkHealthMonitor agent is a comprehensive service for observing the host system's network activity. It is designed to periodically check interface status, measure traffic rates, count active connections, log packet errors, and identify the top processes consuming network and system resources.
⚙️ How it Works
The agent runs in a continuous loop, performing a series of checks at a configured interval. Check Interfaces and Rates: The agent gathers statistics for all network interfaces not on the exclusion list. It checks if they are active, their speed, and logs any transmit/receive errors or dropped packets. It also calculates the current transmit (TX) and receive (RX) data rates in Mbps and compares them against configured thresholds. Check Connections: It counts the total number of active network connections and groups them by status (e.g., ESTABLISHED, TIME_WAIT). This total is compared against a connection threshold to detect potential connection floods. Find Top Processes: It identifies the top N processes that are consuming the most resources, sorted by CPU usage, memory, and network I/O. Send Report: If any rate or connection threshold is breached, or if any packet errors are detected, the agent sends a WARNING report containing specific details about the issue. If all checks are within normal bounds, it sends a routine INFO report. All reports include a full set of collected metrics and are sent to the configured report_to_role.
🧩 Configuration
The agent's behavior is driven by its config block in the boot directive.
**check_interval_sec (Default: 30): The time in seconds between each network health check.
**exclude_interfaces (Default: []): A list of network interface names to ignore during checks (e.g., "lo").
**tx_threshold_mbps (Default: 100): The outbound traffic rate in Mbps that will trigger a WARNING report.
**rx_threshold_mbps (Default: 100): The inbound traffic rate in Mbps that will trigger a WARNING report.
**conn_threshold (Default: 1000): The total number of connections that will trigger a WARNING report.
**top_n_procs (Default: 5): The number of top resource-consuming processes to include in the report.
**report_to_role (Default: "hive.forensics.data_feed"): The role that status reports will be sent to.
**report_handler (Default: "cmd_ingest_status_report"): The command handler the receiving agent should use.
🧭 Directive
atrix_directive = {
"universal_id": "matrix",
"name": "matrix",
"children": [
{
"universal_id": "forensic-alpha-1",
"name": "forensic_detective"
# This agent will receive the reports by default
},
{
"universal_id": "net-health-1",
"name": "network_health",
"config": {
"check_interval_sec": 60,
"exclude_interfaces": ["lo"],
"tx_threshold_mbps": 250,
"rx_threshold_mbps": 500,
"conn_threshold": 2000
}
}
]
}
📦 Source
#Authored by Daniel F MacDonald and ChatGPT aka The Generals
import sys
import os
import psutil
import socket
from datetime import datetime
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))
from matrixswarm.core.boot_agent import BootAgent
from matrixswarm.core.utils.swarm_sleep import interruptible_sleep
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject
class Agent(BootAgent):
"""
MatrixSwarm NetworkHealthMonitor agent.
Reports interface status, connections, load, packet errors, and top process hogs.
"""
def __init__(self):
super().__init__()
self.name = "NetworkHealthMonitor"
config = self.tree_node.get("config", {})
self.iface_exclude = set(config.get("exclude_interfaces", []))
self.tx_threshold_mbps = config.get("tx_threshold_mbps", 100) # example: warn if outbound > 100 Mbps
self.rx_threshold_mbps = config.get("rx_threshold_mbps", 100)
self.conn_threshold = config.get("conn_threshold", 1000)
self.top_n_procs = config.get("top_n_procs", 5)
self.check_interval_sec = config.get("check_interval_sec", 30)
self.report_to_role = config.get("report_to_role", "hive.forensics.data_feed")
self.report_handler = config.get("report_handler", "cmd_ingest_status_report")
# For traffic rate measurement
self.prev_net_io = None
self.prev_time = None
self.log("NetworkHealthMonitor initialized.")
def send_status_report(self, status, severity, details, metrics=None):
"""Sends status to the configured role."""
content = {
"handler": self.report_handler,
"content": {
"source_agent": self.name,
"service_name": "system.network",
"status": status,
"details": details,
"severity": severity,
"metrics": metrics or {},
}
}
report_nodes = self.get_nodes_by_role(self.report_to_role)
if not report_nodes:
return
pk = self.get_delivery_packet("standard.command.packet")
pk.set_data(content)
for node in report_nodes:
self.pass_packet(pk, node["universal_id"])
def get_network_summary(self):
"""Collects interface/IP status and errors/drops."""
metrics = {}
if_stats = psutil.net_if_stats()
if_addrs = psutil.net_if_addrs()
if_io = psutil.net_io_counters(pernic=True)
now = datetime.now().isoformat()
# Rate measurement setup
tx_rates = {}
rx_rates = {}
if self.prev_net_io and self.prev_time:
dt = (datetime.now() - self.prev_time).total_seconds()
for iface in if_io:
if iface in self.prev_net_io:
tx_bps = (if_io[iface].bytes_sent - self.prev_net_io[iface].bytes_sent) / dt * 8
rx_bps = (if_io[iface].bytes_recv - self.prev_net_io[iface].bytes_recv) / dt * 8
tx_rates[iface] = round(tx_bps / 1e6, 2) # Mbps
rx_rates[iface] = round(rx_bps / 1e6, 2) # Mbps
summary = []
for iface, stats in if_stats.items():
if iface in self.iface_exclude:
continue
addr_info = [a.address for a in if_addrs.get(iface, []) if a.family == socket.AF_INET]
io = if_io.get(iface)
line = {
"iface": iface,
"ip": addr_info,
"up": stats.isup,
"speed_mbps": stats.speed,
"tx_errs": io.errout if io else 0,
"rx_errs": io.errin if io else 0,
"drops": (io.dropin + io.dropout) if io else 0,
"tx_rate_mbps": tx_rates.get(iface, 0),
"rx_rate_mbps": rx_rates.get(iface, 0),
}
summary.append(line)
metrics["interfaces"] = summary
self.prev_net_io = if_io
self.prev_time = datetime.now()
return metrics
def get_conn_summary(self):
"""Summarizes active TCP/UDP connections."""
conns = psutil.net_connections()
count = len(conns)
by_status = {}
for c in conns:
key = c.status if hasattr(c, 'status') else 'UNKNOWN'
by_status[key] = by_status.get(key, 0) + 1
return {"total_connections": count, "by_status": by_status}
def get_top_process_hogs(self):
"""Returns top processes by network bytes, CPU, and memory."""
proc_list = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_info', 'io_counters']):
try:
net_io = proc.io_counters() if proc.io_counters else None
info = {
"pid": proc.info['pid'],
"name": proc.info['name'],
"cpu_percent": proc.info['cpu_percent'],
"rss_mb": proc.info['memory_info'].rss / (1024 ** 2) if proc.info['memory_info'] else 0,
"write_bytes": net_io.write_bytes if net_io else 0,
"read_bytes": net_io.read_bytes if net_io else 0,
}
proc_list.append(info)
except Exception:
continue
# Sort by cpu_percent, then rss, then write_bytes
proc_list.sort(key=lambda x: (x['cpu_percent'], x['rss_mb'], x['write_bytes']), reverse=True)
return proc_list[:self.top_n_procs]
def worker(self, config: dict = None, identity: IdentityObject = None):
try:
metrics = {}
# 1. Interfaces and rates
net_metrics = self.get_network_summary()
metrics.update(net_metrics)
iface_alerts = []
for iface in net_metrics["interfaces"]:
if iface["tx_rate_mbps"] > self.tx_threshold_mbps:
iface_alerts.append(f"{iface['iface']} outbound {iface['tx_rate_mbps']} Mbps > {self.tx_threshold_mbps} Mbps")
if iface["rx_rate_mbps"] > self.rx_threshold_mbps:
iface_alerts.append(f"{iface['iface']} inbound {iface['rx_rate_mbps']} Mbps > {self.rx_threshold_mbps} Mbps")
if iface["tx_errs"] > 0 or iface["rx_errs"] > 0 or iface["drops"] > 0:
iface_alerts.append(f"{iface['iface']} errors: TX={iface['tx_errs']} RX={iface['rx_errs']} drops={iface['drops']}")
# 2. Connections
conn_metrics = self.get_conn_summary()
metrics.update(conn_metrics)
conn_alert = conn_metrics["total_connections"] > self.conn_threshold
# 3. Top process hogs
metrics["top_procs"] = self.get_top_process_hogs()
# Send alerts or regular report
if iface_alerts or conn_alert:
status = "network_issue"
severity = "WARNING"
details = "; ".join(iface_alerts)
if conn_alert:
details += f"; High connection count: {conn_metrics['total_connections']} > {self.conn_threshold}"
self.send_status_report(status, severity, details, metrics)
else:
# Routine, informational report (could throttle if you only want to send on change/issue)
self.send_status_report("healthy", "INFO", "Network health within normal bounds.", metrics)
except Exception as e:
self.log(f"Error in NetworkHealthMonitor: {e}")
interruptible_sleep(self, self.check_interval_sec)
if __name__ == "__main__":
agent = Agent()
agent.boot()
Comments 0
Category: monitoring
Tags: #agent, #devops, #psutil, #network-health, #network-monitoring, #traffic-analysis, #bandwidth-monitor, #packet-errors, #process-monitoring, #system-administration
Version: v1.0.0
Author: matrixswarm
Views: 93
Added: July 26, 2025Updated: July 26, 2025