301 lines
12 KiB
Python
301 lines
12 KiB
Python
import time
|
|
from scripts.discord_webhook import send_discord_message
|
|
from scripts.temperature_sensor import TemperatureSensor
|
|
|
|
class Monitor:
    """Common scheduling behaviour shared by every monitoring task."""

    def __init__(self, interval=300):
        """
        interval: number of seconds to wait between checks
        """
        self.last_check_ms = 0
        self.interval = interval

    def should_run(self):
        """Report whether the interval has elapsed since the last run.

        When it has, the current tick count is stored as the new last-check
        time before True is returned; otherwise nothing is modified.
        """
        current_ms = time.ticks_ms()
        elapsed_ms = time.ticks_diff(current_ms, self.last_check_ms)
        if elapsed_ms < self.interval * 1000:
            return False
        self.last_check_ms = current_ms
        return True

    def run(self):
        """Hook for subclasses; the base implementation does nothing."""
        return None
|
|
|
|
class TemperatureMonitor(Monitor):
    """Monitor for tracking temperature readings and alerts.

    Each run reads the sensor in °F, sends a Discord message when the first
    reading crosses a configured threshold (and another one when it returns
    to the normal range), and appends the reading to a CSV log once per
    report interval.
    """

    def __init__(self, sensor, label, check_interval=10, report_interval=60,
                 alert_high=None, alert_low=None, log_file="/temp_logs.csv",
                 send_alerts_to_separate_channel=False):
        """
        Initialize temperature monitor.

        Args:
            sensor: TemperatureSensor instance
            label: Label for this sensor
            check_interval: How often to check temp (seconds)
            report_interval: How often to report/log temp (seconds)
            alert_high: High temp threshold for alerts (None disables it)
            alert_low: Low temp threshold for alerts (None disables it)
            log_file: Path to CSV log file
            send_alerts_to_separate_channel: Use separate Discord channel for alerts
        """
        # Keep the base-class attributes (interval, last_check_ms) populated
        # even though should_run() is overridden below.
        super().__init__(check_interval)

        self.sensor = sensor
        self.label = label
        self.check_interval = check_interval
        self.report_interval = report_interval
        self.alert_high = alert_high
        self.alert_low = alert_low
        self.log_file = log_file
        self.send_alerts_to_separate_channel = send_alerts_to_separate_channel

        self.last_check = 0
        self.last_report = 0
        self.alert_sent = False
        self.alert_start_time = None  # Track when alert started

    def should_run(self):
        """Check if it's time to run this monitor."""
        current_time = time.time()
        if current_time - self.last_check >= self.check_interval:
            self.last_check = current_time
            return True
        return False

    def run(self):
        """Check temperature and handle alerts/logging."""
        current_time = time.time()

        temps = self.sensor.read_all_temps(unit='F')
        if not temps:
            return

        # Only the first reading returned by the sensor is used.
        temp = next(iter(temps.values()))

        alert_message = self._build_alert_message(temp)

        if alert_message is not None and not self.alert_sent:
            # Alert triggered: remember when it started so the recovery
            # message can report how long the reading was out of range.
            self.alert_start_time = current_time
            print(alert_message)
            self._notify(alert_message)
            self.alert_sent = True

        elif alert_message is None and self.alert_sent:
            # Alert resolved
            if self.alert_start_time is None:
                duration = 0
            else:
                # Clamp at zero: time.time() can move backwards if the clock
                # is adjusted while an alert is active.
                duration = max(0, current_time - self.alert_start_time)

            recovery_message = "✅ {} temperature back to normal: {:.1f}°F (was out of range for {})".format(
                self.label, temp, self._format_duration(duration)
            )
            print(recovery_message)
            self._notify(recovery_message)

            self.alert_sent = False
            self.alert_start_time = None

        # Log temperature at report interval
        if current_time - self.last_report >= self.report_interval:
            self.last_report = current_time
            self._log_temperature(temp)

    def _build_alert_message(self, temp):
        """Return the alert text for *temp*, or None when it is in range.

        Thresholds are compared against None explicitly so that a threshold
        of 0 is honoured instead of being treated as "not configured".
        The high threshold is checked before the low one.
        """
        if self.alert_high is not None and temp > self.alert_high:
            return "⚠️ {} temperature HIGH: {:.1f}°F (threshold: {:.1f}°F)".format(
                self.label, temp, self.alert_high
            )
        if self.alert_low is not None and temp < self.alert_low:
            return "⚠️ {} temperature LOW: {:.1f}°F (threshold: {:.1f}°F)".format(
                self.label, temp, self.alert_low
            )
        return None

    @staticmethod
    def _format_duration(duration):
        """Format a duration in seconds as 'Xh Ym', 'Xm Ys' or 'Xs'."""
        if duration >= 3600:
            hours = int(duration / 3600)
            minutes = int((duration % 3600) / 60)
            return "{}h {}m".format(hours, minutes)
        if duration >= 60:
            minutes = int(duration / 60)
            seconds = int(duration % 60)
            return "{}m {}s".format(minutes, seconds)
        return "{}s".format(int(duration))

    def _notify(self, message):
        """Send *message* to the Discord channel configured for alerts."""
        if self.send_alerts_to_separate_channel:
            # send_alert_message is not imported at module level, so it is
            # imported here only when the separate channel is actually used.
            from scripts.discord_webhook import send_alert_message
            send_alert_message(message)
        else:
            send_discord_message(message)

    def _log_temperature(self, temp):
        """Log temperature to CSV file.

        Appends one "timestamp,label,temp" row. Logging is best-effort: any
        error is printed and swallowed so a failed write does not abort the
        monitor run.
        """
        try:
            # Get timestamp
            t = time.localtime()
            timestamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format(
                t[0], t[1], t[2], t[3], t[4], t[5]
            )

            # Append to log file
            with open(self.log_file, 'a') as f:
                f.write("{},{},{:.2f}\n".format(
                    timestamp, self.label, temp
                ))
        except Exception as e:
            print("Error logging temperature: {}".format(e))
