import os import numpy as np import pandas as pd from data.loader import BaseLoader __all__ = ["DataLoader"] class DataLoader(BaseLoader): def __init__( self, path: "str" = "csv/both", count: "int" = 10, hold: "int" = 200, win: "int" = 5 ): self.__c, self.__h, self.__w = count, hold, win self._rawH: "pd.DataFrame" = pd.DataFrame() self._rawN: "pd.DataFrame" = pd.DataFrame() super().__init__(path) self._trans() def _load(self, path: "str") -> "None": for name in os.listdir(path): data = pd.read_csv(f"{path}/{name}", header=None) if name.startswith("1"): self._rawH = pd.concat([self._rawH, data]) else: self._rawN = pd.concat([self._rawN, data]) def _isMax(self, data: "np.ndarray", idx: "int") -> "bool": if idx < self.__w or idx > data.size - self.__w: return False for i in range(1, self.__w + 1): if data[idx] < data[idx - i] or data[idx] < data[idx + i]: return False return True def _fft(self, data: "np.ndarray") -> "np.ndarray": time = np.linspace(0, 0.38 * data.size / 128, data.size) fft = np.fft.fft(data) frq = np.fft.fftfreq(data.size, time[1] - time[0]) pFrq = frq[:frq.size // 2] pFft = np.abs(2.0 / data.size * fft[:fft.size // 2]) result, idx = np.zeros((self.__c, 2)), 0 for i in range(self.__w, pFft.size - self.__w): if self._isMax(pFft, i) and pFft[i] > self.__h: result[idx][0], result[idx][1] = pFrq[i], pFft[i] idx += 1 if idx >= self.__c: break return result.flatten() def _trans(self) -> "None": have = np.array([np.append(line[:3], self._fft(line[3:])) for line in self._rawH.to_numpy()]) none = np.array([np.append(line[:3], self._fft(line[3:])) for line in self._rawN.to_numpy()]) np.random.shuffle(have) count = int(have.shape[0] * 0.85) self.have = np.array(have[:count]) self.test = np.array(have[count:]) self.none = np.array(none) def toh(self, file: "str") -> "None": pass @staticmethod def _n2s(arr: "list") -> "str": res = "\t{" for itr in arr: res += f"{itr:.6f}," return res[:-1] + "}" def toc(self, file: "str"): temp = ( "#ifndef LOCAL_DATA_H\n" "#define LOCAL_DATA_H\n" "\n" "#define HaveCount {haveCount}\n" "#define TestCount {testCount}\n" "#define NoneCount {noneCount}\n" "#define Features {features}\n" "\n" "static double Have[HaveCount][Features] = {{\n" "{have}\n" "}};\n" "\n" "static double Test[TestCount][Features] = {{\n" "{test}\n" "}};\n" "\n" "static double None[NoneCount][Features] = {{\n" "{none}\n" "}};\n" "\n" "#endif" ) have = ",\n".join([self._n2s(line) for line in self.have]) test = ",\n".join([self._n2s(line) for line in self.test]) none = ",\n".join([self._n2s(line) for line in self.none]) content = temp.format( haveCount=self.have.shape[0], testCount=self.test.shape[0], noneCount=self.none.shape[0], features=self.have.shape[1], have=have, test=test, none=none ) with open(file, "w", encoding="utf-8") as fp: fp.write(content)