Refactor network and temperature monitoring scripts: implement structured classes for network scanning and temperature monitoring, enhance error handling, and integrate Discord notifications for temperature alerts.

This commit is contained in:
Aaron 2025-11-04 18:55:10 -05:00
parent 89ae69e16f
commit f2e610e62e
5 changed files with 306 additions and 84 deletions

72
Scripts/monitors.py Normal file
View File

@ -0,0 +1,72 @@
import time
from scripts.discord_webhook import send_discord_message
from scripts.temperature_sensor import TemperatureSensor
class Monitor:
"""Base class for all monitoring tasks."""
def __init__(self, interval=300):
"""
interval: seconds between checks
"""
self.interval = interval
self.last_check_ms = 0
def should_run(self):
"""Check if enough time has passed to run again."""
now = time.ticks_ms()
if time.ticks_diff(now, self.last_check_ms) >= (self.interval * 1000):
self.last_check_ms = now
return True
return False
def run(self):
"""Override this in subclasses to implement monitoring logic."""
pass
class TemperatureMonitor(Monitor):
"""Monitor temperature sensors and report to Discord."""
def __init__(self, pin=10, interval=300, alert_high=None, alert_low=None):
super().__init__(interval)
self.sensor = TemperatureSensor(pin=pin)
self.alert_high = alert_high # Alert if temp goes above this
self.alert_low = alert_low # Alert if temp goes below this
def run(self):
"""Read all sensors and report temperatures."""
temps = self.sensor.read_all_temps(unit='F')
if not temps:
print("No temperature readings available")
return
# Format message
temp_msg = "🌡️ Temperature readings:\n"
alerts = []
for rom, temp in temps.items():
sensor_id = rom.hex()[:8]
temp_msg += f"Sensor {sensor_id}: {temp:.1f}°F\n"
# Check alerts
if self.alert_high and temp > self.alert_high:
alerts.append(f"⚠️ Sensor {sensor_id} HIGH: {temp:.1f}°F (threshold: {self.alert_high}°F)")
if self.alert_low and temp < self.alert_low:
alerts.append(f"⚠️ Sensor {sensor_id} LOW: {temp:.1f}°F (threshold: {self.alert_low}°F)")
# Send regular update
send_discord_message(temp_msg.strip())
# Send alerts separately
for alert in alerts:
send_discord_message(alert)
def run_monitors(monitors):
"""
Run all monitors in the list, checking if each should run based on interval.
Call this in your main loop.
"""
for monitor in monitors:
if monitor.should_run():
try:
monitor.run()
except Exception as e:
print(f"Error running monitor {monitor.__class__.__name__}: {e}")

View File

@ -1,4 +1,75 @@
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
print(wlan.scan())
import time
def scan_networks(timeout=10):
"""
Scan for available WiFi networks and return formatted results.
Returns list of dicts with network info.
"""
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
print("Scanning for networks...")
networks = wlan.scan()
# Format results
results = []
for net in networks:
ssid, bssid, channel, rssi, security, hidden = net
# Decode SSID if bytes
if isinstance(ssid, bytes):
ssid = ssid.decode('utf-8')
# Security type mapping
security_types = {
0: "Open",
1: "WEP",
2: "WPA-PSK",
3: "WPA2-PSK",
4: "WPA/WPA2-PSK"
}
results.append({
"ssid": ssid,
"bssid": ":".join(["%02X" % b for b in bssid]),
"channel": channel,
"rssi": rssi,
"security": security_types.get(security, "Unknown"),
"hidden": bool(hidden)
})
# Sort by signal strength (strongest first)
results.sort(key=lambda x: x["rssi"], reverse=True)
return results
def print_networks(networks):
"""Pretty print network scan results."""
print("\n" + "="*70)
print(f"Found {len(networks)} networks:")
print("="*70)
for i, net in enumerate(networks, 1):
print(f"\n{i}. {net['ssid']}")
print(f" BSSID: {net['bssid']}")
print(f" Channel: {net['channel']}")
print(f" Signal: {net['rssi']} dBm")
print(f" Security: {net['security']}")
if net['hidden']:
print(f" (Hidden Network)")
print("\n" + "="*70)
def find_network(ssid):
"""Find a specific network by SSID."""
networks = scan_networks()
for net in networks:
if net['ssid'] == ssid:
return net
return None
# Run scan if executed directly
if __name__ == "__main__":
networks = scan_networks()
print_networks(networks)

