Crypto Alert

August 2, 2025 AT 11:11 PM (updated: 1 month ago)

The `crypto_alert` agent is a highly flexible cryptocurrency market monitor. It connects to various exchanges via a dynamic factory system to track asset prices and trigger alerts based on user-defined conditions. This allows operators to create real-time notifications for significant market movements, price thresholds, or specific asset conversion rates.

βš™οΈ How it Works

The agent operates in a configurable loop with the following logic: 1. **Dynamic Exchange Loading**: On initialization, the agent reads the `exchange` from its configuration and dynamically loads the corresponding handler module from the `crypto_alert.factory.cryptocurrency.exchange` directory. This allows for easy expansion to new exchanges without altering the core agent logic. 2. **Polling**: The agent periodically fetches price data for a specified trading pair from the loaded exchange. The polling frequency is configurable via `poll_interval`. 3. **Trigger Evaluation**: It evaluates the fetched price data against a specified `trigger_type`. The agent supports several conditions, including percentage change, absolute price change, crossing a price threshold, and asset-to-asset conversion rates. 4. **Alerting**: If a trigger condition is met, the agent constructs and dispatches an alert packet to other agents in the swarm that have the configured `alert_role` and are listening for the `alert_handler`. 5. **Self-Deactivation**: The agent can be configured to stop after a certain number of trigger events to prevent alert flooding, after which it will mark itself as inactive.

🧩 Configuration

Add the `crypto_alert` agent to your directive and customize its behavior in the `config` block.

**Core Options:**

* **`exchange`** (Default: `"coingecko"`): The name of the exchange to use. This must correspond to a module name in the exchange factory.
* **`poll_interval`** (Default: `20`): How often, in seconds, to check the asset's price.
* **`pair`** (Default: `"BTC/USDT"`): The trading pair to monitor (e.g., "ETH/USDT", "SOL/USDT").
* **`active`** (Default: `True`): Set to `False` to disable the agent's monitoring loop.

**Triggering Options:**

* **`trigger_type`** (Default: `"price_change_above"`): The core condition to check for. See the "Trigger Types" section below for all options.
* **`change_percent`** (Default: `1.5`): The percentage change required to trigger a `price_change` alert.
* **`change_absolute`** (Default: `1000`): The absolute price change (in quote currency) required to trigger a `price_delta` alert.
* **`threshold`** (Default: `0`): The price level to check against for `price` or `asset_conversion` alerts.
* **`from_asset`** / **`to_asset`** / **`from_amount`**: Used only with the `asset_conversion` trigger to check the value of one asset in terms of another.

**Alerting & Limits:**

* **`alert_handler`** (Required): The handler that a receiving agent (e.g., `discord_relay`) should use to process the alert.
* **`alert_role`** (Required): The role of the agent(s) that should receive the alert packet.
* **`limit_mode`** (Default: `"forever"`): Determines if the agent should deactivate. Set to `"limited"` to use the activation limit.
* **`activation_limit`** (Default: `1`): If `limit_mode` is `"limited"`, the agent will deactivate after this many alerts.

---
## Trigger Types

The `trigger_type` option defines the agent's monitoring logic.

* **`price_change_above`**: Triggers when the price increases by more than `change_percent` since the last check.
* **`price_change_below`**: Triggers when the price decreases by more than `change_percent` since the last check.
* **`price_delta_above`**: Triggers when the price increases by more than `change_absolute`.
* **`price_delta_below`**: Triggers when the price decreases by more than `change_absolute`.
* **`price_above`**: Triggers if the current price is greater than `threshold`.
* **`price_below`**: Triggers if the current price is less than `threshold`.
* **`asset_conversion`**: Triggers if `from_amount` of `from_asset` is worth more than `threshold` of `to_asset`.

🧭 Directive

You won't be creating a directive for crypto_alert, because the agent is loaded on demand

πŸ“¦ Source

import sys
import os
sys.path.insert(0, os.getenv("SITE_ROOT"))
sys.path.insert(0, os.getenv("AGENT_PATH"))

import importlib
import time
from matrixswarm.core.boot_agent import BootAgent
from matrixswarm.core.utils.swarm_sleep import interruptible_sleep
from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject

