uart.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import sys
  2. import json
  3. import serial
  4. import numpy as np
  5. import libsvm.svmutil as svm
  6. class OneClassSvmService:
  7. def __init__(self, conf_path: "str"):
  8. self._conf: "dict" = {
  9. "model": "svm.tin",
  10. "fft": {
  11. "win": 10,
  12. "hold": 500
  13. },
  14. "uart": {
  15. "port": "/dev/ttyS5",
  16. "rate": 115200,
  17. "secsPer128": 0.38
  18. },
  19. "log": {
  20. "std": True,
  21. "file": ""
  22. }
  23. }
  24. self._uart: "serial.Serial" = ...
  25. self._model: "svm.svm_model" = ...
  26. self._read_conf(conf_path)
  27. self._open_uart()
  28. self._init_model()
  29. def _read_conf(self, file: "str") -> "None":
  30. with open(file, encoding="utf-8") as fp:
  31. data = json.load(fp)
  32. for key in self._conf:
  33. if key in data:
  34. self._conf[key] = data[key]
  35. def _open_uart(self) -> "None":
  36. self._uart = serial.Serial(self._conf["uart"]["port"], self._conf["uart"]["rate"])
  37. assert self._uart.isOpen(), f"{self._conf['uart']['port']}@{self._conf['uart']['rate']} open failed."
  38. def _init_model(self) -> "None":
  39. self._model = svm.svm_load_model(self._conf["model"])
  40. def _isMax(self, data: "np.ndarray", idx: "int") -> "bool":
  41. if idx < self._conf["fft"]["win"] or idx > data.size - self._conf["fft"]["win"]:
  42. return False
  43. for i in range(1, self._conf["fft"]["win"] + 1):
  44. if data[idx] < data[idx - i] or data[idx] < data[idx + i]:
  45. return False
  46. return True
  47. def _fft(self, data: "np.ndarray") -> "np.ndarray":
  48. time = np.linspace(0, self._conf["uart"]["secsPer128"] * data.size / 128, data.size)
  49. fft = np.fft.fft(data)
  50. frq = np.fft.fftfreq(data.size, time[1] - time[0])
  51. pFrq = frq[:frq.size // 2]
  52. pFft = np.abs(2.0 / data.size * fft[:fft.size // 2])
  53. result, idx = np.zeros((10, 2)), 0
  54. for i in range(self._conf["fft"]["win"], pFft.size - self._conf["fft"]["win"]):
  55. if self._isMax(pFft, i) and pFft[i] > self._conf["fft"]["hold"]:
  56. result[idx][0], result[idx][1] = pFrq[i], pFft[i]
  57. idx += 1
  58. if idx >= 10:
  59. break
  60. return result.flatten()
  61. def _l2x(self, buff: "list[str]") -> "list[dict]":
  62. arr = np.array([float(itr) for itr in buff])
  63. data = arr[:3].tolist() + self._fft(arr[3:]).tolist()
  64. return [{i + 1: data[i] for i in range(len(data))}]
  65. def run(self) -> "None":
  66. buff = ""
  67. try:
  68. while True:
  69. get = self._uart.read().decode("utf-8")
  70. if get in "\r\n":
  71. buff = buff.strip(",")
  72. if buff == "":
  73. continue
  74. x = self._l2x(buff.split(","))
  75. res = svm.svm_predict([], x, self._model, "-q")
  76. print(f"feature size: {len(x[0])}, predict: {res[0][0]}")
  77. # self._uart.write(f"{res}".encode("utf-8"))
  78. buff = ""
  79. else:
  80. buff += get
  81. except Exception as e:
  82. self._uart.close()
  83. print(f"error: {e}")
  84. if __name__ == "__main__":
  85. config_file = "config.json"
  86. if len(sys.argv) == 2:
  87. config_file = sys.argv[1]
  88. server = OneClassSvmService(config_file)
  89. server.run()