81
Scripts/networking.py Normal file
View File

@ -0,0 +1,81 @@
import network
import time
from secrets import secrets
RECONNECT_COOLDOWN_MS = 60000 # 60 seconds
def connect_wifi(led=None, timeout=10):
"""
Connect to WiFi using secrets['ssid'] / secrets['password'].
If `led` (machine.Pin) is provided, pulse it once on successful connect.
Returns the WLAN object or None on failure.
"""
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
print("Connecting to WiFi...", end="")
wifi.connect(secrets['ssid'], secrets['password'])
# Wait for connection with timeout
max_wait = timeout
while max_wait > 0:
if wifi.status() < 0 or wifi.status() >= 3:
break
max_wait -= 1
print(".", end="")
time.sleep(1)
if wifi.isconnected():
print("\nConnected! Network config:", wifi.ifconfig())
if led:
led.on()
time.sleep(1)
led.off()
return wifi
else:
print("\nConnection failed!")
return None
def monitor_connection(wifi, led, on_reconnect=None, on_loop=None, loop_interval=60):
"""
Monitor WiFi connection and attempt reconnects with cooldown.
Blinks LED fast when disconnected, slow when connected.
Calls on_reconnect() callback when connection is restored.
Calls on_loop() callback every loop_interval seconds when connected.
Never returns (infinite loop).
"""
last_attempt_ms = time.ticks_ms()
last_loop_ms = time.ticks_ms()
while True:
if not wifi or not wifi.isconnected():
# Fast blink when disconnected
led.on()
time.sleep(0.2)
led.off()
time.sleep(0.2)
# Only try to reconnect after cooldown
if time.ticks_diff(time.ticks_ms(), last_attempt_ms) >= RECONNECT_COOLDOWN_MS:
last_attempt_ms = time.ticks_ms()
wifi = connect_wifi(led)
# Notify when connection is restored
if wifi and wifi.isconnected() and on_reconnect:
on_reconnect()
else:
# Slow blink when connected
led.on()
time.sleep(1)
led.off()
time.sleep(1)
# Call loop callback at interval
if on_loop and time.ticks_diff(time.ticks_ms(), last_loop_ms) >= (loop_interval * 1000):
last_loop_ms = time.ticks_ms()
try:
on_loop()
except Exception as e:
print(f"Error in loop callback: {e}")
time.sleep(0.1)

View File

