Beyond SSH: Building a Secure, Real-time Command Streaming Agent with Python

By matrixswarm
October 2, 2025 AT 12:43 AM (updated: 1 month ago)

Terminal access is the lifeblood of infrastructure, but traditional SSH can be cumbersome for quick diagnostics and security-hardened environments. What if you could execute shell commands on remote nodes, stream the output in real-time to a modern panel, and enforce a strict command whitelist—all secured by end-to-end encryption and digital signatures? This article dives into the architecture of the TerminalStreamer agent, a powerful, self-cleaning, and secure component we built using the internal Python agent framework.

🐚 The Agent's Mission: Secure, Streamed Diagnostics

The TerminalStreamer agent is designed to live on a remote machine and execute Linux commands on demand, broadcasting the results back to a central operator panel (in our case, the StreamViewer panel). Key Features:

- Safety First: Commands are restricted by a configurable whitelist and an optional safe_shell_mode.

- Streaming: Supports both single-shot commands and continuous polling (e.g., refreshing top -n 1 every 5 seconds).

- Security: All streamed output is protected with ephemeral AES encryption and RSA digital signatures.

- Autonomy: A session monitor automatically cleans up streams if the remote viewer disconnects, preventing resource leaks.

1. The Agent's Configuration and Security Context

{
    "universal_id": "terminal-1",
    "name": "terminal_streamer",
    "tags": {
        "packet_signing": {
            "in": true,
            "out": true
        }
    },
    "config": {
        "safe_shell_mode": true,
        "whitelist": ["uptime", "df -h", "free -m", "whoami"],
        "service-manager": [
            {
                "role": ["terminal.stream.start@cmd_start_stream_terminal", "terminal.stream.stop@cmd_stop_stream_terminal"],
                "scope": ["parent", "any"],
                // ... priority settings
            }
        ]
    }
}




- packet_signing Tags: Explicitly mandates that all command packets sent to the agent and all data packets broadcasted from the agent must be signed, ensuring command authenticity and data integrity.

- safe_shell_mode and whitelist: By default, safe_shell_mode is true. Any command executed via the agent must begin with one of the strings in the whitelist (e.g., uptime, df -h, free -m). This prevents RCE attacks by limiting the agent's scope to approved diagnostic utilities.

- Service Registration: The agent registers two RPC services: terminal.stream.start and terminal.stream.stop, mapping them to the cmd_start_stream_terminal and cmd_stop_stream_terminal methods, respectively.

2. Core Execution: The Whitelist and Subprocess

The main logic resides in the _run_loop method, which is started in its own thread upon receiving a stream.start command. Command Execution Flow 1. Safety Check: Before anything runs, the _is_safe_command method checks the command against the agent's whitelist.

def _is_safe_command(self, cmd: str) -> bool:
    # Check if the command starts with any whitelisted entry
    return any(cmd.strip().startswith(w) for w in self.whitelist)

# In _run_loop:
if self.safe_shell_mode:
    if not self._is_safe_command(cmd):
        output = f"[BLOCKED] Command not allowed: {cmd}"
    else:
        # Executes via shell only if safe
        output = subprocess.check_output(cmd, shell=True, ...)
else:
    # Unrestricted execution if safe_shell_mode is disabled
    output = subprocess.check_output(cmd, shell=True, ...)

2. Execution: subprocess.check_output(cmd, shell=True, ...) runs the command, captures STDOUT and STDERR, and returns the output as a string.

3. Broadcast: The result is sent via _broadcast_output.

4. Loop/Break: If refresh_sec is greater than 0, the thread sleeps and loops; otherwise, it breaks, and the session ends.

3. The Security Pipeline: Encrypt, Sign, Broadcast

The output must be securely delivered back to the operator panel. This is handled by the _broadcast_output method, leveraging the security utilities of the platform.



def _broadcast_output(self, sess, token, output, handler):
    # 1. Create the plain payload (command output, session details)
    payload = {
        "handler": handler,
        "content": {
            "session_id": sess,
            "token": token,
            "output": output,
            "timestamp": int(time.time()),
        }
    }

    # 2. Encrypt the payload using Ephemeral AES (Sealed)
    #    Uses the remote recipient's public key to securely exchange the AES key.
    sealed = encrypt_with_ephemeral_aes(payload, remote_pub_pem)

    # 3. Assemble the packet content (including the signed data)
    content = {
        "serial": self._serial_num,
        "content": sealed,
        "timestamp": int(time.time()),
    }

    # 4. Sign the entire content using the agent's private RSA key
    sig = sign_data(content, self._signing_key_obj)
    content["sig"] = sig

    # 5. Pass the signed, encrypted packet to the RPC router for delivery
    # ... self.pass_packet(pk, ep.get_universal_id())

This ensures that only the intended recipient can decrypt the command output, and the signature proves the output truly came from the authenticated TerminalStreamer agent.

4. Automatic Cleanup: The Session Monitor

A critical feature for long-running stream agents is autonomous cleanup. The _monitor_sessions thread addresses this by preventing streams from running indefinitely if the remote client disconnects (e.g., the browser is closed). How Stale Streams Are Detected 1. When the StreamViewer panel is open, it continuously touches a flag file on the central RPC relay called: connected.flag.. 2. The _monitor_sessions thread periodically calls has_fresh_broadcast_flag. 3. has_fresh_broadcast_flag checks the timestamp (os.path.getmtime) of this flag file. 4. If the flag's age exceeds a threshold (e.g., 30 seconds), the session is considered stale. 5. The monitor calls cmd_stop_stream_terminal, which safely sets the thread's stop flag and removes the entry from the active_streams dictionary. This mechanism provides a robust, decentralized heartbeat that cleans up system resources, ensuring the remote node is only running diagnostics while actively being viewed.

Comments 0

Category: agent spotlight

Tags: #secure, #ssh, #diagnostics, #streamed, #alternatives

Author: matrixswarm

Views: 69

Added: October 2, 2025

Updated: October 2, 2025