import time from scripts.discord_webhook import send_discord_message from scripts.temperature_sensor import TemperatureSensor class Monitor: """Base class for all monitoring tasks.""" def __init__(self, interval=300): """ interval: seconds between checks """ self.interval = interval self.last_check_ms = 0 def should_run(self): """Check if enough time has passed to run again.""" now = time.ticks_ms() if time.ticks_diff(now, self.last_check_ms) >= (self.interval * 1000): self.last_check_ms = now return True return False def run(self): """Override this in subclasses to implement monitoring logic.""" pass class TemperatureMonitor(Monitor): """Monitor for tracking temperature readings and alerts.""" def __init__(self, sensor, label, check_interval=10, report_interval=60, alert_high=None, alert_low=None, log_file="/temp_logs.csv", send_alerts_to_separate_channel=False): """ Initialize temperature monitor. Args: sensor: TemperatureSensor instance label: Label for this sensor check_interval: How often to check temp (seconds) report_interval: How often to report/log temp (seconds) alert_high: High temp threshold for alerts alert_low: Low temp threshold for alerts log_file: Path to CSV log file send_alerts_to_separate_channel: Use separate Discord channel for alerts """ self.sensor = sensor self.label = label self.check_interval = check_interval self.report_interval = report_interval self.alert_high = alert_high self.alert_low = alert_low self.log_file = log_file self.send_alerts_to_separate_channel = send_alerts_to_separate_channel self.last_check = 0 self.last_report = 0 self.alert_sent = False self.alert_start_time = None # Track when alert started def should_run(self): """Check if it's time to run this monitor.""" current_time = time.time() if current_time - self.last_check >= self.check_interval: self.last_check = current_time return True return False def run(self): """Check temperature and handle alerts/logging.""" current_time = time.time() # Read temperature temps = self.sensor.read_all_temps(unit='F') if not temps: return temp = list(temps.values())[0] # Get first temp reading # Check for alerts alert_condition = False alert_message = "" if self.alert_high and temp > self.alert_high: alert_condition = True alert_message = "⚠️ {} temperature HIGH: {:.1f}°F (threshold: {:.1f}°F)".format( self.label, temp, self.alert_high ) elif self.alert_low and temp < self.alert_low: alert_condition = True alert_message = "⚠️ {} temperature LOW: {:.1f}°F (threshold: {:.1f}°F)".format( self.label, temp, self.alert_low ) # Handle alert state changes if alert_condition and not self.alert_sent: # Alert triggered self.alert_start_time = current_time print(alert_message) # Send to appropriate Discord channel if self.send_alerts_to_separate_channel: from scripts.discord_webhook import send_alert_message send_alert_message(alert_message) else: from scripts.discord_webhook import send_discord_message send_discord_message(alert_message) self.alert_sent = True elif not alert_condition and self.alert_sent: # Alert resolved duration = current_time - self.alert_start_time if self.alert_start_time else 0 # Format duration if duration >= 3600: hours = int(duration / 3600) minutes = int((duration % 3600) / 60) duration_str = "{}h {}m".format(hours, minutes) elif duration >= 60: minutes = int(duration / 60) seconds = int(duration % 60) duration_str = "{}m {}s".format(minutes, seconds) else: duration_str = "{}s".format(int(duration)) recovery_message = "✅ {} temperature back to normal: {:.1f}°F (was out of range for {})".format( self.label, temp, duration_str ) print(recovery_message) # Send to appropriate Discord channel if self.send_alerts_to_separate_channel: from scripts.discord_webhook import send_alert_message send_alert_message(recovery_message) else: from scripts.discord_webhook import send_discord_message send_discord_message(recovery_message) self.alert_sent = False self.alert_start_time = None # Log temperature at report interval if current_time - self.last_report >= self.report_interval: self.last_report = current_time self._log_temperature(temp) def _log_temperature(self, temp): """Log temperature to CSV file.""" try: # Get timestamp t = time.localtime() timestamp = "{:04d}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}".format( t[0], t[1], t[2], t[3], t[4], t[5] ) # Append to log file with open(self.log_file, 'a') as f: f.write("{},{},{:.2f}\n".format( timestamp, self.label, temp )) except Exception as e: print("Error logging temperature: {}".format(e)) class ACMonitor(Monitor): def __init__(self, ac_controller, temp_sensor, target_temp=75.0, temp_swing=2.0, interval=30): super().__init__(interval) self.ac = ac_controller self.sensor = temp_sensor # <-- This is set from main.py self.target_temp = target_temp self.temp_swing = temp_swing self.last_notified_state = None def run(self): """Check temperature and control AC.""" temps = self.sensor.read_all_temps(unit='F') if not temps: return # Use first sensor reading (assuming single inside sensor) current_temp = list(temps.values())[0] # Cooling logic with temperature swing # Turn ON if: temp > target + temp_swing # Turn OFF if: temp < target - temp_swing if current_temp > (self.target_temp + self.temp_swing): # Too hot, turn AC on if self.ac.turn_on(): if not self.last_notified_state: send_discord_message(f"❄️ AC turned ON - Current: {current_temp:.1f}°F, Target: {self.target_temp:.1f}°F") self.last_notified_state = True elif current_temp < (self.target_temp - self.temp_swing): # Cool enough, turn AC off if self.ac.turn_off(): if self.last_notified_state: send_discord_message(f"✅ AC turned OFF - Current: {current_temp:.1f}°F, Target: {self.target_temp:.1f}°F") self.last_notified_state = False # Else: within temp_swing range, maintain current state class HeaterMonitor(Monitor): """Monitor temperature and control heater automatically.""" def __init__(self, heater_controller, temp_sensor, target_temp=70.0, temp_swing=2.0, interval=30): """ heater_controller: HeaterController instance temp_sensor: TemperatureSensor instance (inside temp) target_temp: Target temperature in °F temp_swing: Temperature swing allowed (prevents rapid cycling) interval: Seconds between checks """ super().__init__(interval) self.heater = heater_controller self.sensor = temp_sensor self.target_temp = target_temp self.temp_swing = temp_swing self.last_notified_state = None def run(self): """Check temperature and control heater.""" temps = self.sensor.read_all_temps(unit='F') if not temps: return # Use first sensor reading (assuming single inside sensor) current_temp = list(temps.values())[0] # Heating logic with temperature swing # Turn ON if: temp < target - temp_swing # Turn OFF if: temp > target + temp_swing if current_temp < (self.target_temp - self.temp_swing): # Too cold, turn heater on if self.heater.turn_on(): if not self.last_notified_state: send_discord_message(f"🔥 Heater turned ON - Current: {current_temp:.1f}°F, Target: {self.target_temp:.1f}°F") self.last_notified_state = True elif current_temp > (self.target_temp + self.temp_swing): # Warm enough, turn heater off if self.heater.turn_off(): if self.last_notified_state: send_discord_message(f"✅ Heater turned OFF - Current: {current_temp:.1f}°F, Target: {self.target_temp:.1f}°F") self.last_notified_state = False # Else: within temp_swing range, maintain current state class WiFiMonitor(Monitor): """Monitor WiFi connection and handle reconnection.""" def __init__(self, wifi, led, interval=5, reconnect_cooldown=60): super().__init__(interval) self.wifi = wifi self.led = led self.reconnect_cooldown = reconnect_cooldown self.last_reconnect_attempt = 0 self.was_connected = wifi.isconnected() if wifi else False def run(self): """Check WiFi status, blink LED, attempt reconnect if needed.""" import network from scripts.networking import connect_wifi is_connected = self.wifi.isconnected() if self.wifi else False if not is_connected: # Fast blink when disconnected self.led.on() time.sleep(0.2) self.led.off() # Try reconnect if cooldown passed now = time.ticks_ms() if time.ticks_diff(now, self.last_reconnect_attempt) >= (self.reconnect_cooldown * 1000): self.last_reconnect_attempt = now # print("Attempting WiFi reconnect...") self.wifi = connect_wifi(self.led) if self.wifi and self.wifi.isconnected(): send_discord_message("WiFi connection restored 🔄") self.was_connected = True else: # Slow blink when connected self.led.on() time.sleep(1) self.led.off() # Notify if connection was just restored if not self.was_connected: send_discord_message("WiFi connection restored 🔄") self.was_connected = True def run_monitors(monitors): """ Run all monitors in the list, checking if each should run based on interval. Call this in your main loop. """ for monitor in monitors: if monitor.should_run(): try: monitor.run() except Exception as e: print(f"Error running monitor {monitor.__class__.__name__}: {e}")