You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
181 lines
6.6 KiB
181 lines
6.6 KiB
#!/usr/bin/env python |
|
import glob |
|
import logging |
|
import re |
|
import sys |
|
from time import sleep |
|
|
|
import yaml |
|
from simple_pid import PID |
|
|
|
SYSFS_HWMON_BASE = "/sys/class/hwmon/"


class ThermalZone:
    """A group of fans driven by one PID controller from one or more temperature sources.

    Every path in the zone configuration is taken relative to
    ``SYSFS_HWMON_BASE``. Before a file is opened, any hwmon device name that
    appears in the path (a key of the parent's ``hwmap``) is replaced by the
    mapped value.
    """

    _log = logging.getLogger("pyfan")

    def __init__(self, config, pyfan_parent) -> None:
        """Create the zone from its config mapping and put its fans into mode 1.

        Args:
            config: Zone settings. Reads the keys ``fan``, ``source``,
                ``factor``, ``name``, ``target``, ``pid`` (with ``p``, ``i``
                and ``d``) and, optionally, ``interval``.
            pyfan_parent: Object exposing a ``hwmap`` attribute that maps
                hwmon device names to their directory names.
        """
        self.fans = config["fan"]
        self.temp_source = config["source"]
        self.factor = 1 / config["factor"]
        self.name = config["name"]
        self.target = config["target"]
        self.pyfan = pyfan_parent
        self._refresh_hwmap()

        if "interval" not in config:
            # The default is written back into the mapping, as before, so the
            # config keeps reflecting the value actually in use.
            config["interval"] = 3
            self._log.warning(
                "[%s] No interval specified, using default. This is deprecated since 1.6 and may be removed in future "
                "versions. See example config for reference.", self.name)

        self.pid = PID(config["pid"]["p"], config["pid"]["i"], config["pid"]["d"], setpoint=0,
                       sample_time=config["interval"])
        self.pid.output_limits = (0, 255)
        self.setup_pwm()

        self._log.info("[%s] Source=%s Fans=%s Factor=%f %s", self.name, self.temp_source,
                       self.fans, self.factor, self.pid)

    def eval(self):
        """Run one control step: read the temperature and update every fan.

        Nothing happens when no temperature could be read. A reading of
        exactly zero is a valid temperature and is processed normally.
        Entries of ``self.fans`` are either a plain pwm path, or a
        one-item dict mapping the pwm path to an upper limit or to a
        ``[min, max]`` list.
        """
        temp = self.get_temp()
        if temp is None:
            return

        diff = self.target - temp
        val = int(self.pid(diff))

        try:
            for target_fan in self.fans:
                if not isinstance(target_fan, dict):
                    self._apply_pwm(target_fan, val)
                    continue

                fan, fan_val = next(iter(target_fan.items()))
                if isinstance(fan_val, list):
                    if len(fan_val) < 2:
                        self._log.warning(
                            "[%s] max/min for %s was not set correctly (%s)", self.name, fan, fan_val)
                        # Without both limits there is nothing sensible to
                        # clamp to; leave this fan alone instead of failing.
                        continue
                    desired = min(fan_val[1], max(val, fan_val[0]))
                else:
                    desired = min(val, fan_val)

                # Limits may come from the config as floats; the value written
                # to the pwm file is kept integral.
                desired = int(desired)
                self._apply_pwm(fan, desired)
                self._log.debug("[%s] %s=%i%%", self.name, fan, int(desired / 255 * 100))
        except OSError as err:
            self._log.warning("[%s] Failed to set pwm, trying to reset it. (%s)", self.name,
                              err.strerror)
            self.setup_pwm(1)

        self._log.debug("[%s] %i%% D:%iC T:%iC %s", self.name, int(val / 255 * 100), diff,
                        temp, self.pid)

    def get_temp(self):
        """Return the highest scaled reading of the configured source(s).

        ``self.temp_source`` is either one path or a list of paths. Each
        file is read once and its value is multiplied by ``self.factor``.

        Returns:
            The largest reading as a float, or ``None`` when no source
            produced a value.
        """
        if isinstance(self.temp_source, list):
            sources = self.temp_source
        else:
            sources = [self.temp_source]

        readings = []
        for source in sources:
            raw = self.read_sysfs(source)
            if raw and raw.strip():
                readings.append(float(raw) * self.factor)

        return max(readings) if readings else None

    def restore(self):
        """Write mode 2 to every fan's ``_enable`` file.

        NOTE(review): in the Linux hwmon sysfs interface ``pwmN_enable`` = 2
        conventionally hands control back to the hardware - confirm for the
        target driver.
        """
        self.setup_pwm(2)

    def setup_pwm(self, value=1):
        """Write ``value`` to the ``_enable`` file of every configured fan.

        When a file does not exist, the hwmon map and the alias pattern are
        reloaded from the parent so later accesses use current names. The
        failed write itself is not retried here.
        """
        for target_fan in self.fans:
            if isinstance(target_fan, dict):
                fan = next(iter(target_fan))
            else:
                fan = target_fan
            try:
                self.set_pwm_mode(fan, value)
            except FileNotFoundError:
                self._log.warning("[%s] pwm not found. Try reloading hwmon map...", self.name)
                self._refresh_hwmap()

    def replace_alias(self, path):
        """Return ``path`` with every known hwmon device name substituted."""
        if self.alias_replace is None:
            replaced = path
        else:
            replaced = self.alias_replace.sub(lambda match: self.hwmap[match.group()], path)
        self._log.debug("[ALIAS] %s -> %s", path, replaced)
        return replaced

    def build_pwm_path(self, specific):
        """Return the absolute sysfs path for a config-relative path."""
        return self.replace_alias(SYSFS_HWMON_BASE + specific)

    def write_sysfs(self, path, value):
        """Write ``str(value)`` to the sysfs file behind ``path``.

        Raises:
            OSError: If the file cannot be opened or written.
        """
        with open(self.build_pwm_path(path), "w") as sysfs_f:
            sysfs_f.write(str(value))

    def read_sysfs(self, path):
        """Return the first line of the sysfs file behind ``path``.

        Returns:
            The raw line (including any trailing newline), or ``None`` when
            the file does not exist. Other I/O errors propagate.
        """
        try:
            with open(self.build_pwm_path(path)) as sysfs_f:
                return sysfs_f.readline()
        except FileNotFoundError as err:
            self._log.warning("[%s] temp source not found. Not ready yet or wrong path? (%s)",
                              self.name, err.strerror)
            return None

    def set_pwm_mode(self, path, value=1):
        """Write ``value`` to ``<path>_enable``."""
        self.write_sysfs(path + "_enable", value)

    def _refresh_hwmap(self):
        """Reload the hwmon map from the parent and rebuild the alias pattern."""
        self.hwmap = self.pyfan.hwmap
        # Longest names first so a name that is a prefix of another cannot
        # shadow it; names are escaped because they are data, not patterns.
        aliases = sorted((alias for alias in self.hwmap if alias), key=len, reverse=True)
        if aliases:
            self.alias_replace = re.compile("|".join(re.escape(alias) for alias in aliases))
        else:
            # An empty pattern would match everywhere and look up "" in hwmap.
            self.alias_replace = None

    def _read_int(self, path):
        """Return the integer stored in a sysfs file, or ``None`` if unavailable."""
        raw = self.read_sysfs(path)
        if raw is None:
            return None
        try:
            return int(raw)
        except ValueError:
            return None

    def _apply_pwm(self, fan, desired):
        """Write ``desired`` to ``fan`` unless the file already holds that value."""
        # The file content is text; compare numerically so an unchanged value
        # really skips the write.
        if self._read_int(fan) != desired:
            self.write_sysfs(fan, desired)
