123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- import os
- import sys
- import json
- import time
- import serial
- import numpy as np
- import pandas as pd
- import libsvm.svmutil as svm
- class GpioMonitor:
- class Gpio:
- OUTPUT, HIGH, LOW = -1, 0, 1
- GPIO = Gpio()
- @staticmethod
- def wiringPiSetup():
- pass
- @staticmethod
- def pinMode(port: "int", gpio: "Gpio"):
- pass
- @staticmethod
- def digitalWrite(port: "int", gpio: "Gpio"):
- pass
- try:
- import wiringpi # noqa
- except ModuleNotFoundError:
- wiringpi = GpioMonitor
- class Service:
- _Mode = {"predict": 1, "train": 2, "have": 3, "none": 4}
- def __init__(self, conf_path: "str"):
- self._conf: "dict" = {
- "data": {
- "count": 700,
- "size": 131,
- "have": "data/have",
- "none": "data/none"
- },
- "fft": {
- "win": 10,
- "hold": 500,
- "rate": 337
- },
- "mode": "predict",
- "model": {
- "file": "data/svm.tin",
- "gamma": 2.57e-6,
- "nu": 7.1e-3
- },
- "uart": {
- "file": "/dev/ttyS5",
- "baud": 115200
- },
- "gpio": {
- "wpi": 2,
- "time": 3
- },
- "log": True
- }
- self._uart: "serial.Serial"
- self._model: "svm.svm_model"
- self._read_conf(conf_path)
- self._log(self._conf)
- self._init_gpio()
- self._open_uart()
- def _read_conf(self, file: "str") -> "None":
- with open(file, encoding="utf-8") as fp:
- data = json.load(fp)
- for key in self._conf:
- if key in data:
- self._conf[key] = data[key]
- if self._conf["mode"] not in self._Mode:
- self._conf["mode"] = "predict"
- def _log(self, *args):
- if self._conf["log"]:
- print("Log =>", *args)
- def _init_gpio(self) -> "None":
- wiringpi.wiringPiSetup()
- wiringpi.pinMode(self._conf["gpio"]["wpi"], wiringpi.GPIO.OUTPUT)
- self._log(f"gpio {self._conf['gpio']['wpi']} has been set to `OUTPUT`")
- def _lon(self, sec: "float" = -1) -> "None":
- wiringpi.digitalWrite(self._conf["gpio"]["wpi"], wiringpi.GPIO.HIGH)
- self._log("light on")
- if sec > 0:
- time.sleep(sec)
- self._lof()
- def _lof(self, sec: "float" = -1) -> "None":
- wiringpi.digitalWrite(self._conf["gpio"]["wpi"], wiringpi.GPIO.LOW)
- self._log("light off")
- if sec > 0:
- time.sleep(sec)
- self._lon()
- def _flash(self, total: "float", count: "int") -> "None":
- gap: "float" = total / (5 * count - 2)
- on, off = 2 * gap, 3 * gap
- for i in range(count):
- if i != 0:
- time.sleep(on)
- self._lof(off)
- def _open_uart(self) -> "None":
- self._uart = serial.Serial(self._conf["uart"]["file"], self._conf["uart"]["baud"])
- if not self._uart.isOpen():
- self._log(f"uart: {self._conf['uart']['file']}@{self._conf['uart']['baud']} open failed.")
- sys.exit(-1)
- def _isMax(self, data: "np.ndarray", idx: "int") -> "bool":
- if idx < self._conf["fft"]["win"] or idx > data.size - self._conf["fft"]["win"]:
- return False
- for i in range(1, self._conf["fft"]["win"] + 1):
- if data[idx] < data[idx - i] or data[idx] < data[idx + i]:
- return False
- return True
- def _fft(self, data: "np.ndarray") -> "np.ndarray":
- scale = np.linspace(0, data.size / self._conf["fft"]["rate"], data.size)
- fft = np.fft.fft(data)
- frq = np.fft.fftfreq(data.size, scale[1] - scale[0])
- pFrq = frq[:frq.size // 2]
- pFft = np.abs(2.0 / data.size * fft[:fft.size // 2])
- result, idx = np.zeros((10, 2)), 0
- for i in range(self._conf["fft"]["win"], pFft.size - self._conf["fft"]["win"]):
- if self._isMax(pFft, i) and pFft[i] > self._conf["fft"]["hold"]:
- result[idx][0], result[idx][1] = pFrq[i], pFft[i]
- idx += 1
- if idx >= 10:
- break
- return result.flatten()
- def _l2x(self, buff: "list[str]") -> "list[dict]":
- arr = np.array([float(itr) for itr in buff])
- data = arr[:3].tolist() + self._fft(arr[3:]).tolist()
- return [{i + 1: data[i] for i in range(len(data))}]
- def _run_predict(self):
- self._model = svm.svm_load_model(self._conf["model"]["file"])
- self._log(f"model loaded from: `{self._conf['model']['file']}`")
- buff = ""
- try:
- while True:
- get = self._uart.read().decode("utf-8")
- if get in "\r\n":
- buff = buff.strip(",")
- if buff == "":
- continue
- arr = buff.split(",")
- if len(arr) != self._conf["data"]["size"]:
- self._log(f"expect {self._conf['data']['size']} nums, got {len(arr)} instead.")
- buff = ""
- continue
- x = self._l2x(arr)
- res = svm.svm_predict([], x, self._model, "-q")
- buff = ""
- self._log(f"feature size: {len(x[0])}, predict: {res[0][0]}")
- if res[0][0] == 1:
- self._flash(self._conf["gpio"]["time"], 4)
- else:
- buff += get
- except Exception as e:
- self._uart.close()
- self._log(f"error: {e}")
- def _load_data(self) -> "tuple[np.ndarray, np.ndarray, np.ndarray]":
- root = self._conf["data"]["have"]
- have: "pd.DataFrame" = pd.DataFrame()
- for name in os.listdir(root):
- data = pd.read_csv(f"{root}/{name}", header=None)
- have = pd.concat([have, data])
- haveNp = have.to_numpy()
- np.random.shuffle(haveNp)
- root = self._conf["data"]["none"]
- none: "pd.DataFrame" = pd.DataFrame()
- for name in os.listdir(root):
- data = pd.read_csv(f"{root}/{name}", header=None)
- none = pd.concat([none, data])
- count = int(haveNp.shape[0] * 0.85)
- self._log("data loaded from dataset dir.")
- return haveNp[:count], haveNp[count:], none.to_numpy()
- def _run_train(self):
- have, test, none = self._load_data()
- have = [np.append(line[:3], self._fft(line[3:])) for line in have]
- test = [np.append(line[:3], self._fft(line[3:])) for line in test]
- none = [np.append(line[:3], self._fft(line[3:])) for line in none]
- data = [{i + 1: line[i] for i in range(len(line))} for line in have]
- tags = np.ones(len(data)).tolist()
- pro = svm.svm_problem(tags, data)
- pra = svm.svm_parameter(f"-s 2 -g {self._conf['model']['gamma']} -n {self._conf['model']['nu']}")
- model = svm.svm_train(pro, pra)
- # predict test
- data = [{i + 1: line[i] for i in range(len(line))} for line in test]
- tags = np.ones(len(data)).tolist()
- svm.svm_predict(y=tags, x=data, m=model)
- # predict none
- data = [{i + 1: line[i] for i in range(len(line))} for line in none]
- tags = (np.ones(len(data)) * -1).tolist()
- svm.svm_predict(y=tags, x=data, m=model)
- svm.svm_save_model(self._conf["model"]["file"], model)
- self._log(f"model saved in {self._conf['model']['file']}.")
- def _run_collect(self):
- buff = ""
- try:
- res, count = [], 0
- while count < self._conf["data"]["count"]:
- get = self._uart.read().decode("utf-8")
- if get in "\r\n":
- buff = buff.strip(",")
- if buff == "":
- continue
- if tmp := buff.count(",") != self._conf["data"]["size"] - 1:
- self._log(f"expect {self._conf['data']['size']} nums, got {tmp} instead.")
- continue
- res.append(buff)
- count += 1
- else:
- buff += get
- path = self._conf["data"][self._conf["mode"]]
- name = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(time.time()))
- file = f"{path}/{name}.csv"
- with open(file, "wt") as fp:
- fp.writelines(res)
- self._log(f"saved {count} samples into `{file}`")
- self._flash(3, 1)
- except Exception as e:
- self._uart.close()
- self._log(f"error: {e}")
- def run(self) -> "None":
- try:
- self._lon()
- if self._conf["mode"] == "predict":
- self._run_predict()
- elif self._conf["mode"] == "train":
- self._run_train()
- else:
- self._run_collect()
- except Exception as e:
- self._log(e)
- self._lof()
- if __name__ == "__main__":
- config_file = "config.json"
- if len(sys.argv) == 2:
- config_file = sys.argv[1]
- server = Service(config_file)
- server.run()
|