"""UART-driven presence detection service.

Reads comma-separated sample records from a serial port, turns each record
into a feature vector (three leading values plus FFT peaks of the rest) and,
depending on the configured mode, either

* ``predict`` - classifies each record with a stored one-class SVM and
  flashes a GPIO-driven light on a positive result,
* ``train``   - trains and evaluates the SVM from CSV files on disk, or
* ``have`` / ``none`` - collects raw records into a timestamped CSV file.
"""
import os
import sys
import json
import time

import serial
import numpy as np
import pandas as pd
import libsvm.svmutil as svm


class GpioMonitor:
    """No-op stand-in used when the ``wiringpi`` module cannot be imported.

    It only mirrors the small part of the ``wiringpi`` API this file calls,
    so the service can run on machines without the GPIO library.
    """

    class Gpio:
        # Placeholder constants; the stub methods below ignore them.
        OUTPUT, HIGH, LOW = -1, 0, 1

    GPIO = Gpio()

    @staticmethod
    def wiringPiSetup():
        """Do nothing (stub)."""
        pass

    @staticmethod
    def pinMode(port: "int", gpio: "Gpio"):
        """Do nothing (stub)."""
        pass

    @staticmethod
    def digitalWrite(port: "int", gpio: "Gpio"):
        """Do nothing (stub)."""
        pass


try:
    import wiringpi  # noqa
except ModuleNotFoundError:
    wiringpi = GpioMonitor


