Crypto Alert

The `crypto_alert` agent is a highly flexible cryptocurrency market monitor. It connects to various exchanges via a dynamic factory system to track asset prices and trigger alerts based on user-defined conditions. This allows operators to create real-time notifications for significant market movements, price thresholds, or specific asset conversion rates.
## ⚙️ How it Works
The agent operates in a configurable loop with the following logic:
1. **Dynamic Exchange Loading**: On initialization, the agent reads the `exchange` from its configuration and dynamically loads the corresponding handler module from the `crypto_alert.factory.cryptocurrency.exchange` directory. This allows for easy expansion to new exchanges without altering the core agent logic.
2. **Polling**: The agent periodically fetches price data for a specified trading pair from the loaded exchange. The polling frequency is configurable via `poll_interval`.
3. **Trigger Evaluation**: It evaluates the fetched price data against a specified `trigger_type`. The agent supports several conditions, including percentage change, absolute price change, crossing a price threshold, and asset-to-asset conversion rates.
4. **Alerting**: If a trigger condition is met, the agent constructs and dispatches an alert packet to other agents in the swarm that have the configured `alert_role` and are listening for the `alert_handler`.
5. **Self-Deactivation**: The agent can be configured to stop after a certain number of trigger events to prevent alert flooding, after which it will mark itself as inactive.
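As a rough illustration of step 1, the sketch below shows the shape of a handler that the loader could pick up. The module path template and the `Exchange` class with a `get_price(pair)` method mirror what the agent's source expects. The `module_path_for` helper and the fixed price table are hypothetical, made up for this example.

```python
from typing import Optional

# Package prefix mirrored from the agent source.
EXCHANGE_PACKAGE = "crypto_alert.factory.cryptocurrency.exchange"


def module_path_for(exchange_name: str) -> str:
    """Hypothetical helper: build the dotted module path for a given exchange."""
    return f"{EXCHANGE_PACKAGE}.{exchange_name}.price"


class Exchange:
    """Minimal handler: the agent instantiates Exchange(agent) and calls get_price(pair)."""

    # Hypothetical fixed quotes; a real handler would query the exchange's API here.
    _QUOTES = {"BTC/USDT": 60000.0, "ETH/USDT": 3000.0}

    def __init__(self, agent):
        self.agent = agent  # kept so the handler could use the agent's logger

    def get_price(self, pair: str) -> Optional[float]:
        # Returning None signals "no price"; the agent then skips that check.
        return self._QUOTES.get(pair)


print(module_path_for("coingecko"))
print(Exchange(agent=None).get_price("BTC/USDT"))
```

A new exchange would then be a new package next to the existing ones, exposing a `price` module with its own `Exchange` class.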
## 🧩 Configuration
Add the `crypto_alert` agent to your directive and customize its behavior in the `config` block.
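For orientation, a node for a one-shot threshold alert might look like the Python dictionary below. The option names inside `config` come from the lists that follow. The surrounding node layout, the `universal_id`, the handler and role strings, and the threshold value are placeholders chosen for this example, not values defined by the agent.

```python
# Hypothetical directive node for illustration; only the keys inside "config"
# are taken from the option lists in this section.
crypto_alert_node = {
    "universal_id": "crypto-alert-1",  # placeholder ID
    "name": "crypto_alert",
    "config": {
        "exchange": "coingecko",
        "pair": "ETH/USDT",
        "poll_interval": 30,  # seconds between price checks
        "trigger_type": "price_above",
        "threshold": 4000,  # placeholder price level, in USDT
        "limit_mode": "limited",
        "activation_limit": 1,  # deactivate after the first alert
        "alert_handler": "cmd_send_alert_msg",  # placeholder handler name
        "alert_role": "comm",  # placeholder role name
    },
}

# Both routing fields must be present, or the agent logs a message and sends nothing.
missing = [
    key for key in ("alert_handler", "alert_role")
    if not crypto_alert_node["config"].get(key)
]
print("missing routing fields:", missing)
```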
**Core Options:**
* **`exchange`** (Default: `"coingecko"`): The name of the exchange to use. This must correspond to a module name in the exchange factory.
* **`poll_interval`** (Default: `20`): How often, in seconds, to check the asset's price.
* **`pair`** (Default: `"BTC/USDT"`): The trading pair to monitor (e.g., "ETH/USDT", "SOL/USDT").
* **`active`** (Default: `True`): Set to `False` to disable the agent's monitoring loop.
**Triggering Options:**
* **`trigger_type`** (Default: `"price_change_above"`): The core condition to check for. See the "Trigger Types" section below for all options.
* **`change_percent`** (Default: `1.5`): The percentage change required to trigger a `price_change` alert.
* **`change_absolute`** (Default: `1000`): The absolute price change (in quote currency) required to trigger a `price_delta` alert.
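* **`threshold`** (Default: `0`; `3.0` when `trigger_type` is `asset_conversion`): The price level to check against for `price_above` / `price_below` alerts, or the target amount of `to_asset` for `asset_conversion` alerts.
* **`from_asset`** / **`to_asset`** / **`from_amount`** (Defaults: `"BTC"`, `"ETH"`, `0.1`): Used only with the `asset_conversion` trigger to check the value of one asset in terms of another.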
**Alerting & Limits:**
* **`alert_handler`** (Required): The handler that a receiving agent (e.g., `discord_relay`) should use to process the alert.
* **`alert_role`** (Required): The role of the agent(s) that should receive the alert packet.
* **`limit_mode`** (Default: `"forever"`): Determines if the agent should deactivate. Set to `"limited"` to use the activation limit.
* **`activation_limit`** (Default: `1`): If `limit_mode` is `"limited"`, the agent will deactivate after this many alerts.
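The limit options can be pictured with a small counter. This is a simplified stand-in for the agent's own bookkeeping, assuming only what is stated above: every alert counts as one hit, and in `"limited"` mode the agent turns inactive once the hits reach `activation_limit`. The `TriggerLimiter` class is a name invented for this sketch.

```python
class TriggerLimiter:
    """Counts alerts and reports when the agent should go inactive."""

    def __init__(self, limit_mode: str = "forever", activation_limit=1):
        self.limit_mode = limit_mode
        self.hits = 0
        try:
            # Fall back to 1 when the limit is missing or not a number.
            self.max_hits = int(activation_limit) if activation_limit is not None else 1
        except (TypeError, ValueError):
            self.max_hits = 1

    def record_alert(self) -> bool:
        """Register one alert; return True if the agent should stay active."""
        self.hits += 1
        if self.limit_mode == "forever":
            return True
        return self.hits < self.max_hits


limiter = TriggerLimiter(limit_mode="limited", activation_limit=2)
print(limiter.record_alert())  # first alert: still below the limit
print(limiter.record_alert())  # second alert: limit reached
```

With `limit_mode` left at `"forever"`, `record_alert()` keeps returning `True` no matter how many alerts fire.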
---
## Trigger Types
The `trigger_type` option defines the agent's monitoring logic.
* **`price_change_above`**: Triggers when the price has risen by at least `change_percent` percent since the last check.
* **`price_change_below`**: Triggers when the price has fallen by at least `change_percent` percent since the last check.
* **`price_delta_above`**: Triggers when the price has risen by at least `change_absolute` since the last check.
* **`price_delta_below`**: Triggers when the price has fallen by at least `change_absolute` since the last check.
* **`price_above`**: Triggers if the current price is greater than `threshold`.
* **`price_below`**: Triggers if the current price is less than `threshold`.
* **`asset_conversion`**: Triggers if `from_amount` of `from_asset` is worth at least `threshold` of `to_asset`. Both assets are priced through their `/USDT` pairs.
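To make the arithmetic concrete, here is a small standalone sketch of how a trigger name can be split into a base and a direction, and how the percentage and conversion checks work out. It follows the descriptions above but is not the agent's code; the function names and the sample prices are made up for illustration.

```python
def split_trigger(trigger_type: str):
    """Split e.g. 'price_change_above' into ('price_change', 'above')."""
    base, _, direction = trigger_type.rpartition("_")
    if direction in ("above", "below"):
        return base, direction
    # Names without a direction suffix, such as 'asset_conversion', stay whole.
    return trigger_type, None


def percent_change_hit(previous: float, current: float,
                       change_percent: float, direction: str) -> bool:
    """True if the move since the last check is in `direction` and large enough."""
    delta = current - previous
    delta_pct = abs(delta / previous) * 100
    right_way = (direction == "above" and delta > 0) or (direction == "below" and delta < 0)
    return right_way and delta_pct >= change_percent


def conversion_value(from_amount: float, from_price: float, to_price: float) -> float:
    """Value of `from_amount` of one asset in units of another, via a shared quote currency."""
    return from_amount * from_price / to_price


print(split_trigger("price_change_above"))
print(percent_change_hit(60000.0, 61000.0, 1.5, "above"))  # a move of about +1.67%
print(conversion_value(0.1, 60000.0, 3000.0))  # 0.1 BTC expressed in ETH
```

In the second call the price rose by 1000 on a base of 60000, which clears a `change_percent` of `1.5`; the same move would not fire a `price_change_below` trigger because the direction is wrong.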
## 🧭 Directive
You won't need to create a directive for `crypto_alert`, because the agent is loaded on demand.
## 📦 Source
    import sys
    import os

    # Make the swarm's site root and agent directory importable before loading swarm modules.
    sys.path.insert(0, os.getenv("SITE_ROOT"))
    sys.path.insert(0, os.getenv("AGENT_PATH"))

    import importlib
    import time

    from matrixswarm.core.boot_agent import BootAgent
    from matrixswarm.core.utils.swarm_sleep import interruptible_sleep
    from matrixswarm.core.class_lib.packet_delivery.utility.encryption.utility.identity import IdentityObject


    class Agent(BootAgent):
        def __init__(self):
            super().__init__()
            self.name = "CryptoAgent"
            self.exchange = None
            self._initialized_from_tree = False
            self._private_config = self.tree_node.get("config", {})
            self._last_price = None
            self.trigger_hits = 0

        def cmd_update_agent_config(self):
            """Load (or reload) the exchange handler named in the config."""
            try:
                self._initialized_from_tree = True
                exchange_name = self._private_config.get("exchange", "coingecko")
                mod_path = f"crypto_alert.factory.cryptocurrency.exchange.{exchange_name}.price"
                self.log(f"Attempting to load: {mod_path}", block="exchange_loader")
                try:
                    module = importlib.import_module(mod_path)
                    importlib.reload(module)
                    ExchangeClass = getattr(module, "Exchange")
                    self.exchange = ExchangeClass(self)
                    self.log(f"[EXCHANGE-LOADER] ✅ Loaded exchange handler: {exchange_name}")
                except Exception as e:
                    self.log(f"Could not load exchange module '{exchange_name}'", error=e)
            except Exception as e:
                self.log("Failed to initialize config", error=e)

        def worker(self, config: dict = None, identity: IdentityObject = None):
            """Run one monitoring cycle: apply config updates, evaluate the trigger, sleep."""
            try:
                if config and isinstance(config, dict):
                    self.log(f"config loaded: {config}")
                    if config.get("partial_config"):
                        config = config.copy()  # avoid mutating the caller's dict
                        config.pop("partial_config", None)
                        self._private_config.update(config)
                        self.log("[WORKER] 🧩 Partial config merged.")
                    else:
                        self._private_config = config
                        self.log("[WORKER] Full config applied.")
                    # Force the exchange handler to be reloaded with the new config.
                    self._initialized_from_tree = False

                if not self._initialized_from_tree:
                    self.cmd_update_agent_config()

                if not self._private_config.get("active", True):
                    self.log("Agent marked inactive. Exiting cycle.")
                    return

                trigger = self._private_config.get("trigger_type", "price_change_above")

                # Break it into base + direction (e.g., price_change_above → price_change + above).
                # Only split on a real direction suffix; otherwise "asset_conversion" would be
                # split into "asset" + "conversion" and never match its branch below.
                base_trigger, _, direction = trigger.rpartition("_")
                if direction not in ("above", "below"):
                    base_trigger = trigger
                    direction = "above"

                if base_trigger == "price_change":
                    self._run_price_change_monitor(direction)
                elif base_trigger == "price_delta":
                    self._run_price_delta_monitor(direction)
                elif base_trigger == "price":
                    self._run_price_threshold(direction)
                elif base_trigger == "asset_conversion":
                    self._run_asset_conversion_check()
                else:
                    self.log(f"[UNKNOWN TRIGGER] {trigger}")
            except Exception as e:
                self.log(error=e, block="main_try")

            interval = int(self._private_config.get("poll_interval", 20))
            interruptible_sleep(self, interval)

        def _run_price_change_monitor(self, direction="above"):
            """Alert on a percentage move since the previous poll."""
            try:
                pair = self._private_config.get("pair", "BTC/USDT")
                threshold_pct = float(self._private_config.get("change_percent", 1.5))
                current = self.exchange.get_price(pair)
                if current is None:
                    self.log("[CRYPTOAGENT][ERROR] No price received.")
                    return
                if self._last_price is None:
                    self._last_price = current
                    self.log(f"[DEBUG] Initial price set to {self._last_price}")
                    return
                delta = current - self._last_price
                delta_pct = abs(delta / self._last_price) * 100
                self.log(
                    f"[DEBUG] Current: {current}, Previous: {self._last_price}, Δ = {delta:.2f}, Δ% = {delta_pct:.4f}%")
                condition = (direction == "above" and delta > 0) or (direction == "below" and delta < 0)
                if condition and delta_pct >= threshold_pct:
                    self._alert(f"{pair} moved {delta_pct:.2f}% {direction.upper()} - from {self._last_price} to {current}")
                self._last_price = current
            except Exception as e:
                self.log("Price change failure", error=e)

        def _run_price_delta_monitor(self, direction="above"):
            """Alert on an absolute move (in quote currency) since the previous poll."""
            try:
                pair = self._private_config.get("pair", "BTC/USDT")
                threshold_abs = float(self._private_config.get("change_absolute", 1000))
                current = self.exchange.get_price(pair)
                if current is None:
                    return
                if self._last_price is None:
                    self._last_price = current
                    self.log(f"[DEBUG] Initial price set to {self._last_price}")
                    return
                delta = current - self._last_price
                delta_abs = abs(delta)
                self.log(f"[DEBUG] Δ = {delta:.2f} vs threshold {threshold_abs:.2f}")
                condition = (direction == "above" and delta > 0) or (direction == "below" and delta < 0)
                if condition and delta_abs >= threshold_abs:
                    self._alert(f"{pair} moved ${delta_abs:.2f} {direction.upper()} - from {self._last_price} to {current}")
                self._last_price = current
            except Exception as e:
                self.log(error=e)

        def _run_price_threshold(self, mode):
            """Alert while the current price is above or below a fixed threshold."""
            try:
                pair = self._private_config.get("pair", "BTC/USDT")
                threshold = float(self._private_config.get("threshold", 0))
                current = self.exchange.get_price(pair)
                if current is None:
                    return
                if mode == "above" and current > threshold:
                    self._alert(f"{pair} is above threshold: {current} > {threshold}")
                elif mode == "below" and current < threshold:
                    self._alert(f"{pair} is below threshold: {current} < {threshold}")
            except Exception as e:
                self.log(error=e)

        def _run_asset_conversion_check(self):
            """Alert when from_amount of from_asset is worth at least threshold of to_asset."""
            try:
                from_asset = self._private_config.get("from_asset", "BTC")
                to_asset = self._private_config.get("to_asset", "ETH")
                from_amount = float(self._private_config.get("from_amount", 0.1))
                threshold = float(self._private_config.get("threshold", 3.0))
                # Both assets are priced in USDT and converted through that common quote.
                pair1 = f"{from_asset}/USDT"
                pair2 = f"{to_asset}/USDT"
                price1 = self.exchange.get_price(pair1)
                price2 = self.exchange.get_price(pair2)
                if price1 is None or price2 is None:
                    return
                value = from_amount * price1 / price2
                self.log(f"[DEBUG] {from_amount} {from_asset} = {value:.4f} {to_asset}")
                if value >= threshold:
                    self._alert(f"{from_amount} {from_asset} = {value:.4f} {to_asset} (≥ {threshold})")
            except Exception as e:
                self.log("Conversion fail", error=e)

        def _alert(self, message):
            self.alert_operator(message)
            self._handle_trigger_limit()

        def _handle_trigger_limit(self):
            """Count the alert and deactivate the agent once the configured limit is reached."""
            self.trigger_hits += 1
            limit_mode = self._private_config.get("limit_mode", "forever")
            if limit_mode == "forever":
                return
            # Hardened: handle None, empty string, garbage
            raw_limit = self._private_config.get("activation_limit", 1)
            try:
                max_triggers = int(raw_limit) if raw_limit is not None else 1
            except (TypeError, ValueError) as e:
                self.log("Invalid activation_limit value", error=e, block="TRIGGER-LIMIT")
                max_triggers = 1
            if self.trigger_hits >= max_triggers:
                self.log("[CRYPTOAGENT] 🎯 Max triggers reached. Marking agent inactive.")
                self._private_config["active"] = False
                self._save_config_patch()
                return  # stop further work this cycle

        def _save_config_patch(self):
            """Ask Matrix to persist active=False for this agent."""
            try:
                uid = self.command_line_args.get("universal_id", "unknown")
                patch = {
                    "target_universal_id": uid,
                    "config": {"active": False},
                    "push_live_config": True,
                    "respond_to": "crypto_gui_1",
                    "handler_role": "hive.rpc.route",
                    "handler": "cmd_rpc_route",
                    "response_handler": "rpc_result_update_agent",
                    "response_id": f"{uid}-deactivate"
                }
                pkt = self.get_delivery_packet("standard.command.packet")
                pkt.set_data({
                    "handler": "cmd_update_agent",
                    "content": patch
                })
                self.pass_packet(pkt, "matrix")
            except Exception as e:
                self.log("Error saving config patch", error=e)

        def _self_destruct(self):
            """Ask Matrix to delete this agent."""
            try:
                pk = self.get_delivery_packet("standard.command.packet")
                pk.set_data({
                    "handler": "cmd_delete_agent",
                    "content": {
                        "target_universal_id": self.command_line_args.get("universal_id", "unknown")
                    }
                })
                self.pass_packet(pk, "matrix")
            except Exception as e:
                self.log(error=e)

        def _send_rpc_update(self, payload):
            """Forward a payload to every agent holding the RPC router role."""
            try:
                alert_role = self._private_config.get("rpc_router_role", "hive.rpc.route")
                handler = self._private_config.get("rpc_push_handler", "crypto_agent_update")
                if not alert_role:
                    self.log("[RPC] No routing role defined. Skipping reflex broadcast.")
                    return
                pk1 = self.get_delivery_packet("standard.command.packet")
                pk1.set_data({"handler": "cmd_forward_command"})
                pk2 = self.get_delivery_packet("standard.general.json.packet", new=True)
                pk2.set_data({
                    "handler": handler,
                    "filetype": "msg",
                    "content": payload
                })
                pk1.set_packet(pk2, "content")
                # Broadcast to all gang members with that role
                for node in self.get_nodes_by_role(alert_role):
                    self.pass_packet(pk1, node["universal_id"])
            except Exception as e:
                self.log("Captain Howdy RPC dispatch failed", error=e, block="captain_howdy_main_try")

        def alert_operator(self, message):
            """Wrap the message in an alert packet and send it to every agent with alert_role."""
            alert_handler = self._private_config.get("alert_handler")
            alert_role = self._private_config.get("alert_role")
            if not all([alert_handler, alert_role]):
                self.log("Alert dispatch missing routing fields", block="ALERT_HANDLER")
                return
            pk1 = self.get_delivery_packet("standard.command.packet")
            pk1.set_data({"handler": alert_handler})
            pk2 = self.get_delivery_packet("notify.alert.general", new=True)
            pk2.set_data({
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "universal_id": self.command_line_args.get("universal_id", "unknown"),
                "level": "critical",
                "msg": message,
                "formatted_msg": f"Crypto Alert\n{message}",
                "cause": "Crypto Alert",
                "origin": self.command_line_args.get("universal_id", "unknown")
            })
            pk1.set_packet(pk2, "content")
            for node in self.get_nodes_by_role(alert_role):
                self.pass_packet(pk1, node["universal_id"])


    if __name__ == "__main__":
        agent = Agent()
        agent.boot()
Category: monitoring
Tags: #real-time, #api-integration, #alerting, #crypto, #trading, #market-monitor, #dynamic-loading, #finance
Version: v1.0.0
Author: matrixswarm
Added: August 2, 2025
Updated: August 2, 2025