@ -1,19 +1,74 @@
import machine, onewire, ds18x20, time
import machine
import onewire
import ds18x20
import time
ds_pin = machine.Pin(10)
ds_sensor = ds18x20.DS18X20(onewire.OneWire(ds_pin))
class TemperatureSensor:
def __init__(self, pin=10):
"""Initialize DS18X20 temperature sensor on the specified pin."""
self.ds_pin = machine.Pin(pin)
self.ds_sensor = ds18x20.DS18X20(onewire.OneWire(self.ds_pin))
self.roms = []
self.scan_sensors()
def scan_sensors(self):
"""Scan for connected DS18X20 sensors."""
try:
self.roms = self.ds_sensor.scan()
print(f'Found {len(self.roms)} DS18X20 sensor(s)')
return self.roms
except Exception as e:
print(f'Error scanning sensors: {e}')
return []
def read_temp_c(self, rom=None):
"""Read temperature in Celsius. If rom=None, reads first sensor."""
try:
self.ds_sensor.convert_temp()
time.sleep_ms(750)
if rom is None and self.roms:
rom = self.roms[0]
if rom:
return self.ds_sensor.read_temp(rom)
return None
except Exception as e:
print(f'Error reading temperature: {e}')
return None
def read_temp_f(self, rom=None):
"""Read temperature in Fahrenheit."""
temp_c = self.read_temp_c(rom)
if temp_c is not None:
return temp_c * (9/5) + 32
return None
def read_all_temps(self, unit='F'):
"""Read all connected sensors. Returns dict of {rom: temp}."""
results = {}
try:
self.ds_sensor.convert_temp()
time.sleep_ms(750)
for rom in self.roms:
temp_c = self.ds_sensor.read_temp(rom)
if unit.upper() == 'F':
results[rom] = temp_c * (9/5) + 32
else:
results[rom] = temp_c
except Exception as e:
print(f'Error reading temperatures: {e}')
return results
roms = ds_sensor.scan()
print('Found DS devices: ', roms)
while True:
ds_sensor.convert_temp()
time.sleep_ms(750)
for rom in roms: # in a loop to get each sensor on the same pin since you can have multi sensors
print(rom)
tempC = ds_sensor.read_temp(rom)
tempF = tempC * (9/5) +32 # convert to farenheit
# print('temperature (ºC):', "{:.2f}".format(tempC)) # The result will have two decimal places {:.2f}
print('temperature (ºF):', "{:.2f}".format(tempF))
print()
time.sleep(5) # the loop will repeat every 5 seconds
# Example usage / test code
if __name__ == "__main__":
sensor = TemperatureSensor(pin=10)
while True:
temps = sensor.read_all_temps(unit='F')
for rom, temp in temps.items():
print(f'Sensor {rom.hex()}: {temp:.2f}°F')
print()
time.sleep(5)

71
main.py
View File

@ -1,78 +1,21 @@
from machine import Pin
import time
import network
import urequests as requests
from secrets import secrets
from scripts.networking import connect_wifi, monitor_connection
from scripts.discord_webhook import send_discord_message
# Initialize pins (LED light onboard)
led = Pin("LED", Pin.OUT)
# Initial state
led.low()
# Network connection
def connect_wifi():
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
print("Connecting to WiFi...", end="")
wifi.connect(secrets['ssid'], secrets['password'])
# Wait for connection with timeout
max_wait = 10
while max_wait > 0:
if wifi.status() < 0 or wifi.status() >= 3:
break
max_wait -= 1
print(".", end="")
time.sleep(1)
if wifi.isconnected():
print("\nConnected! Network config:", wifi.ifconfig())
led.on()
time.sleep(1)
led.off()
return wifi
else:
print("\nConnection failed!")
return None
# Connect to WiFi
wifi = connect_wifi()
wifi = connect_wifi(led)
# Send startup message if connected
if wifi and wifi.isconnected():
send_discord_message("Pico W online and connected ✅")
# Define reconnect callback
def on_wifi_restored():
send_discord_message("WiFi connection restored 🔄")
# Throttle reconnect attempts
RECONNECT_COOLDOWN_MS = 60000 # 60 seconds
last_attempt_ms = time.ticks_ms()
# Connection monitoring loop
while True:
if not wifi or not wifi.isconnected():
# Fast blink when disconnected
led.on()
time.sleep(0.2)
led.off()
time.sleep(0.2)
# Only try to reconnect after cooldown
if time.ticks_diff(time.ticks_ms(), last_attempt_ms) >= RECONNECT_COOLDOWN_MS:
last_attempt_ms = time.ticks_ms()
wifi = connect_wifi()
# Notify only when connection is restored
if wifi and wifi.isconnected():
send_discord_message("WiFi connection restored 🔄")
else:
# Slow blink when connected
led.on()
time.sleep(1)
led.off()
time.sleep(1)
time.sleep(0.1)
# Start connection monitoring loop (never returns)
monitor_connection(wifi, led, on_reconnect=on_wifi_restored)