Matrix Websocket

July 20, 2025 AT 04:06 AM (updated: 4 months ago)

The `matrix_websocket` agent provides a secure, real-time, bidirectional communication channel for the swarm. It's primarily used by GUIs and other external tools to receive live updates and send commands without relying on traditional HTTP polling.

⚙️ How it Works

The agent starts a WebSocket server using the `websockets` library on a specified port, secured with TLS certificates. It maintains a set of connected clients. Other agents can send packets to the `matrix_websocket` agent with a handler like `cmd_broadcast` or `cmd_send_alert_msg`. The WebSocket agent then serializes this packet to JSON and broadcasts it instantly to all connected clients.

🧩 Configuration

* **`port`** (Default: `8765`): The port on which the WebSocket server will listen.
* **Certificates**: Requires `server.crt` and `server.key` to be present in the `/socket_certs` directory. These can be generated using the `generate_certs.sh` script.


allowlist_ips: dict of allowed ips to access the server.

🧭 Directive

matrix_directive = {
    "universal_id": "matrix",
    "name": "matrix",
    "children": [
        {
            "universal_id": "matrix-websocket-1",
            "name": "matrix_websocket",
            "config": {
                "port": 8765
                "allowlist_ips": [
                         # allowed list of ips to access server
                ],
            }
        }
    ]
}

📦 Source

import sys
import os
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))
#Authored by Daniel F MacDonald and ChatGPT aka The Generals
import ssl
import time
import copy
import threading
import asyncio
import websockets
import json
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject

from matrixswarm.core.boot_agent import BootAgent

