Contact Reflex
The contact_reflex agent acts as a secure, intelligent gateway for external messages, such as from a website contact form. It monitors a specified directory for new, encrypted message files. Upon detection, it decrypts the message, optionally uses an Oracle agent for AI-powered spam classification, enriches the data with GeoIP information, and forwards clean messages to a designated operator.
⚙️ How it Works
The agent's primary function is to securely process incoming messages through a multi-step pipeline: Watches Directory: The agent uses an efficient file system watcher (watchdog/inotify) to monitor a directory for new encrypted .json files dropped by an external source (e.g., a PHP script). Decrypts Message: When a new file appears, the agent uses its private RSA key to decrypt a one-time AES key contained within the file's metadata. It then uses this AES key to decrypt the main message payload. Queries Oracle (Optional): If enabled, the agent sends the decrypted message content to an oracle agent. The oracle uses an LLM to classify the message as "spam" or "not_spam". Forwards Clean Message: If the message is classified as clean (or if the oracle is disabled or times out), the agent formats it into a human-readable report, enriches it with the sender's geographic location via a GeoIP lookup, and sends it to an operator via another agent with an alerting role (e.g., discord_relay).
🧩 Configuration
The agent's behavior is controlled by the following options in its config block.
**privkey**: Required. The agent's private RSA key in PEM format. This is used to decrypt incoming messages.
**watched_paths**: A list of absolute directory paths that the agent should monitor for new message files.
**enable_oracle**: (Default: false) Set to true to enable the AI-powered spam filter by sending messages to an oracle agent.
**oracle_timeout**: (Default: 15) The number of seconds to wait for a response from the oracle before timing out. The communication between contact_reflex and oracle is asynchronies. A stack is maintained which uses the oracle_timeout to determine if to dispose of the message after the given interval is surpassed.
**enable_fallback_forward**: (Default: true) If true, messages will be forwarded to the operator even if the oracle times out.
**geoip_enabled**: (Default: true) Set to false to disable enriching messages with the sender's geographic location.
**maxmind_db**: (Optional) The path to a local MaxMind GeoLite2 City database (.mmdb file) for GeoIP lookups.
**alert_to_role**: (Default: "hive.alert.send_alert_msg") The role of the agent(s) that should receive the final formatted message.
🧭 Directive
matrix_directive = {
"universal_id": "matrix",
"name": "matrix",
"children": [
# The contact agent itself
{
"universal_id": "contact-form-1",
"name": "contact_reflex",
"config": {
# The private key for decrypting messages
"privkey": "-----BEGIN RSA PRIVATE KEY-----\\n...\\n-----END RSA PRIVATE KEY-----",
# The directory where the PHP script drops encrypted files
"watched_paths": ["/var/www/my_site/public_html/messages_out"],
# Enable the AI spam filter
"enable_oracle": True
}
},
# An Oracle agent is needed for the spam filter
{
"universal_id": "oracle-1",
"name": "oracle",
"config": {
# Best practice is to set the key via environment variable
"api_key": os.getenv("OPENAI_API_KEY_2")
}
},
# A relay agent to send the final message to an operator
{
"universal_id": "discord-relay-1",
"name": "discord_relay",
"config": {
# ... other config for discord relay
}
}
]
}
📦 Source
# Authored by Daniel F MacDonald and ChatGPT aka The Generals
# Docstrings by Gemini
import sys
import os
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))
import base64
import json
import hashlib
import time
import threading
import requests
import geoip2.database
import bleach
import ipaddress
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
from matrixswarm.core.boot_agent import BootAgent
from matrixswarm.core.utils.swarm_sleep import interruptible_sleep
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
INOTIFY_AVAILABLE = True
# Define the handler class ONLY if the necessary imports succeed
class ContactFileHandler(FileSystemEventHandler):
def __init__(self, agent):
self.agent = agent
def on_created(self, event):
try:
if event.is_directory or not event.src_path.endswith('.json'):
return
self.agent.handle_contact_file(event.src_path)
except Exception as e:
# Use the agent's logger if available, otherwise print
if hasattr(self.agent, 'log'):
self.agent.log(f"Error in FileSystemEventHandler: {e}", level="ERROR")
else:
print(f"[ContactFileHandler][ERROR] {e}")
except ImportError:
# If watchdog is not installed or broken, set the flag and create a
# dummy class so the rest of the agent doesn't crash.
INOTIFY_AVAILABLE = False
class ContactFileHandler:
pass
class Agent(BootAgent):
"""
A secure, intelligent contact form processor for the MatrixSwarm.
This agent acts as a gateway for external messages, such as those from a
website contact form. It monitors a specified directory for new, encrypted
message files. Upon detection, it decrypts the message, optionally uses an
"Oracle" LLM agent for spam classification, enriches the data with GeoIP
information, and forwards clean messages to a designated operator via an
alerting agent.
"""
def __init__(self):
"""
Initializes the agent by loading its configuration and keys.
This method sets up the agent's operational parameters from the boot
directive, including the paths to watch for new messages, the private
RSA key for decryption, and settings for the Oracle spam filter. It
contains a critical guard that will cause the agent to exit if the
private key is not provided in the configuration.
"""
super().__init__()
try:
self.AGENT_VERSION = "1.0.0"
self._private_config = self.tree_node.get("config", {})
self._initialized_from_tree = False
self._watchers = []
self.watch_folder = self.tree_node['config'].get('watch_folder', '/sites/matrixswarm/public_html/outgoing_msgs/')
#key generated using directive encryption
privkey_pem = self.tree_node['config'].get('privkey') # Already loaded from vault
if "\\n" in privkey_pem:
privkey_pem = privkey_pem.replace("\\n", "\n")
self.privkey_for_external_comm = RSA.import_key(privkey_pem)
self.oracle_timeout = int(self._private_config.get('oracle_timeout', 15))
self.pending_evals = {}
self.memory_vectors = {}
self.enable_oracle = bool(self._private_config.get('enable_oracle', 0))
self._suspended = bool(self._private_config.get('suspended', 0))
self.geoip_enabled = self._private_config.get("geoip_enabled", 1)
self.alert_role = self._private_config.get("alert_to_role", "hive.alert.send_alert_msg") # Optional
cfg_db = str(self._private_config.get("maxmind_db", "")).strip()
# If it's an absolute path or a path relative to install_path
if cfg_db and os.path.isfile(cfg_db):
self.mmdb_path = cfg_db
else:
self.mmdb_path = os.path.join(self.path_resolution["install_path"], "maxmind", "GeoLite2-City.mmdb")
# if can't contact oracle, then still forward message
self.enable_fallback_forward = bool(self._private_config.get('enable_fallback_forward', 1))
self.bot_filter = ["buy now", "click here", "viagra", "bitcoin"]
except Exception as e:
self.log(error=e, block="main_try")
def post_boot(self):
"""A one-time setup hook called after the main threads have started."""
self.log(f"{self.NAME} v{self.AGENT_VERSION} – may this email find you in good spirits.")
def cmd_update_agent_config(self):
"""
Stops old watchers and starts new ones based on the current config.
This method is called when a new configuration is received. It ensures
a clean state by terminating any existing file system watchers before
initializing new ones according to the updated settings, such as new
paths to watch or a change in the agent's suspended state.
"""
try:
self._initialized_from_tree = True
self.log(f"[CONFIG] Applying new agent configuration.")
# Stop and clear all existing directory watchers
for watcher in self._watchers:
try:
watcher.stop()
watcher.join() # Wait for thread to terminate
except Exception as e:
self.log(f"Error stopping a watcher: {e}", level="WARNING")
self._watchers = []
if self._private_config.get("suspended", False):
self._suspended = True
self.log("[WATCHER] Agent is suspended per configuration. No watchers will be started.")
return
self._suspended = False
paths = self._private_config.get("watched_paths", [])
for path in paths:
self._start_dir_watcher(path)
except Exception as e:
self.log(error=e, block="main_try")
def worker(self, config:IdentityObject=None, identity=None):
"""
The main throttled loop for the agent.
This method is called repeatedly by the BootAgent's throttled worker
wrapper. Its primary responsibilities are to process any incoming live
configuration changes and to trigger periodic tasks, such as checking
for timed-out requests to the Oracle agent.
Args:
config (dict, optional): A dictionary containing new configuration
data sent to the agent's /config drop zone.
identity (IdentityObject, optional): The verified identity of the
sender of the configuration packet. Not used in this agent.
"""
try:
if config and isinstance(config, dict):
self.log(f"[CONFIG] New configuration packet received.")
# This logic allows for both full and partial config updates
if config.get("partial_config"):
config.pop("partial_config", None)
self._private_config.update(config)
self.log("[WORKER] Partial config merged.")
else:
self._private_config = config
self.log("[WORKER] Full config override applied.")
# Flag that the new config needs to be applied to the watchers
self._initialized_from_tree = False
# If a new config has been loaded, re-initialize the watchers
if not self._initialized_from_tree:
self.cmd_update_agent_config()
# The main loop now only checks for oracle timeouts and other periodic tasks.
# File handling is managed by the watcher threads.
if not self._suspended:
self.check_oracle_responses()
except Exception as e:
self.log(error=e, block="WORKER_LOOP")
interruptible_sleep(self, 10)
def _start_dir_watcher(self, watch_folder):
"""
Starts a file system watcher for a specific directory.
This method chooses the best available watching mechanism. It prefers
the efficient, event-based `watchdog` library (which uses inotify on
Linux) if it's available. If not, it gracefully falls back to a
manual polling thread.
Args:
watch_folder (str): The absolute path to the directory to monitor.
"""
# === GUARD: Validate the path before starting a watcher ===
if not os.path.isdir(watch_folder):
self.log(f"[WATCHER][ERROR] Path does not exist or is not a directory: '{watch_folder}'. Skipping.", level="ERROR")
return
try:
if INOTIFY_AVAILABLE:
observer = Observer()
handler = ContactFileHandler(self)
observer.schedule(handler, watch_folder, recursive=False)
observer.start()
self._watchers.append(observer)
self.log(f"[INOTIFY] Started watching: {watch_folder}")
else:
# Fallback to manual polling if watchdog is not installed
poll_thread = threading.Thread(target=self._poll_dir, args=(watch_folder,), daemon=True)
poll_thread.start()
self._watchers.append(poll_thread) # Store thread to manage it if needed
self.log(f"[POLLING] Started watching: {watch_folder}")
except Exception as e:
self.log(f"Failed to start watcher for '{watch_folder}'", error=e, block="START_WATCHER")
def _poll_dir(self, watch_folder):
"""
Fallback directory polling method if `watchdog` is unavailable.
This method runs in a dedicated thread and periodically scans a directory
for new .json files, providing a less efficient but highly portable
alternative to inotify-based watching.
Args:
watch_folder (str): The absolute path to the directory to poll.
"""
seen_files = set(f for f in os.listdir(watch_folder) if f.endswith('.json'))
while self.running and not self._suspended:
try:
current_files = set(f for f in os.listdir(watch_folder) if f.endswith('.json'))
new_files = current_files - seen_files
for fname in new_files:
self.handle_contact_file(os.path.join(watch_folder, fname))
seen_files = current_files
except Exception as e:
self.log(f"Error during polling of '{watch_folder}': {e}", level="ERROR")
interruptible_sleep(self, 10)
def handle_contact_file(self, fpath):
"""
Securely decrypts, processes, and cleans up a single contact file.
This is the core processing function for new messages. It performs the
two-layer decryption (RSA for the AES key, then AES for the message),
parses the JSON payload, and passes the result to the next stage of
processing. It includes robust error handling to quarantine any file
that is malformed or cannot be decrypted, preventing processing loops.
Args:
fpath (str): The full path to the encrypted .json message file.
"""
# === GUARD: Ensure file exists before trying to open it ===
if not os.path.exists(fpath):
return
try:
with open(fpath, 'r') as f:
blob = json.load(f)
# Decrypt the AES key with our private RSA key
cipher_rsa = PKCS1_OAEP.new(self.privkey_for_external_comm)
aes_key = cipher_rsa.decrypt(base64.b64decode(blob['meta']['encrypted_key']))
# Use the decrypted AES key to decrypt the message
msg_cipher = AES.new(aes_key, AES.MODE_GCM, nonce=base64.b64decode(blob['meta']['nonce']))
plaintext = msg_cipher.decrypt_and_verify(
base64.b64decode(blob['data']), base64.b64decode(blob['meta']['tag'])
)
message = json.loads(plaintext)
# If decryption and parsing succeed, process the message
self.process_new_message(message)
# On full success, remove the file
os.remove(fpath)
self.log(f"Successfully processed and removed: {os.path.basename(fpath)}")
except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
# These errors indicate a malformed, invalid, or undecryptable file.
self.log(f"Failed to parse or decrypt file '{os.path.basename(fpath)}'. Error: {e}", level="ERROR")
self._quarantine_file(fpath)
except Exception as e:
# Catch any other unexpected errors during file handling
self.log(f"An unexpected error occurred handling '{os.path.basename(fpath)}'", error=e, level="ERROR")
self._quarantine_file(fpath)
def _quarantine_file(self, fpath):
"""
Moves a problematic file to a 'failed_msgs' directory.
This helper prevents the agent from getting stuck in a loop trying to
process a file that is corrupted or cannot be decrypted.
Args:
fpath (str): The full path to the file to be quarantined.
"""
try:
if not os.path.exists(fpath):
return
base_dir = os.path.dirname(fpath)
quarantine_dir = os.path.join(base_dir, "failed_msgs")
os.makedirs(quarantine_dir, exist_ok=True)
# Move the file
new_path = os.path.join(quarantine_dir, os.path.basename(fpath))
os.rename(fpath, new_path)
self.log(f"Moved problematic file to quarantine: {new_path}", level="WARNING")
except Exception as e:
self.log(f"Could not quarantine file '{os.path.basename(fpath)}'", error=e, level="ERROR")
def process_new_message(self, data):
"""
Analyzes and sanitizes a decrypted message, then decides on the next action.
This method sanitizes the user-provided fields ('name', 'email', 'message')
to prevent cross-site scripting (XSS) attacks. It then fingerprints the
message for spam detection and, based on the agent's configuration,
either sends it to the Oracle for AI-based classification or
forwards it directly to the operator.
Args:
data (dict): The decrypted message payload as a dictionary.
"""
try:
# Sanitize user-provided input to prevent XSS
# Strips any HTML tags and attributes, leaving only plain text.
clean_title = bleach.clean(data.get("title", ""), tags=[], strip=True)
clean_name = bleach.clean(data.get("name", ""), tags=[], strip=True)
clean_email = bleach.clean(data.get("email", ""), tags=[], strip=True)
clean_message = bleach.clean(data.get("message", ""), tags=[], strip=True)
# Update the data dictionary with sanitized content
sanitized_data = data.copy()
sanitized_data["title"] = clean_name
sanitized_data["name"] = clean_name
sanitized_data["email"] = clean_email
sanitized_data["message"] = clean_message
# Use the sanitized message for all further processing
body = sanitized_data.get("message", "")
query_id = f"q_{int(time.time())}"
fingerprint = hashlib.sha256(body.encode()).hexdigest()[:16]
# Check against known spam fingerprints
if fingerprint in self.memory_vectors:
self.log(f"Message discarded; matches known spam fingerprint: {fingerprint}")
return
self.pending_evals[query_id] = {
"data": sanitized_data, # Use the sanitized data
"timestamp": time.time(),
"sent": False,
"fingerprint": fingerprint
}
if self.enable_oracle:
self.send_to_oracle(query_id, body)
else:
# If oracle is disabled, forward the sanitized message directly
self.forward_clean_message(sanitized_data)
except Exception as e:
self.log(f"Error processing new message", error=e)
def send_to_oracle(self, query_id, text):
"""
Constructs and sends a command packet to an Oracle agent.
This method requests an AI-based classification of the message text
to determine if it is spam.
Args:
query_id (str): A unique identifier for this query.
text (str): The message body to be analyzed.
"""
try:
# Standardized Oracle packet
pk_command = self.get_delivery_packet("standard.command.packet")
pk_command.set_data({
"handler": "cmd_msg_prompt", # Standard handler for Oracle text analysis
"timestamp": int(time.time()),
"content": {
"prompt": f"Classify the following message as 'spam' or 'not_spam'.\n\n{text}",
"message_type": "contact_form",
"query_id": query_id,
"target_universal_id": self.command_line_args.get("universal_id", "contact-reflex"),
"return_handler": "cmd_oracle_response",
},
})
# Send to the first agent serving as hive.oracle
for node in self.get_nodes_by_role("hive.oracle","any",1):
self.pass_packet(pk_command, node["universal_id"])
# Mark as sent
self.pending_evals[query_id]['sent'] = True
except Exception as e:
self.log(error=e, block='main_try')
def cmd_oracle_response(self, content, packet, identity=None):
"""
Handles the asynchronous response from the Oracle agent.
This method is a command handler that is triggered when an Oracle
agent sends back its analysis. It interprets the response and either
discards the message as spam or forwards it to the operator.
Args:
content (dict): The payload from the Oracle, containing the
'response' and original 'query_id'.
packet (dict): The raw packet object.
identity (IdentityObject, optional): The verified identity of the
sending Oracle agent.
"""
try:
query_id = content.get("query_id",0)
oracle_response = content.get("response") or content.get("content", {}).get("response")
self.log(f"[CONTACT_REFLEX][ORACLE_RESPONSE] Received: session {query_id}, response: {oracle_response}")
# Locate the original pending eval
record = self.pending_evals.pop(query_id, None)
if not record:
self.log(f"[CONTACT_REFLEX][WARN] No pending eval found for query_id : {query_id}")
return
msg = record['data']
# Interpret oracle's result (e.g. spam/not_spam)
if oracle_response == "spam":
self.memory_vectors[record['fingerprint']] = time.time()
self.log("[CONTACT_REFLEX] Message flagged as spam. Not relayed.")
else:
self.forward_clean_message(msg)
except Exception as e:
self.log(f"[CONTACT_REFLEX][ERROR][cmd_oracle_response] {e}")
def check_oracle_responses(self):
"""
Periodically checks for and handles timed-out Oracle requests.
This method prevents messages from getting stuck if an Oracle agent
fails to respond. If a query has been pending for longer than the
configured `oracle_timeout`, this method will either forward or
discard the message based on the `enable_fallback_forward` setting.
"""
try:
now = time.time()
expired = [qid for qid, v in self.pending_evals.items()
if v['sent'] and now - v['timestamp'] > self.oracle_timeout]
for qid in expired:
data = self.pending_evals.pop(qid)
if self.enable_fallback_forward:
self.log(f"[ORACLE] Timeout. Forwarding message with fail note: {qid}")
data['data']['oracle_status'] = "timeout"
self.forward_clean_message(data['data'])
except Exception as e:
self.log(f"[CONTACT][ERROR] {e}")
def forward_clean_message(self, msg):
"""
Formats a clean message, validates the sender's IP, and prepares it for operator delivery.
This method validates the provided IP address. If valid, it enriches
the message data with GeoIP information (if enabled) and formats the
entire payload into a human-readable string before passing it to the
`alert_operator` method for final delivery.
Args:
msg (dict): The clean message payload.
"""
try:
ip_string = msg.get('sender_ip')
ip = None
geo = None
# 🧐 Validate the IP address before using it
if ip_string:
try:
ipaddress.ip_address(ip_string)
ip = ip_string
except ValueError:
self.log(f"[VALIDATION] Received invalid IP address: '{ip_string}'. Skipping GeoIP lookup.", level="WARNING")
# Resolve the IP if it's valid and GeoIP is enabled
if ip and self.geoip_enabled:
geo = self.resolve_ip(ip)
# Compose the message payload
payload = (
f"📥 *New Contact Message* {msg.get('site', '')}\n"
f"👤 *Name:* {msg.get('name', 'none')}\n"
f"✉️ *Email:* {msg.get('email', 'none')}\n"
f"💬 *Message:* {msg.get('message', 'none')[:800]}\n"
f"🕓 *Time:* {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(msg.get('timestamp', time.time())))}\n"
)
# Add GeoIP info if available
if geo and isinstance(geo, dict):
city = geo.get("city", "Unknown")
region = geo.get("region", "")
country = geo.get("country", "Unknown")
payload += f"🌐 *IP:* {geo.get('ip', 'none')} ({city}, {region}, {country})\n"
elif ip: # Only show the IP if it was validated
payload += f"🌐 *IP:* {ip}\n"
if msg.get("oracle_status") == "timeout":
payload += "⚠️ Oracle failed to respond. Spam filter skipped.\n"
self.alert_operator("Contact Inbox", payload)
except Exception as e:
self.log(f"[FORWARD_CLEAN_MESSAGE][ERROR] {e}\n[MESSAGE] {msg}", error=e, block='main_try')
def resolve_ip(self, ip):
"""
Resolves an IP address to a geographic location using a local database.
Args:
ip (str): The IP address to look up.
Returns:
dict: A dictionary containing location data (city, region, country)
on success, or False on failure.
"""
try:
reader = geoip2.database.Reader(self.mmdb_path)
response = reader.city(ip)
return {
"ip": ip,
"city": response.city.name,
"region": response.subdivisions[0].name if response.subdivisions else None,
"country": response.country.name
}
except Exception as e:
self.log(error=e, block='main_try')
return False
def alert_operator(self, title, message):
"""
Sends the final, formatted message to the configured alerting agent.
This method constructs a standard alert packet and sends it to all
agents that have the configured `alert_to_role` (e.g., a Discord or
Telegram relay agent).
Args:
title (str): The title for the alert.
message (str): The formatted message body.
"""
try:
pk1 = self.get_delivery_packet("standard.command.packet")
pk1.set_data({"handler": "cmd_send_alert_msg"})
try:
server_ip = requests.get("https://api.ipify.org").text.strip()
except Exception:
server_ip = "Unknown"
pk2 = self.get_delivery_packet("notify.alert.general")
pk2.set_data({
"server_ip": server_ip,
"universal_id": self.command_line_args.get("universal_id", "contact_reflex"),
"level": "info",
"msg": message,
"formatted_msg": message,
"cause": title,
"origin": "contact_reflex"
})
pk1.set_packet(pk2, "content")
if self.alert_role:
for node in self.get_nodes_by_role(self.alert_role):
self.pass_packet(pk1, node["universal_id"])
except Exception as e:
self.log(f"[CONTACT][ERROR] {e}")
if __name__ == "__main__":
agent = Agent()
agent.boot()
Comments 0
Category: communication
Tags: #gateway, #watchdog, #contact-form, #decryption, #rsa, #aes-gcm, #spam-filter, #oracle-client, #geoip, #php-integration
Version: v1.0.0
Author: matrixswarm
Views: 116
Added: July 28, 2025Updated: July 28, 2025