import asyncio import datetime import os import aiofiles import discord import json import aiohttp from typing import Optional from discord.ext import commands, tasks from dotenv import dotenv_values from loguru import logger as logging from aiohttp_requests import requests from nut_interface import NUT def display_time(seconds, granularity=2): _intervals = ( ('w', 604800), ('d', 86400), ('h', 3600), ('m', 60), ('s', 1), ) result = [] for name, count in _intervals: value = seconds // count if value: seconds -= value * count if value == 1: name = name.rstrip('s') result.append("{}{}".format(value, name)) return ' '.join(result[:granularity]) def sgol_spu(ctx): return ctx.channel.id == 1107507678034526249 class NutCog(commands.Cog): def __init__(self, bot): self.bot = bot self.nut = None # type: NUT self.alternate_nut: Optional[NUT] = None self.self_testing = False self.status_channel = None self.status_message = None self.log_channel = None self.selftest_sch = None self.gotify = None self.timers = {} @commands.Cog.listener() async def on_ready(self): self.nut = NUT(login="admin", password="stereo") self.alternate_nut = NUT(login="admin", password="stereo", ups="cyberpower") await self.load_secrets() if self.status_channel and self.status_message: logging.info("Found status channel and message") self.update_status.start() if self.log_channel: logging.info("Found log channel") self.check_for_events.start() if self.selftest_sch and self.log_channel: logging.info("Found selftest schedule") self.scheduled_self_test.start() if dotenv_values('.env')['GOTIFY_URL'] and dotenv_values('.env')['GOTIFY_TOKEN']: self.gotify = dotenv_values('.env')['GOTIFY_URL'] logging.info("Gotify ready") logging.info("Nut cog ready") async def load_secrets(self): if os.path.exists("secrets.json"): async with aiofiles.open("secrets.json", "r") as f: secrets = json.loads(await f.read()) for key, value in secrets.items(): if "channel" in key: self.__dict__[key] = await self.bot.fetch_channel(value) if value else None elif "message" in key: self.__dict__[key] = await self.status_channel.fetch_message(value) if value else None elif "sch" in key: self.__dict__[key] = value async def save_secrets(self): secrets = { "status_channel": self.status_channel.id if self.status_channel else None, "status_message": self.status_message.id if self.status_message else None, "log_channel": self.log_channel.id if self.log_channel else None, "selftest_sch": self.selftest_sch if self.selftest_sch else None } async with aiofiles.open("secrets.json", "w") as f: await f.write(json.dumps(secrets)) @tasks.loop(seconds=60) async def update_status(self): if self.status_channel and self.status_message: embed = self.create_status_embed("UPS Status") await self.status_message.edit(embed=embed) def create_status_embed(self, title, description=None, color=None): status = self.nut.get_status() info = self.nut.get_info() alt_status = self.alternate_nut.get_status() alt_info = self.alternate_nut.get_info() if not color: color = discord.Color.green() if "DISCHRG" not in alt_status else discord.Color.orange() if not description: description = f"Main UPS: {info['ups.mfr']} {info['ups.model']}\n" \ f"Alternate UPS: CyberPower {alt_info['ups.mfr']}" embed = discord.Embed(title=title, description=description, color=color, timestamp=discord.utils.utcnow()) embed.add_field(name="Status", value=status) embed.add_field(name="Output Watts", value=f"{self.alternate_nut.get_output_watts():.1f}W") embed.add_field(name="Battery Charge", value=f"{info['battery.charge']}%") embed.add_field(name="Input Voltage", value=f"{float(alt_info['input.voltage']):.1f}V") embed.add_field(name="Battery Voltage", value=f"{float(info['battery.voltage']):.1f}V") embed.add_field(name="Battery Runtime", value=display_time(int(info['battery.runtime']))) return embed def create_gotify_message(self, message): status = self.nut.get_status() info = self.nut.get_info() alt_status = self.alternate_nut.get_status() alt_info = self.alternate_nut.get_info() msg = f"{message}\n" \ f"Status: {status}\n" \ f"Output Watts: {self.alternate_nut.get_output_watts():.1f}W\n" \ f"Battery Charge: {info['battery.charge']}%\n" \ f"Input Voltage: {float(alt_info['input.voltage']):.1f}V\n" \ f"Battery Voltage: {float(info['battery.voltage']):.1f}V\n" \ f"Battery Runtime: {display_time(int(info['battery.runtime']))}" return msg async def send_gotify_message(self, title: str, message: str, priority: int = 5): data = {'title': title, 'message': message, 'priority': priority} await requests.post(f"{self.gotify}/message", params={'channel': 'UPSAlerts'}, json=data) @tasks.loop(seconds=1) async def check_for_events(self): events = self.alternate_nut.check_for_events() for desc, event in events.items(): if desc not in self.timers.keys(): self.timers[desc] = 0 if event and not self.self_testing: self.timers[desc] += 1 # Initial event if self.timers[desc] == 1: logging.info(f"Event {desc} triggered") info = self.alternate_nut.get_info() await self.send_event_message(NUT.events[desc][0], NUT.events[desc][1], color=discord.Color.red() if not desc == "minor" else discord.Color.orange()) # Minor event still going if self.timers[desc] == 15 and desc == "minor": logging.info(f"Event {desc} still going") await self.send_event_message("What", "Minor event is still occuring... " "you should probably check that out.", color=discord.Color.orange()) else: # Event cleared if self.timers[desc] > 0 and desc != "minor" and desc != "low_batt": logging.info(f"Event {desc} cleared after {self.timers[desc]} seconds") t = display_time(self.timers[desc]) await self.send_event_message("Utility Power Restored", f"Utility power has been restored after {t}.", color=discord.Color.green()) # Low battery cleared if self.timers[desc] > 0 and desc == "low_batt": logging.info(f"Event {desc} cleared") info = self.alternate_nut.get_battery_charge() await self.send_event_message("Battery Charging", f"UPS Battery has charged past 20%. " f"(Battery charge: {info}%)", color=discord.Color.orange()) # Minor event cleared if self.timers[desc] > 15 and desc == "minor": logging.info(f"Event {desc} cleared after {self.timers[desc]} seconds") t = display_time(self.timers[desc]) await self.send_event_message("Minor Event Cleared", f"Minor event has been cleared after {t}." f" Whatever you did, it worked.", color=discord.Color.green()) self.timers[desc] = 0 @tasks.loop(seconds=58) async def scheduled_self_test(self): if self.selftest_sch: now = datetime.datetime.now() if now.weekday() == self.selftest_sch[0]\ and now.hour == self.selftest_sch[1]\ and now.minute == self.selftest_sch[2]: logging.info("Scheduled self test starting") self.self_testing = True await self.send_event_message("Scheduled Self Test", "Scheduled self test started.", priority=1) self.alternate_nut.run_self_test() results = await self.alternate_nut.get_self_test_results() await self.send_event_message("Scheduled Self Test", f"Scheduled self test completed. Results: {results}", color=discord.Color.green(), priority=1) self.self_testing = False logging.info("Scheduled self test completed") @commands.command(name="trigger") @commands.check(sgol_spu) async def trigger_event(self, ctx, event: str, duration: int = 10): if event in NUT.events.keys(): events = {} for e in NUT.events.keys(): events[e] = False events[event] = True logging.info(f"Triggering event {event}") self.alternate_nut.override_events(events) await ctx.send(f"Triggered event {event} for {duration} seconds.") await asyncio.sleep(int(duration)) logging.info(f"Clearing event {event}") self.alternate_nut.override_events(None) else: await ctx.send(f"Event {event} not found") async def send_event_message(self, event, desc, color=discord.Color.red(), priority=10): if "Minor" in event: color = discord.Color.orange() embed = self.create_status_embed(event, desc, color) gotify = self.create_gotify_message(desc) await self.send_gotify_message(title=event, message=gotify, priority=priority) await self.log_channel.send(embed=embed) @commands.command(name="setstatuschannel", aliases=['ssc']) @commands.check(sgol_spu) async def set_status_channel(self, ctx, channel: discord.TextChannel): self.status_channel = channel embed = discord.Embed(title="Status", description="Status will be posted here") self.status_message = await self.status_channel.send(embed=embed) await self.save_secrets() await ctx.send(f"Set status channel to {self.status_channel.mention}") @commands.command(name="setlogchannel", aliases=['slc']) @commands.check(sgol_spu) async def set_log_channel(self, ctx, channel: discord.TextChannel): self.log_channel = channel await self.save_secrets() await ctx.send(f"Set log channel to {self.log_channel.mention}") @commands.command(name="ping") async def ping(self, ctx): await ctx.send("pang") @commands.command( name="info", help="Get current UPS information. Specify 'ups' to get info for a specific UPS, 'main' or 'alt'" ) @commands.check(sgol_spu) async def current_info(self, ctx, ups: str = "main"): if ups.lower() == "main": info = self.nut.get_info() elif ups.lower() == "alt": info = self.alternate_nut.get_info() else: await ctx.send("Invalid UPS specified. Use 'main' or 'alt'.") return message = "```" max_length = max([len(key) for key in info.keys()]) for key, value in info.items(): next_add = f"{key.ljust(max_length)}: {value}\n" if len(message) + len(next_add) > 2000: await ctx.send(message + "```") message = "```" message += next_add message += "```" await ctx.send(message) @commands.command(name="self_test", aliases=['selftest', 'st']) @commands.check(sgol_spu) async def self_test(self, ctx): self.self_testing = True self.alternate_nut.run_self_test() embed = self.create_status_embed("Self Test", "Self test started.") await ctx.send(embed=embed) results = await self.alternate_nut.get_self_test_results() embed = self.create_status_embed("Self Test", f"Self test completed. Results: {results}", color=discord.Color.green()) await ctx.send(embed=embed) self.self_testing = False @commands.command(name="schedule_self_test", aliases=['schedule_selftest', 'schedule_st', 'sst']) @commands.check(sgol_spu) async def schedule_self_test(self, ctx, weekday: str, time: str): weekdays = ["mon", "tue", "wed", "thu", "fri", "sat", "sun"] if weekday.lower() not in weekdays: await ctx.send("Invalid weekday. Valid weekdays are: mon, tue, wed, thu, fri, sat, sun") return try: hour, minute = time.split(":") hour = int(hour) minute = int(minute) except ValueError: await ctx.send("Invalid time. Time should be in the format HH:MM") return if hour < 0 or hour > 23: await ctx.send("Invalid time. Hour should be between 0 and 23") return if minute < 0 or minute > 59: await ctx.send("Invalid time. Minute should be between 0 and 59") return self.selftest_sch = (weekdays.index(weekday.lower()), hour, minute) if not self.scheduled_self_test.is_running(): self.scheduled_self_test.start() await self.save_secrets() await ctx.send(f"Self test scheduled for every {weekday} at {hour}:{minute}") @commands.command(name="status") async def status(self, ctx): status = self.nut.get_status() await ctx.send(status) @commands.command(name="fuck_xfinity") @commands.check(sgol_spu) async def get_ip(self, ctx): # Get the current external IP address async with aiohttp.ClientSession() as session: async with session.get('https://api.ipify.org') as resp: embed = discord.Embed( title='Current IP Address', description=await resp.text(encoding='utf-8'), color=discord.Color.blue() ) await ctx.send(embed=embed) async def setup(bot): await bot.add_cog(NutCog(bot))