Files
nutbot/nut_interface.py
2025-08-10 11:47:03 -04:00

160 lines
4.9 KiB
Python

import asyncio
from time import sleep as s
from loguru import logger as logging
from nut2 import PyNUTClient, PyNUTError
class NUT:
statuses = {
"ALARM": "ALARM",
"BOOST": "Voltage Boost Active",
"BYPASS": "Bypass Active",
"CAL": "Runtime Calibration",
"CHRG": "Battery Charging",
"COMM": "Communications Active",
"DISCHRG": "Battery Discharging",
"FSD": "Forced Shutdown",
"LB": "Low Battery",
"NOCOMM": "Communications Lost",
"OB": "On Battery",
"OFF": "Offline",
"OL": "Online",
"OVER": "Overloaded",
"RB": "Battery Needs Replaced",
"TEST": "Under Test",
"TRIM": "Voltage Trim Active"
}
events = {
"blackout": ("Blackout", "Utility power has been lost."),
"brown": ("Brownout", "Utility power is experiencing a brownout."),
"overvolt": ("Overvoltage", "Utility power is experiencing an overvoltage."),
"overload": ("Output Overloaded", "The UPS is overloaded. (Output load: {l}%)"),
"low_batt": ("Low Battery", "The UPS battery is running low."),
"minor": ("Minor Event", "An event has occurred.")
}
def __init__(self, host=None, port=None, login=None, password=None, **kwargs):
self.host = host
self.port = port
self.login = login
self.password = password
if self.host:
self.client = PyNUTClient(
host=self.host,
port=self.port,
login=self.login,
password=self.password
)
else:
self.client = PyNUTClient(login=self.login, password=self.password)
self.ups = kwargs.get("ups")
self.ups = "ups1" if self.ups is None else self.ups
self.events_override = None
def _reconnect(self):
self.client._connect()
def set_ups(self, ups):
self.ups = ups
@staticmethod
def nut_func(func):
def wrapper(self, *args, **kwargs):
while True:
try:
return func(self, *args, **kwargs)
except EOFError:
self._reconnect()
continue
return wrapper
@nut_func
def get_info(self):
return self.client.list_vars(self.ups)
@nut_func
def get_status(self):
stat = ", ".join([f"{self.statuses.get(i)}" for i in self.client.get(self.ups, "ups.status").split(" ")])
return stat
def override_events(self, events):
self.events_override = events
def check_for_events(self) -> dict:
if self.events_override:
return self.events_override
events = {}
info = self.get_info()
voltage = float(info['input.voltage'])
charge = float(info['battery.charge'])
# Check for brownout
events['brown'] = 85.0 < voltage < 110.0
# Check for overvoltage
events['overvolt'] = voltage > 130.0
# Check for blackout
events['blackout'] = voltage < 85.0
# Check for overload
events['overload'] = float(info['ups.load']) > 95.0
# Check for low battery
events['low_batt'] = charge < 20.0
# Check for minor event
events['minor'] = 'DISCHRG' in info['ups.status'] and not events['brown'] and not events['blackout']\
and not events['overvolt']
return events
@nut_func
def get_output_watts(self):
max_watts = int(self.client.get(self.ups, "ups.realpower.nominal"))
return max_watts * (int(self.client.get(self.ups, "ups.load")) / 100.0)
@nut_func
def get_max_watts(self):
return int(self.client.get(self.ups, "ups.realpower.nominal"))
@nut_func
def get_input_voltage(self):
return float(self.client.get(self.ups, "input.voltage"))
@nut_func
def get_battery_charge(self):
return float(self.client.get(self.ups, "battery.charge"))
@nut_func
def set_beeper(self, beeper):
self.client.run_command(self.ups, "beeper.enable" if beeper else "beeper.disable")
@nut_func
def run_self_test(self):
self.client.run_command(self.ups, "test.battery.start.quick")
@nut_func
async def get_self_test_results(self):
while True:
if "DISCHRG" not in self.client.get(self.ups, "ups.status")\
and self.client.get(self.ups, "ups.test.result") != "In progress":
await asyncio.sleep(0.2)
else:
break
while True:
if self.client.get(self.ups, "ups.test.result") == "In progress":
await asyncio.sleep(0.2)
else:
break
return self.client.get(self.ups, "ups.test.result")
@nut_func
async def shutdown(self, timer=5):
await asyncio.sleep(timer)
self.client.run_command(self.ups, "shutdown.return")