class Service:
    """Collects, trains on, or classifies UART sample records."""

    # Recognised values of the "mode" configuration entry.
    _Mode = {"predict": 1, "train": 2, "have": 3, "none": 4}

    # Maximum number of (frequency, amplitude) peaks kept per record.
    _MAX_PEAKS = 10

    # Number of leading values of a record passed through without FFT.
    _HEAD = 3

    def __init__(self, conf_path: "str"):
        """Load the configuration, set up the GPIO pin and open the UART.

        Args:
            conf_path: path of a JSON file whose top-level keys override the
                defaults below.
        """
        self._conf: "dict" = {
            "data": {
                "count": 700,
                "size": 131,
                "have": "data/have",
                "none": "data/none"
            },
            "fft": {
                "win": 10,
                "hold": 500,
                "rate": 337
            },
            "mode": "predict",
            "model": {
                "file": "data/svm.tin",
                "gamma": 2.57e-6,
                "nu": 7.1e-3
            },
            "uart": {
                "file": "/dev/ttyS5",
                "baud": 115200
            },
            "gpio": {
                "wpi": 2,
                "time": 3
            },
            "log": True
        }
        self._uart: "serial.Serial"
        self._model: "svm.svm_model"
        self._read_conf(conf_path)
        self._log(self._conf)
        self._init_gpio()
        self._open_uart()

    def _read_conf(self, file: "str") -> "None":
        """Overlay the JSON configuration in *file* onto the defaults.

        Unknown top-level keys are ignored. A nested section is merged key by
        key, so a file that sets only some entries of a section keeps the
        defaults for the others. An unrecognised mode falls back to
        ``"predict"``.
        """
        with open(file, encoding="utf-8") as fp:
            data = json.load(fp)
        for key in self._conf:
            if key not in data:
                continue
            default, value = self._conf[key], data[key]
            if isinstance(default, dict) and isinstance(value, dict):
                # Merge instead of replace: a partial section must not drop
                # the defaults that later code indexes unconditionally.
                self._conf[key] = {**default, **value}
            else:
                self._conf[key] = value
        if self._conf["mode"] not in self._Mode:
            self._conf["mode"] = "predict"

    def _log(self, *args):
        """Print *args* with a ``Log =>`` prefix when logging is enabled."""
        if self._conf["log"]:
            print("Log =>", *args)

    def _init_gpio(self) -> "None":
        """Initialise wiringpi and configure the light pin as an output."""
        wiringpi.wiringPiSetup()
        wiringpi.pinMode(self._conf["gpio"]["wpi"], wiringpi.GPIO.OUTPUT)
        self._log(f"gpio {self._conf['gpio']['wpi']} has been set to `OUTPUT`")

    def _lon(self, sec: "float" = -1) -> "None":
        """Switch the light on; if *sec* > 0, switch it off again after *sec* seconds."""
        wiringpi.digitalWrite(self._conf["gpio"]["wpi"], wiringpi.GPIO.HIGH)
        self._log("light on")
        if sec > 0:
            time.sleep(sec)
            self._lof()

    def _lof(self, sec: "float" = -1) -> "None":
        """Switch the light off; if *sec* > 0, switch it on again after *sec* seconds."""
        wiringpi.digitalWrite(self._conf["gpio"]["wpi"], wiringpi.GPIO.LOW)
        self._log("light off")
        if sec > 0:
            time.sleep(sec)
            self._lon()

    def _flash(self, total: "float", count: "int") -> "None":
        """Blink the light *count* times within *total* seconds.

        Each blink is an off phase of ``3 * gap`` followed by the light being
        switched back on; consecutive blinks are separated by an on phase of
        ``2 * gap``, which makes the phases sum up to *total*.
        """
        gap: "float" = total / (5 * count - 2)
        on, off = 2 * gap, 3 * gap
        for i in range(count):
            if i != 0:
                time.sleep(on)
            self._lof(off)

    def _open_uart(self) -> "None":
        """Open the configured serial port; exit the process if it is not open."""
        self._uart = serial.Serial(self._conf["uart"]["file"], self._conf["uart"]["baud"])
        if not self._uart.isOpen():
            self._log(f"uart: {self._conf['uart']['file']}@{self._conf['uart']['baud']} open failed.")
            sys.exit(-1)

    def _read_record(self) -> "str":
        """Block until one non-empty record has been read from the UART.

        Bytes are read one at a time; ``\\r`` and ``\\n`` both terminate a
        record. Leading/trailing commas are stripped and records that are
        empty afterwards (e.g. the ``\\n`` of a ``\\r\\n`` pair) are skipped.
        """
        chars = []
        while True:
            get = self._uart.read().decode("utf-8")
            if get in "\r\n":
                record = "".join(chars).strip(",")
                chars.clear()
                if record:
                    return record
            else:
                chars.append(get)

    def _isMax(self, data: "np.ndarray", idx: "int") -> "bool":
        """Return whether ``data[idx]`` is not smaller than any neighbour within the window.

        Indices closer than ``fft.win`` to either end of *data* are never
        reported as maxima because their window would leave the array.
        """
        win = self._conf["fft"]["win"]
        # `>=` on the upper bound: for idx == size - win the window would
        # reach data[size], one past the last element.
        if idx < win or idx >= data.size - win:
            return False
        window = data[idx - win:idx + win + 1]
        return not bool(np.any(data[idx] < window))

    def _fft(self, data: "np.ndarray") -> "np.ndarray":
        """Return up to ``_MAX_PEAKS`` spectral peaks of *data* as a flat array.

        The result has ``2 * _MAX_PEAKS`` entries laid out as
        ``[freq0, amp0, freq1, amp1, ...]``; unused slots stay zero. A bin
        counts as a peak when it is a windowed local maximum and its
        amplitude exceeds ``fft.hold``.
        """
        win = self._conf["fft"]["win"]
        hold = self._conf["fft"]["hold"]
        # NOTE(review): the sample spacing derived from this linspace is
        # (size / rate) / (size - 1), slightly larger than 1 / rate. It is
        # kept unchanged so the feature values stay the same - confirm
        # before altering, existing models presumably depend on it.
        scale = np.linspace(0, data.size / self._conf["fft"]["rate"], data.size)
        fft = np.fft.fft(data)
        frq = np.fft.fftfreq(data.size, scale[1] - scale[0])
        # Keep the first half of the bins only.
        pFrq = frq[:frq.size // 2]
        pFft = np.abs(2.0 / data.size * fft[:fft.size // 2])
        result, idx = np.zeros((self._MAX_PEAKS, 2)), 0
        for i in range(win, pFft.size - win):
            # Cheap threshold test first, window scan only for candidates.
            if pFft[i] > hold and self._isMax(pFft, i):
                result[idx][0], result[idx][1] = pFrq[i], pFft[i]
                idx += 1
                if idx >= self._MAX_PEAKS:
                    break
        return result.flatten()

    def _features(self, line: "np.ndarray") -> "np.ndarray":
        """Return the leading ``_HEAD`` values of *line* followed by the FFT peaks of the rest."""
        return np.append(line[:self._HEAD], self._fft(line[self._HEAD:]))

    @staticmethod
    def _to_sparse(vector) -> "dict":
        """Map *vector* to the 1-based ``{index: value}`` dict passed to libsvm."""
        return {i: value for i, value in enumerate(vector, 1)}

    def _l2x(self, buff: "list[str]") -> "list[dict]":
        """Convert the textual fields of one record into a one-element libsvm input list.

        Raises:
            ValueError: if a field cannot be parsed as a float.
        """
        arr = np.array([float(itr) for itr in buff])
        data = arr[:self._HEAD].tolist() + self._fft(arr[self._HEAD:]).tolist()
        return [self._to_sparse(data)]

    def _run_predict(self):
        """Classify UART records forever and flash the light on a positive prediction.

        Any exception other than an unparsable record closes the UART, is
        logged, and ends the loop.
        """
        self._model = svm.svm_load_model(self._conf["model"]["file"])
        self._log(f"model loaded from: `{self._conf['model']['file']}`")
        try:
            while True:
                arr = self._read_record().split(",")
                if len(arr) != self._conf["data"]["size"]:
                    self._log(f"expect {self._conf['data']['size']} nums, got {len(arr)} instead.")
                    continue
                try:
                    x = self._l2x(arr)
                except ValueError as e:
                    # One corrupted record must not stop the service.
                    self._log(f"skip malformed record: {e}")
                    continue
                res = svm.svm_predict([], x, self._model, "-q")
                self._log(f"feature size: {len(x[0])}, predict: {res[0][0]}")
                if res[0][0] == 1:
                    self._flash(self._conf["gpio"]["time"], 4)
        except Exception as e:
            self._uart.close()
            self._log(f"error: {e}")

    @staticmethod
    def _read_dir(root: "str") -> "pd.DataFrame":
        """Concatenate every header-less CSV file found directly in *root*.

        Returns an empty frame when the directory contains no files.
        """
        frames = [pd.read_csv(os.path.join(root, name), header=None) for name in os.listdir(root)]
        return pd.concat(frames) if frames else pd.DataFrame()

    def _load_data(self) -> "tuple[np.ndarray, np.ndarray, np.ndarray]":
        """Load the data set from the configured directories.

        Returns:
            ``(train, test, none)``: the shuffled "have" rows split 85 % /
            15 % into train and test parts, and all "none" rows.
        """
        haveNp = self._read_dir(self._conf["data"]["have"]).to_numpy()
        np.random.shuffle(haveNp)
        noneNp = self._read_dir(self._conf["data"]["none"]).to_numpy()
        count = int(haveNp.shape[0] * 0.85)
        self._log("data loaded from dataset dir.")
        return haveNp[:count], haveNp[count:], noneNp

    def _run_train(self):
        """Train the one-class SVM on the "have" data, evaluate it, and save it.

        Evaluation runs on the held-out "have" rows (label 1) and on the
        "none" rows (label -1).
        """
        have, test, none = self._load_data()
        have = [self._features(line) for line in have]
        test = [self._features(line) for line in test]
        none = [self._features(line) for line in none]

        data = [self._to_sparse(line) for line in have]
        tags = np.ones(len(data)).tolist()
        pro = svm.svm_problem(tags, data)
        pra = svm.svm_parameter(f"-s 2 -g {self._conf['model']['gamma']} -n {self._conf['model']['nu']}")
        model = svm.svm_train(pro, pra)

        # predict test
        data = [self._to_sparse(line) for line in test]
        tags = np.ones(len(data)).tolist()
        svm.svm_predict(y=tags, x=data, m=model)

        # predict none
        data = [self._to_sparse(line) for line in none]
        tags = (np.ones(len(data)) * -1).tolist()
        svm.svm_predict(y=tags, x=data, m=model)

        svm.svm_save_model(self._conf["model"]["file"], model)
        self._log(f"model saved in {self._conf['model']['file']}.")

    def _run_collect(self):
        """Collect ``data.count`` well-formed records and save them as a CSV file.

        The file is written, one record per line, into the directory
        configured for the current mode (``have`` or ``none``) and named
        after the local time. Any exception closes the UART and is logged.
        """
        expected = self._conf["data"]["size"]
        try:
            res = []
            while len(res) < self._conf["data"]["count"]:
                record = self._read_record()
                got = record.count(",") + 1
                if got != expected:
                    self._log(f"expect {expected} nums, got {got} instead.")
                    continue
                res.append(record)
            count = len(res)
            path = self._conf["data"][self._conf["mode"]]
            os.makedirs(path, exist_ok=True)
            name = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(time.time()))
            file = f"{path}/{name}.csv"
            with open(file, "wt", encoding="utf-8") as fp:
                # Records carry no line terminator; add one per row.
                fp.writelines(f"{line}\n" for line in res)
            self._log(f"saved {count} samples into `{file}`")
            self._flash(3, 1)
        except Exception as e:
            self._uart.close()
            self._log(f"error: {e}")

    def run(self) -> "None":
        """Run the configured mode with the light on, and switch it off afterwards."""
        try:
            self._lon()
            if self._conf["mode"] == "predict":
                self._run_predict()
            elif self._conf["mode"] == "train":
                self._run_train()
            else:
                self._run_collect()
        except Exception as e:
            self._log(e)
        finally:
            # Also reached on Ctrl-C / sys.exit, so the light is not left on.
            self._lof()


if __name__ == "__main__":
    config_file = "config.json"
    if len(sys.argv) == 2:
        config_file = sys.argv[1]
    server = Service(config_file)
    server.run()