class Agent(BootAgent):


    def __init__(self):
        super().__init__()
        self.name = "CryptoAgent"
        self.exchange = None
        self._initialized_from_tree = False
        self._private_config = self.tree_node.get("config", {})
        self._last_price = None
        self.trigger_hits = 0

    def cmd_update_agent_config(self):

        try:
            self._initialized_from_tree = True
            exchange_name = self._private_config.get("exchange", "coingecko")
            mod_path = f"crypto_alert.factory.cryptocurrency.exchange.{exchange_name}.price"

            self.log(f"Attempting to load: {mod_path}", block="exchange_loader")
            try:
                module = importlib.import_module(mod_path)
                importlib.reload(module)
                ExchangeClass = getattr(module, "Exchange")
                self.exchange = ExchangeClass(self)
                self.log(f"[EXCHANGE-LOADER] βœ… Loaded exchange handler: {exchange_name}")
            except Exception as e:
                self.log("Could not load exchange module '{exchange_name}", error=e)

        except Exception as e:
            self.log("Failed to initialize config", error=e)

    def worker(self,config:dict=None, identity:IdentityObject = None):

        try:

            if config and isinstance(config, dict):
                self.log(f"config loaded: {config}")

                if config.get("partial_config"):
                    config = config.copy()  # avoid mutating the caller’s dict
                    config.pop("partial_config", None)
                    self._private_config.update(config)
                    self.log("[WORKER] 🧩 Partial config merged.")
                else:
                    self._private_config = config
                    self.log("[WORKER] πŸ” Full config applied.")

                self._initialized_from_tree = False

            if not self._initialized_from_tree:
                self.cmd_update_agent_config()

            if not self._private_config.get("active", True):
                self.log("πŸ”‡ Agent marked inactive. Exiting cycle.")
                return

            trigger = self._private_config.get("trigger_type", "price_change_above")

            # Break it into base + direction (e.g., price_change_above β†’ price_change + above)
            if "_" in trigger:
                base_trigger, direction = trigger.rsplit("_", 1)
            else:
                base_trigger = trigger
                direction = "above"

            if base_trigger == "price_change":
                self._run_price_change_monitor(direction)
            elif base_trigger == "price_delta":
                self._run_price_delta_monitor(direction)
            elif base_trigger == "price":
                self._run_price_threshold(direction)
            elif base_trigger == "asset_conversion":
                self._run_asset_conversion_check()
            else:
                self.log(f"[UNKNOWN TRIGGER] {trigger}")

        except Exception as e:
            self.log(error=e, block="main_try")

        interval = int(self._private_config.get("poll_interval", 20))
        interruptible_sleep(self, interval)

    def _run_price_change_monitor(self, direction="above"):
        try:
            pair = self._private_config.get("pair", "BTC/USDT")
            threshold_pct = float(self._private_config.get("change_percent", 1.5))
            current = self.exchange.get_price(pair)
            if current is None:
                self.log("[CRYPTOAGENT][ERROR] No price received.")
                return

            if self._last_price is None:
                self._last_price = current
                self.log(f"[DEBUG] Initial price set to {self._last_price}")
                return

            delta = current - self._last_price
            delta_pct = abs(delta / self._last_price) * 100
            self.log(
                f"[DEBUG] Current: {current}, Previous: {self._last_price}, Ξ” = {delta:.2f}, Ξ”% = {delta_pct:.4f}%")

            condition = (direction == "above" and delta > 0) or (direction == "below" and delta < 0)

            if condition and delta_pct >= threshold_pct:
                self._alert(f"{pair} moved {delta_pct:.2f}% {direction.upper()} β†’ from {self._last_price} to {current}")
                self._last_price = current

        except Exception as e:
            self.log("Price change failure", error=e)

    def _run_price_delta_monitor(self, direction="above"):
        try:
            pair = self._private_config.get("pair", "BTC/USDT")
            threshold_abs = float(self._private_config.get("change_absolute", 1000))
            current = self.exchange.get_price(pair)
            if current is None:
                return

            if self._last_price is None:
                self._last_price = current
                self.log(f"[DEBUG] Initial price set to {self._last_price}")
                return

            delta = current - self._last_price
            delta_abs = abs(delta)
            self.log(f"[DEBUG] Ξ” = {delta:.2f} vs threshold {threshold_abs:.2f}")

            condition = (direction == "above" and delta > 0) or (direction == "below" and delta < 0)

            if condition and delta_abs >= threshold_abs:
                self._alert(f"{pair} moved ${delta_abs:.2f} {direction.upper()} β†’ from {self._last_price} to {current}")
                self._last_price = current

        except Exception as e:
            self.log(error=e)

    def _run_price_threshold(self, mode):
        try:
            pair = self._private_config.get("pair", "BTC/USDT")
            threshold = float(self._private_config.get("threshold", 0))
            current = self.exchange.get_price(pair)
            if current is None:
                return

            if mode == "above" and current > threshold:
                self._alert(f"{pair} is above threshold: {current} > {threshold}")
            elif mode == "below" and current < threshold:
                self._alert(f"{pair} is below threshold: {current} < {threshold}")

        except Exception as e:
            self.log(error=e)

    def _run_asset_conversion_check(self):
        try:
            from_asset = self._private_config.get("from_asset", "BTC")
            to_asset = self._private_config.get("to_asset", "ETH")
            from_amount = float(self._private_config.get("from_amount", 0.1))
            threshold = float(self._private_config.get("threshold", 3.0))

            pair1 = f"{from_asset}/USDT"
            pair2 = f"{to_asset}/USDT"
            price1 = self.exchange.get_price(pair1)
            price2 = self.exchange.get_price(pair2)

            if price1 is None or price2 is None:
                return

            value = from_amount * price1 / price2
            self.log(f"[DEBUG] {from_amount} {from_asset} = {value:.4f} {to_asset}")

            if value >= threshold:
                self._alert(f"{from_amount} {from_asset} = {value:.4f} {to_asset} (β‰₯ {threshold})")

        except Exception as e:
            self.log("Conversion fail", error=e)

    def _alert(self, message):
        self.alert_operator(message)
        self._handle_trigger_limit()

    def _handle_trigger_limit(self):
        self.trigger_hits += 1
        limit_mode = self._private_config.get("limit_mode", "forever")

        if limit_mode == "forever":
            return

        # Hardened: handle None, empty string, garbage
        raw_limit = self._private_config.get("activation_limit", 1)
        try:
            max_triggers = int(raw_limit) if raw_limit is not None else 1
        except (TypeError, ValueError) as e:
            self.log("Invalid activation_limit value", error=e, block="TRIGGER-LIMIT")
            max_triggers = 1

        if self.trigger_hits >= max_triggers:
            self.log("[CRYPTOAGENT] 🎯 Max triggers reached. Marking agent inactive.")
            self._private_config["active"] = False
            self._save_config_patch()
            return  # stop further work this cycle

    def _save_config_patch(self):
        try:
            uid = self.command_line_args.get("universal_id", "unknown")
            patch = {
                "target_universal_id": uid,
                "config": {"active": False},
                "push_live_config": True,
                "respond_to": "crypto_gui_1",
                "handler_role": "hive.rpc.route",
                "handler": "cmd_rpc_route",
                "response_handler": "rpc_result_update_agent",
                "response_id": f"{uid}-deactivate"
            }

            pkt = self.get_delivery_packet("standard.command.packet")
            pkt.set_data({
                "handler": "cmd_update_agent",
                "content": patch
            })

            self.pass_packet(pkt, "matrix")

        except Exception as e:
            self.log("Error saving config patch", error=e)

    def _self_destruct(self):
        try:
            pk = self.get_delivery_packet("standard.command.packet")
            pk.set_data({
                "handler": "cmd_delete_agent",
                "content": {
                    "target_universal_id": self.command_line_args.get("universal_id", "unknown")
                }
            })

            self.pass_packet(pk, "matrix")

        except Exception as e:
            self.log(error=e)

    def _send_rpc_update(self, payload):
        try:
            alert_role = self._private_config.get("rpc_router_role", "hive.rpc.route")
            handler = self._private_config.get("rpc_push_handler", "crypto_agent_update")

            if not alert_role:
                self.log("[RPC] No routing role defined. Skipping reflex broadcast.")
                return

            pk1 = self.get_delivery_packet("standard.command.packet")
            pk1.set_data({"handler": "cmd_forward_command"})

            pk2 = self.get_delivery_packet("standard.general.json.packet", new=True)
            pk2.set_data({
                "handler": handler,
                "filetype": "msg",
                "content": payload
            })

            pk1.set_packet(pk2, "content")

            # Broadcast to all gang members with that role
            for node in self.get_nodes_by_role(alert_role):
                self.pass_packet(pk1, node["universal_id"])

        except Exception as e:
            self.log("Captain Howdy RPC dispatch failed", error=e, block="captain_howdy_main_try")

    def alert_operator(self, message):

        alert_handler = self._private_config.get("alert_handler")
        alert_role = self._private_config.get("alert_role")

        if not all([alert_handler, alert_role]):
            self.log("Alert dispatch missing routing fields", block="ALERT_HANDLER")
            return

        pk1 = self.get_delivery_packet("standard.command.packet")
        pk1.set_data({"handler": alert_handler})

        pk2 = self.get_delivery_packet("notify.alert.general", new=True)
        pk2.set_data({
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "universal_id": self.command_line_args.get("universal_id", "unknown"),
            "level": "critical",
            "msg": message,
            "formatted_msg": f"πŸ“ˆ Crypto Alert\n{message}",
            "cause": "Crypto Alert",
            "origin": self.command_line_args.get("universal_id", "unknown")
        })

        pk1.set_packet(pk2, "content")

        for node in self.get_nodes_by_role(alert_role):
            self.pass_packet(pk1, node["universal_id"])

if __name__ == "__main__":
    agent = Agent()
    agent.boot()

Comments 0

Category: monitoring

Tags: #real-time, #api-integration, #alerting, #crypto, #trading, #market-monitor, #dynamic-loading, #finance

Version: v1.0.0

Author: matrixswarm

Views: 71

Added: August 2, 2025

Updated: August 2, 2025