class Agent(BootAgent):
    def __init__(self):
        super().__init__()
        self.AGENT_VERSION = "1.2.0"

        self.clients = set()
        config = self.tree_node.get("config", {})
        self.allowlist_ips = config.get("allowlist_ips", [])
        self.port = config.get("port", 8765)
        self.clients = set()
        self.loop = None
        self.websocket_ready = False
        swarm_root = self.path_resolution["install_path"]
        self.cert_dir = os.path.join(swarm_root, "certs", "socket_certs")
        self._stop_event = None
        self._thread = None
        self._config = None
        self._lock = threading.Lock()

    def post_boot(self):
        self.log(f"{self.NAME} v{self.AGENT_VERSION}  agent ready and active.")

    def worker(self, config:dict = None, identity:IdentityObject = None):
        """
        Starts or restarts the WebSocket thread if config changes or thread is dead.
        """
        if config is None:
            config = self.tree_node.get("config", {})  # Default fallback

        with self._lock:
            if self._thread and self._thread.is_alive():
                if config == self._config:
                    # Config unchanged, thread alive  do nothing
                    return
                else:
                    self.log("[WS] Launching WebSocket thread... Or Config changed and restarting thread...")
                    self._stop_event.set()
                    self._thread.join(timeout=3)
            elif self._thread and not self._thread.is_alive():
                self.log("[WS] Previous thread is dead  restarting...")

            # Start new thread
            self._config = copy.deepcopy(config)  # Defensive copy
            self._stop_event = threading.Event()
            self._thread = threading.Thread(target=self.start_socket_loop, daemon=True)
            self._thread.start()
            self.log("[WS] WebSocket thread started.")

    def start_socket_loop(self):
        try:
            self.log("[WS] Booting WebSocket TLS thread...")
            time.sleep(1)

            cert_path = os.path.join(self.cert_dir, "server.crt")
            key_path = os.path.join(self.cert_dir, "server.key")

            if not os.path.exists(cert_path) or not os.path.exists(key_path):
                self.log(f"[WS][FATAL] Missing cert/key file at {cert_path} or {key_path}")
                self.running = False
                return

            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self.loop = loop

            async def launch():
                self.log("[WS] Preparing SSL context...")
                ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                ssl_context.load_cert_chain(certfile=cert_path, keyfile=key_path)

                self.log(f"[WS] Attempting to bind WebSocket on port {self.port}...")
                server = await websockets.serve(
                    self.websocket_handler,
                    "0.0.0.0",
                    self.port,
                    ssl=ssl_context
                )

                self.websocket_ready = True
                self.log(f"[WS] SECURE WebSocket bound on port {self.port} (TLS enabled)")
                await server.wait_closed()

            loop.run_until_complete(launch())

            # Run the loop in a background task that watches the stop event
            async def monitor_stop():
                while not self._stop_event.is_set():
                    await asyncio.sleep(1)
                self.log("[WS] Stop event received  shutting down WebSocket server.")
                loop.stop()

            # Schedule the monitor coroutine
            loop.create_task(monitor_stop())

            loop.run_forever()
            loop.close()
            self.log("[WS] Event loop closed.")


        except Exception as e:
            self.log(f"[WS][FATAL] WebSocket startup failed", error=e, block="main_try")
            self.running = False

    def cmd_health_report(self, content, packet, identity:IdentityObject = None):
        self.log(f"[RELAY] Received health report for {content.get('target_universal_id', '?')}")

    async def websocket_handler(self, websocket):

        try:

            ip = websocket.remote_address[0] if websocket.remote_address else "unknown"
            self.log(f"[WS][CONNECT] Client connected from IP: {ip}")

            if self.allowlist_ips and ip not in self.allowlist_ips:
                self.log(f"[WS][BLOCK] Connection from {ip} rejected (not in allowlist)")
                await websocket.close(reason="IP not allowed")
                return

            self.log("[WS][TRACE] >>> websocket_handler() CALLED <<<")
            self.log("[WS] HANDLER INIT - Client added")
            # Add the client securely to the clients set
            self.clients.add(websocket)
            self.log("[WS] HANDLER INIT - Client added")

            while True:
                self.log("[WS] Awaiting message...")

                try:
                    # Await a message from the client
                    message = await websocket.recv()
                    self.log(f"{repr(message)}")

                    # Attempt to decode JSON (if applicable)
                    try:

                        data = json.loads(message)

                        self.log(f"[WS][VALID MESSAGE] {data}")
                    except json.JSONDecodeError:
                        self.log("[WS][ERROR] Malformed JSON received")
                        await websocket.send(json.dumps({
                            "type": "error",
                            "message": "Invalid JSON format"
                        }))
                        continue

                    # Respond with acknowledgment
                    await websocket.send(json.dumps({
                        "type": "ack",
                        "echo": message
                    }))

                except websockets.ConnectionClosed as cc:
                    # Handle a graceful client disconnection
                    self.log(f"[WS][DISCONNECT] Client disconnected gracefully: ({cc.code}) {cc.reason}")
                    break

                except Exception as e:
                    # Handle unexpected errors during message handling
                    self.log(f"[WS][ERROR] Unexpected error during message processing: {e}")
                    break

        finally:
            # Ensure client is removed from the set upon disconnect
            self.clients.discard(websocket)
            self.log(f"[WS] Client disconnected and removed. Active clients: {len(self.clients)}")

    def cmd_rpc_route(self, content, packet, identity:IdentityObject = None):
        try:
            self.log("Incoming routed RPC packet.")

            self.cmd_broadcast(content, content)
            self.log(f"Routed response_id={content.get('response_id')} status={content.get('status')}")
        except Exception as e:
            self.log(error=e)  # Optional: write full trace to logs

    def cmd_send_alert_msg(self, content, packet, identity:IdentityObject = None):
        try:
            # Format the alert message
            msg = content.get("formatted_msg") or content.get("msg") or "[SWARM] Alert received."

            # Construct GUI-style feed packet
            broadcast_packet = {
                "handler": "cmd_alert_to_gui",
                "origin": content.get("origin", "unknown"),
                "timestamp": time.time(),
                "content": {
                    "msg": msg,
                    "level": content.get("level", "info"),
                    "origin": content.get("origin", "unknown"),
                    "formatted_msg": msg
                }
            }

            # Dispatch it via WebSocket
            self.cmd_broadcast(broadcast_packet["content"], broadcast_packet)

            self.log("Alert message sent to GUI feed.")
        except Exception as e:
            self.log(error=e)  # Optional: write full trace to logs

    def cmd_alert_to_gui(self, content, packet, identity:IdentityObject = None):
        self.log(f"Dispatching alert to GUI: {content}")
        self.cmd_broadcast(content, packet)

    def cmd_broadcast(self, content, packet, identity:IdentityObject = None):
        if not hasattr(self, "loop") or self.loop is None:
            self.log("[WS][REFLEX][SKIP] Event loop not ready.")
            return

        if not getattr(self, "websocket_ready", False):
            self.log("[WS][REFLEX][WAITING] Socket not bound.")
            return

        try:
            self.log(f"[WS][REFLEX]{packet}")
            data = json.dumps(packet)
            dead = []
            for client in self.clients:
                try:
                    asyncio.run_coroutine_threadsafe(client.send(data), self.loop)
                except Exception:
                    dead.append(client)

            for c in dead:
                self.clients.discard(c)

            self.log(f"Broadcasted to {len(self.clients)} clients.")
        except Exception as e:
            self.log(error=e)

if __name__ == "__main__":
    agent = Agent()
    agent.boot()

Comments 0

Category: communication

Tags: #matrix, #websocket, #api, #real-time, #secure, #bidirectional, #gui, #swarm, #live-updates, #agent, #networking

Version: v1.0.2

Author: matrixswarm

Views: 91

Added: July 20, 2025

Updated: July 26, 2025