|
|
|
|
|
class PyFan:
    """Load the configuration and drive all configured thermal zones.

    Used as a context manager: leaving the ``with`` block calls
    ``restore()`` on every zone.
    """

    def __init__(self, config="/etc/pyfan") -> None:
        """Load the YAML config at ``config`` and create one zone per entry.

        Reads the config keys ``loglevel`` (optional, case-insensitive name
        or numeric level, defaults to ``INFO``), ``pid_interval`` (optional,
        defaults to 0.2 with a deprecation warning) and ``thermalzones``.
        """
        self.config = self.__load_config(config)

        level = self.config.get("loglevel", "INFO")
        if isinstance(level, str):
            # logging only knows the upper-case level names.
            level = level.upper()
        logging.basicConfig(level=level)

        self.zones = []
        if "pid_interval" not in self.config:
            self.interval = 0.2
            logging.getLogger("pyfan").warning(
                "No pid_interval specified, using default. This is deprecated since 1.6 and may be removed in future "
                "versions. See example config for reference.")
        else:
            self.interval = self.config["pid_interval"]

        for zone in self.config["thermalzones"]:
            self.zones.append(ThermalZone(zone, self))

        logging.getLogger("pyfan").info(
            "Created %d thermal zones, pid_interval=%f.", len(self.zones), self.interval)

    def __enter__(self):
        """Return this instance for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Restore every zone, even when some of them fail.

        Each failure is logged and the remaining zones are still restored.
        The first failure is re-raised afterwards, so errors stay visible to
        the caller exactly as they were before.
        """
        first_error = None
        for zone in self.zones:
            try:
                zone.restore()
            except Exception as err:  # re-raised below, after all zones were tried
                logging.getLogger("pyfan").error(
                    "[%s] Failed to restore fan control. (%s)", zone.name, err)
                if first_error is None:
                    first_error = err

        if first_error is not None:
            raise first_error

    def eval(self):
        """Run one control step on every zone, in configuration order."""
        for zone in self.zones:
            zone.eval()

    @property
    def hwmap(self):
        """Map each hwmon device name to its ``hwmonN`` directory name.

        The map is rebuilt from ``SYSFS_HWMON_BASE`` on every access. A
        ``name`` file that cannot be read (for example because the device
        disappeared after it was listed) is skipped.
        """
        hwmon_map = {}

        for name_file in glob.glob(SYSFS_HWMON_BASE + "hwmon*/name"):
            try:
                with open(name_file) as file:
                    hw_name = file.read().strip()
            except OSError as err:
                logging.getLogger("pyfan").debug("Skipping %s (%s)", name_file, err)
                continue
            hwmon_map[hw_name] = name_file.split("/")[-2]

        return hwmon_map

    @staticmethod
    def __load_config(path):
        """Parse the YAML file at ``path`` with ``yaml.safe_load``."""
        with open(path) as cfg_file:
            return yaml.safe_load(cfg_file)
|
|
|
|
|
if __name__ == "__main__":
    # Imported here because only the script entry point needs it.
    import signal

    def _terminate(signum, frame):
        """Turn SIGTERM into SystemExit so the ``with`` block below unwinds."""
        sys.exit(0)

    # Without a handler, SIGTERM ends the process without running
    # PyFan.__exit__, so the zones would never be restored.
    signal.signal(signal.SIGTERM, _terminate)

    with PyFan() as pyfan:
        try:
            # Evaluate all zones, then wait pid_interval seconds, forever.
            while True:
                pyfan.eval()
                sleep(pyfan.interval)
        except KeyboardInterrupt:
            # Ctrl-C is a normal way to stop; exit cleanly through __exit__.
            sys.exit(0)
|
|
|