Permissions Guardian

`permissions_guardian` is a mission-based agent responsible for enforcing file and directory permissions across designated paths. It detects discrepancies against a configured policy and optionally corrects them. Each run is single-pass and stateful, leaving behind a verifiable, encrypted change history.
⚙️ How it Works
1. **Config-Driven Scanning**: The agent reads a list of target directories from its `config` block. Each target specifies a path and the desired `file_mode` and `dir_mode` (e.g., `0o644` for files, `0o755` for directories). 2. **Recursive Traversal**: It walks each target directory recursively, comparing the current permissions of each file and subdirectory to the policy. 3. **Permission Enforcement**: If `log_only` is `False`, the agent applies the correct permissions via `chmod`. Otherwise, it only logs discrepancies. 4. **Encrypted History**: Each scan’s results—including what was changed or flagged—are saved into a persistent, encrypted `.state/scan_history.json.enc` file using AES-GCM. 5. **Receipt Drop**: On completion, a `mission.complete` receipt is written to signal to parent cron-based orchestrators that the agent has completed its task and can be rescheduled.
🧩 Configuration
* **`is_cron_job`** (Default: `1`): Indicates the agent is designed to run in a cron-managed loop.
* **`cron_interval_sec`** (Default: `300`): Interval in seconds between cron invocations.
* **`log_only`** (Default: `1`): When enabled, the agent logs permission issues without applying fixes.
* **`targets`** (Required): A list of dictionaries, each specifying:
- `path`: The target path key (e.g., `agent_path`, `core_path`, `comm_path`).
- `file_mode`: Desired permissions for files (e.g., `420` == `0o644`).
- `dir_mode`: Desired permissions for directories (e.g., `493` == `0o755`).
🧭 Directive
"universal_id": "perm-guardian-1",
"name": "permissions_guardian",
"config": {
"is_cron_job": 1,
"cron_interval_sec": 300,
"log_only": 1,
"targets": [
{"path": "agent_path", "dir_mode": 493, "file_mode": 420},
{"path": "core_path", "dir_mode": 493, "file_mode": 420},
{"path": "comm_path", "dir_mode": 509, "file_mode": 436}
]
}
📦 Source
# permissions_guardian.py
# Authored by Daniel F MacDonald and Gemini
# Refactored to include persistent, encrypted history and refined cron signals.
# ChatGPT added code enhancements.
import sys
import os
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))
import stat
import json
import time
import base64
from Crypto.Cipher import AES
from matrixswarm.core.boot_agent import BootAgent
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject
class Agent(BootAgent):
"""
A mission-based agent that enforces file and directory permissions.
This agent scans specified directory paths, checks the permissions of all
files and subdirectories against a defined policy, and corrects any
discrepancies. It maintains an encrypted history of all changes made.
After a single run, it terminates itself, designed to be managed and
re-activated by a parent 'cron_manager' agent.
Attributes:
targets (list): A list of dictionaries, where each dictionary defines
a target path and the desired permissions for its files/dirs.
log_only (bool): If True, the agent will only report permission
discrepancies without making any changes to the filesystem.
_state_path (str): The path to a private directory for storing agent state.
_history_file_path (str): The full path to the encrypted scan history file.
"""
def __init__(self):
"""
Initializes the agent by loading its configuration from the directive.
"""
super().__init__()
config = self.tree_node.get("config", {})
self.targets = config.get("targets", [])
self.log_only = config.get("log_only", False)
# Agent state for scan history (encrypted)
self._state_path = os.path.join(self.path_resolution["comm_path_resolved"], ".state")
os.makedirs(self._state_path, exist_ok=True)
self._history_file_path = os.path.join(self._state_path, "scan_history.json.enc")
def worker(self, config: dict = None, identity: IdentityObject = None):
"""
The main operational logic for the agent's single-run mission.
This method orchestrates the entire process: loading the past scan
history, executing the new scan, saving the updated history, and
finally dropping a 'mission.complete' receipt before shutting down.
Args:
config (dict, optional): Live configuration data. Not used by this agent.
identity (IdentityObject, optional): Sender identity. Not used by this agent.
"""
self.log("Permissions Guardian: starting scan.")
scan_history = self._load_history()
for target in self.targets:
scan_history += self._scan_and_enforce(target)
self._save_history(scan_history)
self.log("Permissions scan complete. Dropping mission.complete receipt.")
self._drop_mission_complete_receipt()
self.log("Standing down for the parent to respawn (cron-style).")
self.running = False # Agent will exit after 1 pass
def _get_aes_key(self, which="agent"):
"""
Retrieves the raw bytes of either the agent's private key or the swarm key.
Args:
which (str): The key to retrieve, either "agent" or "swarm".
Returns:
bytes: The raw 32-byte AES key.
"""
if which == "swarm":
key_b64 = self.swarm_key
else:
key_b64 = self.private_key
# Accept bytes (already decoded) or base64 string
if isinstance(key_b64, bytes):
return key_b64
return base64.b64decode(key_b64)
def _encrypt(self, data: bytes, which="agent") -> bytes:
"""
Encrypts data using AES-GCM with the specified key.
Args:
data (bytes): The plaintext data to encrypt.
which (str): The key to use for encryption ("agent" or "swarm").
Returns:
bytes: The encrypted data blob, containing nonce, tag, and ciphertext.
"""
key = self._get_aes_key(which)
cipher = AES.new(key, AES.MODE_GCM)
ciphertext, tag = cipher.encrypt_and_digest(data)
return cipher.nonce + tag + ciphertext
def _decrypt(self, data: bytes, which="agent") -> bytes:
"""
Decrypts an AES-GCM encrypted data blob.
Args:
data (bytes): The encrypted blob (nonce + tag + ciphertext).
which (str): The key to use for decryption ("agent" or "swarm").
Returns:
bytes: The original plaintext data.
"""
key = self._get_aes_key(which)
nonce = data[:16]
tag = data[16:32]
ciphertext = data[32:]
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
return cipher.decrypt_and_verify(ciphertext, tag)
def _load_history(self):
"""
Loads and decrypts the scan history from the agent's state file.
Returns:
list: A list of dictionaries representing past scan events, or an
empty list if no history exists or decryption fails.
"""
if not os.path.exists(self._history_file_path) or os.path.getsize(self._history_file_path) < 32:
return []
try:
with open(self._history_file_path, "rb") as f:
encrypted = f.read()
return json.loads(self._decrypt(encrypted).decode('utf-8'))
except Exception as e:
self.log("History decrypt failed. Starting fresh.", error=e, level="WARNING")
return []
def _save_history(self, history):
"""
Encrypts and saves the scan history to the agent's state file.
Args:
history (list): The list of scan events to save.
"""
try:
with open(self._history_file_path, "wb") as f:
f.write(self._encrypt(json.dumps(history, indent=2).encode('utf-8')))
self.log("Scan history saved and encrypted.")
except Exception as e:
self.log("Failed to save encrypted scan history.", error=e, level="ERROR")
def _scan_and_enforce(self, target):
"""
Scans a single target directory and enforces the defined permissions.
Args:
target (dict): A configuration dictionary for a single scan target.
Returns:
list: A list of log events generated during the scan.
"""
path_key = target.get("path")
if not path_key:
self.log("No path key specified for target. Skipping.", level="WARNING")
return []
root_path = self.path_resolution.get(path_key, path_key)
if not os.path.isdir(root_path):
self.log(f"Root path {root_path} not found. Skipping.", level="WARNING")
return []
dir_mode = target.get("dir_mode", 0o755)
file_mode = target.get("file_mode", 0o644)
log_events = []
for dirpath, dirnames, filenames in os.walk(root_path):
# Fix directories
for d in dirnames:
full_path = os.path.join(dirpath, d)
log_events += self._check_and_set_perm(full_path, dir_mode)
# Fix files
for f in filenames:
full_path = os.path.join(dirpath, f)
log_events += self._check_and_set_perm(full_path, file_mode)
return log_events
def _check_and_set_perm(self, path, desired_mode):
"""
Checks and, if necessary, corrects the permissions of a single file or directory.
Args:
path (str): The full path to the file or directory.
desired_mode (int): The desired permission mode in octal (e.g., 0o755).
Returns:
list: A list containing a log event if a change was made, otherwise empty.
"""
log_events = []
try:
current_mode = stat.S_IMODE(os.stat(path).st_mode)
if current_mode != desired_mode:
action = "chmod" if not self.log_only else "log_only"
log_msg = f"{action.upper()} {path}: {oct(current_mode)} -> {oct(desired_mode)}"
self.log(log_msg)
log_events.append({
"timestamp": time.time(),
"path": path,
"previous_mode": oct(current_mode),
"enforced_mode": oct(desired_mode),
"action_taken": action
})
if not self.log_only:
os.chmod(path, desired_mode)
except Exception as e:
self.log(f"Could not process {path}", error=e, level="ERROR")
return log_events
def _drop_mission_complete_receipt(self):
"""
Creates a 'mission.complete' file to signal successful task completion.
This file acts as a signal to the parent 'cron_manager' agent,
indicating that this run was successful and it is safe to schedule
the next run after the specified interval.
"""
receipt_path = os.path.join(self.path_resolution["comm_path_resolved"], "hello.moto", "mission.complete")
with open(receipt_path, "w") as f:
f.write(str(time.time()))
if __name__ == "__main__":
agent = Agent()
agent.boot()
Comments 0
Category: security
Tags: #security, #system-utility, #permissions-guardian, #file-permissions, #auditing, #cron-job, #mission-based, #chmod
Version: v1.0.0
Author: matrixswarm
Views: 7
Added: July 29, 2025Updated: July 29, 2025