|
|
|
|
class ACMonitor(Monitor):
    """Monitor temperature and control the AC automatically."""

    def __init__(self, ac_controller, temp_sensor, target_temp=75.0, temp_swing=2.0, interval=30):
        """
        ac_controller: controller object providing turn_on() / turn_off()
        temp_sensor: sensor object providing read_all_temps(unit=...),
            supplied by the caller
        target_temp: Target temperature in °F
        temp_swing: Margin around the target before the AC is switched
        interval: Seconds between checks
        """
        super().__init__(interval)
        self.ac = ac_controller
        self.sensor = temp_sensor
        self.target_temp = target_temp
        self.temp_swing = temp_swing
        self.last_notified_state = None

    def run(self):
        """Read the temperature and switch the AC based on the swing band."""
        readings = self.sensor.read_all_temps(unit='F')
        if not readings:
            return

        # Only the first reading is considered (single inside sensor assumed).
        current_temp = list(readings.values())[0]

        # Above the upper limit the AC is switched on, below the lower limit
        # it is switched off; in between nothing changes.
        upper_limit = self.target_temp + self.temp_swing
        lower_limit = self.target_temp - self.temp_swing

        if current_temp > upper_limit:
            # Too hot: request cooling, notify only on a state change.
            if not self.ac.turn_on():
                return
            if not self.last_notified_state:
                send_discord_message(f"❄️ AC turned ON - Current: {current_temp:.1f}°F, Target: {self.target_temp:.1f}°F")
                self.last_notified_state = True
            return

        if current_temp < lower_limit:
            # Cool enough: stop cooling, notify only on a state change.
            if not self.ac.turn_off():
                return
            if self.last_notified_state:
                send_discord_message(f"✅ AC turned OFF - Current: {current_temp:.1f}°F, Target: {self.target_temp:.1f}°F")
                self.last_notified_state = False
