loader_ac.py

import os

import numpy as np
import pandas as pd

from data.loader import BaseLoader

__all__ = ["DataLoader"]


class DataLoader(BaseLoader):
    def __init__(
            self,
            path: str = "csv/both",
            count: int = 10,
            hold: int = 200,
            win: int = 5
    ):
        # count: number of spectral peaks kept per sample,
        # hold:  minimum peak amplitude for a peak to be kept,
        # win:   half-width of the local-maximum detection window.
        self.__c, self.__h, self.__w = count, hold, win
        self._rawH: pd.DataFrame = pd.DataFrame()
        self._rawN: pd.DataFrame = pd.DataFrame()
        super().__init__(path)
        self._trans()

    def _load(self, path: str) -> None:
        # Files whose names start with "1" are positive ("have") samples,
        # everything else goes into the negative ("none") set.
        for name in os.listdir(path):
            data = pd.read_csv(f"{path}/{name}", header=None)
            if name.startswith("1"):
                self._rawH = pd.concat([self._rawH, data])
            else:
                self._rawN = pd.concat([self._rawN, data])

    def _isMax(self, data: np.ndarray, idx: int) -> bool:
        # True if data[idx] is a local maximum within +/- win samples.
        # Use >= on the upper bound so idx + win never runs past the array.
        if idx < self.__w or idx >= data.size - self.__w:
            return False
        for i in range(1, self.__w + 1):
            if data[idx] < data[idx - i] or data[idx] < data[idx + i]:
                return False
        return True

    def _fft(self, data: np.ndarray) -> np.ndarray:
        # Single-sided amplitude spectrum; keep up to `count` peaks whose
        # amplitude exceeds `hold` and return them as a flat vector of
        # (frequency, amplitude) pairs, zero-padded when fewer peaks exist.
        time = np.linspace(0, 0.38 * data.size / 128, data.size)  # only used for bin spacing
        fft = np.fft.fft(data)
        frq = np.fft.fftfreq(data.size, time[1] - time[0])
        pFrq = frq[:frq.size // 2]
        pFft = np.abs(2.0 / data.size * fft[:fft.size // 2])
        result, idx = np.zeros((self.__c, 2)), 0
        for i in range(self.__w, pFft.size - self.__w):
            if self._isMax(pFft, i) and pFft[i] > self.__h:
                result[idx][0], result[idx][1] = pFrq[i], pFft[i]
                idx += 1
                if idx >= self.__c:
                    break
        return result.flatten()

    def _trans(self) -> None:
        # Each CSV row: the first 3 columns are kept as-is, the remaining
        # columns are a raw signal that is replaced by its FFT peak features.
        have = np.array([np.append(line[:3], self._fft(line[3:])) for line in self._rawH.to_numpy()])
        none = np.array([np.append(line[:3], self._fft(line[3:])) for line in self._rawN.to_numpy()])
        np.random.shuffle(have)
        count = int(have.shape[0] * 0.85)  # 85 % train / 15 % test split of the positive set
        self.have = np.array(have[:count])
        self.test = np.array(have[count:])
        self.none = np.array(none)

    def toh(self, file: str) -> None:
        # Not implemented yet.
        pass

    @staticmethod
    def _n2s(arr: list) -> str:
        # Format one sample as a C array initializer, e.g. "\t{1.000000,2.000000}".
        res = "\t{"
        for itr in arr:
            res += f"{itr:.6f},"
        return res[:-1] + "}"

    def toc(self, file: str) -> None:
        # Dump the Have/Test/None matrices as static double arrays in a C header.
        temp = (
            "#ifndef LOCAL_DATA_H\n"
            "#define LOCAL_DATA_H\n"
            "\n"
            "#define HaveCount {haveCount}\n"
            "#define TestCount {testCount}\n"
            "#define NoneCount {noneCount}\n"
            "#define Features {features}\n"
            "\n"
            "static double Have[HaveCount][Features] = {{\n"
            "{have}\n"
            "}};\n"
            "\n"
            "static double Test[TestCount][Features] = {{\n"
            "{test}\n"
            "}};\n"
            "\n"
            "static double None[NoneCount][Features] = {{\n"
            "{none}\n"
            "}};\n"
            "\n"
            "#endif"
        )
        have = ",\n".join([self._n2s(line) for line in self.have])
        test = ",\n".join([self._n2s(line) for line in self.test])
        none = ",\n".join([self._n2s(line) for line in self.none])
        content = temp.format(
            haveCount=self.have.shape[0],
            testCount=self.test.shape[0],
            noneCount=self.none.shape[0],
            features=self.have.shape[1],
            have=have, test=test, none=none
        )
        with open(file, "w", encoding="utf-8") as fp:
            fp.write(content)
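

# Usage sketch (illustrative, not part of the original module). It assumes that
# data.loader.BaseLoader.__init__ invokes self._load(path) and that `path`
# holds CSV files whose rows are 3 metadata columns followed by raw samples;
# the path and output filename below are placeholders.
if __name__ == "__main__":
    loader = DataLoader(path="csv/both", count=10, hold=200, win=5)
    print(loader.have.shape, loader.test.shape, loader.none.shape)
    loader.toc("local_data.h")  # export the train/test/none matrices as a C header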