Compare commits
55 Commits
8c512db131
...
12c3fd59fe
| Author | SHA1 | Date | |
|---|---|---|---|
| 12c3fd59fe | |||
| 2fba3367ab | |||
| 93b67a9142 | |||
| cdbb527743 | |||
| 7428a6b5a6 | |||
| c1e2460758 | |||
| d5149cf237 | |||
| f8e2a6e749 | |||
| 424d906760 | |||
| 1753966cf7 | |||
| 07d04a6084 | |||
| 473467e73f | |||
| fc318bb74d | |||
| 4ef7b00b74 | |||
| 35558805b2 | |||
| 49f5cd5596 | |||
| 295576857f | |||
| adc7049492 | |||
| 6fe498a3fd | |||
| 5cdc1c51d6 | |||
| 6f99df50ea | |||
| c4593caf9f | |||
| 0ceeb3ba41 | |||
| eec170056e | |||
| f2e610e62e | |||
| 89ae69e16f | |||
| 6b1f1dec80 | |||
| d3723b4f27 | |||
| 46d8177fca | |||
| 3d32c61008 | |||
| 58d6538344 | |||
| b5a2e216f2 | |||
| eca03e0e11 | |||
| a6c9429fda | |||
| 18e6e4b406 | |||
| d7a8a00636 | |||
| f1e89e1262 | |||
| 14b761647d | |||
| b6bd30ac5c | |||
| 4cd60205c3 | |||
| 4de23d6fa5 | |||
| 32dbd43079 | |||
| 7dcfe66855 | |||
| b0f411a5b8 | |||
| cf4e7fdddb | |||
| 15d1f4cfc3 | |||
| 815e3b175f | |||
| c3779ee4a4 | |||
| 2767f206a7 | |||
| fe5d724afa | |||
| 4ecf457c0a | |||
| 2160b6b9c6 | |||
| 6e7f15cf77 | |||
| e2378da17c | |||
| efba181492 |
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@ -0,0 +1,5 @@
|
||||
/__pycache__/
|
||||
secrets.py
|
||||
/pymakr-test/
|
||||
.gitignore
|
||||
.vscode/
|
||||
@ -1,32 +0,0 @@
|
||||
# Items Needed for the Project
|
||||
|
||||
<p> There are quite a few items needed to get this project going, for instance, in order to get accurate time and data logs another item would be easiest integrate into the system. Adafruit DS3231 is designed to keep track of RTC data using a battery so in case of power outage, it won't reset. If we integrate through pi pico alone, everytime the pico resets the log would reset to jan 1, 2021 12:01 etc.. </p>
|
||||
|
||||
<p> So here we are, along with contactors, step ups, and few other miscaleneous stuff to get high voltage things going, here is the list. </p>
|
||||
|
||||
Components Needed:
|
||||
- [ ] [Adafruit DS3231 (Precision RTC Breakout)](https://www.adafruit.com/product/3013)
|
||||
- [x] [3vdc input 24-380vac output relay](https://amzn.to/41h5ESE) [Picked up 4]
|
||||
- [ ] Temp Probe
|
||||
- [ ] Humidity Probe
|
||||
|
||||
Things needed to cut on and off:
|
||||
- Cooling
|
||||
- Heating
|
||||
- Humidifier
|
||||
- Dehumidifier
|
||||
- Lights
|
||||
|
||||
Later:
|
||||
- Water
|
||||
|
||||
|
||||
Components Wanted:
|
||||
- Adafruit DS3231 (Precision RTC Breakout)
|
||||
- 3vdc to 24v step ups
|
||||
- 3vdc contactor?
|
||||
- 24vac contactor
|
||||
- 24vdc contactor
|
||||
- Voltage converter
|
||||
|
||||
<p> It seems there are 8 channel relay modules on amazon with a pinout for elegoo pi boards. Would be good for staging fans and/or lights. Although the lights can take 12 amps per 2 led boards I have. So would be more preferred to go with low voltage coil relay with high voltage output. Trying to think what else it could be useful for. Small switches and open/close sides on 8 channels.</p>
|
||||
@ -6,7 +6,7 @@ This will be a staging spot for all my python scripts mostly. Hopefully use this
|
||||
</p>
|
||||
|
||||
Other pages of concern:
|
||||
- [Items Needed](/Items-Needed.md)
|
||||
- [Items Needed](https://gitea.rcs1.top/sickprodigy/Auto-Garden/wiki/Items-Needed-for-the-Project)
|
||||
|
||||
Note 1:
|
||||
<p>
|
||||
|
||||
@ -1,11 +0,0 @@
|
||||
from machine import Pin, Timer
|
||||
|
||||
ledMachine = machine.Pin("LED", machine.Pin.OUT)
|
||||
led = Pin(12, Pin.OUT)
|
||||
timer = Timer()
|
||||
|
||||
def blink(timer):
|
||||
led.toggle()
|
||||
ledMachine.toggle()
|
||||
|
||||
timer.init(freq=2.5, mode=Timer.PERIODIC, callback=blink)
|
||||
@ -1,9 +0,0 @@
|
||||
import machine
|
||||
led = machine.Pin("LED", machine.Pin.OUT)
|
||||
led.off() # Make sure off
|
||||
led.on() # Turn on last command, stays on
|
||||
|
||||
contactorV12 = Pin(12, Pin.OUT)
|
||||
|
||||
contactorV12.off()
|
||||
contactorV12.on()
|
||||
@ -1,34 +0,0 @@
|
||||
import machine
|
||||
import utime
|
||||
|
||||
sensor_temp = machine.ADC(machine.ADC.CORE_TEMP)
|
||||
conversion_factor = 3.3 / (65535)
|
||||
|
||||
led_onboard = machine.Pin(25, machine.Pin.OUT)
|
||||
led_onboard.value(0)
|
||||
|
||||
rtc=machine.RTC()
|
||||
|
||||
file = open("temps.txt", "w")
|
||||
#file = open("temps.txt", "a") # To append to log file rather than overwrite
|
||||
|
||||
while True:
|
||||
reading = sensor_temp.read_u16() * conversion_factor
|
||||
timestamp=rtc.datetime()
|
||||
temperature = 27 - (reading - 0.706)/0.001721
|
||||
|
||||
timestring="%04d-%02d-%02d %02d:%02d:%02d"%(timestamp[0:3] +
|
||||
timestamp[4:7])
|
||||
file.write(timestring + "," + str(temperature) + "\n")
|
||||
file.flush()
|
||||
led_onboard.value(1)
|
||||
utime.sleep(0.01)
|
||||
led_onboard.value(0)
|
||||
|
||||
utime.sleep(30)
|
||||
|
||||
# Notes
|
||||
# Given that space is at a premium when writing logs to files on the
|
||||
# internal Flash, one option may be produce timestamps and then deltas or
|
||||
# changes after that, forcing a full timestamp every so often to ensure
|
||||
# things are synchronised
|
||||
@ -1,4 +0,0 @@
|
||||
import machine
|
||||
led = machine.Pin("LED", machine.Pin.OUT)
|
||||
led.off()
|
||||
led.on()
|
||||
@ -1,7 +0,0 @@
|
||||
# Auto Garden Scripts
|
||||
### Script I'd like to take note of
|
||||
|
||||
Scirpts:
|
||||
- [Lights on and Off at Intervals](Lights-on-off-intervals.py)
|
||||
- [Log Date and Time](Log-Date-and-Time.py)
|
||||
- [Onboard LED On](Onboard-LED-On.py)
|
||||
@ -1,8 +0,0 @@
|
||||
from machine import Pin, Timer
|
||||
led = machine.Pin("LED", machine.Pin.OUT)
|
||||
timer = Timer()
|
||||
|
||||
def blink(timer):
|
||||
led.toggle()
|
||||
|
||||
timer.init(freq=2.5, mode=Timer.PERIODIC, callback=blink)
|
||||
@ -1,30 +0,0 @@
|
||||
import network
|
||||
import urequests as requests #dependency faked as another dependency ig
|
||||
|
||||
url = "https://discord.com/api/webhooks/1199545130378608650/UyJj0e-YuSlmVZD87zaHitXZLC55RP6LVBUv3nt9ZWr6d4AGzSGKZ-zI6V_VwA6I4qSq" #webhook url, from here: https://i.imgur.com/f9XnAew.png
|
||||
|
||||
#for all params, see https://discordapp.com/developers/docs/resources/webhook#execute-webhook
|
||||
data = {
|
||||
"content" : "Testing",
|
||||
"username" : "Captain Hook"
|
||||
}
|
||||
|
||||
#leave this out if you dont want an embed
|
||||
#for all params, see https://discordapp.com/developers/docs/resources/channel#embed-object
|
||||
data["embeds"] = [
|
||||
{
|
||||
"description" : "Testing for temps",
|
||||
"title" : "Tent Temp=5000c"
|
||||
}
|
||||
]
|
||||
|
||||
result = requests.post(url, json = data)
|
||||
|
||||
try:
|
||||
result.raise_for_status()
|
||||
except requests.exceptions.HTTPError as err:
|
||||
print(err)
|
||||
else:
|
||||
print("Payload delivered successfully, code {}.".format(result.status_code))
|
||||
|
||||
#result: https://i.imgur.com/DRqXQzA.png
|
||||
55
Scripts/discord_webhook.py
Normal file
55
Scripts/discord_webhook.py
Normal file
@ -0,0 +1,55 @@
|
||||
import urequests as requests
|
||||
from secrets import secrets
|
||||
|
||||
def _escape_json_str(s: str) -> str:
|
||||
# minimal JSON string escaper for quotes/backslashes and control chars
|
||||
s = s.replace("\\", "\\\\")
|
||||
s = s.replace('"', '\\"')
|
||||
s = s.replace("\n", "\\n")
|
||||
s = s.replace("\r", "\\r")
|
||||
s = s.replace("\t", "\\t")
|
||||
return s
|
||||
|
||||
def send_discord_message(message, username="Auto Garden Bot", is_alert=False):
|
||||
resp = None
|
||||
|
||||
# Use alert webhook if specified, otherwise normal webhook
|
||||
if is_alert:
|
||||
url = secrets.get('discord_alert_webhook_url') or secrets.get('discord_webhook_url')
|
||||
else:
|
||||
url = secrets.get('discord_webhook_url')
|
||||
|
||||
try:
|
||||
if not url:
|
||||
# print("DEBUG: no webhook URL in secrets")
|
||||
return False
|
||||
|
||||
url = url.strip().strip('\'"')
|
||||
|
||||
# build JSON by hand so emoji (and other unicode) are preserved as UTF-8 bytes
|
||||
content = _escape_json_str(message)
|
||||
user = _escape_json_str(username)
|
||||
body_bytes = ('{"content":"%s","username":"%s"}' % (content, user)).encode("utf-8")
|
||||
|
||||
headers = {"Content-Type": "application/json; charset=utf-8"}
|
||||
|
||||
resp = requests.post(url, data=body_bytes, headers=headers)
|
||||
|
||||
status = getattr(resp, "status", getattr(resp, "status_code", None))
|
||||
|
||||
if status and 200 <= status < 300:
|
||||
# print("Discord message sent")
|
||||
return True
|
||||
else:
|
||||
# print(f"Discord webhook failed with status {status}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
# print("Failed to send Discord message:", e)
|
||||
return False
|
||||
finally:
|
||||
if resp:
|
||||
try:
|
||||
resp.close()
|
||||
except:
|
||||
pass
|
||||
@ -1,14 +0,0 @@
|
||||
from machine import Pin, PWM
|
||||
from time import sleep
|
||||
|
||||
pwm = PWM(Pin(12))
|
||||
|
||||
pwm.freq(5000)
|
||||
|
||||
while True:
|
||||
for duty in range(65025):
|
||||
pwm.duty_u16(duty)
|
||||
sleep(0.0005)
|
||||
for duty in range(65025, 0, -1):
|
||||
pwm.duty_u16(duty)
|
||||
sleep(0.0005)
|
||||
@ -1,22 +0,0 @@
|
||||
from machine import Pin,PWM
|
||||
from pwm import myPWM
|
||||
import time
|
||||
|
||||
mypwm = myPWM(16, 17, 18, 19, 20, 21, 22, 26, 27, 28)
|
||||
chns = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
|
||||
dutys = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 65535, 32768, 16384, 8192, 4096, 2048, 1024, 512, 256, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
|
||||
delayTimes = 50
|
||||
|
||||
try:
|
||||
while True:
|
||||
for i in range(0, 20):
|
||||
for j in range(0, 10):
|
||||
mypwm.ledcWrite(chns[j], dutys[i+j])
|
||||
time.sleep_ms(delayTimes)
|
||||
|
||||
for i in range(0, 20):
|
||||
for j in range(0, 10):
|
||||
mypwm.ledcWrite(chns[9 -j], dutys[i+j])
|
||||
time.sleep_ms(delayTimes)
|
||||
except:
|
||||
mypwm.deinit()
|
||||
@ -1,8 +0,0 @@
|
||||
from machine import Pin, Timer
|
||||
led = Pin(15, Pin.OUT)
|
||||
timer = Timer()
|
||||
|
||||
def blink(timer):
|
||||
led.toggle()
|
||||
|
||||
timer.init(freq=2.5, mode=Timer.PERIODIC, callback=blink)
|
||||
@ -1,4 +0,0 @@
|
||||
import machine
|
||||
led = machine.Pin("LED", machine.Pin.OUT)
|
||||
led.off()
|
||||
led.on()
|
||||
@ -1,12 +0,0 @@
|
||||
from machine import Pin, Timer
|
||||
|
||||
# First Testing Env
|
||||
ledMachine = machine.Pin("LED", machine.Pin.OUT)
|
||||
# led = Pin(12, Pin.OUT)
|
||||
timer = Timer()
|
||||
|
||||
def blink(timer):
|
||||
# led.toggle()
|
||||
ledMachine.toggle()
|
||||
|
||||
timer.init(freq=2.5, mode=Timer.PERIODIC, callback=blink)
|
||||
161
Scripts/monitors.py
Normal file
161
Scripts/monitors.py
Normal file
@ -0,0 +1,161 @@
|
||||
import time
|
||||
from scripts.discord_webhook import send_discord_message
|
||||
from scripts.temperature_sensor import TemperatureSensor
|
||||
|
||||
class Monitor:
|
||||
"""Base class for all monitoring tasks."""
|
||||
def __init__(self, interval=300):
|
||||
"""
|
||||
interval: seconds between checks
|
||||
"""
|
||||
self.interval = interval
|
||||
self.last_check_ms = 0
|
||||
|
||||
def should_run(self):
|
||||
"""Check if enough time has passed to run again."""
|
||||
now = time.ticks_ms()
|
||||
if time.ticks_diff(now, self.last_check_ms) >= (self.interval * 1000):
|
||||
self.last_check_ms = now
|
||||
return True
|
||||
return False
|
||||
|
||||
def run(self):
|
||||
"""Override this in subclasses to implement monitoring logic."""
|
||||
pass
|
||||
|
||||
class TemperatureMonitor(Monitor):
|
||||
"""Monitor temperature sensors and report to Discord."""
|
||||
def __init__(self, sensor, label="Temp", check_interval=10, report_interval=30, alert_high=None, alert_low=None, log_file=None, send_alerts_to_separate_channel=False):
|
||||
super().__init__(check_interval) # Check interval for temp reading
|
||||
self.sensor = sensor
|
||||
self.label = label
|
||||
self.check_interval = check_interval
|
||||
self.report_interval = report_interval
|
||||
self.alert_high = alert_high
|
||||
self.alert_low = alert_low
|
||||
self.log_file = log_file
|
||||
self.send_alerts_to_separate_channel = send_alerts_to_separate_channel
|
||||
self.last_report_ms = 0
|
||||
self.last_alert_state = None # Track if we were in alert state
|
||||
|
||||
def run(self):
|
||||
"""Read sensors every check_interval, report/log every report_interval."""
|
||||
temps = self.sensor.read_all_temps(unit='F')
|
||||
if not temps:
|
||||
# print(f"No temperature readings available for {self.label}")
|
||||
return
|
||||
|
||||
now = time.ticks_ms()
|
||||
should_report = time.ticks_diff(now, self.last_report_ms) >= (self.report_interval * 1000)
|
||||
|
||||
for rom, temp in temps.items():
|
||||
sensor_id = rom.hex()[:8]
|
||||
|
||||
# Check if in alert state
|
||||
has_alert = False
|
||||
alert_type = None
|
||||
|
||||
if self.alert_high and temp > self.alert_high:
|
||||
has_alert = True
|
||||
alert_type = "HIGH"
|
||||
elif self.alert_low and temp < self.alert_low:
|
||||
has_alert = True
|
||||
alert_type = "LOW"
|
||||
|
||||
# Send alert immediately to alert channel (every check_interval, only if configured)
|
||||
if has_alert and self.send_alerts_to_separate_channel:
|
||||
alert_msg = f"🚨 {self.label} Temperature: {temp:.1f}°F ⚠️ {alert_type} (threshold: {self.alert_high if alert_type == 'HIGH' else self.alert_low}°F)"
|
||||
send_discord_message(alert_msg, is_alert=True)
|
||||
self.last_alert_state = True
|
||||
|
||||
# Send normal report at report_interval to regular channel (regardless of alert state)
|
||||
if should_report:
|
||||
temp_msg = f"🌡️ {self.label} Temperature: {temp:.1f}°F"
|
||||
|
||||
# Add alert indicator to regular report if in alert
|
||||
if has_alert:
|
||||
temp_msg += f" ⚠️ {alert_type}"
|
||||
|
||||
send_discord_message(temp_msg, is_alert=False)
|
||||
|
||||
# Send recovery message if we were in alert and now normal
|
||||
if not has_alert and self.last_alert_state:
|
||||
recovery_msg = f"✅ {self.label} Temperature back to normal: {temp:.1f}°F"
|
||||
send_discord_message(recovery_msg, is_alert=False)
|
||||
self.last_alert_state = False
|
||||
|
||||
# Log to file at report_interval
|
||||
if should_report and self.log_file:
|
||||
self._log_temp(sensor_id, temp)
|
||||
|
||||
# Update last report time
|
||||
if should_report:
|
||||
self.last_report_ms = now
|
||||
|
||||
def _log_temp(self, sensor_id, temp):
|
||||
"""Log temperature reading to file."""
|
||||
try:
|
||||
import time
|
||||
timestamp = time.localtime()
|
||||
log_entry = f"{timestamp[0]}-{timestamp[1]:02d}-{timestamp[2]:02d} {timestamp[3]:02d}:{timestamp[4]:02d}:{timestamp[5]:02d},{self.label},{sensor_id},{temp:.2f}\n"
|
||||
|
||||
with open(self.log_file, 'a') as f:
|
||||
f.write(log_entry)
|
||||
except Exception as e:
|
||||
print(f"Error logging temperature: {e}")
|
||||
|
||||
class WiFiMonitor(Monitor):
|
||||
"""Monitor WiFi connection and handle reconnection."""
|
||||
def __init__(self, wifi, led, interval=5, reconnect_cooldown=60):
|
||||
super().__init__(interval)
|
||||
self.wifi = wifi
|
||||
self.led = led
|
||||
self.reconnect_cooldown = reconnect_cooldown
|
||||
self.last_reconnect_attempt = 0
|
||||
self.was_connected = wifi.isconnected() if wifi else False
|
||||
|
||||
def run(self):
|
||||
"""Check WiFi status, blink LED, attempt reconnect if needed."""
|
||||
import network
|
||||
from scripts.networking import connect_wifi
|
||||
|
||||
is_connected = self.wifi.isconnected() if self.wifi else False
|
||||
|
||||
if not is_connected:
|
||||
# Fast blink when disconnected
|
||||
self.led.on()
|
||||
time.sleep(0.2)
|
||||
self.led.off()
|
||||
|
||||
# Try reconnect if cooldown passed
|
||||
now = time.ticks_ms()
|
||||
if time.ticks_diff(now, self.last_reconnect_attempt) >= (self.reconnect_cooldown * 1000):
|
||||
self.last_reconnect_attempt = now
|
||||
# print("Attempting WiFi reconnect...")
|
||||
self.wifi = connect_wifi(self.led)
|
||||
|
||||
if self.wifi and self.wifi.isconnected():
|
||||
send_discord_message("WiFi connection restored 🔄")
|
||||
self.was_connected = True
|
||||
else:
|
||||
# Slow blink when connected
|
||||
self.led.on()
|
||||
time.sleep(1)
|
||||
self.led.off()
|
||||
|
||||
# Notify if connection was just restored
|
||||
if not self.was_connected:
|
||||
send_discord_message("WiFi connection restored 🔄")
|
||||
self.was_connected = True
|
||||
|
||||
def run_monitors(monitors):
|
||||
"""
|
||||
Run all monitors in the list, checking if each should run based on interval.
|
||||
Call this in your main loop.
|
||||
"""
|
||||
for monitor in monitors:
|
||||
if monitor.should_run():
|
||||
try:
|
||||
monitor.run()
|
||||
except Exception as e:
|
||||
print(f"Error running monitor {monitor.__class__.__name__}: {e}")
|
||||
37
Scripts/networking.py
Normal file
37
Scripts/networking.py
Normal file
@ -0,0 +1,37 @@
|
||||
import network
|
||||
import time
|
||||
from secrets import secrets
|
||||
|
||||
RECONNECT_COOLDOWN_MS = 60000 # 60 seconds
|
||||
|
||||
def connect_wifi(led=None, timeout=10):
|
||||
"""
|
||||
Connect to WiFi using secrets['ssid'] / secrets['password'].
|
||||
If `led` (machine.Pin) is provided, pulse it once on successful connect.
|
||||
Returns the WLAN object or None on failure.
|
||||
"""
|
||||
wifi = network.WLAN(network.STA_IF)
|
||||
wifi.active(True)
|
||||
|
||||
# print("Connecting to WiFi...", end="")
|
||||
wifi.connect(secrets['ssid'], secrets['password'])
|
||||
|
||||
# Wait for connection with timeout
|
||||
max_wait = timeout
|
||||
while max_wait > 0:
|
||||
if wifi.status() < 0 or wifi.status() >= 3:
|
||||
break
|
||||
max_wait -= 1
|
||||
# print(".", end="")
|
||||
time.sleep(1)
|
||||
|
||||
if wifi.isconnected():
|
||||
# print("\nConnected! Network config:", wifi.ifconfig())
|
||||
if led:
|
||||
led.on()
|
||||
time.sleep(1)
|
||||
led.off()
|
||||
return wifi
|
||||
else:
|
||||
# print("\nConnection failed!")
|
||||
return None
|
||||
@ -1,63 +0,0 @@
|
||||
from machine import Pin,PWM
|
||||
|
||||
|
||||
class myPWM():
|
||||
def __init__(self, pwm0: int=16, pwm1: int=17, pwm2: int=18, pwm3: int=19, pwm4: int=20, pwm5: int=21, pwm6: int=22, pwm7: int=26, pwm8: int=27, pwm9: int=28, freq_num: int=10000):
|
||||
self._pwm0 = PWM(Pin(pwm0))
|
||||
self._pwm0.freq(freq_num)
|
||||
self._pwm1 = PWM(Pin(pwm1))
|
||||
self._pwm1.freq(freq_num)
|
||||
self._pwm2 = PWM(Pin(pwm2))
|
||||
self._pwm2.freq(freq_num)
|
||||
self._pwm3 = PWM(Pin(pwm3))
|
||||
self._pwm3.freq(freq_num)
|
||||
self._pwm4 = PWM(Pin(pwm4))
|
||||
self._pwm4.freq(freq_num)
|
||||
self._pwm5 = PWM(Pin(pwm5))
|
||||
self._pwm5.freq(freq_num)
|
||||
self._pwm6 = PWM(Pin(pwm6))
|
||||
self._pwm6.freq(freq_num)
|
||||
self._pwm7 = PWM(Pin(pwm7))
|
||||
self._pwm7.freq(freq_num)
|
||||
self._pwm8 = PWM(Pin(pwm8))
|
||||
self._pwm8.freq(freq_num)
|
||||
self._pwm9 = PWM(Pin(pwm9))
|
||||
self._pwm9.freq(freq_num)
|
||||
|
||||
def ledcWrite(self,chn,value):
|
||||
if chn == 0:
|
||||
self._pwm0.duty_u16(value)
|
||||
elif chn == 1:
|
||||
self._pwm1.duty_u16(value)
|
||||
elif chn == 2:
|
||||
self._pwm2.duty_u16(value)
|
||||
elif chn == 3:
|
||||
self._pwm3.duty_u16(value)
|
||||
elif chn == 4:
|
||||
self._pwm4.duty_u16(value)
|
||||
elif chn == 5:
|
||||
self._pwm5.duty_u16(value)
|
||||
elif chn == 6:
|
||||
self._pwm6.duty_u16(value)
|
||||
elif chn == 7:
|
||||
self._pwm7.duty_u16(value)
|
||||
elif chn == 8:
|
||||
self._pwm8.duty_u16(value)
|
||||
elif chn == 9:
|
||||
self._pwm9.duty_u16(value)
|
||||
|
||||
|
||||
|
||||
def deinit(self):
|
||||
self._pwm0.deinit()
|
||||
self._pwm1.deinit()
|
||||
self._pwm2.deinit()
|
||||
self._pwm3.deinit()
|
||||
self._pwm4.deinit()
|
||||
self._pwm5.deinit()
|
||||
self._pwm6.deinit()
|
||||
self._pwm7.deinit()
|
||||
self._pwm8.deinit()
|
||||
self._pwm9.deinit()
|
||||
|
||||
|
||||
@ -1,3 +0,0 @@
|
||||
from random import randint as rand
|
||||
num = rand(1, 100)
|
||||
print(num)
|
||||
@ -1,33 +0,0 @@
|
||||
from machine import Pin, PWM
|
||||
from random import randint
|
||||
import time
|
||||
|
||||
pins = [15, 14, 13]
|
||||
freq_num = 10000
|
||||
|
||||
pwm0 = PWM(Pin(pins[0])) #set PWM
|
||||
pwm1 = PWM(Pin(pins[1]))
|
||||
pwm2 = PWM(Pin(pins[2]))
|
||||
pwm0.freq(freq_num)
|
||||
pwm1.freq(freq_num)
|
||||
pwm2.freq(freq_num)
|
||||
|
||||
def setColor(r, g, b):
|
||||
pwm0.duty_u16(65535 - r)
|
||||
pwm1.duty_u16(65535 - g)
|
||||
pwm2.duty_u16(65535 - b)
|
||||
|
||||
try:
|
||||
while True:
|
||||
red = randint(0, 65535)
|
||||
green = randint(0, 65535)
|
||||
blue = randint(0, 65535)
|
||||
setColor(red, green, blue)
|
||||
print(red,"Red")
|
||||
print(green,"Green")
|
||||
print(blue,"Blue")
|
||||
time.sleep_ms(200)
|
||||
except:
|
||||
pwm0.deinit()
|
||||
pwm1.deinit()
|
||||
pwm2.deinit()
|
||||
@ -1,19 +0,0 @@
|
||||
import machine, onewire, ds18x20, time
|
||||
|
||||
ds_pin = machine.Pin(10)
|
||||
ds_sensor = ds18x20.DS18X20(onewire.OneWire(ds_pin))
|
||||
|
||||
roms = ds_sensor.scan()
|
||||
print('Found DS devices: ', roms)
|
||||
|
||||
while True:
|
||||
ds_sensor.convert_temp()
|
||||
time.sleep_ms(750)
|
||||
for rom in roms: # in a loop to get each sensor on the same pin since you can have multi sensors
|
||||
print(rom)
|
||||
tempC = ds_sensor.read_temp(rom)
|
||||
tempF = tempC * (9/5) +32 # convert to farenheit
|
||||
print('temperature (ºC):', "{:.2f}".format(tempC)) # The result will have two decimal places {:.2f}
|
||||
print('temperature (ºF):', "{:.2f}".format(tempF))
|
||||
print()
|
||||
time.sleep(5) # the loop will repeat every 5 seconds
|
||||
88
Scripts/temperature_sensor.py
Normal file
88
Scripts/temperature_sensor.py
Normal file
@ -0,0 +1,88 @@
|
||||
import machine
|
||||
import onewire
|
||||
import ds18x20
|
||||
import time
|
||||
|
||||
class TemperatureSensor:
|
||||
def __init__(self, pin=10, label=None):
|
||||
"""Initialize DS18X20 temperature sensor on the specified pin."""
|
||||
self.ds_pin = machine.Pin(pin)
|
||||
self.ds_sensor = ds18x20.DS18X20(onewire.OneWire(self.ds_pin))
|
||||
self.roms = []
|
||||
self.label = label # e.g., "Inside" or "Outside"
|
||||
self.scan_sensors()
|
||||
|
||||
def scan_sensors(self):
|
||||
"""Scan for connected DS18X20 sensors."""
|
||||
try:
|
||||
# Convert bytearray to bytes so they can be used as dict keys
|
||||
self.roms = [bytes(rom) for rom in self.ds_sensor.scan()]
|
||||
print(f'Found {len(self.roms)} DS18X20 sensor(s) on {self.label or "pin"}')
|
||||
return self.roms
|
||||
except Exception as e:
|
||||
print(f'Error scanning sensors: {e}')
|
||||
return []
|
||||
|
||||
def read_temp_c(self, rom=None):
|
||||
"""Read temperature in Celsius. If rom=None, reads first sensor."""
|
||||
try:
|
||||
self.ds_sensor.convert_temp()
|
||||
time.sleep_ms(750)
|
||||
|
||||
if rom is None and self.roms:
|
||||
rom = self.roms[0]
|
||||
|
||||
if rom:
|
||||
return self.ds_sensor.read_temp(rom)
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f'Error reading temperature: {e}')
|
||||
return None
|
||||
|
||||
def read_temp_f(self, rom=None):
|
||||
"""Read temperature in Fahrenheit."""
|
||||
temp_c = self.read_temp_c(rom)
|
||||
if temp_c is not None:
|
||||
return temp_c * (9/5) + 32
|
||||
return None
|
||||
|
||||
def read_all_temps(self, unit='F'):
|
||||
"""Read all connected sensors. Returns dict of {rom: temp}."""
|
||||
results = {}
|
||||
try:
|
||||
self.ds_sensor.convert_temp()
|
||||
time.sleep_ms(750)
|
||||
|
||||
for rom in self.roms:
|
||||
temp_c = self.ds_sensor.read_temp(rom)
|
||||
if unit.upper() == 'F':
|
||||
results[rom] = temp_c * (9/5) + 32
|
||||
else:
|
||||
results[rom] = temp_c
|
||||
except Exception as e:
|
||||
print(f'Error reading temperatures: {e}')
|
||||
|
||||
return results
|
||||
|
||||
# Sensor configuration registry
|
||||
SENSOR_CONFIG = {
|
||||
'inside': {
|
||||
'pin': 10,
|
||||
'label': 'Inside',
|
||||
'alert_high': 80.0,
|
||||
'alert_low': 70.0
|
||||
},
|
||||
'outside': {
|
||||
'pin': 11,
|
||||
'label': 'Outside',
|
||||
'alert_high': 85.0,
|
||||
'alert_low': 68.0
|
||||
}
|
||||
}
|
||||
|
||||
def get_configured_sensors():
|
||||
"""Return dictionary of configured sensor instances."""
|
||||
sensors = {}
|
||||
for key, config in SENSOR_CONFIG.items():
|
||||
sensors[key] = TemperatureSensor(pin=config['pin'], label=config['label'])
|
||||
return sensors
|
||||
50
main.py
Normal file
50
main.py
Normal file
@ -0,0 +1,50 @@
|
||||
from machine import Pin
|
||||
import time
|
||||
from scripts.networking import connect_wifi
|
||||
from scripts.discord_webhook import send_discord_message
|
||||
from scripts.monitors import TemperatureMonitor, WiFiMonitor, run_monitors
|
||||
from scripts.temperature_sensor import get_configured_sensors, SENSOR_CONFIG
|
||||
|
||||
# Initialize pins (LED light onboard)
|
||||
led = Pin("LED", Pin.OUT)
|
||||
led.low()
|
||||
|
||||
# Connect to WiFi
|
||||
wifi = connect_wifi(led)
|
||||
|
||||
# Send startup message if connected
|
||||
if wifi and wifi.isconnected():
|
||||
send_discord_message("Pico W online and connected ✅")
|
||||
|
||||
# Get configured sensors
|
||||
sensors = get_configured_sensors()
|
||||
|
||||
# Set up monitors
|
||||
monitors = [
|
||||
WiFiMonitor(wifi, led, interval=5, reconnect_cooldown=60),
|
||||
]
|
||||
|
||||
# Add temperature monitors from config
|
||||
for key, sensor in sensors.items():
|
||||
config = SENSOR_CONFIG[key]
|
||||
|
||||
# Inside temp alerts go to separate channel
|
||||
send_to_alert_channel = (key == 'inside')
|
||||
|
||||
monitors.append(
|
||||
TemperatureMonitor(
|
||||
sensor=sensor,
|
||||
label=config['label'],
|
||||
check_interval=10, # Check temp every 10 seconds
|
||||
report_interval=30, # Report/log every 30 seconds
|
||||
alert_high=config['alert_high'],
|
||||
alert_low=config['alert_low'],
|
||||
log_file="/temp_logs.csv",
|
||||
send_alerts_to_separate_channel=send_to_alert_channel
|
||||
)
|
||||
)
|
||||
|
||||
# Main monitoring loop
|
||||
while True:
|
||||
run_monitors(monitors)
|
||||
time.sleep(0.1)
|
||||
6
secrets.example.py
Normal file
6
secrets.example.py
Normal file
@ -0,0 +1,6 @@
|
||||
secrets = {
|
||||
'ssid': ' Change_to_wifi_SSID',
|
||||
'password': 'Change_to_wifi_Pasword',
|
||||
'discord_webhook_url': 'https://discord.com/api/webhooks/key/long-Combination_1234-ChangeMe', # normal updates
|
||||
'discord_alert_webhook_url': 'https://discord.com/api/webhooks/key/long-Combination_1234-ChangeMe', # alerts only
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user