|
|
|
|
class HeaterMonitor(Monitor):
    """Monitor temperature and control heater automatically."""

    def __init__(self, heater_controller, temp_sensor, target_temp=70.0, temp_swing=2.0, interval=30):
        """
        heater_controller: HeaterController instance
        temp_sensor: TemperatureSensor instance (inside temp)
        target_temp: Target temperature in °F
        temp_swing: Temperature swing allowed (prevents rapid cycling)
        interval: Seconds between checks
        """
        super().__init__(interval)
        self.heater = heater_controller
        self.sensor = temp_sensor
        self.target_temp = target_temp
        self.temp_swing = temp_swing
        self.last_notified_state = None

    def run(self):
        """Read the temperature and switch the heater based on the swing band."""
        readings = self.sensor.read_all_temps(unit='F')
        if not readings:
            return

        # Only the first reading is considered (single inside sensor assumed).
        current_temp = list(readings.values())[0]

        # Below the lower limit the heater is switched on, above the upper
        # limit it is switched off; in between nothing changes.
        lower_limit = self.target_temp - self.temp_swing
        upper_limit = self.target_temp + self.temp_swing

        if current_temp < lower_limit:
            # Too cold: request heating, notify only on a state change.
            if not self.heater.turn_on():
                return
            if not self.last_notified_state:
                send_discord_message(f"🔥 Heater turned ON - Current: {current_temp:.1f}°F, Target: {self.target_temp:.1f}°F")
                self.last_notified_state = True
            return

        if current_temp > upper_limit:
            # Warm enough: stop heating, notify only on a state change.
            if not self.heater.turn_off():
                return
            if self.last_notified_state:
                send_discord_message(f"✅ Heater turned OFF - Current: {current_temp:.1f}°F, Target: {self.target_temp:.1f}°F")
                self.last_notified_state = False
|
|
|
|
class WiFiMonitor(Monitor):
    """Monitor WiFi connection and handle reconnection."""

    def __init__(self, wifi, led, interval=5, reconnect_cooldown=60):
        """
        wifi: WLAN-like object providing isconnected(), or None
        led: LED object providing on() / off(), blinked on every run
        interval: Seconds between checks
        reconnect_cooldown: Minimum seconds between reconnect attempts
        """
        super().__init__(interval)
        self.wifi = wifi
        self.led = led
        self.reconnect_cooldown = reconnect_cooldown
        self.last_reconnect_attempt = 0
        self.was_connected = wifi.isconnected() if wifi else False

    def run(self):
        """Check WiFi status, blink LED, attempt reconnect if needed."""
        # Imported at call time rather than at module import.
        from scripts.networking import connect_wifi

        is_connected = self.wifi.isconnected() if self.wifi else False

        if not is_connected:
            # Record the outage so that a link which comes back without going
            # through connect_wifi() below is still reported as restored.
            self.was_connected = False

            # Fast blink when disconnected
            self._blink(0.2)

            # Try reconnect if cooldown passed
            now = time.ticks_ms()
            if time.ticks_diff(now, self.last_reconnect_attempt) >= (self.reconnect_cooldown * 1000):
                self.last_reconnect_attempt = now
                self.wifi = connect_wifi(self.led)

                if self.wifi and self.wifi.isconnected():
                    send_discord_message("WiFi connection restored 🔄")
                    self.was_connected = True
        else:
            # Slow blink when connected
            self._blink(1)

            # Notify if connection was just restored
            if not self.was_connected:
                send_discord_message("WiFi connection restored 🔄")
                self.was_connected = True

    def _blink(self, seconds):
        """Turn the LED on, block for *seconds*, then turn it off."""
        self.led.on()
        time.sleep(seconds)
        self.led.off()
|
|
|
|
def run_monitors(monitors):
    """
    Give every due monitor in *monitors* a chance to run.

    Meant to be called on each pass of the main loop. A monitor is run only
    when its should_run() returns a truthy value. An exception raised by
    run() is printed and does not stop the remaining monitors.
    """
    for monitor in monitors:
        if not monitor.should_run():
            continue
        try:
            monitor.run()
        except Exception as e:
            print(f"Error running monitor {monitor.__class__.__name__}: {e}")