From 2d3684b410ef5a5dab7958e2aa73848027c177c5 Mon Sep 17 00:00:00 2001 From: Rayun Date: Sun, 10 Aug 2025 11:47:03 -0400 Subject: [PATCH] initial commit --- NutCog.py | 343 ++++++++++++++++++++++++++++++++++++++++++++ dsNut.py | 24 ++++ gotify_interface.py | 0 main.py | 74 ++++++++++ nut_interface.py | 159 ++++++++++++++++++++ requirements.txt | 6 + 6 files changed, 606 insertions(+) create mode 100644 NutCog.py create mode 100644 dsNut.py create mode 100644 gotify_interface.py create mode 100644 main.py create mode 100644 nut_interface.py create mode 100644 requirements.txt diff --git a/NutCog.py b/NutCog.py new file mode 100644 index 0000000..9aac406 --- /dev/null +++ b/NutCog.py @@ -0,0 +1,343 @@ +import asyncio +import datetime +import os +import aiofiles +import discord +import json +import aiohttp + +from typing import Optional + +from discord.ext import commands, tasks +from dotenv import dotenv_values +from loguru import logger as logging +from aiohttp_requests import requests + +from nut_interface import NUT + +def display_time(seconds, granularity=2): + _intervals = ( + ('w', 604800), + ('d', 86400), + ('h', 3600), + ('m', 60), + ('s', 1), + ) + + result = [] + + for name, count in _intervals: + value = seconds // count + if value: + seconds -= value * count + if value == 1: + name = name.rstrip('s') + result.append("{}{}".format(value, name)) + return ' '.join(result[:granularity]) + + +def sgol_spu(ctx): + return ctx.channel.id == 1107507678034526249 + + +class NutCog(commands.Cog): + def __init__(self, bot): + self.bot = bot + self.nut = None # type: NUT + self.alternate_nut: Optional[NUT] = None + self.self_testing = False + self.status_channel = None + self.status_message = None + self.log_channel = None + self.selftest_sch = None + self.gotify = None + self.timers = {} + + @commands.Cog.listener() + async def on_ready(self): + self.nut = NUT(login="admin", password="stereo") + self.alternate_nut = NUT(login="admin", password="stereo", ups="cyberpower") + await self.load_secrets() + if self.status_channel and self.status_message: + logging.info("Found status channel and message") + self.update_status.start() + if self.log_channel: + logging.info("Found log channel") + self.check_for_events.start() + if self.selftest_sch and self.log_channel: + logging.info("Found selftest schedule") + self.scheduled_self_test.start() + if dotenv_values('.env')['GOTIFY_URL'] and dotenv_values('.env')['GOTIFY_TOKEN']: + self.gotify = dotenv_values('.env')['GOTIFY_URL'] + logging.info("Gotify ready") + + logging.info("Nut cog ready") + + async def load_secrets(self): + if os.path.exists("secrets.json"): + async with aiofiles.open("secrets.json", "r") as f: + secrets = json.loads(await f.read()) + for key, value in secrets.items(): + if "channel" in key: + self.__dict__[key] = await self.bot.fetch_channel(value) if value else None + elif "message" in key: + self.__dict__[key] = await self.status_channel.fetch_message(value) if value else None + elif "sch" in key: + self.__dict__[key] = value + + async def save_secrets(self): + secrets = { + "status_channel": self.status_channel.id if self.status_channel else None, + "status_message": self.status_message.id if self.status_message else None, + "log_channel": self.log_channel.id if self.log_channel else None, + "selftest_sch": self.selftest_sch if self.selftest_sch else None + } + async with aiofiles.open("secrets.json", "w") as f: + await f.write(json.dumps(secrets)) + + @tasks.loop(seconds=60) + async def update_status(self): + if self.status_channel and self.status_message: + embed = self.create_status_embed("UPS Status") + await self.status_message.edit(embed=embed) + + def create_status_embed(self, title, description=None, color=None): + status = self.nut.get_status() + info = self.nut.get_info() + + alt_status = self.alternate_nut.get_status() + alt_info = self.alternate_nut.get_info() + + if not color: + color = discord.Color.green() if "DISCHRG" not in alt_status else discord.Color.orange() + + if not description: + description = f"Main UPS: {info['ups.mfr']} {info['ups.model']}\n" \ + f"Alternate UPS: CyberPower {alt_info['ups.mfr']}" + embed = discord.Embed(title=title, description=description, color=color, timestamp=discord.utils.utcnow()) + embed.add_field(name="Status", value=status) + embed.add_field(name="Output Watts", value=f"{self.alternate_nut.get_output_watts():.1f}W") + embed.add_field(name="Battery Charge", value=f"{info['battery.charge']}%") + embed.add_field(name="Input Voltage", value=f"{float(alt_info['input.voltage']):.1f}V") + embed.add_field(name="Battery Voltage", value=f"{float(info['battery.voltage']):.1f}V") + embed.add_field(name="Battery Runtime", value=display_time(int(info['battery.runtime']))) + return embed + + def create_gotify_message(self, message): + status = self.nut.get_status() + info = self.nut.get_info() + + alt_status = self.alternate_nut.get_status() + alt_info = self.alternate_nut.get_info() + + msg = f"{message}\n" \ + f"Status: {status}\n" \ + f"Output Watts: {self.alternate_nut.get_output_watts():.1f}W\n" \ + f"Battery Charge: {info['battery.charge']}%\n" \ + f"Input Voltage: {float(alt_info['input.voltage']):.1f}V\n" \ + f"Battery Voltage: {float(info['battery.voltage']):.1f}V\n" \ + f"Battery Runtime: {display_time(int(info['battery.runtime']))}" + return msg + + async def send_gotify_message(self, title: str, message: str, priority: int = 5): + data = {'title': title, 'message': message, 'priority': priority} + await requests.post(f"{self.gotify}/message", params={'channel': 'UPSAlerts'}, json=data) + + @tasks.loop(seconds=1) + async def check_for_events(self): + events = self.alternate_nut.check_for_events() + for desc, event in events.items(): + if desc not in self.timers.keys(): + self.timers[desc] = 0 + if event and not self.self_testing: + self.timers[desc] += 1 + + # Initial event + if self.timers[desc] == 1: + logging.info(f"Event {desc} triggered") + info = self.alternate_nut.get_info() + await self.send_event_message(NUT.events[desc][0], NUT.events[desc][1], color=discord.Color.red() + if not desc == "minor" else discord.Color.orange()) + + # Minor event still going + if self.timers[desc] == 15 and desc == "minor": + logging.info(f"Event {desc} still going") + await self.send_event_message("What", "Minor event is still occuring... " + "you should probably check that out.", + color=discord.Color.orange()) + else: + # Event cleared + if self.timers[desc] > 0 and desc != "minor" and desc != "low_batt": + logging.info(f"Event {desc} cleared after {self.timers[desc]} seconds") + t = display_time(self.timers[desc]) + await self.send_event_message("Utility Power Restored", + f"Utility power has been restored after {t}.", + color=discord.Color.green()) + + # Low battery cleared + if self.timers[desc] > 0 and desc == "low_batt": + logging.info(f"Event {desc} cleared") + info = self.alternate_nut.get_battery_charge() + await self.send_event_message("Battery Charging", f"UPS Battery has charged past 20%. " + f"(Battery charge: {info}%)", + color=discord.Color.orange()) + + # Minor event cleared + if self.timers[desc] > 15 and desc == "minor": + logging.info(f"Event {desc} cleared after {self.timers[desc]} seconds") + t = display_time(self.timers[desc]) + await self.send_event_message("Minor Event Cleared", + f"Minor event has been cleared after {t}." + f" Whatever you did, it worked.", + color=discord.Color.green()) + self.timers[desc] = 0 + + @tasks.loop(seconds=58) + async def scheduled_self_test(self): + if self.selftest_sch: + now = datetime.datetime.now() + if now.weekday() == self.selftest_sch[0]\ + and now.hour == self.selftest_sch[1]\ + and now.minute == self.selftest_sch[2]: + logging.info("Scheduled self test starting") + self.self_testing = True + await self.send_event_message("Scheduled Self Test", "Scheduled self test started.", priority=1) + self.alternate_nut.run_self_test() + results = await self.alternate_nut.get_self_test_results() + await self.send_event_message("Scheduled Self Test", + f"Scheduled self test completed. Results: {results}", + color=discord.Color.green(), + priority=1) + self.self_testing = False + logging.info("Scheduled self test completed") + + @commands.command(name="trigger") + @commands.check(sgol_spu) + async def trigger_event(self, ctx, event: str, duration: int = 10): + if event in NUT.events.keys(): + events = {} + for e in NUT.events.keys(): + events[e] = False + events[event] = True + logging.info(f"Triggering event {event}") + self.alternate_nut.override_events(events) + await ctx.send(f"Triggered event {event} for {duration} seconds.") + await asyncio.sleep(int(duration)) + logging.info(f"Clearing event {event}") + self.alternate_nut.override_events(None) + else: + await ctx.send(f"Event {event} not found") + + async def send_event_message(self, event, desc, color=discord.Color.red(), priority=10): + if "Minor" in event: + color = discord.Color.orange() + embed = self.create_status_embed(event, desc, color) + gotify = self.create_gotify_message(desc) + await self.send_gotify_message(title=event, message=gotify, priority=priority) + await self.log_channel.send(embed=embed) + + @commands.command(name="setstatuschannel", aliases=['ssc']) + @commands.check(sgol_spu) + async def set_status_channel(self, ctx, channel: discord.TextChannel): + self.status_channel = channel + embed = discord.Embed(title="Status", description="Status will be posted here") + self.status_message = await self.status_channel.send(embed=embed) + await self.save_secrets() + await ctx.send(f"Set status channel to {self.status_channel.mention}") + + @commands.command(name="setlogchannel", aliases=['slc']) + @commands.check(sgol_spu) + async def set_log_channel(self, ctx, channel: discord.TextChannel): + self.log_channel = channel + await self.save_secrets() + await ctx.send(f"Set log channel to {self.log_channel.mention}") + + @commands.command(name="ping") + async def ping(self, ctx): + await ctx.send("pang") + + @commands.command( + name="info", + help="Get current UPS information. Specify 'ups' to get info for a specific UPS, 'main' or 'alt'" + ) + @commands.check(sgol_spu) + async def current_info(self, ctx, ups: str = "main"): + if ups.lower() == "main": + info = self.nut.get_info() + elif ups.lower() == "alt": + info = self.alternate_nut.get_info() + else: + await ctx.send("Invalid UPS specified. Use 'main' or 'alt'.") + return + message = "```" + max_length = max([len(key) for key in info.keys()]) + for key, value in info.items(): + next_add = f"{key.ljust(max_length)}: {value}\n" + if len(message) + len(next_add) > 2000: + await ctx.send(message + "```") + message = "```" + message += next_add + + message += "```" + await ctx.send(message) + + @commands.command(name="self_test", aliases=['selftest', 'st']) + @commands.check(sgol_spu) + async def self_test(self, ctx): + self.self_testing = True + self.alternate_nut.run_self_test() + embed = self.create_status_embed("Self Test", "Self test started.") + await ctx.send(embed=embed) + results = await self.alternate_nut.get_self_test_results() + embed = self.create_status_embed("Self Test", f"Self test completed. Results: {results}", + color=discord.Color.green()) + await ctx.send(embed=embed) + self.self_testing = False + + @commands.command(name="schedule_self_test", aliases=['schedule_selftest', 'schedule_st', 'sst']) + @commands.check(sgol_spu) + async def schedule_self_test(self, ctx, weekday: str, time: str): + weekdays = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"] + if weekday.lower() not in weekdays: + await ctx.send("Invalid weekday. Valid weekdays are: mon, tue, wed, thu, fri, sat, sun") + return + try: + hour, minute = time.split(":") + hour = int(hour) + minute = int(minute) + except ValueError: + await ctx.send("Invalid time. Time should be in the format HH:MM") + return + if hour < 0 or hour > 23: + await ctx.send("Invalid time. Hour should be between 0 and 23") + return + if minute < 0 or minute > 59: + await ctx.send("Invalid time. Minute should be between 0 and 59") + return + self.selftest_sch = (weekdays.index(weekday.lower()), hour, minute) + if not self.scheduled_self_test.is_running(): + self.scheduled_self_test.start() + await self.save_secrets() + await ctx.send(f"Self test scheduled for every {weekday} at {hour}:{minute}") + + @commands.command(name="status") + async def status(self, ctx): + status = self.nut.get_status() + await ctx.send(status) + + @commands.command(name="fuck_xfinity") + @commands.check(sgol_spu) + async def get_ip(self, ctx): + # Get the current external IP address + async with aiohttp.ClientSession() as session: + async with session.get('https://api.ipify.org') as resp: + embed = discord.Embed( + title='Current IP Address', + description=await resp.text(encoding='utf-8'), + color=discord.Color.blue() + ) + await ctx.send(embed=embed) + + +async def setup(bot): + await bot.add_cog(NutCog(bot)) diff --git a/dsNut.py b/dsNut.py new file mode 100644 index 0000000..1a53876 --- /dev/null +++ b/dsNut.py @@ -0,0 +1,24 @@ +from discord.ext import commands + +from nut_interface import NUT + +class Nut(commands.Cog): + def __init__(self, bot): + self.bot = bot + self.nut = None + + @commands.Cog.listener() + async def on_ready(self): + logging.info("Nut cog ready") + self.nut = NUT(login="admin", password="stereo") + + @commands.command(name="ping") + async def ping(self, ctx): + await ctx.send("pang") + + @commands.command(name="info") + async def current_info(self, ctx): + await ctx.send(self.nut.get_info()) + +async def setup(bot): + await bot.add_cog(Nut(bot)) diff --git a/gotify_interface.py b/gotify_interface.py new file mode 100644 index 0000000..e69de29 diff --git a/main.py b/main.py new file mode 100644 index 0000000..e564376 --- /dev/null +++ b/main.py @@ -0,0 +1,74 @@ +import asyncio +import discord +import traceback + +from discord.ext import commands +from discord.utils import oauth_url + +from loguru import logger as logging +from dotenv import dotenv_values + +# TODO: Log issues that happen +# TODO: Statistics of uptime + + +class NutBot(commands.Bot): + def __init__(self): + super().__init__(command_prefix=".", intents=discord.Intents.all()) + self.bot = super() + + async def setup_hook(self) -> None: + await self.load_extension('NutCog') + + async def on_ready(self): + perms = 469830672 + invite = oauth_url(super().user.id, permissions=discord.Permissions(perms)) + logging.info(f"Invite link: {invite}") + + async def on_command_error(self, context, exception): + if isinstance(exception, commands.NoPrivateMessage): + await context.send('{}, This command cannot be used in DMs.'.format(context.author.mention)) + elif isinstance(exception, commands.UserInputError): + pass # Silent ignore + await context.send('{}, {}'.format(context.author.mention, self.format_error(context, exception))) + elif isinstance(exception, commands.NotOwner): + await context.send('{}, {}'.format(context.author.mention, exception.args[0])) + elif isinstance(exception, commands.MissingPermissions): + permission_names = [name.replace('guild', 'server').replace('_', ' ').title() for name in + exception.missing_perms] + await context.send('{}, you need {} permissions to run this command!'.format( + context.author.mention, utils.pretty_concat(permission_names))) + elif isinstance(exception, commands.BotMissingPermissions): + permission_names = [name.replace('guild', 'server').replace('_', ' ').title() for name in + exception.missing_perms] + await context.send('{}, I need {} permissions to run this command!'.format( + context.author.mention, utils.pretty_concat(permission_names))) + elif isinstance(exception, commands.CommandOnCooldown): + await context.send( + '{}, That command is on cooldown! Try again in {:.2f}s!'.format(context.author.mention, + exception.retry_after)) + elif isinstance(exception, commands.MaxConcurrencyReached): + types = {discord.ext.commands.BucketType.default: "`Global`", + discord.ext.commands.BucketType.guild: "`Guild`", + discord.ext.commands.BucketType.channel: "`Channel`", + discord.ext.commands.BucketType.category: "`Category`", + discord.ext.commands.BucketType.member: "`Member`", + discord.ext.commands.BucketType.user: "`User`"} + await context.send( + '{}, That command has exceeded the max {} concurrency limit of `{}` instance! Please try again later.'.format( + context.author.mention, types[exception.per], exception.number)) + elif isinstance(exception, commands.CheckFailure): + await context.send('{}, {}'.format(context.author.mention, exception.args[0])) + else: + await context.send( + '```\n%s\n```' % ''.join(traceback.format_exception_only(type(exception), exception)).strip()) + # Print traceback to console + print(''.join(traceback.format_exception(type(exception), exception, exception.__traceback__)).strip()) + if isinstance(context.channel, discord.TextChannel): + pass # Silent ignore + else: + pass + + +bot = NutBot() +bot.run(token=dotenv_values(".env")['BOT_TOKEN']) diff --git a/nut_interface.py b/nut_interface.py new file mode 100644 index 0000000..48dbb16 --- /dev/null +++ b/nut_interface.py @@ -0,0 +1,159 @@ +import asyncio + +from time import sleep as s + +from loguru import logger as logging +from nut2 import PyNUTClient, PyNUTError + + +class NUT: + statuses = { + "ALARM": "ALARM", + "BOOST": "Voltage Boost Active", + "BYPASS": "Bypass Active", + "CAL": "Runtime Calibration", + "CHRG": "Battery Charging", + "COMM": "Communications Active", + "DISCHRG": "Battery Discharging", + "FSD": "Forced Shutdown", + "LB": "Low Battery", + "NOCOMM": "Communications Lost", + "OB": "On Battery", + "OFF": "Offline", + "OL": "Online", + "OVER": "Overloaded", + "RB": "Battery Needs Replaced", + "TEST": "Under Test", + "TRIM": "Voltage Trim Active" + } + + events = { + "blackout": ("Blackout", "Utility power has been lost."), + "brown": ("Brownout", "Utility power is experiencing a brownout."), + "overvolt": ("Overvoltage", "Utility power is experiencing an overvoltage."), + "overload": ("Output Overloaded", "The UPS is overloaded. (Output load: {l}%)"), + "low_batt": ("Low Battery", "The UPS battery is running low."), + "minor": ("Minor Event", "An event has occurred.") + } + + def __init__(self, host=None, port=None, login=None, password=None, **kwargs): + self.host = host + self.port = port + self.login = login + self.password = password + if self.host: + self.client = PyNUTClient( + host=self.host, + port=self.port, + login=self.login, + password=self.password + ) + else: + self.client = PyNUTClient(login=self.login, password=self.password) + self.ups = kwargs.get("ups") + self.ups = "ups1" if self.ups is None else self.ups + self.events_override = None + + def _reconnect(self): + self.client._connect() + + def set_ups(self, ups): + self.ups = ups + + @staticmethod + def nut_func(func): + def wrapper(self, *args, **kwargs): + while True: + try: + return func(self, *args, **kwargs) + except EOFError: + self._reconnect() + continue + return wrapper + + @nut_func + def get_info(self): + return self.client.list_vars(self.ups) + + @nut_func + def get_status(self): + stat = ", ".join([f"{self.statuses.get(i)}" for i in self.client.get(self.ups, "ups.status").split(" ")]) + return stat + + def override_events(self, events): + self.events_override = events + + def check_for_events(self) -> dict: + if self.events_override: + return self.events_override + events = {} + + info = self.get_info() + voltage = float(info['input.voltage']) + charge = float(info['battery.charge']) + + # Check for brownout + events['brown'] = 85.0 < voltage < 110.0 + + # Check for overvoltage + events['overvolt'] = voltage > 130.0 + + # Check for blackout + events['blackout'] = voltage < 85.0 + + # Check for overload + events['overload'] = float(info['ups.load']) > 95.0 + + # Check for low battery + events['low_batt'] = charge < 20.0 + + # Check for minor event + events['minor'] = 'DISCHRG' in info['ups.status'] and not events['brown'] and not events['blackout']\ + and not events['overvolt'] + + return events + + @nut_func + def get_output_watts(self): + max_watts = int(self.client.get(self.ups, "ups.realpower.nominal")) + return max_watts * (int(self.client.get(self.ups, "ups.load")) / 100.0) + + @nut_func + def get_max_watts(self): + return int(self.client.get(self.ups, "ups.realpower.nominal")) + + @nut_func + def get_input_voltage(self): + return float(self.client.get(self.ups, "input.voltage")) + + @nut_func + def get_battery_charge(self): + return float(self.client.get(self.ups, "battery.charge")) + + @nut_func + def set_beeper(self, beeper): + self.client.run_command(self.ups, "beeper.enable" if beeper else "beeper.disable") + + @nut_func + def run_self_test(self): + self.client.run_command(self.ups, "test.battery.start.quick") + + @nut_func + async def get_self_test_results(self): + while True: + if "DISCHRG" not in self.client.get(self.ups, "ups.status")\ + and self.client.get(self.ups, "ups.test.result") != "In progress": + await asyncio.sleep(0.2) + else: + break + while True: + if self.client.get(self.ups, "ups.test.result") == "In progress": + await asyncio.sleep(0.2) + else: + break + return self.client.get(self.ups, "ups.test.result") + + @nut_func + async def shutdown(self, timer=5): + await asyncio.sleep(timer) + self.client.run_command(self.ups, "shutdown.return") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..efed6da --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +discord.py==2.4.0 +aiofiles==23.2.1 +loguru==0.7.2 +python-dotenv==1.0.0 +nut2==2.1.1 +aiohttp-requests==0.2.4