import sys import json import serial import numpy as np import libsvm.svmutil as svm class OneClassSvmService: def __init__(self, conf_path: "str"): self._conf: "dict" = { "model": "svm.tin", "fft": { "win": 10, "hold": 500 }, "uart": { "port": "/dev/ttyS5", "rate": 115200, "secsPer128": 0.38 }, "log": { "std": True, "file": "" } } self._uart: "serial.Serial" = ... self._model: "svm.svm_model" = ... self._read_conf(conf_path) self._open_uart() self._init_model() def _read_conf(self, file: "str") -> "None": with open(file, encoding="utf-8") as fp: data = json.load(fp) for key in self._conf: if key in data: self._conf[key] = data[key] def _open_uart(self) -> "None": self._uart = serial.Serial(self._conf["uart"]["port"], self._conf["uart"]["rate"]) assert self._uart.isOpen(), f"{self._conf['uart']['port']}@{self._conf['uart']['rate']} open failed." def _init_model(self) -> "None": self._model = svm.svm_load_model(self._conf["model"]) def _isMax(self, data: "np.ndarray", idx: "int") -> "bool": if idx < self._conf["fft"]["win"] or idx > data.size - self._conf["fft"]["win"]: return False for i in range(1, self._conf["fft"]["win"] + 1): if data[idx] < data[idx - i] or data[idx] < data[idx + i]: return False return True def _fft(self, data: "np.ndarray") -> "np.ndarray": time = np.linspace(0, self._conf["uart"]["secsPer128"] * data.size / 128, data.size) fft = np.fft.fft(data) frq = np.fft.fftfreq(data.size, time[1] - time[0]) pFrq = frq[:frq.size // 2] pFft = np.abs(2.0 / data.size * fft[:fft.size // 2]) result, idx = np.zeros((10, 2)), 0 for i in range(self._conf["fft"]["win"], pFft.size - self._conf["fft"]["win"]): if self._isMax(pFft, i) and pFft[i] > self._conf["fft"]["hold"]: result[idx][0], result[idx][1] = pFrq[i], pFft[i] idx += 1 if idx >= 10: break return result.flatten() def _l2x(self, buff: "list[str]") -> "list[dict]": arr = np.array([float(itr) for itr in buff]) data = arr[:3].tolist() + self._fft(arr[3:]).tolist() return [{i + 1: data[i] for i in range(len(data))}] def run(self) -> "None": buff = "" try: while True: get = self._uart.read().decode("utf-8") if get in "\r\n": buff = buff.strip(",") if buff == "": continue x = self._l2x(buff.split(",")) res = svm.svm_predict([], x, self._model, "-q") print(f"feature size: {len(x[0])}, predict: {res[0][0]}") # self._uart.write(f"{res}".encode("utf-8")) buff = "" else: buff += get except Exception as e: self._uart.close() print(f"error: {e}") if __name__ == "__main__": config_file = "config.json" if len(sys.argv) == 2: config_file = sys.argv[1] server = OneClassSvmService(config_file) server.run()