mirror of
https://github.com/oasis6212/Meshbot_weather.git
synced 2025-12-10 00:06:12 -06:00
173 lines
6.6 KiB
Python
173 lines
6.6 KiB
Python
import time
|
|
import threading
|
|
import requests
|
|
import logging
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WeatherAlerts:
|
|
def __init__(self, lat, lon, interface, user_agent_app, user_agent_email, check_interval=300, message_delay=7, settings=None):
|
|
self.base_url = f"https://api.weather.gov/alerts/active"
|
|
self.params = {"point": f"{lat},{lon}"}
|
|
self.headers = {"User-Agent": f"({user_agent_app}, {user_agent_email})"}
|
|
self.interface = interface
|
|
self.check_interval = check_interval
|
|
self.message_delay = message_delay
|
|
self.settings = settings or {}
|
|
self.channel_index = self.settings.get('ALERT_CHANNEL_INDEX', 0)
|
|
|
|
# Add storage for current alert data
|
|
self.current_alert = None
|
|
self.last_alert_id = None
|
|
|
|
def check_alerts(self):
|
|
"""Check for new weather alerts and send notifications if needed."""
|
|
try:
|
|
logger.info("Updated weather alerts")
|
|
response = requests.get(self.base_url, params=self.params, headers=self.headers)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
if not data.get('features'):
|
|
self.current_alert = None # Clear current alert if no active alerts
|
|
return
|
|
|
|
latest_alert = data['features'][0]
|
|
alert_id = latest_alert['properties']['id']
|
|
|
|
if alert_id == self.last_alert_id:
|
|
return
|
|
|
|
self.last_alert_id = alert_id
|
|
self.current_alert = latest_alert # Store the full alert data
|
|
alert_props = latest_alert['properties']
|
|
|
|
# Check if description should be included
|
|
include_description = self.settings.get('ALERT_INCLUDE_DESCRIPTION', True)
|
|
|
|
# Prepare the alert message based on settings
|
|
if include_description:
|
|
full_message = (
|
|
f"{alert_props['headline']}\n"
|
|
f"Description: {alert_props['description']}"
|
|
)
|
|
else:
|
|
full_message = alert_props['headline']
|
|
|
|
# Split message into chunks and send
|
|
messages = self.split_message(full_message)
|
|
|
|
if not self.interface or not hasattr(self.interface, 'sendText'):
|
|
logger.error("Interface not properly configured for sending messages")
|
|
return
|
|
|
|
for i, msg in enumerate(messages, 1):
|
|
formatted_msg = f"--({i}/{len(messages)}) Alert--\n{msg}"
|
|
self.interface.sendText(
|
|
formatted_msg,
|
|
wantAck=False,
|
|
channelIndex=self.channel_index,
|
|
)
|
|
if i < len(messages): # Don't sleep after last message
|
|
time.sleep(self.message_delay)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Failed to fetch weather alerts: {str(e)}")
|
|
except Exception as e:
|
|
logger.error(f"Error checking weather alerts: {str(e)}")
|
|
|
|
def broadcast_full_alert(self, destination_id):
|
|
"""Broadcast the full alert information including description."""
|
|
# Check if full-alert command is enabled
|
|
if not self.settings.get('ENABLE_ALERT_COMMAND', True):
|
|
return False # Do nothing if full-alert command is disabled
|
|
|
|
# Check if there's a current alert
|
|
if not self.current_alert:
|
|
return False # Do nothing if no active alerts
|
|
|
|
# Get alert properties
|
|
alert_props = self.current_alert['properties']
|
|
full_message = (
|
|
f"{alert_props['headline']}\n"
|
|
f"Description: {alert_props['description']}"
|
|
)
|
|
|
|
# Split and send messages
|
|
messages = self.split_message(full_message)
|
|
for i, msg in enumerate(messages, 1):
|
|
formatted_msg = f"--({i}/{len(messages)}) Alert--\n{msg}"
|
|
self.interface.sendText(
|
|
formatted_msg,
|
|
wantAck=True,
|
|
destinationId=destination_id,
|
|
channelIndex=self.channel_index
|
|
)
|
|
if i < len(messages):
|
|
time.sleep(self.message_delay)
|
|
|
|
return True
|
|
|
|
def split_message(self, text, max_length=175):
|
|
"""Split a message into chunks of specified maximum length, preserving whole words.
|
|
|
|
Args:
|
|
text (str): Text to split
|
|
max_length (int, optional): Maximum length of each chunk. Defaults to 175.
|
|
|
|
Returns:
|
|
list: List of message chunks
|
|
"""
|
|
messages = []
|
|
lines = text.split('\n')
|
|
current_chunk = []
|
|
current_length = 0
|
|
|
|
for line in lines:
|
|
words = line.split()
|
|
for word in words:
|
|
# Check if adding this word would exceed the limit
|
|
# +1 for space, another +1 for potential newline
|
|
word_length = len(word) + (1 if current_chunk else 0)
|
|
|
|
if current_length + word_length > max_length:
|
|
# Current chunk is full, save it and start a new one
|
|
if current_chunk:
|
|
messages.append(' '.join(current_chunk))
|
|
current_chunk = [word]
|
|
current_length = len(word)
|
|
else:
|
|
current_chunk.append(word)
|
|
current_length += word_length
|
|
|
|
# Add a newline after each original line if we're continuing the same chunk
|
|
if current_chunk:
|
|
current_length += 1 # Account for the newline
|
|
if current_length > max_length:
|
|
# If adding newline would exceed limit, start new chunk
|
|
messages.append(' '.join(current_chunk))
|
|
current_chunk = []
|
|
current_length = 0
|
|
|
|
# Don't forget the last chunk
|
|
if current_chunk:
|
|
messages.append(' '.join(current_chunk))
|
|
|
|
return messages
|
|
|
|
def start_monitoring(self):
|
|
"""Start continuous monitoring of weather alerts in a separate thread."""
|
|
|
|
def monitor():
|
|
while True:
|
|
try:
|
|
self.check_alerts()
|
|
except Exception as e:
|
|
logger.error(f"Error in monitor thread: {str(e)}")
|
|
finally:
|
|
time.sleep(self.check_interval)
|
|
|
|
monitor_thread = threading.Thread(target=monitor, daemon=True)
|
|
monitor_thread.start() |